• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 7206110607

14 Dec 2023 08:09AM UTC coverage: 71.066% (-2.2%) from 73.254%
7206110607

Pull #495

github

gargravarr
Decouple segment computing and generation; configure options to completely disable segments.
Pull Request #495: Add support for traffic segments.

318 of 561 new or added lines in 7 files covered. (56.68%)

1 existing line in 1 file now uncovered.

2606 of 3667 relevant lines covered (71.07%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.39
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
// StackSetController is the main controller. It watches for changes to
49
// stackset resources and starts and maintains other controllers per
50
// stackset resource.
51
type StackSetController struct {
52
        logger                      *log.Entry
53
        client                      clientset.Interface
54
        controllerID                string
55
        backendWeightsAnnotationKey string
56
        clusterDomains              []string
57
        interval                    time.Duration
58
        stacksetEvents              chan stacksetEvent
59
        stacksetStore               map[types.UID]zv1.StackSet
60
        recorder                    kube_record.EventRecorder
61
        metricsReporter             *core.MetricsReporter
62
        HealthReporter              healthcheck.Handler
63
        routeGroupSupportEnabled    bool
64
        trafficSegmentsEnabled      bool
65
        annotatedTrafficSegments    bool
66
        ingressSourceSwitchTTL      time.Duration
67
        now                         func() string
68
        reconcileWorkers            int
69
        configMapSupportEnabled     bool
70
        sync.Mutex
71
}
72

73
type stacksetEvent struct {
74
        Deleted  bool
75
        StackSet *zv1.StackSet
76
}
77

78
// eventedError wraps an error that was already exposed as an event to the user
79
type eventedError struct {
80
        err error
81
}
82

83
func (ee *eventedError) Error() string {
×
84
        return ee.err.Error()
×
85
}
×
86

87
func now() string {
×
88
        return time.Now().Format(time.RFC3339)
×
89
}
×
90

91
// NewStackSetController initializes a new StackSetController.
92
func NewStackSetController(
93
        client clientset.Interface,
94
        controllerID string,
95
        parallelWork int,
96
        backendWeightsAnnotationKey string,
97
        clusterDomains []string,
98
        registry prometheus.Registerer,
99
        interval time.Duration,
100
        routeGroupSupportEnabled bool,
101
        trafficSegmentsEnabled bool,
102
        annotatedTrafficSegments bool,
103
        configMapSupportEnabled bool,
104
        ingressSourceSwitchTTL time.Duration,
105
) (*StackSetController, error) {
1✔
106
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
107
        if err != nil {
1✔
108
                return nil, err
×
109
        }
×
110

111
        return &StackSetController{
1✔
112
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
113
                client:                      client,
1✔
114
                controllerID:                controllerID,
1✔
115
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
116
                clusterDomains:              clusterDomains,
1✔
117
                interval:                    interval,
1✔
118
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
119
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
120
                recorder:                    recorder.CreateEventRecorder(client),
1✔
121
                metricsReporter:             metricsReporter,
1✔
122
                HealthReporter:              healthcheck.NewHandler(),
1✔
123
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
124
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
125
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
126
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
127
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
128
                now:                         now,
1✔
129
                reconcileWorkers:            parallelWork,
1✔
130
        }, nil
1✔
131
}
132

133
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
134
        return c.logger.WithFields(map[string]interface{}{
×
135
                "namespace": ssc.StackSet.Namespace,
×
136
                "stackset":  ssc.StackSet.Name,
×
137
        })
×
138
}
×
139

140
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
141
        return c.logger.WithFields(map[string]interface{}{
×
142
                "namespace": ssc.StackSet.Namespace,
×
143
                "stackset":  ssc.StackSet.Name,
×
144
                "stack":     sc.Name(),
×
145
        })
×
146
}
×
147

148
// Run runs the main loop of the StackSetController. Before the loops it
149
// sets up a watcher to watch StackSet resources. The watch will send
150
// changes over a channel which is polled from the main loop.
151
func (c *StackSetController) Run(ctx context.Context) {
×
152
        var nextCheck time.Time
×
153

×
154
        // We're not alive if nextCheck is too far in the past
×
155
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
156
                if time.Since(nextCheck) > 5*c.interval {
×
157
                        return fmt.Errorf("nextCheck too old")
×
158
                }
×
159
                return nil
×
160
        })
161

162
        c.startWatch(ctx)
×
163

×
164
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
165

×
166
        nextCheck = time.Now().Add(-c.interval)
×
167

×
168
        for {
×
169
                select {
×
170
                case <-time.After(time.Until(nextCheck)):
×
171

×
172
                        nextCheck = time.Now().Add(c.interval)
×
173

×
174
                        stackSetContainers, err := c.collectResources(ctx)
×
175
                        if err != nil {
×
176
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
177
                                continue
×
178
                        }
179

180
                        var reconcileGroup errgroup.Group
×
181
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
182
                        for stackset, container := range stackSetContainers {
×
183
                                container := container
×
184
                                stackset := stackset
×
185

×
186
                                reconcileGroup.Go(func() error {
×
187
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
188
                                                err := c.ReconcileStackSet(ctx, container)
×
189
                                                if err != nil {
×
190
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
191
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
192
                                                }
×
193
                                        }
194
                                        return nil
×
195
                                })
196
                        }
197

198
                        err = reconcileGroup.Wait()
×
199
                        if err != nil {
×
200
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
201
                        }
×
202
                        err = c.metricsReporter.Report(stackSetContainers)
×
203
                        if err != nil {
×
204
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
205
                        }
×
206
                case e := <-c.stacksetEvents:
×
207
                        stackset := *e.StackSet
×
208
                        fixupStackSetTypeMeta(&stackset)
×
209

×
210
                        // update/delete existing entry
×
211
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
212
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
213
                                        delete(c.stacksetStore, stackset.UID)
×
214
                                        continue
×
215
                                }
216

217
                                // update stackset entry
218
                                c.stacksetStore[stackset.UID] = stackset
×
219
                                continue
×
220
                        }
221

222
                        // check if stackset should be managed by the controller
223
                        if !c.hasOwnership(&stackset) {
×
224
                                continue
×
225
                        }
226

227
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
228
                        c.stacksetStore[stackset.UID] = stackset
×
229
                case <-ctx.Done():
×
230
                        c.logger.Info("Terminating main controller loop.")
×
231
                        return
×
232
                }
233
        }
234
}
235

236
// injectSegmentAnnotation injects the traffic segment annotation if it's not
237
// already present.
238
//
239
// Only inject the traffic segment annotation if the controller has traffic
240
// segments enabled by default, i.e. doesn't matter if the StackSet has the
241
// segment annotation.
242
func (c *StackSetController) injectSegmentAnnotation(
243
        ctx context.Context,
244
        stackSet *zv1.StackSet,
NEW
245
) bool {
×
NEW
246
        if !c.trafficSegmentsEnabled {
×
NEW
247
                return false
×
NEW
248
        }
×
249

NEW
250
        if c.annotatedTrafficSegments {
×
NEW
251
                return false
×
NEW
252
        }
×
253

NEW
254
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
NEW
255
                return false
×
NEW
256
        }
×
257

NEW
258
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
NEW
259

×
NEW
260
        _, err := c.client.ZalandoV1().StackSets(
×
NEW
261
                stackSet.Namespace,
×
NEW
262
        ).Update(
×
NEW
263
                ctx,
×
NEW
264
                stackSet,
×
NEW
265
                metav1.UpdateOptions{},
×
NEW
266
        )
×
NEW
267
        if err != nil {
×
NEW
268
                c.logger.Errorf(
×
NEW
269
                        "Failed injecting segment annotation: %v",
×
NEW
270
                        err,
×
NEW
271
                )
×
NEW
272
                return false
×
NEW
273
        }
×
NEW
274
        c.recorder.Eventf(
×
NEW
275
                stackSet,
×
NEW
276
                v1.EventTypeNormal,
×
NEW
277
                "UpdatedStackSet",
×
NEW
278
                "Updated StackSet %s",
×
NEW
279
                stackSet.Name,
×
NEW
280
        )
×
NEW
281

×
NEW
282
        return true
×
283
}
284

285
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
286
// overload the API requests with unnecessary requests
287
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
288
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
289
        for uid, stackset := range c.stacksetStore {
2✔
290
                stackset := stackset
1✔
291

1✔
292
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
293

1✔
294
                // use prescaling logic if enabled with an annotation
1✔
295
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
296
                        resetDelay := defaultResetMinReplicasDelay
1✔
297
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
298
                                resetDelay = resetDelayValue
1✔
299
                        }
1✔
300
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
301
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
302
                        }
1✔
303
                }
304

305
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
306
                if c.trafficSegmentsEnabled &&
1✔
307
                        stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
1✔
NEW
308

×
NEW
309
                        stacksetContainer.EnableSegmentTraffic()
×
NEW
310
                }
×
311
                stacksets[uid] = stacksetContainer
1✔
312
        }
313

314
        err := c.collectStacks(ctx, stacksets)
1✔
315
        if err != nil {
1✔
316
                return nil, err
×
317
        }
×
318

319
        err = c.collectIngresses(ctx, stacksets)
1✔
320
        if err != nil {
1✔
321
                return nil, err
×
322
        }
×
323

324
        if c.routeGroupSupportEnabled {
2✔
325
                err = c.collectRouteGroups(ctx, stacksets)
1✔
326
                if err != nil {
1✔
327
                        return nil, err
×
328
                }
×
329
        }
330

331
        err = c.collectDeployments(ctx, stacksets)
1✔
332
        if err != nil {
1✔
333
                return nil, err
×
334
        }
×
335

336
        err = c.collectServices(ctx, stacksets)
1✔
337
        if err != nil {
1✔
338
                return nil, err
×
339
        }
×
340

341
        err = c.collectHPAs(ctx, stacksets)
1✔
342
        if err != nil {
1✔
343
                return nil, err
×
344
        }
×
345

346
        if c.configMapSupportEnabled {
2✔
347
                err = c.collectConfigMaps(ctx, stacksets)
1✔
348
                if err != nil {
1✔
349
                        return nil, err
×
350
                }
×
351
        }
352

353
        return stacksets, nil
1✔
354
}
355

356
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
357
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
358

1✔
359
        if err != nil {
1✔
360
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
361
        }
×
362

363
        for _, i := range ingresses.Items {
2✔
364
                ingress := i
1✔
365
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
366
                        // stackset ingress
1✔
367
                        if s, ok := stacksets[uid]; ok {
2✔
368
                                s.Ingress = &ingress
1✔
369
                                continue
1✔
370
                        }
371

372
                        // stack ingress
373
                        for _, stackset := range stacksets {
2✔
374
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
375
                                        if strings.HasSuffix(
1✔
376
                                                ingress.ObjectMeta.Name,
1✔
377
                                                core.SegmentSuffix,
1✔
378
                                        ) {
2✔
379
                                                // Traffic Segment
1✔
380
                                                s.Resources.IngressSegment = &ingress
1✔
381
                                        } else {
2✔
382
                                                s.Resources.Ingress = &ingress
1✔
383
                                        }
1✔
384
                                        break
1✔
385
                                }
386
                        }
387
                }
388
        }
389
        return nil
1✔
390
}
391

392
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
393
        rgs, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(
1✔
394
                ctx,
1✔
395
                metav1.ListOptions{},
1✔
396
        )
1✔
397
        if err != nil {
1✔
398
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
399
        }
×
400

401
        for _, rg := range rgs.Items {
2✔
402
                routegroup := rg
1✔
403
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
404
                        // stackset routegroups
1✔
405
                        if s, ok := stacksets[uid]; ok {
2✔
406
                                s.RouteGroup = &routegroup
1✔
407
                                continue
1✔
408
                        }
409

410
                        // stack routegroups
411
                        for _, stackset := range stacksets {
2✔
412
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
413
                                        if strings.HasSuffix(
1✔
414
                                                routegroup.ObjectMeta.Name,
1✔
415
                                                core.SegmentSuffix,
1✔
416
                                        ) {
2✔
417
                                                // Traffic Segment
1✔
418
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
419
                                        } else {
2✔
420
                                                s.Resources.RouteGroup = &routegroup
1✔
421
                                        }
1✔
422
                                        break
1✔
423
                                }
424
                        }
425
                }
426
        }
427
        return nil
1✔
428
}
429

430
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
431
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
432
        if err != nil {
1✔
433
                return fmt.Errorf("failed to list Stacks: %v", err)
×
434
        }
×
435

436
        for _, stack := range stacks.Items {
2✔
437
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
438
                        if s, ok := stacksets[uid]; ok {
2✔
439
                                stack := stack
1✔
440
                                fixupStackTypeMeta(&stack)
1✔
441

1✔
442
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
443
                                        Stack: &stack,
1✔
444
                                }
1✔
445
                                continue
1✔
446
                        }
447
                }
448
        }
449
        return nil
1✔
450
}
451

452
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
453
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
454
        if err != nil {
1✔
455
                return fmt.Errorf("failed to list Deployments: %v", err)
×
456
        }
×
457

458
        for _, d := range deployments.Items {
2✔
459
                deployment := d
1✔
460
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
461
                        for _, stackset := range stacksets {
2✔
462
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
463
                                        s.Resources.Deployment = &deployment
1✔
464
                                        break
1✔
465
                                }
466
                        }
467
                }
468
        }
469
        return nil
1✔
470
}
471

472
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
473
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
474
        if err != nil {
1✔
475
                return fmt.Errorf("failed to list Services: %v", err)
×
476
        }
×
477

478
Items:
1✔
479
        for _, s := range services.Items {
2✔
480
                service := s
1✔
481
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
482
                        for _, stackset := range stacksets {
2✔
483
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
484
                                        s.Resources.Service = &service
1✔
485
                                        continue Items
1✔
486
                                }
487

488
                                // service/HPA used to be owned by the deployment for some reason
489
                                for _, stack := range stackset.StackContainers {
2✔
490
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
491
                                                stack.Resources.Service = &service
1✔
492
                                                continue Items
1✔
493
                                        }
494
                                }
495
                        }
496
                }
497
        }
498
        return nil
1✔
499
}
500

501
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
502
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
503
        if err != nil {
1✔
504
                return fmt.Errorf("failed to list HPAs: %v", err)
×
505
        }
×
506

507
Items:
1✔
508
        for _, h := range hpas.Items {
2✔
509
                hpa := h
1✔
510
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
511
                        for _, stackset := range stacksets {
2✔
512
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
513
                                        s.Resources.HPA = &hpa
1✔
514
                                        continue Items
1✔
515
                                }
516

517
                                // service/HPA used to be owned by the deployment for some reason
518
                                for _, stack := range stackset.StackContainers {
2✔
519
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
520
                                                stack.Resources.HPA = &hpa
1✔
521
                                                continue Items
1✔
522
                                        }
523
                                }
524
                        }
525
                }
526
        }
527
        return nil
1✔
528
}
529

530
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
531
        configMaps, err := c.client.CoreV1().ConfigMaps(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
532
        if err != nil {
1✔
533
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
534
        }
×
535

536
        for _, cm := range configMaps.Items {
2✔
537
                configMap := cm
1✔
538
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
539
                        for _, stackset := range stacksets {
2✔
540
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
541
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
542
                                        break
1✔
543
                                }
544
                        }
545
                }
546
        }
547
        return nil
1✔
548
}
549

550
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
551
        if len(objectMeta.OwnerReferences) == 1 {
2✔
552
                return objectMeta.OwnerReferences[0].UID, true
1✔
553
        }
1✔
554
        return "", false
1✔
555
}
556

557
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
558
        switch err.(type) {
×
559
        case *eventedError:
×
560
                // already notified
×
561
                return err
×
562
        default:
×
563
                c.recorder.Eventf(
×
564
                        object,
×
565
                        v1.EventTypeWarning,
×
566
                        reason,
×
567
                        err.Error())
×
568
                return &eventedError{err: err}
×
569
        }
570
}
571

572
// hasOwnership returns true if the controller is the "owner" of the stackset.
573
// Whether it's owner is determined by the value of the
574
// 'stackset-controller.zalando.org/controller' annotation. If the value
575
// matches the controllerID then it owns it, or if the controllerID is
576
// "" and there's no annotation set.
577
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
578
        if stackset.Annotations != nil {
×
579
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
580
                        return owner == c.controllerID
×
581
                }
×
582
        }
583
        return c.controllerID == ""
×
584
}
585

586
func (c *StackSetController) startWatch(ctx context.Context) {
×
587
        informer := cache.NewSharedIndexInformer(
×
588
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
589
                &zv1.StackSet{},
×
590
                0, // skip resync
×
591
                cache.Indexers{},
×
592
        )
×
593

×
594
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
595
                AddFunc:    c.add,
×
596
                UpdateFunc: c.update,
×
597
                DeleteFunc: c.del,
×
598
        })
×
599
        go informer.Run(ctx.Done())
×
600
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
601
                c.logger.Errorf("Timed out waiting for caches to sync")
×
602
                return
×
603
        }
×
604
        c.logger.Info("Synced StackSet watcher")
×
605
}
606

607
func (c *StackSetController) add(obj interface{}) {
×
608
        stackset, ok := obj.(*zv1.StackSet)
×
609
        if !ok {
×
610
                return
×
611
        }
×
612

613
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
614
        c.stacksetEvents <- stacksetEvent{
×
615
                StackSet: stackset.DeepCopy(),
×
616
        }
×
617
}
618

619
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
620
        newStackset, ok := newObj.(*zv1.StackSet)
×
621
        if !ok {
×
622
                return
×
623
        }
×
624

625
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
626
        if !ok {
×
627
                return
×
628
        }
×
629

630
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
631
                newStackset.Namespace,
×
632
                newStackset.Name,
×
633
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
634
        )
×
635

×
636
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
637
        c.stacksetEvents <- stacksetEvent{
×
638
                StackSet: newStackset.DeepCopy(),
×
639
        }
×
640
}
641

642
func (c *StackSetController) del(obj interface{}) {
×
643
        stackset, ok := obj.(*zv1.StackSet)
×
644
        if !ok {
×
645
                return
×
646
        }
×
647

648
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
649
        c.stacksetEvents <- stacksetEvent{
×
650
                StackSet: stackset.DeepCopy(),
×
651
                Deleted:  true,
×
652
        }
×
653
}
654

655
func retryUpdate(updateFn func(retry bool) error) error {
×
656
        retry := false
×
657
        for {
×
658
                err := updateFn(retry)
×
659
                if err != nil {
×
660
                        if errors.IsConflict(err) {
×
661
                                retry = true
×
662
                                continue
×
663
                        }
664
                        return err
×
665
                }
666
                return nil
×
667
        }
668
}
669

670
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
671
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
672
        for _, sc := range ssc.StackContainers {
×
673
                stack := sc.Stack.DeepCopy()
×
674
                status := *sc.GenerateStackStatus()
×
675
                err := retryUpdate(func(retry bool) error {
×
676
                        if retry {
×
677
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
678
                                if err != nil {
×
679
                                        return err
×
680
                                }
×
681
                                stack = updated
×
682
                        }
683
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
684
                                stack.Status = status
×
685
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
686
                                return err
×
687
                        }
×
688
                        return nil
×
689
                })
690
                if err != nil {
×
691
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
692
                }
×
693
        }
694

695
        stackset := ssc.StackSet.DeepCopy()
×
696
        status := *ssc.GenerateStackSetStatus()
×
697
        err := retryUpdate(func(retry bool) error {
×
698
                if retry {
×
699
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
700
                        if err != nil {
×
701
                                return err
×
702
                        }
×
703
                        stackset = updated
×
704
                }
705
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
706
                        stackset.Status = status
×
707
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
708
                        return err
×
709
                }
×
710
                return nil
×
711
        })
712
        if err != nil {
×
713
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
714
        }
×
715
        return nil
×
716
}
717

718
// ReconcileTrafficSegments updates the traffic segments according to the actual
719
// traffic weight of each stack.
720
//
721
// Returns the ordered list of Trafic Segments that need to be updated.
722
func (c *StackSetController) ReconcileTrafficSegments(
723
        ctx context.Context,
724
        ssc *core.StackSetContainer,
NEW
725
) ([]types.UID, error) {
×
NEW
726
        // Compute segments
×
NEW
727
        toUpdate, err := ssc.ComputeTrafficSegments()
×
NEW
728
        if err != nil {
×
NEW
729
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
NEW
730
        }
×
731

NEW
732
        return toUpdate, nil
×
733
}
734

735
// CreateCurrentStack creates a new Stack object for the current stack, if needed
736
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
737
        newStack, newStackVersion := ssc.NewStack()
1✔
738
        if newStack == nil {
2✔
739
                return nil
1✔
740
        }
1✔
741

742
        if c.configMapSupportEnabled {
2✔
743
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
744
                if err := validateConfigurationResourceNames(newStack.Stack); err != nil {
1✔
745
                        return err
×
746
                }
×
747
        }
748

749
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
750
        if err != nil {
1✔
751
                return err
×
752
        }
×
753
        fixupStackTypeMeta(created)
1✔
754

1✔
755
        c.recorder.Eventf(
1✔
756
                ssc.StackSet,
1✔
757
                v1.EventTypeNormal,
1✔
758
                "CreatedStack",
1✔
759
                "Created stack %s",
1✔
760
                newStack.Name(),
1✔
761
        )
1✔
762

1✔
763
        // Persist ObservedStackVersion in the status
1✔
764
        updated := ssc.StackSet.DeepCopy()
1✔
765
        updated.Status.ObservedStackVersion = newStackVersion
1✔
766

1✔
767
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
768
        if err != nil {
1✔
769
                return err
×
770
        }
×
771
        fixupStackSetTypeMeta(result)
1✔
772
        ssc.StackSet = result
1✔
773

1✔
774
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
775
                Stack:          created,
1✔
776
                PendingRemoval: false,
1✔
777
                Resources:      core.StackResources{},
1✔
778
        }
1✔
779
        return nil
1✔
780
}
781

782
// CleanupOldStacks deletes stacks that are no longer needed.
783
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
784
        for _, sc := range ssc.StackContainers {
2✔
785
                if !sc.PendingRemoval {
2✔
786
                        continue
1✔
787
                }
788

789
                stack := sc.Stack
1✔
790
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
791
                if err != nil {
1✔
792
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
793
                }
×
794
                c.recorder.Eventf(
1✔
795
                        ssc.StackSet,
1✔
796
                        v1.EventTypeNormal,
1✔
797
                        "DeletedExcessStack",
1✔
798
                        "Deleted excess stack %s",
1✔
799
                        stack.Name)
1✔
800
        }
801

802
        return nil
1✔
803
}
804

805
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
806
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
807
        // Ingress removed, handled outside
1✔
808
        if ingress == nil {
2✔
809
                return existing, nil
1✔
810
        }
1✔
811

812
        if existing == nil {
2✔
813
                if ingress.Annotations == nil {
2✔
814
                        ingress.Annotations = make(map[string]string)
1✔
815
                }
1✔
816
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
817

1✔
818
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
819
                if err != nil {
1✔
820
                        return nil, err
×
821
                }
×
822
                c.recorder.Eventf(
1✔
823
                        stackset,
1✔
824
                        v1.EventTypeNormal,
1✔
825
                        "CreatedIngress",
1✔
826
                        "Created Ingress %s",
1✔
827
                        ingress.Name)
1✔
828
                return createdIng, nil
1✔
829
        }
830

831
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
832
        if existingHaveUpdateTimeStamp {
2✔
833
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
834
        }
1✔
835

836
        // Check if we need to update the Ingress
837
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
838
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
839
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
840
                // add the annotation back after comparing
1✔
841
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
842
                return existing, nil
1✔
843
        }
1✔
844

845
        updated := existing.DeepCopy()
1✔
846
        updated.Spec = ingress.Spec
1✔
847
        if ingress.Annotations != nil {
2✔
848
                updated.Annotations = ingress.Annotations
1✔
849
        } else {
2✔
850
                updated.Annotations = make(map[string]string)
1✔
851
        }
1✔
852
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
853

1✔
854
        updated.Labels = ingress.Labels
1✔
855

1✔
856
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
857
        if err != nil {
1✔
858
                return nil, err
×
859
        }
×
860
        c.recorder.Eventf(
1✔
861
                stackset,
1✔
862
                v1.EventTypeNormal,
1✔
863
                "UpdatedIngress",
1✔
864
                "Updated Ingress %s",
1✔
865
                ingress.Name)
1✔
866
        return createdIngress, nil
1✔
867
}
868

869
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
870
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
871
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
872
                if routegroup == nil {
2✔
873
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
874
                        return nil
1✔
875
                }
1✔
876
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
877
                // The only scenario version we could think of for this is
1✔
878
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
879
                //  in that case, just wait until the RouteGroup has the annotation
1✔
880
                if !ok {
2✔
881
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
882
                        return nil
1✔
883
                }
1✔
884

885
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
886
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
887
                        return nil
1✔
888
                } else if !ready {
3✔
889
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
890
                        return nil
1✔
891
                }
1✔
892
        }
893
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
894
        if err != nil {
1✔
895
                return err
×
896
        }
×
897
        c.recorder.Eventf(
1✔
898
                stackset,
1✔
899
                v1.EventTypeNormal,
1✔
900
                "DeletedIngress",
1✔
901
                "Deleted Ingress %s",
1✔
902
                existing.Namespace)
1✔
903
        return nil
1✔
904
}
905

906
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
907
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
908
        // RouteGroup removed, handled outside
1✔
909
        if rg == nil {
2✔
910
                return existing, nil
1✔
911
        }
1✔
912

913
        // Create new RouteGroup
914
        if existing == nil {
2✔
915
                if rg.Annotations == nil {
2✔
916
                        rg.Annotations = make(map[string]string)
1✔
917
                }
1✔
918
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
919

1✔
920
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
921
                if err != nil {
1✔
922
                        return nil, err
×
923
                }
×
924
                c.recorder.Eventf(
1✔
925
                        stackset,
1✔
926
                        v1.EventTypeNormal,
1✔
927
                        "CreatedRouteGroup",
1✔
928
                        "Created RouteGroup %s",
1✔
929
                        rg.Name)
1✔
930
                return createdRg, nil
1✔
931
        }
932

933
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
934
        if existingHaveUpdateTimeStamp {
2✔
935
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
936
        }
1✔
937

938
        // Check if we need to update the RouteGroup
939
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
940
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
941
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
942
                // add the annotation back after comparing
1✔
943
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
944
                return existing, nil
1✔
945
        }
1✔
946

947
        updated := existing.DeepCopy()
1✔
948
        updated.Spec = rg.Spec
1✔
949
        if rg.Annotations != nil {
1✔
950
                updated.Annotations = rg.Annotations
×
951
        } else {
1✔
952
                updated.Annotations = make(map[string]string)
1✔
953
        }
1✔
954
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
955

1✔
956
        updated.Labels = rg.Labels
1✔
957

1✔
958
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
959
        if err != nil {
1✔
960
                return nil, err
×
961
        }
×
962
        c.recorder.Eventf(
1✔
963
                stackset,
1✔
964
                v1.EventTypeNormal,
1✔
965
                "UpdatedRouteGroup",
1✔
966
                "Updated RouteGroup %s",
1✔
967
                rg.Name)
1✔
968
        return createdRg, nil
1✔
969
}
970

971
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
972
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
973
        if stackset.Spec.Ingress != nil {
2✔
974
                if ingress == nil {
2✔
975
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
976
                        return nil
1✔
977
                }
1✔
978
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
979
                // The only scenario version we could think of for this is
1✔
980
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
981
                //  in that case, just wait until the RouteGroup has the annotation
1✔
982
                if !ok {
2✔
983
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
984
                        return nil
1✔
985
                }
1✔
986

987
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
988
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
989
                        return nil
1✔
990
                } else if !ready {
3✔
991
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
992
                        return nil
1✔
993
                }
1✔
994
        }
995
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
996
        if err != nil {
1✔
997
                return err
×
998
        }
×
999
        c.recorder.Eventf(
1✔
1000
                stackset,
1✔
1001
                v1.EventTypeNormal,
1✔
1002
                "DeletedRouteGroup",
1✔
1003
                "Deleted RouteGroup %s",
1✔
1004
                rg.Namespace)
1✔
1005
        return nil
1✔
1006
}
1007

1008
func (c *StackSetController) ReconcileStackSetIngressSources(
1009
        ctx context.Context,
1010
        stackset *zv1.StackSet,
1011
        existingIng *networking.Ingress,
1012
        existingRg *rgv1.RouteGroup,
1013
        generateIng func() (*networking.Ingress, error),
1014
        generateRg func() (*rgv1.RouteGroup, error),
1015
) error {
1✔
1016
        ingress, err := generateIng()
1✔
1017
        if err != nil {
1✔
1018
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1019
        }
×
1020

1021
        // opt-out existingIng creation in case we have an external entity creating existingIng
1022
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1023
        if err != nil {
1✔
1024
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1025
        }
×
1026

1027
        rg, err := generateRg()
1✔
1028
        if err != nil {
1✔
1029
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1030
        }
×
1031

1032
        var appliedRg *rgv1.RouteGroup
1✔
1033
        if c.routeGroupSupportEnabled {
2✔
1034
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1035
                if err != nil {
1✔
1036
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1037
                }
×
1038
        }
1039

1040
        // Ingress removed
1041
        if ingress == nil {
2✔
1042
                if existingIng != nil {
2✔
1043
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1044
                        if err != nil {
1✔
1045
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1046
                        }
×
1047
                }
1048
        }
1049

1050
        // RouteGroup removed
1051
        if rg == nil {
2✔
1052
                if existingRg != nil {
2✔
1053
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1054
                        if err != nil {
1✔
1055
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1056
                        }
×
1057
                }
1058
        }
1059

1060
        return nil
1✔
1061
}
1062

1063
// convertToTrafficSegments removes the central ingress component of the
1064
// StackSet, if and only if the StackSet already has traffic segments.
1065
func (c *StackSetController) convertToTrafficSegments(
1066
        ctx context.Context,
1067
        ssc *core.StackSetContainer,
NEW
1068
) error {
×
NEW
1069
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
NEW
1070
                return nil
×
NEW
1071
        }
×
1072

NEW
1073
        var ingTimestamp, rgTimestamp *metav1.Time
×
NEW
1074
        for _, sc := range ssc.StackContainers {
×
NEW
1075
                // If we find at least one stack with a segment, we can delete the
×
NEW
1076
                // central ingress resources.
×
NEW
1077
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
NEW
1078
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
NEW
1079
                }
×
1080

NEW
1081
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
NEW
1082
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
NEW
1083
                }
×
1084

NEW
1085
                if ingTimestamp != nil && rgTimestamp != nil {
×
NEW
1086
                        break
×
1087
                }
1088
        }
1089

NEW
1090
        if ingTimestamp != nil && ssc.Ingress != nil {
×
NEW
1091
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1092
                        c.logger.Infof(
×
NEW
1093
                                "Not deleting Ingress %s, segments created less than %s ago",
×
NEW
1094
                                ssc.Ingress.Name,
×
NEW
1095
                                c.ingressSourceSwitchTTL,
×
NEW
1096
                        )
×
NEW
1097
                        return nil
×
NEW
1098
                }
×
1099

NEW
1100
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
NEW
1101
                        ctx,
×
NEW
1102
                        ssc.Ingress.Name,
×
NEW
1103
                        metav1.DeleteOptions{},
×
NEW
1104
                )
×
NEW
1105
                if err != nil {
×
NEW
1106
                        return err
×
NEW
1107
                }
×
1108

NEW
1109
                c.recorder.Eventf(
×
NEW
1110
                        ssc.StackSet,
×
NEW
1111
                        v1.EventTypeNormal,
×
NEW
1112
                        "DeletedIngress",
×
NEW
1113
                        "Deleted Ingress %s, StackSet conversion complete",
×
NEW
1114
                        ssc.Ingress.Namespace,
×
NEW
1115
                )
×
NEW
1116

×
NEW
1117
                ssc.Ingress = nil
×
1118
        }
1119

NEW
1120
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
NEW
1121
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1122
                        c.logger.Infof(
×
NEW
1123
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
NEW
1124
                                ssc.RouteGroup.Name,
×
NEW
1125
                                c.ingressSourceSwitchTTL,
×
NEW
1126
                        )
×
NEW
1127
                        return nil
×
NEW
1128
                }
×
1129

NEW
1130
                err := c.client.RouteGroupV1().RouteGroups(
×
NEW
1131
                        ssc.RouteGroup.Namespace,
×
NEW
1132
                ).Delete(
×
NEW
1133
                        ctx,
×
NEW
1134
                        ssc.RouteGroup.Name,
×
NEW
1135
                        metav1.DeleteOptions{},
×
NEW
1136
                )
×
NEW
1137
                if err != nil {
×
NEW
1138
                        return err
×
NEW
1139
                }
×
1140

NEW
1141
                c.recorder.Eventf(
×
NEW
1142
                        ssc.RouteGroup,
×
NEW
1143
                        v1.EventTypeNormal,
×
NEW
1144
                        "DeletedRouteGroup",
×
NEW
1145
                        "Deleted RouteGroup %s, StackSet conversion complete",
×
NEW
1146
                        ssc.RouteGroup.Namespace,
×
NEW
1147
                )
×
NEW
1148

×
NEW
1149
                ssc.RouteGroup = nil
×
1150
        }
1151

NEW
1152
        return nil
×
1153
}
1154

1155
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1156
// of the specified StackSet.
1157
//
1158
// If the StackSet supports traffic segments, the controller won't reconcile the
1159
// central ingress resources. This method is deprecated and will be removed in
1160
// the future.
1161
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
NEW
1162
        if !ssc.SupportsSegmentTraffic() {
×
NEW
1163
                err := c.ReconcileStackSetIngressSources(
×
NEW
1164
                        ctx,
×
NEW
1165
                        ssc.StackSet,
×
NEW
1166
                        ssc.Ingress,
×
NEW
1167
                        ssc.RouteGroup,
×
NEW
1168
                        ssc.GenerateIngress,
×
NEW
1169
                        ssc.GenerateRouteGroup,
×
NEW
1170
                )
×
NEW
1171
                if err != nil {
×
NEW
1172
                        return err
×
NEW
1173
                }
×
NEW
1174
        } else {
×
NEW
1175
                // Convert StackSet to traffic segments, if needed
×
NEW
1176
                err := c.convertToTrafficSegments(ctx, ssc)
×
NEW
1177
                if err != nil {
×
NEW
1178
                        return err
×
NEW
1179
                }
×
1180
        }
1181

1182
        trafficChanges := ssc.TrafficChanges()
×
1183
        if len(trafficChanges) != 0 {
×
1184
                var changeMessages []string
×
1185
                for _, change := range trafficChanges {
×
1186
                        changeMessages = append(changeMessages, change.String())
×
1187
                }
×
1188

1189
                c.recorder.Eventf(
×
1190
                        ssc.StackSet,
×
1191
                        v1.EventTypeNormal,
×
1192
                        "TrafficSwitched",
×
1193
                        "Switched traffic: %s",
×
1194
                        strings.Join(changeMessages, ", "))
×
1195
        }
1196

1197
        return nil
×
1198
}
1199

1200
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1201
        updatedTraffic := generateUpdated()
1✔
1202

1✔
1203
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1204
                return nil
×
1205
        }
×
1206

1207
        updated := existing.DeepCopy()
1✔
1208
        updated.Spec.Traffic = updatedTraffic
1✔
1209

1✔
1210
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1211
        if err != nil {
1✔
1212
                return err
×
1213
        }
×
1214
        c.recorder.Eventf(
1✔
1215
                updated,
1✔
1216
                v1.EventTypeNormal,
1✔
1217
                "UpdatedStackSet",
1✔
1218
                "Updated StackSet %s",
1✔
1219
                updated.Name)
1✔
1220
        return nil
1✔
1221
}
1222

1223
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1224

×
1225
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1226
        if err != nil {
×
1227
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1228
        }
×
1229
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1230
        if err != nil {
×
1231
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1232
        }
×
1233

1234
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1235
        if err != nil {
×
1236
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1237
        }
×
1238

1239
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1240
        if err != nil {
×
1241
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1242
        }
×
1243

1244
        // This is to support both central and segment-based traffic.
NEW
1245
        if ssc.SupportsSegmentTraffic() {
×
NEW
1246
                err = c.ReconcileStackIngress(
×
NEW
1247
                        ctx,
×
NEW
1248
                        sc.Stack,
×
NEW
1249
                        sc.Resources.IngressSegment,
×
NEW
1250
                        sc.GenerateIngressSegment,
×
NEW
1251
                )
×
NEW
1252
                if err != nil {
×
NEW
1253
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
NEW
1254
                }
×
1255
        }
1256

1257
        if c.routeGroupSupportEnabled {
×
1258
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1259
                if err != nil {
×
1260
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1261
                }
×
1262

1263
                // This is to support both central and segment-based traffic.
NEW
1264
                if ssc.SupportsSegmentTraffic() {
×
NEW
1265
                        err = c.ReconcileStackRouteGroup(
×
NEW
1266
                                ctx,
×
NEW
1267
                                sc.Stack,
×
NEW
1268
                                sc.Resources.RouteGroupSegment,
×
NEW
1269
                                sc.GenerateRouteGroupSegment,
×
NEW
1270
                        )
×
NEW
1271
                        if err != nil {
×
NEW
1272
                                return c.errorEventf(
×
NEW
1273
                                        sc.Stack,
×
NEW
1274
                                        "FailedManageRouteGroupSegment",
×
NEW
1275
                                        err,
×
NEW
1276
                                )
×
NEW
1277
                        }
×
1278
                }
1279
        }
1280
        return nil
×
1281
}
1282

1283
// ReconcileStackSet reconciles all the things from a stackset
1284
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1285
        defer func() {
×
1286
                if r := recover(); r != nil {
×
1287
                        c.metricsReporter.ReportPanic()
×
1288
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1289
                        err = fmt.Errorf("panic: %v", r)
×
1290
                }
×
1291
        }()
1292

NEW
1293
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
NEW
1294
                // Reconciler handles StackSet in the next loop
×
NEW
1295
                return nil
×
NEW
1296
        }
×
1297

1298
        // Create current stack, if needed. Proceed on errors.
1299
        err = c.CreateCurrentStack(ctx, container)
×
1300
        if err != nil {
×
1301
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1302
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1303
        }
×
1304

1305
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1306
        err = container.UpdateFromResources()
×
1307
        if err != nil {
×
1308
                return err
×
1309
        }
×
1310

1311
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1312
        err = container.ManageTraffic(time.Now())
×
1313
        if err != nil {
×
1314
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1315
                c.recorder.Eventf(
×
1316
                        container.StackSet,
×
1317
                        v1.EventTypeWarning,
×
1318
                        "TrafficNotSwitched",
×
1319
                        "Failed to switch traffic: "+err.Error())
×
1320
        }
×
1321

1322
        // Mark stacks that should be removed
1323
        container.MarkExpiredStacks()
×
1324

×
NEW
1325
        segsInOrder := []types.UID{}
×
NEW
1326
        // This is to support both central and segment-based traffic.
×
NEW
1327
        if container.SupportsSegmentTraffic() {
×
NEW
1328
                // Update traffic segments. Proceed on errors.
×
NEW
1329
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
NEW
1330
                if err != nil {
×
NEW
1331
                        err = c.errorEventf(
×
NEW
1332
                                container.StackSet,
×
NEW
1333
                                reasonFailedManageStackSet,
×
NEW
1334
                                err,
×
NEW
1335
                        )
×
NEW
1336
                        c.stacksetLogger(container).Errorf(
×
NEW
1337
                                "Unable to reconcile traffic segments: %v",
×
NEW
1338
                                err,
×
NEW
1339
                        )
×
NEW
1340
                }
×
1341
        }
1342

1343
        // Reconcile stack resources. Proceed on errors.
NEW
1344
        reconciledStacks := map[types.UID]bool{}
×
NEW
1345
        for _, id := range segsInOrder {
×
NEW
1346
                reconciledStacks[id] = true
×
NEW
1347
                sc := container.StackContainers[id]
×
NEW
1348
                err = c.ReconcileStackResources(ctx, container, sc)
×
NEW
1349
                if err != nil {
×
NEW
1350
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
NEW
1351
                        c.stackLogger(container, sc).Errorf(
×
NEW
1352
                                "Unable to reconcile stack resources: %v",
×
NEW
1353
                                err,
×
NEW
1354
                        )
×
NEW
1355
                }
×
1356
        }
1357

NEW
1358
        for k, sc := range container.StackContainers {
×
NEW
1359
                if reconciledStacks[k] {
×
NEW
1360
                        continue
×
1361
                }
1362

1363
                err = c.ReconcileStackResources(ctx, container, sc)
×
1364
                if err != nil {
×
1365
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1366
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1367
                }
×
1368
        }
1369

1370
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1371
        err = c.ReconcileStackSetResources(ctx, container)
×
1372
        if err != nil {
×
1373
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1374
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1375
        }
×
1376

1377
        // Reconcile desired traffic in the stackset. Proceed on errors.
1378
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1379
        if err != nil {
×
1380
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1381
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1382
        }
×
1383

1384
        // Delete old stacks. Proceed on errors.
1385
        err = c.CleanupOldStacks(ctx, container)
×
1386
        if err != nil {
×
1387
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1388
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1389
        }
×
1390

1391
        // Update statuses.
1392
        err = c.ReconcileStatuses(ctx, container)
×
1393
        if err != nil {
×
1394
                return err
×
1395
        }
×
1396

1397
        return nil
×
1398
}
1399

1400
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1401
// stackset annotation.
1402
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1403
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1404
        if !ok {
2✔
1405
                return 0, false
1✔
1406
        }
1✔
1407
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1408
        if err != nil {
1✔
1409
                return 0, false
×
1410
        }
×
1411
        return resetDelay, true
1✔
1412
}
1413

1414
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1415
        // set TypeMeta manually because of this bug:
1✔
1416
        // https://github.com/kubernetes/client-go/issues/308
1✔
1417
        stackset.APIVersion = core.APIVersion
1✔
1418
        stackset.Kind = core.KindStackSet
1✔
1419
}
1✔
1420

1421
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1422
        // set TypeMeta manually because of this bug:
1✔
1423
        // https://github.com/kubernetes/client-go/issues/308
1✔
1424
        stack.APIVersion = core.APIVersion
1✔
1425
        stack.Kind = core.KindStack
1✔
1426
}
1✔
1427

1428
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1429
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1430
        if err != nil {
2✔
1431
                // wait until there's a valid timestamp on the annotation
1✔
1432
                return false, err
1✔
1433
        }
1✔
1434

1435
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1436
}
1437

1438
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1439
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1440
                return true
1✔
1441
        }
1✔
1442

1443
        return false
1✔
1444
}
1445

1446
// validateConfigurationResourceNames returns an error if any ConfigurationResource name is not prefixed by Stack name.
1447
func validateConfigurationResourceNames(stack *zv1.Stack) error {
1✔
1448
        for _, rsc := range stack.Spec.ConfigurationResources {
2✔
1449
                rscName := rsc.ConfigMapRef.Name
1✔
1450
                if !strings.HasPrefix(rscName, stack.Name) {
1✔
1451
                        return fmt.Errorf("ConfigurationResource name must be prefixed by Stack name. "+
×
1452
                                "ConfigurationResource: %s, Stack: %s", rscName, stack.Name)
×
1453
                }
×
1454
        }
1455

1456
        return nil
1✔
1457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc