• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 8283134271

14 Mar 2024 03:13PM UTC coverage: 52.089% (-0.2%) from 52.337%
8283134271

Pull #608

github

zaklawrencea
Handle informer error

Signed-off-by: Zak Lawrence <zaklawrencea@protonmail.com>
Pull Request #608: Update k8s.io/* dependencies to v0.26.14

0 of 41 new or added lines in 3 files covered. (0.0%)

1 existing line in 1 file now uncovered.

3079 of 5911 relevant lines covered (52.09%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.71
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
// Annotation keys read from (or written to) StackSet resources.
const (
	// PrescaleStacksAnnotationKey, when present on a StackSet, makes
	// collectResources use the prescaling traffic reconciler.
	PrescaleStacksAnnotationKey = "alpha.stackset-controller.zalando.org/prescale-stacks"

	// ResetHPAMinReplicasDelayAnnotationKey names the annotation for the
	// HPA min-replicas reset delay.
	// NOTE(review): presumably parsed by getResetMinReplicasDelay — confirm.
	ResetHPAMinReplicasDelayAnnotationKey = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"

	// StacksetControllerControllerAnnotationKey holds the ID of the
	// controller instance that owns a StackSet (see hasOwnership).
	StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"

	// ControllerLastUpdatedAnnotationKey names the updated-timestamp
	// annotation.
	// NOTE(review): no use is visible in this part of the file — confirm.
	ControllerLastUpdatedAnnotationKey = "stackset-controller.zalando.org/updated-timestamp"

	// TrafficSegmentsAnnotationKey, when set to "true" on a StackSet,
	// enables segment traffic for it in collectResources.
	TrafficSegmentsAnnotationKey = "stackset-controller.zalando.org/use-traffic-segments"
)

const (
	// reasonFailedManageStackSet is the event reason used when reconciling
	// a StackSet fails.
	reasonFailedManageStackSet = "FailedManageStackSet"

	// defaultResetMinReplicasDelay is used by the prescaling reconciler when
	// the StackSet does not provide its own delay.
	defaultResetMinReplicasDelay = 10 * time.Minute
)

// configurationResourceNameError is a format string taking the
// ConfigurationResource name and the Stack name, in that order.
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
49

50
// StackSetController is the main controller. It watches for changes to
51
// stackset resources and starts and maintains other controllers per
52
// stackset resource.
53
type StackSetController struct {
54
        logger                      *log.Entry
55
        client                      clientset.Interface
56
        namespace                   string
57
        syncIngressAnnotations      []string
58
        controllerID                string
59
        backendWeightsAnnotationKey string
60
        clusterDomains              []string
61
        interval                    time.Duration
62
        stacksetEvents              chan stacksetEvent
63
        stacksetStore               map[types.UID]zv1.StackSet
64
        recorder                    kube_record.EventRecorder
65
        metricsReporter             *core.MetricsReporter
66
        HealthReporter              healthcheck.Handler
67
        routeGroupSupportEnabled    bool
68
        trafficSegmentsEnabled      bool
69
        annotatedTrafficSegments    bool
70
        ingressSourceSwitchTTL      time.Duration
71
        now                         func() string
72
        reconcileWorkers            int
73
        configMapSupportEnabled     bool
74
        secretSupportEnabled        bool
75
        sync.Mutex
76
}
77

78
type stacksetEvent struct {
79
        Deleted  bool
80
        StackSet *zv1.StackSet
81
}
82

83
// eventedError marks an error that has already been surfaced to the user as
// an event, so that errorEventf does not emit it a second time.
type eventedError struct {
	err error
}

// Error returns the message of the wrapped error unchanged.
func (e *eventedError) Error() string {
	inner := e.err
	return inner.Error()
}
×
91

92
// now returns the current local time formatted as an RFC 3339 timestamp.
func now() string {
	current := time.Now()
	return current.Format(time.RFC3339)
}
×
95

96
// NewStackSetController initializes a new StackSetController.
97
func NewStackSetController(
98
        client clientset.Interface,
99
        namespace string,
100
        controllerID string,
101
        parallelWork int,
102
        backendWeightsAnnotationKey string,
103
        clusterDomains []string,
104
        registry prometheus.Registerer,
105
        interval time.Duration,
106
        routeGroupSupportEnabled bool,
107
        trafficSegmentsEnabled bool,
108
        annotatedTrafficSegments bool,
109
        syncIngressAnnotations []string,
110
        configMapSupportEnabled bool,
111
        secretSupportEnabled bool,
112
        ingressSourceSwitchTTL time.Duration,
113
) (*StackSetController, error) {
1✔
114
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
115
        if err != nil {
1✔
116
                return nil, err
×
117
        }
×
118

119
        return &StackSetController{
1✔
120
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
121
                client:                      client,
1✔
122
                namespace:                   namespace,
1✔
123
                controllerID:                controllerID,
1✔
124
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
125
                clusterDomains:              clusterDomains,
1✔
126
                interval:                    interval,
1✔
127
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
128
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
129
                recorder:                    recorder.CreateEventRecorder(client),
1✔
130
                metricsReporter:             metricsReporter,
1✔
131
                HealthReporter:              healthcheck.NewHandler(),
1✔
132
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
133
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
134
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
135
                syncIngressAnnotations:      syncIngressAnnotations,
1✔
136
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
137
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
138
                secretSupportEnabled:        secretSupportEnabled,
1✔
139
                now:                         now,
1✔
140
                reconcileWorkers:            parallelWork,
1✔
141
        }, nil
1✔
142
}
143

144
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
145
        return c.logger.WithFields(map[string]interface{}{
×
146
                "namespace": ssc.StackSet.Namespace,
×
147
                "stackset":  ssc.StackSet.Name,
×
148
        })
×
149
}
×
150

151
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
152
        return c.logger.WithFields(map[string]interface{}{
×
153
                "namespace": ssc.StackSet.Namespace,
×
154
                "stackset":  ssc.StackSet.Name,
×
155
                "stack":     sc.Name(),
×
156
        })
×
157
}
×
158

159
// Run runs the main loop of the StackSetController. Before the loops it
160
// sets up a watcher to watch StackSet resources. The watch will send
161
// changes over a channel which is polled from the main loop.
NEW
162
func (c *StackSetController) Run(ctx context.Context) error {
×
163
        var nextCheck time.Time
×
164

×
165
        // We're not alive if nextCheck is too far in the past
×
166
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
167
                if time.Since(nextCheck) > 5*c.interval {
×
168
                        return fmt.Errorf("nextCheck too old")
×
169
                }
×
170
                return nil
×
171
        })
172

NEW
173
        err := c.startWatch(ctx)
×
NEW
174
        if err != nil {
×
NEW
175
                return err
×
NEW
176
        }
×
177

178
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
179

×
180
        nextCheck = time.Now().Add(-c.interval)
×
181

×
182
        for {
×
183
                select {
×
184
                case <-time.After(time.Until(nextCheck)):
×
185

×
186
                        nextCheck = time.Now().Add(c.interval)
×
187

×
188
                        stackSetContainers, err := c.collectResources(ctx)
×
189
                        if err != nil {
×
190
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
191
                                continue
×
192
                        }
193

194
                        var reconcileGroup errgroup.Group
×
195
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
196
                        for stackset, container := range stackSetContainers {
×
197
                                container := container
×
198
                                stackset := stackset
×
199

×
200
                                reconcileGroup.Go(func() error {
×
201
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
202
                                                err := c.ReconcileStackSet(ctx, container)
×
203
                                                if err != nil {
×
204
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
205
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
206
                                                }
×
207
                                        }
208
                                        return nil
×
209
                                })
210
                        }
211

212
                        err = reconcileGroup.Wait()
×
213
                        if err != nil {
×
214
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
215
                        }
×
216
                        err = c.metricsReporter.Report(stackSetContainers)
×
217
                        if err != nil {
×
218
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
219
                        }
×
220
                case e := <-c.stacksetEvents:
×
221
                        stackset := *e.StackSet
×
222
                        fixupStackSetTypeMeta(&stackset)
×
223

×
224
                        // update/delete existing entry
×
225
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
226
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
227
                                        delete(c.stacksetStore, stackset.UID)
×
228
                                        continue
×
229
                                }
230

231
                                // update stackset entry
232
                                c.stacksetStore[stackset.UID] = stackset
×
233
                                continue
×
234
                        }
235

236
                        // check if stackset should be managed by the controller
237
                        if !c.hasOwnership(&stackset) {
×
238
                                continue
×
239
                        }
240

241
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
242
                        c.stacksetStore[stackset.UID] = stackset
×
243
                case <-ctx.Done():
×
244
                        c.logger.Info("Terminating main controller loop.")
×
NEW
245
                        return nil
×
246
                }
247
        }
248
}
249

250
// injectSegmentAnnotation injects the traffic segment annotation if it's not
251
// already present.
252
//
253
// Only inject the traffic segment annotation if the controller has traffic
254
// segments enabled by default, i.e. doesn't matter if the StackSet has the
255
// segment annotation.
256
func (c *StackSetController) injectSegmentAnnotation(
257
        ctx context.Context,
258
        stackSet *zv1.StackSet,
259
) bool {
×
260
        if c.annotatedTrafficSegments {
×
261
                return false
×
262
        }
×
263

264
        if !c.trafficSegmentsEnabled {
×
265
                return false
×
266
        }
×
267

268
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
269
                return false
×
270
        }
×
271

272
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
273

×
274
        _, err := c.client.ZalandoV1().StackSets(
×
275
                stackSet.Namespace,
×
276
        ).Update(
×
277
                ctx,
×
278
                stackSet,
×
279
                metav1.UpdateOptions{},
×
280
        )
×
281
        if err != nil {
×
282
                c.logger.Errorf(
×
283
                        "Failed injecting segment annotation: %v",
×
284
                        err,
×
285
                )
×
286
                return false
×
287
        }
×
288
        c.recorder.Eventf(
×
289
                stackSet,
×
290
                v1.EventTypeNormal,
×
291
                "UpdatedStackSet",
×
292
                "Updated StackSet %s. Injected %s annotation",
×
293
                stackSet.Name,
×
294
                TrafficSegmentsAnnotationKey,
×
295
        )
×
296

×
297
        return true
×
298
}
299

300
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
301
// overload the API requests with unnecessary requests
302
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
303
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
304
        for uid, stackset := range c.stacksetStore {
2✔
305
                stackset := stackset
1✔
306

1✔
307
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
308

1✔
309
                // use prescaling logic if enabled with an annotation
1✔
310
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
311
                        resetDelay := defaultResetMinReplicasDelay
1✔
312
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
313
                                resetDelay = resetDelayValue
1✔
314
                        }
1✔
315
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
316
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
317
                        }
1✔
318
                }
319

320
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
321
                if c.trafficSegmentsEnabled || c.annotatedTrafficSegments {
2✔
322

1✔
323
                        if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
2✔
324
                                stacksetContainer.EnableSegmentTraffic()
1✔
325
                                stacksetContainer.SynchronizeIngressAnnotations(
1✔
326
                                        c.syncIngressAnnotations,
1✔
327
                                )
1✔
328
                        }
1✔
329
                }
330
                stacksets[uid] = stacksetContainer
1✔
331
        }
332

333
        err := c.collectStacks(ctx, stacksets)
1✔
334
        if err != nil {
1✔
335
                return nil, err
×
336
        }
×
337

338
        err = c.collectIngresses(ctx, stacksets)
1✔
339
        if err != nil {
1✔
340
                return nil, err
×
341
        }
×
342

343
        if c.routeGroupSupportEnabled {
2✔
344
                err = c.collectRouteGroups(ctx, stacksets)
1✔
345
                if err != nil {
1✔
346
                        return nil, err
×
347
                }
×
348
        }
349

350
        err = c.collectDeployments(ctx, stacksets)
1✔
351
        if err != nil {
1✔
352
                return nil, err
×
353
        }
×
354

355
        err = c.collectServices(ctx, stacksets)
1✔
356
        if err != nil {
1✔
357
                return nil, err
×
358
        }
×
359

360
        err = c.collectHPAs(ctx, stacksets)
1✔
361
        if err != nil {
1✔
362
                return nil, err
×
363
        }
×
364

365
        if c.configMapSupportEnabled {
2✔
366
                err = c.collectConfigMaps(ctx, stacksets)
1✔
367
                if err != nil {
1✔
368
                        return nil, err
×
369
                }
×
370
        }
371

372
        if c.secretSupportEnabled {
2✔
373
                err = c.collectSecrets(ctx, stacksets)
1✔
374
                if err != nil {
1✔
375
                        return nil, err
×
376
                }
×
377
        }
378

379
        return stacksets, nil
1✔
380
}
381

382
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
383
        ingresses, err := c.client.NetworkingV1().Ingresses(c.namespace).List(ctx, metav1.ListOptions{})
1✔
384

1✔
385
        if err != nil {
1✔
386
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
387
        }
×
388

389
        for _, i := range ingresses.Items {
2✔
390
                ingress := i
1✔
391
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
392
                        // stackset ingress
1✔
393
                        if s, ok := stacksets[uid]; ok {
2✔
394
                                s.Ingress = &ingress
1✔
395
                                continue
1✔
396
                        }
397

398
                        // stack ingress
399
                        for _, stackset := range stacksets {
2✔
400
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
401
                                        if strings.HasSuffix(
1✔
402
                                                ingress.ObjectMeta.Name,
1✔
403
                                                core.SegmentSuffix,
1✔
404
                                        ) {
2✔
405
                                                // Traffic Segment
1✔
406
                                                s.Resources.IngressSegment = &ingress
1✔
407
                                        } else {
2✔
408
                                                s.Resources.Ingress = &ingress
1✔
409
                                        }
1✔
410
                                        break
1✔
411
                                }
412
                        }
413
                }
414
        }
415
        return nil
1✔
416
}
417

418
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
419
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.namespace).List(
1✔
420
                ctx,
1✔
421
                metav1.ListOptions{},
1✔
422
        )
1✔
423
        if err != nil {
1✔
424
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
425
        }
×
426

427
        for _, rg := range rgs.Items {
2✔
428
                routegroup := rg
1✔
429
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
430
                        // stackset routegroups
1✔
431
                        if s, ok := stacksets[uid]; ok {
2✔
432
                                s.RouteGroup = &routegroup
1✔
433
                                continue
1✔
434
                        }
435

436
                        // stack routegroups
437
                        for _, stackset := range stacksets {
2✔
438
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
439
                                        if strings.HasSuffix(
1✔
440
                                                routegroup.ObjectMeta.Name,
1✔
441
                                                core.SegmentSuffix,
1✔
442
                                        ) {
2✔
443
                                                // Traffic Segment
1✔
444
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
445
                                        } else {
2✔
446
                                                s.Resources.RouteGroup = &routegroup
1✔
447
                                        }
1✔
448
                                        break
1✔
449
                                }
450
                        }
451
                }
452
        }
453
        return nil
1✔
454
}
455

456
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
457
        stacks, err := c.client.ZalandoV1().Stacks(c.namespace).List(ctx, metav1.ListOptions{})
1✔
458
        if err != nil {
1✔
459
                return fmt.Errorf("failed to list Stacks: %v", err)
×
460
        }
×
461

462
        for _, stack := range stacks.Items {
2✔
463
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
464
                        if s, ok := stacksets[uid]; ok {
2✔
465
                                stack := stack
1✔
466
                                fixupStackTypeMeta(&stack)
1✔
467

1✔
468
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
469
                                        Stack: &stack,
1✔
470
                                }
1✔
471
                                continue
1✔
472
                        }
473
                }
474
        }
475
        return nil
1✔
476
}
477

478
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
479
        deployments, err := c.client.AppsV1().Deployments(c.namespace).List(ctx, metav1.ListOptions{})
1✔
480
        if err != nil {
1✔
481
                return fmt.Errorf("failed to list Deployments: %v", err)
×
482
        }
×
483

484
        for _, d := range deployments.Items {
2✔
485
                deployment := d
1✔
486
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
487
                        for _, stackset := range stacksets {
2✔
488
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
489
                                        s.Resources.Deployment = &deployment
1✔
490
                                        break
1✔
491
                                }
492
                        }
493
                }
494
        }
495
        return nil
1✔
496
}
497

498
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
499
        services, err := c.client.CoreV1().Services(c.namespace).List(ctx, metav1.ListOptions{})
1✔
500
        if err != nil {
1✔
501
                return fmt.Errorf("failed to list Services: %v", err)
×
502
        }
×
503

504
Items:
1✔
505
        for _, s := range services.Items {
2✔
506
                service := s
1✔
507
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
508
                        for _, stackset := range stacksets {
2✔
509
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
510
                                        s.Resources.Service = &service
1✔
511
                                        continue Items
1✔
512
                                }
513

514
                                // service/HPA used to be owned by the deployment for some reason
515
                                for _, stack := range stackset.StackContainers {
2✔
516
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
517
                                                stack.Resources.Service = &service
1✔
518
                                                continue Items
1✔
519
                                        }
520
                                }
521
                        }
522
                }
523
        }
524
        return nil
1✔
525
}
526

527
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
528
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.namespace).List(ctx, metav1.ListOptions{})
1✔
529
        if err != nil {
1✔
530
                return fmt.Errorf("failed to list HPAs: %v", err)
×
531
        }
×
532

533
Items:
1✔
534
        for _, h := range hpas.Items {
2✔
535
                hpa := h
1✔
536
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
537
                        for _, stackset := range stacksets {
2✔
538
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
539
                                        s.Resources.HPA = &hpa
1✔
540
                                        continue Items
1✔
541
                                }
542

543
                                // service/HPA used to be owned by the deployment for some reason
544
                                for _, stack := range stackset.StackContainers {
2✔
545
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
546
                                                stack.Resources.HPA = &hpa
1✔
547
                                                continue Items
1✔
548
                                        }
549
                                }
550
                        }
551
                }
552
        }
553
        return nil
1✔
554
}
555

556
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
557
        configMaps, err := c.client.CoreV1().ConfigMaps(c.namespace).List(ctx, metav1.ListOptions{})
1✔
558
        if err != nil {
1✔
559
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
560
        }
×
561

562
        for _, cm := range configMaps.Items {
2✔
563
                configMap := cm
1✔
564
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
565
                        for _, stackset := range stacksets {
2✔
566
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
567
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
568
                                        break
1✔
569
                                }
570
                        }
571
                }
572
        }
573
        return nil
1✔
574
}
575

576
func (c *StackSetController) collectSecrets(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
577
        secrets, err := c.client.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{})
1✔
578
        if err != nil {
1✔
579
                return fmt.Errorf("failed to list Secrets: %v", err)
×
580
        }
×
581

582
        for _, sct := range secrets.Items {
2✔
583
                secret := sct
1✔
584
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
585
                        for _, stackset := range stacksets {
2✔
586
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
587
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
588
                                        break
1✔
589
                                }
590
                        }
591
                }
592
        }
593
        return nil
1✔
594
}
595

596
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
597
        if len(objectMeta.OwnerReferences) == 1 {
2✔
598
                return objectMeta.OwnerReferences[0].UID, true
1✔
599
        }
1✔
600
        return "", false
1✔
601
}
602

603
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
604
        switch err.(type) {
×
605
        case *eventedError:
×
606
                // already notified
×
607
                return err
×
608
        default:
×
609
                c.recorder.Eventf(
×
610
                        object,
×
611
                        v1.EventTypeWarning,
×
612
                        reason,
×
613
                        err.Error())
×
614
                return &eventedError{err: err}
×
615
        }
616
}
617

618
// hasOwnership returns true if the controller is the "owner" of the stackset.
619
// Whether it's owner is determined by the value of the
620
// 'stackset-controller.zalando.org/controller' annotation. If the value
621
// matches the controllerID then it owns it, or if the controllerID is
622
// "" and there's no annotation set.
623
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
624
        if stackset.Annotations != nil {
×
625
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
626
                        return owner == c.controllerID
×
627
                }
×
628
        }
629
        return c.controllerID == ""
×
630
}
631

NEW
632
func (c *StackSetController) startWatch(ctx context.Context) error {
×
633
        informer := cache.NewSharedIndexInformer(
×
634
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.namespace, fields.Everything()),
×
635
                &zv1.StackSet{},
×
636
                0, // skip resync
×
637
                cache.Indexers{},
×
638
        )
×
639

×
NEW
640
        _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
641
                AddFunc:    c.add,
×
642
                UpdateFunc: c.update,
×
643
                DeleteFunc: c.del,
×
644
        })
×
NEW
645
        if err != nil {
×
NEW
646
                return fmt.Errorf("Failed to add event handler: %w", err)
×
NEW
647
        }
×
648

649
        go informer.Run(ctx.Done())
×
650
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
NEW
651
                return fmt.Errorf("Timed out waiting for caches to sync")
×
652
        }
×
653
        c.logger.Info("Synced StackSet watcher")
×
NEW
654

×
NEW
655
        return nil
×
656
}
657

658
func (c *StackSetController) add(obj interface{}) {
×
659
        stackset, ok := obj.(*zv1.StackSet)
×
660
        if !ok {
×
661
                return
×
662
        }
×
663

664
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
665
        c.stacksetEvents <- stacksetEvent{
×
666
                StackSet: stackset.DeepCopy(),
×
667
        }
×
668
}
669

670
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
671
        newStackset, ok := newObj.(*zv1.StackSet)
×
672
        if !ok {
×
673
                return
×
674
        }
×
675

676
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
677
        if !ok {
×
678
                return
×
679
        }
×
680

681
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
682
                newStackset.Namespace,
×
683
                newStackset.Name,
×
684
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
685
        )
×
686

×
687
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
688
        c.stacksetEvents <- stacksetEvent{
×
689
                StackSet: newStackset.DeepCopy(),
×
690
        }
×
691
}
692

693
func (c *StackSetController) del(obj interface{}) {
×
694
        stackset, ok := obj.(*zv1.StackSet)
×
695
        if !ok {
×
696
                return
×
697
        }
×
698

699
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
700
        c.stacksetEvents <- stacksetEvent{
×
701
                StackSet: stackset.DeepCopy(),
×
702
                Deleted:  true,
×
703
        }
×
704
}
705

706
func retryUpdate(updateFn func(retry bool) error) error {
×
707
        retry := false
×
708
        for {
×
709
                err := updateFn(retry)
×
710
                if err != nil {
×
711
                        if errors.IsConflict(err) {
×
712
                                retry = true
×
713
                                continue
×
714
                        }
715
                        return err
×
716
                }
717
                return nil
×
718
        }
719
}
720

721
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
722
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
723
        for _, sc := range ssc.StackContainers {
×
724
                stack := sc.Stack.DeepCopy()
×
725
                status := *sc.GenerateStackStatus()
×
726
                err := retryUpdate(func(retry bool) error {
×
727
                        if retry {
×
728
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
729
                                if err != nil {
×
730
                                        return err
×
731
                                }
×
732
                                stack = updated
×
733
                        }
734
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
735
                                stack.Status = status
×
736
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
737
                                return err
×
738
                        }
×
739
                        return nil
×
740
                })
741
                if err != nil {
×
742
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
743
                }
×
744
        }
745

746
        stackset := ssc.StackSet.DeepCopy()
×
747
        status := *ssc.GenerateStackSetStatus()
×
748
        err := retryUpdate(func(retry bool) error {
×
749
                if retry {
×
750
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
751
                        if err != nil {
×
752
                                return err
×
753
                        }
×
754
                        stackset = updated
×
755
                }
756
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
757
                        stackset.Status = status
×
758
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
759
                        return err
×
760
                }
×
761
                return nil
×
762
        })
763
        if err != nil {
×
764
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
765
        }
×
766
        return nil
×
767
}
768

769
// ReconcileTrafficSegments updates the traffic segments according to the actual
770
// traffic weight of each stack.
771
//
772
// Returns the ordered list of Trafic Segments that need to be updated.
773
func (c *StackSetController) ReconcileTrafficSegments(
774
        ctx context.Context,
775
        ssc *core.StackSetContainer,
776
) ([]types.UID, error) {
×
777
        // Compute segments
×
778
        toUpdate, err := ssc.ComputeTrafficSegments()
×
779
        if err != nil {
×
780
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
781
        }
×
782

783
        return toUpdate, nil
×
784
}
785

786
// CreateCurrentStack creates a new Stack object for the current stack, if needed
787
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
788
        newStack, newStackVersion := ssc.NewStack()
1✔
789
        if newStack == nil {
2✔
790
                return nil
1✔
791
        }
1✔
792

793
        if c.configMapSupportEnabled || c.secretSupportEnabled {
2✔
794
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
795
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
796
                        return err
×
797
                }
×
798
        }
799

800
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
801
        if err != nil {
1✔
802
                return err
×
803
        }
×
804
        fixupStackTypeMeta(created)
1✔
805

1✔
806
        c.recorder.Eventf(
1✔
807
                ssc.StackSet,
1✔
808
                v1.EventTypeNormal,
1✔
809
                "CreatedStack",
1✔
810
                "Created stack %s",
1✔
811
                newStack.Name(),
1✔
812
        )
1✔
813

1✔
814
        // Persist ObservedStackVersion in the status
1✔
815
        updated := ssc.StackSet.DeepCopy()
1✔
816
        updated.Status.ObservedStackVersion = newStackVersion
1✔
817

1✔
818
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
819
        if err != nil {
1✔
820
                return err
×
821
        }
×
822
        fixupStackSetTypeMeta(result)
1✔
823
        ssc.StackSet = result
1✔
824

1✔
825
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
826
                Stack:          created,
1✔
827
                PendingRemoval: false,
1✔
828
                Resources:      core.StackResources{},
1✔
829
        }
1✔
830
        return nil
1✔
831
}
832

833
// CleanupOldStacks deletes stacks that are no longer needed.
834
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
835
        for _, sc := range ssc.StackContainers {
2✔
836
                if !sc.PendingRemoval {
2✔
837
                        continue
1✔
838
                }
839

840
                stack := sc.Stack
1✔
841
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
842
                if err != nil {
1✔
843
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
844
                }
×
845
                c.recorder.Eventf(
1✔
846
                        ssc.StackSet,
1✔
847
                        v1.EventTypeNormal,
1✔
848
                        "DeletedExcessStack",
1✔
849
                        "Deleted excess stack %s",
1✔
850
                        stack.Name)
1✔
851
        }
852

853
        return nil
1✔
854
}
855

856
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
857
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
858
        // Ingress removed, handled outside
1✔
859
        if ingress == nil {
2✔
860
                return existing, nil
1✔
861
        }
1✔
862

863
        if existing == nil {
2✔
864
                if ingress.Annotations == nil {
2✔
865
                        ingress.Annotations = make(map[string]string)
1✔
866
                }
1✔
867
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
868

1✔
869
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
870
                if err != nil {
1✔
871
                        return nil, err
×
872
                }
×
873
                c.recorder.Eventf(
1✔
874
                        stackset,
1✔
875
                        v1.EventTypeNormal,
1✔
876
                        "CreatedIngress",
1✔
877
                        "Created Ingress %s",
1✔
878
                        ingress.Name)
1✔
879
                return createdIng, nil
1✔
880
        }
881

882
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
883
        if existingHaveUpdateTimeStamp {
2✔
884
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
885
        }
1✔
886

887
        // Check if we need to update the Ingress
888
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
889
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
890
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
891
                // add the annotation back after comparing
1✔
892
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
893
                return existing, nil
1✔
894
        }
1✔
895

896
        updated := existing.DeepCopy()
1✔
897
        updated.Spec = ingress.Spec
1✔
898
        if ingress.Annotations != nil {
2✔
899
                updated.Annotations = ingress.Annotations
1✔
900
        } else {
2✔
901
                updated.Annotations = make(map[string]string)
1✔
902
        }
1✔
903
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
904

1✔
905
        updated.Labels = ingress.Labels
1✔
906

1✔
907
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
908
        if err != nil {
1✔
909
                return nil, err
×
910
        }
×
911
        c.recorder.Eventf(
1✔
912
                stackset,
1✔
913
                v1.EventTypeNormal,
1✔
914
                "UpdatedIngress",
1✔
915
                "Updated Ingress %s",
1✔
916
                ingress.Name)
1✔
917
        return createdIngress, nil
1✔
918
}
919

920
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
921
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
922
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
923
                if routegroup == nil {
2✔
924
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
925
                        return nil
1✔
926
                }
1✔
927
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
928
                // The only scenario version we could think of for this is
1✔
929
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
930
                //  in that case, just wait until the RouteGroup has the annotation
1✔
931
                if !ok {
2✔
932
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
933
                        return nil
1✔
934
                }
1✔
935

936
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
937
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
938
                        return nil
1✔
939
                } else if !ready {
3✔
940
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
941
                        return nil
1✔
942
                }
1✔
943
        }
944
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
945
        if err != nil {
1✔
946
                return err
×
947
        }
×
948
        c.recorder.Eventf(
1✔
949
                stackset,
1✔
950
                v1.EventTypeNormal,
1✔
951
                "DeletedIngress",
1✔
952
                "Deleted Ingress %s",
1✔
953
                existing.Namespace)
1✔
954
        return nil
1✔
955
}
956

957
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
958
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
959
        // RouteGroup removed, handled outside
1✔
960
        if rg == nil {
2✔
961
                return existing, nil
1✔
962
        }
1✔
963

964
        // Create new RouteGroup
965
        if existing == nil {
2✔
966
                if rg.Annotations == nil {
2✔
967
                        rg.Annotations = make(map[string]string)
1✔
968
                }
1✔
969
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
970

1✔
971
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
972
                if err != nil {
1✔
973
                        return nil, err
×
974
                }
×
975
                c.recorder.Eventf(
1✔
976
                        stackset,
1✔
977
                        v1.EventTypeNormal,
1✔
978
                        "CreatedRouteGroup",
1✔
979
                        "Created RouteGroup %s",
1✔
980
                        rg.Name)
1✔
981
                return createdRg, nil
1✔
982
        }
983

984
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
985
        if existingHaveUpdateTimeStamp {
2✔
986
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
987
        }
1✔
988

989
        // Check if we need to update the RouteGroup
990
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
991
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
992
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
993
                // add the annotation back after comparing
1✔
994
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
995
                return existing, nil
1✔
996
        }
1✔
997

998
        updated := existing.DeepCopy()
1✔
999
        updated.Spec = rg.Spec
1✔
1000
        if rg.Annotations != nil {
1✔
1001
                updated.Annotations = rg.Annotations
×
1002
        } else {
1✔
1003
                updated.Annotations = make(map[string]string)
1✔
1004
        }
1✔
1005
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
1006

1✔
1007
        updated.Labels = rg.Labels
1✔
1008

1✔
1009
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1010
        if err != nil {
1✔
1011
                return nil, err
×
1012
        }
×
1013
        c.recorder.Eventf(
1✔
1014
                stackset,
1✔
1015
                v1.EventTypeNormal,
1✔
1016
                "UpdatedRouteGroup",
1✔
1017
                "Updated RouteGroup %s",
1✔
1018
                rg.Name)
1✔
1019
        return createdRg, nil
1✔
1020
}
1021

1022
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
1023
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
1024
        if stackset.Spec.Ingress != nil {
2✔
1025
                if ingress == nil {
2✔
1026
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
1027
                        return nil
1✔
1028
                }
1✔
1029
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
1030
                // The only scenario version we could think of for this is
1✔
1031
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
1032
                //  in that case, just wait until the RouteGroup has the annotation
1✔
1033
                if !ok {
2✔
1034
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1035
                        return nil
1✔
1036
                }
1✔
1037

1038
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
1039
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1040
                        return nil
1✔
1041
                } else if !ready {
3✔
1042
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
1043
                        return nil
1✔
1044
                }
1✔
1045
        }
1046
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
1047
        if err != nil {
1✔
1048
                return err
×
1049
        }
×
1050
        c.recorder.Eventf(
1✔
1051
                stackset,
1✔
1052
                v1.EventTypeNormal,
1✔
1053
                "DeletedRouteGroup",
1✔
1054
                "Deleted RouteGroup %s",
1✔
1055
                rg.Namespace)
1✔
1056
        return nil
1✔
1057
}
1058

1059
func (c *StackSetController) ReconcileStackSetIngressSources(
1060
        ctx context.Context,
1061
        stackset *zv1.StackSet,
1062
        existingIng *networking.Ingress,
1063
        existingRg *rgv1.RouteGroup,
1064
        generateIng func() (*networking.Ingress, error),
1065
        generateRg func() (*rgv1.RouteGroup, error),
1066
) error {
1✔
1067
        ingress, err := generateIng()
1✔
1068
        if err != nil {
1✔
1069
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1070
        }
×
1071

1072
        // opt-out existingIng creation in case we have an external entity creating existingIng
1073
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1074
        if err != nil {
1✔
1075
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1076
        }
×
1077

1078
        rg, err := generateRg()
1✔
1079
        if err != nil {
1✔
1080
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1081
        }
×
1082

1083
        var appliedRg *rgv1.RouteGroup
1✔
1084
        if c.routeGroupSupportEnabled {
2✔
1085
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1086
                if err != nil {
1✔
1087
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1088
                }
×
1089
        }
1090

1091
        // Ingress removed
1092
        if ingress == nil {
2✔
1093
                if existingIng != nil {
2✔
1094
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1095
                        if err != nil {
1✔
1096
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1097
                        }
×
1098
                }
1099
        }
1100

1101
        // RouteGroup removed
1102
        if rg == nil {
2✔
1103
                if existingRg != nil {
2✔
1104
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1105
                        if err != nil {
1✔
1106
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1107
                        }
×
1108
                }
1109
        }
1110

1111
        return nil
1✔
1112
}
1113

1114
// convertToTrafficSegments removes the central ingress component of the
1115
// StackSet, if and only if the StackSet already has traffic segments.
1116
func (c *StackSetController) convertToTrafficSegments(
1117
        ctx context.Context,
1118
        ssc *core.StackSetContainer,
1119
) error {
×
1120
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
1121
                return nil
×
1122
        }
×
1123

1124
        var ingTimestamp, rgTimestamp *metav1.Time
×
1125
        for _, sc := range ssc.StackContainers {
×
1126
                if sc.Stack.Spec.Ingress != nil && sc.Resources.IngressSegment == nil {
×
1127
                        c.logger.Warnf(
×
1128
                                "Not deleting Ingress %s, stack %s doesn't have a segment yet.",
×
1129
                                ssc.Ingress.Name,
×
1130
                                sc.Name(),
×
1131
                        )
×
1132
                        return nil
×
1133
                }
×
1134

1135
                if sc.Stack.Spec.RouteGroup != nil && sc.Resources.RouteGroupSegment == nil {
×
1136
                        c.logger.Warnf(
×
1137
                                "Not deleting RouteGroup %s, stack %s doesn't have a segment yet.",
×
1138
                                ssc.RouteGroup.Name,
×
1139
                                sc.Name(),
×
1140
                        )
×
1141
                        return nil
×
1142
                }
×
1143

1144
                // If we find stacks with a segment, we can delete the
1145
                // central ingress resources.
1146
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
1147
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
1148
                }
×
1149

1150
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
1151
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
1152
                }
×
1153
        }
1154

1155
        if len(ssc.StackContainers) == 0 {
×
1156
                c.logger.Infof(
×
1157
                        "No stacks found for StackSet %s, safe to delete central "+
×
NEW
1158
                                "ingress/routegroup",
×
1159
                        ssc.StackSet.Name,
×
1160
                )
×
1161

×
1162
                // If we don't have any stacks, we can delete the central ingress
×
1163
                // resources
×
1164
                oldEnough := metav1.NewTime(
×
NEW
1165
                        time.Now().Add(-c.ingressSourceSwitchTTL - time.Minute),
×
1166
                )
×
1167
                ingTimestamp = &oldEnough
×
1168
                rgTimestamp = &oldEnough
×
1169
        }
×
1170

1171
        if ingTimestamp != nil && ssc.Ingress != nil {
×
1172
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1173
                        c.logger.Infof(
×
1174
                                "Not deleting Ingress %s, segments created less than %s ago",
×
1175
                                ssc.Ingress.Name,
×
1176
                                c.ingressSourceSwitchTTL,
×
1177
                        )
×
1178
                        return nil
×
1179
                }
×
1180

1181
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
1182
                        ctx,
×
1183
                        ssc.Ingress.Name,
×
1184
                        metav1.DeleteOptions{},
×
1185
                )
×
1186
                if err != nil {
×
1187
                        return err
×
1188
                }
×
1189

1190
                c.recorder.Eventf(
×
1191
                        ssc.StackSet,
×
1192
                        v1.EventTypeNormal,
×
1193
                        "DeletedIngress",
×
1194
                        "Deleted Ingress %s, StackSet conversion to traffic segments complete",
×
1195
                        ssc.Ingress.Namespace,
×
1196
                )
×
1197

×
1198
                ssc.Ingress = nil
×
1199
        }
1200

1201
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
1202
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1203
                        c.logger.Infof(
×
1204
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
1205
                                ssc.RouteGroup.Name,
×
1206
                                c.ingressSourceSwitchTTL,
×
1207
                        )
×
1208
                        return nil
×
1209
                }
×
1210

1211
                err := c.client.RouteGroupV1().RouteGroups(
×
1212
                        ssc.RouteGroup.Namespace,
×
1213
                ).Delete(
×
1214
                        ctx,
×
1215
                        ssc.RouteGroup.Name,
×
1216
                        metav1.DeleteOptions{},
×
1217
                )
×
1218
                if err != nil {
×
1219
                        return err
×
1220
                }
×
1221

1222
                c.recorder.Eventf(
×
1223
                        ssc.RouteGroup,
×
1224
                        v1.EventTypeNormal,
×
1225
                        "DeletedRouteGroup",
×
1226
                        "Deleted RouteGroup %s, StackSet conversion to traffic segments complete",
×
1227
                        ssc.RouteGroup.Namespace,
×
1228
                )
×
1229

×
1230
                ssc.RouteGroup = nil
×
1231
        }
1232

1233
        return nil
×
1234
}
1235

1236
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1237
// of the specified StackSet.
1238
//
1239
// If the StackSet supports traffic segments, the controller won't reconcile the
1240
// central ingress resources. This method is deprecated and will be removed in
1241
// the future.
1242
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
1243
        if !ssc.SupportsSegmentTraffic() {
×
1244
                err := c.ReconcileStackSetIngressSources(
×
1245
                        ctx,
×
1246
                        ssc.StackSet,
×
1247
                        ssc.Ingress,
×
1248
                        ssc.RouteGroup,
×
1249
                        ssc.GenerateIngress,
×
1250
                        ssc.GenerateRouteGroup,
×
1251
                )
×
1252
                if err != nil {
×
1253
                        return err
×
1254
                }
×
1255
        } else {
×
1256
                // Convert StackSet to traffic segments, if needed
×
1257
                err := c.convertToTrafficSegments(ctx, ssc)
×
1258
                if err != nil {
×
1259
                        return err
×
1260
                }
×
1261
        }
1262

1263
        trafficChanges := ssc.TrafficChanges()
×
1264
        if len(trafficChanges) != 0 {
×
1265
                var changeMessages []string
×
1266
                for _, change := range trafficChanges {
×
1267
                        changeMessages = append(changeMessages, change.String())
×
1268
                }
×
1269

1270
                c.recorder.Eventf(
×
1271
                        ssc.StackSet,
×
1272
                        v1.EventTypeNormal,
×
1273
                        "TrafficSwitched",
×
1274
                        "Switched traffic: %s",
×
1275
                        strings.Join(changeMessages, ", "))
×
1276
        }
1277

1278
        return nil
×
1279
}
1280

1281
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1282
        updatedTraffic := generateUpdated()
1✔
1283

1✔
1284
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1285
                return nil
×
1286
        }
×
1287

1288
        updated := existing.DeepCopy()
1✔
1289
        updated.Spec.Traffic = updatedTraffic
1✔
1290

1✔
1291
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1292
        if err != nil {
1✔
1293
                return err
×
1294
        }
×
1295
        c.recorder.Eventf(
1✔
1296
                updated,
1✔
1297
                v1.EventTypeNormal,
1✔
1298
                "UpdatedStackSet",
1✔
1299
                "Updated StackSet %s",
1✔
1300
                updated.Name)
1✔
1301
        return nil
1✔
1302
}
1303

1304
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1305
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1306
        if err != nil {
×
1307
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1308
        }
×
1309

1310
        // This is to support both central and segment-based traffic.
1311
        if ssc.SupportsSegmentTraffic() {
×
1312
                err = c.ReconcileStackIngress(
×
1313
                        ctx,
×
1314
                        sc.Stack,
×
1315
                        sc.Resources.IngressSegment,
×
1316
                        sc.GenerateIngressSegment,
×
1317
                )
×
1318
                if err != nil {
×
1319
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1320
                }
×
1321
        }
1322

1323
        if c.routeGroupSupportEnabled {
×
1324
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1325
                if err != nil {
×
1326
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1327
                }
×
1328

1329
                // This is to support both central and segment-based traffic.
1330
                if ssc.SupportsSegmentTraffic() {
×
1331
                        err = c.ReconcileStackRouteGroup(
×
1332
                                ctx,
×
1333
                                sc.Stack,
×
1334
                                sc.Resources.RouteGroupSegment,
×
1335
                                sc.GenerateRouteGroupSegment,
×
1336
                        )
×
1337
                        if err != nil {
×
1338
                                return c.errorEventf(
×
1339
                                        sc.Stack,
×
1340
                                        "FailedManageRouteGroupSegment",
×
1341
                                        err,
×
1342
                                )
×
1343
                        }
×
1344
                }
1345
        }
1346

1347
        if c.configMapSupportEnabled {
×
1348
                err := c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1349
                if err != nil {
×
1350
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1351
                }
×
1352
        }
1353

1354
        if c.secretSupportEnabled {
×
1355
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1356
                if err != nil {
×
1357
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1358
                }
×
1359
        }
1360

1361
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1362
        if err != nil {
×
1363
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1364
        }
×
1365

1366
        hpaGenerator := sc.GenerateHPA
×
1367
        if ssc.SupportsSegmentTraffic() {
×
1368
                hpaGenerator = sc.GenerateHPAToSegment
×
1369
        }
×
1370
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1371
        if err != nil {
×
1372
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1373
        }
×
1374

1375
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1376
        if err != nil {
×
1377
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1378
        }
×
1379

1380
        return nil
×
1381
}
1382

1383
// ReconcileStackSet reconciles all the things from a stackset
1384
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1385
        defer func() {
×
1386
                if r := recover(); r != nil {
×
1387
                        c.metricsReporter.ReportPanic()
×
1388
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1389
                        err = fmt.Errorf("panic: %v", r)
×
1390
                }
×
1391
        }()
1392

1393
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
1394
                // Reconciler handles StackSet in the next loop
×
1395
                return nil
×
1396
        }
×
1397

1398
        // Create current stack, if needed. Proceed on errors.
1399
        err = c.CreateCurrentStack(ctx, container)
×
1400
        if err != nil {
×
1401
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1402
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1403
        }
×
1404

1405
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1406
        err = container.UpdateFromResources()
×
1407
        if err != nil {
×
1408
                return err
×
1409
        }
×
1410

1411
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1412
        err = container.ManageTraffic(time.Now())
×
1413
        if err != nil {
×
1414
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1415
                c.recorder.Eventf(
×
1416
                        container.StackSet,
×
1417
                        v1.EventTypeWarning,
×
1418
                        "TrafficNotSwitched",
×
1419
                        "Failed to switch traffic: "+err.Error())
×
1420
        }
×
1421

1422
        // Mark stacks that should be removed
1423
        container.MarkExpiredStacks()
×
1424

×
1425
        segsInOrder := []types.UID{}
×
1426
        // This is to support both central and segment-based traffic.
×
1427
        if container.SupportsSegmentTraffic() {
×
1428
                // Update traffic segments. Proceed on errors.
×
1429
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
1430
                if err != nil {
×
1431
                        err = c.errorEventf(
×
1432
                                container.StackSet,
×
1433
                                reasonFailedManageStackSet,
×
1434
                                err,
×
1435
                        )
×
1436
                        c.stacksetLogger(container).Errorf(
×
1437
                                "Unable to reconcile traffic segments: %v",
×
1438
                                err,
×
1439
                        )
×
1440
                }
×
1441
        }
1442

1443
        // Reconcile stack resources. Proceed on errors.
1444
        reconciledStacks := map[types.UID]bool{}
×
1445
        for _, id := range segsInOrder {
×
1446
                reconciledStacks[id] = true
×
1447
                sc := container.StackContainers[id]
×
1448
                err = c.ReconcileStackResources(ctx, container, sc)
×
1449
                if err != nil {
×
1450
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1451
                        c.stackLogger(container, sc).Errorf(
×
1452
                                "Unable to reconcile stack resources: %v",
×
1453
                                err,
×
1454
                        )
×
1455
                }
×
1456
        }
1457

1458
        for k, sc := range container.StackContainers {
×
1459
                if reconciledStacks[k] {
×
1460
                        continue
×
1461
                }
1462

1463
                err = c.ReconcileStackResources(ctx, container, sc)
×
1464
                if err != nil {
×
1465
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1466
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1467
                }
×
1468
        }
1469

1470
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1471
        err = c.ReconcileStackSetResources(ctx, container)
×
1472
        if err != nil {
×
1473
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1474
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1475
        }
×
1476

1477
        // Reconcile desired traffic in the stackset. Proceed on errors.
1478
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1479
        if err != nil {
×
1480
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1481
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1482
        }
×
1483

1484
        // Delete old stacks. Proceed on errors.
1485
        err = c.CleanupOldStacks(ctx, container)
×
1486
        if err != nil {
×
1487
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1488
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1489
        }
×
1490

1491
        // Update statuses.
1492
        err = c.ReconcileStatuses(ctx, container)
×
1493
        if err != nil {
×
1494
                return err
×
1495
        }
×
1496

1497
        return nil
×
1498
}
1499

1500
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1501
// stackset annotation.
1502
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1503
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1504
        if !ok {
2✔
1505
                return 0, false
1✔
1506
        }
1✔
1507
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1508
        if err != nil {
1✔
1509
                return 0, false
×
1510
        }
×
1511
        return resetDelay, true
1✔
1512
}
1513

1514
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1515
        // set TypeMeta manually because of this bug:
1✔
1516
        // https://github.com/kubernetes/client-go/issues/308
1✔
1517
        stackset.APIVersion = core.APIVersion
1✔
1518
        stackset.Kind = core.KindStackSet
1✔
1519
}
1✔
1520

1521
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1522
        // set TypeMeta manually because of this bug:
1✔
1523
        // https://github.com/kubernetes/client-go/issues/308
1✔
1524
        stack.APIVersion = core.APIVersion
1✔
1525
        stack.Kind = core.KindStack
1✔
1526
}
1✔
1527

1528
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1529
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1530
        if err != nil {
2✔
1531
                // wait until there's a valid timestamp on the annotation
1✔
1532
                return false, err
1✔
1533
        }
1✔
1534

1535
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1536
}
1537

1538
// resourceReadyTime reports whether timestamp is set (non-zero) and more
// than ttl has elapsed since it. A zero timestamp is never ready.
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
	if timestamp.IsZero() {
		return false
	}
	return time.Since(timestamp) > ttl
}
1545

1546
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1547
// name is not prefixed by Stack name.
1548
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1549
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
1550
                if err := validateConfigurationResourceName(stack.Name, rsc.GetName()); err != nil {
×
1551
                        return err
×
1552
                }
×
1553
        }
1554
        return nil
1✔
1555
}
1556

1557
// validateConfigurationResourceName returns an error if specific resource
1558
// name is not prefixed by Stack name.
1559
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1560
        if !strings.HasPrefix(rsc, stack) {
2✔
1561
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
1✔
1562
        }
1✔
1563
        return nil
1✔
1564
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc