• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 14188288034

01 Apr 2025 05:56AM UTC coverage: 21.809% (-0.2%) from 22.009%
14188288034

push

github

web-flow
ovn lb select the local chassis's backend prefer (#4894)

* add lb:option prefer_local_backend

Signed-off-by: clyi <clyi@alauda.io>

* support metallb underlay

Signed-off-by: clyi <clyi@alauda.io>

0 of 446 new or added lines in 8 files covered. (0.0%)

1 existing line in 1 file now uncovered.

10265 of 47068 relevant lines covered (21.81%)

0.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/endpoint.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "time"
7

8
        v1 "k8s.io/api/core/v1"
9
        "k8s.io/apimachinery/pkg/api/errors"
10
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11
        "k8s.io/apimachinery/pkg/labels"
12
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
13
        "k8s.io/client-go/tools/cache"
14
        "k8s.io/klog/v2"
15

16
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
17
        "github.com/kubeovn/kube-ovn/pkg/util"
18
)
19

20
func (c *Controller) enqueueAddEndpoint(obj interface{}) {
×
21
        key := cache.MetaObjectToName(obj.(*v1.Endpoints)).String()
×
22
        klog.V(3).Infof("enqueue add endpoint %s", key)
×
23
        c.addOrUpdateEndpointQueue.Add(key)
×
24
}
×
25

26
func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
×
27
        oldEp := oldObj.(*v1.Endpoints)
×
28
        newEp := newObj.(*v1.Endpoints)
×
29
        if oldEp.ResourceVersion == newEp.ResourceVersion {
×
30
                return
×
31
        }
×
32

33
        if len(oldEp.Subsets) == 0 && len(newEp.Subsets) == 0 {
×
34
                return
×
35
        }
×
36

37
        key := cache.MetaObjectToName(newEp).String()
×
38
        klog.V(3).Infof("enqueue update endpoint %s", key)
×
39
        c.addOrUpdateEndpointQueue.Add(key)
×
40
}
41

42
func (c *Controller) handleUpdateEndpoint(key string) error {
×
43
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
44
        if err != nil {
×
45
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
46
                return nil
×
47
        }
×
48

49
        c.epKeyMutex.LockKey(key)
×
50
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
51
        klog.Infof("handle update endpoint %s", key)
×
52

×
53
        ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
×
54
        if err != nil {
×
55
                if errors.IsNotFound(err) {
×
56
                        return nil
×
57
                }
×
58
                klog.Error(err)
×
59
                return err
×
60
        }
61

62
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
63
        if err != nil {
×
64
                if errors.IsNotFound(err) {
×
65
                        return nil
×
66
                }
×
67
                klog.Error(err)
×
68
                return err
×
69
        }
70
        svc := cachedService.DeepCopy()
×
71

×
72
        var (
×
73
                pods                     []*v1.Pod
×
74
                lbVips                   []string
×
75
                vip, vpcName, subnetName string
×
76
                ok                       bool
×
77
                ignoreHealthCheck        = true
×
NEW
78
                isPreferLocalBackend     = false
×
79
        )
×
80

×
81
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
82
                lbVips = []string{vip}
×
83

×
84
                for _, subset := range ep.Subsets {
×
85
                        for _, address := range subset.Addresses {
×
86
                                // TODO: IPv6
×
87
                                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 &&
×
88
                                        address.TargetRef.Name != "" {
×
89
                                        ignoreHealthCheck = false
×
90
                                }
×
91
                        }
92
                }
93
        } else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
×
94
                return nil
×
95
        }
×
96

NEW
97
        if c.config.EnableLb && c.config.EnableOVNLBPreferLocal {
×
NEW
98
                if svc.Spec.Type == v1.ServiceTypeLoadBalancer && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
×
NEW
99
                        if len(svc.Status.LoadBalancer.Ingress) > 0 {
×
NEW
100
                                for _, ingress := range svc.Status.LoadBalancer.Ingress {
×
NEW
101
                                        if ingress.IP != "" {
×
NEW
102
                                                lbVips = append(lbVips, ingress.IP)
×
NEW
103
                                        }
×
104
                                }
105
                        }
NEW
106
                        isPreferLocalBackend = true
×
NEW
107
                } else if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal {
×
NEW
108
                        isPreferLocalBackend = true
×
NEW
109
                }
×
110
        }
111

112
        if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
×
113
                klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
×
114
                return err
×
115
        }
×
116
        for i, pod := range pods {
×
117
                if pod.Status.PodIP != "" || len(pod.Status.PodIPs) != 0 {
×
118
                        continue
×
119
                }
120

121
                for _, subset := range ep.Subsets {
×
122
                        for _, addr := range subset.Addresses {
×
123
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
×
124
                                        continue
×
125
                                }
126

127
                                p, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
×
128
                                if err != nil {
×
129
                                        klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
130
                                        return err
×
131
                                }
×
132
                                pods[i] = p.DeepCopy()
×
133
                                break
×
134
                        }
135
                        if pods[i] != pod {
×
136
                                break
×
137
                        }
138
                }
139
        }
140

141
        vpcName, subnetName = c.getVpcSubnetName(pods, ep, svc)
×
142

×
143
        var (
×
144
                vpc    *kubeovnv1.Vpc
×
145
                svcVpc string
×
146
        )
×
147

×
148
        if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
×
149
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
150
                return err
×
151
        }
×
152

153
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
×
154
        oldTCPLb, oldUDPLb, oldSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
×
155
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
156
                tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
×
157
        }
×
158

159
        for _, lbVip := range lbVips {
×
160
                for _, port := range svc.Spec.Ports {
×
161
                        var lb, oldLb string
×
162
                        switch port.Protocol {
×
163
                        case v1.ProtocolTCP:
×
164
                                lb, oldLb = tcpLb, oldTCPLb
×
165
                        case v1.ProtocolUDP:
×
166
                                lb, oldLb = udpLb, oldUDPLb
×
167
                        case v1.ProtocolSCTP:
×
168
                                lb, oldLb = sctpLb, oldSctpLb
×
169
                        }
170

171
                        var (
×
172
                                vip, checkIP             string
×
173
                                backends                 []string
×
174
                                ipPortMapping, externals map[string]string
×
175
                        )
×
176

×
177
                        if !ignoreHealthCheck {
×
178
                                if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
×
179
                                        klog.Error(err)
×
180
                                        return err
×
181
                                }
×
182
                                externals = map[string]string{
×
183
                                        util.SwitchLBRuleSubnet: subnetName,
×
184
                                }
×
185
                        }
186

NEW
187
                        if isPreferLocalBackend {
×
NEW
188
                                // only use the ipportmapping's lsp to ip map when the backend is local
×
NEW
189
                                checkIP = util.MasqueradeCheckIP
×
NEW
190
                        }
×
NEW
191
                        isGenIPPortMapping := !ignoreHealthCheck || isPreferLocalBackend
×
NEW
192
                        ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, isGenIPPortMapping)
×
193
                        // for performance reason delete lb with no backends
×
194
                        if len(backends) != 0 {
×
195
                                vip = util.JoinHostPort(lbVip, port.Port)
×
196
                                klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
×
197
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
×
198
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
×
199
                                        return err
×
200
                                }
×
201

NEW
202
                                if isPreferLocalBackend && len(ipPortMapping) != 0 {
×
NEW
203
                                        if err = c.OVNNbClient.LoadBalancerUpdateIPPortMapping(lb, vip, ipPortMapping); err != nil {
×
NEW
204
                                                klog.Errorf("failed to update ip port mapping %s for vip %s to LB %s: %v", ipPortMapping, vip, lb, err)
×
NEW
205
                                                return err
×
NEW
206
                                        }
×
207
                                }
208

209
                                if !ignoreHealthCheck && len(ipPortMapping) != 0 {
×
210
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
211
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
212
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
×
213
                                                return err
×
214
                                        }
×
215
                                }
216
                        } else {
×
NEW
217
                                vip = util.JoinHostPort(lbVip, port.Port)
×
218
                                klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
×
219
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, ignoreHealthCheck); err != nil {
×
220
                                        klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
×
221
                                        return err
×
222
                                }
×
223

224
                                klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
×
225
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, ignoreHealthCheck); err != nil {
×
226
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
×
227
                                        return err
×
228
                                }
×
229

NEW
230
                                if c.config.EnableOVNLBPreferLocal {
×
NEW
231
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(lb, vip); err != nil {
×
NEW
232
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
NEW
233
                                                return err
×
NEW
234
                                        }
×
NEW
235
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(oldLb, vip); err != nil {
×
NEW
236
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
NEW
237
                                                return err
×
NEW
238
                                        }
×
239
                                }
240
                        }
241
                }
242
        }
243

244
        if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
×
245
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
246
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
247
                        klog.Errorf("failed to patch service %s: %v", key, err)
×
248
                        return err
×
249
                }
×
250
        }
251

252
        return nil
×
253
}
254

255
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, service *v1.Service) (string, string) {
×
256
        var (
×
257
                vpcName    string
×
258
                subnetName string
×
259
        )
×
260

×
261
        for _, pod := range pods {
×
262
                if len(pod.Annotations) == 0 {
×
263
                        continue
×
264
                }
265
                if subnetName == "" {
×
266
                        subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
×
267
                }
×
268

269
        LOOP:
×
270
                for _, subset := range endpoints.Subsets {
×
271
                        for _, addr := range subset.Addresses {
×
272
                                if addr.IP == pod.Status.PodIP {
×
273
                                        if vpcName == "" {
×
274
                                                vpcName = pod.Annotations[util.LogicalRouterAnnotation]
×
275
                                        }
×
276
                                        if vpcName != "" {
×
277
                                                break LOOP
×
278
                                        }
279
                                }
280
                        }
281
                }
282

283
                if vpcName != "" && subnetName != "" {
×
284
                        break
×
285
                }
286
        }
287

288
        if vpcName == "" {
×
289
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
290
                        vpcName = c.config.ClusterRouter
×
291
                }
×
292
        }
293

294
        if subnetName == "" {
×
295
                subnetName = util.DefaultSubnet
×
296
        }
×
297

298
        return vpcName, subnetName
×
299
}
300

301
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
302
// the vip is used to check the health of the backend pod
303
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
304
        var (
×
305
                needCreateHealthCheckVip bool
×
306
                checkVip                 *kubeovnv1.Vip
×
307
                checkIP                  string
×
308
                err                      error
×
309
        )
×
310
        vipName := subnetName
×
311
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
312
        if err != nil {
×
313
                if errors.IsNotFound(err) {
×
314
                        needCreateHealthCheckVip = true
×
315
                } else {
×
316
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
317
                        return "", err
×
318
                }
×
319
        }
320
        if needCreateHealthCheckVip {
×
321
                vip := &kubeovnv1.Vip{
×
322
                        ObjectMeta: metav1.ObjectMeta{
×
323
                                Name: vipName,
×
324
                        },
×
325
                        Spec: kubeovnv1.VipSpec{
×
326
                                Subnet: subnetName,
×
327
                        },
×
328
                }
×
329
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
330
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
331
                        return "", err
×
332
                }
×
333

334
                // wait for vip created
335
                time.Sleep(1 * time.Second)
×
336
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
337
                if err != nil {
×
338
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
339
                        return "", err
×
340
                }
×
341
        }
342

343
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
344
                err = fmt.Errorf("vip %s is not ready", vipName)
×
345
                klog.Error(err)
×
346
                return "", err
×
347
        }
×
348

349
        switch util.CheckProtocol(lbVip) {
×
350
        case kubeovnv1.ProtocolIPv4:
×
351
                checkIP = checkVip.Status.V4ip
×
352
        case kubeovnv1.ProtocolIPv6:
×
353
                checkIP = checkVip.Status.V6ip
×
354
        }
355
        if checkIP == "" {
×
356
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
357
                klog.Error(err)
×
358
                return "", err
×
359
        }
×
360

361
        return checkIP, nil
×
362
}
363

NEW
364
func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, isGenIPPortMapping bool) (map[string]string, []string) {
×
365
        var (
×
366
                ipPortMapping = map[string]string{}
×
367
                backends      = []string{}
×
368
                protocol      = util.CheckProtocol(serviceIP)
×
369
        )
×
370

×
371
        for _, subset := range endpoints.Subsets {
×
372
                var targetPort int32
×
373
                for _, port := range subset.Ports {
×
374
                        if port.Name == servicePort.Name {
×
375
                                targetPort = port.Port
×
376
                                break
×
377
                        }
378
                }
379
                if targetPort == 0 {
×
380
                        continue
×
381
                }
382

383
                for _, address := range subset.Addresses {
×
NEW
384
                        if isGenIPPortMapping && address.TargetRef.Name != "" {
×
NEW
385
                                ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, address.TargetRef.Namespace)
×
386
                                ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
×
387
                        }
×
388
                        if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
×
389
                                if util.CheckProtocol(address.IP) == protocol {
×
390
                                        backends = append(backends, util.JoinHostPort(address.IP, targetPort))
×
391
                                }
×
392
                                continue
×
393
                        }
394
                        var ip string
×
395
                        for _, pod := range pods {
×
396
                                if pod.Name == address.TargetRef.Name {
×
397
                                        for _, podIP := range util.PodIPs(*pod) {
×
398
                                                if util.CheckProtocol(podIP) == protocol {
×
399
                                                        ip = podIP
×
400
                                                        break
×
401
                                                }
402
                                        }
403
                                        break
×
404
                                }
405
                        }
406
                        if ip == "" && util.CheckProtocol(address.IP) == protocol {
×
407
                                ip = address.IP
×
408
                        }
×
409
                        if ip != "" {
×
410
                                backends = append(backends, util.JoinHostPort(ip, targetPort))
×
411
                        }
×
412
                }
413
        }
414

415
        return ipPortMapping, backends
×
416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc