• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 14372473322

10 Apr 2025 04:42AM UTC coverage: 21.704% (-0.3%) from 22.009%
14372473322

Pull #5110

github

zbb88888
fix fmt

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>
Pull Request #5110: enable check vlan conflict

0 of 178 new or added lines in 6 files covered. (0.0%)

1053 existing lines in 9 files now uncovered.

10263 of 47286 relevant lines covered (21.7%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/endpoint.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "time"
7

8
        v1 "k8s.io/api/core/v1"
9
        "k8s.io/apimachinery/pkg/api/errors"
10
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11
        "k8s.io/apimachinery/pkg/labels"
12
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
13
        "k8s.io/client-go/tools/cache"
14
        "k8s.io/klog/v2"
15

16
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
17
        "github.com/kubeovn/kube-ovn/pkg/util"
18
)
19

20
func (c *Controller) enqueueAddEndpoint(obj interface{}) {
×
21
        key := cache.MetaObjectToName(obj.(*v1.Endpoints)).String()
×
22
        klog.V(3).Infof("enqueue add endpoint %s", key)
×
23
        c.addOrUpdateEndpointQueue.Add(key)
×
24
}
×
25

26
func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
×
27
        oldEp := oldObj.(*v1.Endpoints)
×
28
        newEp := newObj.(*v1.Endpoints)
×
29
        if oldEp.ResourceVersion == newEp.ResourceVersion {
×
30
                return
×
31
        }
×
32

33
        if len(oldEp.Subsets) == 0 && len(newEp.Subsets) == 0 {
×
34
                return
×
35
        }
×
36

37
        key := cache.MetaObjectToName(newEp).String()
×
38
        klog.V(3).Infof("enqueue update endpoint %s", key)
×
39
        c.addOrUpdateEndpointQueue.Add(key)
×
40
}
41

42
func (c *Controller) handleUpdateEndpoint(key string) error {
×
43
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
44
        if err != nil {
×
45
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
46
                return nil
×
47
        }
×
48

49
        c.epKeyMutex.LockKey(key)
×
50
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
51
        klog.Infof("handle update endpoint %s", key)
×
52

×
53
        ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
×
54
        if err != nil {
×
55
                if errors.IsNotFound(err) {
×
56
                        return nil
×
57
                }
×
58
                klog.Error(err)
×
59
                return err
×
60
        }
61

62
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
63
        if err != nil {
×
64
                if errors.IsNotFound(err) {
×
65
                        return nil
×
66
                }
×
67
                klog.Error(err)
×
68
                return err
×
69
        }
70
        svc := cachedService.DeepCopy()
×
71

×
72
        var (
×
73
                pods                     []*v1.Pod
×
74
                lbVips                   []string
×
75
                vip, vpcName, subnetName string
×
76
                ok                       bool
×
77
                ignoreHealthCheck        = true
×
78
        )
×
79

×
80
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
81
                lbVips = []string{vip}
×
82

×
83
                for _, subset := range ep.Subsets {
×
84
                        for _, address := range subset.Addresses {
×
85
                                // TODO: IPv6
×
86
                                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 &&
×
87
                                        address.TargetRef.Name != "" {
×
88
                                        ignoreHealthCheck = false
×
89
                                }
×
UNCOV
90
                        }
×
91
                }
92
        } else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
93
                return nil
×
94
        }
×
UNCOV
95

×
96
        if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
97
                klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
×
98
                return err
×
99
        }
×
100
        for i, pod := range pods {
×
101
                if pod.Status.PodIP != "" || len(pod.Status.PodIPs) != 0 {
×
102
                        continue
×
UNCOV
103
                }
×
104

105
                for _, subset := range ep.Subsets {
106
                        for _, addr := range subset.Addresses {
×
107
                                if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
×
108
                                        continue
×
UNCOV
109
                                }
×
110

111
                                p, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
112
                                if err != nil {
×
113
                                        klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err)
×
114
                                        return err
×
115
                                }
×
116
                                pods[i] = p.DeepCopy()
×
117
                                break
×
UNCOV
118
                        }
×
119
                        if pods[i] != pod {
120
                                break
UNCOV
121
                        }
×
UNCOV
122
                }
×
UNCOV
123
        }
×
UNCOV
124

×
125
        vpcName, subnetName = c.getVpcSubnetName(pods, ep, svc)
126

127
        var (
×
128
                vpc    *kubeovnv1.Vpc
×
129
                svcVpc string
×
130
        )
×
131

×
132
        if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
×
133
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
134
                return err
135
        }
×
UNCOV
136

×
137
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
138
        oldTCPLb, oldUDPLb, oldSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
139
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
140
                tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
141
        }
×
UNCOV
142

×
143
        for _, lbVip := range lbVips {
×
144
                for _, port := range svc.Spec.Ports {
×
145
                        var lb, oldLb string
×
146
                        switch port.Protocol {
×
147
                        case v1.ProtocolTCP:
×
148
                                lb, oldLb = tcpLb, oldTCPLb
×
149
                        case v1.ProtocolUDP:
×
150
                                lb, oldLb = udpLb, oldUDPLb
×
151
                        case v1.ProtocolSCTP:
×
152
                                lb, oldLb = sctpLb, oldSctpLb
UNCOV
153
                        }
×
UNCOV
154

×
155
                        var (
×
156
                                vip, checkIP             string
×
157
                                backends                 []string
×
158
                                ipPortMapping, externals map[string]string
159
                        )
×
160

×
161
                        if !ignoreHealthCheck {
×
162
                                if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
×
163
                                        klog.Error(err)
×
164
                                        return err
×
165
                                }
×
166
                                externals = map[string]string{
×
167
                                        util.SwitchLBRuleSubnet: subnetName,
×
168
                                }
×
169
                        }
170

171
                        ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, ignoreHealthCheck)
×
172

×
173
                        // for performance reason delete lb with no backends
×
174
                        if len(backends) != 0 {
×
175
                                vip = util.JoinHostPort(lbVip, port.Port)
×
176
                                klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
×
177
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
×
178
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
×
179
                                        return err
×
180
                                }
×
181
                                if !ignoreHealthCheck && len(ipPortMapping) != 0 {
×
182
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
183
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
184
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
×
185
                                                return err
186
                                        }
UNCOV
187
                                }
×
188
                        } else {
×
189
                                klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
×
190
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, ignoreHealthCheck); err != nil {
×
191
                                        klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
×
192
                                        return err
×
193
                                }
×
UNCOV
194

×
195
                                klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
×
196
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, ignoreHealthCheck); err != nil {
×
197
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
×
198
                                        return err
×
199
                                }
×
UNCOV
200
                        }
×
201
                }
UNCOV
202
        }
×
UNCOV
203

×
204
        if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
×
205
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
206
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
207
                        klog.Errorf("failed to patch service %s: %v", key, err)
208
                        return err
209
                }
×
UNCOV
210
        }
×
UNCOV
211

×
212
        return nil
×
UNCOV
213
}
×
UNCOV
214

×
215
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, service *v1.Service) (string, string) {
216
        var (
×
217
                vpcName    string
×
218
                subnetName string
×
219
        )
×
220

×
221
        for _, pod := range pods {
×
222
                if len(pod.Annotations) == 0 {
×
223
                        continue
UNCOV
224
                }
×
225
                if subnetName == "" {
×
226
                        subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
×
227
                }
×
UNCOV
228

×
229
        LOOP:
230
                for _, subset := range endpoints.Subsets {
×
231
                        for _, addr := range subset.Addresses {
×
232
                                if addr.IP == pod.Status.PodIP {
×
233
                                        if vpcName == "" {
×
234
                                                vpcName = pod.Annotations[util.LogicalRouterAnnotation]
×
235
                                        }
×
236
                                        if vpcName != "" {
×
237
                                                break LOOP
×
UNCOV
238
                                        }
×
239
                                }
240
                        }
241
                }
242

243
                if vpcName != "" && subnetName != "" {
244
                        break
×
UNCOV
245
                }
×
UNCOV
246
        }
×
UNCOV
247

×
248
        if vpcName == "" {
×
249
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
250
                        vpcName = c.config.ClusterRouter
251
                }
UNCOV
252
        }
×
253

254
        if subnetName == "" {
255
                subnetName = util.DefaultSubnet
×
256
        }
×
UNCOV
257

×
258
        return vpcName, subnetName
×
UNCOV
259
}
×
UNCOV
260

×
UNCOV
261
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
×
UNCOV
262
// the vip is used to check the health of the backend pod
×
263
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
264
        var (
265
                needCreateHealthCheckVip bool
×
266
                checkVip                 *kubeovnv1.Vip
×
267
                checkIP                  string
×
268
                err                      error
269
        )
×
270
        vipName := subnetName
×
271
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
272
        if err != nil {
×
273
                if errors.IsNotFound(err) {
×
274
                        needCreateHealthCheckVip = true
×
275
                } else {
×
276
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
277
                        return "", err
×
278
                }
279
        }
280
        if needCreateHealthCheckVip {
281
                vip := &kubeovnv1.Vip{
282
                        ObjectMeta: metav1.ObjectMeta{
283
                                Name: vipName,
×
284
                        },
×
285
                        Spec: kubeovnv1.VipSpec{
286
                                Subnet: subnetName,
287
                        },
288
                }
×
289
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
290
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
291
                        return "", err
×
292
                }
293

UNCOV
294
                // wait for vip created
×
295
                time.Sleep(1 * time.Second)
×
296
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
297
                if err != nil {
298
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
299
                        return "", err
300
                }
301
        }
302

303
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
304
                err = fmt.Errorf("vip %s is not ready", vipName)
×
305
                klog.Error(err)
×
306
                return "", err
×
307
        }
×
UNCOV
308

×
309
        switch util.CheckProtocol(lbVip) {
×
310
        case kubeovnv1.ProtocolIPv4:
×
311
                checkIP = checkVip.Status.V4ip
×
312
        case kubeovnv1.ProtocolIPv6:
×
313
                checkIP = checkVip.Status.V6ip
×
UNCOV
314
        }
×
315
        if checkIP == "" {
×
316
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
317
                klog.Error(err)
×
318
                return "", err
×
319
        }
UNCOV
320

×
321
        return checkIP, nil
×
UNCOV
322
}
×
UNCOV
323

×
324
func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, ignoreHealthCheck bool) (map[string]string, []string) {
×
325
        var (
×
326
                ipPortMapping = map[string]string{}
×
327
                backends      = []string{}
×
328
                protocol      = util.CheckProtocol(serviceIP)
×
329
        )
×
330

×
331
        for _, subset := range endpoints.Subsets {
×
332
                var targetPort int32
×
333
                for _, port := range subset.Ports {
334
                        if port.Name == servicePort.Name {
335
                                targetPort = port.Port
×
336
                                break
×
UNCOV
337
                        }
×
UNCOV
338
                }
×
339
                if targetPort == 0 {
×
340
                        continue
×
341
                }
342

343
                for _, address := range subset.Addresses {
×
344
                        if !ignoreHealthCheck && address.TargetRef.Name != "" {
×
345
                                ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, endpoints.Namespace)
×
346
                                ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
×
347
                        }
×
348
                        if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
349
                                if util.CheckProtocol(address.IP) == protocol {
×
350
                                        backends = append(backends, util.JoinHostPort(address.IP, targetPort))
×
351
                                }
×
352
                                continue
×
UNCOV
353
                        }
×
354
                        var ip string
355
                        for _, pod := range pods {
×
356
                                if pod.Name == address.TargetRef.Name {
×
357
                                        for _, podIP := range util.PodIPs(*pod) {
×
358
                                                if util.CheckProtocol(podIP) == protocol {
×
359
                                                        ip = podIP
×
360
                                                        break
UNCOV
361
                                                }
×
362
                                        }
363
                                        break
UNCOV
364
                                }
×
UNCOV
365
                        }
×
366
                        if ip == "" && util.CheckProtocol(address.IP) == protocol {
×
367
                                ip = address.IP
×
368
                        }
×
369
                        if ip != "" {
×
370
                                backends = append(backends, util.JoinHostPort(ip, targetPort))
×
371
                        }
×
UNCOV
372
                }
×
UNCOV
373
        }
×
UNCOV
374

×
375
        return ipPortMapping, backends
×
UNCOV
376
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc