• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 14372473322

10 Apr 2025 04:42AM UTC coverage: 21.704% (-0.3%) from 22.009%
14372473322

Pull #5110

github

zbb88888
fix fmt

Signed-off-by: zbb88888 <jmdxjsjgcxy@gmail.com>
Pull Request #5110: enable check vlan conflict

0 of 178 new or added lines in 6 files covered. (0.0%)

1053 existing lines in 9 files now uncovered.

10263 of 47286 relevant lines covered (21.7%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/controller/service.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net"
7
        "reflect"
8
        "slices"
9
        "strings"
10
        "time"
11

12
        v1 "k8s.io/api/core/v1"
13
        "k8s.io/apimachinery/pkg/api/equality"
14
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
15
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
        "k8s.io/apimachinery/pkg/labels"
17
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
18
        "k8s.io/client-go/tools/cache"
19
        "k8s.io/klog/v2"
20

21
        "github.com/kubeovn/kube-ovn/pkg/util"
22
)
23

24
type vpcService struct {
25
        Vips     []string
26
        Vpc      string
27
        Protocol v1.Protocol
28
        Svc      *v1.Service
29
}
30

31
type updateSvcObject struct {
32
        key      string
33
        oldPorts []v1.ServicePort
34
        newPorts []v1.ServicePort
35
}
36

37
func (c *Controller) enqueueAddService(obj interface{}) {
×
38
        svc := obj.(*v1.Service)
×
39
        key := cache.MetaObjectToName(svc).String()
×
40
        klog.V(3).Infof("enqueue add endpoint %s", key)
×
41
        c.addOrUpdateEndpointQueue.Add(key)
×
42

×
43
        if c.config.EnableNP {
×
44
                netpols, err := c.svcMatchNetworkPolicies(svc)
×
45
                if err != nil {
×
46
                        utilruntime.HandleError(err)
×
47
                        return
×
48
                }
×
49

50
                for _, np := range netpols {
×
51
                        c.updateNpQueue.Add(np)
×
52
                }
×
53
        }
54

55
        if c.config.EnableLbSvc {
×
56
                klog.V(3).Infof("enqueue add service %s", key)
×
57
                c.addServiceQueue.Add(key)
×
58
        }
×
59
}
60

61
func (c *Controller) enqueueDeleteService(obj interface{}) {
×
62
        svc := obj.(*v1.Service)
×
63
        klog.Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
×
64

×
65
        vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]
×
66
        if ok || svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" {
×
67
                if c.config.EnableNP {
×
68
                        netpols, err := c.svcMatchNetworkPolicies(svc)
×
69
                        if err != nil {
×
70
                                utilruntime.HandleError(err)
×
71
                                return
×
72
                        }
×
73

74
                        for _, np := range netpols {
×
75
                                c.updateNpQueue.Add(np)
×
76
                        }
×
77
                }
78

79
                ips := util.ServiceClusterIPs(*svc)
×
80
                if ok {
×
81
                        ips = strings.Split(vip, ",")
×
82
                }
×
83

84
                for _, port := range svc.Spec.Ports {
×
85
                        vpcSvc := &vpcService{
×
86
                                Protocol: port.Protocol,
×
87
                                Vpc:      svc.Annotations[util.VpcAnnotation],
×
88
                                Svc:      svc,
89
                        }
90
                        for _, ip := range ips {
×
91
                                vpcSvc.Vips = append(vpcSvc.Vips, util.JoinHostPort(ip, port.Port))
×
92
                        }
×
93
                        klog.V(3).Infof("delete vpc service: %v", vpcSvc)
×
94
                        c.deleteServiceQueue.Add(vpcSvc)
×
UNCOV
95
                }
×
UNCOV
96
        }
×
UNCOV
97
}
×
UNCOV
98

×
99
func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
×
100
        oldSvc := oldObj.(*v1.Service)
×
101
        newSvc := newObj.(*v1.Service)
102
        if oldSvc.ResourceVersion == newSvc.ResourceVersion {
103
                return
104
        }
UNCOV
105

×
106
        oldClusterIps := getVipIps(oldSvc)
×
107
        newClusterIps := getVipIps(newSvc)
×
108
        var ipsToDel []string
×
109
        for _, oldClusterIP := range oldClusterIps {
×
110
                if !slices.Contains(newClusterIps, oldClusterIP) {
×
111
                        ipsToDel = append(ipsToDel, oldClusterIP)
112
                }
×
UNCOV
113
        }
×
UNCOV
114

×
115
        key := cache.MetaObjectToName(newSvc).String()
×
116
        klog.V(3).Infof("enqueue update service %s", key)
×
117
        if len(ipsToDel) != 0 {
×
118
                ipsToDelStr := strings.Join(ipsToDel, ",")
×
119
                key = strings.Join([]string{key, ipsToDelStr}, "#")
120
        }
UNCOV
121

×
122
        updateSvc := &updateSvcObject{
×
123
                key:      key,
×
124
                oldPorts: oldSvc.Spec.Ports,
×
125
                newPorts: newSvc.Spec.Ports,
×
126
        }
×
127
        c.updateServiceQueue.Add(updateSvc)
UNCOV
128
}
×
UNCOV
129

×
130
func (c *Controller) handleDeleteService(service *vpcService) error {
×
131
        key := cache.MetaObjectToName(service.Svc).String()
×
132

×
133
        c.svcKeyMutex.LockKey(key)
×
134
        defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
135
        klog.Infof("handle delete service %s", key)
136

×
137
        svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything())
×
138
        if err != nil {
×
139
                klog.Errorf("failed to list svc, %v", err)
×
140
                return err
×
141
        }
×
UNCOV
142

×
143
        var (
×
144
                vpcLB             [2]string
×
145
                vpcLbConfig       = c.GenVpcLoadBalancer(service.Vpc)
×
146
                ignoreHealthCheck = true
×
147
        )
×
148

149
        switch service.Protocol {
×
150
        case v1.ProtocolTCP:
×
151
                vpcLB = [2]string{vpcLbConfig.TCPLoadBalancer, vpcLbConfig.TCPSessLoadBalancer}
×
152
        case v1.ProtocolUDP:
×
153
                vpcLB = [2]string{vpcLbConfig.UDPLoadBalancer, vpcLbConfig.UDPSessLoadBalancer}
×
154
        case v1.ProtocolSCTP:
×
155
                vpcLB = [2]string{vpcLbConfig.SctpLoadBalancer, vpcLbConfig.SctpSessLoadBalancer}
×
UNCOV
156
        }
×
UNCOV
157

×
158
        for _, vip := range service.Vips {
×
159
                var (
×
160
                        ip    string
×
161
                        found bool
×
162
                )
163
                ip = parseVipAddr(vip)
164

×
165
                for _, svc := range svcs {
×
166
                        if slices.Contains(util.ServiceClusterIPs(*svc), ip) {
×
167
                                found = true
×
168
                                break
×
UNCOV
169
                        }
×
UNCOV
170
                }
×
171
                if found {
×
172
                        continue
×
UNCOV
173
                }
×
UNCOV
174

×
175
                for _, lb := range vpcLB {
176
                        if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, ignoreHealthCheck); err != nil {
177
                                klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
×
178
                                return err
×
179
                        }
180
                }
UNCOV
181
        }
×
UNCOV
182

×
183
        if service.Svc.Spec.Type == v1.ServiceTypeLoadBalancer && c.config.EnableLbSvc {
×
184
                if err := c.deleteLbSvc(service.Svc); err != nil {
×
185
                        klog.Errorf("failed to delete service %s, %v", service.Svc.Name, err)
×
186
                        return err
187
                }
×
UNCOV
188
        }
×
UNCOV
189

×
190
        return nil
×
UNCOV
191
}
×
192

193
func (c *Controller) handleUpdateService(svcObject *updateSvcObject) error {
194
        key := svcObject.key
195
        keys := strings.Split(key, "#")
196
        key = keys[0]
×
197
        var ipsToDel []string
×
198
        if len(keys) == 2 {
×
199
                ipsToDelStr := keys[1]
×
200
                ipsToDel = strings.Split(ipsToDelStr, ",")
×
201
        }
202

203
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
204
        if err != nil {
205
                klog.Error(err)
206
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
207
                return nil
×
208
        }
×
UNCOV
209

×
210
        c.svcKeyMutex.LockKey(key)
×
211
        defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
×
212
        klog.Infof("handle update service %s", key)
×
213

×
214
        svc, err := c.servicesLister.Services(namespace).Get(name)
×
215
        if err != nil {
216
                if k8serrors.IsNotFound(err) {
×
217
                        return nil
×
218
                }
×
219
                klog.Error(err)
×
220
                return err
×
UNCOV
221
        }
×
222

223
        ips := getVipIps(svc)
×
224

×
225
        vpcName := svc.Annotations[util.VpcAnnotation]
×
226
        if vpcName == "" {
×
227
                vpcName = c.config.ClusterRouter
×
228
        }
×
229
        vpc, err := c.vpcsLister.Get(vpcName)
×
230
        if err != nil {
×
231
                klog.Errorf("failed to get vpc %s of lb, %v", vpcName, err)
×
232
                return err
×
233
        }
×
234

235
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
236
        oTCPLb, oUDPLb, oSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
×
237
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
238
                tcpLb, udpLb, sctpLb, oTCPLb, oUDPLb, oSctpLb = oTCPLb, oUDPLb, oSctpLb, tcpLb, udpLb, sctpLb
×
239
        }
×
UNCOV
240

×
241
        var tcpVips, udpVips, sctpVips []string
×
242
        for _, port := range svc.Spec.Ports {
×
243
                for _, ip := range ips {
×
244
                        switch port.Protocol {
×
245
                        case v1.ProtocolTCP:
×
246
                                tcpVips = append(tcpVips, util.JoinHostPort(ip, port.Port))
×
247
                        case v1.ProtocolUDP:
248
                                udpVips = append(udpVips, util.JoinHostPort(ip, port.Port))
×
249
                        case v1.ProtocolSCTP:
×
250
                                sctpVips = append(sctpVips, util.JoinHostPort(ip, port.Port))
×
UNCOV
251
                        }
×
UNCOV
252
                }
×
253
        }
UNCOV
254

×
255
        var (
×
256
                needUpdateEndpointQueue = false
×
257
                ignoreHealthCheck       = true
×
258
        )
×
259

×
260
        // for service update
×
261
        updateVip := func(lbName, oLbName string, svcVips []string) error {
×
262
                if len(lbName) == 0 {
×
263
                        return nil
×
264
                }
265

266
                lb, err := c.OVNNbClient.GetLoadBalancer(lbName, false)
267
                if err != nil {
268
                        klog.Errorf("failed to get LB %s: %v", lbName, err)
×
269
                        return err
×
270
                }
×
271
                klog.V(3).Infof("existing vips of LB %s: %v", lbName, lb.Vips)
×
272
                for _, vip := range svcVips {
×
273
                        if err := c.OVNNbClient.LoadBalancerDeleteVip(oLbName, vip, ignoreHealthCheck); err != nil {
×
274
                                klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLbName, err)
×
275
                                return err
×
276
                        }
×
UNCOV
277

×
278
                        if _, ok := lb.Vips[vip]; !ok {
279
                                klog.Infof("add vip %s to LB %s", vip, lbName)
×
280
                                needUpdateEndpointQueue = true
×
281
                        }
×
UNCOV
282
                }
×
283
                for vip := range lb.Vips {
×
284
                        if ip := parseVipAddr(vip); (slices.Contains(ips, ip) && !slices.Contains(svcVips, vip)) || slices.Contains(ipsToDel, ip) {
×
285
                                klog.Infof("remove stale vip %s from LB %s", vip, lbName)
×
286
                                if err := c.OVNNbClient.LoadBalancerDeleteVip(lbName, vip, ignoreHealthCheck); err != nil {
×
287
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lbName, err)
×
288
                                        return err
×
289
                                }
×
290
                        }
UNCOV
291
                }
×
UNCOV
292

×
293
                if len(oLbName) == 0 {
×
294
                        return nil
×
295
                }
UNCOV
296

×
297
                oLb, err := c.OVNNbClient.GetLoadBalancer(oLbName, false)
×
298
                if err != nil {
×
299
                        klog.Errorf("failed to get LB %s: %v", oLbName, err)
×
300
                        return err
×
301
                }
×
302
                klog.V(3).Infof("existing vips of LB %s: %v", oLbName, lb.Vips)
×
303
                for vip := range oLb.Vips {
304
                        if ip := parseVipAddr(vip); slices.Contains(ips, ip) || slices.Contains(ipsToDel, ip) {
305
                                klog.Infof("remove stale vip %s from LB %s", vip, oLbName)
306
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oLbName, vip, ignoreHealthCheck); err != nil {
×
307
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLbName, err)
×
308
                                        return err
×
309
                                }
UNCOV
310
                        }
×
UNCOV
311
                }
×
312
                return nil
×
UNCOV
313
        }
×
UNCOV
314

×
315
        if err = updateVip(tcpLb, oTCPLb, tcpVips); err != nil {
×
316
                klog.Error(err)
×
317
                return err
×
318
        }
×
319
        if err = updateVip(udpLb, oUDPLb, udpVips); err != nil {
×
320
                klog.Error(err)
×
321
                return err
×
322
        }
×
323
        if err = updateVip(sctpLb, oSctpLb, sctpVips); err != nil {
324
                klog.Error(err)
325
                return err
×
326
        }
327

328
        if needUpdateEndpointQueue {
×
329
                c.addOrUpdateEndpointQueue.Add(key)
×
330
        }
×
UNCOV
331

×
332
        if c.config.EnableLbSvc && svc.Spec.Type == v1.ServiceTypeLoadBalancer {
×
333
                changed, err := c.checkLbSvcDeployAnnotationChanged(svc)
×
334
                if err != nil {
×
335
                        klog.Errorf("failed to check annotation change for lb svc %s: %v", key, err)
×
336
                        return err
×
337
                }
×
UNCOV
338

×
UNCOV
339
                // only process svc.spec.ports update
×
340
                if !changed {
341
                        klog.Infof("update loadbalancer service %s", key)
×
342
                        pod, err := c.getLbSvcPod(name, namespace)
×
343
                        if err != nil {
×
344
                                klog.Errorf("failed to get pod for lb svc %s: %v", key, err)
×
345
                                if strings.Contains(err.Error(), "not found") {
346
                                        return nil
×
347
                                }
×
348
                                return err
×
349
                        }
UNCOV
350

×
351
                        toDel := diffSvcPorts(svcObject.oldPorts, svcObject.newPorts)
×
352
                        if err := c.delDnatRules(pod, toDel, svc); err != nil {
×
353
                                klog.Errorf("failed to delete dnat rules, err: %v", err)
×
354
                                return err
×
355
                        }
×
356
                        if err = c.updatePodAttachNets(pod, svc); err != nil {
357
                                klog.Errorf("failed to update pod attachment network for lb svc %s: %v", key, err)
358
                                return err
×
359
                        }
×
UNCOV
360
                }
×
UNCOV
361
        }
×
UNCOV
362

×
363
        return nil
×
UNCOV
364
}
×
UNCOV
365

×
UNCOV
366
// Parse key of map, [fd00:10:96::11c9]:10665 for example
×
367
func parseVipAddr(vip string) string {
368
        host, _, err := net.SplitHostPort(vip)
369
        if err != nil {
×
370
                klog.Errorf("failed to parse vip %q: %v", vip, err)
×
371
                return ""
×
372
        }
×
373
        return host
×
UNCOV
374
}
×
UNCOV
375

×
376
func (c *Controller) handleAddService(key string) error {
×
377
        if !c.config.EnableLbSvc {
×
378
                return nil
379
        }
380

381
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
382
        if err != nil {
383
                klog.Error(err)
384
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
385
                return nil
×
386
        }
×
UNCOV
387

×
388
        c.svcKeyMutex.LockKey(key)
×
389
        defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
×
390
        klog.Infof("handle add service %s", key)
×
391

×
392
        svc, err := c.servicesLister.Services(namespace).Get(name)
393
        if err != nil {
394
                if k8serrors.IsNotFound(err) {
×
395
                        return nil
×
396
                }
×
397
                klog.Error(err)
×
398
                return err
UNCOV
399
        }
×
400
        if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
×
401
                return nil
×
402
        }
×
UNCOV
403
        // Skip non kube-ovn lb-svc.
×
404
        if _, ok := svc.Annotations[util.AttachmentProvider]; !ok {
×
405
                return nil
406
        }
×
UNCOV
407

×
408
        klog.Infof("handle add loadbalancer service %s", key)
×
409

×
410
        if err = c.validateSvc(svc); err != nil {
×
411
                c.recorder.Event(svc, v1.EventTypeWarning, "ValidateSvcFailed", err.Error())
×
412
                klog.Errorf("failed to validate lb svc %s: %v", key, err)
×
413
                return err
×
414
        }
×
UNCOV
415

×
416
        nad, err := c.getAttachNetwork(svc)
×
417
        if err != nil {
418
                c.recorder.Event(svc, v1.EventTypeWarning, "GetNADFailed", err.Error())
×
419
                klog.Errorf("failed to check attachment network of lb svc %s: %v", key, err)
×
420
                return err
×
421
        }
UNCOV
422

×
423
        if err = c.createLbSvcPod(svc, nad); err != nil {
×
424
                klog.Errorf("failed to create lb svc pod for %s: %v", key, err)
×
425
                return err
426
        }
×
UNCOV
427

×
428
        var pod *v1.Pod
×
429
        for {
×
430
                pod, err = c.getLbSvcPod(name, namespace)
×
431
                if err != nil {
×
432
                        klog.Warningf("pod for lb svc %s is not running: %v", key, err)
×
433
                        time.Sleep(time.Second)
434
                }
×
435
                if pod != nil {
×
436
                        break
×
UNCOV
437
                }
×
UNCOV
438

×
UNCOV
439
                // It's important here to check existing of svc, used to break the loop.
×
440
                _, err = c.servicesLister.Services(namespace).Get(name)
441
                if err != nil {
×
442
                        if k8serrors.IsNotFound(err) {
×
443
                                return nil
×
444
                        }
×
445
                        klog.Error(err)
446
                        return err
×
UNCOV
447
                }
×
UNCOV
448
        }
×
UNCOV
449

×
450
        loadBalancerIP, err := c.getPodAttachIP(pod, svc)
×
451
        if err != nil {
×
452
                klog.Errorf("failed to get loadBalancerIP: %v", err)
×
453
                return err
×
454
        }
×
455

456
        svc, err = c.servicesLister.Services(namespace).Get(name)
457
        if err != nil {
458
                if k8serrors.IsNotFound(err) {
×
459
                        return nil
×
460
                }
×
461
                klog.Error(err)
×
462
                return err
×
UNCOV
463
        }
×
464
        targetSvc := svc.DeepCopy()
×
465
        if err = c.updatePodAttachNets(pod, targetSvc); err != nil {
466
                klog.Errorf("failed to update pod attachment network for service %s/%s: %v", namespace, name, err)
467
                return err
468
        }
×
UNCOV
469

×
UNCOV
470
        // compatible with IPv4 and IPv6 dual stack subnet.
×
471
        var ingress []v1.LoadBalancerIngress
×
472
        for _, ip := range strings.Split(loadBalancerIP, ",") {
×
473
                if ip != "" && net.ParseIP(ip) != nil {
474
                        ingress = append(ingress, v1.LoadBalancerIngress{IP: ip})
×
475
                }
×
UNCOV
476
        }
×
477
        targetSvc.Status.LoadBalancer.Ingress = ingress
×
478
        if !equality.Semantic.DeepEqual(svc.Status, targetSvc.Status) {
×
479
                if _, err = c.config.KubeClient.CoreV1().Services(namespace).
×
480
                        UpdateStatus(context.Background(), targetSvc, metav1.UpdateOptions{}); err != nil {
×
481
                        klog.Errorf("failed to update status of service %s/%s: %v", namespace, name, err)
482
                        return err
×
483
                }
×
UNCOV
484
        }
×
UNCOV
485

×
486
        return nil
×
487
}
488

489
func getVipIps(svc *v1.Service) []string {
×
490
        var ips []string
×
491
        if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
492
                ips = strings.Split(vip, ",")
×
493
        } else {
×
494
                ips = util.ServiceClusterIPs(*svc)
495
        }
×
496
        return ips
×
UNCOV
497
}
×
UNCOV
498

×
499
func diffSvcPorts(oldPorts, newPorts []v1.ServicePort) (toDel []v1.ServicePort) {
×
500
        for _, oldPort := range oldPorts {
×
501
                found := false
×
502
                for _, newPort := range newPorts {
503
                        if reflect.DeepEqual(oldPort, newPort) {
504
                                found = true
×
505
                                break
506
                        }
UNCOV
507
                }
×
508
                if !found {
×
509
                        toDel = append(toDel, oldPort)
×
510
                }
×
UNCOV
511
        }
×
UNCOV
512

×
513
        return toDel
×
UNCOV
514
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc