• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / external-dns / 17323496975

29 Aug 2025 12:13PM UTC coverage: 77.431% (+0.3%) from 77.179%
17323496975

Pull #5459

github

u-kai
refactor(registry/txt): rename to isAbsent
Pull Request #5459: fix(txt-registry): skip creation of already-existing TXT records (#4914)

34 of 34 new or added lines in 1 file covered. (100.0%)

387 existing lines in 18 files now uncovered.

15346 of 19819 relevant lines covered (77.43%)

732.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.7
/controller/controller.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "sync"
24
        "time"
25

26
        "github.com/prometheus/client_golang/prometheus"
27
        log "github.com/sirupsen/logrus"
28

29
        "sigs.k8s.io/external-dns/endpoint"
30
        "sigs.k8s.io/external-dns/pkg/metrics"
31
        "sigs.k8s.io/external-dns/plan"
32
        "sigs.k8s.io/external-dns/provider"
33
        "sigs.k8s.io/external-dns/registry"
34
        "sigs.k8s.io/external-dns/source"
35
)
36

37
var (
38
        registryErrorsTotal = metrics.NewCounterWithOpts(
39
                prometheus.CounterOpts{
40
                        Subsystem: "registry",
41
                        Name:      "errors_total",
42
                        Help:      "Number of Registry errors.",
43
                },
44
        )
45
        sourceErrorsTotal = metrics.NewCounterWithOpts(
46
                prometheus.CounterOpts{
47
                        Subsystem: "source",
48
                        Name:      "errors_total",
49
                        Help:      "Number of Source errors.",
50
                },
51
        )
52
        sourceEndpointsTotal = metrics.NewGaugeWithOpts(
53
                prometheus.GaugeOpts{
54
                        Subsystem: "source",
55
                        Name:      "endpoints_total",
56
                        Help:      "Number of Endpoints in all sources",
57
                },
58
        )
59
        registryEndpointsTotal = metrics.NewGaugeWithOpts(
60
                prometheus.GaugeOpts{
61
                        Subsystem: "registry",
62
                        Name:      "endpoints_total",
63
                        Help:      "Number of Endpoints in the registry",
64
                },
65
        )
66
        lastSyncTimestamp = metrics.NewGaugeWithOpts(
67
                prometheus.GaugeOpts{
68
                        Subsystem: "controller",
69
                        Name:      "last_sync_timestamp_seconds",
70
                        Help:      "Timestamp of last successful sync with the DNS provider",
71
                },
72
        )
73
        lastReconcileTimestamp = metrics.NewGaugeWithOpts(
74
                prometheus.GaugeOpts{
75
                        Subsystem: "controller",
76
                        Name:      "last_reconcile_timestamp_seconds",
77
                        Help:      "Timestamp of last attempted sync with the DNS provider",
78
                },
79
        )
80
        controllerNoChangesTotal = metrics.NewCounterWithOpts(
81
                prometheus.CounterOpts{
82
                        Subsystem: "controller",
83
                        Name:      "no_op_runs_total",
84
                        Help:      "Number of reconcile loops ending up with no changes on the DNS provider side.",
85
                },
86
        )
87
        deprecatedRegistryErrors = metrics.NewCounterWithOpts(
88
                prometheus.CounterOpts{
89
                        Subsystem: "registry",
90
                        Name:      "errors_total",
91
                        Help:      "Number of Registry errors.",
92
                },
93
        )
94
        deprecatedSourceErrors = metrics.NewCounterWithOpts(
95
                prometheus.CounterOpts{
96
                        Subsystem: "source",
97
                        Name:      "errors_total",
98
                        Help:      "Number of Source errors.",
99
                },
100
        )
101

102
        registryRecords = metrics.NewGaugedVectorOpts(
103
                prometheus.GaugeOpts{
104
                        Subsystem: "registry",
105
                        Name:      "records",
106
                        Help:      "Number of registry records partitioned by label name (vector).",
107
                },
108
                []string{"record_type"},
109
        )
110

111
        sourceRecords = metrics.NewGaugedVectorOpts(
112
                prometheus.GaugeOpts{
113
                        Subsystem: "source",
114
                        Name:      "records",
115
                        Help:      "Number of source records partitioned by label name (vector).",
116
                },
117
                []string{"record_type"},
118
        )
119

120
        verifiedRecords = metrics.NewGaugedVectorOpts(
121
                prometheus.GaugeOpts{
122
                        Subsystem: "controller",
123
                        Name:      "verified_records",
124
                        Help:      "Number of DNS records that exists both in source and registry (vector).",
125
                },
126
                []string{"record_type"},
127
        )
128

129
        consecutiveSoftErrors = metrics.NewGaugeWithOpts(
130
                prometheus.GaugeOpts{
131
                        Subsystem: "controller",
132
                        Name:      "consecutive_soft_errors",
133
                        Help:      "Number of consecutive soft errors in reconciliation loop.",
134
                },
135
        )
136
)
137

138
func init() {
139
        metrics.RegisterMetric.MustRegister(registryErrorsTotal)
1✔
140
        metrics.RegisterMetric.MustRegister(sourceErrorsTotal)
1✔
141
        metrics.RegisterMetric.MustRegister(sourceEndpointsTotal)
1✔
142
        metrics.RegisterMetric.MustRegister(registryEndpointsTotal)
1✔
143
        metrics.RegisterMetric.MustRegister(lastSyncTimestamp)
1✔
144
        metrics.RegisterMetric.MustRegister(lastReconcileTimestamp)
1✔
145
        metrics.RegisterMetric.MustRegister(deprecatedRegistryErrors)
1✔
146
        metrics.RegisterMetric.MustRegister(deprecatedSourceErrors)
1✔
147
        metrics.RegisterMetric.MustRegister(controllerNoChangesTotal)
1✔
148

1✔
149
        metrics.RegisterMetric.MustRegister(registryRecords)
1✔
150
        metrics.RegisterMetric.MustRegister(sourceRecords)
1✔
151
        metrics.RegisterMetric.MustRegister(verifiedRecords)
1✔
152

1✔
153
        metrics.RegisterMetric.MustRegister(consecutiveSoftErrors)
1✔
154
}
1✔
155

1✔
156
// Controller is responsible for orchestrating the different components.
157
// It works in the following way:
158
// * Ask the DNS provider for the current list of endpoints.
159
// * Ask the Source for the desired list of endpoints.
160
// * Take both lists and calculate a Plan to move current towards the desired state.
161
// * Tell the DNS provider to apply the changes calculated by the Plan.
162
type Controller struct {
163
        Source   source.Source
164
        Registry registry.Registry
165
        // The policy that defines which change to DNS records is allowed
166
        Policy plan.Policy
167
        // The interval between individual synchronizations
168
        Interval time.Duration
169
        // The DomainFilter defines which DNS records to keep or exclude
170
        DomainFilter endpoint.DomainFilterInterface
171
        // The nextRunAt used for throttling and batching reconciliation
172
        nextRunAt time.Time
173
        // The runAtMutex is for atomic updating of nextRunAt and lastRunAt
174
        runAtMutex sync.Mutex
175
        // The lastRunAt used for throttling and batching reconciliation
176
        lastRunAt time.Time
177
        // MangedRecordTypes are DNS record types that will be considered for management.
178
        ManagedRecordTypes []string
179
        // ExcludeRecordTypes are DNS record types that will be excluded from management.
180
        ExcludeRecordTypes []string
181
        // MinEventSyncInterval is used as a window for batching events
182
        MinEventSyncInterval time.Duration
183
}
184

185
// RunOnce runs a single iteration of a reconciliation loop.
186
func (c *Controller) RunOnce(ctx context.Context) error {
187
        lastReconcileTimestamp.Gauge.SetToCurrentTime()
188

16✔
189
        c.runAtMutex.Lock()
16✔
190
        c.lastRunAt = time.Now()
16✔
191
        c.runAtMutex.Unlock()
16✔
192

16✔
193
        regMetrics := newMetricsRecorder()
16✔
194

16✔
195
        regRecords, err := c.Registry.Records(ctx)
16✔
196
        if err != nil {
16✔
197
                registryErrorsTotal.Counter.Inc()
16✔
198
                deprecatedRegistryErrors.Counter.Inc()
19✔
199
                return err
3✔
200
        }
3✔
201

3✔
202
        registryEndpointsTotal.Gauge.Set(float64(len(regRecords)))
3✔
203

204
        countAddressRecords(regMetrics, regRecords, registryRecords)
13✔
205

13✔
206
        ctx = context.WithValue(ctx, provider.RecordsContextKey, regRecords)
13✔
207

13✔
208
        sourceEndpoints, err := c.Source.Endpoints(ctx)
13✔
209
        if err != nil {
13✔
210
                sourceErrorsTotal.Counter.Inc()
13✔
211
                deprecatedSourceErrors.Counter.Inc()
13✔
212
                return err
×
213
        }
×
UNCOV
214

×
UNCOV
215
        sourceEndpointsTotal.Gauge.Set(float64(len(sourceEndpoints)))
×
216

217
        sourceMetrics := newMetricsRecorder()
13✔
218
        countAddressRecords(sourceMetrics, sourceEndpoints, sourceRecords)
13✔
219

13✔
220
        vaMetrics := newMetricsRecorder()
13✔
221
        countMatchingAddressRecords(vaMetrics, sourceEndpoints, regRecords, verifiedRecords)
13✔
222

13✔
223
        endpoints, err := c.Registry.AdjustEndpoints(sourceEndpoints)
13✔
224
        if err != nil {
13✔
225
                return fmt.Errorf("adjusting endpoints: %w", err)
13✔
226
        }
13✔
UNCOV
227
        registryFilter := c.Registry.GetDomainFilter()
×
UNCOV
228

×
229
        plan := &plan.Plan{
13✔
230
                Policies:       []plan.Policy{c.Policy},
13✔
231
                Current:        regRecords,
13✔
232
                Desired:        endpoints,
13✔
233
                DomainFilter:   endpoint.MatchAllDomainFilters{c.DomainFilter, registryFilter},
13✔
234
                ManagedRecords: c.ManagedRecordTypes,
13✔
235
                ExcludeRecords: c.ExcludeRecordTypes,
13✔
236
                OwnerID:        c.Registry.OwnerID(),
13✔
237
        }
13✔
238

13✔
239
        plan = plan.Calculate()
13✔
240

13✔
241
        if plan.Changes.HasChanges() {
13✔
242
                err = c.Registry.ApplyChanges(ctx, plan.Changes)
13✔
243
                if err != nil {
23✔
244
                        registryErrorsTotal.Counter.Inc()
10✔
245
                        deprecatedRegistryErrors.Counter.Inc()
10✔
246
                        return err
×
247
                }
×
UNCOV
248
        } else {
×
249
                controllerNoChangesTotal.Counter.Inc()
10✔
250
                log.Info("All records are already up to date")
10✔
251
        }
10✔
252

3✔
253
        lastSyncTimestamp.Gauge.SetToCurrentTime()
3✔
254

3✔
255
        return nil
3✔
256
}
257

13✔
258
func earliest(r time.Time, times ...time.Time) time.Time {
13✔
259
        for _, t := range times {
13✔
260
                if t.Before(r) {
261
                        r = t
262
                }
4✔
263
        }
8✔
264
        return r
4✔
UNCOV
265
}
×
UNCOV
266

×
267
func latest(r time.Time, times ...time.Time) time.Time {
268
        for _, t := range times {
4✔
269
                if t.After(r) {
270
                        r = t
271
                }
4✔
272
        }
8✔
273
        return r
4✔
UNCOV
274
}
×
UNCOV
275

×
276
// Counts the intersections of records in endpoint and registry.
277
func countMatchingAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint, metric metrics.GaugeVecMetric) {
4✔
278
        recordsMap := make(map[string]map[string]struct{})
279
        for _, regRecord := range registryRecords {
280
                if _, found := recordsMap[regRecord.DNSName]; !found {
281
                        recordsMap[regRecord.DNSName] = make(map[string]struct{})
13✔
282
                }
13✔
283
                recordsMap[regRecord.DNSName][regRecord.RecordType] = struct{}{}
5,796✔
284
        }
11,566✔
285

5,783✔
286
        for _, sourceRecord := range endpoints {
5,783✔
287
                if _, found := recordsMap[sourceRecord.DNSName]; found {
5,783✔
288
                        if _, ok := recordsMap[sourceRecord.DNSName][sourceRecord.RecordType]; ok {
289
                                rec.recordEndpointType(sourceRecord.RecordType)
290
                        }
978✔
291
                }
1,521✔
292
        }
1,112✔
293

556✔
294
        for _, rt := range endpoint.KnownRecordTypes {
556✔
295
                metric.SetWithLabels(rec.loadFloat64(rt), rt)
296
        }
297
}
298

117✔
299
// countAddressRecords updates the metricsRecorder with the count of each record type
104✔
300
// found in the provided endpoints slice, and sets the corresponding metrics for each
104✔
301
// known DNS record type using the sourceRecords metric.
302
func countAddressRecords(rec *metricsRecorder, endpoints []*endpoint.Endpoint, metric metrics.GaugeVecMetric) {
303
        // compute the number of records per type
304
        for _, endPoint := range endpoints {
305
                rec.recordEndpointType(endPoint.RecordType)
306
        }
26✔
307
        // set metrics for each record type
26✔
308
        for _, rt := range endpoint.KnownRecordTypes {
6,774✔
309
                metric.SetWithLabels(rec.loadFloat64(rt), rt)
6,748✔
310
        }
6,748✔
311
}
312

234✔
313
// ScheduleRunOnce makes sure execution happens at most once per interval.
208✔
314
func (c *Controller) ScheduleRunOnce(now time.Time) {
208✔
315
        c.runAtMutex.Lock()
316
        defer c.runAtMutex.Unlock()
317
        c.nextRunAt = latest(
318
                c.lastRunAt.Add(c.MinEventSyncInterval),
4✔
319
                earliest(
4✔
320
                        now.Add(5*time.Second),
4✔
321
                        c.nextRunAt,
4✔
322
                ),
4✔
323
        )
4✔
324
}
4✔
325

4✔
326
func (c *Controller) ShouldRunOnce(now time.Time) bool {
4✔
327
        c.runAtMutex.Lock()
4✔
328
        defer c.runAtMutex.Unlock()
4✔
329
        if now.Before(c.nextRunAt) {
330
                return false
16✔
331
        }
16✔
332
        c.nextRunAt = now.Add(c.Interval)
16✔
333
        return true
23✔
334
}
7✔
335

7✔
336
// Run runs RunOnce in a loop with a delay until context is canceled
9✔
337
func (c *Controller) Run(ctx context.Context) {
9✔
338
        ticker := time.NewTicker(time.Second)
339
        defer ticker.Stop()
340
        var softErrorCount int
341
        for {
2✔
342
                if c.ShouldRunOnce(time.Now()) {
2✔
343
                        if err := c.RunOnce(ctx); err != nil {
2✔
344
                                if errors.Is(err, provider.SoftError) {
2✔
345
                                        softErrorCount++
7✔
346
                                        consecutiveSoftErrors.Gauge.Set(float64(softErrorCount))
10✔
347
                                        log.Errorf("Failed to do run once: %v (consecutive soft errors: %d)", err, softErrorCount)
8✔
348
                                } else {
6✔
349
                                        log.Fatalf("Failed to do run once: %v", err)
3✔
350
                                }
3✔
351
                        } else {
3✔
352
                                if softErrorCount > 0 {
3✔
353
                                        log.Infof("Reconciliation succeeded after %d consecutive soft errors", softErrorCount)
×
354
                                }
×
355
                                softErrorCount = 0
2✔
356
                                consecutiveSoftErrors.Gauge.Set(0)
2✔
UNCOV
357
                        }
×
UNCOV
358
                }
×
359
                select {
2✔
360
                case <-ticker.C:
2✔
361
                case <-ctx.Done():
362
                        log.Info("Terminating main controller loop")
363
                        return
5✔
364
                }
3✔
365
        }
2✔
366
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc