• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 13425935899

20 Feb 2025 01:32AM UTC coverage: 22.263% (+0.2%) from 22.068%
13425935899

Pull #4991

github

zhangzujian
wip

Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
Pull Request #4991: add support for internalTrafficPolicy=Local

175 of 617 new or added lines in 13 files covered. (28.36%)

8 existing lines in 5 files now uncovered.

10436 of 46876 relevant lines covered (22.26%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/endpoint.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "maps"
7
        "net"
8
        "slices"
9
        "strings"
10
        "time"
11

12
        v1 "k8s.io/api/core/v1"
13
        "k8s.io/apimachinery/pkg/api/errors"
14
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15
        "k8s.io/apimachinery/pkg/labels"
16
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17
        "k8s.io/client-go/tools/cache"
18
        "k8s.io/klog/v2"
19
        "k8s.io/utils/set"
20

21
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
22
        "github.com/kubeovn/kube-ovn/pkg/util"
23
)
24

25
func (c *Controller) enqueueAddEndpoint(obj interface{}) {
×
26
        key := cache.MetaObjectToName(obj.(*v1.Endpoints)).String()
×
27
        klog.V(3).Infof("enqueue add endpoint %s", key)
×
28
        c.addOrUpdateEndpointQueue.Add(key)
×
29
}
×
30

31
func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
×
32
        oldEp := oldObj.(*v1.Endpoints)
×
33
        newEp := newObj.(*v1.Endpoints)
×
34
        if oldEp.ResourceVersion == newEp.ResourceVersion {
×
35
                return
×
36
        }
×
37

38
        if len(oldEp.Subsets) == 0 && len(newEp.Subsets) == 0 {
×
39
                return
×
40
        }
×
41

42
        key := cache.MetaObjectToName(newEp).String()
×
43
        klog.V(3).Infof("enqueue update endpoint %s", key)
×
44
        c.addOrUpdateEndpointQueue.Add(key)
×
45
}
46

47
func (c *Controller) handleUpdateEndpoint(key string) error {
×
48
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
49
        if err != nil {
×
50
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
51
                return nil
×
52
        }
×
53

54
        c.epKeyMutex.LockKey(key)
×
55
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
56
        klog.Infof("handle update endpoint %s", key)
×
57

×
58
        ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
×
59
        if err != nil {
×
60
                if errors.IsNotFound(err) {
×
61
                        return nil
×
62
                }
×
63
                klog.Error(err)
×
64
                return err
×
65
        }
66

67
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
68
        if err != nil {
×
69
                if errors.IsNotFound(err) {
×
70
                        return nil
×
71
                }
×
72
                klog.Error(err)
×
73
                return err
×
74
        }
75
        svc := cachedService.DeepCopy()
×
76

×
77
        var (
×
78
                pods                     []*v1.Pod
×
NEW
79
                vips                     []string
×
80
                vip, vpcName, subnetName string
×
81
                ok                       bool
×
82
                ignoreHealthCheck        = true
×
83
        )
×
84

×
85
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
NEW
86
                vips = []string{vip}
×
87

×
88
                for _, subset := range ep.Subsets {
×
89
                        for _, address := range subset.Addresses {
×
90
                                // TODO: IPv6
×
91
                                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 &&
×
92
                                        address.TargetRef.Name != "" {
×
93
                                        ignoreHealthCheck = false
×
94
                                }
×
95
                        }
96
                }
NEW
97
        } else if vips = util.ServiceClusterIPs(*svc); len(vips) == 0 {
×
98
                return nil
×
99
        }
×
100

101
        if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
×
102
                klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
×
103
                return err
×
104
        }
×
105
        for i, pod := range pods {
×
106
                if pod.Status.PodIP != "" || len(pod.Status.PodIPs) != 0 {
×
107
                        continue
×
108
                }
109

110
                for _, subset := range ep.Subsets {
×
111
                        for _, addr := range subset.Addresses {
×
112
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
×
113
                                        continue
×
114
                                }
115

116
                                p, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
×
117
                                if err != nil {
×
118
                                        klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
119
                                        return err
×
120
                                }
×
121
                                pods[i] = p.DeepCopy()
×
122
                                break
×
123
                        }
124
                        if pods[i] != pod {
×
125
                                break
×
126
                        }
127
                }
128
        }
129

130
        vpcName, subnetName = c.getVpcSubnetName(pods, ep, svc)
×
NEW
131
        vpc, err := c.vpcsLister.Get(vpcName)
×
NEW
132
        if err != nil {
×
133
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
134
                return err
×
135
        }
×
136

NEW
137
        var tcpLB, udpLB, sctpLB string
×
NEW
138
        tcpLBs := set.New(vpc.Status.TCPLoadBalancer, vpc.Status.TCPSessionLoadBalancer, vpc.Status.LocalTCPLoadBalancer, vpc.Status.TCPSessionLoadBalancer)
×
NEW
139
        udpLBs := set.New(vpc.Status.UDPLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.LocalUDPLoadBalancer, vpc.Status.UDPSessionLoadBalancer)
×
NEW
140
        sctpLBs := set.New(vpc.Status.SCTPLoadBalancer, vpc.Status.SCTPSessionLoadBalancer, vpc.Status.LocalSCTPLoadBalancer, vpc.Status.SCTPSessionLoadBalancer)
×
NEW
141
        tpLocal := svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
×
NEW
142
        if tpLocal {
×
NEW
143
                if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
NEW
144
                        tcpLB, udpLB, sctpLB = vpc.Status.LocalTCPSessionLoadBalancer, vpc.Status.LocalUDPSessionLoadBalancer, vpc.Status.LocalSCTPSessionLoadBalancer
×
NEW
145
                } else {
×
NEW
146
                        tcpLB, udpLB, sctpLB = vpc.Status.LocalTCPLoadBalancer, vpc.Status.LocalUDPLoadBalancer, vpc.Status.LocalSCTPLoadBalancer
×
NEW
147
                }
×
NEW
148
        } else {
×
NEW
149
                if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
NEW
150
                        tcpLB, udpLB, sctpLB = vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SCTPSessionLoadBalancer
×
NEW
151
                } else {
×
NEW
152
                        tcpLB, udpLB, sctpLB = vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SCTPLoadBalancer
×
NEW
153
                }
×
154
        }
155

NEW
156
        for _, vip := range vips {
×
157
                for _, port := range svc.Spec.Ports {
×
NEW
158
                        var lb string
×
NEW
159
                        var lbs set.Set[string]
×
160
                        switch port.Protocol {
×
161
                        case v1.ProtocolTCP:
×
NEW
162
                                lb, lbs = tcpLB, tcpLBs.Clone()
×
163
                        case v1.ProtocolUDP:
×
NEW
164
                                lb, lbs = udpLB, udpLBs.Clone()
×
165
                        case v1.ProtocolSCTP:
×
NEW
166
                                lb, lbs = sctpLB, sctpLBs.Clone()
×
NEW
167
                        default:
×
NEW
168
                                klog.Errorf("unsupported protocol %q", port.Protocol)
×
NEW
169
                                continue
×
170
                        }
171

172
                        var (
×
NEW
173
                                checkIP   string
×
NEW
174
                                externals map[string]string
×
175
                        )
×
176

×
177
                        if !ignoreHealthCheck {
×
NEW
178
                                if checkIP, err = c.getHealthCheckVip(subnetName, vip); err != nil {
×
179
                                        klog.Error(err)
×
180
                                        return err
×
181
                                }
×
182
                                externals = map[string]string{
×
183
                                        util.SwitchLBRuleSubnet: subnetName,
×
184
                                }
×
185
                        }
186

187
                        // chassis template variable name format:
188
                        // LB_VIP_<PROTOCOL>_<IP_HEX>_<PORT>
189
                        // LB_BACKENDS_<PROTOCOL>_<IP_HEX>_<PORT>
NEW
190
                        lbVIP := util.JoinHostPort(vip, port.Port)
×
NEW
191
                        vipHex := util.IP2Hex(net.ParseIP(vip))
×
NEW
192
                        vipVar := strings.ToUpper(fmt.Sprintf("LB_VIP_%s_%s_%d", port.Protocol, vipHex, port.Port))
×
NEW
193
                        backendsVar := strings.ToUpper(fmt.Sprintf("LB_BACKENDS_%s_%s_%d", port.Protocol, vipHex, port.Port))
×
NEW
194
                        ipPortMapping, backends := getIPPortMappingBackend(ep, pods, port, vip, checkIP, ignoreHealthCheck)
×
195
                        if len(backends) != 0 {
×
NEW
196
                                vip := lbVIP
×
NEW
197
                                var lbBackends []string
×
NEW
198
                                if tpLocal {
×
NEW
199
                                        vip = "^" + vipVar
×
NEW
200
                                        lbBackends = []string{"^" + backendsVar}
×
NEW
201
                                        // add/update template variable
×
NEW
202
                                        nodeValues := make(map[string]string, len(backends))
×
NEW
203
                                        for node, endpoints := range backends {
×
NEW
204
                                                nodeValues[node] = strings.Join(endpoints, ",")
×
NEW
205
                                        }
×
NEW
206
                                        if err = c.OVNNbClient.UpdateChassisTemplateVarVariables(backendsVar, nodeValues); err != nil {
×
NEW
207
                                                klog.Errorf("failed to update Chassis_Template_Var variable %s: %v", backendsVar, err)
×
NEW
208
                                                return err
×
NEW
209
                                        }
×
NEW
210
                                } else {
×
NEW
211
                                        lbBackends = slices.Concat(slices.Collect(maps.Values(backends))...)
×
NEW
212
                                }
×
213

NEW
214
                                klog.Infof("add vip %s with backends %v to LB %s", vip, lbBackends, lb)
×
NEW
215
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, lbBackends...); err != nil {
×
NEW
216
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", vip, lbBackends, lb, err)
×
217
                                        return err
×
218
                                }
×
219
                                if !ignoreHealthCheck && len(ipPortMapping) != 0 {
×
220
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
221
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
NEW
222
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", vip, ipPortMapping, lb, err)
×
223
                                                return err
×
224
                                        }
×
225
                                }
226

227
                                // for performance reason delete lb with no backends
NEW
228
                                lbs.Delete(lb)
×
229
                        }
230

NEW
231
                        for _, lb := range lbs.UnsortedList() {
×
NEW
232
                                klog.V(3).Infof("delete vip %s from LB %s", lbVIP, lb)
×
NEW
233
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, lbVIP, ignoreHealthCheck); err != nil {
×
NEW
234
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", lbVIP, lb, err)
×
235
                                        return err
×
236
                                }
×
237
                        }
238

NEW
239
                        if !tpLocal || len(backends) == 0 {
×
NEW
240
                                // delete chassis template var after the lb vip is deleted to avoid ovn-controller error parsing actions
×
NEW
241
                                if err = c.OVNNbClient.DeleteChassisTemplateVarVariables(backendsVar); err != nil {
×
NEW
242
                                        klog.Errorf("failed to delete Chassis_Template_Var variable %s: %v", backendsVar, err)
×
243
                                        return err
×
244
                                }
×
245
                        }
246
                }
247
        }
248

NEW
249
        if svc.Annotations[util.VpcAnnotation] != vpcName {
×
250
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
251
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
252
                        klog.Errorf("failed to patch service %s: %v", key, err)
×
253
                        return err
×
254
                }
×
255
        }
256

257
        return nil
×
258
}
259

260
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, service *v1.Service) (string, string) {
×
261
        var (
×
262
                vpcName    string
×
263
                subnetName string
×
264
        )
×
265

×
266
        for _, pod := range pods {
×
267
                if len(pod.Annotations) == 0 {
×
268
                        continue
×
269
                }
270
                if subnetName == "" {
×
271
                        subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
×
272
                }
×
273

274
        LOOP:
×
275
                for _, subset := range endpoints.Subsets {
×
276
                        for _, addr := range subset.Addresses {
×
277
                                if addr.IP == pod.Status.PodIP {
×
278
                                        if vpcName == "" {
×
279
                                                vpcName = pod.Annotations[util.LogicalRouterAnnotation]
×
280
                                        }
×
281
                                        if vpcName != "" {
×
282
                                                break LOOP
×
283
                                        }
284
                                }
285
                        }
286
                }
287

288
                if vpcName != "" && subnetName != "" {
×
289
                        break
×
290
                }
291
        }
292

293
        if vpcName == "" {
×
294
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
295
                        vpcName = c.config.ClusterRouter
×
296
                }
×
297
        }
298

299
        if subnetName == "" {
×
300
                subnetName = util.DefaultSubnet
×
301
        }
×
302

303
        return vpcName, subnetName
×
304
}
305

306
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
307
// the vip is used to check the health of the backend pod
308
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
309
        var (
×
310
                needCreateHealthCheckVip bool
×
311
                checkVip                 *kubeovnv1.Vip
×
312
                checkIP                  string
×
313
                err                      error
×
314
        )
×
315
        vipName := subnetName
×
316
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
317
        if err != nil {
×
318
                if errors.IsNotFound(err) {
×
319
                        needCreateHealthCheckVip = true
×
320
                } else {
×
321
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
322
                        return "", err
×
323
                }
×
324
        }
325
        if needCreateHealthCheckVip {
×
326
                vip := &kubeovnv1.Vip{
×
327
                        ObjectMeta: metav1.ObjectMeta{
×
328
                                Name: vipName,
×
329
                        },
×
330
                        Spec: kubeovnv1.VipSpec{
×
331
                                Subnet: subnetName,
×
332
                        },
×
333
                }
×
334
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
335
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
336
                        return "", err
×
337
                }
×
338

339
                // wait for vip created
340
                time.Sleep(1 * time.Second)
×
341
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
342
                if err != nil {
×
343
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
344
                        return "", err
×
345
                }
×
346
        }
347

348
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
349
                err = fmt.Errorf("vip %s is not ready", vipName)
×
350
                klog.Error(err)
×
351
                return "", err
×
352
        }
×
353

354
        switch util.CheckProtocol(lbVip) {
×
355
        case kubeovnv1.ProtocolIPv4:
×
356
                checkIP = checkVip.Status.V4ip
×
357
        case kubeovnv1.ProtocolIPv6:
×
358
                checkIP = checkVip.Status.V6ip
×
359
        }
360
        if checkIP == "" {
×
361
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
362
                klog.Error(err)
×
363
                return "", err
×
364
        }
×
365

366
        return checkIP, nil
×
367
}
368

NEW
369
func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, ignoreHealthCheck bool) (map[string]string, map[string][]string) {
×
370
        var (
×
371
                ipPortMapping = map[string]string{}
×
NEW
372
                backends      = make(map[string][]string)
×
373
                protocol      = util.CheckProtocol(serviceIP)
×
374
        )
×
375

×
376
        for _, subset := range endpoints.Subsets {
×
377
                var targetPort int32
×
378
                for _, port := range subset.Ports {
×
379
                        if port.Name == servicePort.Name {
×
380
                                targetPort = port.Port
×
381
                                break
×
382
                        }
383
                }
384
                if targetPort == 0 {
×
385
                        continue
×
386
                }
387

388
                for _, address := range subset.Addresses {
×
NEW
389
                        var node string
×
390
                        if !ignoreHealthCheck && address.TargetRef.Name != "" {
×
391
                                ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, endpoints.Namespace)
×
392
                                ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
×
393
                        }
×
394
                        if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
×
395
                                if util.CheckProtocol(address.IP) == protocol {
×
NEW
396
                                        if address.NodeName != nil {
×
NEW
397
                                                node = *address.NodeName
×
NEW
398
                                        }
×
NEW
399
                                        backends[node] = append(backends[node], util.JoinHostPort(address.IP, targetPort))
×
400
                                }
401
                                continue
×
402
                        }
403
                        var ip string
×
404
                        for _, pod := range pods {
×
405
                                if pod.Name == address.TargetRef.Name {
×
406
                                        for _, podIP := range util.PodIPs(*pod) {
×
407
                                                if util.CheckProtocol(podIP) == protocol {
×
408
                                                        ip = podIP
×
409
                                                        break
×
410
                                                }
411
                                        }
NEW
412
                                        node = pod.Spec.NodeName
×
UNCOV
413
                                        break
×
414
                                }
415
                        }
416
                        if ip == "" && util.CheckProtocol(address.IP) == protocol {
×
417
                                ip = address.IP
×
418
                        }
×
419
                        if ip != "" {
×
NEW
420
                                backends[node] = append(backends[node], util.JoinHostPort(ip, targetPort))
×
421
                        }
×
422
                }
423
        }
424

425
        return ipPortMapping, backends
×
426
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc