• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 21064480734

16 Jan 2026 11:02AM UTC coverage: 46.572% (-3.3%) from 49.897%
21064480734

Pull #723

github

mikkeloscar
Refactor controller queue logic

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Pull Request #723: Refactor controller queue logic

6 of 394 new or added lines in 4 files covered. (1.52%)

9 existing lines in 1 file now uncovered.

2670 of 5733 relevant lines covered (46.57%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cmd/stackset-controller/main.go
1
package main
2

3
import (
4
        "context"
5
        "net"
6
        "net/http"
7
        "net/url"
8
        "os"
9
        "os/signal"
10
        "syscall"
11
        "time"
12

13
        "github.com/alecthomas/kingpin/v2"
14
        "github.com/prometheus/client_golang/prometheus"
15
        "github.com/prometheus/client_golang/prometheus/promhttp"
16
        log "github.com/sirupsen/logrus"
17
        "github.com/zalando-incubator/stackset-controller/controller"
18
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
19
        "github.com/zalando-incubator/stackset-controller/pkg/traffic"
20
        corev1 "k8s.io/api/core/v1"
21
        "k8s.io/client-go/rest"
22
        "k8s.io/client-go/tools/clientcmd"
23
        "k8s.io/client-go/transport"
24
)
25

26
const (
27
        // defaultInterval is the default value of the --interval flag; it is
        // parsed as a duration by kingpin.
        defaultInterval               = "10s"
28
        // NOTE(review): defaultIngressSourceSwitchTTL is not referenced in the
        // visible part of this file; confirm whether it is still needed.
        defaultIngressSourceSwitchTTL = "5m"
29
        // defaultMetricsAddress is the default value of the --metrics-address flag.
        defaultMetricsAddress         = ":7979"
30
        // defaultClientGOTimeout is the timeout main passes to configureKubeConfig.
        defaultClientGOTimeout        = 30 * time.Second
31
        // defaultReconcileWorkers is the default value of the --reconcile-workers
        // flag; it is parsed as an int by kingpin.
        defaultReconcileWorkers       = "10"
32
)
33

34
var (
35
        // config holds the command-line configuration. Its fields are bound to
        // kingpin flags in main and filled in by kingpin.Parse.
        config struct {
36
                Debug                       bool
37
                Interval                    time.Duration
38
                APIServer                   *url.URL
39
                Namespace                   string
40
                MetricsAddress              string
41
                ClusterDomains              []string
42
                // NOTE(review): NoTrafficScaledownTTL is not bound to any flag and
                // is not read anywhere in the visible part of this file; confirm
                // whether it is still needed.
                NoTrafficScaledownTTL       time.Duration
43
                ControllerID                string
44
                BackendWeightsAnnotationKey string
45
                RouteGroupSupportEnabled    bool
46
                SyncIngressAnnotations      []string
47
                ReconcileWorkers            int
48
                ConfigMapSupportEnabled     bool
49
                SecretSupportEnabled        bool
50
                PCSSupportEnabled           bool
51
                ForwardSupportEnabled       bool
52
        }
53
)
54

55
func main() {
×
56
        kingpin.Flag("debug", "Enable debug logging.").BoolVar(&config.Debug)
×
57
        kingpin.Flag("interval", "Interval between syncing stacksets.").
×
58
                Default(defaultInterval).DurationVar(&config.Interval)
×
59
        kingpin.Flag("apiserver", "API server url.").URLVar(&config.APIServer)
×
60
        kingpin.Flag("namespace", "Limit scope to a particular namespace.").Default(corev1.NamespaceAll).StringVar(&config.Namespace)
×
61
        kingpin.Flag("metrics-address", "defines where to serve metrics").Default(defaultMetricsAddress).StringVar(&config.MetricsAddress)
×
62
        kingpin.Flag("controller-id", "ID of the controller used to determine ownership of StackSet resources").StringVar(&config.ControllerID)
×
63
        kingpin.Flag("reconcile-workers", "The amount of stacksets to reconcile in parallel at a time.").
×
64
                Default(defaultReconcileWorkers).IntVar(&config.ReconcileWorkers)
×
65
        kingpin.Flag("backend-weights-key", "Backend weights annotation key the controller will use to set current traffic values").Default(traffic.DefaultBackendWeightsAnnotationKey).StringVar(&config.BackendWeightsAnnotationKey)
×
66
        kingpin.Flag("cluster-domain", "Main domains of the cluster, used for generating Stack Ingress hostnames").Envar("CLUSTER_DOMAIN").Required().StringsVar(&config.ClusterDomains)
×
67
        kingpin.Flag("enable-routegroup-support", "Enable support for RouteGroups on StackSets.").Default("false").BoolVar(&config.RouteGroupSupportEnabled)
×
68
        kingpin.Flag(
×
69
                "sync-ingress-annotation",
×
70
                "Ingress/RouteGroup annotation to propagate to all traffic segments.",
×
71
        ).StringsVar(&config.SyncIngressAnnotations)
×
72
        kingpin.Flag("enable-configmap-support", "Enable support for ConfigMaps on StackSets.").Default("false").BoolVar(&config.ConfigMapSupportEnabled)
×
73
        kingpin.Flag("enable-secret-support", "Enable support for Secrets on StackSets.").Default("false").BoolVar(&config.SecretSupportEnabled)
×
74
        kingpin.Flag("enable-pcs-support", "Enable support for PlatformCredentialsSet on StackSets.").Default("false").BoolVar(&config.PCSSupportEnabled)
×
75
        kingpin.Flag("enable-forward-support", "Enable support for skipper traffic forwarding.").Default("false").BoolVar(&config.ForwardSupportEnabled)
×
76
        kingpin.Parse()
×
77

×
78
        if config.Debug {
×
79
                log.SetLevel(log.DebugLevel)
×
80
        }
×
81

82
        stackSetConfig := controller.StackSetConfig{
×
83
                Namespace:    config.Namespace,
×
84
                ControllerID: config.ControllerID,
×
85

×
86
                ClusterDomains:              config.ClusterDomains,
×
87
                BackendWeightsAnnotationKey: config.BackendWeightsAnnotationKey,
×
88
                SyncIngressAnnotations:      config.SyncIngressAnnotations,
×
89

×
90
                ReconcileWorkers: config.ReconcileWorkers,
×
91
                Interval:         config.Interval,
×
92

×
93
                RouteGroupSupportEnabled: config.RouteGroupSupportEnabled,
×
94
                ConfigMapSupportEnabled:  config.ConfigMapSupportEnabled,
×
95
                SecretSupportEnabled:     config.SecretSupportEnabled,
×
96
                PcsSupportEnabled:        config.PCSSupportEnabled,
×
97
                ForwardSupportEnabled:    config.ForwardSupportEnabled,
×
98
        }
×
99

×
100
        ctx, cancel := context.WithCancel(context.Background())
×
101
        kubeConfig, err := configureKubeConfig(config.APIServer, defaultClientGOTimeout, ctx.Done())
×
102
        if err != nil {
×
103
                log.Fatalf("Failed to setup Kubernetes config: %v", err)
×
104
        }
×
105

106
        client, err := clientset.NewForConfig(kubeConfig)
×
107
        if err != nil {
×
108
                log.Fatalf("Failed to initialize Kubernetes client: %v", err)
×
109
        }
×
110

111
        controller, err := controller.NewStackSetController(
×
112
                client,
×
113
                prometheus.DefaultRegisterer,
×
114
                stackSetConfig,
×
115
        )
×
116
        if err != nil {
×
117
                log.Fatalf("Failed to create Stackset controller: %v", err)
×
118
        }
×
119

120
        go handleSigterm(cancel)
×
121
        go serveMetrics(config.MetricsAddress)
×
NEW
122
        err = controller.Run2(ctx)
×
123
        if err != nil {
×
124
                cancel()
×
125
                log.Fatalf("Failed to run controller: %v", err)
×
126
        }
×
127
}
128

129
// handleSigterm handles SIGTERM signal sent to the process.
130
func handleSigterm(cancelFunc func()) {
×
131
        signals := make(chan os.Signal, 1)
×
132
        signal.Notify(signals, syscall.SIGTERM)
×
133
        <-signals
×
134
        log.Info("Received Term signal. Terminating...")
×
135
        cancelFunc()
×
136
}
×
137

138
// configureKubeConfig configures a kubeconfig.
139
func configureKubeConfig(apiServerURL *url.URL, timeout time.Duration, stopCh <-chan struct{}) (*rest.Config, error) {
×
140
        tr := &http.Transport{
×
141
                DialContext: (&net.Dialer{
×
142
                        Timeout:   timeout,
×
143
                        KeepAlive: 30 * time.Second,
×
144
                        DualStack: false, // K8s do not work well with IPv6
×
145
                }).DialContext,
×
146
                TLSHandshakeTimeout:   timeout,
×
147
                ResponseHeaderTimeout: 10 * time.Second,
×
148
                MaxIdleConns:          10,
×
149
                MaxIdleConnsPerHost:   2,
×
150
                IdleConnTimeout:       20 * time.Second,
×
151
        }
×
152

×
153
        // We need this to reliably fade on DNS change, which is right
×
154
        // now not fixed with IdleConnTimeout in the http.Transport.
×
155
        // https://github.com/golang/go/issues/23427
×
156
        go func(d time.Duration) {
×
157
                for {
×
158
                        select {
×
159
                        case <-time.After(d):
×
160
                                tr.CloseIdleConnections()
×
161
                        case <-stopCh:
×
162
                                return
×
163
                        }
164
                }
165
        }(20 * time.Second)
166

167
        if apiServerURL != nil {
×
168
                return &rest.Config{
×
169
                        Host:      apiServerURL.String(),
×
170
                        Timeout:   timeout,
×
171
                        Transport: tr,
×
172
                        QPS:       100.0,
×
173
                        Burst:     500,
×
174
                }, nil
×
175
        }
×
176

177
        // Try in-cluster config
178
        config, err := rest.InClusterConfig()
×
179
        if err == rest.ErrNotInCluster {
×
180
                // fall back to kubeconfig
×
181
                kubeconfig := os.Getenv("KUBECONFIG")
×
182
                if kubeconfig == "" {
×
183
                        kubeconfig = os.ExpandEnv("${HOME}/.kube/config")
×
184
                }
×
185
                return clientcmd.BuildConfigFromFlags("", kubeconfig)
×
186
        }
187
        if err != nil {
×
188
                return nil, err
×
189
        }
×
190

191
        // patch TLS config
192
        restTransportConfig, err := config.TransportConfig()
×
193
        if err != nil {
×
194
                return nil, err
×
195
        }
×
196
        restTLSConfig, err := transport.TLSConfigFor(restTransportConfig)
×
197
        if err != nil {
×
198
                return nil, err
×
199
        }
×
200
        tr.TLSClientConfig = restTLSConfig
×
201

×
202
        config.Timeout = timeout
×
203
        config.Transport = tr
×
204
        config.QPS = 100.0
×
205
        config.Burst = 500
×
206
        // disable TLSClientConfig to make the custom Transport work
×
207
        config.TLSClientConfig = rest.TLSClientConfig{}
×
208
        return config, nil
×
209
}
210

211
// gather go metrics
212
func serveMetrics(address string) {
×
213
        http.Handle("/metrics", promhttp.Handler())
×
214
        log.Fatal(http.ListenAndServe(address, nil))
×
215
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc