• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 21063969266

16 Jan 2026 10:43AM UTC coverage: 46.538% (-3.4%) from 49.897%
21063969266

Pull #723

github

mikkeloscar
Refactor controller queue logic

Signed-off-by: Mikkel Oscar Lyderik Larsen <mikkel.larsen@zalando.de>
Pull Request #723: Refactor controller queue logic

6 of 394 new or added lines in 4 files covered. (1.52%)

11 existing lines in 2 files now uncovered.

2668 of 5733 relevant lines covered (46.54%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.24
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        routegroupinformers "github.com/szuecs/routegroup-client/client/informers/externalversions"
19
        rginformerv1 "github.com/szuecs/routegroup-client/client/informers/externalversions/zalando.org/v1"
20
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
21
        stacksetinformers "github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions"
22
        stacksetinformerv1 "github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions/zalando.org/v1"
23
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
24
        "github.com/zalando-incubator/stackset-controller/pkg/core"
25
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
26
        "golang.org/x/sync/errgroup"
27
        "golang.org/x/time/rate"
28
        appsv1 "k8s.io/api/apps/v1"
29
        autoscalingv2 "k8s.io/api/autoscaling/v2"
30
        v1 "k8s.io/api/core/v1"
31
        networking "k8s.io/api/networking/v1"
32
        "k8s.io/apimachinery/pkg/api/equality"
33
        "k8s.io/apimachinery/pkg/api/errors"
34
        "k8s.io/apimachinery/pkg/api/resource"
35
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
        "k8s.io/apimachinery/pkg/fields"
37
        "k8s.io/apimachinery/pkg/labels"
38
        "k8s.io/apimachinery/pkg/runtime"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/client-go/informers"
42
        appsinformerv1 "k8s.io/client-go/informers/apps/v1"
43
        autoscalinginformerv2 "k8s.io/client-go/informers/autoscaling/v2"
44
        coreinformer "k8s.io/client-go/informers/core/v1"
45
        ingressinformerv1 "k8s.io/client-go/informers/networking/v1"
46
        "k8s.io/client-go/tools/cache"
47
        kube_record "k8s.io/client-go/tools/record"
48
        "k8s.io/client-go/util/workqueue"
49
)
50

51
const (
52
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
53
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
54
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
55
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
56

57
        reasonFailedManageStackSet = "FailedManageStackSet"
58

59
        defaultResetMinReplicasDelay = 10 * time.Minute
60
)
61

62
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
63

64
type controllerInformers struct {
65
        stacksetInformer   stacksetinformerv1.StackSetInformer
66
        stackInformer      stacksetinformerv1.StackInformer
67
        ingressInformer    ingressinformerv1.IngressInformer
68
        routegroupInformer rginformerv1.RouteGroupInformer
69
        deploymentInformer appsinformerv1.DeploymentInformer
70
        serviceInformer    coreinformer.ServiceInformer
71
        hpaInformer        autoscalinginformerv2.HorizontalPodAutoscalerInformer
72
        configMapInformer  coreinformer.ConfigMapInformer
73
        secretInformer     coreinformer.SecretInformer
74
        pcsInformer        stacksetinformerv1.PlatformCredentialsSetInformer
75
}
76

77
// StackSetController is the main controller. It watches for changes to
78
// stackset resources and starts and maintains other controllers per
79
// stackset resource.
80
type StackSetController struct {
81
        logger          *log.Entry
82
        client          clientset.Interface
83
        informers       controllerInformers
84
        config          StackSetConfig
85
        stacksetEvents  chan stacksetEvent
86
        stacksetStore   map[types.UID]zv1.StackSet
87
        recorder        kube_record.EventRecorder
88
        metricsReporter *core.MetricsReporter
89
        HealthReporter  healthcheck.Handler
90
        now             func() string
91
        sync.Mutex
92
        queue workqueue.TypedRateLimitingInterface[types.NamespacedName]
93
}
94

95
type StackSetConfig struct {
96
        Namespace    string
97
        ControllerID string
98

99
        ClusterDomains              []string
100
        BackendWeightsAnnotationKey string
101
        SyncIngressAnnotations      []string
102

103
        ReconcileWorkers int
104
        Interval         time.Duration
105

106
        RouteGroupSupportEnabled bool
107
        ConfigMapSupportEnabled  bool
108
        SecretSupportEnabled     bool
109
        PcsSupportEnabled        bool
110
        ForwardSupportEnabled    bool
111
}
112

113
type stacksetEvent struct {
114
        Deleted  bool
115
        StackSet *zv1.StackSet
116
}
117

118
// eventedError wraps an error that was already exposed as an event to the user
119
type eventedError struct {
120
        err error
121
}
122

123
func (ee *eventedError) Error() string {
×
124
        return ee.err.Error()
×
125
}
×
126

127
func now() string {
×
128
        return time.Now().Format(time.RFC3339)
×
129
}
×
130

131
// NewStackSetController initializes a new StackSetController.
132
func NewStackSetController(
133
        client clientset.Interface,
134
        registry prometheus.Registerer,
135
        config StackSetConfig,
136
) (*StackSetController, error) {
1✔
137
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
138
        if err != nil {
1✔
139
                return nil, err
×
140
        }
×
141

142
        logger := log.WithField("controller", "stackset")
1✔
143

1✔
144
        if config.ControllerID != "" {
1✔
145
                logger = logger.WithField("controller_id", config.ControllerID)
×
146
        }
×
147

148
        return &StackSetController{
1✔
149
                logger:          logger,
1✔
150
                client:          client,
1✔
151
                config:          config,
1✔
152
                stacksetEvents:  make(chan stacksetEvent, 1),
1✔
153
                stacksetStore:   make(map[types.UID]zv1.StackSet),
1✔
154
                recorder:        recorder.CreateEventRecorder(client),
1✔
155
                metricsReporter: metricsReporter,
1✔
156
                HealthReporter:  healthcheck.NewHandler(),
1✔
157
                now:             now,
1✔
158
                queue: workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
1✔
159
                        workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](5*time.Millisecond, 5*time.Second),
1✔
160
                        // 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
1✔
161
                        &workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
1✔
162
                )),
1✔
163
        }, nil
1✔
164
}
165

166
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
167
        return c.logger.WithFields(map[string]interface{}{
×
168
                "namespace": ssc.StackSet.Namespace,
×
169
                "stackset":  ssc.StackSet.Name,
×
170
        })
×
171
}
×
172

173
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
174
        return c.logger.WithFields(map[string]interface{}{
×
175
                "namespace": ssc.StackSet.Namespace,
×
176
                "stackset":  ssc.StackSet.Name,
×
177
                "stack":     sc.Name(),
×
178
        })
×
179
}
×
180

181
// Run runs the main loop of the StackSetController. Before the loops it
182
// sets up a watcher to watch StackSet resources. The watch will send
183
// changes over a channel which is polled from the main loop.
184
func (c *StackSetController) Run(ctx context.Context) error {
×
185
        var nextCheck time.Time
×
186

×
187
        // We're not alive if nextCheck is too far in the past
×
188
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
189
                if time.Since(nextCheck) > 5*c.config.Interval {
×
190
                        return fmt.Errorf("nextCheck too old")
×
191
                }
×
192
                return nil
×
193
        })
194

195
        err := c.startWatch(ctx)
×
196
        if err != nil {
×
197
                return err
×
198
        }
×
199

200
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
201

×
202
        nextCheck = time.Now().Add(-c.config.Interval)
×
203

×
204
        for {
×
205
                select {
×
206
                case <-time.After(time.Until(nextCheck)):
×
207

×
208
                        nextCheck = time.Now().Add(c.config.Interval)
×
209

×
210
                        stackSetContainers, err := c.collectResources(ctx)
×
211
                        if err != nil {
×
212
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
213
                                continue
×
214
                        }
215

216
                        var reconcileGroup errgroup.Group
×
217
                        reconcileGroup.SetLimit(c.config.ReconcileWorkers)
×
218
                        for stackset, container := range stackSetContainers {
×
219
                                container := container
×
220
                                stackset := stackset
×
221

×
222
                                reconcileGroup.Go(func() error {
×
223
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
224
                                                err := c.ReconcileStackSet(ctx, container)
×
225
                                                if err != nil {
×
226
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
227
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
228
                                                }
×
229
                                        }
230
                                        return nil
×
231
                                })
232
                        }
233

234
                        err = reconcileGroup.Wait()
×
235
                        if err != nil {
×
236
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
237
                        }
×
238
                        err = c.metricsReporter.Report(stackSetContainers)
×
239
                        if err != nil {
×
240
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
241
                        }
×
242
                case e := <-c.stacksetEvents:
×
243
                        stackset := *e.StackSet
×
244
                        fixupStackSetTypeMeta(&stackset)
×
245

×
246
                        // update/delete existing entry
×
247
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
248
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
249
                                        delete(c.stacksetStore, stackset.UID)
×
250
                                        continue
×
251
                                }
252

253
                                // update stackset entry
254
                                c.stacksetStore[stackset.UID] = stackset
×
255
                                continue
×
256
                        }
257

258
                        // check if stackset should be managed by the controller
259
                        if !c.hasOwnership(&stackset) {
×
260
                                continue
×
261
                        }
262

263
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
264
                        c.stacksetStore[stackset.UID] = stackset
×
265
                case <-ctx.Done():
×
266
                        c.logger.Info("Terminating main controller loop.")
×
267
                        return nil
×
268
                }
269
        }
270
}
271

NEW
272
func (c *StackSetController) Run2(ctx context.Context) error {
×
NEW
273
        // defer runtime.HandleCrash()
×
NEW
274
        defer c.queue.ShutDown()
×
NEW
275

×
NEW
276
        // TODO: health
×
NEW
277
        // TODO: metricsReporter
×
NEW
278
        c.logger.Info("Starting Controller")
×
NEW
279

×
NEW
280
        err := c.setupInformers(ctx)
×
NEW
281
        if err != nil {
×
NEW
282
                return fmt.Errorf("failed to setup informers: %v", err)
×
NEW
283
        }
×
284

NEW
285
        for i := 0; i < c.config.ReconcileWorkers; i++ {
×
NEW
286
                go wait.UntilWithContext(ctx, c.worker, time.Second)
×
NEW
287
        }
×
288

NEW
289
        <-ctx.Done()
×
NEW
290
        c.logger.Info("Shutting down controller")
×
NEW
291
        return nil
×
292
}
293

NEW
294
func (c *StackSetController) worker(ctx context.Context) {
×
NEW
295
        for c.processNextItem(ctx) {
×
NEW
296
        }
×
297
}
298

NEW
299
func (c *StackSetController) processNextItem(ctx context.Context) bool {
×
NEW
300
        key, quit := c.queue.Get()
×
NEW
301
        if quit {
×
NEW
302
                return false
×
NEW
303
        }
×
NEW
304
        defer c.queue.Done(key)
×
NEW
305

×
NEW
306
        err := c.syncHandler(ctx, key)
×
NEW
307
        if err == nil {
×
NEW
308
                c.queue.Forget(key)
×
NEW
309
                return true
×
NEW
310
        }
×
311

312
        // runtime.HandleError(fmt.Errorf("sync %q failed: %v", key, err))
NEW
313
        c.queue.AddRateLimited(key)
×
NEW
314
        return true
×
315
}
316

NEW
317
func (c *StackSetController) syncHandler(ctx context.Context, key types.NamespacedName) error {
×
NEW
318
        container, err := c.stacksetContainer(ctx, key)
×
NEW
319
        if err != nil {
×
NEW
320
                return err
×
NEW
321
        }
×
322

NEW
323
        err = c.ReconcileStackSet(ctx, container)
×
NEW
324
        if err != nil {
×
NEW
325
                c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
NEW
326
                return err
×
NEW
327
        }
×
NEW
328
        return nil
×
329
}
330

331
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
332
// overload the API requests with unnecessary requests
333
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
334
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
335
        for uid, stackset := range c.stacksetStore {
2✔
336
                stackset := stackset
1✔
337

1✔
338
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
339

1✔
340
                // use prescaling logic if enabled with an annotation
1✔
341
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
342
                        resetDelay := defaultResetMinReplicasDelay
1✔
343
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
344
                                resetDelay = resetDelayValue
1✔
345
                        }
1✔
346
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
347
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
348
                        }
1✔
349
                }
350

351
                stacksetContainer := core.NewContainer(
1✔
352
                        &stackset,
1✔
353
                        reconciler,
1✔
354
                        c.config.BackendWeightsAnnotationKey,
1✔
355
                        c.config.ClusterDomains,
1✔
356
                        c.config.SyncIngressAnnotations,
1✔
357
                )
1✔
358
                stacksets[uid] = stacksetContainer
1✔
359
        }
360

361
        err := c.collectStacks(ctx, stacksets)
1✔
362
        if err != nil {
1✔
363
                return nil, err
×
364
        }
×
365

366
        err = c.collectIngresses(ctx, stacksets)
1✔
367
        if err != nil {
1✔
368
                return nil, err
×
369
        }
×
370

371
        if c.config.RouteGroupSupportEnabled {
2✔
372
                err = c.collectRouteGroups(ctx, stacksets)
1✔
373
                if err != nil {
1✔
374
                        return nil, err
×
375
                }
×
376
        }
377

378
        err = c.collectDeployments(ctx, stacksets)
1✔
379
        if err != nil {
1✔
380
                return nil, err
×
381
        }
×
382

383
        err = c.collectServices(ctx, stacksets)
1✔
384
        if err != nil {
1✔
385
                return nil, err
×
386
        }
×
387

388
        err = c.collectHPAs(ctx, stacksets)
1✔
389
        if err != nil {
1✔
390
                return nil, err
×
391
        }
×
392

393
        if c.config.ConfigMapSupportEnabled {
2✔
394
                err = c.collectConfigMaps(ctx, stacksets)
1✔
395
                if err != nil {
1✔
396
                        return nil, err
×
397
                }
×
398
        }
399

400
        if c.config.SecretSupportEnabled {
2✔
401
                err = c.collectSecrets(ctx, stacksets)
1✔
402
                if err != nil {
1✔
403
                        return nil, err
×
404
                }
×
405
        }
406

407
        if c.config.PcsSupportEnabled {
2✔
408
                err = c.collectPlatformCredentialsSet(ctx, stacksets)
1✔
409
                if err != nil {
1✔
410
                        return nil, err
×
411
                }
×
412
        }
413

414
        return stacksets, nil
1✔
415
}
416

417
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
418
        ingresses, err := c.client.NetworkingV1().Ingresses(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
419

1✔
420
        if err != nil {
1✔
421
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
422
        }
×
423

424
        for _, i := range ingresses.Items {
2✔
425
                ingress := i
1✔
426
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
427
                        // stackset ingress
1✔
428
                        if s, ok := stacksets[uid]; ok {
2✔
429
                                s.Ingress = &ingress
1✔
430
                                continue
1✔
431
                        }
432

433
                        // stack ingress
434
                        for _, stackset := range stacksets {
2✔
435
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
436
                                        if strings.HasSuffix(
1✔
437
                                                ingress.ObjectMeta.Name,
1✔
438
                                                core.SegmentSuffix,
1✔
439
                                        ) {
2✔
440
                                                // Traffic Segment
1✔
441
                                                s.Resources.IngressSegment = &ingress
1✔
442
                                        } else {
2✔
443
                                                s.Resources.Ingress = &ingress
1✔
444
                                        }
1✔
445
                                        break
1✔
446
                                }
447
                        }
448
                }
449
        }
450
        return nil
1✔
451
}
452

453
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
454
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.config.Namespace).List(
1✔
455
                ctx,
1✔
456
                metav1.ListOptions{},
1✔
457
        )
1✔
458
        if err != nil {
1✔
459
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
460
        }
×
461

462
        for _, rg := range rgs.Items {
2✔
463
                routegroup := rg
1✔
464
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
465
                        // stackset routegroups
1✔
466
                        if s, ok := stacksets[uid]; ok {
2✔
467
                                s.RouteGroup = &routegroup
1✔
468
                                continue
1✔
469
                        }
470

471
                        // stack routegroups
472
                        for _, stackset := range stacksets {
2✔
473
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
474
                                        if strings.HasSuffix(
1✔
475
                                                routegroup.ObjectMeta.Name,
1✔
476
                                                core.SegmentSuffix,
1✔
477
                                        ) {
2✔
478
                                                // Traffic Segment
1✔
479
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
480
                                        } else {
2✔
481
                                                s.Resources.RouteGroup = &routegroup
1✔
482
                                        }
1✔
483
                                        break
1✔
484
                                }
485
                        }
486
                }
487
        }
488
        return nil
1✔
489
}
490

491
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
492
        stacks, err := c.client.ZalandoV1().Stacks(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
493
        if err != nil {
1✔
494
                return fmt.Errorf("failed to list Stacks: %v", err)
×
495
        }
×
496

497
        for _, stack := range stacks.Items {
2✔
498
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
499
                        if s, ok := stacksets[uid]; ok {
2✔
500
                                stack := stack
1✔
501
                                fixupStackTypeMeta(&stack)
1✔
502

1✔
503
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
504
                                        Stack: &stack,
1✔
505
                                }
1✔
506
                                continue
1✔
507
                        }
508
                }
509
        }
510
        return nil
1✔
511
}
512

513
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
514
        deployments, err := c.client.AppsV1().Deployments(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
515
        if err != nil {
1✔
516
                return fmt.Errorf("failed to list Deployments: %v", err)
×
517
        }
×
518

519
        for _, d := range deployments.Items {
2✔
520
                deployment := d
1✔
521
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
522
                        for _, stackset := range stacksets {
2✔
523
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
524
                                        s.Resources.Deployment = &deployment
1✔
525
                                        break
1✔
526
                                }
527
                        }
528
                }
529
        }
530
        return nil
1✔
531
}
532

533
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
534
        services, err := c.client.CoreV1().Services(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
535
        if err != nil {
1✔
536
                return fmt.Errorf("failed to list Services: %v", err)
×
537
        }
×
538

539
Items:
1✔
540
        for _, s := range services.Items {
2✔
541
                service := s
1✔
542
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
543
                        for _, stackset := range stacksets {
2✔
544
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
545
                                        s.Resources.Service = &service
1✔
546
                                        continue Items
1✔
547
                                }
548

549
                                // service/HPA used to be owned by the deployment for some reason
550
                                for _, stack := range stackset.StackContainers {
2✔
551
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
552
                                                stack.Resources.Service = &service
1✔
553
                                                continue Items
1✔
554
                                        }
555
                                }
556
                        }
557
                }
558
        }
559
        return nil
1✔
560
}
561

562
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
563
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
564
        if err != nil {
1✔
565
                return fmt.Errorf("failed to list HPAs: %v", err)
×
566
        }
×
567

568
Items:
1✔
569
        for _, h := range hpas.Items {
2✔
570
                hpa := h
1✔
571
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
572
                        for _, stackset := range stacksets {
2✔
573
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
574
                                        s.Resources.HPA = &hpa
1✔
575
                                        continue Items
1✔
576
                                }
577

578
                                // service/HPA used to be owned by the deployment for some reason
579
                                for _, stack := range stackset.StackContainers {
2✔
580
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
581
                                                stack.Resources.HPA = &hpa
1✔
582
                                                continue Items
1✔
583
                                        }
584
                                }
585
                        }
586
                }
587
        }
588
        return nil
1✔
589
}
590

591
func (c *StackSetController) collectConfigMaps(
592
        ctx context.Context,
593
        stacksets map[types.UID]*core.StackSetContainer,
594
) error {
1✔
595
        configMaps, err := c.client.CoreV1().ConfigMaps(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
596
        if err != nil {
1✔
597
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
598
        }
×
599

600
        for _, cm := range configMaps.Items {
2✔
601
                configMap := cm
1✔
602
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
603
                        for _, stackset := range stacksets {
2✔
604
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
605
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
606
                                        break
1✔
607
                                }
608
                        }
609
                }
610
        }
611
        return nil
1✔
612
}
613

614
func (c *StackSetController) collectSecrets(
615
        ctx context.Context,
616
        stacksets map[types.UID]*core.StackSetContainer,
617
) error {
1✔
618
        secrets, err := c.client.CoreV1().Secrets(c.config.Namespace).List(ctx, metav1.ListOptions{})
1✔
619
        if err != nil {
1✔
620
                return fmt.Errorf("failed to list Secrets: %v", err)
×
621
        }
×
622

623
        for _, sct := range secrets.Items {
2✔
624
                secret := sct
1✔
625
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
626
                        for _, stackset := range stacksets {
2✔
627
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
628
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
629
                                        break
1✔
630
                                }
631
                        }
632
                }
633
        }
634
        return nil
1✔
635
}
636

637
func (c *StackSetController) collectPlatformCredentialsSet(
638
        ctx context.Context,
639
        stacksets map[types.UID]*core.StackSetContainer,
640
) error {
1✔
641
        platformCredentialsSets, err := c.client.ZalandoV1().PlatformCredentialsSets(c.config.Namespace).
1✔
642
                List(ctx, metav1.ListOptions{})
1✔
643
        if err != nil {
1✔
644
                return fmt.Errorf("failed to list PlatformCredentialsSet: %v", err)
×
645
        }
×
646

647
        for _, platformCredentialsSet := range platformCredentialsSets.Items {
1✔
648
                pcs := platformCredentialsSet
×
649
                if uid, ok := getOwnerUID(platformCredentialsSet.ObjectMeta); ok {
×
650
                        for _, stackset := range stacksets {
×
651
                                if s, ok := stackset.StackContainers[uid]; ok {
×
652
                                        s.Resources.PlatformCredentialsSets = append(
×
653
                                                s.Resources.PlatformCredentialsSets,
×
654
                                                &pcs,
×
655
                                        )
×
656
                                        break
×
657
                                }
658
                        }
659
                }
660
        }
661
        return nil
1✔
662
}
663

664
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
665
// overload the API requests with unnecessary requests
NEW
666
func (c *StackSetController) stacksetContainer(ctx context.Context, key types.NamespacedName) (*core.StackSetContainer, error) {
×
NEW
667
        obj, exists, err := c.informers.stacksetInformer.Informer().GetIndexer().GetByKey(key.String())
×
NEW
668
        if err != nil {
×
NEW
669
                return nil, fmt.Errorf("failed to get StackSet %s from informer: %v", key.String(), err)
×
NEW
670
        }
×
NEW
671
        if !exists {
×
NEW
672
                return nil, fmt.Errorf("StackSet %s not found in informer", key.String())
×
NEW
673
        }
×
674

NEW
675
        stackset, ok := obj.(*zv1.StackSet)
×
NEW
676
        if !ok {
×
NEW
677
                return nil, fmt.Errorf("failed to cast object to StackSet for %s", key.String())
×
NEW
678
        }
×
679

NEW
680
        stackset = stackset.DeepCopy()
×
NEW
681
        fixupStackSetTypeMeta(stackset)
×
NEW
682

×
NEW
683
        reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
×
NEW
684

×
NEW
685
        // use prescaling logic if enabled with an annotation
×
NEW
686
        if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
×
NEW
687
                resetDelay := defaultResetMinReplicasDelay
×
NEW
688
                if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
×
NEW
689
                        resetDelay = resetDelayValue
×
NEW
690
                }
×
NEW
691
                reconciler = &core.PrescalingTrafficReconciler{
×
NEW
692
                        ResetHPAMinReplicasTimeout: resetDelay,
×
NEW
693
                }
×
694
        }
695

NEW
696
        stacksetContainer := core.NewContainer(
×
NEW
697
                stackset,
×
NEW
698
                reconciler,
×
NEW
699
                c.config.BackendWeightsAnnotationKey,
×
NEW
700
                c.config.ClusterDomains,
×
NEW
701
                c.config.SyncIngressAnnotations,
×
NEW
702
        )
×
NEW
703

×
NEW
704
        labelSelector := labels.SelectorFromSet(labels.Set{(core.StacksetHeritageLabelKey): stackset.Name})
×
NEW
705
        err = c.addStacks(stacksetContainer, key, labelSelector)
×
NEW
706
        if err != nil {
×
NEW
707
                return nil, err
×
NEW
708
        }
×
709

NEW
710
        err = c.addIngresses(stacksetContainer, key, labelSelector)
×
NEW
711
        if err != nil {
×
NEW
712
                return nil, err
×
NEW
713
        }
×
714

NEW
715
        if c.config.RouteGroupSupportEnabled {
×
NEW
716
                err = c.addRouteGroups(stacksetContainer, key, labelSelector)
×
NEW
717
                if err != nil {
×
NEW
718
                        return nil, err
×
NEW
719
                }
×
720
        }
721

NEW
722
        err = c.addDeployments(stacksetContainer, key, labelSelector)
×
NEW
723
        if err != nil {
×
NEW
724
                return nil, err
×
NEW
725
        }
×
726

NEW
727
        err = c.addServices(stacksetContainer, key, labelSelector)
×
NEW
728
        if err != nil {
×
NEW
729
                return nil, err
×
NEW
730
        }
×
731

NEW
732
        err = c.addHPAs(stacksetContainer, key, labelSelector)
×
NEW
733
        if err != nil {
×
NEW
734
                return nil, err
×
NEW
735
        }
×
736

NEW
737
        if c.config.ConfigMapSupportEnabled {
×
NEW
738
                err = c.addConfigMaps(stacksetContainer, key, labelSelector)
×
NEW
739
                if err != nil {
×
NEW
740
                        return nil, err
×
NEW
741
                }
×
742
        }
743

NEW
744
        if c.config.SecretSupportEnabled {
×
NEW
745
                err = c.addSecrets(stacksetContainer, key, labelSelector)
×
NEW
746
                if err != nil {
×
NEW
747
                        return nil, err
×
NEW
748
                }
×
749
        }
750

NEW
751
        if c.config.PcsSupportEnabled {
×
NEW
752
                err = c.addPlatformCredentialsSet(stacksetContainer, key, labelSelector)
×
NEW
753
                if err != nil {
×
NEW
754
                        return nil, err
×
NEW
755
                }
×
756
        }
757

NEW
758
        return stacksetContainer, nil
×
759
}
760

NEW
761
func (c *StackSetController) addStacks(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
762
        stacks, err := c.informers.stackInformer.Lister().Stacks(key.Namespace).List(labelSelector)
×
NEW
763
        if err != nil {
×
NEW
764
                return fmt.Errorf("failed to list Stacks for StackSet %s: %v", key.String(), err)
×
NEW
765
        }
×
NEW
766
        for _, stack := range stacks {
×
NEW
767
                stack := stack.DeepCopy()
×
NEW
768
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
×
NEW
769
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
770
                                fixupStackTypeMeta(stack)
×
NEW
771

×
NEW
772
                                stacksetContainer.StackContainers[stack.UID] = &core.StackContainer{
×
NEW
773
                                        Stack: stack,
×
NEW
774
                                }
×
NEW
775
                        }
×
776
                }
777
        }
NEW
778
        return nil
×
779
}
780

NEW
781
func (c *StackSetController) addIngresses(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
782
        ingresses, err := c.informers.ingressInformer.Lister().Ingresses(key.Namespace).List(labelSelector)
×
NEW
783
        if err != nil {
×
NEW
784
                return fmt.Errorf("failed to list Ingresses for StackSet %s: %v", key.String(), err)
×
NEW
785
        }
×
NEW
786
        for _, ingress := range ingresses {
×
NEW
787
                ingress := ingress.DeepCopy()
×
NEW
788
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
×
NEW
789
                        // stackset ingress
×
NEW
790
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
791
                                stacksetContainer.Ingress = ingress
×
NEW
792
                                continue
×
793
                        }
794

795
                        // stack ingress
NEW
796
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
797
                                if strings.HasSuffix(
×
NEW
798
                                        ingress.ObjectMeta.Name,
×
NEW
799
                                        core.SegmentSuffix,
×
NEW
800
                                ) {
×
NEW
801
                                        // Traffic Segment
×
NEW
802
                                        s.Resources.IngressSegment = ingress
×
NEW
803
                                } else {
×
NEW
804
                                        s.Resources.Ingress = ingress
×
NEW
805
                                }
×
806
                        }
807
                }
808
        }
809

NEW
810
        return nil
×
811
}
812

NEW
813
func (c *StackSetController) addRouteGroups(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
814
        routegroups, err := c.informers.routegroupInformer.Lister().RouteGroups(key.Namespace).List(labelSelector)
×
NEW
815
        if err != nil {
×
NEW
816
                return fmt.Errorf("failed to list Ingresses for StackSet %s: %v", key.String(), err)
×
NEW
817
        }
×
NEW
818
        for _, routegroup := range routegroups {
×
NEW
819
                routegroup := routegroup.DeepCopy()
×
NEW
820
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
×
NEW
821
                        // stackset routegroup
×
NEW
822
                        if uid == stacksetContainer.StackSet.UID {
×
NEW
823
                                stacksetContainer.RouteGroup = routegroup
×
NEW
824
                                continue
×
825
                        }
826

827
                        // stack routegroup
NEW
828
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
829
                                if strings.HasSuffix(
×
NEW
830
                                        routegroup.ObjectMeta.Name,
×
NEW
831
                                        core.SegmentSuffix,
×
NEW
832
                                ) {
×
NEW
833
                                        // Traffic Segment
×
NEW
834
                                        s.Resources.RouteGroupSegment = routegroup
×
NEW
835
                                } else {
×
NEW
836
                                        s.Resources.RouteGroup = routegroup
×
NEW
837
                                }
×
838
                        }
839
                }
840
        }
841

NEW
842
        return nil
×
843
}
844

NEW
845
func (c *StackSetController) addDeployments(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
846
        deployments, err := c.informers.deploymentInformer.Lister().Deployments(key.Namespace).List(labelSelector)
×
NEW
847
        if err != nil {
×
NEW
848
                return fmt.Errorf("failed to list Deployments for StackSet %s: %v", key.String(), err)
×
NEW
849
        }
×
850

NEW
851
        for _, deployment := range deployments {
×
NEW
852
                deployment := deployment.DeepCopy()
×
NEW
853
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
×
NEW
854
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
855
                                s.Resources.Deployment = deployment
×
NEW
856
                        }
×
857
                }
858
        }
NEW
859
        return nil
×
860
}
861

NEW
862
func (c *StackSetController) addServices(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
863
        services, err := c.informers.serviceInformer.Lister().Services(key.Namespace).List(labelSelector)
×
NEW
864
        if err != nil {
×
NEW
865
                return fmt.Errorf("failed to list Services for StackSet %s: %v", key.String(), err)
×
NEW
866
        }
×
867

NEW
868
Items:
×
NEW
869
        for _, service := range services {
×
NEW
870
                service := service.DeepCopy()
×
NEW
871
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
×
NEW
872
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
873
                                s.Resources.Service = service
×
NEW
874
                        }
×
875

876
                        // service/HPA used to be owned by the deployment for some reason
877
                        // TODO: check if this can be removed
NEW
878
                        for _, stack := range stacksetContainer.StackContainers {
×
NEW
879
                                if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
×
NEW
880
                                        stack.Resources.Service = service
×
NEW
881
                                        continue Items
×
882
                                }
883
                        }
884
                }
885
        }
NEW
886
        return nil
×
887
}
888

NEW
889
func (c *StackSetController) addHPAs(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
890
        hpas, err := c.informers.hpaInformer.Lister().HorizontalPodAutoscalers(key.Namespace).List(labelSelector)
×
NEW
891
        if err != nil {
×
NEW
892
                return fmt.Errorf("failed to list HPAs for StackSet %s: %v", key.String(), err)
×
NEW
893
        }
×
894

NEW
895
Items:
×
NEW
896
        for _, hpa := range hpas {
×
NEW
897
                hpa := hpa.DeepCopy()
×
NEW
898
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
×
NEW
899
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
900
                                s.Resources.HPA = hpa
×
NEW
901
                        }
×
902

903
                        // service/HPA used to be owned by the deployment for some reason
904
                        // TODO: check if this can be removed
NEW
905
                        for _, stack := range stacksetContainer.StackContainers {
×
NEW
906
                                if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
×
NEW
907
                                        stack.Resources.HPA = hpa
×
NEW
908
                                        continue Items
×
909
                                }
910
                        }
911
                }
912
        }
NEW
913
        return nil
×
914
}
915

NEW
916
func (c *StackSetController) addConfigMaps(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
917
        configMaps, err := c.informers.configMapInformer.Lister().ConfigMaps(key.Namespace).List(labelSelector)
×
NEW
918
        if err != nil {
×
NEW
919
                return fmt.Errorf("failed to list ConfigMaps for StackSet %s: %v", key.String(), err)
×
NEW
920
        }
×
921

NEW
922
        for _, configMap := range configMaps {
×
NEW
923
                configMap := configMap.DeepCopy()
×
NEW
924
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
×
NEW
925
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
926
                                s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, configMap)
×
NEW
927
                        }
×
928
                }
929
        }
NEW
930
        return nil
×
931
}
932

NEW
933
func (c *StackSetController) addSecrets(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
934
        secrets, err := c.informers.secretInformer.Lister().Secrets(key.Namespace).List(labelSelector)
×
NEW
935
        if err != nil {
×
NEW
936
                return fmt.Errorf("failed to list Secrets for StackSet %s: %v", key.String(), err)
×
NEW
937
        }
×
938

NEW
939
        for _, secret := range secrets {
×
NEW
940
                secret := secret.DeepCopy()
×
NEW
941
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
×
NEW
942
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
943
                                s.Resources.Secrets = append(s.Resources.Secrets, secret)
×
NEW
944
                        }
×
945
                }
946
        }
NEW
947
        return nil
×
948
}
949

NEW
950
func (c *StackSetController) addPlatformCredentialsSet(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error {
×
NEW
951
        platformCredentialsSets, err := c.informers.pcsInformer.Lister().PlatformCredentialsSets(key.Namespace).List(labelSelector)
×
NEW
952
        if err != nil {
×
NEW
953
                return fmt.Errorf("failed to list PlatformCredentialsSets for StackSet %s: %v", key.String(), err)
×
NEW
954
        }
×
955

NEW
956
        for _, pcs := range platformCredentialsSets {
×
NEW
957
                pcs := pcs.DeepCopy()
×
NEW
958
                if uid, ok := getOwnerUID(pcs.ObjectMeta); ok {
×
NEW
959
                        if s, ok := stacksetContainer.StackContainers[uid]; ok {
×
NEW
960
                                s.Resources.PlatformCredentialsSets = append(
×
NEW
961
                                        s.Resources.PlatformCredentialsSets,
×
NEW
962
                                        pcs,
×
NEW
963
                                )
×
NEW
964
                        }
×
965
                }
966
        }
NEW
967
        return nil
×
968
}
969

970
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
971
        if len(objectMeta.OwnerReferences) == 1 {
2✔
972
                return objectMeta.OwnerReferences[0].UID, true
1✔
973
        }
1✔
974
        return "", false
1✔
975
}
976

977
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
978
        switch err.(type) {
×
979
        case *eventedError:
×
980
                // already notified
×
981
                return err
×
982
        default:
×
983
                c.recorder.Eventf(
×
984
                        object,
×
985
                        v1.EventTypeWarning,
×
986
                        reason,
×
987
                        err.Error())
×
988
                return &eventedError{err: err}
×
989
        }
990
}
991

992
// hasOwnership returns true if the controller is the "owner" of the stackset.
993
// Whether it's owner is determined by the value of the
994
// 'stackset-controller.zalando.org/controller' annotation. If the value
995
// matches the controllerID then it owns it, or if the controllerID is
996
// "" and there's no annotation set.
997
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
998
        if stackset.Annotations != nil {
×
999
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
1000
                        return owner == c.config.ControllerID
×
1001
                }
×
1002
        }
1003
        return c.config.ControllerID == ""
×
1004
}
1005

1006
func (c *StackSetController) startWatch(ctx context.Context) error {
×
1007
        informer := cache.NewSharedIndexInformer(
×
1008
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.config.Namespace, fields.Everything()),
×
1009
                &zv1.StackSet{},
×
1010
                0, // skip resync
×
1011
                cache.Indexers{},
×
1012
        )
×
1013

×
1014
        _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
1015
                AddFunc:    c.add,
×
1016
                UpdateFunc: c.update,
×
1017
                DeleteFunc: c.del,
×
1018
        })
×
1019
        if err != nil {
×
1020
                return fmt.Errorf("failed to add event handler: %w", err)
×
1021
        }
×
1022

1023
        go informer.Run(ctx.Done())
×
1024
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
1025
                return fmt.Errorf("timed out waiting for caches to sync")
×
1026
        }
×
1027
        c.logger.Info("Synced StackSet watcher")
×
1028

×
1029
        return nil
×
1030
}
1031

1032
type informerInterface interface {
1033
        Informer() cache.SharedIndexInformer
1034
}
1035

NEW
1036
func (c *StackSetController) setupInformers(ctx context.Context) error {
×
NEW
1037
        factory := informers.NewSharedInformerFactory(c.client, time.Hour*24)
×
NEW
1038

×
NEW
1039
        stackSetFactory := stacksetinformers.NewSharedInformerFactory(c.client, time.Hour*24) // TODO: resync
×
NEW
1040
        rgClient := c.client.(*clientset.Clientset)
×
NEW
1041
        routegroupFactory := routegroupinformers.NewSharedInformerFactory(rgClient.RouteGroup, time.Hour*24)
×
NEW
1042

×
NEW
1043
        c.informers.stacksetInformer = stackSetFactory.Zalando().V1().StackSets()
×
NEW
1044
        c.informers.stackInformer = stackSetFactory.Zalando().V1().Stacks()
×
NEW
1045
        c.informers.ingressInformer = factory.Networking().V1().Ingresses()
×
NEW
1046
        c.informers.routegroupInformer = routegroupFactory.Zalando().V1().RouteGroups()
×
NEW
1047
        c.informers.deploymentInformer = factory.Apps().V1().Deployments()
×
NEW
1048
        c.informers.serviceInformer = factory.Core().V1().Services()
×
NEW
1049
        c.informers.hpaInformer = factory.Autoscaling().V2().HorizontalPodAutoscalers()
×
NEW
1050
        c.informers.configMapInformer = factory.Core().V1().ConfigMaps()
×
NEW
1051
        c.informers.secretInformer = factory.Core().V1().Secrets()
×
NEW
1052
        c.informers.pcsInformer = stackSetFactory.Zalando().V1().PlatformCredentialsSets()
×
NEW
1053

×
NEW
1054
        resourceInformers := []informerInterface{
×
NEW
1055
                c.informers.stacksetInformer,
×
NEW
1056
                c.informers.stackInformer,
×
NEW
1057
                c.informers.ingressInformer,
×
NEW
1058
                c.informers.routegroupInformer,
×
NEW
1059
                c.informers.deploymentInformer,
×
NEW
1060
                c.informers.serviceInformer,
×
NEW
1061
                c.informers.hpaInformer,
×
NEW
1062
                c.informers.configMapInformer,
×
NEW
1063
                c.informers.secretInformer,
×
NEW
1064
                c.informers.pcsInformer,
×
NEW
1065
        }
×
NEW
1066

×
NEW
1067
        var hasSynced []cache.InformerSynced
×
NEW
1068
        for _, informer := range resourceInformers {
×
NEW
1069
                _, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
NEW
1070
                        AddFunc:    c.addResource,
×
NEW
1071
                        UpdateFunc: c.updateResource,
×
NEW
1072
                        DeleteFunc: c.deleteResource,
×
NEW
1073
                })
×
NEW
1074
                if err != nil {
×
NEW
1075
                        return fmt.Errorf("failed to add event handler: %v", err)
×
NEW
1076
                }
×
1077

NEW
1078
                hasSynced = append(hasSynced, informer.Informer().HasSynced)
×
NEW
1079
                go informer.Informer().Run(ctx.Done())
×
1080
        }
1081

NEW
1082
        if !cache.WaitForCacheSync(ctx.Done(),
×
NEW
1083
                hasSynced...,
×
NEW
1084
        ) {
×
NEW
1085
                return fmt.Errorf("failed to sync informers")
×
NEW
1086
        }
×
1087

NEW
1088
        return nil
×
1089
}
1090

NEW
1091
func objToNamespacedName(obj interface{}) (types.NamespacedName, bool) {
×
NEW
1092
        switch typedObj := obj.(type) {
×
NEW
1093
        case *zv1.StackSet:
×
NEW
1094
                return types.NamespacedName{
×
NEW
1095
                        Namespace: typedObj.Namespace,
×
NEW
1096
                        Name:      typedObj.Name,
×
NEW
1097
                }, true
×
NEW
1098
        case *zv1.Stack, *networking.Ingress, *rgv1.RouteGroup, *appsv1.Deployment, *v1.Service, *autoscalingv2.HorizontalPodAutoscaler, *v1.ConfigMap, *v1.Secret, *zv1.PlatformCredentialsSet:
×
NEW
1099
                meta, ok := obj.(metav1.ObjectMeta)
×
NEW
1100
                if !ok {
×
NEW
1101
                        return types.NamespacedName{}, false
×
NEW
1102
                }
×
1103

NEW
1104
                if stackset, ok := meta.GetLabels()[core.StacksetHeritageLabelKey]; ok {
×
NEW
1105
                        return types.NamespacedName{
×
NEW
1106
                                Namespace: meta.Namespace,
×
NEW
1107
                                Name:      stackset,
×
NEW
1108
                        }, true
×
NEW
1109
                }
×
1110

NEW
1111
                return types.NamespacedName{}, false
×
NEW
1112
        default:
×
NEW
1113
                return types.NamespacedName{}, false
×
1114
        }
1115
}
1116

NEW
1117
func (c *StackSetController) addResource(obj any) {
×
NEW
1118
        key, ok := objToNamespacedName(obj)
×
NEW
1119
        if !ok {
×
NEW
1120
                return
×
NEW
1121
        }
×
NEW
1122
        c.queue.Add(key)
×
1123
}
1124

NEW
1125
func (c *StackSetController) updateResource(oldObj, newObj any) {
×
NEW
1126
        c.addResource(newObj)
×
NEW
1127
}
×
1128

NEW
1129
func (c *StackSetController) deleteResource(obj any) {
×
NEW
1130
        stackset, ok := obj.(*zv1.StackSet)
×
NEW
1131
        if !ok {
×
NEW
1132
                // non-stackset deletions indicate refresh to resource
×
NEW
1133
                // associated with a stackset
×
NEW
1134
                c.addResource(obj)
×
NEW
1135
                return
×
NEW
1136
        }
×
1137

NEW
1138
        key := types.NamespacedName{
×
NEW
1139
                Namespace: stackset.Namespace,
×
NEW
1140
                Name:      stackset.Name,
×
NEW
1141
        }
×
NEW
1142

×
NEW
1143
        c.queue.Forget(key)
×
NEW
1144
        c.queue.Done(key)
×
1145
}
1146

1147
func (c *StackSetController) add(obj interface{}) {
×
1148
        stackset, ok := obj.(*zv1.StackSet)
×
1149
        if !ok {
×
1150
                return
×
1151
        }
×
1152

1153
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
1154
        c.stacksetEvents <- stacksetEvent{
×
1155
                StackSet: stackset.DeepCopy(),
×
1156
        }
×
1157
}
1158

1159
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
1160
        newStackset, ok := newObj.(*zv1.StackSet)
×
1161
        if !ok {
×
1162
                return
×
1163
        }
×
1164

1165
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
1166
        if !ok {
×
1167
                return
×
1168
        }
×
1169

1170
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
1171
                newStackset.Namespace,
×
1172
                newStackset.Name,
×
1173
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
1174
        )
×
1175

×
1176
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
1177
        c.stacksetEvents <- stacksetEvent{
×
1178
                StackSet: newStackset.DeepCopy(),
×
1179
        }
×
1180
}
1181

1182
func (c *StackSetController) del(obj interface{}) {
×
1183
        stackset, ok := obj.(*zv1.StackSet)
×
1184
        if !ok {
×
1185
                return
×
1186
        }
×
1187

1188
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
1189
        c.stacksetEvents <- stacksetEvent{
×
1190
                StackSet: stackset.DeepCopy(),
×
1191
                Deleted:  true,
×
1192
        }
×
1193
}
1194

1195
func retryUpdate(updateFn func(retry bool) error) error {
×
1196
        retry := false
×
1197
        for {
×
1198
                err := updateFn(retry)
×
1199
                if err != nil {
×
1200
                        if errors.IsConflict(err) {
×
1201
                                retry = true
×
1202
                                continue
×
1203
                        }
1204
                        return err
×
1205
                }
1206
                return nil
×
1207
        }
1208
}
1209

1210
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
1211
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
1212
        for _, sc := range ssc.StackContainers {
×
1213
                stack := sc.Stack.DeepCopy()
×
1214
                status := *sc.GenerateStackStatus()
×
1215
                err := retryUpdate(func(retry bool) error {
×
1216
                        if retry {
×
1217
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
1218
                                if err != nil {
×
1219
                                        return err
×
1220
                                }
×
1221
                                stack = updated
×
1222
                        }
1223
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
1224
                                stack.Status = status
×
1225
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
1226
                                return err
×
1227
                        }
×
1228
                        return nil
×
1229
                })
1230
                if err != nil {
×
1231
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
1232
                }
×
1233
        }
1234

1235
        stackset := ssc.StackSet.DeepCopy()
×
1236
        status := *ssc.GenerateStackSetStatus()
×
1237
        err := retryUpdate(func(retry bool) error {
×
1238
                if retry {
×
1239
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
1240
                        if err != nil {
×
1241
                                return err
×
1242
                        }
×
1243
                        stackset = updated
×
1244
                }
1245
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
1246
                        stackset.Status = status
×
1247
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
1248
                        return err
×
1249
                }
×
1250
                return nil
×
1251
        })
1252
        if err != nil {
×
1253
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
1254
        }
×
1255
        return nil
×
1256
}
1257

1258
// ReconcileTrafficSegments updates the traffic segments according to the actual
1259
// traffic weight of each stack.
1260
//
1261
// Returns the ordered list of Trafic Segments that need to be updated.
1262
func (c *StackSetController) ReconcileTrafficSegments(
1263
        ctx context.Context,
1264
        ssc *core.StackSetContainer,
1265
) ([]types.UID, error) {
×
1266
        // Compute segments
×
1267
        toUpdate, err := ssc.ComputeTrafficSegments()
×
1268
        if err != nil {
×
1269
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
1270
        }
×
1271

1272
        return toUpdate, nil
×
1273
}
1274

1275
// CreateCurrentStack creates a new Stack object for the current stack, if needed
1276
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
1277
        newStack, newStackVersion := ssc.NewStack(c.config.ForwardSupportEnabled)
1✔
1278
        if newStack == nil {
2✔
1279
                return nil
1✔
1280
        }
1✔
1281

1282
        if c.config.ConfigMapSupportEnabled || c.config.SecretSupportEnabled {
2✔
1283
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
1284
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
1285
                        return err
×
1286
                }
×
1287
        }
1288

1289
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
1290
        if err != nil {
1✔
1291
                return err
×
1292
        }
×
1293
        fixupStackTypeMeta(created)
1✔
1294

1✔
1295
        c.recorder.Eventf(
1✔
1296
                ssc.StackSet,
1✔
1297
                v1.EventTypeNormal,
1✔
1298
                "CreatedStack",
1✔
1299
                "Created stack %s",
1✔
1300
                newStack.Name(),
1✔
1301
        )
1✔
1302

1✔
1303
        // Persist ObservedStackVersion in the status
1✔
1304
        updated := ssc.StackSet.DeepCopy()
1✔
1305
        updated.Status.ObservedStackVersion = newStackVersion
1✔
1306

1✔
1307
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
1308
        if err != nil {
1✔
1309
                return err
×
1310
        }
×
1311
        fixupStackSetTypeMeta(result)
1✔
1312
        ssc.StackSet = result
1✔
1313

1✔
1314
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
1315
                Stack:          created,
1✔
1316
                PendingRemoval: false,
1✔
1317
                Resources:      core.StackResources{},
1✔
1318
        }
1✔
1319
        return nil
1✔
1320
}
1321

1322
// CleanupOldStacks deletes stacks that are no longer needed.
1323
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
1324
        for _, sc := range ssc.StackContainers {
2✔
1325
                if !sc.PendingRemoval {
2✔
1326
                        continue
1✔
1327
                }
1328

1329
                stack := sc.Stack
1✔
1330
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
1331
                if err != nil {
1✔
1332
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
1333
                }
×
1334
                c.recorder.Eventf(
1✔
1335
                        ssc.StackSet,
1✔
1336
                        v1.EventTypeNormal,
1✔
1337
                        "DeletedExcessStack",
1✔
1338
                        "Deleted excess stack %s",
1✔
1339
                        stack.Name)
1✔
1340
        }
1341

1342
        return nil
1✔
1343
}
1344

1345
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
1346
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
×
1347
        // Ingress removed, handled outside
×
1348
        if ingress == nil {
×
1349
                return existing, nil
×
1350
        }
×
1351

1352
        if existing == nil {
×
1353
                if ingress.Annotations == nil {
×
1354
                        ingress.Annotations = make(map[string]string)
×
1355
                }
×
1356
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1357

×
1358
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
×
1359
                if err != nil {
×
1360
                        return nil, err
×
1361
                }
×
1362
                c.recorder.Eventf(
×
1363
                        stackset,
×
1364
                        v1.EventTypeNormal,
×
1365
                        "CreatedIngress",
×
1366
                        "Created Ingress %s",
×
1367
                        ingress.Name)
×
1368
                return createdIng, nil
×
1369
        }
1370

1371
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
×
1372
        if existingHaveUpdateTimeStamp {
×
1373
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
×
1374
        }
×
1375

1376
        // Check if we need to update the Ingress
1377
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
×
1378
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
×
1379
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
×
1380
                // add the annotation back after comparing
×
1381
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
×
1382
                return existing, nil
×
1383
        }
×
1384

1385
        updated := existing.DeepCopy()
×
1386
        updated.Spec = ingress.Spec
×
1387
        if ingress.Annotations != nil {
×
1388
                updated.Annotations = ingress.Annotations
×
1389
        } else {
×
1390
                updated.Annotations = make(map[string]string)
×
1391
        }
×
1392
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1393

×
1394
        updated.Labels = ingress.Labels
×
1395

×
1396
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
×
1397
        if err != nil {
×
1398
                return nil, err
×
1399
        }
×
1400
        c.recorder.Eventf(
×
1401
                stackset,
×
1402
                v1.EventTypeNormal,
×
1403
                "UpdatedIngress",
×
1404
                "Updated Ingress %s",
×
1405
                ingress.Name)
×
1406
        return createdIngress, nil
×
1407
}
1408

1409
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
1410
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
×
1411
        // RouteGroup removed, handled outside
×
1412
        if rg == nil {
×
1413
                return existing, nil
×
1414
        }
×
1415

1416
        // Create new RouteGroup
1417
        if existing == nil {
×
1418
                if rg.Annotations == nil {
×
1419
                        rg.Annotations = make(map[string]string)
×
1420
                }
×
1421
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1422

×
1423
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
×
1424
                if err != nil {
×
1425
                        return nil, err
×
1426
                }
×
1427
                c.recorder.Eventf(
×
1428
                        stackset,
×
1429
                        v1.EventTypeNormal,
×
1430
                        "CreatedRouteGroup",
×
1431
                        "Created RouteGroup %s",
×
1432
                        rg.Name)
×
1433
                return createdRg, nil
×
1434
        }
1435

1436
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
×
1437
        if existingHaveUpdateTimeStamp {
×
1438
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
×
1439
        }
×
1440

1441
        // Check if we need to update the RouteGroup
1442
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
×
1443
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
×
1444
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
×
1445
                // add the annotation back after comparing
×
1446
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
×
1447
                return existing, nil
×
1448
        }
×
1449

1450
        updated := existing.DeepCopy()
×
1451
        updated.Spec = rg.Spec
×
1452
        if rg.Annotations != nil {
×
1453
                updated.Annotations = rg.Annotations
×
1454
        } else {
×
1455
                updated.Annotations = make(map[string]string)
×
1456
        }
×
1457
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
×
1458

×
1459
        updated.Labels = rg.Labels
×
1460

×
1461
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
×
1462
        if err != nil {
×
1463
                return nil, err
×
1464
        }
×
1465
        c.recorder.Eventf(
×
1466
                stackset,
×
1467
                v1.EventTypeNormal,
×
1468
                "UpdatedRouteGroup",
×
1469
                "Updated RouteGroup %s",
×
1470
                rg.Name)
×
1471
        return createdRg, nil
×
1472
}
1473

1474
// RecordTrafficSwitch records an event detailing when switches in traffic to
1475
// Stacks, only when there are changes to record.
1476
func (c *StackSetController) RecordTrafficSwitch(ctx context.Context, ssc *core.StackSetContainer) error {
×
1477
        trafficChanges := ssc.TrafficChanges()
×
1478
        if len(trafficChanges) != 0 {
×
1479
                var changeMessages []string
×
1480
                for _, change := range trafficChanges {
×
1481
                        changeMessages = append(changeMessages, change.String())
×
1482
                }
×
1483

1484
                c.recorder.Eventf(
×
1485
                        ssc.StackSet,
×
1486
                        v1.EventTypeNormal,
×
1487
                        "TrafficSwitched",
×
1488
                        "Switched traffic: %s",
×
1489
                        strings.Join(changeMessages, ", "))
×
1490
        }
1491

1492
        return nil
×
1493
}
1494

1495
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1496
        updatedTraffic := generateUpdated()
1✔
1497

1✔
1498
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1499
                return nil
×
1500
        }
×
1501

1502
        updated := existing.DeepCopy()
1✔
1503
        updated.Spec.Traffic = updatedTraffic
1✔
1504

1✔
1505
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1506
        if err != nil {
1✔
1507
                return err
×
1508
        }
×
1509
        c.recorder.Eventf(
1✔
1510
                updated,
1✔
1511
                v1.EventTypeNormal,
1✔
1512
                "UpdatedStackSet",
1✔
1513
                "Updated StackSet %s",
1✔
1514
                updated.Name)
1✔
1515
        return nil
1✔
1516
}
1517

1518
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1519
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1520
        if err != nil {
×
1521
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1522
        }
×
1523

1524
        err = c.ReconcileStackIngress(
×
1525
                ctx,
×
1526
                sc.Stack,
×
1527
                sc.Resources.IngressSegment,
×
1528
                sc.GenerateIngressSegment,
×
1529
        )
×
1530
        if err != nil {
×
1531
                return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1532
        }
×
1533

1534
        if c.config.RouteGroupSupportEnabled {
×
1535
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1536
                if err != nil {
×
1537
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1538
                }
×
1539

1540
                err = c.ReconcileStackRouteGroup(
×
1541
                        ctx,
×
1542
                        sc.Stack,
×
1543
                        sc.Resources.RouteGroupSegment,
×
1544
                        sc.GenerateRouteGroupSegment,
×
1545
                )
×
1546
                if err != nil {
×
1547
                        return c.errorEventf(
×
1548
                                sc.Stack,
×
1549
                                "FailedManageRouteGroupSegment",
×
1550
                                err,
×
1551
                        )
×
1552
                }
×
1553
        }
1554

1555
        if c.config.ConfigMapSupportEnabled {
×
1556
                err := c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1557
                if err != nil {
×
1558
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1559
                }
×
1560
        }
1561

1562
        if c.config.SecretSupportEnabled {
×
1563
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1564
                if err != nil {
×
1565
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1566
                }
×
1567
        }
1568

1569
        if c.config.PcsSupportEnabled {
×
1570
                err = c.ReconcileStackPlatformCredentialsSets(
×
1571
                        ctx,
×
1572
                        sc.Stack,
×
1573
                        sc.Resources.PlatformCredentialsSets,
×
1574
                        sc.GeneratePlatformCredentialsSet,
×
1575
                )
×
1576
                if err != nil {
×
1577
                        return c.errorEventf(sc.Stack, "FailedManagePlatformCredentialsSet", err)
×
1578
                }
×
1579
        }
1580

1581
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1582
        if err != nil {
×
1583
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1584
        }
×
1585

1586
        hpaGenerator := sc.GenerateHPA
×
1587
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1588
        if err != nil {
×
1589
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1590
        }
×
1591

1592
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1593
        if err != nil {
×
1594
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1595
        }
×
1596

1597
        return nil
×
1598
}
1599

1600
// ReconcileStackSet reconciles all the things from a stackset
1601
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1602
        defer func() {
×
1603
                if r := recover(); r != nil {
×
1604
                        c.metricsReporter.ReportPanic()
×
1605
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1606
                        err = fmt.Errorf("panic: %v", r)
×
1607
                }
×
1608
        }()
1609

NEW
1610
        var errors []error
×
NEW
1611

×
1612
        // Create current stack, if needed. Proceed on errors.
×
1613
        err = c.CreateCurrentStack(ctx, container)
×
1614
        if err != nil {
×
1615
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1616
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
NEW
1617
                errors = append(errors, err)
×
UNCOV
1618
        }
×
1619

1620
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1621
        err = container.UpdateFromResources()
×
1622
        if err != nil {
×
NEW
1623
                c.recorder.Eventf(
×
NEW
1624
                        container.StackSet,
×
NEW
1625
                        v1.EventTypeWarning,
×
NEW
1626
                        "FailedUpdateFromResources",
×
NEW
1627
                        "Failed to update from resources: "+err.Error())
×
1628
                return err
×
1629
        }
×
1630

1631
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1632
        err = container.ManageTraffic(time.Now())
×
1633
        if err != nil {
×
1634
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1635
                c.recorder.Eventf(
×
1636
                        container.StackSet,
×
1637
                        v1.EventTypeWarning,
×
1638
                        "TrafficNotSwitched",
×
1639
                        "Failed to switch traffic: "+err.Error())
×
NEW
1640
                errors = append(errors, err)
×
UNCOV
1641
        }
×
1642

1643
        // Mark stacks that should be removed
1644
        container.MarkExpiredStacks()
×
1645

×
1646
        // Update traffic segments. Proceed on errors.
×
1647
        segsInOrder, err := c.ReconcileTrafficSegments(ctx, container)
×
1648
        if err != nil {
×
1649
                err = c.errorEventf(
×
1650
                        container.StackSet,
×
1651
                        reasonFailedManageStackSet,
×
1652
                        err,
×
1653
                )
×
1654
                c.stacksetLogger(container).Errorf(
×
1655
                        "Unable to reconcile traffic segments: %v",
×
1656
                        err,
×
1657
                )
×
NEW
1658
                errors = append(errors, err)
×
UNCOV
1659
        }
×
1660

1661
        // Reconcile stack resources. Proceed on errors.
1662
        reconciledStacks := map[types.UID]bool{}
×
1663
        for _, id := range segsInOrder {
×
1664
                reconciledStacks[id] = true
×
1665
                sc := container.StackContainers[id]
×
1666
                err = c.ReconcileStackResources(ctx, container, sc)
×
1667
                if err != nil {
×
1668
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1669
                        c.stackLogger(container, sc).Errorf(
×
1670
                                "Unable to reconcile stack resources: %v",
×
1671
                                err,
×
1672
                        )
×
NEW
1673
                        errors = append(errors, err)
×
UNCOV
1674
                }
×
1675
        }
1676

1677
        for k, sc := range container.StackContainers {
×
1678
                if reconciledStacks[k] {
×
1679
                        continue
×
1680
                }
1681

1682
                err = c.ReconcileStackResources(ctx, container, sc)
×
1683
                if err != nil {
×
1684
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1685
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
NEW
1686
                        errors = append(errors, err)
×
UNCOV
1687
                }
×
1688
        }
1689

1690
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1691
        err = c.RecordTrafficSwitch(ctx, container)
×
1692
        if err != nil {
×
1693
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1694
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
NEW
1695
                errors = append(errors, err)
×
UNCOV
1696
        }
×
1697

1698
        // Reconcile desired traffic in the stackset. Proceed on errors.
1699
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1700
        if err != nil {
×
1701
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1702
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
NEW
1703
                errors = append(errors, err)
×
UNCOV
1704
        }
×
1705

1706
        // Delete old stacks. Proceed on errors.
1707
        err = c.CleanupOldStacks(ctx, container)
×
1708
        if err != nil {
×
1709
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1710
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
NEW
1711
                errors = append(errors, err)
×
UNCOV
1712
        }
×
1713

1714
        // Update statuses.
1715
        err = c.ReconcileStatuses(ctx, container)
×
1716
        if err != nil {
×
NEW
1717
                c.recorder.Eventf(
×
NEW
1718
                        container.StackSet,
×
NEW
1719
                        v1.EventTypeWarning,
×
NEW
1720
                        "FailedUpdateStatuses",
×
NEW
1721
                        "Failed to update statuses: "+err.Error())
×
1722
                return err
×
1723
        }
×
1724

NEW
1725
        if len(errors) > 0 {
×
NEW
1726
                return fmt.Errorf("encountered %d errors during reconciliation, see events/logs for details", len(errors))
×
NEW
1727
        }
×
1728

UNCOV
1729
        return nil
×
1730
}
1731

1732
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1733
// stackset annotation.
1734
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1735
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1736
        if !ok {
2✔
1737
                return 0, false
1✔
1738
        }
1✔
1739
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1740
        if err != nil {
1✔
1741
                return 0, false
×
1742
        }
×
1743
        return resetDelay, true
1✔
1744
}
1745

1746
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1747
        // set TypeMeta manually because of this bug:
1✔
1748
        // https://github.com/kubernetes/client-go/issues/308
1✔
1749
        stackset.APIVersion = core.APIVersion
1✔
1750
        stackset.Kind = core.KindStackSet
1✔
1751
}
1✔
1752

1753
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1754
        // set TypeMeta manually because of this bug:
1✔
1755
        // https://github.com/kubernetes/client-go/issues/308
1✔
1756
        stack.APIVersion = core.APIVersion
1✔
1757
        stack.Kind = core.KindStack
1✔
1758
}
1✔
1759

1760
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1761
// name is not prefixed by Stack name.
1762
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1763
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
1764
                if err := validateConfigurationResourceName(stack.Name, rsc.GetName()); err != nil {
×
1765
                        return err
×
1766
                }
×
1767
        }
1768
        return nil
1✔
1769
}
1770

1771
// validateConfigurationResourceName returns an error if specific resource
1772
// name is not prefixed by Stack name.
1773
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1774
        if !strings.HasPrefix(rsc, stack) {
2✔
1775
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
1✔
1776
        }
1✔
1777
        return nil
1✔
1778
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc