• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubeovn / kube-ovn / 16877641164

11 Aug 2025 10:38AM UTC coverage: 21.408% (-0.04%) from 21.449%
16877641164

push

github

web-flow
Support healthchecks for static endpoints in services (#5435)

* feat(endpoint_slice): refactor and support arbitrary providers

Signed-off-by: SkalaNetworks <contact@skala.network>

* feat(endpoints): support healthchecks for static endpoints

Signed-off-by: SkalaNetworks <contact@skala.network>

* feat(slr): setting to disable healthchecks

Signed-off-by: SkalaNetworks <contact@skala.network>

* feat(svc): disable healthchecks at will

Signed-off-by: SkalaNetworks <contact@skala.network>

* fix(slr): propagate changes in annotations to svc

Signed-off-by: SkalaNetworks <contact@skala.network>

---------

Signed-off-by: SkalaNetworks <contact@skala.network>

15 of 188 new or added lines in 3 files covered. (7.98%)

4 existing lines in 3 files now uncovered.

10579 of 49417 relevant lines covered (21.41%)

0.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.73
/pkg/controller/endpoint_slice.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "slices"
7
        "strings"
8
        "time"
9

10
        v1 "k8s.io/api/core/v1"
11
        discoveryv1 "k8s.io/api/discovery/v1"
12
        "k8s.io/apimachinery/pkg/api/errors"
13
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
        "k8s.io/apimachinery/pkg/labels"
15
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
        "k8s.io/client-go/tools/cache"
17
        "k8s.io/klog/v2"
18

19
        kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
20
        "github.com/kubeovn/kube-ovn/pkg/ovs"
21
        "github.com/kubeovn/kube-ovn/pkg/util"
22
)
23

24
type IPPortMapping map[string]string
25

26
// getServiceForEndpointSlice returns the service linked to an EndpointSlice
27
func getServiceForEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
28
        if endpointSlice != nil && endpointSlice.Labels != nil {
2✔
29
                return endpointSlice.Labels[discoveryv1.LabelServiceName]
1✔
30
        }
1✔
31

32
        return ""
1✔
33
}
34

35
func findServiceKey(endpointSlice *discoveryv1.EndpointSlice) string {
1✔
36
        service := getServiceForEndpointSlice(endpointSlice)
1✔
37
        if service == "" {
2✔
38
                return ""
1✔
39
        }
1✔
40

41
        return endpointSlice.Namespace + "/" + service
1✔
42
}
43

44
func (c *Controller) enqueueAddEndpointSlice(obj any) {
×
45
        key := findServiceKey(obj.(*discoveryv1.EndpointSlice))
×
46
        if key != "" {
×
47
                klog.V(3).Infof("enqueue add endpointSlice %s", key)
×
48
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
49
        }
×
50
}
51

52
func (c *Controller) enqueueUpdateEndpointSlice(oldObj, newObj any) {
×
53
        oldEndpointSlice := oldObj.(*discoveryv1.EndpointSlice)
×
54
        newEndpointSlice := newObj.(*discoveryv1.EndpointSlice)
×
55
        if oldEndpointSlice.ResourceVersion == newEndpointSlice.ResourceVersion {
×
56
                return
×
57
        }
×
58

59
        if len(oldEndpointSlice.Endpoints) == 0 && len(newEndpointSlice.Endpoints) == 0 {
×
60
                return
×
61
        }
×
62

63
        key := findServiceKey(newEndpointSlice)
×
64
        if key != "" {
×
65
                klog.V(3).Infof("enqueue update endpointSlice for service %s", key)
×
66
                c.addOrUpdateEndpointSliceQueue.Add(key)
×
67
        }
×
68
}
69

70
func (c *Controller) handleUpdateEndpointSlice(key string) error {
×
71
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
×
72
        if err != nil {
×
73
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
×
74
                return nil
×
75
        }
×
76

77
        c.epKeyMutex.LockKey(key)
×
78
        defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
×
79
        klog.Infof("handle update endpointSlice for service %s", key)
×
80

×
81
        endpointSlices, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Set{discoveryv1.LabelServiceName: name}.AsSelector())
×
82
        if err != nil {
×
83
                if errors.IsNotFound(err) {
×
84
                        return nil
×
85
                }
×
86
                klog.Error(err)
×
87
                return err
×
88
        }
89

90
        cachedService, err := c.servicesLister.Services(namespace).Get(name)
×
91
        if err != nil {
×
92
                if errors.IsNotFound(err) {
×
93
                        return nil
×
94
                }
×
95
                klog.Error(err)
×
96
                return err
×
97
        }
98
        svc := cachedService.DeepCopy()
×
99

×
100
        var (
×
101
                lbVips                   []string
×
102
                vip, vpcName, subnetName string
×
103
                ok                       bool
×
104
                ignoreHealthCheck        = true
×
105
                isPreferLocalBackend     = false
×
106
        )
×
107

×
108
        if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
×
109
                lbVips = []string{vip}
×
110

×
NEW
111
                // Health checks can only run against IPv4 endpoints and if the service doesn't specify they must be disabled
×
NEW
112
                if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 && !serviceHealthChecksDisabled(svc) {
×
NEW
113
                        ignoreHealthCheck = false
×
114
                }
×
115
        } else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
×
116
                return nil
×
117
        }
×
118

119
        if c.config.EnableLb && c.config.EnableOVNLBPreferLocal {
×
120
                if svc.Spec.Type == v1.ServiceTypeLoadBalancer && svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
×
121
                        if len(svc.Status.LoadBalancer.Ingress) > 0 {
×
122
                                for _, ingress := range svc.Status.LoadBalancer.Ingress {
×
123
                                        if ingress.IP != "" {
×
124
                                                lbVips = append(lbVips, ingress.IP)
×
125
                                        }
×
126
                                }
127
                        }
128
                        isPreferLocalBackend = true
×
129
                } else if svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal {
×
130
                        isPreferLocalBackend = true
×
131
                }
×
132
        }
133

134
        vpcName, subnetName, err = c.getVpcAndSubnetForEndpoints(endpointSlices, svc)
×
135
        if err != nil {
×
136
                return err
×
137
        }
×
138

139
        var (
×
140
                vpc    *kubeovnv1.Vpc
×
141
                svcVpc string
×
142
        )
×
143

×
144
        if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
×
145
                klog.Errorf("failed to get vpc %s, %v", vpcName, err)
×
146
                return err
×
147
        }
×
148

149
        tcpLb, udpLb, sctpLb := vpc.Status.TCPLoadBalancer, vpc.Status.UDPLoadBalancer, vpc.Status.SctpLoadBalancer
×
150
        oldTCPLb, oldUDPLb, oldSctpLb := vpc.Status.TCPSessionLoadBalancer, vpc.Status.UDPSessionLoadBalancer, vpc.Status.SctpSessionLoadBalancer
×
151
        if svc.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
×
152
                tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
×
153
        }
×
154

155
        for _, lbVip := range lbVips {
×
156
                for _, port := range svc.Spec.Ports {
×
157
                        var lb, oldLb string
×
158
                        switch port.Protocol {
×
159
                        case v1.ProtocolTCP:
×
160
                                lb, oldLb = tcpLb, oldTCPLb
×
161
                        case v1.ProtocolUDP:
×
162
                                lb, oldLb = udpLb, oldUDPLb
×
163
                        case v1.ProtocolSCTP:
×
164
                                lb, oldLb = sctpLb, oldSctpLb
×
165
                        }
166

167
                        var (
×
168
                                vip, checkIP             string
×
169
                                backends                 []string
×
170
                                ipPortMapping, externals map[string]string
×
171
                        )
×
172

×
173
                        if !ignoreHealthCheck {
×
174
                                if checkIP, err = c.getHealthCheckVip(subnetName, lbVip); err != nil {
×
175
                                        klog.Error(err)
×
176
                                        return err
×
177
                                }
×
178
                                externals = map[string]string{
×
179
                                        util.SwitchLBRuleSubnet: subnetName,
×
180
                                }
×
181
                        }
182

183
                        if isPreferLocalBackend {
×
184
                                // only use the ipportmapping's lsp to ip map when the backend is local
×
185
                                checkIP = util.MasqueradeCheckIP
×
186
                        }
×
187

NEW
188
                        backends = c.getEndpointBackend(endpointSlices, port, lbVip)
×
NEW
189

×
NEW
190
                        if !ignoreHealthCheck || isPreferLocalBackend {
×
NEW
191
                                ipPortMapping, err = c.getIPPortMapping(endpointSlices, svc, checkIP)
×
NEW
192
                                if err != nil {
×
NEW
193
                                        err := fmt.Errorf("couldn't get ip port mapping for svc %s/%s: %w", svc.Namespace, svc.Name, err)
×
NEW
194
                                        return err
×
NEW
195
                                }
×
196
                        }
197

198
                        // for performance reason delete lb with no backends
199
                        if len(backends) != 0 {
×
200
                                vip = util.JoinHostPort(lbVip, port.Port)
×
201
                                klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
×
202
                                if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
×
203
                                        klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
×
204
                                        return err
×
205
                                }
×
206

207
                                if isPreferLocalBackend && len(ipPortMapping) != 0 {
×
208
                                        if err = c.OVNNbClient.LoadBalancerUpdateIPPortMapping(lb, vip, ipPortMapping); err != nil {
×
209
                                                klog.Errorf("failed to update ip port mapping %s for vip %s to LB %s: %v", ipPortMapping, vip, lb, err)
×
210
                                                return err
×
211
                                        }
×
212
                                }
213

NEW
214
                                if !ignoreHealthCheck {
×
215
                                        klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
×
216
                                        if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
×
217
                                                klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
×
218
                                                return err
×
219
                                        }
×
220
                                }
221
                        } else {
×
222
                                vip = util.JoinHostPort(lbVip, port.Port)
×
223
                                klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
×
224
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, true); err != nil {
×
225
                                        klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
×
226
                                        return err
×
227
                                }
×
228

229
                                klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
×
230
                                if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, true); err != nil {
×
231
                                        klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
×
232
                                        return err
×
233
                                }
×
234

235
                                if c.config.EnableOVNLBPreferLocal {
×
236
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(lb, vip); err != nil {
×
237
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
238
                                                return err
×
239
                                        }
×
240
                                        if err := c.OVNNbClient.LoadBalancerDeleteIPPortMapping(oldLb, vip); err != nil {
×
241
                                                klog.Errorf("failed to delete ip port mapping for vip %s from LB %s: %v", vip, lb, err)
×
242
                                                return err
×
243
                                        }
×
244
                                }
245
                        }
246
                }
247
        }
248

249
        if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
×
250
                patch := util.KVPatch{util.VpcAnnotation: vpcName}
×
251
                if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
×
252
                        klog.Errorf("failed to patch service %s: %v", key, err)
×
253
                        return err
×
254
                }
×
255
        }
256

257
        return nil
×
258
}
259

260
// enqueueStaticEndpointUpdateInNamespace enqueues updates for every statically generated EndpointSlice in a namespace.
261
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
NEW
262
func (c *Controller) enqueueStaticEndpointUpdateInNamespace(namespace string) {
×
NEW
263
        // Find all the statically generated EndpointSlices in the namespace
×
NEW
264
        endpointSlices, err := c.findStaticEndpointSlicesInNamespace(namespace)
×
NEW
265
        if err != nil {
×
NEW
266
                err := fmt.Errorf("couldn't find static endpointslices in namespace %s: %w", namespace, err)
×
NEW
267
                klog.Error(err)
×
NEW
268
        }
×
269

270
        // Enqueue updates for all the EndpointSlices
NEW
271
        for _, slice := range endpointSlices {
×
NEW
272
                c.enqueueAddEndpointSlice(slice)
×
NEW
273
        }
×
274
}
275

276
// serviceHealthChecksDisabled returns whether health checks must be omitted for a particular service
277
func serviceHealthChecksDisabled(service *v1.Service) bool {
1✔
278
        // Service must not have disabled health checks
1✔
279
        if service.Annotations != nil && service.Annotations[util.ServiceHealthCheck] == "false" {
2✔
280
                return true
1✔
281
        }
1✔
282

283
        // If nothing is specified, checks are enabled by default
284
        return false
1✔
285
}
286

287
// findStaticEndpointSlicesInNamespace finds all the EndpointSlices in a namespace that are statically generated.
288
// Statically generated EndpointSlices are not generated by the selectors of their parent service.
NEW
289
func (c *Controller) findStaticEndpointSlicesInNamespace(namespace string) ([]*discoveryv1.EndpointSlice, error) {
×
NEW
290
        // Retrieve all the services in the namespace
×
NEW
291
        services, err := c.servicesLister.Services(namespace).List(labels.Everything())
×
NEW
292
        if err != nil {
×
NEW
293
                err := fmt.Errorf("couldn't list services in namespace %s: %w", namespace, err)
×
NEW
294
                klog.Error(err)
×
NEW
295
                return nil, err
×
NEW
296
        }
×
297

298
        // Only handle services that have static endpoints provided, and not selectors
NEW
299
        var filteredServices []*v1.Service
×
NEW
300
        for _, service := range services {
×
NEW
301
                if serviceHasSelector(service) {
×
NEW
302
                        continue
×
303
                }
304

NEW
305
                filteredServices = append(filteredServices, service)
×
306
        }
307

308
        // Find the EndpointSlices linked to those services
NEW
309
        endpointSlices, err := c.findEndpointSlicesForServices(namespace, filteredServices)
×
NEW
310
        if err != nil {
×
NEW
311
                return nil, err
×
NEW
312
        }
×
313

NEW
314
        return endpointSlices, nil
×
315
}
316

317
// findEndpointSlicesForServices returns all the EndpointSlices that are linked to services in the same namespace.
318
// Parameter "namespace" is the namespace in which all the services are located.
319
// Parameter "services" is a list of all the services for which we want to find the EndpointSlices.
NEW
320
func (c *Controller) findEndpointSlicesForServices(namespace string, services []*v1.Service) ([]*discoveryv1.EndpointSlice, error) {
×
NEW
321
        var endpointSlices []*discoveryv1.EndpointSlice
×
NEW
322

×
NEW
323
        // Retrieve all the endpointSlices in the namespace of the services
×
NEW
324
        eps, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Everything())
×
NEW
325
        if err != nil {
×
NEW
326
                err := fmt.Errorf("couldn't list endpointslices in namespace %s: %w", namespace, err)
×
NEW
327
                klog.Error(err)
×
NEW
328
                return nil, err
×
NEW
329
        }
×
330

331
        // Find the EndpointSlices part of each service
NEW
332
        for _, service := range services {
×
NEW
333
                for _, endpointSlice := range eps {
×
NEW
334
                        if getServiceForEndpointSlice(endpointSlice) == service.Name {
×
NEW
335
                                endpointSlices = append(endpointSlices, endpointSlice)
×
NEW
336
                        }
×
337
                }
338
        }
339

NEW
340
        return endpointSlices, nil
×
341
}
342

343
// serviceHasSelector returns if a service has selectors
344
func serviceHasSelector(service *v1.Service) bool {
1✔
345
        return len(service.Spec.Selector) > 0
1✔
346
}
1✔
347

348
// getCustomServiceVpcAndSubnet returns the custom VPC/Subnet defined on a service
NEW
349
func getCustomServiceVpcAndSubnet(service *v1.Service) (vpcName, subnetName string) {
×
350
        if service.Annotations != nil {
×
NEW
351
                vpcName = service.Annotations[util.LogicalRouterAnnotation]
×
NEW
352
                subnetName = service.Annotations[util.LogicalSwitchAnnotation]
×
NEW
353
        }
×
354

NEW
355
        return vpcName, subnetName
×
356
}
357

358
// getDefaultVpcAndSubnet returns the default VPC/Subnet to apply to a LoadBalancer if nothing was found
359
// during automatic discovery. If both parameters are non-empty, they are returned as is.
NEW
360
func (c *Controller) getDefaultVpcAndSubnet(service *v1.Service, vpcName, subnetName string) (string, string) {
×
NEW
361
        // Default to what's on the service or to the default VPC
×
NEW
362
        if vpcName == "" {
×
NEW
363
                if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
×
NEW
364
                        vpcName = c.config.ClusterRouter
×
UNCOV
365
                }
×
366
        }
367

368
        // Use the default subnet if it wasn't found
NEW
369
        if subnetName == "" {
×
NEW
370
                subnetName = util.DefaultSubnet
×
NEW
371
        }
×
372

NEW
373
        return vpcName, subnetName
×
374
}
375

376
// getVpcAndSubnetForEndpoints returns the name of the VPC/Subnet for EndpointSlices
NEW
377
func (c *Controller) getVpcAndSubnetForEndpoints(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string, err error) {
×
NEW
378
        // Let the user self-determine what VPC and subnet to use if they provided annotations on the service
×
NEW
379
        // Both the VPC and Subnet must be provided
×
NEW
380
        vpcName, subnetName = getCustomServiceVpcAndSubnet(service)
×
NEW
381
        if vpcName != "" && subnetName != "" {
×
NEW
382
                return vpcName, subnetName, nil
×
NEW
383
        }
×
384

385
        // Choose the most optimized and straightforward way to retrieve the name of the VPC and subnet
386
        if serviceHasSelector(service) {
×
387
                // The service has a selector, which means that the EndpointSlices should have targets.
×
388
                // We can use those targets instead of looking at every pod in the namespace.
×
389
                vpcName, subnetName = c.findVpcAndSubnetWithTargets(endpointSlices)
×
390
        } else {
×
391
                // The service has no selectors, we must find which pods in the namespace of the service
×
392
                // are targeted by the endpoint by only looking at the IPs.
×
393
                pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
394
                if err != nil {
×
395
                        err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
396
                        klog.Error(err)
×
397
                        return "", "", err
×
398
                }
×
399

400
                vpcName, subnetName = c.findVpcAndSubnetWithNoTargets(endpointSlices, pods)
×
401
        }
402

NEW
403
        vpcName, subnetName = c.getDefaultVpcAndSubnet(service, vpcName, subnetName)
×
404
        return vpcName, subnetName, nil
×
405
}
406

407
// findVpcAndSubnetWithTargets returns the name of the VPC and Subnet for endpoints with targets
408
func (c *Controller) findVpcAndSubnetWithTargets(endpointSlices []*discoveryv1.EndpointSlice) (vpcName, subnetName string) {
×
409
        for _, slice := range endpointSlices {
×
410
                for _, endpoint := range slice.Endpoints {
×
411
                        if endpoint.TargetRef == nil {
×
412
                                continue
×
413
                        }
414

415
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
416
                        if name == "" || namespace == "" {
×
417
                                continue
×
418
                        }
419

420
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
421
                        if err != nil {
×
422
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
423
                                klog.Error(err)
×
424
                                continue
×
425
                        }
426

427
                        vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
428
                        if err != nil {
×
429
                                err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s: %w", namespace, name, err)
×
430
                                klog.Error(err)
×
431
                                continue
×
432
                        }
433

434
                        if vpcName == "" {
×
435
                                vpcName = vpc
×
436
                        }
×
437

438
                        if subnetName == "" {
×
439
                                subnetName = subnet
×
440
                        }
×
441

442
                        if vpcName != "" && subnetName != "" {
×
443
                                return vpcName, subnetName
×
444
                        }
×
445
                }
446
        }
447

448
        return vpcName, subnetName
×
449
}
450

451
// findVpcAndSubnetWithNoTargets returns the name of the VPC and Subnet for endpoints with no targets
452
func (c *Controller) findVpcAndSubnetWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) (vpcName, subnetName string) {
×
453
        for _, slice := range endpointSlices {
×
454
                for _, endpoint := range slice.Endpoints {
×
455
                        for _, pod := range pods {
×
456
                                vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
×
457
                                if err != nil {
×
458
                                        err := fmt.Errorf("couldn't retrieve subnet/vpc for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
459
                                        klog.Error(err)
×
460
                                        continue
×
461
                                }
462

463
                                if vpcName == "" {
×
464
                                        vpcName = vpc
×
465
                                }
×
466

467
                                if subnetName == "" {
×
468
                                        subnetName = subnet
×
469
                                }
×
470

471
                                if vpcName != "" && subnetName != "" {
×
472
                                        return vpcName, subnetName
×
473
                                }
×
474
                        }
475
                }
476
        }
477

478
        return vpcName, subnetName
×
479
}
480

481
// getHealthCheckVip get health check vip for load balancer, the vip name is the subnet name
482
// the vip is used to check the health of the backend pod
483
func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error) {
×
484
        var (
×
485
                needCreateHealthCheckVip bool
×
486
                checkVip                 *kubeovnv1.Vip
×
487
                checkIP                  string
×
488
                err                      error
×
489
        )
×
490
        vipName := subnetName
×
491
        checkVip, err = c.virtualIpsLister.Get(vipName)
×
492
        if err != nil {
×
493
                if errors.IsNotFound(err) {
×
494
                        needCreateHealthCheckVip = true
×
495
                } else {
×
496
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
497
                        return "", err
×
498
                }
×
499
        }
500
        if needCreateHealthCheckVip {
×
501
                vip := &kubeovnv1.Vip{
×
502
                        ObjectMeta: metav1.ObjectMeta{
×
503
                                Name: vipName,
×
504
                        },
×
505
                        Spec: kubeovnv1.VipSpec{
×
506
                                Subnet: subnetName,
×
507
                        },
×
508
                }
×
509
                if _, err = c.config.KubeOvnClient.KubeovnV1().Vips().Create(context.Background(), vip, metav1.CreateOptions{}); err != nil {
×
510
                        klog.Errorf("failed to create health check vip %s, %v", vipName, err)
×
511
                        return "", err
×
512
                }
×
513

514
                // wait for vip created
515
                // TODO: WATCH VIP
516
                time.Sleep(1 * time.Second)
×
517
                checkVip, err = c.virtualIpsLister.Get(vipName)
×
518
                if err != nil {
×
519
                        klog.Errorf("failed to get health check vip %s, %v", vipName, err)
×
520
                        return "", err
×
521
                }
×
522
        }
523

524
        if checkVip.Status.V4ip == "" && checkVip.Status.V6ip == "" {
×
525
                err = fmt.Errorf("vip %s is not ready", vipName)
×
526
                klog.Error(err)
×
527
                return "", err
×
528
        }
×
529

530
        switch util.CheckProtocol(lbVip) {
×
531
        case kubeovnv1.ProtocolIPv4:
×
532
                checkIP = checkVip.Status.V4ip
×
533
        case kubeovnv1.ProtocolIPv6:
×
534
                checkIP = checkVip.Status.V6ip
×
535
        }
536
        if checkIP == "" {
×
537
                err = fmt.Errorf("failed to get health check vip subnet %s", vipName)
×
538
                klog.Error(err)
×
539
                return "", err
×
540
        }
×
541

542
        return checkIP, nil
×
543
}
544

545
// getEndpointBackend returns the LB backend for a service
NEW
546
func (c *Controller) getEndpointBackend(endpointSlices []*discoveryv1.EndpointSlice, servicePort v1.ServicePort, serviceIP string) (backends []string) {
×
NEW
547
        protocol := util.CheckProtocol(serviceIP)
×
548

×
549
        for _, endpointSlice := range endpointSlices {
×
550
                var targetPort int32
×
551
                for _, port := range endpointSlice.Ports {
×
552
                        if port.Name != nil && *port.Name == servicePort.Name {
×
553
                                targetPort = *port.Port
×
554
                                break
×
555
                        }
556
                }
557
                if targetPort == 0 {
×
558
                        continue
×
559
                }
560

561
                for _, endpoint := range endpointSlice.Endpoints {
×
562
                        if !endpointReady(endpoint) {
×
563
                                continue
×
564
                        }
565

566
                        for _, address := range endpoint.Addresses {
×
567
                                if util.CheckProtocol(address) == protocol {
×
568
                                        backends = append(backends, util.JoinHostPort(address, targetPort))
×
569
                                }
×
570
                        }
571
                }
572
        }
573

NEW
574
        return backends
×
575
}
576

577
// endpointReady returns whether an endpoint can receive traffic
578
func endpointReady(endpoint discoveryv1.Endpoint) bool {
1✔
579
        return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
1✔
580
}
1✔
581

582
// addIPPortMappingEntry adds a new entry to an IPPortMapping for a given target, the addresses on that target and the
583
// VIP used to run the health checks
NEW
584
func (c *Controller) addIPPortMappingEntry(pod *v1.Pod, addresses []string, checkVip string, mapping IPPortMapping) error {
×
NEW
585
        // Abort if the pod is getting deleted
×
NEW
586
        if !pod.DeletionTimestamp.IsZero() {
×
NEW
587
                return nil
×
NEW
588
        }
×
589

590
        // Compute the name of the LSP for that endpoint target
NEW
591
        lspName, err := c.getEndpointTargetLSPName(pod, addresses)
×
NEW
592
        if err != nil {
×
NEW
593
                return fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
×
NEW
594
        }
×
595

NEW
596
        for _, address := range addresses {
×
NEW
597
                mapping[address] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, lspName, checkVip)
×
NEW
598
        }
×
599

NEW
600
        return nil
×
601
}
602

603
// getIPPortMapping returns the mapping between each endpoint, LSP and health check VIP
NEW
604
func (c *Controller) getIPPortMapping(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service, checkVip string) (IPPortMapping, error) {
×
NEW
605
        // Choose the most optimized and straightforward way to compute the IPPortMapping
×
NEW
606
        if serviceHasSelector(service) {
×
NEW
607
                // The service has a selector, which means that the EndpointSlices should have targets.
×
NEW
608
                // We can use those targets instead of looking at every pod in the namespace.
×
NEW
609
                return c.getIPPortMappingWithTargets(endpointSlices, checkVip), nil
×
NEW
610
        }
×
611

612
        // The service has no selectors, we must find which pods in the namespace of the service
613
        // are targeted by the endpoint by only looking at the IPs.
NEW
614
        pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
×
NEW
615
        if err != nil {
×
NEW
616
                err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
×
NEW
617
                klog.Error(err)
×
NEW
618
                return nil, err
×
NEW
619
        }
×
620

NEW
621
        return c.getIPPortMappingWithNoTargets(endpointSlices, pods, checkVip), nil
×
622
}
623

624
// getIPPortMappingWithTargets returns the IPPortMapping for endpoints with targets
NEW
625
func (c *Controller) getIPPortMappingWithTargets(endpointSlices []*discoveryv1.EndpointSlice, checkVip string) IPPortMapping {
×
NEW
626
        mapping := make(IPPortMapping)
×
NEW
627

×
NEW
628
        for _, slice := range endpointSlices {
×
NEW
629
                for _, endpoint := range slice.Endpoints {
×
NEW
630
                        if endpoint.TargetRef == nil {
×
NEW
631
                                continue
×
632
                        }
633

NEW
634
                        namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
×
NEW
635
                        if name == "" || namespace == "" {
×
NEW
636
                                continue
×
637
                        }
638

639
                        // Retrieve the pod for that endpoint target
NEW
640
                        pod, err := c.podsLister.Pods(namespace).Get(name)
×
NEW
641
                        if err != nil {
×
NEW
642
                                err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
×
NEW
643
                                klog.Error(err)
×
NEW
644
                                continue
×
645
                        }
646

647
                        // Compute the IPPortMapping for that endpoint target
NEW
648
                        if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
NEW
649
                                err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", namespace, name, err)
×
NEW
650
                                klog.Error(err)
×
NEW
651
                                continue
×
652
                        }
653
                }
654
        }
655

NEW
656
        return mapping
×
657
}
658

659
// getIPPortMappingWithTargets returns the IPPortMapping for endpoints with no targets
NEW
660
func (c *Controller) getIPPortMappingWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod, checkVip string) IPPortMapping {
×
NEW
661
        mapping := make(IPPortMapping)
×
NEW
662

×
NEW
663
        for _, slice := range endpointSlices {
×
NEW
664
                for _, endpoint := range slice.Endpoints {
×
NEW
665
                        for _, pod := range pods {
×
NEW
666
                                // Try to find a matching provider for the addresses
×
NEW
667
                                provider, err := c.getEndpointProvider(pod, endpoint.Addresses)
×
NEW
668
                                if err != nil {
×
NEW
669
                                        err := fmt.Errorf("couldn't get provider for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
NEW
670
                                        klog.Error(err)
×
NEW
671
                                        continue
×
672
                                }
673

674
                                // If the pod has a provider that matches that set of addresses, it is an endpoint target.
675
                                // Otherwise, it isn't targeted by the EndpointSlice and can be dismissed.
NEW
676
                                if provider == "" {
×
NEW
677
                                        continue
×
678
                                }
679

680
                                // Compute the IPPortMapping for that endpoint target
NEW
681
                                if err := c.addIPPortMappingEntry(pod, endpoint.Addresses, checkVip, mapping); err != nil {
×
NEW
682
                                        err := fmt.Errorf("couldn't compute ip port mapping for pod %s/%s: %w", pod.Namespace, pod.Name, err)
×
NEW
683
                                        klog.Error(err)
×
NEW
684
                                        continue
×
685
                                }
686
                        }
687
                }
688
        }
689

NEW
690
        return mapping
×
691
}
692

693
// getPodProviders returns all the providers available on a pod
694
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
×
695
        // Get all the networks to which the pod is attached
×
696
        podNetworks, err := c.getPodKubeovnNets(pod)
×
697
        if err != nil {
×
698
                return nil, fmt.Errorf("failed to get pod networks: %w", err)
×
699
        }
×
700

701
        // Retrieve all the providers
702
        var providers []string
×
703
        for _, podNetwork := range podNetworks {
×
704
                providers = append(providers, podNetwork.ProviderName)
×
705
        }
×
706

707
        return providers, nil
×
708
}
709

710
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
711
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
1✔
712
        if pod.Annotations == nil {
2✔
713
                return ""
1✔
714
        }
1✔
715

716
        // Find which provider is linked to this address
717
        for _, provider := range providers {
2✔
718
                ipsForProvider, exists := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, provider)]
1✔
719
                if !exists {
1✔
720
                        continue
×
721
                }
722

723
                ips := strings.Split(ipsForProvider, ",")
1✔
724
                if slices.Contains(ips, address) {
2✔
725
                        return provider
1✔
726
                }
1✔
727
        }
728

729
        return ""
1✔
730
}
731

732
// getEndpointProvider returns the provider linked to the addresses of an endpoint
733
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
×
734
        // Retrieve all the providers of the pod
×
735
        providers, err := c.getPodProviders(pod)
×
736
        if err != nil {
×
737
                return "", err
×
738
        }
×
739

740
        // Get the first matching provider for any of the address in the endpoint
741
        var provider string
×
742
        for _, address := range addresses {
×
743
                if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
×
744
                        return provider, nil
×
745
                }
×
746
        }
747

748
        return "", nil
×
749
}
750

751
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
752
// A custom provider can be specified if the LSP is within a subnet that doesn't use
753
// the default "ovn" provider.
754
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
1✔
755
        // If no provider is specified, use the default one
1✔
756
        if provider == "" {
2✔
757
                provider = util.OvnProvider
1✔
758
        }
1✔
759

760
        target := pod.Name
1✔
761

1✔
762
        // If this pod is a VM launcher pod, we need to retrieve the name of the VM. This is necessary
1✔
763
        // because we do not use the same syntax for the LSP of normal pods and for VM pods
1✔
764
        if vmName, exists := pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, provider)]; exists {
2✔
765
                target = vmName
1✔
766
        }
1✔
767

768
        return ovs.PodNameToPortName(target, pod.Namespace, provider)
1✔
769
}
770

771
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
772
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
×
773
        // Retrieve the provider for those addresses
×
774
        provider, err := c.getEndpointProvider(pod, addresses)
×
775
        if err != nil {
×
776
                return "", err
×
777
        }
×
778

779
        return getEndpointTargetLSPNameFromProvider(pod, provider), nil
×
780
}
781

782
// getSubnetByProvider returns the subnet linked to a provider on a pod
NEW
783
func getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
×
784
        subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
×
785
        if !exists {
×
786
                return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
×
787
        }
×
788

789
        return subnetName, nil
×
790
}
791

792
// getVpcByProvider returns the VPC linked to a provider on a pod
NEW
793
func getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
×
794
        vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
×
795
        if !exists {
×
796
                return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
×
797
        }
×
798

799
        return vpcName, nil
×
800
}
801

802
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
803
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
×
804
        // Retrieve the provider for those addresses
×
805
        provider, err := c.getEndpointProvider(pod, addresses)
×
806
        if err != nil {
×
807
                return "", "", err
×
808
        }
×
809

810
        if provider == "" {
×
811
                return "", "", nil
×
812
        }
×
813

814
        // Retrieve the subnet
NEW
815
        subnet, err := getSubnetByProvider(pod, provider)
×
816
        if err != nil {
×
817
                return "", "", err
×
818
        }
×
819

820
        // Retrieve the VPC
NEW
821
        vpc, err := getVpcByProvider(pod, provider)
×
822
        if err != nil {
×
823
                return "", "", err
×
824
        }
×
825

826
        return vpc, subnet, nil
×
827
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc