• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 21031892891

15 Jan 2026 12:49PM UTC coverage: 22.711% (-0.02%) from 22.73%
21031892891

Pull #6159

github

changluyi
fix enable u2o may cause metallb underlay err

Signed-off-by: clyi <clyi@alauda.io>
Pull Request #6159: add metallb underlay v6 dualcase

0 of 62 new or added lines in 3 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

12193 of 53688 relevant lines covered (22.71%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.1
/pkg/controller/endpoint_slice.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "slices"
7
        "strings"
8
        "time"
9

10
        v1 "k8s.io/api/core/v1"
11
        discoveryv1 "k8s.io/api/discovery/v1"
12
        "k8s.io/apimachinery/pkg/api/errors"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/client-go/tools/cache"
17
        "k8s.io/klog/v2"
18

19
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
20
        "github.com/kubeovn/kube-ovn/pkg/ovs"
21
        "github.com/kubeovn/kube-ovn/pkg/util"
22
)
23

24
// IPPortMapping is a string-to-string map.
// NOTE(review): presumably it carries the entries of an OVN load balancer's IP port mapping
// (backend IP as key) — confirm against the code that builds it.
type IPPortMapping map[string]string
25

26
// getServiceForEndpointSlice returns the service linked to an EndpointSlice
27
func getServiceForEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
28
        if endpointSlice != nil && endpointSlice.Labels != nil {
2✔
29
                return endpointSlice.Labels[discoveryv1.LabelServiceName]
1✔
30
        }
1✔
31

32
        return ""
1✔
33
}
34

35
func findServiceKey(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
36
        service := getServiceForEndpointSlice(endpointSlice)
1✔
37
        if service == "" {
2✔
38
                return ""
1✔
39
        }
1✔
40

41
        return endpointSlice.Namespace + "/" + service
1✔
42
}
43

44
func (c *Controller) enqueueAddEndpointSlice(obj any) {
×
45
        key := findServiceKey(obj.(*discoveryv1.EndpointSlice))
×
46
        if key != "" {
×
47
                klog.V(3).Infof("enqueue add endpointSlice %s", key)
×
48
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
49
        }
×
50
}
51

52
func (c *Controller) enqueueUpdateEndpointSlice(oldObj, newObj any) {
×
53
        oldEndpointSlice := oldObj.(*discoveryv1.EndpointSlice)
×
54
        newEndpointSlice := newObj.(*discoveryv1.EndpointSlice)
×
55
        if oldEndpointSlice.ResourceVersion == newEndpointSlice.ResourceVersion {
×
56
                return
×
57
        }
×
58

59
        if len(oldEndpointSlice.Endpoints) == 0 && len(newEndpointSlice.Endpoints) == 0 {
×
60
                return
×
61
        }
×
62

63
        key := findServiceKey(newEndpointSlice)
×
64
        if key != "" {
×
65
                klog.V(3).Infof("enqueue update endpointSlice for service %s", key)
×
66
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
67
        }
×
68
}
69

70
func (c *Controller) handleUpdateEndpointSlice(key string) error {
×
71
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
72
        if err != nil {
×
73
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
74
                return nil
×
75
        }
×
76

77
        c.epKeyMutex.LockKey(key)
×
78
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
79
        klog.Infof("handle update endpointSlice for service %s", key)
×
80

×
81
        endpointSlices, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Set{discoveryv1.LabelServiceName: name}.AsSelector())
×
82
        if err != nil {
×
83
                if errors.IsNotFound(err) {
×
84
                        return nil
×
85
                }
×
86
                klog.Error(err)
×
87
                return err
×
88
        }
89

90
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
91
        if err != nil {
×
92
                if errors.IsNotFound(err) {
×
93
                        return nil
×
94
                }
×
95
                klog.Error(err)
×
96
                return err
×
97
        }
98
        svc := cachedService.DeepCopy()
×
99

×
100
        var (
×
101
                lbVips                   []string
×
102
                vip, vpcName, subnetName string
×
103
                ok                       bool
×
104
                ignoreHealthCheck        = true
×
105
                isPreferLocalBackend     = false
×
106
        )
×
107

×
108
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
109
                lbVips = []string{vip}
×
110

×
111
                // Health checks can only run against IPv4 endpoints and if the service doesn't specify they must be disabled
×
112
                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 && !serviceHealthChecksDisabled(svc) {
×
113
                        ignoreHealthCheck = false
×
114
                }
×
115
        } else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
×
116
                return nil
×
117
        }
×
118

119
        if c.config.EnableLb && c.config.EnableOVNLBPreferLocal {
×
120
                if svc.Spec.Type == v1.ServiceTypeLoadBalancer && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
×
121
                        if len(svc.Status.LoadBalancer.Ingress) > 0 {
×
122
                                for _, ingress := range svc.Status.LoadBalancer.Ingress {
×
123
                                        if ingress.IP != "" {
×
124
                                                lbVips = append(lbVips, ingress.IP)
×
125
                                        }
×
126
                                }
127
                        }
128
                        isPreferLocalBackend = true
×
129
                } else if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal {
×
130
                        isPreferLocalBackend = true
×
131
                }
×
132
        }
133

134
        // If Kube-OVN is running in secondary CNI mode, the endpoint IPs should be derived from the network attachment definitions
135
        // This overwrite can be removed if endpoint construction accounts for network attachment IP address
136
        // TODO: Identify how endpoints are constructed, by default, endpoints has IP address of eth0 interface
137
        if c.config.EnableNonPrimaryCNI && serviceHasSelector(svc) {
×
138
                var pods []*v1.Pod
×
139
                if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
×
140
                        klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
×
141
                        return err
×
142
                }
×
143
                err = c.replaceEndpointAddressesWithSecondaryIPs(endpointSlices, pods)
×
144
                if err != nil {
×
145
                        klog.Errorf("failed to update endpointSlice: %v", err)
×
146
                        return err
×
147
                }
×
148
        }
149

150
        vpcName, subnetName, err = c.getVpcAndSubnetForEndpoints(endpointSlices, svc)
×
151
        if err != nil {
×
152
                return err
×
153
        }
×
154

155
        var (
×
156
                vpc    *kubeovnv1.Vpc
×
157
                svcVpc string
×
158
        )
×
159

×
160
        if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
×
161
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
162
                return err
×
163
        }
×
164

165
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
×
166
        oldTCPLb, oldUDPLb, oldSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
×
167
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
168
                tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
×
169
        }
×
170

171
        for _, lbVip := range lbVips {
×
172
                for _, port := range svc.Spec.Ports {
×
173
                        var lb, oldLb string
×
174
                        switch port.Protocol {
×
175
                        case v1.ProtocolTCP:
×
176
                                lb, oldLb = tcpLb, oldTCPLb
×
177
                        case v1.ProtocolUDP:
×
178
                                lb, oldLb = udpLb, oldUDPLb
×
179
                        case v1.ProtocolSCTP:
×
180
                                lb, oldLb = sctpLb, oldSctpLb
×
181
                        }
182

183
                        var (
×
184
                                vip, checkIP             string
×
185
                                backends                 []string
×
186
                                ipPortMapping, externals map[string]string
×
187
                        )
×
188

×
189
                        if !ignoreHealthCheck {
×
190
                                if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
×
191
                                        klog.Error(err)
×
192
                                        return err
×
193
                                }
×
194
                                externals = map[string]string{
×
195
                                        util.SwitchLBRuleSubnet: subnetName,
×
196
                                }
×
197
                        }
198

199
                        if isPreferLocalBackend {
×
200
                                // only use the ipportmapping's lsp to ip map when the backend is local
×
201
                                checkIP = util.MasqueradeCheckIP
×
202
                        }
×
203

204
                        backends = c.getEndpointBackend(endpointSlices, port, lbVip)
×
205

×
206
                        if !ignoreHealthCheck || isPreferLocalBackend {
×
207
                                ipPortMapping, err = c.getIPPortMapping(endpointSlices, svc, checkIP)
×
208
                                if err != nil {
×
209
                                        err := fmt.Errorf("couldn't get ip port mapping for svc %s/%s: %w", svc.Namespace, svc.Name, err)
×
210
                                        return err
×
211
                                }
×
212
                        }
213

214
                        // for performance reason delete lb with no backends
215
                        if len(backends) != 0 {
×
216
                                vip = util.JoinHostPort(lbVip, port.Port)
×
217
                                klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
×
218
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
×
219
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
×
220
                                        return err
×
221
                                }
×
222

223
                                if isPreferLocalBackend && len(ipPortMapping) != 0 {
×
224
                                        if err = c.OVNNbClient.LoadBalancerUpdateIPPortMapping(lb, vip, ipPortMapping); err != nil {
×
225
                                                klog.Errorf("failed to update ip port mapping %s for vip %s to LB %s: %v", ipPortMapping, vip, lb, err)
×
226
                                                return err
×
227
                                        }
×
228
                                }
229

230
                                if !ignoreHealthCheck {
×
231
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
232
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
233
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
×
234
                                                return err
×
235
                                        }
×
236
                                }
237
                        } else {
×
238
                                vip = util.JoinHostPort(lbVip, port.Port)
×
239
                                klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
×
240
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, true); err != nil {
×
241
                                        klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
×
242
                                        return err
×
243
                                }
×
244

245
                                klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
×
246
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, true); err != nil {
×
247
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
×
248
                                        return err
×
249
                                }
×
250

251
                                if c.config.EnableOVNLBPreferLocal {
×
252
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(lb, vip); err != nil {
×
253
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
254
                                                return err
×
255
                                        }
×
256
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(oldLb, vip); err != nil {
×
257
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
258
                                                return err
×
259
                                        }
×
260
                                }
261
                        }
262
                }
263
        }
264

265
        if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
×
266
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
267
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
268
                        klog.Errorf("failed to patch service %s: %v", key, err)
×
269
                        return err
×
270
                }
×
271
        }
272

273
        return nil
×
274
}
275

276
// Update the endpoint IP address with the secondary IP address of the pod using the network attachment definition annotation
277
// This is a temporary fix to allow consumers to use the secondary IP address of the pod
278
// TODO: Remove this function and update the endpoint construction to use the secondary IP address of the pod
279
func (c *Controller) replaceEndpointAddressesWithSecondaryIPs(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) error {
1✔
280
        // Track which pods have been processed
1✔
281
        processedPods := make(map[string]bool)
1✔
282
        // Store pod information in a map
1✔
283
        podMap := make(map[string]*v1.Pod, len(pods))
1✔
284
        for i := range pods {
2✔
285
                podMap[pods[i].Name] = pods[i]
1✔
286
        }
1✔
287
        // Pre-compute secondary IPs for all pods to avoid repeated annotation lookups
288
        secondaryIPs := make(map[string]string, len(pods))
1✔
289
        for _, pod := range pods {
2✔
290
                providers, err := c.getPodProviders(pod)
1✔
291
                if err != nil {
1✔
292
                        return err
×
293
                }
×
294
                if len(providers) > 0 {
2✔
295
                        ipAddress := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, providers[0])]
1✔
296
                        if ipAddress != "" {
2✔
297
                                secondaryIPs[pod.Name] = ipAddress
1✔
298
                        }
1✔
299
                }
300
        }
301
        // Process each endpoint slice
302
        for i, endpoint := range endpointSlices {
2✔
303
                var copiedSlice *discoveryv1.EndpointSlice
1✔
304
                needsUpdate := false
1✔
305
                // Check if any endpoints need updating first
1✔
306
                for j, ep := range endpoint.Endpoints {
2✔
307
                        if ep.TargetRef != nil && ep.TargetRef.Kind == util.KindPod {
2✔
308
                                podName := ep.TargetRef.Name
1✔
309
                                // Skip if already processed this pod
1✔
310
                                // Include slice index to handle pod in multiple slices
1✔
311
                                podKey := fmt.Sprintf("%s/%d", podName, i)
1✔
312
                                if processedPods[podKey] {
1✔
313
                                        continue
×
314
                                }
315
                                if secondaryIP, hasSecondaryIP := secondaryIPs[podName]; hasSecondaryIP {
2✔
316
                                        if pod, ok := podMap[podName]; ok {
2✔
317
                                                // Check if any address needs replacement
1✔
318
                                                for k, address := range ep.Addresses {
2✔
319
                                                        // Only replace if it's the primary IP
1✔
320
                                                        if address == pod.Status.PodIP {
2✔
321
                                                                // Lazy deep copy
1✔
322
                                                                if !needsUpdate {
2✔
323
                                                                        copiedSlice = endpoint.DeepCopy()
1✔
324
                                                                        needsUpdate = true
1✔
325
                                                                }
1✔
326
                                                                klog.Infof("updating pod %s/%s ip address %s to %s",
1✔
327
                                                                        pod.Namespace, pod.Name, pod.Status.PodIP, secondaryIP)
1✔
328
                                                                copiedSlice.Endpoints[j].Addresses[k] = secondaryIP
1✔
329
                                                                processedPods[podKey] = true
1✔
330
                                                                // Only one primary IP per endpoint
1✔
331
                                                                break
1✔
332
                                                        } else if address == secondaryIP {
×
333
                                                                // Already has secondary IP, mark as processed
×
334
                                                                processedPods[podKey] = true
×
335
                                                                break
×
336
                                                        }
337
                                                }
338
                                        }
339
                                }
340
                        }
341
                }
342
                // Replace the slice if we made changes
343
                if needsUpdate {
2✔
344
                        endpointSlices[i] = copiedSlice
1✔
345
                }
1✔
346
        }
347

348
        return nil
1✔
349
}
350

351
// enqueueStaticEndpointUpdateInNamespace enqueues updates for every statically generated EndpointSlice in a namespace.
352
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
353
func (c *Controller) enqueueStaticEndpointUpdateInNamespace(namespace string) {
×
354
        // Find all the statically generated EndpointSlices in the namespace
×
355
        endpointSlices, err := c.findStaticEndpointSlicesInNamespace(namespace)
×
356
        if err != nil {
×
357
                err := fmt.Errorf("couldn't find static endpointslices in namespace %s: %w", namespace, err)
×
358
                klog.Error(err)
×
359
        }
×
360

361
        // Enqueue updates for all the EndpointSlices
362
        for _, slice := range endpointSlices {
×
363
                c.enqueueAddEndpointSlice(slice)
×
364
        }
×
365
}
366

367
// serviceHealthChecksDisabled returns whether health checks must be omitted for a particular service
368
func serviceHealthChecksDisabled(service *v1.Service) bool {
1✔
369
        // Service must not have disabled health checks
1✔
370
        if service.Annotations != nil && service.Annotations[util.ServiceHealthCheck] == "false" {
2✔
371
                return true
1✔
372
        }
1✔
373

374
        // If nothing is specified, checks are enabled by default
375
        return false
1✔
376
}
377

378
// findStaticEndpointSlicesInNamespace finds all the EndpointSlices in a namespace that are statically generated.
379
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
380
func (c *Controller) findStaticEndpointSlicesInNamespace(namespace string) ([]*discoveryv1.EndpointSlice, error) {
×
381
        // Retrieve all the services in the namespace
×
382
        services, err := c.servicesLister.Services(namespace).List(labels.Everything())
×
383
        if err != nil {
×
384
                err := fmt.Errorf("couldn't list services in namespace %s: %w", namespace, err)
×
385
                klog.Error(err)
×
386
                return nil, err
×
387
        }
×
388

389
        // Only handle services that have static endpoints provided, and not selectors
390
        var filteredServices []*v1.Service
×
391
        for _, service := range services {
×
392
                if serviceHasSelector(service) {
×
393
                        continue
×
394
                }
395

396
                filteredServices = append(filteredServices, service)
×
397
        }
398

399
        // Find the EndpointSlices linked to those services
400
        endpointSlices, err := c.findEndpointSlicesForServices(namespace, filteredServices)
×
401
        if err != nil {
×
402
                return nil, err
×
403
        }
×
404

405
        return endpointSlices, nil
×
406
}
407

408
// findEndpointSlicesForServices returns all the EndpointSlices that are linked to services in the same namespace.
409
// Parameter "namespace" is the namespace in which all the services are located.
410
// Parameter "services" is a list of all the services for which we want to find the EndpointSlices.
411
func (c *Controller) findEndpointSlicesForServices(namespace string, services []*v1.Service) ([]*discoveryv1.EndpointSlice, error) {
×
412
        var endpointSlices []*discoveryv1.EndpointSlice
×
413

×
414
        // Retrieve all the endpointSlices in the namespace of the services
×
415
        eps, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Everything())
×
416
        if err != nil {
×
417
                err := fmt.Errorf("couldn't list endpointslices in namespace %s: %w", namespace, err)
×
418
                klog.Error(err)
×
419
                return nil, err
×
420
        }
×
421

422
        // Find the EndpointSlices part of each service
423
        for _, service := range services {
×
424
                for _, endpointSlice := range eps {
×
425
                        if getServiceForEndpointSlice(endpointSlice) == service.Name {
×
426
                                endpointSlices = append(endpointSlices, endpointSlice)
×
427
                        }
×
428
                }
429
        }
430

431
        return endpointSlices, nil
×
432
}
433

434
// serviceHasSelector returns if a service has selectors
435
func serviceHasSelector(service *v1.Service) bool {
1✔
436
        return len(service.Spec.Selector) > 0
1✔
437
}
1✔
438

439
// getCustomServiceVpcAndSubnet returns the custom VPC/Subnet defined on a service
440
func getCustomServiceVpcAndSubnet(service *v1.Service) (vpcName, subnetName string) {
×
441
        if service.Annotations != nil {
×
442
                vpcName = service.Annotations[util.LogicalRouterAnnotation]
×
443
                subnetName = service.Annotations[util.LogicalSwitchAnnotation]
×
444
        }
×
445

446
        return vpcName, subnetName
×
447
}
448

449
// getDefaultVpcAndSubnet returns the default VPC/Subnet to apply to a LoadBalancer if nothing was found
450
// during automatic discovery. If both parameters are non-empty, they are returned as is.
451
func (c *Controller) getDefaultVpcAndSubnet(service *v1.Service, vpcName, subnetName string) (string, string) {
×
452
        // Default to what's on the service or to the default VPC
×
453
        if vpcName == "" {
×
454
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
455
                        vpcName = c.config.ClusterRouter
×
456
                }
×
457
        }
458

459
        // Use the default subnet if it wasn't found
460
        if subnetName == "" {
×
461
                subnetName = util.DefaultSubnet
×
462
        }
×
463

464
        return vpcName, subnetName
×
465
}
466

467
// getVpcAndSubnetForEndpoints returns the name of the VPC/Subnet for EndpointSlices
468
func (c *Controller) getVpcAndSubnetForEndpoints(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string, err error) {
×
469
        // Let the user self-determine what VPC and subnet to use if they provided annotations on the service
×
470
        // Both the VPC and Subnet must be provided
×
471
        vpcName, subnetName = getCustomServiceVpcAndSubnet(service)
×
472
        if vpcName != "" && subnetName != "" {
×
473
                return vpcName, subnetName, nil
×
474
        }
×
475

476
        // Choose the most optimized and straightforward way to retrieve the name of the VPC and subnet
477
        if serviceHasSelector(service) {
×
478
                // The service has a selector, which means that the EndpointSlices should have targets.
×
479
                // We can use those targets instead of looking at every pod in the namespace.
×
480
                vpcName, subnetName = c.findVpcAndSubnetWithTargets(endpointSlices)
×
481
        } else {
×
482
                // The service has no selectors, we must find which pods in the namespace of the service
×
483
                // are targeted by the endpoint by only looking at the IPs.
×
484
                pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
485
                if err != nil {
×
486
                        err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
487
                        klog.Error(err)
×
488
                        return "", "", err
×
489
                }
×
490

491
                vpcName, subnetName = c.findVpcAndSubnetWithNoTargets(endpointSlices, pods)
×
492
        }
493

494
        vpcName, subnetName = c.getDefaultVpcAndSubnet(service, vpcName, subnetName)
×
495
        return vpcName, subnetName, nil
×
496
}
497

498
// findVpcAndSubnetWithTargets returns the name of the VPC and Subnet for endpoints with targets
499
func (c *Controller) findVpcAndSubnetWithTargets(endpointSlices []*discoveryv1.EndpointSlice) (vpcName, subnetName string) {
×
500
        for _, slice := range endpointSlices {
×
501
                for _, endpoint := range slice.Endpoints {
×
502
                        if endpoint.TargetRef == nil {
×
503
                                continue
×
504
                        }
505

506
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
507
                        if name == "" || namespace == "" {
×
508
                                continue
×
509
                        }
510

511
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
512
                        if err != nil {
×
513
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
514
                                klog.Error(err)
×
515
                                continue
×
516
                        }
517

518
                        vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
519
                        if err != nil {
×
520
                                err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s: %w", namespace, name, err)
×
521
                                klog.Error(err)
×
522
                                continue
×
523
                        }
524

525
                        if vpcName == "" {
×
526
                                vpcName = vpc
×
527
                        }
×
528

529
                        if subnetName == "" {
×
530
                                subnetName = subnet
×
531
                        }
×
532

533
                        if vpcName != "" && subnetName != "" {
×
534
                                return vpcName, subnetName
×
535
                        }
×
536
                }
537
        }
538

539
        return vpcName, subnetName
×
540
}
541

542
// findVpcAndSubnetWithNoTargets returns the name of the VPC and Subnet for endpoints with no targets
543
func (c *Controller) findVpcAndSubnetWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) (vpcName, subnetName string) {
×
544
        for _, slice := range endpointSlices {
×
545
                for _, endpoint := range slice.Endpoints {
×
546
                        for _, pod := range pods {
×
547
                                vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
548
                                if err != nil {
×
549
                                        err := fmt.Errorf("couldn't retrieve subnet/vpc for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
550
                                        klog.Error(err)
×
551
                                        continue
×
552
                                }
553

554
                                if vpcName == "" {
×
555
                                        vpcName = vpc
×
556
                                }
×
557

558
                                if subnetName == "" {
×
559
                                        subnetName = subnet
×
560
                                }
×
561

562
                                if vpcName != "" && subnetName != "" {
×
563
                                        return vpcName, subnetName
×
564
                                }
×
565
                        }
566
                }
567
        }
568

569
        return vpcName, subnetName
×
570
}
571

572
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
573
// the vip is used to check the health of the backend pod
574
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
575
        var (
×
576
                needCreateHealthCheckVip bool
×
577
                checkVip                 *kubeovnv1.Vip
×
578
                checkIP                  string
×
579
                err                      error
×
580
        )
×
581
        vipName := subnetName
×
582
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
583
        if err != nil {
×
584
                if errors.IsNotFound(err) {
×
585
                        needCreateHealthCheckVip = true
×
586
                } else {
×
587
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
588
                        return "", err
×
589
                }
×
590
        }
591
        if needCreateHealthCheckVip {
×
592
                vip := &kubeovnv1.Vip{
×
593
                        ObjectMeta: metav1.ObjectMeta{
×
594
                                Name: vipName,
×
595
                        },
×
596
                        Spec: kubeovnv1.VipSpec{
×
597
                                Subnet: subnetName,
×
598
                        },
×
599
                }
×
600
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
601
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
602
                        return "", err
×
603
                }
×
604

605
                // wait for vip created
606
                // TODO: WATCH VIP
607
                time.Sleep(1 * time.Second)
×
608
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
609
                if err != nil {
×
610
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
611
                        return "", err
×
612
                }
×
613
        }
614

615
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
616
                err = fmt.Errorf("vip %s is not ready", vipName)
×
617
                klog.Error(err)
×
618
                return "", err
×
619
        }
×
620

621
        switch util.CheckProtocol(lbVip) {
×
622
        case kubeovnv1.ProtocolIPv4:
×
623
                checkIP = checkVip.Status.V4ip
×
624
        case kubeovnv1.ProtocolIPv6:
×
625
                checkIP = checkVip.Status.V6ip
×
626
        }
627
        if checkIP == "" {
×
628
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
629
                klog.Error(err)
×
630
                return "", err
×
631
        }
×
632

633
        return checkIP, nil
×
634
}
635

636
// getEndpointBackend returns the LB backend for a service
637
func (c *Controller) getEndpointBackend(endpointSlices []*discoveryv1.EndpointSlice, servicePort v1.ServicePort, serviceIP string) (backends []string) {
×
638
        protocol := util.CheckProtocol(serviceIP)
×
639

×
640
        for _, endpointSlice := range endpointSlices {
×
641
                var targetPort int32
×
642
                for _, port := range endpointSlice.Ports {
×
643
                        if port.Name != nil && *port.Name == servicePort.Name {
×
644
                                targetPort = *port.Port
×
645
                                break
×
646
                        }
647
                }
648
                if targetPort == 0 {
×
649
                        continue
×
650
                }
651

652
                for _, endpoint := range endpointSlice.Endpoints {
×
653
                        if !endpointReady(endpoint) {
×
654
                                continue
×
655
                        }
656

657
                        for _, address := range endpoint.Addresses {
×
658
                                if util.CheckProtocol(address) == protocol {
×
659
                                        backends = append(backends, util.JoinHostPort(address, targetPort))
×
660
                                }
×
661
                        }
662
                }
663
        }
664

665
        return backends
×
666
}
667

668
// endpointReady returns whether an endpoint can receive traffic
669
func endpointReady(endpoint discoveryv1.Endpoint) bool {
1✔
670
        return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
1✔
671
}
1✔
672

673
// addIPPortMappingEntry adds a new entry to an IPPortMapping for a given target, the addresses on that target and the
674
// VIP used to run the health checks
675
func (c *Controller) addIPPortMappingEntry(pod *v1.Pod, addresses []string, checkVip string, mapping IPPortMapping) error {
×
676
        // Abort if the pod is getting deleted
×
677
        if !pod.DeletionTimestamp.IsZero() {
×
678
                return nil
×
679
        }
×
680

681
        // Compute the name of the LSP for that endpoint target
682
        lspName, err := c.getEndpointTargetLSPName(pod, addresses)
×
683
        if err != nil {
×
684
                return fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
×
685
        }
×
686

687
        for _, address := range addresses {
×
NEW
688
                key := address
×
NEW
689
                if util.CheckProtocol(address) == kubeovnv1.ProtocolIPv6 {
×
NEW
690
                        key = fmt.Sprintf("[%s]", address)
×
NEW
691
                }
×
NEW
692
                mapping[key] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, lspName, checkVip)
×
693
        }
694

695
        return nil
×
696
}
697

698
// getIPPortMapping returns the mapping between each endpoint, LSP and health check VIP
699
func (c *Controller) getIPPortMapping(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service, checkVip string) (IPPortMapping, error) {
×
700
        // Choose the most optimized and straightforward way to compute the IPPortMapping
×
701
        if serviceHasSelector(service) {
×
702
                // The service has a selector, which means that the EndpointSlices should have targets.
×
703
                // We can use those targets instead of looking at every pod in the namespace.
×
704
                return c.getIPPortMappingWithTargets(endpointSlices, checkVip), nil
×
705
        }
×
706

707
        // The service has no selectors, we must find which pods in the namespace of the service
708
        // are targeted by the endpoint by only looking at the IPs.
709
        pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
710
        if err != nil {
×
711
                err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
712
                klog.Error(err)
×
713
                return nil, err
×
714
        }
×
715

716
        return c.getIPPortMappingWithNoTargets(endpointSlices, pods, checkVip), nil
×
717
}
718

719
// getIPPortMappingWithTargets returns the IPPortMapping for endpoints with targets
720
func (c *Controller) getIPPortMappingWithTargets(endpointSlices []*discoveryv1.EndpointSlice, checkVip string) IPPortMapping {
×
721
        mapping := make(IPPortMapping)
×
722

×
723
        for _, slice := range endpointSlices {
×
724
                for _, endpoint := range slice.Endpoints {
×
725
                        if endpoint.TargetRef == nil {
×
726
                                continue
×
727
                        }
728

729
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
730
                        if name == "" || namespace == "" {
×
731
                                continue
×
732
                        }
733

734
                        // Retrieve the pod for that endpoint target
735
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
736
                        if err != nil {
×
737
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
738
                                klog.Error(err)
×
739
                                continue
×
740
                        }
741

742
                        // Compute the IPPortMapping for that endpoint target
743
                        if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
744
                                err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", namespace, name, err)
×
745
                                klog.Error(err)
×
746
                                continue
×
747
                        }
748
                }
749
        }
750

751
        return mapping
×
752
}
753

754
// getIPPortMappingWithNoTargets returns the IPPortMapping for endpoints with no targets
755
func (c *Controller) getIPPortMappingWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod, checkVip string) IPPortMapping {
×
756
        mapping := make(IPPortMapping)
×
757

×
758
        for _, slice := range endpointSlices {
×
759
                for _, endpoint := range slice.Endpoints {
×
760
                        for _, pod := range pods {
×
761
                                // Try to find a matching provider for the addresses
×
762
                                provider, err := c.getEndpointProvider(pod, endpoint.Addresses)
×
763
                                if err != nil {
×
764
                                        err := fmt.Errorf("couldn't get provider for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
765
                                        klog.Error(err)
×
766
                                        continue
×
767
                                }
768

769
                                // If the pod has a provider that matches that set of addresses, it is an endpoint target.
770
                                // Otherwise, it isn't targeted by the EndpointSlice and can be dismissed.
771
                                if provider == "" {
×
772
                                        continue
×
773
                                }
774

775
                                // Compute the IPPortMapping for that endpoint target
776
                                if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
777
                                        err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
778
                                        klog.Error(err)
×
779
                                        continue
×
780
                                }
781
                        }
782
                }
783
        }
784

785
        return mapping
×
786
}
787

788
// getPodProviders returns all the providers available on a pod
789
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
1✔
790
        // Get all the networks to which the pod is attached
1✔
791
        podNetworks, err := c.getPodKubeovnNets(pod)
1✔
792
        if err != nil {
1✔
793
                return nil, fmt.Errorf("failed to get pod networks: %w", err)
×
794
        }
×
795

796
        // Retrieve all the providers
797
        var providers []string
1✔
798
        for _, podNetwork := range podNetworks {
2✔
799
                providers = append(providers, podNetwork.ProviderName)
1✔
800
        }
1✔
801

802
        return providers, nil
1✔
803
}
804

805
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
806
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
1✔
807
        if pod.Annotations == nil {
2✔
808
                return ""
1✔
809
        }
1✔
810

811
        // Find which provider is linked to this address
812
        for _, provider := range providers {
2✔
813
                ipsForProvider, exists := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, provider)]
1✔
814
                if !exists {
1✔
815
                        continue
×
816
                }
817

818
                ips := strings.Split(ipsForProvider, ",")
1✔
819
                if slices.Contains(ips, address) {
2✔
820
                        return provider
1✔
821
                }
1✔
822
        }
823

824
        return ""
1✔
825
}
826

827
// getEndpointProvider returns the provider linked to the addresses of an endpoint
828
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
×
829
        // Retrieve all the providers of the pod
×
830
        providers, err := c.getPodProviders(pod)
×
831
        if err != nil {
×
832
                return "", err
×
833
        }
×
834

835
        // Get the first matching provider for any of the address in the endpoint
836
        var provider string
×
837
        for _, address := range addresses {
×
838
                if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
×
839
                        return provider, nil
×
840
                }
×
841
        }
842

843
        return "", nil
×
844
}
845

846
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
847
// A custom provider can be specified if the LSP is within a subnet that doesn't use
848
// the default "ovn" provider.
849
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
1✔
850
        // If no provider is specified, use the default one
1✔
851
        if provider == "" {
2✔
852
                provider = util.OvnProvider
1✔
853
        }
1✔
854

855
        target := pod.Name
1✔
856

1✔
857
        // If this pod is a VM launcher pod, we need to retrieve the name of the VM. This is necessary
1✔
858
        // because we do not use the same syntax for the LSP of normal pods and for VM pods
1✔
859
        if vmName, exists := pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, provider)]; exists {
2✔
860
                target = vmName
1✔
861
        }
1✔
862

863
        return ovs.PodNameToPortName(target, pod.Namespace, provider)
1✔
864
}
865

866
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
867
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
×
868
        // Retrieve the provider for those addresses
×
869
        provider, err := c.getEndpointProvider(pod, addresses)
×
870
        if err != nil {
×
871
                return "", err
×
872
        }
×
873

874
        return getEndpointTargetLSPNameFromProvider(pod, provider), nil
×
875
}
876

877
// getSubnetByProvider returns the subnet linked to a provider on a pod
878
func getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
×
879
        subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
×
880
        if !exists {
×
881
                return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
×
882
        }
×
883

884
        return subnetName, nil
×
885
}
886

887
// getVpcByProvider returns the VPC linked to a provider on a pod
888
func getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
×
889
        vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
×
890
        if !exists {
×
891
                return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
×
892
        }
×
893

894
        return vpcName, nil
×
895
}
896

897
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
898
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
×
899
        // Retrieve the provider for those addresses
×
900
        provider, err := c.getEndpointProvider(pod, addresses)
×
901
        if err != nil {
×
902
                return "", "", err
×
903
        }
×
904

905
        if provider == "" {
×
906
                return "", "", nil
×
907
        }
×
908

909
        // Retrieve the subnet
910
        subnet, err := getSubnetByProvider(pod, provider)
×
911
        if err != nil {
×
912
                return "", "", err
×
913
        }
×
914

915
        // Retrieve the VPC
916
        vpc, err := getVpcByProvider(pod, provider)
×
917
        if err != nil {
×
918
                return "", "", err
×
919
        }
×
920

921
        return vpc, subnet, nil
×
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc