• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 7104061596

05 Dec 2023 05:02PM UTC coverage: 70.738% (-2.5%) from 73.244%
7104061596

Pull #495

github

gargravarr
Ensure create/update traffic segments when reconciling stack.
Pull Request #495: Add support for traffic segments.

330 of 582 new or added lines in 6 files covered. (56.7%)

1 existing line in 1 file now uncovered.

2548 of 3602 relevant lines covered (70.74%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.4
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
// StackSetController is the main controller. It watches for changes to
49
// stackset resources and starts and maintains other controllers per
50
// stackset resource.
51
type StackSetController struct {
52
        logger                      *log.Entry
53
        client                      clientset.Interface
54
        controllerID                string
55
        backendWeightsAnnotationKey string
56
        clusterDomains              []string
57
        interval                    time.Duration
58
        stacksetEvents              chan stacksetEvent
59
        stacksetStore               map[types.UID]zv1.StackSet
60
        recorder                    kube_record.EventRecorder
61
        metricsReporter             *core.MetricsReporter
62
        HealthReporter              healthcheck.Handler
63
        routeGroupSupportEnabled    bool
64
        trafficSegmentsEnabled      bool
65
        ingressSourceSwitchTTL      time.Duration
66
        now                         func() string
67
        reconcileWorkers            int
68
        sync.Mutex
69
}
70

71
type stacksetEvent struct {
72
        Deleted  bool
73
        StackSet *zv1.StackSet
74
}
75

76
// eventedError wraps an error that was already exposed as an event to the user
77
type eventedError struct {
78
        err error
79
}
80

81
func (ee *eventedError) Error() string {
×
82
        return ee.err.Error()
×
83
}
×
84

85
func now() string {
×
86
        return time.Now().Format(time.RFC3339)
×
87
}
×
88

89
// NewStackSetController initializes a new StackSetController.
90
func NewStackSetController(
91
        client clientset.Interface,
92
        controllerID string,
93
        parallelWork int,
94
        backendWeightsAnnotationKey string,
95
        clusterDomains []string,
96
        registry prometheus.Registerer,
97
        interval time.Duration,
98
        routeGroupSupportEnabled bool,
99
        trafficSegmentsEnabled bool,
100
        ingressSourceSwitchTTL time.Duration,
101
) (*StackSetController, error) {
1✔
102
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
103
        if err != nil {
1✔
104
                return nil, err
×
105
        }
×
106

107
        return &StackSetController{
1✔
108
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
109
                client:                      client,
1✔
110
                controllerID:                controllerID,
1✔
111
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
112
                clusterDomains:              clusterDomains,
1✔
113
                interval:                    interval,
1✔
114
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
115
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
116
                recorder:                    recorder.CreateEventRecorder(client),
1✔
117
                metricsReporter:             metricsReporter,
1✔
118
                HealthReporter:              healthcheck.NewHandler(),
1✔
119
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
120
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
121
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
122
                now:                         now,
1✔
123
                reconcileWorkers:            parallelWork,
1✔
124
        }, nil
1✔
125
}
126

127
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
128
        return c.logger.WithFields(map[string]interface{}{
×
129
                "namespace": ssc.StackSet.Namespace,
×
130
                "stackset":  ssc.StackSet.Name,
×
131
        })
×
132
}
×
133

134
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
135
        return c.logger.WithFields(map[string]interface{}{
×
136
                "namespace": ssc.StackSet.Namespace,
×
137
                "stackset":  ssc.StackSet.Name,
×
138
                "stack":     sc.Name(),
×
139
        })
×
140
}
×
141

142
// Run runs the main loop of the StackSetController. Before the loops it
143
// sets up a watcher to watch StackSet resources. The watch will send
144
// changes over a channel which is polled from the main loop.
145
func (c *StackSetController) Run(ctx context.Context) {
×
146
        var nextCheck time.Time
×
147

×
148
        // We're not alive if nextCheck is too far in the past
×
149
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
150
                if time.Since(nextCheck) > 5*c.interval {
×
151
                        return fmt.Errorf("nextCheck too old")
×
152
                }
×
153
                return nil
×
154
        })
155

156
        c.startWatch(ctx)
×
157

×
158
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
159

×
160
        nextCheck = time.Now().Add(-c.interval)
×
161

×
162
        for {
×
163
                select {
×
164
                case <-time.After(time.Until(nextCheck)):
×
165

×
166
                        nextCheck = time.Now().Add(c.interval)
×
167

×
168
                        stackSetContainers, err := c.collectResources(ctx)
×
169
                        if err != nil {
×
170
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
171
                                continue
×
172
                        }
173

174
                        var reconcileGroup errgroup.Group
×
175
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
176
                        for stackset, container := range stackSetContainers {
×
177
                                container := container
×
178
                                stackset := stackset
×
179

×
180
                                reconcileGroup.Go(func() error {
×
181
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
182
                                                err := c.ReconcileStackSet(ctx, container)
×
183
                                                if err != nil {
×
184
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
185
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
186
                                                }
×
187
                                        }
188
                                        return nil
×
189
                                })
190
                        }
191

192
                        err = reconcileGroup.Wait()
×
193
                        if err != nil {
×
194
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
195
                        }
×
196
                        err = c.metricsReporter.Report(stackSetContainers)
×
197
                        if err != nil {
×
198
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
199
                        }
×
200
                case e := <-c.stacksetEvents:
×
201
                        stackset := *e.StackSet
×
202
                        fixupStackSetTypeMeta(&stackset)
×
203

×
204
                        // update/delete existing entry
×
205
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
206
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
207
                                        delete(c.stacksetStore, stackset.UID)
×
208
                                        continue
×
209
                                }
210

211
                                // update stackset entry
212
                                c.stacksetStore[stackset.UID] = stackset
×
213
                                continue
×
214
                        }
215

216
                        // check if stackset should be managed by the controller
217
                        if !c.hasOwnership(&stackset) {
×
218
                                continue
×
219
                        }
220

221
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
222
                        c.stacksetStore[stackset.UID] = stackset
×
223
                case <-ctx.Done():
×
224
                        c.logger.Info("Terminating main controller loop.")
×
225
                        return
×
226
                }
227
        }
228
}
229

230
// injectSegmentAnnotation injects the traffic segment annotation if it's not
231
// already present.
232
//
233
// Only inject the traffic segment annotation if the controller has traffic
234
// segments enabled.
235
func (c *StackSetController) injectSegmentAnnotation(
236
        ctx context.Context,
237
        stackSet *zv1.StackSet,
NEW
238
) bool {
×
NEW
239
        if !c.trafficSegmentsEnabled {
×
NEW
240
                return false
×
NEW
241
        }
×
242

NEW
243
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
NEW
244
                return false
×
NEW
245
        }
×
246

NEW
247
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
NEW
248

×
NEW
249
        _, err := c.client.ZalandoV1().StackSets(
×
NEW
250
                stackSet.Namespace,
×
NEW
251
        ).Update(
×
NEW
252
                ctx,
×
NEW
253
                stackSet,
×
NEW
254
                metav1.UpdateOptions{},
×
NEW
255
        )
×
NEW
256
        if err != nil {
×
NEW
257
                c.logger.Errorf(
×
NEW
258
                        "Failed injecting segment annotation: %v",
×
NEW
259
                        err,
×
NEW
260
                )
×
NEW
261
                return false
×
NEW
262
        }
×
NEW
263
        c.recorder.Eventf(
×
NEW
264
                stackSet,
×
NEW
265
                v1.EventTypeNormal,
×
NEW
266
                "UpdatedStackSet",
×
NEW
267
                "Updated StackSet %s",
×
NEW
268
                stackSet.Name,
×
NEW
269
        )
×
NEW
270

×
NEW
271
        return true
×
272
}
273

274
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
275
// overload the API requests with unnecessary requests
276
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
277
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
278
        for uid, stackset := range c.stacksetStore {
2✔
279
                stackset := stackset
1✔
280

1✔
281
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
282

1✔
283
                // use prescaling logic if enabled with an annotation
1✔
284
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
285
                        resetDelay := defaultResetMinReplicasDelay
1✔
286
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
287
                                resetDelay = resetDelayValue
1✔
288
                        }
1✔
289
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
290
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
291
                        }
1✔
292
                }
293

294
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
295
                if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
1✔
NEW
296
                        stacksetContainer.EnableSegmentTraffic()
×
NEW
297
                }
×
298
                stacksets[uid] = stacksetContainer
1✔
299
        }
300

301
        err := c.collectStacks(ctx, stacksets)
1✔
302
        if err != nil {
1✔
303
                return nil, err
×
304
        }
×
305

306
        err = c.collectIngresses(ctx, stacksets)
1✔
307
        if err != nil {
1✔
308
                return nil, err
×
309
        }
×
310

311
        if c.routeGroupSupportEnabled {
2✔
312
                err = c.collectRouteGroups(ctx, stacksets)
1✔
313
                if err != nil {
1✔
314
                        return nil, err
×
315
                }
×
316
        }
317

318
        err = c.collectDeployments(ctx, stacksets)
1✔
319
        if err != nil {
1✔
320
                return nil, err
×
321
        }
×
322

323
        err = c.collectServices(ctx, stacksets)
1✔
324
        if err != nil {
1✔
325
                return nil, err
×
326
        }
×
327

328
        err = c.collectHPAs(ctx, stacksets)
1✔
329
        if err != nil {
1✔
330
                return nil, err
×
331
        }
×
332

333
        return stacksets, nil
1✔
334
}
335

336
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
337
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
338

1✔
339
        if err != nil {
1✔
340
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
341
        }
×
342

343
        for _, i := range ingresses.Items {
2✔
344
                ingress := i
1✔
345
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
346
                        // stackset ingress
1✔
347
                        if s, ok := stacksets[uid]; ok {
2✔
348
                                s.Ingress = &ingress
1✔
349
                                continue
1✔
350
                        }
351

352
                        // stack ingress
353
                        for _, stackset := range stacksets {
2✔
354
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
355
                                        if strings.HasSuffix(
1✔
356
                                                ingress.ObjectMeta.Name,
1✔
357
                                                core.SegmentSuffix,
1✔
358
                                        ) {
2✔
359
                                                // Traffic Segment
1✔
360
                                                s.Resources.IngressSegment = &ingress
1✔
361
                                        } else {
2✔
362
                                                s.Resources.Ingress = &ingress
1✔
363
                                        }
1✔
364
                                        break
1✔
365
                                }
366
                        }
367
                }
368
        }
369
        return nil
1✔
370
}
371

372
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
373
        rgs, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(
1✔
374
                ctx,
1✔
375
                metav1.ListOptions{},
1✔
376
        )
1✔
377
        if err != nil {
1✔
378
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
379
        }
×
380

381
        for _, rg := range rgs.Items {
2✔
382
                routegroup := rg
1✔
383
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
384
                        // stackset routegroups
1✔
385
                        if s, ok := stacksets[uid]; ok {
2✔
386
                                s.RouteGroup = &routegroup
1✔
387
                                continue
1✔
388
                        }
389

390
                        // stack routegroups
391
                        for _, stackset := range stacksets {
2✔
392
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
393
                                        if strings.HasSuffix(
1✔
394
                                                routegroup.ObjectMeta.Name,
1✔
395
                                                core.SegmentSuffix,
1✔
396
                                        ) {
2✔
397
                                                // Traffic Segment
1✔
398
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
399
                                        } else {
2✔
400
                                                s.Resources.RouteGroup = &routegroup
1✔
401
                                        }
1✔
402
                                        break
1✔
403
                                }
404
                        }
405
                }
406
        }
407
        return nil
1✔
408
}
409

410
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
411
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
412
        if err != nil {
1✔
413
                return fmt.Errorf("failed to list Stacks: %v", err)
×
414
        }
×
415

416
        for _, stack := range stacks.Items {
2✔
417
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
418
                        if s, ok := stacksets[uid]; ok {
2✔
419
                                stack := stack
1✔
420
                                fixupStackTypeMeta(&stack)
1✔
421

1✔
422
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
423
                                        Stack: &stack,
1✔
424
                                }
1✔
425
                                continue
1✔
426
                        }
427
                }
428
        }
429
        return nil
1✔
430
}
431

432
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
433
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
434
        if err != nil {
1✔
435
                return fmt.Errorf("failed to list Deployments: %v", err)
×
436
        }
×
437

438
        for _, d := range deployments.Items {
2✔
439
                deployment := d
1✔
440
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
441
                        for _, stackset := range stacksets {
2✔
442
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
443
                                        s.Resources.Deployment = &deployment
1✔
444
                                        break
1✔
445
                                }
446
                        }
447
                }
448
        }
449
        return nil
1✔
450
}
451

452
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
453
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
454
        if err != nil {
1✔
455
                return fmt.Errorf("failed to list Services: %v", err)
×
456
        }
×
457

458
Items:
1✔
459
        for _, s := range services.Items {
2✔
460
                service := s
1✔
461
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
462
                        for _, stackset := range stacksets {
2✔
463
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
464
                                        s.Resources.Service = &service
1✔
465
                                        continue Items
1✔
466
                                }
467

468
                                // service/HPA used to be owned by the deployment for some reason
469
                                for _, stack := range stackset.StackContainers {
2✔
470
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
471
                                                stack.Resources.Service = &service
1✔
472
                                                continue Items
1✔
473
                                        }
474
                                }
475
                        }
476
                }
477
        }
478
        return nil
1✔
479
}
480

481
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
482
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
483
        if err != nil {
1✔
484
                return fmt.Errorf("failed to list HPAs: %v", err)
×
485
        }
×
486

487
Items:
1✔
488
        for _, h := range hpas.Items {
2✔
489
                hpa := h
1✔
490
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
491
                        for _, stackset := range stacksets {
2✔
492
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
493
                                        s.Resources.HPA = &hpa
1✔
494
                                        continue Items
1✔
495
                                }
496

497
                                // service/HPA used to be owned by the deployment for some reason
498
                                for _, stack := range stackset.StackContainers {
2✔
499
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
500
                                                stack.Resources.HPA = &hpa
1✔
501
                                                continue Items
1✔
502
                                        }
503
                                }
504
                        }
505
                }
506
        }
507
        return nil
1✔
508
}
509

510
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
511
        if len(objectMeta.OwnerReferences) == 1 {
2✔
512
                return objectMeta.OwnerReferences[0].UID, true
1✔
513
        }
1✔
514
        return "", false
1✔
515
}
516

517
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
518
        switch err.(type) {
×
519
        case *eventedError:
×
520
                // already notified
×
521
                return err
×
522
        default:
×
523
                c.recorder.Eventf(
×
524
                        object,
×
525
                        v1.EventTypeWarning,
×
526
                        reason,
×
527
                        err.Error())
×
528
                return &eventedError{err: err}
×
529
        }
530
}
531

532
// hasOwnership returns true if the controller is the "owner" of the stackset.
533
// Whether it's owner is determined by the value of the
534
// 'stackset-controller.zalando.org/controller' annotation. If the value
535
// matches the controllerID then it owns it, or if the controllerID is
536
// "" and there's no annotation set.
537
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
538
        if stackset.Annotations != nil {
×
539
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
540
                        return owner == c.controllerID
×
541
                }
×
542
        }
543
        return c.controllerID == ""
×
544
}
545

546
func (c *StackSetController) startWatch(ctx context.Context) {
×
547
        informer := cache.NewSharedIndexInformer(
×
548
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
549
                &zv1.StackSet{},
×
550
                0, // skip resync
×
551
                cache.Indexers{},
×
552
        )
×
553

×
554
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
555
                AddFunc:    c.add,
×
556
                UpdateFunc: c.update,
×
557
                DeleteFunc: c.del,
×
558
        })
×
559
        go informer.Run(ctx.Done())
×
560
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
561
                c.logger.Errorf("Timed out waiting for caches to sync")
×
562
                return
×
563
        }
×
564
        c.logger.Info("Synced StackSet watcher")
×
565
}
566

567
func (c *StackSetController) add(obj interface{}) {
×
568
        stackset, ok := obj.(*zv1.StackSet)
×
569
        if !ok {
×
570
                return
×
571
        }
×
572

573
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
574
        c.stacksetEvents <- stacksetEvent{
×
575
                StackSet: stackset.DeepCopy(),
×
576
        }
×
577
}
578

579
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
580
        newStackset, ok := newObj.(*zv1.StackSet)
×
581
        if !ok {
×
582
                return
×
583
        }
×
584

585
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
586
        if !ok {
×
587
                return
×
588
        }
×
589

590
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
591
                newStackset.Namespace,
×
592
                newStackset.Name,
×
593
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
594
        )
×
595

×
596
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
597
        c.stacksetEvents <- stacksetEvent{
×
598
                StackSet: newStackset.DeepCopy(),
×
599
        }
×
600
}
601

602
func (c *StackSetController) del(obj interface{}) {
×
603
        stackset, ok := obj.(*zv1.StackSet)
×
604
        if !ok {
×
605
                return
×
606
        }
×
607

608
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
609
        c.stacksetEvents <- stacksetEvent{
×
610
                StackSet: stackset.DeepCopy(),
×
611
                Deleted:  true,
×
612
        }
×
613
}
614

615
func retryUpdate(updateFn func(retry bool) error) error {
×
616
        retry := false
×
617
        for {
×
618
                err := updateFn(retry)
×
619
                if err != nil {
×
620
                        if errors.IsConflict(err) {
×
621
                                retry = true
×
622
                                continue
×
623
                        }
624
                        return err
×
625
                }
626
                return nil
×
627
        }
628
}
629

630
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
631
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
632
        for _, sc := range ssc.StackContainers {
×
633
                stack := sc.Stack.DeepCopy()
×
634
                status := *sc.GenerateStackStatus()
×
635
                err := retryUpdate(func(retry bool) error {
×
636
                        if retry {
×
637
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
638
                                if err != nil {
×
639
                                        return err
×
640
                                }
×
641
                                stack = updated
×
642
                        }
643
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
644
                                stack.Status = status
×
645
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
646
                                return err
×
647
                        }
×
648
                        return nil
×
649
                })
650
                if err != nil {
×
651
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
652
                }
×
653
        }
654

655
        stackset := ssc.StackSet.DeepCopy()
×
656
        status := *ssc.GenerateStackSetStatus()
×
657
        err := retryUpdate(func(retry bool) error {
×
658
                if retry {
×
659
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
660
                        if err != nil {
×
661
                                return err
×
662
                        }
×
663
                        stackset = updated
×
664
                }
665
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
666
                        stackset.Status = status
×
667
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
668
                        return err
×
669
                }
×
670
                return nil
×
671
        })
672

673
        if err != nil {
×
674
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
675
        }
×
676
        return nil
×
677
}
678

679
// ReconcileTrafficSegments updates the traffic segments according to the actual
680
// traffic weight of each stack.
681
//
682
// Returns the ordered list of Trafic Segments that need to be updated.
683
func (c *StackSetController) ReconcileTrafficSegments(
684
        ctx context.Context,
685
        ssc *core.StackSetContainer,
NEW
686
) ([]core.TrafficSegment, error) {
×
NEW
687
        // Compute segments
×
NEW
688
        toUpdate, err := ssc.ComputeTrafficSegments()
×
NEW
689
        if err != nil {
×
NEW
690
                return []core.TrafficSegment{},
×
NEW
691
                        c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
NEW
692
        }
×
693

NEW
694
        for _, ts := range toUpdate {
×
NEW
695
                ssc.StackContainers[ts.GetID()].IngressSegmentToUpdate = ts.IngressSegment
×
NEW
696
                ssc.StackContainers[ts.GetID()].RouteGroupSegmentToUpdate = ts.RouteGroupSegment
×
NEW
697
        }
×
698

NEW
699
        return toUpdate, nil
×
700
}
701

702
// CreateCurrentStack creates a new Stack object for the current stack, if needed
703
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
704
        newStack, newStackVersion := ssc.NewStack()
1✔
705
        if newStack == nil {
2✔
706
                return nil
1✔
707
        }
1✔
708

709
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
710
        if err != nil {
1✔
711
                return err
×
712
        }
×
713
        fixupStackTypeMeta(created)
1✔
714

1✔
715
        c.recorder.Eventf(
1✔
716
                ssc.StackSet,
1✔
717
                v1.EventTypeNormal,
1✔
718
                "CreatedStack",
1✔
719
                "Created stack %s",
1✔
720
                newStack.Name(),
1✔
721
        )
1✔
722

1✔
723
        // Persist ObservedStackVersion in the status
1✔
724
        updated := ssc.StackSet.DeepCopy()
1✔
725
        updated.Status.ObservedStackVersion = newStackVersion
1✔
726

1✔
727
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
728
        if err != nil {
1✔
729
                return err
×
730
        }
×
731
        fixupStackSetTypeMeta(result)
1✔
732
        ssc.StackSet = result
1✔
733

1✔
734
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
735
                Stack:          created,
1✔
736
                PendingRemoval: false,
1✔
737
                Resources:      core.StackResources{},
1✔
738
        }
1✔
739
        return nil
1✔
740
}
741

742
// CleanupOldStacks deletes stacks that are no longer needed.
743
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
744
        for _, sc := range ssc.StackContainers {
2✔
745
                if !sc.PendingRemoval {
2✔
746
                        continue
1✔
747
                }
748

749
                stack := sc.Stack
1✔
750
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
751
                if err != nil {
1✔
752
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
753
                }
×
754
                c.recorder.Eventf(
1✔
755
                        ssc.StackSet,
1✔
756
                        v1.EventTypeNormal,
1✔
757
                        "DeletedExcessStack",
1✔
758
                        "Deleted excess stack %s",
1✔
759
                        stack.Name)
1✔
760
        }
761

762
        return nil
1✔
763
}
764

765
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
766
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
767

1✔
768
        // Ingress removed, handled outside
1✔
769
        if ingress == nil {
2✔
770
                return existing, nil
1✔
771
        }
1✔
772

773
        if existing == nil {
2✔
774
                if ingress.Annotations == nil {
2✔
775
                        ingress.Annotations = make(map[string]string)
1✔
776
                }
1✔
777
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
778

1✔
779
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
780
                if err != nil {
1✔
781
                        return nil, err
×
782
                }
×
783
                c.recorder.Eventf(
1✔
784
                        stackset,
1✔
785
                        v1.EventTypeNormal,
1✔
786
                        "CreatedIngress",
1✔
787
                        "Created Ingress %s",
1✔
788
                        ingress.Name)
1✔
789
                return createdIng, nil
1✔
790
        }
791

792
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
793
        if existingHaveUpdateTimeStamp {
2✔
794
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
795
        }
1✔
796

797
        // Check if we need to update the Ingress
798
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
799
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
800
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
801
                // add the annotation back after comparing
1✔
802
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
803
                return existing, nil
1✔
804
        }
1✔
805

806
        updated := existing.DeepCopy()
1✔
807
        updated.Spec = ingress.Spec
1✔
808
        if ingress.Annotations != nil {
2✔
809
                updated.Annotations = ingress.Annotations
1✔
810
        } else {
2✔
811
                updated.Annotations = make(map[string]string)
1✔
812
        }
1✔
813
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
814

1✔
815
        updated.Labels = ingress.Labels
1✔
816

1✔
817
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
818
        if err != nil {
1✔
819
                return nil, err
×
820
        }
×
821
        c.recorder.Eventf(
1✔
822
                stackset,
1✔
823
                v1.EventTypeNormal,
1✔
824
                "UpdatedIngress",
1✔
825
                "Updated Ingress %s",
1✔
826
                ingress.Name)
1✔
827
        return createdIngress, nil
1✔
828
}
829

830
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
831
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
832
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
833
                if routegroup == nil {
2✔
834
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
835
                        return nil
1✔
836
                }
1✔
837
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
838
                // The only scenario version we could think of for this is
1✔
839
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
840
                //  in that case, just wait until the RouteGroup has the annotation
1✔
841
                if !ok {
2✔
842
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
843
                        return nil
1✔
844
                }
1✔
845

846
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
847
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
848
                        return nil
1✔
849
                } else if !ready {
3✔
850
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
851
                        return nil
1✔
852
                }
1✔
853
        }
854
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
855
        if err != nil {
1✔
856
                return err
×
857
        }
×
858
        c.recorder.Eventf(
1✔
859
                stackset,
1✔
860
                v1.EventTypeNormal,
1✔
861
                "DeletedIngress",
1✔
862
                "Deleted Ingress %s",
1✔
863
                existing.Namespace)
1✔
864
        return nil
1✔
865
}
866

867
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
868
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
869
        // RouteGroup removed, handled outside
1✔
870
        if rg == nil {
2✔
871
                return existing, nil
1✔
872
        }
1✔
873

874
        // Create new RouteGroup
875
        if existing == nil {
2✔
876
                if rg.Annotations == nil {
2✔
877
                        rg.Annotations = make(map[string]string)
1✔
878
                }
1✔
879
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
880

1✔
881
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
882
                if err != nil {
1✔
883
                        return nil, err
×
884
                }
×
885
                c.recorder.Eventf(
1✔
886
                        stackset,
1✔
887
                        v1.EventTypeNormal,
1✔
888
                        "CreatedRouteGroup",
1✔
889
                        "Created RouteGroup %s",
1✔
890
                        rg.Name)
1✔
891
                return createdRg, nil
1✔
892
        }
893

894
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
895
        if existingHaveUpdateTimeStamp {
2✔
896
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
897
        }
1✔
898

899
        // Check if we need to update the RouteGroup
900
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
901
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
902
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
903
                // add the annotation back after comparing
1✔
904
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
905
                return existing, nil
1✔
906
        }
1✔
907

908
        updated := existing.DeepCopy()
1✔
909
        updated.Spec = rg.Spec
1✔
910
        if rg.Annotations != nil {
1✔
911
                updated.Annotations = rg.Annotations
×
912
        } else {
1✔
913
                updated.Annotations = make(map[string]string)
1✔
914
        }
1✔
915
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
916

1✔
917
        updated.Labels = rg.Labels
1✔
918

1✔
919
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
920
        if err != nil {
1✔
921
                return nil, err
×
922
        }
×
923
        c.recorder.Eventf(
1✔
924
                stackset,
1✔
925
                v1.EventTypeNormal,
1✔
926
                "UpdatedRouteGroup",
1✔
927
                "Updated RouteGroup %s",
1✔
928
                rg.Name)
1✔
929
        return createdRg, nil
1✔
930
}
931

932
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
933
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
934
        if stackset.Spec.Ingress != nil {
2✔
935
                if ingress == nil {
2✔
936
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
937
                        return nil
1✔
938
                }
1✔
939
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
940
                // The only scenario version we could think of for this is
1✔
941
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
942
                //  in that case, just wait until the RouteGroup has the annotation
1✔
943
                if !ok {
2✔
944
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
945
                        return nil
1✔
946
                }
1✔
947

948
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
949
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
950
                        return nil
1✔
951
                } else if !ready {
3✔
952
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
953
                        return nil
1✔
954
                }
1✔
955
        }
956
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
957
        if err != nil {
1✔
958
                return err
×
959
        }
×
960
        c.recorder.Eventf(
1✔
961
                stackset,
1✔
962
                v1.EventTypeNormal,
1✔
963
                "DeletedRouteGroup",
1✔
964
                "Deleted RouteGroup %s",
1✔
965
                rg.Namespace)
1✔
966
        return nil
1✔
967
}
968

969
func (c *StackSetController) ReconcileStackSetIngressSources(
970
        ctx context.Context,
971
        stackset *zv1.StackSet,
972
        existingIng *networking.Ingress,
973
        existingRg *rgv1.RouteGroup,
974
        generateIng func() (*networking.Ingress, error),
975
        generateRg func() (*rgv1.RouteGroup, error),
976
) error {
1✔
977
        ingress, err := generateIng()
1✔
978
        if err != nil {
1✔
979
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
980
        }
×
981

982
        // opt-out existingIng creation in case we have an external entity creating existingIng
983
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
984
        if err != nil {
1✔
985
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
986
        }
×
987

988
        rg, err := generateRg()
1✔
989
        if err != nil {
1✔
990
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
991
        }
×
992

993
        var appliedRg *rgv1.RouteGroup
1✔
994
        if c.routeGroupSupportEnabled {
2✔
995
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
996
                if err != nil {
1✔
997
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
998
                }
×
999
        }
1000

1001
        // Ingress removed
1002
        if ingress == nil {
2✔
1003
                if existingIng != nil {
2✔
1004
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1005
                        if err != nil {
1✔
1006
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1007
                        }
×
1008
                }
1009
        }
1010

1011
        // RouteGroup removed
1012
        if rg == nil {
2✔
1013
                if existingRg != nil {
2✔
1014
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1015
                        if err != nil {
1✔
1016
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1017
                        }
×
1018
                }
1019
        }
1020

1021
        return nil
1✔
1022
}
1023

1024
// convertToTrafficSegments removes the central ingress component of the
1025
// StackSet, if and only if the StackSet already has traffic segments.
1026
func (c *StackSetController) convertToTrafficSegments(
1027
        ctx context.Context,
1028
        ssc *core.StackSetContainer,
NEW
1029
) error {
×
NEW
1030
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
NEW
1031
                return nil
×
NEW
1032
        }
×
1033

NEW
1034
        var ingTimestamp, rgTimestamp *metav1.Time
×
NEW
1035
        for _, sc := range ssc.StackContainers {
×
NEW
1036
                // If we find at least one stack with a segment, we can delete the
×
NEW
1037
                // central ingress resources.
×
NEW
1038
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
NEW
1039
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
NEW
1040
                }
×
1041

NEW
1042
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
NEW
1043
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
NEW
1044
                }
×
1045

NEW
1046
                if ingTimestamp != nil && rgTimestamp != nil {
×
NEW
1047
                        break
×
1048
                }
1049
        }
1050

NEW
1051
        if ingTimestamp != nil && ssc.Ingress != nil {
×
NEW
1052
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1053
                        c.logger.Infof(
×
NEW
1054
                                "Not deleting Ingress %s, segments created less than %s ago",
×
NEW
1055
                                ssc.Ingress.Name,
×
NEW
1056
                                c.ingressSourceSwitchTTL,
×
NEW
1057
                        )
×
NEW
1058
                        return nil
×
NEW
1059
                }
×
1060

NEW
1061
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
NEW
1062
                        ctx,
×
NEW
1063
                        ssc.Ingress.Name,
×
NEW
1064
                        metav1.DeleteOptions{},
×
NEW
1065
                )
×
NEW
1066
                if err != nil {
×
NEW
1067
                        return err
×
NEW
1068
                }
×
1069

NEW
1070
                c.recorder.Eventf(
×
NEW
1071
                        ssc.StackSet,
×
NEW
1072
                        v1.EventTypeNormal,
×
NEW
1073
                        "DeletedIngress",
×
NEW
1074
                        "Deleted Ingress %s, StackSet conversion complete",
×
NEW
1075
                        ssc.Ingress.Namespace,
×
NEW
1076
                )
×
NEW
1077

×
NEW
1078
                ssc.Ingress = nil
×
1079
        }
1080

NEW
1081
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
NEW
1082
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1083
                        c.logger.Infof(
×
NEW
1084
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
NEW
1085
                                ssc.RouteGroup.Name,
×
NEW
1086
                                c.ingressSourceSwitchTTL,
×
NEW
1087
                        )
×
NEW
1088
                        return nil
×
NEW
1089
                }
×
1090

NEW
1091
                err := c.client.RouteGroupV1().RouteGroups(
×
NEW
1092
                        ssc.RouteGroup.Namespace,
×
NEW
1093
                ).Delete(
×
NEW
1094
                        ctx,
×
NEW
1095
                        ssc.RouteGroup.Name,
×
NEW
1096
                        metav1.DeleteOptions{},
×
NEW
1097
                )
×
NEW
1098
                if err != nil {
×
NEW
1099
                        return err
×
NEW
1100
                }
×
1101

NEW
1102
                c.recorder.Eventf(
×
NEW
1103
                        ssc.RouteGroup,
×
NEW
1104
                        v1.EventTypeNormal,
×
NEW
1105
                        "DeletedRouteGroup",
×
NEW
1106
                        "Deleted RouteGroup %s, StackSet conversion complete",
×
NEW
1107
                        ssc.RouteGroup.Namespace,
×
NEW
1108
                )
×
NEW
1109

×
NEW
1110
                ssc.RouteGroup = nil
×
1111
        }
1112

NEW
1113
        return nil
×
1114
}
1115

1116
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1117
// of the specified StackSet.
1118
//
1119
// If the StackSet supports traffic segments, the controller won't reconcile the
1120
// central ingress resources. This method is deprecated and will be removed in
1121
// the future.
1122
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
NEW
1123
        if !ssc.SupportsSegmentTraffic() {
×
NEW
1124
                err := c.ReconcileStackSetIngressSources(
×
NEW
1125
                        ctx,
×
NEW
1126
                        ssc.StackSet,
×
NEW
1127
                        ssc.Ingress,
×
NEW
1128
                        ssc.RouteGroup,
×
NEW
1129
                        ssc.GenerateIngress,
×
NEW
1130
                        ssc.GenerateRouteGroup,
×
NEW
1131
                )
×
NEW
1132
                if err != nil {
×
NEW
1133
                        return err
×
NEW
1134
                }
×
NEW
1135
        } else {
×
NEW
1136
                // Convert StackSet to traffic segments, if needed
×
NEW
1137
                err := c.convertToTrafficSegments(ctx, ssc)
×
NEW
1138
                if err != nil {
×
NEW
1139
                        return err
×
NEW
1140
                }
×
1141
        }
1142

1143
        trafficChanges := ssc.TrafficChanges()
×
1144
        if len(trafficChanges) != 0 {
×
1145
                var changeMessages []string
×
1146
                for _, change := range trafficChanges {
×
1147
                        changeMessages = append(changeMessages, change.String())
×
1148
                }
×
1149

1150
                c.recorder.Eventf(
×
1151
                        ssc.StackSet,
×
1152
                        v1.EventTypeNormal,
×
1153
                        "TrafficSwitched",
×
1154
                        "Switched traffic: %s",
×
1155
                        strings.Join(changeMessages, ", "))
×
1156
        }
1157

1158
        return nil
×
1159
}
1160

1161
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1162
        updatedTraffic := generateUpdated()
1✔
1163

1✔
1164
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1165
                return nil
×
1166
        }
×
1167

1168
        updated := existing.DeepCopy()
1✔
1169
        updated.Spec.Traffic = updatedTraffic
1✔
1170

1✔
1171
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1172
        if err != nil {
1✔
1173
                return err
×
1174
        }
×
1175
        c.recorder.Eventf(
1✔
1176
                updated,
1✔
1177
                v1.EventTypeNormal,
1✔
1178
                "UpdatedStackSet",
1✔
1179
                "Updated StackSet %s",
1✔
1180
                updated.Name)
1✔
1181
        return nil
1✔
1182
}
1183

1184
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1185
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1186
        if err != nil {
×
1187
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1188
        }
×
1189
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1190
        if err != nil {
×
1191
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1192
        }
×
1193

1194
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1195
        if err != nil {
×
1196
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1197
        }
×
1198

1199
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1200
        if err != nil {
×
1201
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1202
        }
×
1203

1204
        // This is to support both central and segment-based traffic.
NEW
1205
        if ssc.SupportsSegmentTraffic() {
×
NEW
1206
                err = c.ReconcileStackIngress(
×
NEW
1207
                        ctx,
×
NEW
1208
                        sc.Stack,
×
NEW
1209
                        sc.Resources.IngressSegment,
×
NEW
1210
                        sc.GenerateIngressSegment,
×
NEW
1211
                )
×
NEW
1212
                if err != nil {
×
NEW
1213
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
NEW
1214
                }
×
1215

NEW
1216
                sc.IngressSegmentToUpdate = nil
×
1217
        }
1218

1219
        if c.routeGroupSupportEnabled {
×
1220
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1221
                if err != nil {
×
1222
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1223
                }
×
1224

1225
                // This is to support both central and segment-based traffic.
NEW
1226
                if ssc.SupportsSegmentTraffic() {
×
NEW
1227
                        err = c.ReconcileStackRouteGroup(
×
NEW
1228
                                ctx,
×
NEW
1229
                                sc.Stack,
×
NEW
1230
                                sc.Resources.RouteGroupSegment,
×
NEW
1231
                                sc.GenerateRouteGroupSegment,
×
NEW
1232
                        )
×
NEW
1233
                        if err != nil {
×
NEW
1234
                                return c.errorEventf(
×
NEW
1235
                                        sc.Stack,
×
NEW
1236
                                        "FailedManageRouteGroupSegment",
×
NEW
1237
                                        err,
×
NEW
1238
                                )
×
NEW
1239
                        }
×
1240

NEW
1241
                        sc.RouteGroupSegmentToUpdate = nil
×
1242
                }
1243
        }
1244

1245
        return nil
×
1246
}
1247

1248
// ReconcileStackSet reconciles all the things from a stackset
1249
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1250
        defer func() {
×
1251
                if r := recover(); r != nil {
×
1252
                        c.metricsReporter.ReportPanic()
×
1253
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1254
                        err = fmt.Errorf("panic: %v", r)
×
1255
                }
×
1256
        }()
1257

NEW
1258
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
NEW
1259
                // Reconciler handles StackSet in the next loop
×
NEW
1260
                return nil
×
NEW
1261
        }
×
1262

1263
        // Create current stack, if needed. Proceed on errors.
1264
        err = c.CreateCurrentStack(ctx, container)
×
1265
        if err != nil {
×
1266
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1267
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1268
        }
×
1269

1270
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1271
        err = container.UpdateFromResources()
×
1272
        if err != nil {
×
1273
                return err
×
1274
        }
×
1275

1276
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1277
        err = container.ManageTraffic(time.Now())
×
1278
        if err != nil {
×
1279
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1280
                c.recorder.Eventf(
×
1281
                        container.StackSet,
×
1282
                        v1.EventTypeWarning,
×
1283
                        "TrafficNotSwitched",
×
1284
                        "Failed to switch traffic: "+err.Error())
×
1285
        }
×
1286

1287
        // Mark stacks that should be removed
1288
        container.MarkExpiredStacks()
×
1289

×
NEW
1290
        segsInOrder := []core.TrafficSegment{}
×
NEW
1291
        // This is to support both central and segment-based traffic.
×
NEW
1292
        if container.SupportsSegmentTraffic() {
×
NEW
1293
                // Update traffic segments. Proceed on errors.
×
NEW
1294
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
NEW
1295
                if err != nil {
×
NEW
1296
                        err = c.errorEventf(
×
NEW
1297
                                container.StackSet,
×
NEW
1298
                                reasonFailedManageStackSet,
×
NEW
1299
                                err,
×
NEW
1300
                        )
×
NEW
1301
                        c.stacksetLogger(container).Errorf(
×
NEW
1302
                                "Unable to reconcile traffic segments: %v",
×
NEW
1303
                                err,
×
NEW
1304
                        )
×
NEW
1305
                }
×
1306
        }
1307

1308
        // Reconcile stack resources. Proceed on errors.
NEW
1309
        reconciledStacks := map[types.UID]bool{}
×
NEW
1310
        for _, ts := range segsInOrder {
×
NEW
1311
                reconciledStacks[ts.GetID()] = true
×
NEW
1312
                sc := container.StackContainers[ts.GetID()]
×
NEW
1313
                err = c.ReconcileStackResources(ctx, container, sc)
×
NEW
1314
                if err != nil {
×
NEW
1315
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
NEW
1316
                        c.stackLogger(container, sc).Errorf(
×
NEW
1317
                                "Unable to reconcile stack resources: %v",
×
NEW
1318
                                err,
×
NEW
1319
                        )
×
NEW
1320
                }
×
1321
        }
1322

NEW
1323
        for k, sc := range container.StackContainers {
×
NEW
1324
                if reconciledStacks[k] {
×
NEW
1325
                        continue
×
1326
                }
1327

1328
                err = c.ReconcileStackResources(ctx, container, sc)
×
1329
                if err != nil {
×
1330
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1331
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1332
                }
×
1333
        }
1334

1335
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1336
        err = c.ReconcileStackSetResources(ctx, container)
×
1337
        if err != nil {
×
1338
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1339
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1340
        }
×
1341

1342
        // Reconcile desired traffic in the stackset. Proceed on errors.
1343
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1344
        if err != nil {
×
1345
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1346
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1347
        }
×
1348

1349
        // Delete old stacks. Proceed on errors.
1350
        err = c.CleanupOldStacks(ctx, container)
×
1351
        if err != nil {
×
1352
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1353
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1354
        }
×
1355

1356
        // Update statuses.
1357
        err = c.ReconcileStatuses(ctx, container)
×
1358
        if err != nil {
×
1359
                return err
×
1360
        }
×
1361

1362
        return nil
×
1363
}
1364

1365
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1366
// stackset annotation.
1367
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1368
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1369
        if !ok {
2✔
1370
                return 0, false
1✔
1371
        }
1✔
1372
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1373
        if err != nil {
1✔
1374
                return 0, false
×
1375
        }
×
1376
        return resetDelay, true
1✔
1377
}
1378

1379
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1380
        // set TypeMeta manually because of this bug:
1✔
1381
        // https://github.com/kubernetes/client-go/issues/308
1✔
1382
        stackset.APIVersion = core.APIVersion
1✔
1383
        stackset.Kind = core.KindStackSet
1✔
1384
}
1✔
1385

1386
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1387
        // set TypeMeta manually because of this bug:
1✔
1388
        // https://github.com/kubernetes/client-go/issues/308
1✔
1389
        stack.APIVersion = core.APIVersion
1✔
1390
        stack.Kind = core.KindStack
1✔
1391
}
1✔
1392

1393
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1394
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1395
        if err != nil {
2✔
1396
                // wait until there's a valid timestamp on the annotation
1✔
1397
                return false, err
1✔
1398
        }
1✔
1399

1400
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1401
}
1402

1403
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1404
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1405
                return true
1✔
1406
        }
1✔
1407

1408
        return false
1✔
1409
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc