• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / external-dns / 18071083977

28 Sep 2025 07:07AM UTC coverage: 78.543% (+0.09%) from 78.458%
18071083977

Pull #4823

github

troll-os
Removed boolean flag that enabled migration, evaluate only against old owner flag instead
Pull Request #4823: feat: add new flags to allow migration of OwnerID

18 of 18 new or added lines in 5 files covered. (100.0%)

76 existing lines in 2 files now uncovered.

15780 of 20091 relevant lines covered (78.54%)

753.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.58
/source/service.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package source
18

19
import (
20
        "context"
21
        "fmt"
22
        "maps"
23
        "net"
24
        "slices"
25
        "sort"
26
        "strings"
27
        "text/template"
28

29
        log "github.com/sirupsen/logrus"
30
        v1 "k8s.io/api/core/v1"
31
        discoveryv1 "k8s.io/api/discovery/v1"
32
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
        "k8s.io/apimachinery/pkg/labels"
34
        "k8s.io/apimachinery/pkg/types"
35
        kubeinformers "k8s.io/client-go/informers"
36
        coreinformers "k8s.io/client-go/informers/core/v1"
37
        discoveryinformers "k8s.io/client-go/informers/discovery/v1"
38
        "k8s.io/client-go/kubernetes"
39
        "k8s.io/client-go/tools/cache"
40

41
        "sigs.k8s.io/external-dns/source/informers"
42

43
        "sigs.k8s.io/external-dns/source/annotations"
44

45
        "sigs.k8s.io/external-dns/endpoint"
46
        "sigs.k8s.io/external-dns/source/fqdn"
47
)
48

49
var (
50
        knownServiceTypes = map[v1.ServiceType]struct{}{
51
                v1.ServiceTypeClusterIP:    {}, // Default service type exposes the service on a cluster-internal IP.
52
                v1.ServiceTypeNodePort:     {}, // Exposes the service on each node's IP at a static port.
53
                v1.ServiceTypeLoadBalancer: {}, // Exposes the service externally using a cloud provider's load balancer.
54
                v1.ServiceTypeExternalName: {}, // Maps the service to an external DNS name.
55
        }
56
        serviceNameIndexKey = "serviceName"
57
)
58

59
// serviceSource is an implementation of Source for Kubernetes service objects.
60
// It will find all services that are under our jurisdiction, i.e. annotated
61
// desired hostname and matching or no controller annotation. For each of the
62
// matched services' entrypoints it will return a corresponding
63
// Endpoint object.
64
type serviceSource struct {
65
        client                kubernetes.Interface
66
        namespace             string
67
        annotationFilter      string
68
        labelSelector         labels.Selector
69
        fqdnTemplate          *template.Template
70
        combineFQDNAnnotation bool
71

72
        ignoreHostnameAnnotation       bool
73
        publishInternal                bool
74
        publishHostIP                  bool
75
        alwaysPublishNotReadyAddresses bool
76
        resolveLoadBalancerHostname    bool
77
        listenEndpointEvents           bool
78
        serviceInformer                coreinformers.ServiceInformer
79
        endpointSlicesInformer         discoveryinformers.EndpointSliceInformer
80
        podInformer                    coreinformers.PodInformer
81
        nodeInformer                   coreinformers.NodeInformer
82
        serviceTypeFilter              *serviceTypes
83
        exposeInternalIPv6             bool
84

85
        // process Services with legacy annotations
86
        compatibility string
87
}
88

89
// NewServiceSource creates a new serviceSource with the given config.
90
func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal, publishHostIP, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector, resolveLoadBalancerHostname, listenEndpointEvents bool, exposeInternalIPv6 bool) (Source, error) {
148✔
91
        tmpl, err := fqdn.ParseTemplate(fqdnTemplate)
148✔
92
        if err != nil {
149✔
93
                return nil, err
1✔
94
        }
1✔
95

96
        // Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
97
        // Set the resync period to 0 to prevent processing when nothing has changed
98
        informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
147✔
99
        serviceInformer := informerFactory.Core().V1().Services()
147✔
100

147✔
101
        // Add default resource event handlers to properly initialize informer.
147✔
102
        _, _ = serviceInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
147✔
103

147✔
104
        // Transform the slice into a map so it will be way much easier and fast to filter later
147✔
105
        sTypesFilter, err := newServiceTypesFilter(serviceTypeFilter)
147✔
106
        if err != nil {
148✔
107
                return nil, err
1✔
108
        }
1✔
109

110
        var endpointSlicesInformer discoveryinformers.EndpointSliceInformer
146✔
111
        var podInformer coreinformers.PodInformer
146✔
112
        if sTypesFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) {
286✔
113
                endpointSlicesInformer = informerFactory.Discovery().V1().EndpointSlices()
140✔
114
                podInformer = informerFactory.Core().V1().Pods()
140✔
115

140✔
116
                _, _ = endpointSlicesInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
140✔
117
                _, _ = podInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
140✔
118

140✔
119
                // Add an indexer to the EndpointSlice informer to index by the service name label
140✔
120
                err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
140✔
121
                        serviceNameIndexKey: func(obj any) ([]string, error) {
178✔
122
                                endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
38✔
123
                                if !ok {
39✔
124
                                        // This should never happen because the Informer should only contain EndpointSlice objects
1✔
125
                                        return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
1✔
126
                                }
1✔
127
                                serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
37✔
128
                                if serviceName == "" {
38✔
129
                                        return nil, nil
1✔
130
                                }
1✔
131
                                key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
36✔
132
                                return []string{key}, nil
36✔
133
                        },
134
                })
135
                if err != nil {
140✔
136
                        return nil, err
×
137
                }
×
138

139
                // Transformer is used to reduce the memory usage of the informer.
140
                // The pod informer will otherwise store a full in-memory, go-typed copy of all pod schemas in the cluster.
141
                // If watchList is not used it will not prevent memory bursts on the initial informer sync.
142
                podInformer.Informer().SetTransform(func(i interface{}) (interface{}, error) {
209✔
143
                        pod, ok := i.(*v1.Pod)
69✔
144
                        if !ok {
69✔
145
                                return nil, fmt.Errorf("object is not a pod")
×
146
                        }
×
147
                        if pod.UID == "" {
137✔
148
                                // Pod was already transformed and we must be idempotent.
68✔
149
                                return pod, nil
68✔
150
                        }
68✔
151

152
                        // All pod level annotations we're interested in start with a common prefix
153
                        podAnnotations := map[string]string{}
1✔
154
                        for key, value := range pod.Annotations {
5✔
155
                                if strings.HasPrefix(key, annotations.AnnotationKeyPrefix) {
6✔
156
                                        podAnnotations[key] = value
2✔
157
                                }
2✔
158
                        }
159
                        return &v1.Pod{
1✔
160
                                ObjectMeta: metav1.ObjectMeta{
1✔
161
                                        // Name/namespace must always be kept for the informer to work.
1✔
162
                                        Name:      pod.Name,
1✔
163
                                        Namespace: pod.Namespace,
1✔
164
                                        // Used to match services.
1✔
165
                                        Labels:            pod.Labels,
1✔
166
                                        Annotations:       podAnnotations,
1✔
167
                                        DeletionTimestamp: pod.DeletionTimestamp,
1✔
168
                                },
1✔
169
                                Spec: v1.PodSpec{
1✔
170
                                        Hostname: pod.Spec.Hostname,
1✔
171
                                        NodeName: pod.Spec.NodeName,
1✔
172
                                },
1✔
173
                                Status: v1.PodStatus{
1✔
174
                                        HostIP:     pod.Status.HostIP,
1✔
175
                                        Phase:      pod.Status.Phase,
1✔
176
                                        Conditions: pod.Status.Conditions,
1✔
177
                                },
1✔
178
                        }, nil
1✔
179
                })
180
        }
181

182
        var nodeInformer coreinformers.NodeInformer
146✔
183
        if sTypesFilter.isRequired(v1.ServiceTypeNodePort) {
282✔
184
                nodeInformer = informerFactory.Core().V1().Nodes()
136✔
185
                _, _ = nodeInformer.Informer().AddEventHandler(informers.DefaultEventHandler())
136✔
186
        }
136✔
187

188
        informerFactory.Start(ctx.Done())
146✔
189

146✔
190
        // wait for the local cache to be populated.
146✔
191
        if err := informers.WaitForCacheSync(context.Background(), informerFactory); err != nil {
146✔
192
                return nil, err
×
193
        }
×
194

195
        return &serviceSource{
146✔
196
                client:                         kubeClient,
146✔
197
                namespace:                      namespace,
146✔
198
                annotationFilter:               annotationFilter,
146✔
199
                compatibility:                  compatibility,
146✔
200
                fqdnTemplate:                   tmpl,
146✔
201
                combineFQDNAnnotation:          combineFqdnAnnotation,
146✔
202
                ignoreHostnameAnnotation:       ignoreHostnameAnnotation,
146✔
203
                publishInternal:                publishInternal,
146✔
204
                publishHostIP:                  publishHostIP,
146✔
205
                alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
146✔
206
                serviceInformer:                serviceInformer,
146✔
207
                endpointSlicesInformer:         endpointSlicesInformer,
146✔
208
                podInformer:                    podInformer,
146✔
209
                nodeInformer:                   nodeInformer,
146✔
210
                serviceTypeFilter:              sTypesFilter,
146✔
211
                labelSelector:                  labelSelector,
146✔
212
                resolveLoadBalancerHostname:    resolveLoadBalancerHostname,
146✔
213
                listenEndpointEvents:           listenEndpointEvents,
146✔
214
                exposeInternalIPv6:             exposeInternalIPv6,
146✔
215
        }, nil
146✔
216
}
217

218
// Endpoints return endpoint objects for each service that should be processed.
219
func (sc *serviceSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
134✔
220
        services, err := sc.serviceInformer.Lister().Services(sc.namespace).List(sc.labelSelector)
134✔
221
        if err != nil {
134✔
222
                return nil, err
×
223
        }
×
224

225
        // filter on service types if at least one has been provided
226
        services = sc.filterByServiceType(services)
134✔
227

134✔
228
        services, err = sc.filterByAnnotations(services)
134✔
229
        if err != nil {
135✔
230
                return nil, err
1✔
231
        }
1✔
232

233
        endpoints := make([]*endpoint.Endpoint, 0)
133✔
234

133✔
235
        for _, svc := range services {
280✔
236
                // Check controller annotation to see if we are responsible.
147✔
237
                controller, ok := svc.Annotations[controllerAnnotationKey]
147✔
238
                if ok && controller != controllerAnnotationValue {
148✔
239
                        log.Debugf("Skipping service %s/%s because controller value does not match, found: %s, required: %s",
1✔
240
                                svc.Namespace, svc.Name, controller, controllerAnnotationValue)
1✔
241
                        continue
1✔
242
                }
243

244
                svcEndpoints := sc.endpoints(svc)
146✔
245

146✔
246
                // process legacy annotations if no endpoints were returned and compatibility mode is enabled.
146✔
247
                if len(svcEndpoints) == 0 && sc.compatibility != "" {
155✔
248
                        svcEndpoints, err = legacyEndpointsFromService(svc, sc)
9✔
249
                        if err != nil {
9✔
250
                                return nil, err
×
251
                        }
×
252
                }
253

254
                // apply template if none of the above is found
255
                if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
177✔
256
                        sEndpoints, err := sc.endpointsFromTemplate(svc)
31✔
257
                        if err != nil {
32✔
258
                                return nil, err
1✔
259
                        }
1✔
260

261
                        if sc.combineFQDNAnnotation {
35✔
262
                                svcEndpoints = append(svcEndpoints, sEndpoints...)
5✔
263
                        } else {
30✔
264
                                svcEndpoints = sEndpoints
25✔
265
                        }
25✔
266
                }
267

268
                if len(svcEndpoints) == 0 {
160✔
269
                        log.Debugf("No endpoints could be generated from service %s/%s", svc.Namespace, svc.Name)
15✔
270
                        continue
15✔
271
                }
272

273
                log.Debugf("Endpoints generated from service: %s/%s: %v", svc.Namespace, svc.Name, svcEndpoints)
130✔
274
                endpoints = append(endpoints, svcEndpoints...)
130✔
275
        }
276

277
        // this sorting is required to make merging work.
278
        // after we merge endpoints that have same DNS, we want to ensure that we end up with the same service being an "owner"
279
        // of all those records, as otherwise each time we update, we will end up with a different service that gets data merged in
280
        // and that will cause external-dns to recreate dns record due to different service owner in TXT record.
281
        // if new service is added or old one removed, that might cause existing record to get re-created due to potentially new
282
        // owner being selected. Which is fine, since it shouldn't happen often and shouldn't cause any disruption.
283
        if len(endpoints) > 1 {
190✔
284
                sort.Slice(endpoints, func(i, j int) bool {
183✔
285
                        return endpoints[i].Labels[endpoint.ResourceLabelKey] < endpoints[j].Labels[endpoint.ResourceLabelKey]
125✔
286
                })
125✔
287
                mergedEndpoints := make(map[endpoint.EndpointKey][]*endpoint.Endpoint)
58✔
288
                for _, ep := range endpoints {
241✔
289
                        key := ep.Key()
183✔
290
                        if existing, ok := mergedEndpoints[key]; ok {
205✔
291
                                if existing[0].RecordType == endpoint.RecordTypeCNAME {
23✔
292
                                        log.Debugf("CNAME %s with multiple targets found", ep.DNSName)
1✔
293
                                        mergedEndpoints[key] = append(existing, ep)
1✔
294
                                        continue
1✔
295
                                }
296
                                existing[0].Targets = append(existing[0].Targets, ep.Targets...)
21✔
297
                                existing[0].Targets = endpoint.NewTargets(existing[0].Targets...)
21✔
298
                                mergedEndpoints[key] = existing
21✔
299
                        } else {
161✔
300
                                ep.Targets = endpoint.NewTargets(ep.Targets...)
161✔
301
                                mergedEndpoints[key] = []*endpoint.Endpoint{ep}
161✔
302
                        }
161✔
303
                }
304
                processed := make([]*endpoint.Endpoint, 0, len(mergedEndpoints))
58✔
305
                for _, ep := range mergedEndpoints {
219✔
306
                        processed = append(processed, ep...)
161✔
307
                }
161✔
308
                endpoints = processed
58✔
309

58✔
310
                // Use stable sort to not disrupt the order of services
58✔
311
                sort.SliceStable(endpoints, func(i, j int) bool {
205✔
312
                        if endpoints[i].DNSName != endpoints[j].DNSName {
267✔
313
                                return endpoints[i].DNSName < endpoints[j].DNSName
120✔
314
                        }
120✔
315
                        return endpoints[i].RecordType < endpoints[j].RecordType
27✔
316
                })
317
        }
318

319
        return endpoints, nil
132✔
320
}
321

322
// extractHeadlessEndpoints extracts endpoints from a headless service using the "Endpoints" Kubernetes API resource
323
func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
34✔
324
        var endpoints []*endpoint.Endpoint
34✔
325

34✔
326
        selector, err := annotations.ParseFilter(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
34✔
327
        if err != nil {
34✔
328
                return nil
×
329
        }
×
330

331
        serviceKey := cache.ObjectName{Namespace: svc.Namespace, Name: svc.Name}.String()
34✔
332
        rawEndpointSlices, err := sc.endpointSlicesInformer.Informer().GetIndexer().ByIndex(serviceNameIndexKey, serviceKey)
34✔
333
        if err != nil {
34✔
334
                // Should never happen as long as the index exists
×
335
                log.Errorf("Get EndpointSlices of service[%s] error:%v", svc.GetName(), err)
×
336
                return nil
×
337
        }
338

34✔
339
        endpointSlices := make([]*discoveryv1.EndpointSlice, 0, len(rawEndpointSlices))
34✔
340
        for _, obj := range rawEndpointSlices {
34✔
UNCOV
341
                endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
×
UNCOV
342
                if !ok {
×
343
                        // Should never happen as the indexer can only contain EndpointSlice objects
×
344
                        log.Errorf("Expected %T but got %T instead, skipping", endpointSlice, obj)
345
                        continue
34✔
346
                }
34✔
347
                endpointSlices = append(endpointSlices, endpointSlice)
34✔
348
        }
34✔
349

34✔
350
        pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
34✔
351
        if err != nil {
34✔
352
                log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
34✔
353
                return endpoints
354
        }
355

356
        endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
38✔
357
        publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && !sc.publishHostIP
38✔
358
        publishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses
78✔
359

40✔
360
        targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
44✔
361
        for _, endpointSlice := range endpointSlices {
4✔
362
                for _, ep := range endpointSlice.Endpoints {
4✔
363
                        if !conditionToBool(ep.Conditions.Ready) && !publishNotReadyAddresses {
364
                                continue
36✔
365
                        }
366

38✔
367
                        if publishPodIPs &&
368
                                endpointSlice.AddressType != discoveryv1.AddressTypeIPv4 &&
369
                                endpointSlice.AddressType != discoveryv1.AddressTypeIPv6 {
370
                                log.Debugf("Skipping EndpointSlice %s/%s because its address type is unsupported: %s", endpointSlice.Namespace, endpointSlice.Name, endpointSlice.AddressType)
371
                                continue
372
                        }
373

374
                        // find pod for this address
375
                        if ep.TargetRef == nil || ep.TargetRef.APIVersion != "" || ep.TargetRef.Kind != "Pod" {
376
                                log.Debugf("Skipping address because its target is not a pod: %v", ep)
377
                                continue
378
                        }
379
                        var pod *v1.Pod
380
                        for _, v := range pods {
39✔
381
                                if v.Name == ep.TargetRef.Name {
39✔
382
                                        pod = v
78✔
383
                                        break
103✔
384
                                }
66✔
385
                        }
2✔
386
                        if pod == nil {
387
                                log.Errorf("Pod %s not found for address %v", ep.TargetRef.Name, ep)
63✔
388
                                continue
1✔
389
                        }
1✔
390

391
                        headlessDomains := []string{hostname}
61✔
392
                        if pod.Spec.Hostname != "" {
63✔
393
                                headlessDomains = append(headlessDomains, fmt.Sprintf("%s.%s", pod.Spec.Hostname, hostname))
3✔
394
                        }
1✔
395

2✔
396
                        for _, headlessDomain := range headlessDomains {
1✔
397
                                targets := annotations.TargetsFromTargetAnnotation(pod.Annotations)
1✔
398
                                if len(targets) == 0 {
2✔
399
                                        if endpointsType == EndpointsTypeNodeExternalIP {
400
                                                if sc.nodeInformer == nil {
59✔
401
                                                        log.Warnf("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer", endpointSlice.Namespace, endpointSlice.Name)
96✔
402
                                                        continue
37✔
403
                                                }
37✔
404
                                                node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName)
155✔
405
                                                if err != nil {
96✔
406
                                                        log.Errorf("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints", pod.Spec.NodeName, pod.GetName(), err)
193✔
407
                                                        return endpoints
97✔
408
                                                }
97✔
409
                                                for _, address := range node.Status.Addresses {
97✔
410
                                                        if address.Type == v1.NodeExternalIP || (sc.exposeInternalIPv6 && address.Type == v1.NodeInternalIP && suitableType(address.Address) == endpoint.RecordTypeAAAA) {
97✔
411
                                                                targets = append(targets, address.Address)
97✔
412
                                                                log.Debugf("Generating matching endpoint %s with NodeExternalIP %s", headlessDomain, address.Address)
97✔
413
                                                        }
414
                                                }
415
                                        } else if endpointsType == EndpointsTypeHostIP || sc.publishHostIP {
416
                                                targets = endpoint.Targets{pod.Status.HostIP}
417
                                                log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP)
39✔
418
                                        } else {
111✔
419
                                                if len(ep.Addresses) == 0 {
72✔
420
                                                        log.Warnf("EndpointSlice %s/%s has no addresses for endpoint %v", endpointSlice.Namespace, endpointSlice.Name, ep)
72✔
421
                                                        continue
39✔
422
                                                }
423
                                                address := ep.Addresses[0] // Only use the first address, as additional addresses have no semantic defined
424
                                                targets = endpoint.Targets{address}
425
                                                log.Debugf("Generating matching endpoint %s with EndpointSliceAddress IP %s", headlessDomain, address)
66✔
426
                                        }
70✔
427
                                }
4✔
428
                                for _, target := range targets {
4✔
429
                                        key := endpoint.EndpointKey{
4✔
430
                                                DNSName:    headlessDomain,
163✔
431
                                                RecordType: suitableType(target),
161✔
432
                                        }
60✔
433
                                        targetsByHeadlessDomainAndType[key] = append(targetsByHeadlessDomainAndType[key], target)
60✔
434
                                }
435
                        }
2✔
436
                }
437
        }
438

439
        headlessKeys := []endpoint.EndpointKey{}
99✔
440
        for headlessKey := range targetsByHeadlessDomainAndType {
99✔
441
                headlessKeys = append(headlessKeys, headlessKey)
196✔
442
        }
102✔
443
        sort.Slice(headlessKeys, func(i, j int) bool {
5✔
UNCOV
444
                if headlessKeys[i].DNSName != headlessKeys[j].DNSName {
×
UNCOV
445
                        return headlessKeys[i].DNSName < headlessKeys[j].DNSName
×
UNCOV
446
                }
×
447
                return headlessKeys[i].RecordType < headlessKeys[j].RecordType
5✔
448
        })
5✔
UNCOV
449
        for _, headlessKey := range headlessKeys {
×
UNCOV
450
                allTargets := targetsByHeadlessDomainAndType[headlessKey]
×
UNCOV
451
                targets := []string{}
×
452

13✔
453
                deduppedTargets := map[string]struct{}{}
14✔
454
                for _, target := range allTargets {
6✔
455
                        if _, ok := deduppedTargets[target]; ok {
6✔
456
                                log.Debugf("Removing duplicate target %s", target)
6✔
457
                                continue
458
                        }
129✔
459

37✔
460
                        deduppedTargets[target] = struct{}{}
37✔
461
                        targets = append(targets, target)
92✔
462
                }
56✔
463

1✔
464
                var ep *endpoint.Endpoint
1✔
465
                if ttl.IsConfigured() {
1✔
466
                        ep = endpoint.NewEndpointWithTTL(headlessKey.DNSName, headlessKey.RecordType, ttl, targets...)
54✔
467
                } else {
54✔
468
                        ep = endpoint.NewEndpoint(headlessKey.DNSName, headlessKey.RecordType, targets...)
54✔
469
                }
470

471
                if ep != nil {
98✔
472
                        ep.WithLabel(endpoint.ResourceLabelKey, fmt.Sprintf("service/%s/%s", svc.Namespace, svc.Name))
473
                        endpoints = append(endpoints, ep)
474
                }
475
        }
39✔
476

39✔
477
        return endpoints
39✔
478
}
114✔
479

75✔
480
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
75✔
481
        hostnames, err := fqdn.ExecTemplate(sc.fqdnTemplate, svc)
90✔
482
        if err != nil {
99✔
483
                return nil, err
48✔
484
        }
48✔
485

3✔
486
        providerSpecific, setIdentifier := annotations.ProviderSpecificAnnotations(svc.Annotations)
487

114✔
488
        var endpoints []*endpoint.Endpoint
75✔
489
        for _, hostname := range hostnames {
75✔
490
                endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
75✔
491
        }
178✔
492

106✔
493
        return endpoints, nil
3✔
494
}
3✔
495

496
// endpointsFromService extracts the endpoints from a service object
100✔
497
func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
100✔
498
        var endpoints []*endpoint.Endpoint
499

75✔
500
        // Skip endpoints if we do not want entries from annotations or service is excluded
88✔
501
        if sc.ignoreHostnameAnnotation {
13✔
502
                return endpoints
75✔
503
        }
62✔
504

62✔
505
        providerSpecific, setIdentifier := annotations.ProviderSpecificAnnotations(svc.Annotations)
150✔
506
        var hostnameList []string
75✔
507
        var internalHostnameList []string
75✔
508

75✔
509
        hostnameList = annotations.HostnamesFromAnnotations(svc.Annotations)
510
        for _, hostname := range hostnameList {
39✔
511
                endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, false)...)
512
        }
513

31✔
514
        internalHostnameList = annotations.InternalHostnamesFromAnnotations(svc.Annotations)
31✔
515
        for _, hostname := range internalHostnameList {
32✔
516
                endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific, setIdentifier, true)...)
1✔
517
        }
1✔
518

519
        return endpoints
30✔
520
}
30✔
521

30✔
522
// filterByAnnotations filters a list of services by a given annotation selector.
72✔
523
func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
42✔
524
        selector, err := annotations.ParseFilter(sc.annotationFilter)
42✔
525
        if err != nil {
526
                return nil, err
30✔
527
        }
528

529
        // empty filter returns original list
530
        if selector.Empty() {
146✔
531
                return services, nil
146✔
532
        }
146✔
533

146✔
534
        var filteredList []*v1.Service
155✔
535

9✔
536
        for _, service := range services {
9✔
537
                // include service if its annotations match the selector
538
                if selector.Matches(labels.Set(service.Annotations)) {
137✔
539
                        filteredList = append(filteredList, service)
137✔
540
                }
137✔
541
        }
137✔
542
        log.Debugf("filtered %d services out of %d with annotation filter", len(filteredList), len(services))
137✔
543
        return filteredList, nil
236✔
544
}
99✔
545

99✔
546
// filterByServiceType filters services according to their types
547
func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
137✔
548
        if !sc.serviceTypeFilter.enabled || len(services) == 0 {
142✔
549
                return services
5✔
550
        }
5✔
551
        var result []*v1.Service
552
        for _, service := range services {
137✔
553
                if sc.serviceTypeFilter.isProcessed(service.Spec.Type) {
554
                        result = append(result, service)
555
                }
556
        }
134✔
557
        log.Debugf("filtered %d services out of %d with service types filter %q", len(result), len(services), slices.Collect(maps.Keys(sc.serviceTypeFilter.types)))
134✔
558
        return result
135✔
559
}
1✔
560

1✔
561
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific, setIdentifier string, useClusterIP bool) []*endpoint.Endpoint {
562
        hostname = strings.TrimSuffix(hostname, ".")
563

262✔
564
        resource := fmt.Sprintf("service/%s/%s", svc.Namespace, svc.Name)
129✔
565

129✔
566
        ttl := annotations.TTLFromAnnotations(svc.Annotations, resource)
567

4✔
568
        targets := annotations.TargetsFromTargetAnnotation(svc.Annotations)
4✔
569

8✔
570
        endpoints := make([]*endpoint.Endpoint, 0)
4✔
571

6✔
572
        if len(targets) == 0 {
2✔
573
                switch svc.Spec.Type {
2✔
574
                case v1.ServiceTypeLoadBalancer:
575
                        if useClusterIP {
4✔
576
                                targets = extractServiceIps(svc)
4✔
577
                        } else {
578
                                targets = extractLoadBalancerTargets(svc, sc.resolveLoadBalancerHostname)
579
                        }
580
                case v1.ServiceTypeClusterIP:
138✔
581
                        if svc.Spec.ClusterIP == v1.ClusterIPNone {
261✔
582
                                endpoints = append(endpoints, sc.extractHeadlessEndpoints(svc, hostname, ttl)...)
123✔
583
                        } else if useClusterIP || sc.publishInternal {
123✔
584
                                targets = extractServiceIps(svc)
15✔
585
                        }
66✔
586
                case v1.ServiceTypeNodePort:
72✔
587
                        // add the nodeTargets and extract an SRV endpoint
21✔
588
                        var err error
21✔
589
                        targets, err = sc.extractNodePortTargets(svc)
590
                        if err != nil {
15✔
591
                                log.Errorf("Unable to extract targets from service %s/%s error: %v", svc.Namespace, svc.Name, err)
15✔
592
                                return endpoints
593
                        }
594
                        endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, hostname, ttl)...)
146✔
595
                case v1.ServiceTypeExternalName:
146✔
596
                        targets = extractServiceExternalName(svc)
146✔
597
                }
146✔
598

146✔
599
                for _, en := range endpoints {
146✔
600
                        en.ProviderSpecific = providerSpecific
146✔
601
                        en.SetIdentifier = setIdentifier
146✔
602
                }
146✔
603
        }
146✔
604

146✔
605
        endpoints = append(endpoints, EndpointsForHostname(hostname, targets, ttl, providerSpecific, setIdentifier, resource)...)
284✔
606

138✔
607
        return endpoints
63✔
608
}
67✔
609

4✔
610
func extractServiceIps(svc *v1.Service) endpoint.Targets {
63✔
611
        if svc.Spec.ClusterIP == v1.ClusterIPNone {
59✔
612
                log.Debugf("Unable to associate %s headless service with a Cluster IP", svc.Name)
59✔
613
                return endpoint.Targets{}
57✔
614
        }
91✔
615
        return endpoint.Targets{svc.Spec.ClusterIP}
34✔
616
}
78✔
617

21✔
618
func extractServiceExternalName(svc *v1.Service) endpoint.Targets {
21✔
619
        if len(svc.Spec.ExternalIPs) > 0 {
10✔
620
                return svc.Spec.ExternalIPs
10✔
621
        }
10✔
622
        return endpoint.Targets{svc.Spec.ExternalName}
10✔
623
}
10✔
UNCOV
624

×
UNCOV
625
func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname bool) endpoint.Targets {
×
UNCOV
626
        if len(svc.Spec.ExternalIPs) > 0 {
×
627
                return svc.Spec.ExternalIPs
10✔
628
        }
8✔
629

8✔
630
        // Create a corresponding endpoint for each configured external entrypoint.
631
        var targets endpoint.Targets
632
        for _, lb := range svc.Status.LoadBalancer.Ingress {
216✔
633
                if lb.IP != "" {
78✔
634
                        targets = append(targets, lb.IP)
78✔
635
                }
78✔
636
                if lb.Hostname != "" {
637
                        if resolveLoadBalancerHostname {
638
                                ips, err := net.LookupIP(lb.Hostname)
146✔
639
                                if err != nil {
146✔
640
                                        log.Errorf("Unable to resolve %q: %v", lb.Hostname, err)
146✔
641
                                        continue
642
                                }
643
                                for _, ip := range ips {
25✔
644
                                        targets = append(targets, ip.String())
25✔
UNCOV
645
                                }
×
UNCOV
646
                        } else {
×
UNCOV
647
                                targets = append(targets, lb.Hostname)
×
648
                        }
25✔
649
                }
650
        }
651

8✔
652
        return targets
10✔
653
}
2✔
654

2✔
655
func isPodStatusReady(status v1.PodStatus) bool {
6✔
656
        _, condition := getPodCondition(&status, v1.PodReady)
657
        return condition != nil && condition.Status == v1.ConditionTrue
658
}
67✔
659

69✔
660
func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
2✔
661
        if status == nil {
2✔
662
                return -1, nil
663
        }
664
        return getPodConditionFromList(status.Conditions, conditionType)
65✔
665
}
135✔
666

131✔
667
func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
61✔
668
        if conditions == nil {
61✔
669
                return -1, nil
81✔
670
        }
12✔
671
        for i := range conditions {
1✔
672
                if conditions[i].Type == conditionType {
1✔
UNCOV
673
                        return i, &conditions[i]
×
UNCOV
674
                }
×
675
        }
676
        return -1, nil
13✔
677
}
12✔
678

12✔
679
// nodesExternalTrafficPolicyTypeLocal filters nodes that have running pods belonging to the given NodePort service
10✔
680
// with externalTrafficPolicy=Local. Returns a prioritized slice of nodes, favoring those with ready, non-terminating pods.
10✔
681
func (sc *serviceSource) nodesExternalTrafficPolicyTypeLocal(svc *v1.Service) []*v1.Node {
10✔
682
        var nodesReady []*v1.Node
683
        var nodesRunning []*v1.Node
684
        var nodes []*v1.Node
685
        nodesMap := map[*v1.Node]struct{}{}
65✔
686

687
        pods := sc.pods(svc)
688

7✔
689
        for _, v := range pods {
7✔
690
                if v.Status.Phase == v1.PodRunning {
7✔
691
                        node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
7✔
692
                        if err != nil {
693
                                log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
7✔
694
                                continue
7✔
UNCOV
695
                        }
×
UNCOV
696

×
697
                        if _, ok := nodesMap[node]; !ok {
7✔
698
                                nodesMap[node] = *new(struct{})
699
                                nodesRunning = append(nodesRunning, node)
700

7✔
701
                                if isPodStatusReady(v.Status) {
7✔
UNCOV
702
                                        nodesReady = append(nodesReady, node)
×
UNCOV
703
                                        // Check pod not terminating
×
704
                                        if v.GetDeletionTimestamp() == nil {
14✔
705
                                                nodes = append(nodes, node)
14✔
706
                                        }
7✔
707
                                }
7✔
708
                        }
UNCOV
709
                }
×
710
        }
711

712
        // Prioritize nodes with non-terminating ready pods
713
        // If none available, fall back to nodes with ready pods
714
        // If still none, use nodes with any running pods
4✔
715
        if len(nodes) > 0 {
4✔
716
                // Works the same as service endpoints
4✔
717
        } else if len(nodesReady) > 0 {
4✔
718
                // 2 level of panic modes as safeguard, because old wrong behavior can be used by someone
4✔
719
                // Publish all endpoints not always a bad thing
4✔
720
                log.Debugf("All pods in terminating state, use ready")
4✔
721
                nodes = nodesReady
4✔
722
        } else {
12✔
723
                log.Debugf("All pods not ready, use all running")
16✔
724
                nodes = nodesRunning
8✔
725
        }
8✔
UNCOV
726

×
UNCOV
727
        return nodes
×
728
}
729

730
// pods retrieve a slice of pods associated with the given Service
15✔
731
func (sc *serviceSource) pods(svc *v1.Service) []*v1.Pod {
7✔
732
        selector, err := annotations.ParseFilter(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
7✔
733
        if err != nil {
7✔
734
                return nil
10✔
735
        }
3✔
736
        pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
3✔
737
        if err != nil {
4✔
738
                return nil
1✔
739
        }
1✔
740

741
        return pods
742
}
743

744
func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
745
        var (
746
                internalIPs endpoint.Targets
747
                externalIPs endpoint.Targets
748
                ipv6IPs     endpoint.Targets
5✔
749
                nodes       []*v1.Node
1✔
750
        )
5✔
751

1✔
752
        if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
1✔
753
                nodes = sc.nodesExternalTrafficPolicyTypeLocal(svc)
1✔
754
        } else {
1✔
755
                var err error
3✔
756
                nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
2✔
757
                if err != nil {
2✔
758
                        return nil, err
2✔
759
                }
760
        }
4✔
761

762
        for _, node := range nodes {
763
                for _, address := range node.Status.Addresses {
764
                        switch address.Type {
4✔
765
                        case v1.NodeExternalIP:
4✔
766
                                externalIPs = append(externalIPs, address.Address)
4✔
UNCOV
767
                        case v1.NodeInternalIP:
×
UNCOV
768
                                internalIPs = append(internalIPs, address.Address)
×
769
                                if suitableType(address.Address) == endpoint.RecordTypeAAAA {
4✔
770
                                        ipv6IPs = append(ipv6IPs, address.Address)
4✔
UNCOV
771
                                }
×
UNCOV
772
                        }
×
773
                }
774
        }
4✔
775

776
        access := getAccessFromAnnotations(svc.Annotations)
777
        switch access {
10✔
778
        case "public":
10✔
779
                if sc.exposeInternalIPv6 {
10✔
780
                        return append(externalIPs, ipv6IPs...), nil
10✔
781
                }
10✔
782
                return externalIPs, nil
10✔
783
        case "private":
10✔
784
                return internalIPs, nil
10✔
785
        }
14✔
786

4✔
787
        if len(externalIPs) > 0 {
10✔
788
                if sc.exposeInternalIPv6 {
6✔
789
                        return append(externalIPs, ipv6IPs...), nil
6✔
790
                }
6✔
UNCOV
791
                return externalIPs, nil
×
UNCOV
792
        }
×
793

794
        return internalIPs, nil
795
}
26✔
796

70✔
797
func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
54✔
798
        var endpoints []*endpoint.Endpoint
24✔
799

24✔
800
        for _, port := range svc.Spec.Ports {
30✔
801
                if port.NodePort > 0 {
30✔
802
                        // following the RFC 2782, SRV record must have a following format
44✔
803
                        // _service._proto.name. TTL class SRV priority weight port
14✔
804
                        // see https://en.wikipedia.org/wiki/SRV_record
14✔
805

806
                        // build a target with a priority of 0, weight of 50, and pointing the given port on the given host
807
                        target := fmt.Sprintf("0 50 %d %s", port.NodePort, hostname)
808

809
                        // take the service name from the K8s Service object
10✔
810
                        // it is safe to use since it is DNS compatible
10✔
811
                        // see https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names
2✔
812
                        serviceName := svc.Name
3✔
813

1✔
814
                        // figure out the protocol
1✔
815
                        protocol := strings.ToLower(string(port.Protocol))
1✔
816
                        if protocol == "" {
1✔
817
                                protocol = "tcp"
1✔
818
                        }
819

820
                        recordName := fmt.Sprintf("_%s._%s.%s", serviceName, protocol, hostname)
13✔
821

6✔
UNCOV
822
                        var ep *endpoint.Endpoint
×
UNCOV
823
                        if ttl.IsConfigured() {
×
824
                                ep = endpoint.NewEndpointWithTTL(recordName, endpoint.RecordTypeSRV, ttl, target)
6✔
825
                        } else {
826
                                ep = endpoint.NewEndpoint(recordName, endpoint.RecordTypeSRV, target)
827
                        }
1✔
828

829
                        if ep != nil {
830
                                ep.WithLabel(endpoint.ResourceLabelKey, fmt.Sprintf("service/%s/%s", svc.Namespace, svc.Name))
10✔
831
                                endpoints = append(endpoints, ep)
10✔
832
                        }
10✔
833
                }
20✔
834
        }
20✔
835

10✔
836
        return endpoints
10✔
837
}
10✔
838

10✔
839
func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
10✔
840
        log.Debug("Adding event handler for service")
10✔
841

10✔
842
        // Right now there is no way to remove event handler from informer, see:
10✔
843
        // https://github.com/kubernetes/kubernetes/issues/79610
10✔
844
        _, _ = sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
10✔
845
        if sc.listenEndpointEvents && sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort, v1.ServiceTypeClusterIP) {
10✔
846
                _, _ = sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
10✔
847
        }
10✔
848
        if sc.serviceTypeFilter.isRequired(v1.ServiceTypeNodePort) {
10✔
849
                _, _ = sc.nodeInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
20✔
850
        }
10✔
851
}
10✔
852

853
type serviceTypes struct {
10✔
854
        enabled bool
10✔
855
        types   map[v1.ServiceType]bool
10✔
856
}
10✔
UNCOV
857

×
858
// newServiceTypesFilter processes a slice of service type filter strings and returns a serviceTypes struct.
10✔
859
// It validates the filter against known Kubernetes service types. If the filter is empty or contains an empty string,
10✔
860
// service type filtering is disabled. If an unknown type is found, an error is returned.
10✔
861
func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
862
        if len(filter) == 0 || slices.Contains(filter, "") {
20✔
863
                return &serviceTypes{
10✔
864
                        enabled: false,
10✔
865
                }, nil
10✔
866
        }
867
        result := make(map[v1.ServiceType]bool)
868
        for _, serviceType := range filter {
869
                if _, ok := knownServiceTypes[v1.ServiceType(serviceType)]; !ok {
10✔
870
                        return nil, fmt.Errorf("unsupported service type filter: %q. Supported types are: %q", serviceType, slices.Collect(maps.Keys(knownServiceTypes)))
871
                }
872
                result[v1.ServiceType(serviceType)] = true
4✔
873
        }
4✔
874

4✔
875
        return &serviceTypes{
4✔
876
                enabled: true,
4✔
877
                types:   result,
4✔
878
        }, nil
7✔
879
}
3✔
880

3✔
881
func (sc *serviceTypes) isProcessed(serviceType v1.ServiceType) bool {
6✔
882
        return !sc.enabled || sc.types[serviceType]
2✔
883
}
2✔
884

885
// isRequired returns true if service type filtering is disabled or if any of the provided service types are present in the filter.
886
// If no options are provided, it returns true.
887
func (sc *serviceTypes) isRequired(opts ...v1.ServiceType) bool {
888
        if len(opts) == 0 || !sc.enabled {
889
                return true
890
        }
891
        for _, opt := range opts {
892
                if _, ok := sc.types[opt]; ok {
893
                        return true
894
                }
159✔
895
        }
292✔
896
        return false
133✔
897
}
133✔
898

133✔
899
// conditionToBool converts an EndpointConditions condition to a bool value.
// A nil condition is interpreted as "true", as per the EndpointConditions spec.
func conditionToBool(v *bool) bool {
	return v == nil || *v
}
37✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc