Azure / aks-app-routing-operator / 9455033482

10 Jun 2024 08:22PM UTC coverage: 78.693% (-0.5%) from 79.15%

Pull Request #201: adding custom-http-errors field to crd
Merge 106a17649 into 8ca6390b9 (github / web-flow)

12 of 36 new or added lines in 3 files covered (33.33%).
1 existing line in 1 file now uncovered.
3010 of 3825 relevant lines covered (78.69%).
13.86 hits per line.
Source File: /pkg/controller/ingress/concurrency_watchdog.go (86.41% covered)

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package ingress

import (
        "bytes"
        "container/ring"
        "context"
        "errors"
        "fmt"
        "io"
        "time"

        approutingv1alpha1 "github.com/Azure/aks-app-routing-operator/api/v1alpha1"
        "github.com/Azure/aks-app-routing-operator/pkg/controller/nginxingress"
        "github.com/go-logr/logr"
        "github.com/hashicorp/go-multierror"
        prommodel "github.com/prometheus/client_model/go"
        "github.com/prometheus/common/expfmt"
        corev1 "k8s.io/api/core/v1"
        policyv1beta1 "k8s.io/api/policy/v1beta1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"

        "github.com/Azure/aks-app-routing-operator/pkg/config"
        "github.com/Azure/aks-app-routing-operator/pkg/controller/controllername"
        "github.com/Azure/aks-app-routing-operator/pkg/controller/metrics"
        "github.com/Azure/aks-app-routing-operator/pkg/util"
)

var concurrencyWatchdogControllerName = controllername.New("concurrency", "watchdog")

// ScrapeFn returns the connection count for the given pod.
type ScrapeFn func(ctx context.Context, client rest.Interface, pod *corev1.Pod) (float64, error)

// NginxScrapeFn is the scrape function for Nginx. It reads the controller pod's
// Prometheus metrics endpoint (port 10254) through the API server proxy and
// returns the number of active connections.
func NginxScrapeFn(ctx context.Context, client rest.Interface, pod *corev1.Pod) (float64, error) {
        lgr := logr.FromContextOrDiscard(ctx)

        lgr.Info("scraping pod", "pod", pod.Name)
        resp, err := client.Get().
                AbsPath("/api/v1/namespaces", pod.Namespace, "pods", pod.Name+":10254", "proxy/metrics").
                Timeout(time.Second * 30).
                MaxRetries(4).
                DoRaw(ctx)
        if err != nil {
                return 0, err
        }

        family := &prommodel.MetricFamily{}
        dec := expfmt.NewDecoder(bytes.NewReader(resp), expfmt.FmtOpenMetrics_0_0_1)
        for {
                err = dec.Decode(family)
                if errors.Is(err, io.EOF) {
                        break
                }
                if err != nil {
                        return 0, err
                }
                if family.GetName() != "nginx_ingress_controller_nginx_process_connections" {
                        continue
                }
                for _, metric := range family.Metric {
                        if metric.Gauge == nil || !metricHasLabel(metric, "state", "active") {
                                continue
                        }
                        return metric.Gauge.GetValue(), nil
                }
        }

        return 0, fmt.Errorf("active connections metric not found")
}

// WatchdogTarget refers to a target the concurrency watchdog should track.
type WatchdogTarget struct {
        ScrapeFn  ScrapeFn
        PodLabels map[string]string
}
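
// ListWatchdogTargets returns the set of targets the watchdog should scrape.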
type ListWatchdogTargets func() ([]WatchdogTarget, error)
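
// GetListNginxWatchdogTargets returns a ListWatchdogTargets that lists every
// NginxIngressController resource and builds a WatchdogTarget from its pod labels.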
func GetListNginxWatchdogTargets(cl client.Client, defaultNicControllerClass string) ListWatchdogTargets {
        return func() ([]WatchdogTarget, error) {
                nics := &approutingv1alpha1.NginxIngressControllerList{}
                // pass a real context rather than nil; a nil context can panic inside client implementations
                if err := cl.List(context.Background(), nics); err != nil {
                        return nil, fmt.Errorf("listing NginxIngressController objects: %w", err)
                }

                var targets []WatchdogTarget
                for _, nic := range nics.Items {
                        ingCfg := nginxingress.ToNginxIngressConfig(&nic, defaultNicControllerClass)
                        if ingCfg == nil {
                                continue
                        }

                        targets = append(targets, WatchdogTarget{
                                ScrapeFn:  NginxScrapeFn,
                                PodLabels: ingCfg.PodLabels(),
                        })
                }

                return targets, nil
        }
}

// ConcurrencyWatchdog evicts ingress controller pods that have too many active connections relative to others.
// This helps redistribute long-running connections when the ingress controller scales up.
type ConcurrencyWatchdog struct {
        client              client.Client
        clientset           kubernetes.Interface
        restClient          rest.Interface
        logger              logr.Logger
        config              *config.Config
        listWatchdogTargets ListWatchdogTargets

        interval, minPodAge, voteTTL time.Duration
        minVotesBeforeEviction       int
        minPercentOverAvgBeforeVote  float64

        votes *ring.Ring
}
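
// NewConcurrencyWatchdog constructs a ConcurrencyWatchdog with default timing
// parameters and registers it with the controller-runtime manager.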
func NewConcurrencyWatchdog(manager ctrl.Manager, conf *config.Config, target ListWatchdogTargets) error {
        metrics.InitControllerMetrics(concurrencyWatchdogControllerName)
        clientset, err := kubernetes.NewForConfig(manager.GetConfig())
        if err != nil {
                return err
        }

        c := &ConcurrencyWatchdog{
                client:              manager.GetClient(),
                clientset:           clientset,
                restClient:          clientset.CoreV1().RESTClient(),
                logger:              concurrencyWatchdogControllerName.AddToLogger(manager.GetLogger()),
                config:              conf,
                listWatchdogTargets: target,

                interval:                    time.Minute,
                minPodAge:                   time.Minute * 5,
                minVotesBeforeEviction:      conf.ConcurrencyWatchdogVotes,
                minPercentOverAvgBeforeVote: conf.ConcurrencyWatchdogThres,
                voteTTL:                     time.Minute * 10,

                votes: ring.New(20),
        }

        return manager.Add(c)
}
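
// Start implements manager.Runnable. It ticks the watchdog on a jittered
// interval until the context is cancelled.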
func (c *ConcurrencyWatchdog) Start(ctx context.Context) error {
        for {
                select {
                case <-ctx.Done():
                        return ctx.Err()
                case <-time.After(util.Jitter(c.interval, 0.3)):
                }
                if err := c.tick(ctx); err != nil {
                        c.logger.Error(err, "error reconciling ingress controller resources")
                        continue
                }
        }
}
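
// tick performs one watchdog pass: it scrapes the connection count of every
// ready ingress controller pod, votes on outliers, and evicts a pod once it
// has accumulated enough votes.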
func (c *ConcurrencyWatchdog) tick(ctx context.Context) error {
        lgr := c.logger
        start := time.Now()
        var retErr *multierror.Error
        defer func() {
                lgr.Info("finished checking on ingress controller pods", "latencySec", time.Since(start).Seconds())

                // Calling this inside a closure lets retErr be read after tick has run.
                // A plain "defer metrics.HandleControllerReconcileMetrics(..., retErr.ErrorOrNil())"
                // would evaluate its arguments immediately, capturing retErr's zero value.
                metrics.HandleControllerReconcileMetrics(concurrencyWatchdogControllerName, ctrl.Result{}, retErr.ErrorOrNil())
        }()

        lgr.Info("listing watchdog targets")
        targets, err := c.listWatchdogTargets()
        if err != nil {
                lgr.Error(err, "listing watchdog targets")
                retErr = multierror.Append(retErr, fmt.Errorf("listing watchdog targets: %w", err))
                return retErr.ErrorOrNil()
        }

        for _, target := range targets {
                lgr := c.logger.WithValues("target", target.PodLabels)
                lgr.Info("starting checking on ingress controller pods")

                lgr.Info("listing pods")
                list := &corev1.PodList{}
                err := c.client.List(ctx, list, client.InNamespace(c.config.NS), client.MatchingLabels(target.PodLabels))
                if err != nil {
                        lgr.Error(err, "listing pods")
                        retErr = multierror.Append(retErr, fmt.Errorf("listing pods: %w", err))
                        continue
                }

                lgr.Info("gathering metrics for each pod")
                connectionCountByPod := make([]float64, len(list.Items))
                nReadyPods := 0
                var totalConnectionCount float64
                for i, pod := range list.Items {
                        lgr := lgr.WithValues("pod", pod.Name)
                        if !podIsReady(&pod) {
                                lgr.Info("pod is not ready", "name", pod.Name)
                                continue
                        }
                        nReadyPods++
                        ctx := logr.NewContext(ctx, lgr)
                        count, err := target.ScrapeFn(ctx, c.restClient, &pod)
                        if err != nil {
                                lgr.Error(err, "scraping pod", "name", pod.Name)
                                retErr = multierror.Append(retErr, fmt.Errorf("scraping pod %q: %w", pod.Name, err))
                                continue
                        }
                        connectionCountByPod[i] = count
                        totalConnectionCount += count
                }
                avgConnectionCount := totalConnectionCount / float64(nReadyPods)

                // Only rebalance connections when three or more replicas are ready.
                // Otherwise we would just push the connections onto the only other replica.
                if nReadyPods < 3 {
                        lgr.Info("not enough ready pods to rebalance connections", "readyPods", nReadyPods)
                        continue
                }

                lgr.Info("processing votes")
                pod := c.processVotes(list, connectionCountByPod, avgConnectionCount)
                if pod == "" {
                        lgr.Info("no pod to evict")
                        continue
                }

                lgr.Info("evicting pod due to high relative connection concurrency", "name", pod)
                eviction := &policyv1beta1.Eviction{
                        ObjectMeta: metav1.ObjectMeta{
                                Name:      pod,
                                Namespace: c.config.NS,
                        },
                }

                if err := c.clientset.CoreV1().Pods(eviction.Namespace).EvictV1beta1(ctx, eviction); err != nil {
                        lgr.Error(err, "unable to evict pod", "name", pod)
                        // don't add the error to the return value since we shouldn't retry right away
                }
        }
        if err := retErr.ErrorOrNil(); err != nil {
                c.logger.Error(err, "reconciling ingress controller resources")
                return err
        }
        return nil
}
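
// processVotes records a vote against every pod whose connection count is
// sufficiently far above the average, then returns the name of a pod that has
// collected enough recent votes to be evicted, or "" if there is none.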
func (c *ConcurrencyWatchdog) processVotes(list *corev1.PodList, connectionCountByPod []float64, avgConnectionCount float64) string {
        // Vote on outlier(s)
        podsByName := map[string]struct{}{}
        for i, pod := range list.Items {
                podsByName[pod.Name] = struct{}{}

                rank := (connectionCountByPod[i] / avgConnectionCount) * 100
                if rank < c.minPercentOverAvgBeforeVote || time.Since(pod.CreationTimestamp.Time) < c.minPodAge {
                        continue
                }
                c.logger.Info("voting to evict pod due to high connection concurrency", "name", pod.Name, "percentOfAvg", rank)

                c.votes = c.votes.Next()
                var vote *evictionVote
                if c.votes.Value == nil {
                        vote = &evictionVote{}
                        c.votes.Value = vote
                } else {
                        vote = c.votes.Value.(*evictionVote)
                }

                vote.PodName = pod.Name
                vote.Time = time.Now()
        }

        // Aggregate votes
        votesPerPod := map[string]int{}
        c.votes.Do(func(cur interface{}) {
                vote, ok := cur.(*evictionVote)
                if !ok {
                        return
                }
                if _, exists := podsByName[vote.PodName]; !exists || time.Since(vote.Time) > c.voteTTL {
                        return
                }
                votesPerPod[vote.PodName]++
        })

        // Apply votes
        for pod, votes := range votesPerPod {
                if votes < c.minVotesBeforeEviction {
                        continue
                }
                return pod
        }
        return ""
}
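
// NeedLeaderElection implements manager.LeaderElectionRunnable so that only
// the elected leader runs the watchdog.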
func (c *ConcurrencyWatchdog) NeedLeaderElection() bool {
        return true
}
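
// evictionVote is a single vote to evict a pod, stored in the fixed-size ring buffer.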
type evictionVote struct {
        Time    time.Time
        PodName string
}
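
// metricHasLabel reports whether the metric carries the given label key/value pair.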
func metricHasLabel(metric *prommodel.Metric, key, value string) bool {
        for _, cur := range metric.Label {
                if cur.GetName() == key && cur.GetValue() == value {
                        return true
                }
        }
        return false
}
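
// podIsReady reports whether the pod's Ready condition is true.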
func podIsReady(pod *corev1.Pod) bool {
        for _, cond := range pod.Status.Conditions {
                if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
                        return true
                }
        }
        return false
}
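
A minimal wiring sketch (not part of this file) showing one plausible way to register the watchdog with a controller-runtime manager. The namespace, controller class string, and threshold values below are illustrative assumptions, and scheme registration for the NginxIngressController CRD is omitted:

package main

import (
        "github.com/Azure/aks-app-routing-operator/pkg/config"
        "github.com/Azure/aks-app-routing-operator/pkg/controller/ingress"
        ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
        mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
        if err != nil {
                panic(err)
        }

        conf := &config.Config{
                NS:                       "app-routing-system", // assumed operator namespace
                ConcurrencyWatchdogVotes: 4,                    // evict after 4 votes within the vote TTL
                ConcurrencyWatchdogThres: 200,                  // vote when a pod sits at 200% of the average
        }

        // Discover targets from NginxIngressController resources; the default
        // controller class string here is a placeholder.
        targets := ingress.GetListNginxWatchdogTargets(mgr.GetClient(), "example.com/nginx")

        if err := ingress.NewConcurrencyWatchdog(mgr, conf, targets); err != nil {
                panic(err)
        }

        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
                panic(err)
        }
}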