• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / external-dns / 10370590389

13 Aug 2024 01:22PM UTC coverage: 52.554% (+0.01%) from 52.54%
10370590389

push

github

web-flow
Merge pull request #3400 from tjamet/min-event-sync

Improve MinEventInterval compliance with docs

23 of 27 new or added lines in 1 file covered. (85.19%)

1 existing line in 1 file now uncovered.

14711 of 27992 relevant lines covered (52.55%)

470.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.26
/controller/controller.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "sync"
24
        "time"
25

26
        "github.com/prometheus/client_golang/prometheus"
27
        log "github.com/sirupsen/logrus"
28

29
        "sigs.k8s.io/external-dns/endpoint"
30
        "sigs.k8s.io/external-dns/plan"
31
        "sigs.k8s.io/external-dns/provider"
32
        "sigs.k8s.io/external-dns/registry"
33
        "sigs.k8s.io/external-dns/source"
34
)
35

36
var (
37
        registryErrorsTotal = prometheus.NewCounter(
38
                prometheus.CounterOpts{
39
                        Namespace: "external_dns",
40
                        Subsystem: "registry",
41
                        Name:      "errors_total",
42
                        Help:      "Number of Registry errors.",
43
                },
44
        )
45
        sourceErrorsTotal = prometheus.NewCounter(
46
                prometheus.CounterOpts{
47
                        Namespace: "external_dns",
48
                        Subsystem: "source",
49
                        Name:      "errors_total",
50
                        Help:      "Number of Source errors.",
51
                },
52
        )
53
        sourceEndpointsTotal = prometheus.NewGauge(
54
                prometheus.GaugeOpts{
55
                        Namespace: "external_dns",
56
                        Subsystem: "source",
57
                        Name:      "endpoints_total",
58
                        Help:      "Number of Endpoints in all sources",
59
                },
60
        )
61
        registryEndpointsTotal = prometheus.NewGauge(
62
                prometheus.GaugeOpts{
63
                        Namespace: "external_dns",
64
                        Subsystem: "registry",
65
                        Name:      "endpoints_total",
66
                        Help:      "Number of Endpoints in the registry",
67
                },
68
        )
69
        lastSyncTimestamp = prometheus.NewGauge(
70
                prometheus.GaugeOpts{
71
                        Namespace: "external_dns",
72
                        Subsystem: "controller",
73
                        Name:      "last_sync_timestamp_seconds",
74
                        Help:      "Timestamp of last successful sync with the DNS provider",
75
                },
76
        )
77
        lastReconcileTimestamp = prometheus.NewGauge(
78
                prometheus.GaugeOpts{
79
                        Namespace: "external_dns",
80
                        Subsystem: "controller",
81
                        Name:      "last_reconcile_timestamp_seconds",
82
                        Help:      "Timestamp of last attempted sync with the DNS provider",
83
                },
84
        )
85
        controllerNoChangesTotal = prometheus.NewCounter(
86
                prometheus.CounterOpts{
87
                        Namespace: "external_dns",
88
                        Subsystem: "controller",
89
                        Name:      "no_op_runs_total",
90
                        Help:      "Number of reconcile loops ending up with no changes on the DNS provider side.",
91
                },
92
        )
93
        deprecatedRegistryErrors = prometheus.NewCounter(
94
                prometheus.CounterOpts{
95
                        Subsystem: "registry",
96
                        Name:      "errors_total",
97
                        Help:      "Number of Registry errors.",
98
                },
99
        )
100
        deprecatedSourceErrors = prometheus.NewCounter(
101
                prometheus.CounterOpts{
102
                        Subsystem: "source",
103
                        Name:      "errors_total",
104
                        Help:      "Number of Source errors.",
105
                },
106
        )
107
        registryARecords = prometheus.NewGauge(
108
                prometheus.GaugeOpts{
109
                        Namespace: "external_dns",
110
                        Subsystem: "registry",
111
                        Name:      "a_records",
112
                        Help:      "Number of Registry A records.",
113
                },
114
        )
115
        registryAAAARecords = prometheus.NewGauge(
116
                prometheus.GaugeOpts{
117
                        Namespace: "external_dns",
118
                        Subsystem: "registry",
119
                        Name:      "aaaa_records",
120
                        Help:      "Number of Registry AAAA records.",
121
                },
122
        )
123
        sourceARecords = prometheus.NewGauge(
124
                prometheus.GaugeOpts{
125
                        Namespace: "external_dns",
126
                        Subsystem: "source",
127
                        Name:      "a_records",
128
                        Help:      "Number of Source A records.",
129
                },
130
        )
131
        sourceAAAARecords = prometheus.NewGauge(
132
                prometheus.GaugeOpts{
133
                        Namespace: "external_dns",
134
                        Subsystem: "source",
135
                        Name:      "aaaa_records",
136
                        Help:      "Number of Source AAAA records.",
137
                },
138
        )
139
        verifiedARecords = prometheus.NewGauge(
140
                prometheus.GaugeOpts{
141
                        Namespace: "external_dns",
142
                        Subsystem: "controller",
143
                        Name:      "verified_a_records",
144
                        Help:      "Number of DNS A-records that exists both in source and registry.",
145
                },
146
        )
147
        verifiedAAAARecords = prometheus.NewGauge(
148
                prometheus.GaugeOpts{
149
                        Namespace: "external_dns",
150
                        Subsystem: "controller",
151
                        Name:      "verified_aaaa_records",
152
                        Help:      "Number of DNS AAAA-records that exists both in source and registry.",
153
                },
154
        )
155
)
156

157
func init() {
1✔
158
        prometheus.MustRegister(registryErrorsTotal)
1✔
159
        prometheus.MustRegister(sourceErrorsTotal)
1✔
160
        prometheus.MustRegister(sourceEndpointsTotal)
1✔
161
        prometheus.MustRegister(registryEndpointsTotal)
1✔
162
        prometheus.MustRegister(lastSyncTimestamp)
1✔
163
        prometheus.MustRegister(lastReconcileTimestamp)
1✔
164
        prometheus.MustRegister(deprecatedRegistryErrors)
1✔
165
        prometheus.MustRegister(deprecatedSourceErrors)
1✔
166
        prometheus.MustRegister(controllerNoChangesTotal)
1✔
167
        prometheus.MustRegister(registryARecords)
1✔
168
        prometheus.MustRegister(registryAAAARecords)
1✔
169
        prometheus.MustRegister(sourceARecords)
1✔
170
        prometheus.MustRegister(sourceAAAARecords)
1✔
171
        prometheus.MustRegister(verifiedARecords)
1✔
172
        prometheus.MustRegister(verifiedAAAARecords)
1✔
173
}
1✔
174

175
// Controller is responsible for orchestrating the different components.
176
// It works in the following way:
177
// * Ask the DNS provider for current list of endpoints.
178
// * Ask the Source for the desired list of endpoints.
179
// * Take both lists and calculate a Plan to move current towards desired state.
180
// * Tell the DNS provider to apply the changes calculated by the Plan.
181
type Controller struct {
182
        Source   source.Source
183
        Registry registry.Registry
184
        // The policy that defines which changes to DNS records are allowed
185
        Policy plan.Policy
186
        // The interval between individual synchronizations
187
        Interval time.Duration
188
        // The DomainFilter defines which DNS records to keep or exclude
189
        DomainFilter endpoint.DomainFilter
190
        // The nextRunAt used for throttling and batching reconciliation
191
        nextRunAt time.Time
192
        // The runAtMutex is for atomic updating of nextRunAt and lastRunAt
193
        runAtMutex sync.Mutex
194
        // The lastRunAt used for throttling and batching reconciliation
195
        lastRunAt time.Time
196
        // MangedRecordTypes are DNS record types that will be considered for management.
197
        ManagedRecordTypes []string
198
        // ExcludeRecordTypes are DNS record types that will be excluded from management.
199
        ExcludeRecordTypes []string
200
        // MinEventSyncInterval is used as window for batching events
201
        MinEventSyncInterval time.Duration
202
}
203

204
// RunOnce runs a single iteration of a reconciliation loop.
205
func (c *Controller) RunOnce(ctx context.Context) error {
12✔
206
        lastReconcileTimestamp.SetToCurrentTime()
12✔
207

12✔
208
        c.runAtMutex.Lock()
12✔
209
        c.lastRunAt = time.Now()
12✔
210
        c.runAtMutex.Unlock()
12✔
211

12✔
212
        records, err := c.Registry.Records(ctx)
12✔
213
        if err != nil {
12✔
214
                registryErrorsTotal.Inc()
×
215
                deprecatedRegistryErrors.Inc()
×
216
                return err
×
217
        }
×
218

219
        registryEndpointsTotal.Set(float64(len(records)))
12✔
220
        regARecords, regAAAARecords := countAddressRecords(records)
12✔
221
        registryARecords.Set(float64(regARecords))
12✔
222
        registryAAAARecords.Set(float64(regAAAARecords))
12✔
223
        ctx = context.WithValue(ctx, provider.RecordsContextKey, records)
12✔
224

12✔
225
        endpoints, err := c.Source.Endpoints(ctx)
12✔
226
        if err != nil {
12✔
227
                sourceErrorsTotal.Inc()
×
228
                deprecatedSourceErrors.Inc()
×
229
                return err
×
230
        }
×
231
        sourceEndpointsTotal.Set(float64(len(endpoints)))
12✔
232
        srcARecords, srcAAAARecords := countAddressRecords(endpoints)
12✔
233
        sourceARecords.Set(float64(srcARecords))
12✔
234
        sourceAAAARecords.Set(float64(srcAAAARecords))
12✔
235
        vARecords, vAAAARecords := countMatchingAddressRecords(endpoints, records)
12✔
236
        verifiedARecords.Set(float64(vARecords))
12✔
237
        verifiedAAAARecords.Set(float64(vAAAARecords))
12✔
238
        endpoints, err = c.Registry.AdjustEndpoints(endpoints)
12✔
239
        if err != nil {
12✔
240
                return fmt.Errorf("adjusting endpoints: %w", err)
×
241
        }
×
242
        registryFilter := c.Registry.GetDomainFilter()
12✔
243

12✔
244
        plan := &plan.Plan{
12✔
245
                Policies:       []plan.Policy{c.Policy},
12✔
246
                Current:        records,
12✔
247
                Desired:        endpoints,
12✔
248
                DomainFilter:   endpoint.MatchAllDomainFilters{&c.DomainFilter, &registryFilter},
12✔
249
                ManagedRecords: c.ManagedRecordTypes,
12✔
250
                ExcludeRecords: c.ExcludeRecordTypes,
12✔
251
                OwnerID:        c.Registry.OwnerID(),
12✔
252
        }
12✔
253

12✔
254
        plan = plan.Calculate()
12✔
255

12✔
256
        if plan.Changes.HasChanges() {
21✔
257
                err = c.Registry.ApplyChanges(ctx, plan.Changes)
9✔
258
                if err != nil {
9✔
259
                        registryErrorsTotal.Inc()
×
260
                        deprecatedRegistryErrors.Inc()
×
261
                        return err
×
262
                }
×
263
        } else {
3✔
264
                controllerNoChangesTotal.Inc()
3✔
265
                log.Info("All records are already up to date")
3✔
266
        }
3✔
267

268
        lastSyncTimestamp.SetToCurrentTime()
12✔
269

12✔
270
        return nil
12✔
271
}
272

273
func earliest(r time.Time, times ...time.Time) time.Time {
4✔
274
        for _, t := range times {
8✔
275
                if t.Before(r) {
4✔
NEW
276
                        r = t
×
NEW
277
                }
×
278
        }
279
        return r
4✔
280
}
281

282
func latest(r time.Time, times ...time.Time) time.Time {
4✔
283
        for _, t := range times {
8✔
284
                if t.After(r) {
4✔
NEW
285
                        r = t
×
NEW
286
                }
×
287
        }
288
        return r
4✔
289
}
290

291
// Counts the intersections of A and AAAA records in endpoint and registry.
292
func countMatchingAddressRecords(endpoints []*endpoint.Endpoint, registryRecords []*endpoint.Endpoint) (int, int) {
12✔
293
        recordsMap := make(map[string]map[string]struct{})
12✔
294
        for _, regRecord := range registryRecords {
39✔
295
                if _, found := recordsMap[regRecord.DNSName]; !found {
54✔
296
                        recordsMap[regRecord.DNSName] = make(map[string]struct{})
27✔
297
                }
27✔
298
                recordsMap[regRecord.DNSName][regRecord.RecordType] = struct{}{}
27✔
299
        }
300
        aCount := 0
12✔
301
        aaaaCount := 0
12✔
302
        for _, sourceRecord := range endpoints {
47✔
303
                if _, found := recordsMap[sourceRecord.DNSName]; found {
56✔
304
                        if _, found := recordsMap[sourceRecord.DNSName][sourceRecord.RecordType]; found {
42✔
305
                                switch sourceRecord.RecordType {
21✔
306
                                case endpoint.RecordTypeA:
11✔
307
                                        aCount++
11✔
308
                                case endpoint.RecordTypeAAAA:
8✔
309
                                        aaaaCount++
8✔
310
                                }
311
                        }
312
                }
313
        }
314
        return aCount, aaaaCount
12✔
315
}
316

317
func countAddressRecords(endpoints []*endpoint.Endpoint) (int, int) {
24✔
318
        aCount := 0
24✔
319
        aaaaCount := 0
24✔
320
        for _, endPoint := range endpoints {
86✔
321
                switch endPoint.RecordType {
62✔
322
                case endpoint.RecordTypeA:
34✔
323
                        aCount++
34✔
324
                case endpoint.RecordTypeAAAA:
24✔
325
                        aaaaCount++
24✔
326
                }
327
        }
328
        return aCount, aaaaCount
24✔
329
}
330

331
// ScheduleRunOnce makes sure execution happens at most once per interval.
332
func (c *Controller) ScheduleRunOnce(now time.Time) {
4✔
333
        c.runAtMutex.Lock()
4✔
334
        defer c.runAtMutex.Unlock()
4✔
335
        c.nextRunAt = latest(
4✔
336
                c.lastRunAt.Add(c.MinEventSyncInterval),
4✔
337
                earliest(
4✔
338
                        now.Add(5*time.Second),
4✔
339
                        c.nextRunAt,
4✔
340
                ),
4✔
341
        )
4✔
342
}
4✔
343

344
func (c *Controller) ShouldRunOnce(now time.Time) bool {
13✔
345
        c.runAtMutex.Lock()
13✔
346
        defer c.runAtMutex.Unlock()
13✔
347
        if now.Before(c.nextRunAt) {
20✔
348
                return false
7✔
349
        }
7✔
350
        c.nextRunAt = now.Add(c.Interval)
6✔
351
        return true
6✔
352
}
353

354
// Run runs RunOnce in a loop with a delay until context is canceled
355
func (c *Controller) Run(ctx context.Context) {
1✔
356
        ticker := time.NewTicker(time.Second)
1✔
357
        defer ticker.Stop()
1✔
358
        for {
3✔
359
                if c.ShouldRunOnce(time.Now()) {
4✔
360
                        if err := c.RunOnce(ctx); err != nil {
2✔
361
                                if errors.Is(err, provider.SoftError) {
×
362
                                        log.Errorf("Failed to do run once: %v", err)
×
363
                                } else {
×
364
                                        log.Fatalf("Failed to do run once: %v", err)
×
365
                                }
×
366
                        }
367
                }
368
                select {
2✔
369
                case <-ticker.C:
1✔
370
                case <-ctx.Done():
1✔
371
                        log.Info("Terminating main controller loop")
1✔
372
                        return
1✔
373
                }
374
        }
375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc