• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 21067310236

16 Jan 2026 12:54PM UTC coverage: 46.483% (-3.4%) from 49.897%
21067310236

Pull #723

github

mikkeloscar
Refactor controller queue logic

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Pull Request #723: Refactor controller queue logic

6 of 405 new or added lines in 4 files covered. (1.48%)

9 existing lines in 1 file now uncovered.

2670 of 5744 relevant lines covered (46.48%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.03
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        routegroupinformers "github.com/szuecs/routegroup-client/client/informers/externalversions"
19
        rginformerv1 "github.com/szuecs/routegroup-client/client/informers/externalversions/zalando.org/v1"
20
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
21
        stacksetinformers "github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions"
22
        stacksetinformerv1 "github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions/zalando.org/v1"
23
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
24
        "github.com/zalando-incubator/stackset-controller/pkg/core"
25
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
26
        "golang.org/x/sync/errgroup"
27
        "golang.org/x/time/rate"
28
        appsv1 "k8s.io/api/apps/v1"
29
        autoscalingv2 "k8s.io/api/autoscaling/v2"
30
        v1 "k8s.io/api/core/v1"
31
        networking "k8s.io/api/networking/v1"
32
        "k8s.io/apimachinery/pkg/api/equality"
33
        "k8s.io/apimachinery/pkg/api/errors"
34
        "k8s.io/apimachinery/pkg/api/resource"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/fields"
37
        "k8s.io/apimachinery/pkg/labels"
38
        "k8s.io/apimachinery/pkg/runtime"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/client-go/informers"
42
        appsinformerv1 "k8s.io/client-go/informers/apps/v1"
43
        autoscalinginformerv2 "k8s.io/client-go/informers/autoscaling/v2"
44
        coreinformer "k8s.io/client-go/informers/core/v1"
45
        ingressinformerv1 "k8s.io/client-go/informers/networking/v1"
46
        "k8s.io/client-go/tools/cache"
47
        kube_record "k8s.io/client-go/tools/record"
48
        "k8s.io/client-go/util/workqueue"
49
)
50

51
const (
52
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
53
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
54
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
55
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
56

57
        reasonFailedManageStackSet = "FailedManageStackSet"
58

59
        defaultResetMinReplicasDelay = 10 * time.Minute
60
)
61

62
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
63

64
type controllerInformers struct {
65
        stacksetInformer   stacksetinformerv1.StackSetInformer
66
        stackInformer      stacksetinformerv1.StackInformer
67
        ingressInformer    ingressinformerv1.IngressInformer
68
        routegroupInformer rginformerv1.RouteGroupInformer
69
        deploymentInformer appsinformerv1.DeploymentInformer
70
        serviceInformer    coreinformer.ServiceInformer
71
        hpaInformer        autoscalinginformerv2.HorizontalPodAutoscalerInformer
72
        configMapInformer  coreinformer.ConfigMapInformer
73
        secretInformer     coreinformer.SecretInformer
74
        pcsInformer        stacksetinformerv1.PlatformCredentialsSetInformer
75
}
76

77
// StackSetController is the main controller. It watches for changes to
78
// stackset resources and starts and maintains other controllers per
79
// stackset resource.
80
type StackSetController struct {
81
        logger          *log.Entry
82
        client          clientset.Interface
83
        informers       controllerInformers
84
        config          StackSetConfig
85
        stacksetEvents  chan stacksetEvent
86
        stacksetStore   map[types.UID]zv1.StackSet
87
        recorder        kube_record.EventRecorder
88
        metricsReporter *core.MetricsReporter
89
        HealthReporter  healthcheck.Handler
90
        now             func() string
91
        sync.Mutex
92
        queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
93
}
94

95
type StackSetConfig struct {
96
        Namespace    string
97
        ControllerID string
98

99
        ClusterDomains              []string
100
        BackendWeightsAnnotationKey string
101
        SyncIngressAnnotations      []string
102

103
        ReconcileWorkers int
104
        Interval         time.Duration
105

106
        RouteGroupSupportEnabled bool
107
        ConfigMapSupportEnabled  bool
108
        SecretSupportEnabled     bool
109
        PcsSupportEnabled        bool
110
        ForwardSupportEnabled    bool
111
}
112

113
type stacksetEvent struct {
114
        Deleted  bool
115
        StackSet *zv1.StackSet
116
}
117

118
// eventedError wraps an error that was already exposed as an event to the user
119
type eventedError struct {
120
        err error
121
}
122

123
func (ee *eventedError) Error() string {
×
124
        return ee.err.Error()
×
125
}
×
126

127
func now() string {
×
128
        return time.Now().Format(time.RFC3339)
×
129
}
×
130

131
// NewStackSetController initializes a new StackSetController.
132
func NewStackSetController(
133
        client clientset.Interface,
134
        registry prometheus.Registerer,
135
        config StackSetConfig,
136
) (*StackSetController, error) {
1✔
137
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
138
        if err != nil {
1✔
139
                return nil, err
×
140
        }
×
141

142
        logger := log.WithField("controller", "stackset")
1✔
143

1✔
144
        if config.ControllerID != "" {
1✔
145
                logger = logger.WithField("controller_id", config.ControllerID)
×
146
        }
×
147

148
        return &StackSetController{
1✔
149
                logger:          logger,
1✔
150
                client:          client,
1✔
151
                config:          config,
1✔
152
                stacksetEvents:  make(chan stacksetEvent, 1),
1✔
153
                stacksetStore:   make(map[types.UID]zv1.StackSet),
1✔
154
                recorder:        recorder.CreateEventRecorder(client),
1✔
155
                metricsReporter: metricsReporter,
1✔
156
                HealthReporter:  healthcheck.NewHandler(),
1✔
157
                now:             now,
1✔
158
                queue: workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
1✔
159
                        workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](5*time.Millisecond, 5*time.Second),
1✔
160
                        // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
1✔
161
                        &workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
1✔
162
                )),
1✔
163
        }, nil
1✔
164
}
165

166
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
167
        return c.logger.WithFields(map[string]interface{}{
×
168
                "namespace": ssc.StackSet.Namespace,
×
169
                "stackset":  ssc.StackSet.Name,
×
170
        })
×
171
}
×
172

173
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
174
        return c.logger.WithFields(map[string]interface{}{
×
175
                "namespace": ssc.StackSet.Namespace,
×
176
                "stackset":  ssc.StackSet.Name,
×
177
                "stack":     sc.Name(),
×
178
        })
×
179
}
×
180

181
// Run runs the main loop of the StackSetController. Before the loops it
182
// sets up a watcher to watch StackSet resources. The watch will send
183
// changes over a channel which is polled from the main loop.
184
func (c *StackSetController) Run(ctx context.Context) error {
×
185
        var nextCheck time.Time
×
186

×
187
        // We're not alive if nextCheck is too far in the past
×
188
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
189
                if time.Since(nextCheck) > 5*c.config.Interval {
×
190
                        return fmt.Errorf("nextCheck too old")
×
191
                }
×
192
                return nil
×
193
        })
194

195
        err := c.startWatch(ctx)
×
196
        if err != nil {
×
197
                return err
×
198
        }
×
199

200
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
201

×
202
        nextCheck = time.Now().Add(-c.config.Interval)
×
203

×
204
        for {
×
205
                select {
×
206
                case <-time.After(time.Until(nextCheck)):
×
207

×
208
                        nextCheck = time.Now().Add(c.config.Interval)
×
209

×
210
                        stackSetContainers, err := c.collectResources(ctx)
×
211
                        if err != nil {
×
212
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
213
                                continue
×
214
                        }
215

216
                        var reconcileGroup errgroup.Group
×
217
                        reconcileGroup.SetLimit(c.config.ReconcileWorkers)
×
218
                        for stackset, container := range stackSetContainers {
×
219
                                container := container
×
220
                                stackset := stackset
×
221

×
222
                                reconcileGroup.Go(func() error {
×
223
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
224
                                                err := c.ReconcileStackSet(ctx, container)
×
225
                                                if err != nil {
×
226
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
227
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
228
                                                }
×
229
                                        }
230
                                        return nil
×
231
                                })
232
                        }
233

234
                        err = reconcileGroup.Wait()
×
235
                        if err != nil {
×
236
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
237
                        }
×
238
                        err = c.metricsReporter.Report(stackSetContainers)
×
239
                        if err != nil {
×
240
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
241
                        }
×
242
                case e := <-c.stacksetEvents:
×
243
                        stackset := *e.StackSet
×
244
                        fixupStackSetTypeMeta(&stackset)
×
245

×
246
                        // update/delete existing entry
×
247
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
248
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
249
                                        delete(c.stacksetStore, stackset.UID)
×
250
                                        continue
×
251
                                }
252

253
                                // update stackset entry
254
                                c.stacksetStore[stackset.UID] = stackset
×
255
                                continue
×
256
                        }
257

258
                        // check if stackset should be managed by the controller
259
                        if !c.hasOwnership(&stackset) {
×
260
                                continue
×
261
                        }
262

263
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
264
                        c.stacksetStore[stackset.UID] = stackset
×
265
                case <-ctx.Done():
×
266
                        c.logger.Info("Terminating main controller loop.")
×
267
                        return nil
×
268
                }
269
        }
270
}
271

NEW
272
func (c *StackSetController) Run2(ctx context.Context) error {
×
NEW
273
        // defer runtime.HandleCrash()
×
NEW
274
        defer c.queue.ShutDown()
×
NEW
275

×
NEW
276
        // TODO: health
×
NEW
277
        // TODO: metricsReporter
×
NEW
278
        c.logger.Info("Starting Controller")
×
NEW
279

×
NEW
280
        err := c.setupInformers(ctx)
×
NEW
281
        if err != nil {
×
NEW
282
                return fmt.Errorf("failed to setup informers: %v", err)
×
NEW
283
        }
×
284

NEW
285
        for i := 0; i < c.config.ReconcileWorkers; i++ {
×
NEW
286
                go wait.UntilWithContext(ctx, c.worker, time.Second)
×
NEW
287
        }
×
288

NEW
289
        <-ctx.Done()
×
NEW
290
        c.logger.Info("Shutting down controller")
×
NEW
291
        return nil
×
292
}
293

NEW
294
func (c *StackSetController) worker(ctx context.Context) {
×
NEW
295
        for c.processNextItem(ctx) {
×
NEW
296
        }
×
297
}
298

NEW
299
func (c *StackSetController) processNextItem(ctx context.Context) bool {
×
NEW
300
        key, quit := c.queue.Get()
×
NEW
301
        if quit {
×
NEW
302
                return false
×
NEW
303
        }
×
NEW
304
        defer c.queue.Done(key)
×
NEW
305

×
NEW
306
        err := c.syncHandler(ctx, key)
×
NEW
307
        if err == nil {
×
NEW
308
                c.queue.Forget(key)
×
NEW
309
                return true
×
NEW
310
        }
×
311

312
        // runtime.HandleError(fmt.Errorf("sync %q failed: %v", key, err))
NEW
313
        c.queue.AddRateLimited(key)
×
NEW
314
        return true
×
315
}
316

NEW
317
func (c *StackSetController) syncHandler(ctx context.Context, key types.NamespacedName) error {
×
NEW
318
        obj, exists, err := c.informers.stacksetInformer.Informer().GetIndexer().GetByKey(key.String())
×
NEW
319
        if err != nil {
×
NEW
320
                return fmt.Errorf("failed to get StackSet %s from informer: %v", key.String(), err)
×
NEW
321
        }
×
NEW
322
        if !exists {
×
NEW
323
                return fmt.Errorf("StackSet %s not found in informer", key.String())
×
NEW
324
        }
×
325

NEW
326
        stackset, ok := obj.(*zv1.StackSet)
×
NEW
327
        if !ok {
×
NEW
328
                return fmt.Errorf("failed to cast object to StackSet for %s", key.String())
×
NEW
329
        }
×
330

NEW
331
        if !c.hasOwnership(stackset) {
×
NEW
332
                return nil
×
NEW
333
        }
×
334

NEW
335
        container, err := c.stacksetContainer(stackset.DeepCopy())
×
NEW
336
        if err != nil {
×
NEW
337
                return err
×
NEW
338
        }
×
339

NEW
340
        err = c.ReconcileStackSet(ctx, container)
×
NEW
341
        if err != nil {
×
NEW
342
                c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
NEW
343
                return err
×
NEW
344
        }
×
NEW
345
        return nil
×
346
}
347

348
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
349
// overload the API requests with unnecessary requests
350
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
351
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
352
        for uid, stackset := range c.stacksetStore {
2✔
353
                stackset := stackset
1✔
354

1✔
355
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
356

1✔
357
                // use prescaling logic if enabled with an annotation
1✔
358
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
359
                        resetDelay := defaultResetMinReplicasDelay
1✔
360
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
361
                                resetDelay = resetDelayValue
1✔
362
                        }
1✔
363
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
364
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
365
                        }
1✔
366
                }
367

368
                stacksetContainer := core.NewContainer(
1✔
369
                        &stackset,
1✔
370
                        reconciler,
1✔
371
                        c.config.BackendWeightsAnnotationKey,
1✔
372
                        c.config.ClusterDomains,
1✔
373
                        c.config.SyncIngressAnnotations,
1✔
374
                )
1✔
375
                stacksets[uid] = stacksetContainer
1✔
376
        }
377

378
        err := c.collectStacks(ctx, stacksets)
1✔
379
        if err != nil {
1✔
380
                return nil, err
×
381
        }
×
382

383
        err = c.collectIngresses(ctx, stacksets)
1✔
384
        if err != nil {
1✔
385
                return nil, err
×
386
        }
×
387

388
        if c.config.RouteGroupSupportEnabled {
2✔
389
                err = c.collectRouteGroups(ctx, stacksets)
1✔
390
                if err != nil {
1✔
391
                        return nil, err
×
392
                }
×
393
        }
394

395
        err = c.collectDeployments(ctx, stacksets)
1✔
396
        if err != nil {
1✔
397
                return nil, err
×
398
        }
×
399

400
        err = c.collectServices(ctx, stacksets)
1✔
401
        if err != nil {
1✔
402
                return nil, err
×
403
        }
×
404

405
        err = c.collectHPAs(ctx, stacksets)
1✔
406
        if err != nil {
1✔
407
                return nil, err
×
408
        }
×
409

410
        if c.config.ConfigMapSupportEnabled {
2✔
411
                err = c.collectConfigMaps(ctx, stacksets)
1✔
412
                if err != nil {
1✔
413
                        return nil, err
×
414
                }
×
415
        }
416

417
        if c.config.SecretSupportEnabled {
2✔
418
                err = c.collectSecrets(ctx, stacksets)
1✔
419
                if err != nil {
1✔
420
                        return nil, err
×
421
                }
×
422
        }
423

424
        if c.config.PcsSupportEnabled {
2✔
425
                err = c.collectPlatformCredentialsSet(ctx, stacksets)
1✔
426
                if err != nil {
1✔
427
                        return nil, err
×
428
                }
×
429
        }
430

431
        return stacksets, nil
1✔
432
}
433

434
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
435
        ingresses, err := c.client.NetworkingV1().Ingresses(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
436

1✔
437
        if err != nil {
1✔
438
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
439
        }
×
440

441
        for _, i := range ingresses.Items {
2✔
442
                ingress := i
1✔
443
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
444
                        // stackset ingress
1✔
445
                        if s, ok := stacksets[uid]; ok {
2✔
446
                                s.Ingress = &ingress
1✔
447
                                continue
1✔
448
                        }
449

450
                        // stack ingress
451
                        for _, stackset := range stacksets {
2✔
452
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
453
                                        if strings.HasSuffix(
1✔
454
                                                ingress.ObjectMeta.Name,
1✔
455
                                                core.SegmentSuffix,
1✔
456
                                        ) {
2✔
457
                                                // Traffic Segment
1✔
458
                                                s.Resources.IngressSegment = &ingress
1✔
459
                                        } else {
2✔
460
                                                s.Resources.Ingress = &ingress
1✔
461
                                        }
1✔
462
                                        break
1✔
463
                                }
464
                        }
465
                }
466
        }
467
        return nil
1✔
468
}
469

470
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
471
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.config.Namespace).List(
1✔
472
                ctx,
1✔
473
                metav1.ListOptions{},
1✔
474
        )
1✔
475
        if err != nil {
1✔
476
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
477
        }
×
478

479
        for _, rg := range rgs.Items {
2✔
480
                routegroup := rg
1✔
481
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
482
                        // stackset routegroups
1✔
483
                        if s, ok := stacksets[uid]; ok {
2✔
484
                                s.RouteGroup = &routegroup
1✔
485
                                continue
1✔
486
                        }
487

488
                        // stack routegroups
489
                        for _, stackset := range stacksets {
2✔
490
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
491
                                        if strings.HasSuffix(
1✔
492
                                                routegroup.ObjectMeta.Name,
1✔
493
                                                core.SegmentSuffix,
1✔
494
                                        ) {
2✔
495
                                                // Traffic Segment
1✔
496
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
497
                                        } else {
2✔
498
                                                s.Resources.RouteGroup = &routegroup
1✔
499
                                        }
1✔
500
                                        break
1✔
501
                                }
502
                        }
503
                }
504
        }
505
        return nil
1✔
506
}
507

508
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
509
        stacks, err := c.client.ZalandoV1().Stacks(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
510
        if err != nil {
1✔
511
                return fmt.Errorf("failed to list Stacks: %v", err)
×
512
        }
×
513

514
        for _, stack := range stacks.Items {
2✔
515
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
516
                        if s, ok := stacksets[uid]; ok {
2✔
517
                                stack := stack
1✔
518
                                fixupStackTypeMeta(&stack)
1✔
519

1✔
520
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
521
                                        Stack: &stack,
1✔
522
                                }
1✔
523
                                continue
1✔
524
                        }
525
                }
526
        }
527
        return nil
1✔
528
}
529

530
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
531
        deployments, err := c.client.AppsV1().Deployments(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
532
        if err != nil {
1✔
533
                return fmt.Errorf("failed to list Deployments: %v", err)
×
534
        }
×
535

536
        for _, d := range deployments.Items {
2✔
537
                deployment := d
1✔
538
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
539
                        for _, stackset := range stacksets {
2✔
540
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
541
                                        s.Resources.Deployment = &deployment
1✔
542
                                        break
1✔
543
                                }
544
                        }
545
                }
546
        }
547
        return nil
1✔
548
}
549

550
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
551
        services, err := c.client.CoreV1().Services(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
552
        if err != nil {
1✔
553
                return fmt.Errorf("failed to list Services: %v", err)
×
554
        }
×
555

556
Items:
1✔
557
        for _, s := range services.Items {
2✔
558
                service := s
1✔
559
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
560
                        for _, stackset := range stacksets {
2✔
561
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
562
                                        s.Resources.Service = &service
1✔
563
                                        continue Items
1✔
564
                                }
565

566
                                // service/HPA used to be owned by the deployment for some reason
567
                                for _, stack := range stackset.StackContainers {
2✔
568
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
569
                                                stack.Resources.Service = &service
1✔
570
                                                continue Items
1✔
571
                                        }
572
                                }
573
                        }
574
                }
575
        }
576
        return nil
1✔
577
}
578

579
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
580
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
581
        if err != nil {
1✔
582
                return fmt.Errorf("failed to list HPAs: %v", err)
×
583
        }
×
584

585
Items:
1✔
586
        for _, h := range hpas.Items {
2✔
587
                hpa := h
1✔
588
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
589
                        for _, stackset := range stacksets {
2✔
590
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
591
                                        s.Resources.HPA = &hpa
1✔
592
                                        continue Items
1✔
593
                                }
594

595
                                // service/HPA used to be owned by the deployment for some reason
596
                                for _, stack := range stackset.StackContainers {
2✔
597
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
598
                                                stack.Resources.HPA = &hpa
1✔
599
                                                continue Items
1✔
600
                                        }
601
                                }
602
                        }
603
                }
604
        }
605
        return nil
1✔
606
}
607

608
func (c *StackSetController) collectConfigMaps(
609
        ctx context.Context,
610
        stacksets map[types.UID]*core.StackSetContainer,
611
) error {
1✔
612
        configMaps, err := c.client.CoreV1().ConfigMaps(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
613
        if err != nil {
1✔
614
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
615
        }
×
616

617
        for _, cm := range configMaps.Items {
2✔
618
                configMap := cm
1✔
619
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
620
                        for _, stackset := range stacksets {
2✔
621
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
622
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
623
                                        break
1✔
624
                                }
625
                        }
626
                }
627
        }
628
        return nil
1✔
629
}
630

631
func (c *StackSetController) collectSecrets(
632
        ctx context.Context,
633
        stacksets map[types.UID]*core.StackSetContainer,
634
) error {
1✔
635
        secrets, err := c.client.CoreV1().Secrets(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
636
        if err != nil {
1✔
637
                return fmt.Errorf("failed to list Secrets: %v", err)
×
638
        }
×
639

640
        for _, sct := range secrets.Items {
2✔
641
                secret := sct
1✔
642
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
643
                        for _, stackset := range stacksets {
2✔
644
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
645
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
646
                                        break
1✔
647
                                }
648
                        }
649
                }
650
        }
651
        return nil
1✔
652
}
653

654
func (c *StackSetController) collectPlatformCredentialsSet(
655
        ctx context.Context,
656
        stacksets map[types.UID]*core.StackSetContainer,
657
) error {
1✔
658
        platformCredentialsSets, err := c.client.ZalandoV1().PlatformCredentialsSets(c.config.Namespace).
1✔
659
                List(ctx, metav1.ListOptions{})
1✔
660
        if err != nil {
1✔
661
                return fmt.Errorf("failed to list PlatformCredentialsSet: %v", err)
×
662
        }
×
663

664
        for _, platformCredentialsSet := range platformCredentialsSets.Items {
1✔
665
                pcs := platformCredentialsSet
×
666
                if uid, ok := getOwnerUID(platformCredentialsSet.ObjectMeta); ok {
×
667
                        for _, stackset := range stacksets {
×
668
                                if s, ok := stackset.StackContainers[uid]; ok {
×
669
                                        s.Resources.PlatformCredentialsSets = append(
×
670
                                                s.Resources.PlatformCredentialsSets,
×
671
                                                &pcs,
×
672
                                        )
×
673
                                        break
×
674
                                }
675
                        }
676
                }
677
        }
678
        return nil
1✔
679
}
680

681
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
682
// overload the API requests with unnecessary requests
NEW
683
func (c *StackSetController) stacksetContainer(stackset *zv1.StackSet) (*core.StackSetContainer, error) {
×
NEW
684
        key := types.NamespacedName{
×
NEW
685
                Namespace: stackset.Namespace,
×
NEW
686
                Name:      stackset.Name,
×
NEW
687
        }
×
NEW
688

×
NEW
689
        fixupStackSetTypeMeta(stackset)
×
NEW
690

×
NEW
691
        reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
×
NEW
692

×
NEW
693
        // use prescaling logic if enabled with an annotation
×
NEW
694
        if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
×
NEW
695
                resetDelay := defaultResetMinReplicasDelay
×
NEW
696
                if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
×
NEW
697
                        resetDelay = resetDelayValue
×
NEW
698
                }
×
NEW
699
                reconciler = &core.PrescalingTrafficReconciler{
×
NEW
700
                        ResetHPAMinReplicasTimeout: resetDelay,
×
NEW
701
                }
×
702
        }
703

NEW
704
        stacksetContainer := core.NewContainer(
×
NEW
705
                stackset,
×
NEW
706
                reconciler,
×
NEW
707
                c.config.BackendWeightsAnnotationKey,
×
NEW
708
                c.config.ClusterDomains,
×
NEW
709
                c.config.SyncIngressAnnotations,
×
NEW
710
        )
×
NEW
711

×
NEW
712
        labelSelector := labels.SelectorFromSet(labels.Set{(core.StacksetHeritageLabelKey): stackset.Name})
×
NEW
713
        err := c.addStacks(stacksetContainer, key, labelSelector)
×
NEW
714
        if err != nil {
×
NEW
715
                return nil, err
×
NEW
716
        }
×
717

NEW
718
        err = c.addIngresses(stacksetContainer, key, labelSelector)
×
NEW
719
        if err != nil {
×
NEW
720
                return nil, err
×
NEW
721
        }
×
722

NEW
723
        if c.config.RouteGroupSupportEnabled {
×
NEW
724
                err = c.addRouteGroups(stacksetContainer, key, labelSelector)
×
NEW
725
                if err != nil {
×
NEW
726
                        return nil, err
×
NEW
727
                }
×
728
        }
729

NEW
730
        err = c.addDeployments(stacksetContainer, key, labelSelector)
×
NEW
731
        if err != nil {
×
NEW
732
                return nil, err
×
NEW
733
        }
×
734

NEW
735
        err = c.addServices(stacksetContainer, key, labelSelector)
×
NEW
736
        if err != nil {
×
NEW
737
                return nil, err
×
NEW
738
        }
×
739

NEW
740
        err = c.addHPAs(stacksetContainer, key, labelSelector)
×
NEW
741
        if err != nil {
×
NEW
742
                return nil, err
×
NEW
743
        }
×
744

NEW
745
        if c.config.ConfigMapSupportEnabled {
×
NEW
746
                err = c.addConfigMaps(stacksetContainer, key, labelSelector)
×
NEW
747
                if err != nil {
×
NEW
748
                        return nil, err
×
NEW
749
                }
×
750
        }
751

NEW
752
        if c.config.SecretSupportEnabled {
×
NEW
753
                err = c.addSecrets(stacksetContainer, key, labelSelector)
×
NEW
754
                if err != nil {
×
NEW
755
                        return nil, err
×
NEW
756
                }
×
757
        }
758

NEW
759
        if c.config.PcsSupportEnabled {
×
NEW
760
                err = c.addPlatformCredentialsSet(stacksetContainer, key, labelSelector)
×
NEW
761
                if err != nil {
×
NEW
762
                        return nil, err
×
NEW
763
                }
×
764
        }
765

NEW
766
        return stacksetContainer, nil
×
767
}
768

NEW
769
func (c *StackSetController) addStacks(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
770
        stacks, err := c.informers.stackInformer.Lister().Stacks(key.Namespace).List(labelSelector)
×
NEW
771
        if err != nil {
×
NEW
772
                return fmt.Errorf("failed to list Stacks for StackSet %s: %v", key.String(), err)
×
NEW
773
        }
×
NEW
774
        for _, stack := range stacks {
×
NEW
775
                stack := stack.DeepCopy()
×
NEW
776
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
×
NEW
777
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
778
                                fixupStackTypeMeta(stack)
×
NEW
779

×
NEW
780
                                stacksetContainer.StackContainers[stack.UID] = &core.StackContainer{
×
NEW
781
                                        Stack: stack,
×
NEW
782
                                }
×
NEW
783
                        }
×
784
                }
785
        }
NEW
786
        return nil
×
787
}
788

NEW
789
func (c *StackSetController) addIngresses(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
790
        ingresses, err := c.informers.ingressInformer.Lister().Ingresses(key.Namespace).List(labelSelector)
×
NEW
791
        if err != nil {
×
NEW
792
                return fmt.Errorf("failed to list Ingresses for StackSet %s: %v", key.String(), err)
×
NEW
793
        }
×
NEW
794
        for _, ingress := range ingresses {
×
NEW
795
                ingress := ingress.DeepCopy()
×
NEW
796
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
×
NEW
797
                        // stackset ingress
×
NEW
798
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
799
                                stacksetContainer.Ingress = ingress
×
NEW
800
                                continue
×
801
                        }
802

803
                        // stack ingress
NEW
804
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
805
                                if strings.HasSuffix(
×
NEW
806
                                        ingress.ObjectMeta.Name,
×
NEW
807
                                        core.SegmentSuffix,
×
NEW
808
                                ) {
×
NEW
809
                                        // Traffic Segment
×
NEW
810
                                        s.Resources.IngressSegment = ingress
×
NEW
811
                                } else {
×
NEW
812
                                        s.Resources.Ingress = ingress
×
NEW
813
                                }
×
814
                        }
815
                }
816
        }
817

NEW
818
        return nil
×
819
}
820

NEW
821
func (c *StackSetController) addRouteGroups(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
822
        routegroups, err := c.informers.routegroupInformer.Lister().RouteGroups(key.Namespace).List(labelSelector)
×
NEW
823
        if err != nil {
×
NEW
824
                return fmt.Errorf("failed to list Ingresses for StackSet %s: %v", key.String(), err)
×
NEW
825
        }
×
NEW
826
        for _, routegroup := range routegroups {
×
NEW
827
                routegroup := routegroup.DeepCopy()
×
NEW
828
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
×
NEW
829
                        // stackset routegroup
×
NEW
830
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
831
                                stacksetContainer.RouteGroup = routegroup
×
NEW
832
                                continue
×
833
                        }
834

835
                        // stack routegroup
NEW
836
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
837
                                if strings.HasSuffix(
×
NEW
838
                                        routegroup.ObjectMeta.Name,
×
NEW
839
                                        core.SegmentSuffix,
×
NEW
840
                                ) {
×
NEW
841
                                        // Traffic Segment
×
NEW
842
                                        s.Resources.RouteGroupSegment = routegroup
×
NEW
843
                                } else {
×
NEW
844
                                        s.Resources.RouteGroup = routegroup
×
NEW
845
                                }
×
846
                        }
847
                }
848
        }
849

NEW
850
        return nil
×
851
}
852

NEW
853
func (c *StackSetController) addDeployments(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
854
        deployments, err := c.informers.deploymentInformer.Lister().Deployments(key.Namespace).List(labelSelector)
×
NEW
855
        if err != nil {
×
NEW
856
                return fmt.Errorf("failed to list Deployments for StackSet %s: %v", key.String(), err)
×
NEW
857
        }
×
858

NEW
859
        for _, deployment := range deployments {
×
NEW
860
                deployment := deployment.DeepCopy()
×
NEW
861
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
×
NEW
862
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
863
                                s.Resources.Deployment = deployment
×
NEW
864
                        }
×
865
                }
866
        }
NEW
867
        return nil
×
868
}
869

NEW
870
func (c *StackSetController) addServices(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
871
        services, err := c.informers.serviceInformer.Lister().Services(key.Namespace).List(labelSelector)
×
NEW
872
        if err != nil {
×
NEW
873
                return fmt.Errorf("failed to list Services for StackSet %s: %v", key.String(), err)
×
NEW
874
        }
×
875

NEW
876
Items:
×
NEW
877
        for _, service := range services {
×
NEW
878
                service := service.DeepCopy()
×
NEW
879
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
×
NEW
880
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
881
                                s.Resources.Service = service
×
NEW
882
                        }
×
883

884
                        // service/HPA used to be owned by the deployment for some reason
885
                        // TODO: check if this can be removed
NEW
886
                        for _, stack := range stacksetContainer.StackContainers {
×
NEW
887
                                if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
×
NEW
888
                                        stack.Resources.Service = service
×
NEW
889
                                        continue Items
×
890
                                }
891
                        }
892
                }
893
        }
NEW
894
        return nil
×
895
}
896

NEW
897
func (c *StackSetController) addHPAs(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
898
        hpas, err := c.informers.hpaInformer.Lister().HorizontalPodAutoscalers(key.Namespace).List(labelSelector)
×
NEW
899
        if err != nil {
×
NEW
900
                return fmt.Errorf("failed to list HPAs for StackSet %s: %v", key.String(), err)
×
NEW
901
        }
×
902

NEW
903
Items:
×
NEW
904
        for _, hpa := range hpas {
×
NEW
905
                hpa := hpa.DeepCopy()
×
NEW
906
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
×
NEW
907
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
908
                                s.Resources.HPA = hpa
×
NEW
909
                        }
×
910

911
                        // service/HPA used to be owned by the deployment for some reason
912
                        // TODO: check if this can be removed
NEW
913
                        for _, stack := range stacksetContainer.StackContainers {
×
NEW
914
                                if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
×
NEW
915
                                        stack.Resources.HPA = hpa
×
NEW
916
                                        continue Items
×
917
                                }
918
                        }
919
                }
920
        }
NEW
921
        return nil
×
922
}
923

NEW
924
func (c *StackSetController) addConfigMaps(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
925
        configMaps, err := c.informers.configMapInformer.Lister().ConfigMaps(key.Namespace).List(labelSelector)
×
NEW
926
        if err != nil {
×
NEW
927
                return fmt.Errorf("failed to list ConfigMaps for StackSet %s: %v", key.String(), err)
×
NEW
928
        }
×
929

NEW
930
        for _, configMap := range configMaps {
×
NEW
931
                configMap := configMap.DeepCopy()
×
NEW
932
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
×
NEW
933
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
934
                                s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, configMap)
×
NEW
935
                        }
×
936
                }
937
        }
NEW
938
        return nil
×
939
}
940

NEW
941
func (c *StackSetController) addSecrets(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
942
        secrets, err := c.informers.secretInformer.Lister().Secrets(key.Namespace).List(labelSelector)
×
NEW
943
        if err != nil {
×
NEW
944
                return fmt.Errorf("failed to list Secrets for StackSet %s: %v", key.String(), err)
×
NEW
945
        }
×
946

NEW
947
        for _, secret := range secrets {
×
NEW
948
                secret := secret.DeepCopy()
×
NEW
949
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
×
NEW
950
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
951
                                s.Resources.Secrets = append(s.Resources.Secrets, secret)
×
NEW
952
                        }
×
953
                }
954
        }
NEW
955
        return nil
×
956
}
957

NEW
958
func (c *StackSetController) addPlatformCredentialsSet(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
959
        platformCredentialsSets, err := c.informers.pcsInformer.Lister().PlatformCredentialsSets(key.Namespace).List(labelSelector)
×
NEW
960
        if err != nil {
×
NEW
961
                return fmt.Errorf("failed to list PlatformCredentialsSets for StackSet %s: %v", key.String(), err)
×
NEW
962
        }
×
963

NEW
964
        for _, pcs := range platformCredentialsSets {
×
NEW
965
                pcs := pcs.DeepCopy()
×
NEW
966
                if uid, ok := getOwnerUID(pcs.ObjectMeta); ok {
×
NEW
967
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
968
                                s.Resources.PlatformCredentialsSets = append(
×
NEW
969
                                        s.Resources.PlatformCredentialsSets,
×
NEW
970
                                        pcs,
×
NEW
971
                                )
×
NEW
972
                        }
×
973
                }
974
        }
NEW
975
        return nil
×
976
}
977

978
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
979
        if len(objectMeta.OwnerReferences) == 1 {
2✔
980
                return objectMeta.OwnerReferences[0].UID, true
1✔
981
        }
1✔
982
        return "", false
1✔
983
}
984

985
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
986
        switch err.(type) {
×
987
        case *eventedError:
×
988
                // already notified
×
989
                return err
×
990
        default:
×
991
                c.recorder.Eventf(
×
992
                        object,
×
993
                        v1.EventTypeWarning,
×
994
                        reason,
×
995
                        err.Error())
×
996
                return &eventedError{err: err}
×
997
        }
998
}
999

1000
// hasOwnership returns true if the controller is the "owner" of the stackset.
1001
// Whether it's owner is determined by the value of the
1002
// 'stackset-controller.zalando.org/controller' annotation. If the value
1003
// matches the controllerID then it owns it, or if the controllerID is
1004
// "" and there's no annotation set.
1005
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
1006
        if stackset.Annotations != nil {
×
1007
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
1008
                        return owner == c.config.ControllerID
×
1009
                }
×
1010
        }
1011
        return c.config.ControllerID == ""
×
1012
}
1013

1014
func (c *StackSetController) startWatch(ctx context.Context) error {
×
1015
        informer := cache.NewSharedIndexInformer(
×
1016
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.config.Namespace, fields.Everything()),
×
1017
                &zv1.StackSet{},
×
1018
                0, // skip resync
×
1019
                cache.Indexers{},
×
1020
        )
×
1021

×
1022
        _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1023
                AddFunc:    c.add,
×
1024
                UpdateFunc: c.update,
×
1025
                DeleteFunc: c.del,
×
1026
        })
×
1027
        if err != nil {
×
1028
                return fmt.Errorf("failed to add event handler: %w", err)
×
1029
        }
×
1030

1031
        go informer.Run(ctx.Done())
×
1032
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
1033
                return fmt.Errorf("timed out waiting for caches to sync")
×
1034
        }
×
1035
        c.logger.Info("Synced StackSet watcher")
×
1036

×
1037
        return nil
×
1038
}
1039

1040
type informerInterface interface {
1041
        Informer() cache.SharedIndexInformer
1042
}
1043

NEW
1044
func (c *StackSetController) setupInformers(ctx context.Context) error {
×
NEW
1045
        factory := informers.NewSharedInformerFactoryWithOptions(c.client, 0, informers.WithNamespace(c.config.Namespace))
×
NEW
1046

×
NEW
1047
        stackSetFactory := stacksetinformers.NewSharedInformerFactoryWithOptions(c.client, 0, stacksetinformers.WithNamespace(c.config.Namespace))
×
NEW
1048
        rgClient := c.client.(*clientset.Clientset)
×
NEW
1049
        routegroupFactory := routegroupinformers.NewSharedInformerFactoryWithOptions(rgClient.RouteGroup, 0, routegroupinformers.WithNamespace(c.config.Namespace))
×
NEW
1050

×
NEW
1051
        c.informers.stacksetInformer = stackSetFactory.Zalando().V1().StackSets()
×
NEW
1052
        c.informers.stackInformer = stackSetFactory.Zalando().V1().Stacks()
×
NEW
1053
        c.informers.ingressInformer = factory.Networking().V1().Ingresses()
×
NEW
1054
        c.informers.routegroupInformer = routegroupFactory.Zalando().V1().RouteGroups()
×
NEW
1055
        c.informers.deploymentInformer = factory.Apps().V1().Deployments()
×
NEW
1056
        c.informers.serviceInformer = factory.Core().V1().Services()
×
NEW
1057
        c.informers.hpaInformer = factory.Autoscaling().V2().HorizontalPodAutoscalers()
×
NEW
1058
        c.informers.configMapInformer = factory.Core().V1().ConfigMaps()
×
NEW
1059
        c.informers.secretInformer = factory.Core().V1().Secrets()
×
NEW
1060
        c.informers.pcsInformer = stackSetFactory.Zalando().V1().PlatformCredentialsSets()
×
NEW
1061

×
NEW
1062
        resourceInformers := []struct {
×
NEW
1063
                informer informerInterface
×
NEW
1064
                resync   time.Duration
×
NEW
1065
        }{
×
NEW
1066
                {c.informers.stacksetInformer, c.config.Interval},
×
NEW
1067
                {c.informers.stackInformer, 0},
×
NEW
1068
                {c.informers.ingressInformer, 0},
×
NEW
1069
                {c.informers.routegroupInformer, 0},
×
NEW
1070
                {c.informers.deploymentInformer, 0},
×
NEW
1071
                {c.informers.serviceInformer, 0},
×
NEW
1072
                {c.informers.hpaInformer, 0},
×
NEW
1073
                {c.informers.configMapInformer, 0},
×
NEW
1074
                {c.informers.secretInformer, 0},
×
NEW
1075
                {c.informers.pcsInformer, 0},
×
NEW
1076
        }
×
NEW
1077

×
NEW
1078
        var hasSynced []cache.InformerSynced
×
NEW
1079
        for _, informer := range resourceInformers {
×
NEW
1080
                _, err := informer.informer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
×
NEW
1081
                        AddFunc:    c.addResource,
×
NEW
1082
                        UpdateFunc: c.updateResource,
×
NEW
1083
                        DeleteFunc: c.deleteResource,
×
NEW
1084
                },
×
NEW
1085
                        informer.resync)
×
NEW
1086
                if err != nil {
×
NEW
1087
                        return fmt.Errorf("failed to add event handler: %v", err)
×
NEW
1088
                }
×
1089

NEW
1090
                hasSynced = append(hasSynced, informer.informer.Informer().HasSynced)
×
NEW
1091
                go informer.informer.Informer().Run(ctx.Done())
×
1092
        }
1093

NEW
1094
        if !cache.WaitForCacheSync(ctx.Done(),
×
NEW
1095
                hasSynced...,
×
NEW
1096
        ) {
×
NEW
1097
                return fmt.Errorf("failed to sync informers")
×
NEW
1098
        }
×
1099

NEW
1100
        return nil
×
1101
}
1102

NEW
1103
func objToNamespacedName(obj any) (types.NamespacedName, bool) {
×
NEW
1104
        switch typedObj := obj.(type) {
×
NEW
1105
        case *zv1.StackSet:
×
NEW
1106
                return types.NamespacedName{
×
NEW
1107
                        Namespace: typedObj.Namespace,
×
NEW
1108
                        Name:      typedObj.Name,
×
NEW
1109
                }, true
×
NEW
1110
        case *zv1.Stack, *networking.Ingress, *rgv1.RouteGroup, *appsv1.Deployment, *v1.Service, *autoscalingv2.HorizontalPodAutoscaler, *v1.ConfigMap, *v1.Secret, *zv1.PlatformCredentialsSet:
×
NEW
1111
                meta, ok := obj.(metav1.ObjectMeta)
×
NEW
1112
                if !ok {
×
NEW
1113
                        return types.NamespacedName{}, false
×
NEW
1114
                }
×
1115

NEW
1116
                if stackset, ok := meta.GetLabels()[core.StacksetHeritageLabelKey]; ok {
×
NEW
1117
                        return types.NamespacedName{
×
NEW
1118
                                Namespace: meta.Namespace,
×
NEW
1119
                                Name:      stackset,
×
NEW
1120
                        }, true
×
NEW
1121
                }
×
1122

NEW
1123
                return types.NamespacedName{}, false
×
NEW
1124
        default:
×
NEW
1125
                return types.NamespacedName{}, false
×
1126
        }
1127
}
1128

NEW
1129
func (c *StackSetController) addResource(obj any) {
×
NEW
1130
        key, ok := objToNamespacedName(obj)
×
NEW
1131
        if !ok {
×
NEW
1132
                return
×
NEW
1133
        }
×
NEW
1134
        c.queue.Add(key)
×
1135
}
1136

NEW
1137
func (c *StackSetController) updateResource(oldObj, newObj any) {
×
NEW
1138
        c.addResource(newObj)
×
NEW
1139
}
×
1140

NEW
1141
func (c *StackSetController) deleteResource(obj any) {
×
NEW
1142
        stackset, ok := obj.(*zv1.StackSet)
×
NEW
1143
        if !ok {
×
NEW
1144
                // non-stackset deletions indicate refresh to resource
×
NEW
1145
                // associated with a stackset
×
NEW
1146
                c.addResource(obj)
×
NEW
1147
                return
×
NEW
1148
        }
×
1149

NEW
1150
        key := types.NamespacedName{
×
NEW
1151
                Namespace: stackset.Namespace,
×
NEW
1152
                Name:      stackset.Name,
×
NEW
1153
        }
×
NEW
1154

×
NEW
1155
        c.queue.Forget(key)
×
NEW
1156
        c.queue.Done(key)
×
1157
}
1158

1159
func (c *StackSetController) add(obj interface{}) {
×
1160
        stackset, ok := obj.(*zv1.StackSet)
×
1161
        if !ok {
×
1162
                return
×
1163
        }
×
1164

1165
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
1166
        c.stacksetEvents <- stacksetEvent{
×
1167
                StackSet: stackset.DeepCopy(),
×
1168
        }
×
1169
}
1170

1171
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
1172
        newStackset, ok := newObj.(*zv1.StackSet)
×
1173
        if !ok {
×
1174
                return
×
1175
        }
×
1176

1177
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
1178
        if !ok {
×
1179
                return
×
1180
        }
×
1181

1182
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
1183
                newStackset.Namespace,
×
1184
                newStackset.Name,
×
1185
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
1186
        )
×
1187

×
1188
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
1189
        c.stacksetEvents <- stacksetEvent{
×
1190
                StackSet: newStackset.DeepCopy(),
×
1191
        }
×
1192
}
1193

1194
func (c *StackSetController) del(obj interface{}) {
×
1195
        stackset, ok := obj.(*zv1.StackSet)
×
1196
        if !ok {
×
1197
                return
×
1198
        }
×
1199

1200
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
1201
        c.stacksetEvents <- stacksetEvent{
×
1202
                StackSet: stackset.DeepCopy(),
×
1203
                Deleted:  true,
×
1204
        }
×
1205
}
1206

1207
func retryUpdate(updateFn func(retry bool) error) error {
×
1208
        retry := false
×
1209
        for {
×
1210
                err := updateFn(retry)
×
1211
                if err != nil {
×
1212
                        if errors.IsConflict(err) {
×
1213
                                retry = true
×
1214
                                continue
×
1215
                        }
1216
                        return err
×
1217
                }
1218
                return nil
×
1219
        }
1220
}
1221

1222
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
1223
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
1224
        for _, sc := range ssc.StackContainers {
×
1225
                stack := sc.Stack.DeepCopy()
×
1226
                status := *sc.GenerateStackStatus()
×
1227
                err := retryUpdate(func(retry bool) error {
×
1228
                        if retry {
×
1229
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
1230
                                if err != nil {
×
1231
                                        return err
×
1232
                                }
×
1233
                                stack = updated
×
1234
                        }
1235
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
1236
                                stack.Status = status
×
1237
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
1238
                                return err
×
1239
                        }
×
1240
                        return nil
×
1241
                })
1242
                if err != nil {
×
1243
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
1244
                }
×
1245
        }
1246

1247
        stackset := ssc.StackSet.DeepCopy()
×
1248
        status := *ssc.GenerateStackSetStatus()
×
1249
        err := retryUpdate(func(retry bool) error {
×
1250
                if retry {
×
1251
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
1252
                        if err != nil {
×
1253
                                return err
×
1254
                        }
×
1255
                        stackset = updated
×
1256
                }
1257
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
1258
                        stackset.Status = status
×
1259
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
1260
                        return err
×
1261
                }
×
1262
                return nil
×
1263
        })
1264
        if err != nil {
×
1265
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
1266
        }
×
1267
        return nil
×
1268
}
1269

1270
// ReconcileTrafficSegments updates the traffic segments according to the actual
1271
// traffic weight of each stack.
1272
//
1273
// Returns the ordered list of Trafic Segments that need to be updated.
1274
func (c *StackSetController) ReconcileTrafficSegments(
1275
        ctx context.Context,
1276
        ssc *core.StackSetContainer,
1277
) ([]types.UID, error) {
×
1278
        // Compute segments
×
1279
        toUpdate, err := ssc.ComputeTrafficSegments()
×
1280
        if err != nil {
×
1281
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
1282
        }
×
1283

1284
        return toUpdate, nil
×
1285
}
1286

1287
// CreateCurrentStack creates a new Stack object for the current stack, if needed
1288
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
1289
        newStack, newStackVersion := ssc.NewStack(c.config.ForwardSupportEnabled)
1✔
1290
        if newStack == nil {
2✔
1291
                return nil
1✔
1292
        }
1✔
1293

1294
        if c.config.ConfigMapSupportEnabled || c.config.SecretSupportEnabled {
2✔
1295
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
1296
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
1297
                        return err
×
1298
                }
×
1299
        }
1300

1301
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
1302
        if err != nil {
1✔
1303
                return err
×
1304
        }
×
1305
        fixupStackTypeMeta(created)
1✔
1306

1✔
1307
        c.recorder.Eventf(
1✔
1308
                ssc.StackSet,
1✔
1309
                v1.EventTypeNormal,
1✔
1310
                "CreatedStack",
1✔
1311
                "Created stack %s",
1✔
1312
                newStack.Name(),
1✔
1313
        )
1✔
1314

1✔
1315
        // Persist ObservedStackVersion in the status
1✔
1316
        updated := ssc.StackSet.DeepCopy()
1✔
1317
        updated.Status.ObservedStackVersion = newStackVersion
1✔
1318

1✔
1319
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
1320
        if err != nil {
1✔
1321
                return err
×
1322
        }
×
1323
        fixupStackSetTypeMeta(result)
1✔
1324
        ssc.StackSet = result
1✔
1325

1✔
1326
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
1327
                Stack:          created,
1✔
1328
                PendingRemoval: false,
1✔
1329
                Resources:      core.StackResources{},
1✔
1330
        }
1✔
1331
        return nil
1✔
1332
}
1333

1334
// CleanupOldStacks deletes stacks that are no longer needed.
1335
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
1336
        for _, sc := range ssc.StackContainers {
2✔
1337
                if !sc.PendingRemoval {
2✔
1338
                        continue
1✔
1339
                }
1340

1341
                stack := sc.Stack
1✔
1342
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
1343
                if err != nil {
1✔
1344
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
1345
                }
×
1346
                c.recorder.Eventf(
1✔
1347
                        ssc.StackSet,
1✔
1348
                        v1.EventTypeNormal,
1✔
1349
                        "DeletedExcessStack",
1✔
1350
                        "Deleted excess stack %s",
1✔
1351
                        stack.Name)
1✔
1352
        }
1353

1354
        return nil
1✔
1355
}
1356

1357
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
1358
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
×
1359
        // Ingress removed, handled outside
×
1360
        if ingress == nil {
×
1361
                return existing, nil
×
1362
        }
×
1363

1364
        if existing == nil {
×
1365
                if ingress.Annotations == nil {
×
1366
                        ingress.Annotations = make(map[string]string)
×
1367
                }
×
1368
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1369

×
1370
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
×
1371
                if err != nil {
×
1372
                        return nil, err
×
1373
                }
×
1374
                c.recorder.Eventf(
×
1375
                        stackset,
×
1376
                        v1.EventTypeNormal,
×
1377
                        "CreatedIngress",
×
1378
                        "Created Ingress %s",
×
1379
                        ingress.Name)
×
1380
                return createdIng, nil
×
1381
        }
1382

1383
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
×
1384
        if existingHaveUpdateTimeStamp {
×
1385
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
×
1386
        }
×
1387

1388
        // Check if we need to update the Ingress
1389
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
×
1390
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
×
1391
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
×
1392
                // add the annotation back after comparing
×
1393
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
×
1394
                return existing, nil
×
1395
        }
×
1396

1397
        updated := existing.DeepCopy()
×
1398
        updated.Spec = ingress.Spec
×
1399
        if ingress.Annotations != nil {
×
1400
                updated.Annotations = ingress.Annotations
×
1401
        } else {
×
1402
                updated.Annotations = make(map[string]string)
×
1403
        }
×
1404
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1405

×
1406
        updated.Labels = ingress.Labels
×
1407

×
1408
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
×
1409
        if err != nil {
×
1410
                return nil, err
×
1411
        }
×
1412
        c.recorder.Eventf(
×
1413
                stackset,
×
1414
                v1.EventTypeNormal,
×
1415
                "UpdatedIngress",
×
1416
                "Updated Ingress %s",
×
1417
                ingress.Name)
×
1418
        return createdIngress, nil
×
1419
}
1420

1421
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
1422
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
×
1423
        // RouteGroup removed, handled outside
×
1424
        if rg == nil {
×
1425
                return existing, nil
×
1426
        }
×
1427

1428
        // Create new RouteGroup
1429
        if existing == nil {
×
1430
                if rg.Annotations == nil {
×
1431
                        rg.Annotations = make(map[string]string)
×
1432
                }
×
1433
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1434

×
1435
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
×
1436
                if err != nil {
×
1437
                        return nil, err
×
1438
                }
×
1439
                c.recorder.Eventf(
×
1440
                        stackset,
×
1441
                        v1.EventTypeNormal,
×
1442
                        "CreatedRouteGroup",
×
1443
                        "Created RouteGroup %s",
×
1444
                        rg.Name)
×
1445
                return createdRg, nil
×
1446
        }
1447

1448
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
×
1449
        if existingHaveUpdateTimeStamp {
×
1450
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
×
1451
        }
×
1452

1453
        // Check if we need to update the RouteGroup
1454
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
×
1455
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
×
1456
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
×
1457
                // add the annotation back after comparing
×
1458
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
×
1459
                return existing, nil
×
1460
        }
×
1461

1462
        updated := existing.DeepCopy()
×
1463
        updated.Spec = rg.Spec
×
1464
        if rg.Annotations != nil {
×
1465
                updated.Annotations = rg.Annotations
×
1466
        } else {
×
1467
                updated.Annotations = make(map[string]string)
×
1468
        }
×
1469
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1470

×
1471
        updated.Labels = rg.Labels
×
1472

×
1473
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
×
1474
        if err != nil {
×
1475
                return nil, err
×
1476
        }
×
1477
        c.recorder.Eventf(
×
1478
                stackset,
×
1479
                v1.EventTypeNormal,
×
1480
                "UpdatedRouteGroup",
×
1481
                "Updated RouteGroup %s",
×
1482
                rg.Name)
×
1483
        return createdRg, nil
×
1484
}
1485

1486
// RecordTrafficSwitch records an event detailing when switches in traffic to
1487
// Stacks, only when there are changes to record.
1488
func (c *StackSetController) RecordTrafficSwitch(ctx context.Context, ssc *core.StackSetContainer) error {
×
1489
        trafficChanges := ssc.TrafficChanges()
×
1490
        if len(trafficChanges) != 0 {
×
1491
                var changeMessages []string
×
1492
                for _, change := range trafficChanges {
×
1493
                        changeMessages = append(changeMessages, change.String())
×
1494
                }
×
1495

1496
                c.recorder.Eventf(
×
1497
                        ssc.StackSet,
×
1498
                        v1.EventTypeNormal,
×
1499
                        "TrafficSwitched",
×
1500
                        "Switched traffic: %s",
×
1501
                        strings.Join(changeMessages, ", "))
×
1502
        }
1503

1504
        return nil
×
1505
}
1506

1507
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1508
        updatedTraffic := generateUpdated()
1✔
1509

1✔
1510
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1511
                return nil
×
1512
        }
×
1513

1514
        updated := existing.DeepCopy()
1✔
1515
        updated.Spec.Traffic = updatedTraffic
1✔
1516

1✔
1517
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1518
        if err != nil {
1✔
1519
                return err
×
1520
        }
×
1521
        c.recorder.Eventf(
1✔
1522
                updated,
1✔
1523
                v1.EventTypeNormal,
1✔
1524
                "UpdatedStackSet",
1✔
1525
                "Updated StackSet %s",
1✔
1526
                updated.Name)
1✔
1527
        return nil
1✔
1528
}
1529

1530
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1531
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1532
        if err != nil {
×
1533
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1534
        }
×
1535

1536
        err = c.ReconcileStackIngress(
×
1537
                ctx,
×
1538
                sc.Stack,
×
1539
                sc.Resources.IngressSegment,
×
1540
                sc.GenerateIngressSegment,
×
1541
        )
×
1542
        if err != nil {
×
1543
                return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1544
        }
×
1545

1546
        if c.config.RouteGroupSupportEnabled {
×
1547
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1548
                if err != nil {
×
1549
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1550
                }
×
1551

1552
                err = c.ReconcileStackRouteGroup(
×
1553
                        ctx,
×
1554
                        sc.Stack,
×
1555
                        sc.Resources.RouteGroupSegment,
×
1556
                        sc.GenerateRouteGroupSegment,
×
1557
                )
×
1558
                if err != nil {
×
1559
                        return c.errorEventf(
×
1560
                                sc.Stack,
×
1561
                                "FailedManageRouteGroupSegment",
×
1562
                                err,
×
1563
                        )
×
1564
                }
×
1565
        }
1566

1567
        if c.config.ConfigMapSupportEnabled {
×
1568
                err := c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1569
                if err != nil {
×
1570
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1571
                }
×
1572
        }
1573

1574
        if c.config.SecretSupportEnabled {
×
1575
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1576
                if err != nil {
×
1577
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1578
                }
×
1579
        }
1580

1581
        if c.config.PcsSupportEnabled {
×
1582
                err = c.ReconcileStackPlatformCredentialsSets(
×
1583
                        ctx,
×
1584
                        sc.Stack,
×
1585
                        sc.Resources.PlatformCredentialsSets,
×
1586
                        sc.GeneratePlatformCredentialsSet,
×
1587
                )
×
1588
                if err != nil {
×
1589
                        return c.errorEventf(sc.Stack, "FailedManagePlatformCredentialsSet", err)
×
1590
                }
×
1591
        }
1592

1593
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1594
        if err != nil {
×
1595
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1596
        }
×
1597

1598
        hpaGenerator := sc.GenerateHPA
×
1599
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1600
        if err != nil {
×
1601
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1602
        }
×
1603

1604
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1605
        if err != nil {
×
1606
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1607
        }
×
1608

1609
        return nil
×
1610
}
1611

1612
// ReconcileStackSet reconciles all the things from a stackset
1613
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1614
        defer func() {
×
1615
                if r := recover(); r != nil {
×
1616
                        c.metricsReporter.ReportPanic()
×
1617
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1618
                        err = fmt.Errorf("panic: %v", r)
×
1619
                }
×
1620
        }()
1621

NEW
1622
        var errors []error
×
NEW
1623

×
1624
        // Create current stack, if needed. Proceed on errors.
×
1625
        err = c.CreateCurrentStack(ctx, container)
×
1626
        if err != nil {
×
1627
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1628
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
NEW
1629
                errors = append(errors, err)
×
UNCOV
1630
        }
×
1631

1632
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1633
        err = container.UpdateFromResources()
×
1634
        if err != nil {
×
NEW
1635
                c.recorder.Eventf(
×
NEW
1636
                        container.StackSet,
×
NEW
1637
                        v1.EventTypeWarning,
×
NEW
1638
                        "FailedUpdateFromResources",
×
NEW
1639
                        "Failed to update from resources: "+err.Error())
×
1640
                return err
×
1641
        }
×
1642

1643
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1644
        err = container.ManageTraffic(time.Now())
×
1645
        if err != nil {
×
1646
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1647
                c.recorder.Eventf(
×
1648
                        container.StackSet,
×
1649
                        v1.EventTypeWarning,
×
1650
                        "TrafficNotSwitched",
×
1651
                        "Failed to switch traffic: "+err.Error())
×
NEW
1652
                errors = append(errors, err)
×
UNCOV
1653
        }
×
1654

1655
        // Mark stacks that should be removed
1656
        container.MarkExpiredStacks()
×
1657

×
1658
        // Update traffic segments. Proceed on errors.
×
1659
        segsInOrder, err := c.ReconcileTrafficSegments(ctx, container)
×
1660
        if err != nil {
×
1661
                err = c.errorEventf(
×
1662
                        container.StackSet,
×
1663
                        reasonFailedManageStackSet,
×
1664
                        err,
×
1665
                )
×
1666
                c.stacksetLogger(container).Errorf(
×
1667
                        "Unable to reconcile traffic segments: %v",
×
1668
                        err,
×
1669
                )
×
NEW
1670
                errors = append(errors, err)
×
UNCOV
1671
        }
×
1672

1673
        // Reconcile stack resources. Proceed on errors.
1674
        reconciledStacks := map[types.UID]bool{}
×
1675
        for _, id := range segsInOrder {
×
1676
                reconciledStacks[id] = true
×
1677
                sc := container.StackContainers[id]
×
1678
                err = c.ReconcileStackResources(ctx, container, sc)
×
1679
                if err != nil {
×
1680
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1681
                        c.stackLogger(container, sc).Errorf(
×
1682
                                "Unable to reconcile stack resources: %v",
×
1683
                                err,
×
1684
                        )
×
NEW
1685
                        errors = append(errors, err)
×
UNCOV
1686
                }
×
1687
        }
1688

1689
        for k, sc := range container.StackContainers {
×
1690
                if reconciledStacks[k] {
×
1691
                        continue
×
1692
                }
1693

1694
                err = c.ReconcileStackResources(ctx, container, sc)
×
1695
                if err != nil {
×
1696
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1697
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
NEW
1698
                        errors = append(errors, err)
×
UNCOV
1699
                }
×
1700
        }
1701

1702
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1703
        err = c.RecordTrafficSwitch(ctx, container)
×
1704
        if err != nil {
×
1705
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1706
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
NEW
1707
                errors = append(errors, err)
×
UNCOV
1708
        }
×
1709

1710
        // Reconcile desired traffic in the stackset. Proceed on errors.
1711
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1712
        if err != nil {
×
1713
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1714
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
NEW
1715
                errors = append(errors, err)
×
UNCOV
1716
        }
×
1717

1718
        // Delete old stacks. Proceed on errors.
1719
        err = c.CleanupOldStacks(ctx, container)
×
1720
        if err != nil {
×
1721
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1722
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
NEW
1723
                errors = append(errors, err)
×
UNCOV
1724
        }
×
1725

1726
        // Update statuses.
1727
        err = c.ReconcileStatuses(ctx, container)
×
1728
        if err != nil {
×
NEW
1729
                c.recorder.Eventf(
×
NEW
1730
                        container.StackSet,
×
NEW
1731
                        v1.EventTypeWarning,
×
NEW
1732
                        "FailedUpdateStatuses",
×
NEW
1733
                        "Failed to update statuses: "+err.Error())
×
1734
                return err
×
1735
        }
×
1736

NEW
1737
        if len(errors) > 0 {
×
NEW
1738
                return fmt.Errorf("encountered %d errors during reconciliation, see events/logs for details", len(errors))
×
NEW
1739
        }
×
1740

UNCOV
1741
        return nil
×
1742
}
1743

1744
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1745
// stackset annotation.
1746
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1747
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1748
        if !ok {
2✔
1749
                return 0, false
1✔
1750
        }
1✔
1751
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1752
        if err != nil {
1✔
1753
                return 0, false
×
1754
        }
×
1755
        return resetDelay, true
1✔
1756
}
1757

1758
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1759
        // set TypeMeta manually because of this bug:
1✔
1760
        // https://github.com/kubernetes/client-go/issues/308
1✔
1761
        stackset.APIVersion = core.APIVersion
1✔
1762
        stackset.Kind = core.KindStackSet
1✔
1763
}
1✔
1764

1765
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1766
        // set TypeMeta manually because of this bug:
1✔
1767
        // https://github.com/kubernetes/client-go/issues/308
1✔
1768
        stack.APIVersion = core.APIVersion
1✔
1769
        stack.Kind = core.KindStack
1✔
1770
}
1✔
1771

1772
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1773
// name is not prefixed by Stack name.
1774
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1775
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
1776
                if err := validateConfigurationResourceName(stack.Name, rsc.GetName()); err != nil {
×
1777
                        return err
×
1778
                }
×
1779
        }
1780
        return nil
1✔
1781
}
1782

1783
// validateConfigurationResourceName returns an error if specific resource
1784
// name is not prefixed by Stack name.
1785
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1786
        if !strings.HasPrefix(rsc, stack) {
2✔
1787
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
1✔
1788
        }
1✔
1789
        return nil
1✔
1790
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc