• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 23134800411

16 Mar 2026 08:35AM UTC coverage: 23.397%. Remained the same
23134800411

push

github

web-flow
fix: avoid error log for missing VPC annotation on underlay pods (#6454)

In underlay networks (subnet with Vlan, without LogicalGateway or
U2OInterconnection), the logical_router annotation is intentionally
not set on pods. The endpoint_slice controller treated this as an
error, causing frequent "couldn't find vpc linked to provider ovn"
log messages. Change getVpcByProvider to return an empty string
instead of an error, allowing the existing fallback logic in
getDefaultVpcAndSubnet to correctly resolve to the default VPC.

Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

0 of 3 new or added lines in 1 file covered. (0.0%)

77 existing lines in 2 files now uncovered.

12827 of 54823 relevant lines covered (23.4%)

0.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.24
/pkg/controller/endpoint_slice.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "slices"
7
        "strings"
8
        "time"
9

10
        v1 "k8s.io/api/core/v1"
11
        discoveryv1 "k8s.io/api/discovery/v1"
12
        "k8s.io/apimachinery/pkg/api/errors"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/client-go/tools/cache"
17
        "k8s.io/klog/v2"
18

19
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
20
        "github.com/kubeovn/kube-ovn/pkg/ovs"
21
        "github.com/kubeovn/kube-ovn/pkg/util"
22
)
23

24
type IPPortMapping map[string]string
25

26
// getServiceForEndpointSlice returns the service linked to an EndpointSlice
27
func getServiceForEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
28
        if endpointSlice != nil && endpointSlice.Labels != nil {
2✔
29
                return endpointSlice.Labels[discoveryv1.LabelServiceName]
1✔
30
        }
1✔
31

32
        return ""
1✔
33
}
34

35
func findServiceKey(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
36
        service := getServiceForEndpointSlice(endpointSlice)
1✔
37
        if service == "" {
2✔
38
                return ""
1✔
39
        }
1✔
40

41
        return endpointSlice.Namespace + "/" + service
1✔
42
}
43

44
func (c *Controller) enqueueAddEndpointSlice(obj any) {
×
45
        key := findServiceKey(obj.(*discoveryv1.EndpointSlice))
×
46
        if key != "" {
×
47
                klog.V(3).Infof("enqueue add endpointSlice %s", key)
×
48
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
49
        }
×
50
}
51

52
func (c *Controller) enqueueUpdateEndpointSlice(oldObj, newObj any) {
×
53
        oldEndpointSlice := oldObj.(*discoveryv1.EndpointSlice)
×
54
        newEndpointSlice := newObj.(*discoveryv1.EndpointSlice)
×
55
        if oldEndpointSlice.ResourceVersion == newEndpointSlice.ResourceVersion {
×
56
                return
×
57
        }
×
58

59
        if len(oldEndpointSlice.Endpoints) == 0 && len(newEndpointSlice.Endpoints) == 0 {
×
60
                return
×
61
        }
×
62

63
        key := findServiceKey(newEndpointSlice)
×
64
        if key != "" {
×
65
                klog.V(3).Infof("enqueue update endpointSlice for service %s", key)
×
66
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
67
        }
×
68
}
69

70
func (c *Controller) handleUpdateEndpointSlice(key string) error {
×
71
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
72
        if err != nil {
×
73
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
74
                return nil
×
75
        }
×
76

77
        c.epKeyMutex.LockKey(key)
×
78
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
79
        klog.Infof("handle update endpointSlice for service %s", key)
×
80

×
81
        endpointSlices, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Set{discoveryv1.LabelServiceName: name}.AsSelector())
×
82
        if err != nil {
×
83
                if errors.IsNotFound(err) {
×
84
                        return nil
×
85
                }
×
86
                klog.Error(err)
×
87
                return err
×
88
        }
89

90
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
91
        if err != nil {
×
92
                if errors.IsNotFound(err) {
×
93
                        return nil
×
94
                }
×
95
                klog.Error(err)
×
96
                return err
×
97
        }
98
        svc := cachedService.DeepCopy()
×
99

×
100
        var (
×
101
                lbVips                   []string
×
102
                vip, vpcName, subnetName string
×
103
                ok                       bool
×
104
                ignoreHealthCheck        = true
×
105
                isPreferLocalBackend     = false
×
106
        )
×
107

×
108
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
109
                lbVips = []string{vip}
×
110

×
111
                // Health checks can only run against IPv4 endpoints and if the service doesn't specify they must be disabled
×
112
                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 && !serviceHealthChecksDisabled(svc) {
×
113
                        ignoreHealthCheck = false
×
114
                }
×
115
        } else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
×
116
                return nil
×
117
        }
×
118

119
        if c.config.EnableLb && c.config.EnableOVNLBPreferLocal {
×
120
                if svc.Spec.Type == v1.ServiceTypeLoadBalancer && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
×
121
                        if len(svc.Status.LoadBalancer.Ingress) > 0 {
×
122
                                for _, ingress := range svc.Status.LoadBalancer.Ingress {
×
123
                                        if ingress.IP != "" {
×
124
                                                lbVips = append(lbVips, ingress.IP)
×
125
                                        }
×
126
                                }
127
                        }
128
                        isPreferLocalBackend = true
×
129
                } else if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal {
×
130
                        isPreferLocalBackend = true
×
131
                }
×
132
        }
133

134
        // If Kube-OVN is running in secondary CNI mode, the endpoint IPs should be derived from the network attachment definitions
135
        // This overwrite can be removed if endpoint construction accounts for network attachment IP address
136
        // TODO: Identify how endpoints are constructed, by default, endpoints has IP address of eth0 interface
137
        if c.config.EnableNonPrimaryCNI && serviceHasSelector(svc) {
×
138
                var pods []*v1.Pod
×
139
                if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
×
140
                        klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
×
141
                        return err
×
142
                }
×
143
                err = c.replaceEndpointAddressesWithSecondaryIPs(endpointSlices, pods)
×
144
                if err != nil {
×
145
                        klog.Errorf("failed to update endpointSlice: %v", err)
×
146
                        return err
×
147
                }
×
148
        }
149

150
        vpcName, subnetName, err = c.getVpcAndSubnetForEndpoints(endpointSlices, svc)
×
151
        if err != nil {
×
152
                return err
×
153
        }
×
154

155
        var (
×
156
                vpc    *kubeovnv1.Vpc
×
157
                svcVpc string
×
158
        )
×
159

×
160
        if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
×
161
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
162
                return err
×
163
        }
×
164

165
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
×
166
        oldTCPLb, oldUDPLb, oldSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
×
167
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
168
                tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
×
169
        }
×
170

171
        for _, lbVip := range lbVips {
×
172
                for _, port := range svc.Spec.Ports {
×
173
                        var lb, oldLb string
×
174
                        switch port.Protocol {
×
175
                        case v1.ProtocolTCP:
×
176
                                lb, oldLb = tcpLb, oldTCPLb
×
177
                        case v1.ProtocolUDP:
×
178
                                lb, oldLb = udpLb, oldUDPLb
×
179
                        case v1.ProtocolSCTP:
×
180
                                lb, oldLb = sctpLb, oldSctpLb
×
181
                        }
182

183
                        var (
×
184
                                vip, checkIP             string
×
185
                                backends                 []string
×
186
                                ipPortMapping, externals map[string]string
×
187
                        )
×
188

×
189
                        if !ignoreHealthCheck {
×
190
                                if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
×
191
                                        klog.Error(err)
×
192
                                        return err
×
193
                                }
×
194
                                externals = map[string]string{
×
195
                                        util.SwitchLBRuleSubnet: subnetName,
×
196
                                }
×
197
                        }
198

199
                        if isPreferLocalBackend {
×
200
                                // only use the ipportmapping's lsp to ip map when the backend is local
×
201
                                checkIP = util.MasqueradeCheckIP
×
202
                        }
×
203

204
                        backends = c.getEndpointBackend(endpointSlices, port, lbVip)
×
205

×
206
                        if !ignoreHealthCheck || isPreferLocalBackend {
×
207
                                ipPortMapping, err = c.getIPPortMapping(endpointSlices, svc, checkIP)
×
208
                                if err != nil {
×
209
                                        err := fmt.Errorf("couldn't get ip port mapping for svc %s/%s: %w", svc.Namespace, svc.Name, err)
×
210
                                        return err
×
211
                                }
×
212
                        }
213

214
                        // for performance reason delete lb with no backends
215
                        if len(backends) != 0 {
×
216
                                vip = util.JoinHostPort(lbVip, port.Port)
×
217
                                klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
×
218
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
×
219
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
×
220
                                        return err
×
221
                                }
×
222

223
                                if isPreferLocalBackend && len(ipPortMapping) != 0 {
×
224
                                        if err = c.OVNNbClient.LoadBalancerUpdateIPPortMapping(lb, vip, ipPortMapping); err != nil {
×
225
                                                klog.Errorf("failed to update ip port mapping %s for vip %s to LB %s: %v", ipPortMapping, vip, lb, err)
×
226
                                                return err
×
227
                                        }
×
228
                                }
229

230
                                if !ignoreHealthCheck {
×
231
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
232
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
233
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
×
234
                                                return err
×
235
                                        }
×
236
                                }
237
                        } else {
×
238
                                vip = util.JoinHostPort(lbVip, port.Port)
×
239
                                klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
×
240
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, true); err != nil {
×
241
                                        klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
×
242
                                        return err
×
243
                                }
×
244

245
                                klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
×
246
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, true); err != nil {
×
247
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
×
248
                                        return err
×
249
                                }
×
250

251
                                if c.config.EnableOVNLBPreferLocal {
×
252
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(lb, vip); err != nil {
×
253
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
254
                                                return err
×
255
                                        }
×
256
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(oldLb, vip); err != nil {
×
257
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
258
                                                return err
×
259
                                        }
×
260
                                }
261
                        }
262
                }
263
        }
264

265
        if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
×
266
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
267
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
268
                        klog.Errorf("failed to patch service %s: %v", key, err)
×
269
                        return err
×
270
                }
×
271
        }
272

273
        return nil
×
274
}
275

276
// Update the endpoint IP address with the secondary IP address of the pod using the network attachment definition annotation
277
// This is a temporary fix to allow consumers to use the secondary IP address of the pod
278
// TODO: Remove this function and update the endpoint construction to use the secondary IP address of the pod
279
func (c *Controller) replaceEndpointAddressesWithSecondaryIPs(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) error {
1✔
280
        // Track which pods have been processed
1✔
281
        processedPods := make(map[string]bool)
1✔
282
        // Store pod information in a map
1✔
283
        podMap := make(map[string]*v1.Pod, len(pods))
1✔
284
        for i := range pods {
2✔
285
                podMap[pods[i].Name] = pods[i]
1✔
286
        }
1✔
287
        // Pre-compute secondary IPs for all pods to avoid repeated annotation lookups
288
        secondaryIPs := make(map[string]string, len(pods))
1✔
289
        for _, pod := range pods {
2✔
290
                providers, err := c.getPodProviders(pod)
1✔
291
                if err != nil {
1✔
292
                        return err
×
293
                }
×
294
                if len(providers) > 0 {
2✔
295
                        ipAddress := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, providers[0])]
1✔
296
                        if ipAddress != "" {
2✔
297
                                secondaryIPs[pod.Name] = ipAddress
1✔
298
                        }
1✔
299
                }
300
        }
301
        // Process each endpoint slice
302
        for i, endpoint := range endpointSlices {
2✔
303
                var copiedSlice *discoveryv1.EndpointSlice
1✔
304
                needsUpdate := false
1✔
305
                // Check if any endpoints need updating first
1✔
306
                for j, ep := range endpoint.Endpoints {
2✔
307
                        if ep.TargetRef != nil && ep.TargetRef.Kind == util.KindPod {
2✔
308
                                podName := ep.TargetRef.Name
1✔
309
                                // Skip if already processed this pod
1✔
310
                                // Include slice index to handle pod in multiple slices
1✔
311
                                podKey := fmt.Sprintf("%s/%d", podName, i)
1✔
312
                                if processedPods[podKey] {
1✔
313
                                        continue
×
314
                                }
315
                                if secondaryIP, hasSecondaryIP := secondaryIPs[podName]; hasSecondaryIP {
2✔
316
                                        if pod, ok := podMap[podName]; ok {
2✔
317
                                                // Check if any address needs replacement
1✔
318
                                                for k, address := range ep.Addresses {
2✔
319
                                                        // Only replace if it's the primary IP
1✔
320
                                                        if address == pod.Status.PodIP {
2✔
321
                                                                // Lazy deep copy
1✔
322
                                                                if !needsUpdate {
2✔
323
                                                                        copiedSlice = endpoint.DeepCopy()
1✔
324
                                                                        needsUpdate = true
1✔
325
                                                                }
1✔
326
                                                                klog.Infof("updating pod %s/%s ip address %s to %s",
1✔
327
                                                                        pod.Namespace, pod.Name, pod.Status.PodIP, secondaryIP)
1✔
328
                                                                copiedSlice.Endpoints[j].Addresses[k] = secondaryIP
1✔
329
                                                                processedPods[podKey] = true
1✔
330
                                                                // Only one primary IP per endpoint
1✔
331
                                                                break
1✔
332
                                                        } else if address == secondaryIP {
×
333
                                                                // Already has secondary IP, mark as processed
×
334
                                                                processedPods[podKey] = true
×
335
                                                                break
×
336
                                                        }
337
                                                }
338
                                        }
339
                                }
340
                        }
341
                }
342
                // Replace the slice if we made changes
343
                if needsUpdate {
2✔
344
                        endpointSlices[i] = copiedSlice
1✔
345
                }
1✔
346
        }
347

348
        return nil
1✔
349
}
350

351
// enqueueStaticEndpointUpdateInNamespace enqueues updates for every statically generated EndpointSlice in a namespace.
352
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
353
func (c *Controller) enqueueStaticEndpointUpdateInNamespace(namespace string) {
×
354
        // Find all the statically generated EndpointSlices in the namespace
×
355
        endpointSlices, err := c.findStaticEndpointSlicesInNamespace(namespace)
×
356
        if err != nil {
×
357
                err := fmt.Errorf("couldn't find static endpointslices in namespace %s: %w", namespace, err)
×
358
                klog.Error(err)
×
359
        }
×
360

361
        // Enqueue updates for all the EndpointSlices
362
        for _, slice := range endpointSlices {
×
363
                c.enqueueAddEndpointSlice(slice)
×
364
        }
×
365
}
366

367
// serviceHealthChecksDisabled returns whether health checks must be omitted for a particular service
368
func serviceHealthChecksDisabled(service *v1.Service) bool {
1✔
369
        // Service must not have disabled health checks
1✔
370
        if service.Annotations != nil && service.Annotations[util.ServiceHealthCheck] == "false" {
2✔
371
                return true
1✔
372
        }
1✔
373

374
        // If nothing is specified, checks are enabled by default
375
        return false
1✔
376
}
377

378
// findStaticEndpointSlicesInNamespace finds all the EndpointSlices in a namespace that are statically generated.
379
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
380
func (c *Controller) findStaticEndpointSlicesInNamespace(namespace string) ([]*discoveryv1.EndpointSlice, error) {
×
381
        // Retrieve all the services in the namespace
×
382
        services, err := c.servicesLister.Services(namespace).List(labels.Everything())
×
383
        if err != nil {
×
384
                err := fmt.Errorf("couldn't list services in namespace %s: %w", namespace, err)
×
385
                klog.Error(err)
×
386
                return nil, err
×
387
        }
×
388

389
        // Only handle services that have static endpoints provided, and not selectors
390
        var filteredServices []*v1.Service
×
391
        for _, service := range services {
×
392
                if serviceHasSelector(service) {
×
393
                        continue
×
394
                }
395

396
                filteredServices = append(filteredServices, service)
×
397
        }
398

399
        // Find the EndpointSlices linked to those services
400
        endpointSlices, err := c.findEndpointSlicesForServices(namespace, filteredServices)
×
401
        if err != nil {
×
402
                return nil, err
×
403
        }
×
404

405
        return endpointSlices, nil
×
406
}
407

408
// findEndpointSlicesForServices returns all the EndpointSlices that are linked to services in the same namespace.
409
// Parameter "namespace" is the namespace in which all the services are located.
410
// Parameter "services" is a list of all the services for which we want to find the EndpointSlices.
411
func (c *Controller) findEndpointSlicesForServices(namespace string, services []*v1.Service) ([]*discoveryv1.EndpointSlice, error) {
×
412
        var endpointSlices []*discoveryv1.EndpointSlice
×
413

×
414
        // Retrieve all the endpointSlices in the namespace of the services
×
415
        eps, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Everything())
×
416
        if err != nil {
×
417
                err := fmt.Errorf("couldn't list endpointslices in namespace %s: %w", namespace, err)
×
418
                klog.Error(err)
×
419
                return nil, err
×
420
        }
×
421

422
        // Find the EndpointSlices part of each service
423
        for _, service := range services {
×
424
                for _, endpointSlice := range eps {
×
425
                        if getServiceForEndpointSlice(endpointSlice) == service.Name {
×
426
                                endpointSlices = append(endpointSlices, endpointSlice)
×
427
                        }
×
428
                }
429
        }
430

431
        return endpointSlices, nil
×
432
}
433

434
// serviceHasSelector returns if a service has selectors
435
func serviceHasSelector(service *v1.Service) bool {
1✔
436
        return len(service.Spec.Selector) > 0
1✔
437
}
1✔
438

439
// getCustomServiceVpcAndSubnet returns the custom VPC/Subnet defined on a service
440
func getCustomServiceVpcAndSubnet(service *v1.Service) (vpcName, subnetName string) {
×
441
        if service.Annotations != nil {
×
442
                vpcName = service.Annotations[util.LogicalRouterAnnotation]
×
443
                subnetName = service.Annotations[util.LogicalSwitchAnnotation]
×
444
        }
×
445

446
        return vpcName, subnetName
×
447
}
448

449
// getDefaultVpcAndSubnet returns the default VPC/Subnet to apply to a LoadBalancer if nothing was found
450
// during automatic discovery. If both parameters are non-empty, they are returned as is.
451
func (c *Controller) getDefaultVpcAndSubnet(service *v1.Service, vpcName, subnetName string) (string, string) {
×
452
        // Default to what's on the service or to the default VPC
×
453
        if vpcName == "" {
×
454
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
455
                        vpcName = c.config.ClusterRouter
×
456
                }
×
457
        }
458

459
        // Use the default subnet if it wasn't found
460
        if subnetName == "" {
×
461
                subnetName = util.DefaultSubnet
×
462
        }
×
463

464
        return vpcName, subnetName
×
465
}
466

467
// getVpcAndSubnetForEndpoints returns the name of the VPC/Subnet for EndpointSlices
468
func (c *Controller) getVpcAndSubnetForEndpoints(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string, err error) {
×
469
        // Let the user self-determine what VPC and subnet to use if they provided annotations on the service
×
470
        // Both the VPC and Subnet must be provided
×
471
        vpcName, subnetName = getCustomServiceVpcAndSubnet(service)
×
472
        if vpcName != "" && subnetName != "" {
×
473
                return vpcName, subnetName, nil
×
474
        }
×
475

476
        // Choose the most optimized and straightforward way to retrieve the name of the VPC and subnet
477
        if serviceHasSelector(service) {
×
478
                // The service has a selector, which means that the EndpointSlices should have targets.
×
479
                // We can use those targets instead of looking at every pod in the namespace.
×
480
                vpcName, subnetName = c.findVpcAndSubnetWithTargets(endpointSlices)
×
481
        } else {
×
482
                // The service has no selectors, we must find which pods in the namespace of the service
×
483
                // are targeted by the endpoint by only looking at the IPs.
×
484
                pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
485
                if err != nil {
×
486
                        err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
487
                        klog.Error(err)
×
488
                        return "", "", err
×
489
                }
×
490

491
                vpcName, subnetName = c.findVpcAndSubnetWithNoTargets(endpointSlices, pods)
×
492
        }
493

494
        vpcName, subnetName = c.getDefaultVpcAndSubnet(service, vpcName, subnetName)
×
495
        return vpcName, subnetName, nil
×
496
}
497

498
// findVpcAndSubnetWithTargets returns the name of the VPC and Subnet for endpoints with targets
499
func (c *Controller) findVpcAndSubnetWithTargets(endpointSlices []*discoveryv1.EndpointSlice) (vpcName, subnetName string) {
×
500
        for _, slice := range endpointSlices {
×
501
                for _, endpoint := range slice.Endpoints {
×
502
                        if endpoint.TargetRef == nil {
×
503
                                continue
×
504
                        }
505

506
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
507
                        if name == "" || namespace == "" {
×
508
                                continue
×
509
                        }
510

511
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
512
                        if err != nil {
×
513
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
514
                                klog.Error(err)
×
515
                                continue
×
516
                        }
517

518
                        vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
519
                        if err != nil {
×
520
                                err := fmt.Errorf("couldn't retrieve subnet/vpc for pod %s/%s: %w", namespace, name, err)
×
521
                                klog.Error(err)
×
522
                                continue
×
523
                        }
524

525
                        if vpcName == "" {
×
526
                                vpcName = vpc
×
527
                        }
×
528

529
                        if subnetName == "" {
×
530
                                subnetName = subnet
×
531
                        }
×
532

533
                        if vpcName != "" && subnetName != "" {
×
534
                                return vpcName, subnetName
×
535
                        }
×
536
                }
537
        }
538

539
        return vpcName, subnetName
×
540
}
541

542
// findVpcAndSubnetWithNoTargets returns the name of the VPC and Subnet for endpoints with no targets
543
func (c *Controller) findVpcAndSubnetWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) (vpcName, subnetName string) {
×
544
        for _, slice := range endpointSlices {
×
545
                for _, endpoint := range slice.Endpoints {
×
546
                        for _, pod := range pods {
×
547
                                vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
548
                                if err != nil {
×
549
                                        err := fmt.Errorf("couldn't retrieve subnet/vpc for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
550
                                        klog.Error(err)
×
551
                                        continue
×
552
                                }
553

554
                                if vpcName == "" {
×
555
                                        vpcName = vpc
×
556
                                }
×
557

558
                                if subnetName == "" {
×
559
                                        subnetName = subnet
×
560
                                }
×
561

562
                                if vpcName != "" && subnetName != "" {
×
563
                                        return vpcName, subnetName
×
564
                                }
×
565
                        }
566
                }
567
        }
568

569
        return vpcName, subnetName
×
570
}
571

572
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
573
// the vip is used to check the health of the backend pod
574
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
575
        var (
×
576
                needCreateHealthCheckVip bool
×
577
                checkVip                 *kubeovnv1.Vip
×
578
                checkIP                  string
×
579
                err                      error
×
580
        )
×
581
        vipName := subnetName
×
582
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
583
        if err != nil {
×
584
                if errors.IsNotFound(err) {
×
585
                        needCreateHealthCheckVip = true
×
586
                } else {
×
587
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
588
                        return "", err
×
589
                }
×
590
        }
591
        if needCreateHealthCheckVip {
×
592
                vip := &kubeovnv1.Vip{
×
593
                        ObjectMeta: metav1.ObjectMeta{
×
594
                                Name: vipName,
×
595
                        },
×
596
                        Spec: kubeovnv1.VipSpec{
×
597
                                Subnet: subnetName,
×
598
                        },
×
599
                }
×
600
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
601
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
602
                        return "", err
×
603
                }
×
604

605
                // wait for vip created
606
                // TODO: WATCH VIP
607
                time.Sleep(1 * time.Second)
×
608
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
609
                if err != nil {
×
610
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
611
                        return "", err
×
612
                }
×
613
        }
614

615
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
616
                err = fmt.Errorf("vip %s is not ready", vipName)
×
617
                klog.Error(err)
×
618
                return "", err
×
619
        }
×
620

621
        switch util.CheckProtocol(lbVip) {
×
622
        case kubeovnv1.ProtocolIPv4:
×
623
                checkIP = checkVip.Status.V4ip
×
624
        case kubeovnv1.ProtocolIPv6:
×
625
                checkIP = checkVip.Status.V6ip
×
626
        }
627
        if checkIP == "" {
×
628
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
629
                klog.Error(err)
×
630
                return "", err
×
631
        }
×
632

633
        return checkIP, nil
×
634
}
635

636
// getEndpointBackend returns the LB backend for a service
637
func (c *Controller) getEndpointBackend(endpointSlices []*discoveryv1.EndpointSlice, servicePort v1.ServicePort, serviceIP string) (backends []string) {
×
638
        protocol := util.CheckProtocol(serviceIP)
×
639

×
640
        for _, endpointSlice := range endpointSlices {
×
641
                var targetPort int32
×
642
                for _, port := range endpointSlice.Ports {
×
643
                        if port.Name != nil && *port.Name == servicePort.Name {
×
644
                                targetPort = *port.Port
×
645
                                break
×
646
                        }
647
                }
648
                if targetPort == 0 {
×
649
                        continue
×
650
                }
651

652
                for _, endpoint := range endpointSlice.Endpoints {
×
653
                        if !endpointReady(endpoint) {
×
654
                                continue
×
655
                        }
656

657
                        for _, address := range endpoint.Addresses {
×
658
                                if util.CheckProtocol(address) == protocol {
×
659
                                        backends = append(backends, util.JoinHostPort(address, targetPort))
×
660
                                }
×
661
                        }
662
                }
663
        }
664

665
        return backends
×
666
}
667

668
// endpointReady returns whether an endpoint can receive traffic
669
func endpointReady(endpoint discoveryv1.Endpoint) bool {
1✔
670
        return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
1✔
671
}
1✔
672

673
// addIPPortMappingEntry adds a new entry to an IPPortMapping for a given target, the addresses on that target and the
674
// VIP used to run the health checks
675
func (c *Controller) addIPPortMappingEntry(pod *v1.Pod, addresses []string, checkVip string, mapping IPPortMapping) error {
×
676
        // Abort if the pod is getting deleted
×
677
        if !pod.DeletionTimestamp.IsZero() {
×
678
                return nil
×
679
        }
×
680

681
        // Compute the name of the LSP for that endpoint target
682
        lspName, err := c.getEndpointTargetLSPName(pod, addresses)
×
683
        if err != nil {
×
684
                return fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
×
685
        }
×
686

687
        for _, address := range addresses {
×
688
                key := address
×
689
                if util.CheckProtocol(address) == kubeovnv1.ProtocolIPv6 {
×
690
                        key = fmt.Sprintf("[%s]", address)
×
691
                }
×
692
                mapping[key] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, lspName, checkVip)
×
693
        }
694

695
        return nil
×
696
}
697

698
// getIPPortMapping returns the mapping between each endpoint, LSP and health check VIP
699
func (c *Controller) getIPPortMapping(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service, checkVip string) (IPPortMapping, error) {
×
700
        // Choose the most optimized and straightforward way to compute the IPPortMapping
×
701
        if serviceHasSelector(service) {
×
702
                // The service has a selector, which means that the EndpointSlices should have targets.
×
703
                // We can use those targets instead of looking at every pod in the namespace.
×
704
                return c.getIPPortMappingWithTargets(endpointSlices, checkVip), nil
×
705
        }
×
706

707
        // The service has no selectors, we must find which pods in the namespace of the service
708
        // are targeted by the endpoint by only looking at the IPs.
709
        pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
710
        if err != nil {
×
711
                err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
712
                klog.Error(err)
×
713
                return nil, err
×
714
        }
×
715

716
        return c.getIPPortMappingWithNoTargets(endpointSlices, pods, checkVip), nil
×
717
}
718

719
// getIPPortMappingWithTargets returns the IPPortMapping for endpoints with targets
720
func (c *Controller) getIPPortMappingWithTargets(endpointSlices []*discoveryv1.EndpointSlice, checkVip string) IPPortMapping {
×
721
        mapping := make(IPPortMapping)
×
722

×
723
        for _, slice := range endpointSlices {
×
724
                for _, endpoint := range slice.Endpoints {
×
725
                        if endpoint.TargetRef == nil {
×
726
                                continue
×
727
                        }
728

729
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
730
                        if name == "" || namespace == "" {
×
731
                                continue
×
732
                        }
733

734
                        // Retrieve the pod for that endpoint target
735
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
736
                        if err != nil {
×
737
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
738
                                klog.Error(err)
×
739
                                continue
×
740
                        }
741

742
                        // Compute the IPPortMapping for that endpoint target
743
                        if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
744
                                err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", namespace, name, err)
×
745
                                klog.Error(err)
×
746
                                continue
×
747
                        }
748
                }
749
        }
750

751
        return mapping
×
752
}
753

754
// getIPPortMappingWithNoTargets returns the IPPortMapping for endpoints with no targets
755
func (c *Controller) getIPPortMappingWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod, checkVip string) IPPortMapping {
×
756
        mapping := make(IPPortMapping)
×
757

×
758
        for _, slice := range endpointSlices {
×
759
                for _, endpoint := range slice.Endpoints {
×
760
                        for _, pod := range pods {
×
761
                                // Try to find a matching provider for the addresses
×
762
                                provider, err := c.getEndpointProvider(pod, endpoint.Addresses)
×
763
                                if err != nil {
×
764
                                        err := fmt.Errorf("couldn't get provider for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
765
                                        klog.Error(err)
×
766
                                        continue
×
767
                                }
768

769
                                // If the pod has a provider that matches that set of addresses, it is an endpoint target.
770
                                // Otherwise, it isn't targeted by the EndpointSlice and can be dismissed.
771
                                if provider == "" {
×
772
                                        continue
×
773
                                }
774

775
                                // Compute the IPPortMapping for that endpoint target
776
                                if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
777
                                        err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
778
                                        klog.Error(err)
×
779
                                        continue
×
780
                                }
781
                        }
782
                }
783
        }
784

785
        return mapping
×
786
}
787

788
// getPodProviders returns all the providers available on a pod
789
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
1✔
790
        // Get all the networks to which the pod is attached
1✔
791
        podNetworks, err := c.getPodKubeovnNets(pod)
1✔
792
        if err != nil {
1✔
793
                return nil, fmt.Errorf("failed to get pod networks: %w", err)
×
794
        }
×
795

796
        // Retrieve all the providers
797
        var providers []string
1✔
798
        for _, podNetwork := range podNetworks {
2✔
799
                providers = append(providers, podNetwork.ProviderName)
1✔
800
        }
1✔
801

802
        return providers, nil
1✔
803
}
804

805
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
806
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
1✔
807
        if pod.Annotations == nil {
2✔
808
                return ""
1✔
809
        }
1✔
810

811
        // Find which provider is linked to this address
812
        for _, provider := range providers {
2✔
813
                ipsForProvider, exists := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, provider)]
1✔
814
                if !exists {
1✔
815
                        continue
×
816
                }
817

818
                ips := strings.Split(ipsForProvider, ",")
1✔
819
                if slices.Contains(ips, address) {
2✔
820
                        return provider
1✔
821
                }
1✔
822
        }
823

824
        return ""
1✔
825
}
826

827
// getEndpointProvider returns the provider linked to the addresses of an endpoint
828
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
×
829
        // Retrieve all the providers of the pod
×
830
        providers, err := c.getPodProviders(pod)
×
831
        if err != nil {
×
832
                return "", err
×
833
        }
×
834

835
        // Get the first matching provider for any of the address in the endpoint
836
        var provider string
×
837
        for _, address := range addresses {
×
838
                if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
×
839
                        return provider, nil
×
840
                }
×
841
        }
842

843
        return "", nil
×
844
}
845

846
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
847
// A custom provider can be specified if the LSP is within a subnet that doesn't use
848
// the default "ovn" provider.
849
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
1✔
850
        // If no provider is specified, use the default one
1✔
851
        if provider == "" {
2✔
852
                provider = util.OvnProvider
1✔
853
        }
1✔
854

855
        target := pod.Name
1✔
856

1✔
857
        // If this pod is a VM launcher pod, we need to retrieve the name of the VM. This is necessary
1✔
858
        // because we do not use the same syntax for the LSP of normal pods and for VM pods
1✔
859
        if vmName, exists := pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, provider)]; exists {
2✔
860
                target = vmName
1✔
861
        }
1✔
862

863
        return ovs.PodNameToPortName(target, pod.Namespace, provider)
1✔
864
}
865

866
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
867
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
×
868
        // Retrieve the provider for those addresses
×
869
        provider, err := c.getEndpointProvider(pod, addresses)
×
870
        if err != nil {
×
871
                return "", err
×
872
        }
×
873

874
        return getEndpointTargetLSPNameFromProvider(pod, provider), nil
×
875
}
876

877
// getSubnetByProvider returns the subnet linked to a provider on a pod
878
func getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
×
879
        subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
×
880
        if !exists {
×
881
                return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
×
882
        }
×
883

884
        return subnetName, nil
×
885
}
886

887
// getVpcByProvider returns the VPC linked to a provider on a pod.
888
// For underlay subnets without LogicalGateway or U2OInterconnection,
889
// the logical_router annotation is not set, so an empty string is returned.
NEW
890
func getVpcByProvider(pod *v1.Pod, provider string) string {
×
NEW
891
        return pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
×
UNCOV
892
}
×
893

894
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
895
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
×
896
        // Retrieve the provider for those addresses
×
897
        provider, err := c.getEndpointProvider(pod, addresses)
×
898
        if err != nil {
×
899
                return "", "", err
×
900
        }
×
901

902
        if provider == "" {
×
903
                return "", "", nil
×
904
        }
×
905

906
        // Retrieve the subnet
907
        subnet, err := getSubnetByProvider(pod, provider)
×
908
        if err != nil {
×
909
                return "", "", err
×
910
        }
×
911

912
        // Retrieve the VPC
NEW
913
        vpc := getVpcByProvider(pod, provider)
×
914

×
915
        return vpc, subnet, nil
×
916
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc