• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 8156967012

05 Mar 2024 01:19PM UTC coverage: 52.317% (+0.004%) from 52.313%
8156967012

Pull #542

github

linki
temporarily drop feature flags for smaller diff and predictable tests
Pull Request #542: Prevent scaling up stacks without traffic

3 of 3 new or added lines in 1 file covered. (100.0%)

121 existing lines in 2 files now uncovered.

3060 of 5849 relevant lines covered (52.32%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.39
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
49

50
// StackSetController is the main controller. It watches for changes to
51
// stackset resources and starts and maintains other controllers per
52
// stackset resource.
53
type StackSetController struct {
54
        logger                      *log.Entry
55
        client                      clientset.Interface
56
        namespace                   string
57
        syncIngressAnnotations      []string
58
        controllerID                string
59
        backendWeightsAnnotationKey string
60
        clusterDomains              []string
61
        interval                    time.Duration
62
        stacksetEvents              chan stacksetEvent
63
        stacksetStore               map[types.UID]zv1.StackSet
64
        recorder                    kube_record.EventRecorder
65
        metricsReporter             *core.MetricsReporter
66
        HealthReporter              healthcheck.Handler
67
        routeGroupSupportEnabled    bool
68
        trafficSegmentsEnabled      bool
69
        annotatedTrafficSegments    bool
70
        ingressSourceSwitchTTL      time.Duration
71
        now                         func() string
72
        reconcileWorkers            int
73
        configMapSupportEnabled     bool
74
        secretSupportEnabled        bool
75
        sync.Mutex
76
}
77

78
type stacksetEvent struct {
79
        Deleted  bool
80
        StackSet *zv1.StackSet
81
}
82

83
// eventedError wraps an error that was already exposed as an event to the user
84
type eventedError struct {
85
        err error
86
}
87

88
func (ee *eventedError) Error() string {
×
89
        return ee.err.Error()
×
90
}
×
91

92
func now() string {
×
93
        return time.Now().Format(time.RFC3339)
×
94
}
×
95

96
// NewStackSetController initializes a new StackSetController.
97
func NewStackSetController(
98
        client clientset.Interface,
99
        namespace string,
100
        controllerID string,
101
        parallelWork int,
102
        backendWeightsAnnotationKey string,
103
        clusterDomains []string,
104
        registry prometheus.Registerer,
105
        interval time.Duration,
106
        routeGroupSupportEnabled bool,
107
        trafficSegmentsEnabled bool,
108
        annotatedTrafficSegments bool,
109
        syncIngressAnnotations []string,
110
        configMapSupportEnabled bool,
111
        secretSupportEnabled bool,
112
        ingressSourceSwitchTTL time.Duration,
113
) (*StackSetController, error) {
1✔
114
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
115
        if err != nil {
1✔
116
                return nil, err
×
117
        }
×
118

119
        return &StackSetController{
1✔
120
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
121
                client:                      client,
1✔
122
                namespace:                   namespace,
1✔
123
                controllerID:                controllerID,
1✔
124
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
125
                clusterDomains:              clusterDomains,
1✔
126
                interval:                    interval,
1✔
127
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
128
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
129
                recorder:                    recorder.CreateEventRecorder(client),
1✔
130
                metricsReporter:             metricsReporter,
1✔
131
                HealthReporter:              healthcheck.NewHandler(),
1✔
132
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
133
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
134
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
135
                syncIngressAnnotations:      syncIngressAnnotations,
1✔
136
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
137
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
138
                secretSupportEnabled:        secretSupportEnabled,
1✔
139
                now:                         now,
1✔
140
                reconcileWorkers:            parallelWork,
1✔
141
        }, nil
1✔
142
}
143

144
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
145
        return c.logger.WithFields(map[string]interface{}{
×
146
                "namespace": ssc.StackSet.Namespace,
×
147
                "stackset":  ssc.StackSet.Name,
×
148
        })
×
149
}
×
150

151
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
152
        return c.logger.WithFields(map[string]interface{}{
×
153
                "namespace": ssc.StackSet.Namespace,
×
154
                "stackset":  ssc.StackSet.Name,
×
155
                "stack":     sc.Name(),
×
156
        })
×
157
}
×
158

159
// Run runs the main loop of the StackSetController. Before the loops it
160
// sets up a watcher to watch StackSet resources. The watch will send
161
// changes over a channel which is polled from the main loop.
162
func (c *StackSetController) Run(ctx context.Context) {
×
163
        var nextCheck time.Time
×
164

×
165
        // We're not alive if nextCheck is too far in the past
×
166
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
167
                if time.Since(nextCheck) > 5*c.interval {
×
168
                        return fmt.Errorf("nextCheck too old")
×
169
                }
×
170
                return nil
×
171
        })
172

173
        c.startWatch(ctx)
×
174

×
175
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
176

×
177
        nextCheck = time.Now().Add(-c.interval)
×
178

×
179
        for {
×
180
                select {
×
181
                case <-time.After(time.Until(nextCheck)):
×
182

×
183
                        nextCheck = time.Now().Add(c.interval)
×
184

×
185
                        stackSetContainers, err := c.collectResources(ctx)
×
186
                        if err != nil {
×
187
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
188
                                continue
×
189
                        }
190

191
                        var reconcileGroup errgroup.Group
×
192
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
193
                        for stackset, container := range stackSetContainers {
×
194
                                container := container
×
195
                                stackset := stackset
×
196

×
197
                                reconcileGroup.Go(func() error {
×
198
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
199
                                                err := c.ReconcileStackSet(ctx, container)
×
200
                                                if err != nil {
×
201
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
202
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
203
                                                }
×
204
                                        }
205
                                        return nil
×
206
                                })
207
                        }
208

209
                        err = reconcileGroup.Wait()
×
210
                        if err != nil {
×
211
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
212
                        }
×
213
                        err = c.metricsReporter.Report(stackSetContainers)
×
214
                        if err != nil {
×
215
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
216
                        }
×
217
                case e := <-c.stacksetEvents:
×
218
                        stackset := *e.StackSet
×
219
                        fixupStackSetTypeMeta(&stackset)
×
220

×
221
                        // update/delete existing entry
×
222
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
223
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
224
                                        delete(c.stacksetStore, stackset.UID)
×
225
                                        continue
×
226
                                }
227

228
                                // update stackset entry
229
                                c.stacksetStore[stackset.UID] = stackset
×
230
                                continue
×
231
                        }
232

233
                        // check if stackset should be managed by the controller
234
                        if !c.hasOwnership(&stackset) {
×
235
                                continue
×
236
                        }
237

238
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
239
                        c.stacksetStore[stackset.UID] = stackset
×
240
                case <-ctx.Done():
×
241
                        c.logger.Info("Terminating main controller loop.")
×
242
                        return
×
243
                }
244
        }
245
}
246

247
// injectSegmentAnnotation injects the traffic segment annotation if it's not
248
// already present.
249
//
250
// Only inject the traffic segment annotation if the controller has traffic
251
// segments enabled by default, i.e. doesn't matter if the StackSet has the
252
// segment annotation.
253
func (c *StackSetController) injectSegmentAnnotation(
254
        ctx context.Context,
255
        stackSet *zv1.StackSet,
256
) bool {
×
257
        if c.annotatedTrafficSegments {
×
258
                return false
×
259
        }
×
260

261
        if !c.trafficSegmentsEnabled {
×
262
                return false
×
263
        }
×
264

265
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
266
                return false
×
267
        }
×
268

269
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
270

×
271
        _, err := c.client.ZalandoV1().StackSets(
×
272
                stackSet.Namespace,
×
273
        ).Update(
×
274
                ctx,
×
275
                stackSet,
×
276
                metav1.UpdateOptions{},
×
277
        )
×
278
        if err != nil {
×
279
                c.logger.Errorf(
×
280
                        "Failed injecting segment annotation: %v",
×
281
                        err,
×
282
                )
×
283
                return false
×
284
        }
×
285
        c.recorder.Eventf(
×
286
                stackSet,
×
287
                v1.EventTypeNormal,
×
288
                "UpdatedStackSet",
×
289
                "Updated StackSet %s. Injected %s annotation",
×
290
                stackSet.Name,
×
291
                TrafficSegmentsAnnotationKey,
×
292
        )
×
293

×
294
        return true
×
295
}
296

297
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
298
// overload the API requests with unnecessary requests
299
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
300
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
301
        for uid, stackset := range c.stacksetStore {
2✔
302
                stackset := stackset
1✔
303

1✔
304
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
305

1✔
306
                // use prescaling logic if enabled with an annotation
1✔
307
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
308
                        resetDelay := defaultResetMinReplicasDelay
1✔
309
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
310
                                resetDelay = resetDelayValue
1✔
311
                        }
1✔
312
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
313
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
314
                        }
1✔
315
                }
316

317
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
318
                if c.trafficSegmentsEnabled || c.annotatedTrafficSegments {
2✔
319

1✔
320
                        if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
2✔
321
                                stacksetContainer.EnableSegmentTraffic()
1✔
322
                                stacksetContainer.SynchronizeIngressAnnotations(
1✔
323
                                        c.syncIngressAnnotations,
1✔
324
                                )
1✔
325
                        }
1✔
326
                }
327
                stacksets[uid] = stacksetContainer
1✔
328
        }
329

330
        err := c.collectStacks(ctx, stacksets)
1✔
331
        if err != nil {
1✔
332
                return nil, err
×
333
        }
×
334

335
        err = c.collectIngresses(ctx, stacksets)
1✔
336
        if err != nil {
1✔
337
                return nil, err
×
338
        }
×
339

340
        if c.routeGroupSupportEnabled {
2✔
341
                err = c.collectRouteGroups(ctx, stacksets)
1✔
342
                if err != nil {
1✔
343
                        return nil, err
×
344
                }
×
345
        }
346

347
        err = c.collectDeployments(ctx, stacksets)
1✔
348
        if err != nil {
1✔
349
                return nil, err
×
350
        }
×
351

352
        err = c.collectServices(ctx, stacksets)
1✔
353
        if err != nil {
1✔
354
                return nil, err
×
355
        }
×
356

357
        err = c.collectHPAs(ctx, stacksets)
1✔
358
        if err != nil {
1✔
359
                return nil, err
×
360
        }
×
361

362
        if c.configMapSupportEnabled {
2✔
363
                err = c.collectConfigMaps(ctx, stacksets)
1✔
364
                if err != nil {
1✔
365
                        return nil, err
×
366
                }
×
367
        }
368

369
        if c.secretSupportEnabled {
2✔
370
                err = c.collectSecrets(ctx, stacksets)
1✔
371
                if err != nil {
1✔
372
                        return nil, err
×
373
                }
×
374
        }
375

376
        return stacksets, nil
1✔
377
}
378

379
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
380
        ingresses, err := c.client.NetworkingV1().Ingresses(c.namespace).List(ctx, metav1.ListOptions{})
1✔
381

1✔
382
        if err != nil {
1✔
383
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
384
        }
×
385

386
        for _, i := range ingresses.Items {
2✔
387
                ingress := i
1✔
388
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
389
                        // stackset ingress
1✔
390
                        if s, ok := stacksets[uid]; ok {
2✔
391
                                s.Ingress = &ingress
1✔
392
                                continue
1✔
393
                        }
394

395
                        // stack ingress
396
                        for _, stackset := range stacksets {
2✔
397
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
398
                                        if strings.HasSuffix(
1✔
399
                                                ingress.ObjectMeta.Name,
1✔
400
                                                core.SegmentSuffix,
1✔
401
                                        ) {
2✔
402
                                                // Traffic Segment
1✔
403
                                                s.Resources.IngressSegment = &ingress
1✔
404
                                        } else {
2✔
405
                                                s.Resources.Ingress = &ingress
1✔
406
                                        }
1✔
407
                                        break
1✔
408
                                }
409
                        }
410
                }
411
        }
412
        return nil
1✔
413
}
414

415
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
416
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.namespace).List(
1✔
417
                ctx,
1✔
418
                metav1.ListOptions{},
1✔
419
        )
1✔
420
        if err != nil {
1✔
421
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
422
        }
×
423

424
        for _, rg := range rgs.Items {
2✔
425
                routegroup := rg
1✔
426
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
427
                        // stackset routegroups
1✔
428
                        if s, ok := stacksets[uid]; ok {
2✔
429
                                s.RouteGroup = &routegroup
1✔
430
                                continue
1✔
431
                        }
432

433
                        // stack routegroups
434
                        for _, stackset := range stacksets {
2✔
435
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
436
                                        if strings.HasSuffix(
1✔
437
                                                routegroup.ObjectMeta.Name,
1✔
438
                                                core.SegmentSuffix,
1✔
439
                                        ) {
2✔
440
                                                // Traffic Segment
1✔
441
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
442
                                        } else {
2✔
443
                                                s.Resources.RouteGroup = &routegroup
1✔
444
                                        }
1✔
445
                                        break
1✔
446
                                }
447
                        }
448
                }
449
        }
450
        return nil
1✔
451
}
452

453
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
454
        stacks, err := c.client.ZalandoV1().Stacks(c.namespace).List(ctx, metav1.ListOptions{})
1✔
455
        if err != nil {
1✔
456
                return fmt.Errorf("failed to list Stacks: %v", err)
×
457
        }
×
458

459
        for _, stack := range stacks.Items {
2✔
460
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
461
                        if s, ok := stacksets[uid]; ok {
2✔
462
                                stack := stack
1✔
463
                                fixupStackTypeMeta(&stack)
1✔
464

1✔
465
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
466
                                        Stack: &stack,
1✔
467
                                }
1✔
468
                                continue
1✔
469
                        }
470
                }
471
        }
472
        return nil
1✔
473
}
474

475
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
476
        deployments, err := c.client.AppsV1().Deployments(c.namespace).List(ctx, metav1.ListOptions{})
1✔
477
        if err != nil {
1✔
478
                return fmt.Errorf("failed to list Deployments: %v", err)
×
479
        }
×
480

481
        for _, d := range deployments.Items {
2✔
482
                deployment := d
1✔
483
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
484
                        for _, stackset := range stacksets {
2✔
485
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
486
                                        s.Resources.Deployment = &deployment
1✔
487
                                        break
1✔
488
                                }
489
                        }
490
                }
491
        }
492
        return nil
1✔
493
}
494

495
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
496
        services, err := c.client.CoreV1().Services(c.namespace).List(ctx, metav1.ListOptions{})
1✔
497
        if err != nil {
1✔
498
                return fmt.Errorf("failed to list Services: %v", err)
×
499
        }
×
500

501
Items:
1✔
502
        for _, s := range services.Items {
2✔
503
                service := s
1✔
504
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
505
                        for _, stackset := range stacksets {
2✔
506
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
507
                                        s.Resources.Service = &service
1✔
508
                                        continue Items
1✔
509
                                }
510

511
                                // service/HPA used to be owned by the deployment for some reason
512
                                for _, stack := range stackset.StackContainers {
2✔
513
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
514
                                                stack.Resources.Service = &service
1✔
515
                                                continue Items
1✔
516
                                        }
517
                                }
518
                        }
519
                }
520
        }
521
        return nil
1✔
522
}
523

524
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
525
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.namespace).List(ctx, metav1.ListOptions{})
1✔
526
        if err != nil {
1✔
527
                return fmt.Errorf("failed to list HPAs: %v", err)
×
528
        }
×
529

530
Items:
1✔
531
        for _, h := range hpas.Items {
2✔
532
                hpa := h
1✔
533
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
534
                        for _, stackset := range stacksets {
2✔
535
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
536
                                        s.Resources.HPA = &hpa
1✔
537
                                        continue Items
1✔
538
                                }
539

540
                                // service/HPA used to be owned by the deployment for some reason
541
                                for _, stack := range stackset.StackContainers {
2✔
542
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
543
                                                stack.Resources.HPA = &hpa
1✔
544
                                                continue Items
1✔
545
                                        }
546
                                }
547
                        }
548
                }
549
        }
550
        return nil
1✔
551
}
552

553
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
554
        configMaps, err := c.client.CoreV1().ConfigMaps(c.namespace).List(ctx, metav1.ListOptions{})
1✔
555
        if err != nil {
1✔
556
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
557
        }
×
558

559
        for _, cm := range configMaps.Items {
2✔
560
                configMap := cm
1✔
561
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
562
                        for _, stackset := range stacksets {
2✔
563
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
564
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
565
                                        break
1✔
566
                                }
567
                        }
568
                }
569
        }
570
        return nil
1✔
571
}
572

573
func (c *StackSetController) collectSecrets(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
574
        secrets, err := c.client.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{})
1✔
575
        if err != nil {
1✔
576
                return fmt.Errorf("failed to list Secrets: %v", err)
×
577
        }
×
578

579
        for _, sct := range secrets.Items {
2✔
580
                secret := sct
1✔
581
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
582
                        for _, stackset := range stacksets {
2✔
583
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
584
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
585
                                        break
1✔
586
                                }
587
                        }
588
                }
589
        }
590
        return nil
1✔
591
}
592

593
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
594
        if len(objectMeta.OwnerReferences) == 1 {
2✔
595
                return objectMeta.OwnerReferences[0].UID, true
1✔
596
        }
1✔
597
        return "", false
1✔
598
}
599

600
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
601
        switch err.(type) {
×
602
        case *eventedError:
×
603
                // already notified
×
604
                return err
×
605
        default:
×
606
                c.recorder.Eventf(
×
607
                        object,
×
608
                        v1.EventTypeWarning,
×
609
                        reason,
×
610
                        err.Error())
×
611
                return &eventedError{err: err}
×
612
        }
613
}
614

615
// hasOwnership returns true if the controller is the "owner" of the stackset.
616
// Whether it's owner is determined by the value of the
617
// 'stackset-controller.zalando.org/controller' annotation. If the value
618
// matches the controllerID then it owns it, or if the controllerID is
619
// "" and there's no annotation set.
620
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
621
        if stackset.Annotations != nil {
×
622
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
623
                        return owner == c.controllerID
×
624
                }
×
625
        }
626
        return c.controllerID == ""
×
627
}
628

629
func (c *StackSetController) startWatch(ctx context.Context) {
×
630
        informer := cache.NewSharedIndexInformer(
×
631
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.namespace, fields.Everything()),
×
632
                &zv1.StackSet{},
×
633
                0, // skip resync
×
634
                cache.Indexers{},
×
635
        )
×
636

×
637
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
638
                AddFunc:    c.add,
×
639
                UpdateFunc: c.update,
×
640
                DeleteFunc: c.del,
×
641
        })
×
642
        go informer.Run(ctx.Done())
×
643
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
644
                c.logger.Errorf("Timed out waiting for caches to sync")
×
645
                return
×
646
        }
×
647
        c.logger.Info("Synced StackSet watcher")
×
648
}
649

650
func (c *StackSetController) add(obj interface{}) {
×
651
        stackset, ok := obj.(*zv1.StackSet)
×
652
        if !ok {
×
653
                return
×
654
        }
×
655

656
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
657
        c.stacksetEvents <- stacksetEvent{
×
658
                StackSet: stackset.DeepCopy(),
×
659
        }
×
660
}
661

662
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
663
        newStackset, ok := newObj.(*zv1.StackSet)
×
664
        if !ok {
×
665
                return
×
666
        }
×
667

668
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
669
        if !ok {
×
670
                return
×
671
        }
×
672

673
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
674
                newStackset.Namespace,
×
675
                newStackset.Name,
×
676
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
677
        )
×
678

×
679
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
680
        c.stacksetEvents <- stacksetEvent{
×
681
                StackSet: newStackset.DeepCopy(),
×
682
        }
×
683
}
684

685
func (c *StackSetController) del(obj interface{}) {
×
686
        stackset, ok := obj.(*zv1.StackSet)
×
687
        if !ok {
×
688
                return
×
689
        }
×
690

691
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
692
        c.stacksetEvents <- stacksetEvent{
×
693
                StackSet: stackset.DeepCopy(),
×
694
                Deleted:  true,
×
695
        }
×
696
}
697

698
func retryUpdate(updateFn func(retry bool) error) error {
×
699
        retry := false
×
700
        for {
×
701
                err := updateFn(retry)
×
702
                if err != nil {
×
703
                        if errors.IsConflict(err) {
×
704
                                retry = true
×
705
                                continue
×
706
                        }
707
                        return err
×
708
                }
709
                return nil
×
710
        }
711
}
712

713
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
714
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
715
        for _, sc := range ssc.StackContainers {
×
716
                stack := sc.Stack.DeepCopy()
×
717
                status := *sc.GenerateStackStatus()
×
718
                err := retryUpdate(func(retry bool) error {
×
719
                        if retry {
×
720
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
721
                                if err != nil {
×
722
                                        return err
×
723
                                }
×
724
                                stack = updated
×
725
                        }
726
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
727
                                stack.Status = status
×
728
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
729
                                return err
×
730
                        }
×
731
                        return nil
×
732
                })
733
                if err != nil {
×
734
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
735
                }
×
736
        }
737

738
        stackset := ssc.StackSet.DeepCopy()
×
739
        status := *ssc.GenerateStackSetStatus()
×
740
        err := retryUpdate(func(retry bool) error {
×
741
                if retry {
×
742
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
743
                        if err != nil {
×
744
                                return err
×
745
                        }
×
746
                        stackset = updated
×
747
                }
748
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
749
                        stackset.Status = status
×
750
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
751
                        return err
×
752
                }
×
753
                return nil
×
754
        })
755
        if err != nil {
×
756
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
757
        }
×
758
        return nil
×
759
}
760

761
// ReconcileTrafficSegments updates the traffic segments according to the actual
762
// traffic weight of each stack.
763
//
764
// Returns the ordered list of Trafic Segments that need to be updated.
765
func (c *StackSetController) ReconcileTrafficSegments(
766
        ctx context.Context,
767
        ssc *core.StackSetContainer,
768
) ([]types.UID, error) {
×
769
        // Compute segments
×
770
        toUpdate, err := ssc.ComputeTrafficSegments()
×
771
        if err != nil {
×
772
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
773
        }
×
774

775
        return toUpdate, nil
×
776
}
777

778
// CreateCurrentStack creates a new Stack object for the current stack, if needed
779
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
780
        newStack, newStackVersion := ssc.NewStack()
1✔
781
        if newStack == nil {
2✔
782
                return nil
1✔
783
        }
1✔
784

785
        if c.configMapSupportEnabled || c.secretSupportEnabled {
2✔
786
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
787
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
788
                        return err
×
789
                }
×
790
        }
791

792
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
793
        if err != nil {
1✔
794
                return err
×
795
        }
×
796
        fixupStackTypeMeta(created)
1✔
797

1✔
798
        c.recorder.Eventf(
1✔
799
                ssc.StackSet,
1✔
800
                v1.EventTypeNormal,
1✔
801
                "CreatedStack",
1✔
802
                "Created stack %s",
1✔
803
                newStack.Name(),
1✔
804
        )
1✔
805

1✔
806
        // Persist ObservedStackVersion in the status
1✔
807
        updated := ssc.StackSet.DeepCopy()
1✔
808
        updated.Status.ObservedStackVersion = newStackVersion
1✔
809

1✔
810
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
811
        if err != nil {
1✔
812
                return err
×
813
        }
×
814
        fixupStackSetTypeMeta(result)
1✔
815
        ssc.StackSet = result
1✔
816

1✔
817
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
818
                Stack:          created,
1✔
819
                PendingRemoval: false,
1✔
820
                Resources:      core.StackResources{},
1✔
821
        }
1✔
822
        return nil
1✔
823
}
824

825
// CleanupOldStacks deletes stacks that are no longer needed.
826
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
827
        for _, sc := range ssc.StackContainers {
2✔
828
                if !sc.PendingRemoval {
2✔
829
                        continue
1✔
830
                }
831

832
                stack := sc.Stack
1✔
833
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
834
                if err != nil {
1✔
835
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
836
                }
×
837
                c.recorder.Eventf(
1✔
838
                        ssc.StackSet,
1✔
839
                        v1.EventTypeNormal,
1✔
840
                        "DeletedExcessStack",
1✔
841
                        "Deleted excess stack %s",
1✔
842
                        stack.Name)
1✔
843
        }
844

845
        return nil
1✔
846
}
847

848
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
849
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
850
        // Ingress removed, handled outside
1✔
851
        if ingress == nil {
2✔
852
                return existing, nil
1✔
853
        }
1✔
854

855
        if existing == nil {
2✔
856
                if ingress.Annotations == nil {
2✔
857
                        ingress.Annotations = make(map[string]string)
1✔
858
                }
1✔
859
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
860

1✔
861
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
862
                if err != nil {
1✔
863
                        return nil, err
×
864
                }
×
865
                c.recorder.Eventf(
1✔
866
                        stackset,
1✔
867
                        v1.EventTypeNormal,
1✔
868
                        "CreatedIngress",
1✔
869
                        "Created Ingress %s",
1✔
870
                        ingress.Name)
1✔
871
                return createdIng, nil
1✔
872
        }
873

874
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
875
        if existingHaveUpdateTimeStamp {
2✔
876
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
877
        }
1✔
878

879
        // Check if we need to update the Ingress
880
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
881
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
882
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
883
                // add the annotation back after comparing
1✔
884
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
885
                return existing, nil
1✔
886
        }
1✔
887

888
        updated := existing.DeepCopy()
1✔
889
        updated.Spec = ingress.Spec
1✔
890
        if ingress.Annotations != nil {
2✔
891
                updated.Annotations = ingress.Annotations
1✔
892
        } else {
2✔
893
                updated.Annotations = make(map[string]string)
1✔
894
        }
1✔
895
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
896

1✔
897
        updated.Labels = ingress.Labels
1✔
898

1✔
899
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
900
        if err != nil {
1✔
901
                return nil, err
×
902
        }
×
903
        c.recorder.Eventf(
1✔
904
                stackset,
1✔
905
                v1.EventTypeNormal,
1✔
906
                "UpdatedIngress",
1✔
907
                "Updated Ingress %s",
1✔
908
                ingress.Name)
1✔
909
        return createdIngress, nil
1✔
910
}
911

912
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
913
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
914
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
915
                if routegroup == nil {
2✔
916
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
917
                        return nil
1✔
918
                }
1✔
919
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
920
                // The only scenario version we could think of for this is
1✔
921
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
922
                //  in that case, just wait until the RouteGroup has the annotation
1✔
923
                if !ok {
2✔
924
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
925
                        return nil
1✔
926
                }
1✔
927

928
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
929
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
930
                        return nil
1✔
931
                } else if !ready {
3✔
932
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
933
                        return nil
1✔
934
                }
1✔
935
        }
936
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
937
        if err != nil {
1✔
938
                return err
×
939
        }
×
940
        c.recorder.Eventf(
1✔
941
                stackset,
1✔
942
                v1.EventTypeNormal,
1✔
943
                "DeletedIngress",
1✔
944
                "Deleted Ingress %s",
1✔
945
                existing.Namespace)
1✔
946
        return nil
1✔
947
}
948

949
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
950
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
951
        // RouteGroup removed, handled outside
1✔
952
        if rg == nil {
2✔
953
                return existing, nil
1✔
954
        }
1✔
955

956
        // Create new RouteGroup
957
        if existing == nil {
2✔
958
                if rg.Annotations == nil {
2✔
959
                        rg.Annotations = make(map[string]string)
1✔
960
                }
1✔
961
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
962

1✔
963
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
964
                if err != nil {
1✔
965
                        return nil, err
×
966
                }
×
967
                c.recorder.Eventf(
1✔
968
                        stackset,
1✔
969
                        v1.EventTypeNormal,
1✔
970
                        "CreatedRouteGroup",
1✔
971
                        "Created RouteGroup %s",
1✔
972
                        rg.Name)
1✔
973
                return createdRg, nil
1✔
974
        }
975

976
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
977
        if existingHaveUpdateTimeStamp {
2✔
978
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
979
        }
1✔
980

981
        // Check if we need to update the RouteGroup
982
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
983
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
984
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
985
                // add the annotation back after comparing
1✔
986
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
987
                return existing, nil
1✔
988
        }
1✔
989

990
        updated := existing.DeepCopy()
1✔
991
        updated.Spec = rg.Spec
1✔
992
        if rg.Annotations != nil {
1✔
993
                updated.Annotations = rg.Annotations
×
994
        } else {
1✔
995
                updated.Annotations = make(map[string]string)
1✔
996
        }
1✔
997
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
998

1✔
999
        updated.Labels = rg.Labels
1✔
1000

1✔
1001
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1002
        if err != nil {
1✔
1003
                return nil, err
×
1004
        }
×
1005
        c.recorder.Eventf(
1✔
1006
                stackset,
1✔
1007
                v1.EventTypeNormal,
1✔
1008
                "UpdatedRouteGroup",
1✔
1009
                "Updated RouteGroup %s",
1✔
1010
                rg.Name)
1✔
1011
        return createdRg, nil
1✔
1012
}
1013

1014
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
1015
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
1016
        if stackset.Spec.Ingress != nil {
2✔
1017
                if ingress == nil {
2✔
1018
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
1019
                        return nil
1✔
1020
                }
1✔
1021
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
1022
                // The only scenario version we could think of for this is
1✔
1023
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
1024
                //  in that case, just wait until the RouteGroup has the annotation
1✔
1025
                if !ok {
2✔
1026
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1027
                        return nil
1✔
1028
                }
1✔
1029

1030
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
1031
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1032
                        return nil
1✔
1033
                } else if !ready {
3✔
1034
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
1035
                        return nil
1✔
1036
                }
1✔
1037
        }
1038
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
1039
        if err != nil {
1✔
1040
                return err
×
1041
        }
×
1042
        c.recorder.Eventf(
1✔
1043
                stackset,
1✔
1044
                v1.EventTypeNormal,
1✔
1045
                "DeletedRouteGroup",
1✔
1046
                "Deleted RouteGroup %s",
1✔
1047
                rg.Namespace)
1✔
1048
        return nil
1✔
1049
}
1050

1051
func (c *StackSetController) ReconcileStackSetIngressSources(
1052
        ctx context.Context,
1053
        stackset *zv1.StackSet,
1054
        existingIng *networking.Ingress,
1055
        existingRg *rgv1.RouteGroup,
1056
        generateIng func() (*networking.Ingress, error),
1057
        generateRg func() (*rgv1.RouteGroup, error),
1058
) error {
1✔
1059
        ingress, err := generateIng()
1✔
1060
        if err != nil {
1✔
1061
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1062
        }
×
1063

1064
        // opt-out existingIng creation in case we have an external entity creating existingIng
1065
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1066
        if err != nil {
1✔
1067
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1068
        }
×
1069

1070
        rg, err := generateRg()
1✔
1071
        if err != nil {
1✔
1072
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1073
        }
×
1074

1075
        var appliedRg *rgv1.RouteGroup
1✔
1076
        if c.routeGroupSupportEnabled {
2✔
1077
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1078
                if err != nil {
1✔
1079
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1080
                }
×
1081
        }
1082

1083
        // Ingress removed
1084
        if ingress == nil {
2✔
1085
                if existingIng != nil {
2✔
1086
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1087
                        if err != nil {
1✔
1088
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1089
                        }
×
1090
                }
1091
        }
1092

1093
        // RouteGroup removed
1094
        if rg == nil {
2✔
1095
                if existingRg != nil {
2✔
1096
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1097
                        if err != nil {
1✔
1098
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1099
                        }
×
1100
                }
1101
        }
1102

1103
        return nil
1✔
1104
}
1105

1106
// convertToTrafficSegments removes the central ingress component of the
1107
// StackSet, if and only if the StackSet already has traffic segments.
1108
func (c *StackSetController) convertToTrafficSegments(
1109
        ctx context.Context,
1110
        ssc *core.StackSetContainer,
1111
) error {
×
1112
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
1113
                return nil
×
1114
        }
×
1115

1116
        var ingTimestamp, rgTimestamp *metav1.Time
×
1117
        for _, sc := range ssc.StackContainers {
×
1118
                if sc.Stack.Spec.Ingress != nil && sc.Resources.IngressSegment == nil {
×
1119
                        c.logger.Warnf(
×
1120
                                "Not deleting Ingress %s, stack %s doesn't have a segment yet.",
×
1121
                                ssc.Ingress.Name,
×
1122
                                sc.Name(),
×
1123
                        )
×
1124
                        return nil
×
1125
                }
×
1126

1127
                if sc.Stack.Spec.RouteGroup != nil && sc.Resources.RouteGroupSegment == nil {
×
1128
                        c.logger.Warnf(
×
1129
                                "Not deleting RouteGroup %s, stack %s doesn't have a segment yet.",
×
1130
                                ssc.RouteGroup.Name,
×
1131
                                sc.Name(),
×
1132
                        )
×
1133
                        return nil
×
1134
                }
×
1135

1136
                // If we find stacks with a segment, we can delete the
1137
                // central ingress resources.
1138
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
1139
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
1140
                }
×
1141

1142
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
1143
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
1144
                }
×
1145
        }
1146

1147
        if ingTimestamp != nil && ssc.Ingress != nil {
×
1148
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1149
                        c.logger.Infof(
×
1150
                                "Not deleting Ingress %s, segments created less than %s ago",
×
1151
                                ssc.Ingress.Name,
×
1152
                                c.ingressSourceSwitchTTL,
×
1153
                        )
×
1154
                        return nil
×
1155
                }
×
1156

1157
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
1158
                        ctx,
×
1159
                        ssc.Ingress.Name,
×
1160
                        metav1.DeleteOptions{},
×
1161
                )
×
UNCOV
1162
                if err != nil {
×
1163
                        return err
×
1164
                }
×
1165

1166
                c.recorder.Eventf(
×
1167
                        ssc.StackSet,
×
1168
                        v1.EventTypeNormal,
×
1169
                        "DeletedIngress",
×
1170
                        "Deleted Ingress %s, StackSet conversion to traffic segments complete",
×
1171
                        ssc.Ingress.Namespace,
×
UNCOV
1172
                )
×
1173

×
1174
                ssc.Ingress = nil
×
1175
        }
1176

1177
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
1178
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1179
                        c.logger.Infof(
×
1180
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
UNCOV
1181
                                ssc.RouteGroup.Name,
×
1182
                                c.ingressSourceSwitchTTL,
×
1183
                        )
×
1184
                        return nil
×
1185
                }
×
1186

1187
                err := c.client.RouteGroupV1().RouteGroups(
×
1188
                        ssc.RouteGroup.Namespace,
×
1189
                ).Delete(
×
1190
                        ctx,
×
UNCOV
1191
                        ssc.RouteGroup.Name,
×
UNCOV
1192
                        metav1.DeleteOptions{},
×
1193
                )
×
1194
                if err != nil {
×
1195
                        return err
×
1196
                }
×
1197

1198
                c.recorder.Eventf(
×
1199
                        ssc.RouteGroup,
×
1200
                        v1.EventTypeNormal,
×
1201
                        "DeletedRouteGroup",
×
UNCOV
1202
                        "Deleted RouteGroup %s, StackSet conversion to traffic segments complete",
×
1203
                        ssc.RouteGroup.Namespace,
×
1204
                )
×
1205

×
1206
                ssc.RouteGroup = nil
×
1207
        }
1208

1209
        return nil
×
1210
}
1211

1212
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1213
// of the specified StackSet.
1214
//
1215
// If the StackSet supports traffic segments, the controller won't reconcile the
1216
// central ingress resources. This method is deprecated and will be removed in
1217
// the future.
1218
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
1219
        if !ssc.SupportsSegmentTraffic() {
×
1220
                err := c.ReconcileStackSetIngressSources(
×
1221
                        ctx,
×
1222
                        ssc.StackSet,
×
UNCOV
1223
                        ssc.Ingress,
×
UNCOV
1224
                        ssc.RouteGroup,
×
1225
                        ssc.GenerateIngress,
×
UNCOV
1226
                        ssc.GenerateRouteGroup,
×
UNCOV
1227
                )
×
UNCOV
1228
                if err != nil {
×
UNCOV
1229
                        return err
×
UNCOV
1230
                }
×
UNCOV
1231
        } else {
×
UNCOV
1232
                // Convert StackSet to traffic segments, if needed
×
UNCOV
1233
                err := c.convertToTrafficSegments(ctx, ssc)
×
1234
                if err != nil {
×
1235
                        return err
×
1236
                }
×
1237
        }
1238

1239
        trafficChanges := ssc.TrafficChanges()
×
1240
        if len(trafficChanges) != 0 {
×
1241
                var changeMessages []string
×
1242
                for _, change := range trafficChanges {
×
1243
                        changeMessages = append(changeMessages, change.String())
×
1244
                }
×
1245

1246
                c.recorder.Eventf(
×
1247
                        ssc.StackSet,
×
1248
                        v1.EventTypeNormal,
×
1249
                        "TrafficSwitched",
×
1250
                        "Switched traffic: %s",
×
1251
                        strings.Join(changeMessages, ", "))
×
1252
        }
1253

UNCOV
1254
        return nil
×
1255
}
1256

1257
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1258
        updatedTraffic := generateUpdated()
1✔
1259

1✔
1260
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
UNCOV
1261
                return nil
×
1262
        }
×
1263

1264
        updated := existing.DeepCopy()
1✔
1265
        updated.Spec.Traffic = updatedTraffic
1✔
1266

1✔
1267
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1268
        if err != nil {
1✔
UNCOV
1269
                return err
×
1270
        }
×
1271
        c.recorder.Eventf(
1✔
1272
                updated,
1✔
1273
                v1.EventTypeNormal,
1✔
1274
                "UpdatedStackSet",
1✔
1275
                "Updated StackSet %s",
1✔
1276
                updated.Name)
1✔
1277
        return nil
1✔
1278
}
1279

UNCOV
1280
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
UNCOV
1281
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
UNCOV
1282
        if err != nil {
×
UNCOV
1283
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
UNCOV
1284
        }
×
1285

1286
        // This is to support both central and segment-based traffic.
UNCOV
1287
        if ssc.SupportsSegmentTraffic() {
×
UNCOV
1288
                err = c.ReconcileStackIngress(
×
UNCOV
1289
                        ctx,
×
UNCOV
1290
                        sc.Stack,
×
UNCOV
1291
                        sc.Resources.IngressSegment,
×
UNCOV
1292
                        sc.GenerateIngressSegment,
×
UNCOV
1293
                )
×
UNCOV
1294
                if err != nil {
×
UNCOV
1295
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1296
                }
×
1297
        }
1298

1299
        if c.routeGroupSupportEnabled {
×
1300
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
UNCOV
1301
                if err != nil {
×
UNCOV
1302
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1303
                }
×
1304

1305
                // This is to support both central and segment-based traffic.
1306
                if ssc.SupportsSegmentTraffic() {
×
1307
                        err = c.ReconcileStackRouteGroup(
×
1308
                                ctx,
×
1309
                                sc.Stack,
×
1310
                                sc.Resources.RouteGroupSegment,
×
1311
                                sc.GenerateRouteGroupSegment,
×
1312
                        )
×
UNCOV
1313
                        if err != nil {
×
UNCOV
1314
                                return c.errorEventf(
×
1315
                                        sc.Stack,
×
1316
                                        "FailedManageRouteGroupSegment",
×
1317
                                        err,
×
1318
                                )
×
1319
                        }
×
1320
                }
1321
        }
1322

1323
        if c.configMapSupportEnabled {
×
1324
                err = c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.Resources.ConfigMaps, sc.UpdateObjectMeta)
×
1325
                if err != nil {
×
1326
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1327
                }
×
1328
        }
1329

1330
        if c.secretSupportEnabled {
×
1331
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.Resources.Secrets, sc.UpdateObjectMeta)
×
1332
                if err != nil {
×
1333
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1334
                }
×
1335
        }
1336

UNCOV
1337
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
UNCOV
1338
        if err != nil {
×
1339
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1340
        }
×
1341

1342
        hpaGenerator := sc.GenerateHPA
×
1343
        if ssc.SupportsSegmentTraffic() {
×
UNCOV
1344
                hpaGenerator = sc.GenerateHPAToSegment
×
UNCOV
1345
        }
×
1346
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1347
        if err != nil {
×
1348
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1349
        }
×
1350

UNCOV
1351
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
UNCOV
1352
        if err != nil {
×
1353
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1354
        }
×
1355

1356
        return nil
×
1357
}
1358

1359
// ReconcileStackSet reconciles all the things from a stackset
1360
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1361
        defer func() {
×
1362
                if r := recover(); r != nil {
×
1363
                        c.metricsReporter.ReportPanic()
×
1364
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1365
                        err = fmt.Errorf("panic: %v", r)
×
UNCOV
1366
                }
×
1367
        }()
1368

1369
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
1370
                // Reconciler handles StackSet in the next loop
×
UNCOV
1371
                return nil
×
1372
        }
×
1373

1374
        // Create current stack, if needed. Proceed on errors.
UNCOV
1375
        err = c.CreateCurrentStack(ctx, container)
×
1376
        if err != nil {
×
1377
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1378
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1379
        }
×
1380

1381
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1382
        err = container.UpdateFromResources()
×
UNCOV
1383
        if err != nil {
×
UNCOV
1384
                return err
×
1385
        }
×
1386

1387
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1388
        err = container.ManageTraffic(time.Now())
×
UNCOV
1389
        if err != nil {
×
UNCOV
1390
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1391
                c.recorder.Eventf(
×
1392
                        container.StackSet,
×
1393
                        v1.EventTypeWarning,
×
1394
                        "TrafficNotSwitched",
×
1395
                        "Failed to switch traffic: "+err.Error())
×
UNCOV
1396
        }
×
1397

1398
        // Mark stacks that should be removed
1399
        container.MarkExpiredStacks()
×
1400

×
1401
        segsInOrder := []types.UID{}
×
UNCOV
1402
        // This is to support both central and segment-based traffic.
×
UNCOV
1403
        if container.SupportsSegmentTraffic() {
×
1404
                // Update traffic segments. Proceed on errors.
×
1405
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
1406
                if err != nil {
×
1407
                        err = c.errorEventf(
×
1408
                                container.StackSet,
×
1409
                                reasonFailedManageStackSet,
×
1410
                                err,
×
1411
                        )
×
1412
                        c.stacksetLogger(container).Errorf(
×
UNCOV
1413
                                "Unable to reconcile traffic segments: %v",
×
UNCOV
1414
                                err,
×
1415
                        )
×
1416
                }
×
1417
        }
1418

1419
        // Reconcile stack resources. Proceed on errors.
1420
        reconciledStacks := map[types.UID]bool{}
×
1421
        for _, id := range segsInOrder {
×
1422
                reconciledStacks[id] = true
×
1423
                sc := container.StackContainers[id]
×
1424
                err = c.ReconcileStackResources(ctx, container, sc)
×
1425
                if err != nil {
×
1426
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1427
                        c.stackLogger(container, sc).Errorf(
×
1428
                                "Unable to reconcile stack resources: %v",
×
1429
                                err,
×
1430
                        )
×
1431
                }
×
1432
        }
1433

UNCOV
1434
        for k, sc := range container.StackContainers {
×
UNCOV
1435
                if reconciledStacks[k] {
×
1436
                        continue
×
1437
                }
1438

1439
                err = c.ReconcileStackResources(ctx, container, sc)
×
1440
                if err != nil {
×
1441
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1442
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1443
                }
×
1444
        }
1445

1446
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1447
        err = c.ReconcileStackSetResources(ctx, container)
×
UNCOV
1448
        if err != nil {
×
UNCOV
1449
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1450
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1451
        }
×
1452

1453
        // Reconcile desired traffic in the stackset. Proceed on errors.
UNCOV
1454
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1455
        if err != nil {
×
1456
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1457
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1458
        }
×
1459

1460
        // Delete old stacks. Proceed on errors.
UNCOV
1461
        err = c.CleanupOldStacks(ctx, container)
×
UNCOV
1462
        if err != nil {
×
1463
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1464
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1465
        }
×
1466

1467
        // Update statuses.
UNCOV
1468
        err = c.ReconcileStatuses(ctx, container)
×
UNCOV
1469
        if err != nil {
×
1470
                return err
×
1471
        }
×
1472

1473
        return nil
×
1474
}
1475

1476
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1477
// stackset annotation.
1478
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1479
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1480
        if !ok {
2✔
1481
                return 0, false
1✔
1482
        }
1✔
1483
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1484
        if err != nil {
1✔
1485
                return 0, false
×
1486
        }
×
1487
        return resetDelay, true
1✔
1488
}
1489

1490
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1491
        // set TypeMeta manually because of this bug:
1✔
1492
        // https://github.com/kubernetes/client-go/issues/308
1✔
1493
        stackset.APIVersion = core.APIVersion
1✔
1494
        stackset.Kind = core.KindStackSet
1✔
1495
}
1✔
1496

1497
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1498
        // set TypeMeta manually because of this bug:
1✔
1499
        // https://github.com/kubernetes/client-go/issues/308
1✔
1500
        stack.APIVersion = core.APIVersion
1✔
1501
        stack.Kind = core.KindStack
1✔
1502
}
1✔
1503

1504
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1505
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1506
        if err != nil {
2✔
1507
                // wait until there's a valid timestamp on the annotation
1✔
1508
                return false, err
1✔
1509
        }
1✔
1510

1511
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1512
}
1513

1514
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1515
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1516
                return true
1✔
1517
        }
1✔
1518

1519
        return false
1✔
1520
}
1521

1522
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1523
// name is not prefixed by Stack name.
1524
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1525
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
UNCOV
1526
                if !strings.HasPrefix(rsc.GetName(), stack.Name) {
×
UNCOV
1527
                        return fmt.Errorf(configurationResourceNameError, rsc.GetName(), stack.Name)
×
UNCOV
1528
                }
×
1529
        }
1530
        return nil
1✔
1531
}
1532

1533
// validateConfigurationResourceName returns an error if specific resource
1534
// name is not prefixed by Stack name.
1535
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1536
        if !strings.HasPrefix(rsc, stack) {
1✔
UNCOV
1537
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
×
UNCOV
1538
        }
×
1539
        return nil
1✔
1540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc