• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 7115116827

06 Dec 2023 01:21PM UTC coverage: 70.507% (-2.7%) from 73.254%
7115116827

Pull #495

github

gargravarr
Generate actual segment in cluster.
Pull Request #495: Add support for traffic segments.

320 of 584 new or added lines in 7 files covered. (54.79%)

1 existing line in 1 file now uncovered.

2601 of 3689 relevant lines covered (70.51%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.16
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        // PrescaleStacksAnnotationKey: when this annotation is present on a
        // StackSet (any value), collectResources selects the prescaling
        // traffic reconciler instead of the simple one.
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        // ResetHPAMinReplicasDelayAnnotationKey is presumably the annotation
        // read by getResetMinReplicasDelay to override
        // defaultResetMinReplicasDelay - TODO confirm against that helper.
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        // StacksetControllerControllerAnnotationKey names the controller that
        // owns a StackSet; hasOwnership compares its value to the
        // controller's ID.
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        // ControllerLastUpdatedAnnotationKey is not used in the visible part
        // of this file; judging by its value it carries an update timestamp.
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        // TrafficSegmentsAnnotationKey: a value of "true" on a StackSet
        // enables segment traffic for its container in collectResources.
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        // reasonFailedManageStackSet is the event reason recorded by Run
        // when reconciling a StackSet fails.
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        // defaultResetMinReplicasDelay is the ResetHPAMinReplicasTimeout used
        // by the prescaling reconciler when the StackSet annotations do not
        // supply a value.
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
// StackSetController is the main controller. It watches for changes to
49
// stackset resources and starts and maintains other controllers per
50
// stackset resource.
//
// In this file the controller keeps the StackSets it owns in stacksetStore
// and reconciles all of them once per interval from Run.
51
type StackSetController struct {
52
        logger                      *log.Entry
53
        client                      clientset.Interface
54
        // controllerID is compared with the controller annotation of a
        // StackSet in hasOwnership to decide whether this instance manages it.
        controllerID                string
55
        backendWeightsAnnotationKey string
56
        clusterDomains              []string
57
        // interval is the time between two reconcile iterations in Run.
        interval                    time.Duration
58
        // stacksetEvents carries add/update/delete notifications from the
        // informer handlers to the main loop in Run.
        stacksetEvents              chan stacksetEvent
59
        // stacksetStore holds the StackSets managed by this controller,
        // keyed by UID. It is written from the main loop in Run.
        stacksetStore               map[types.UID]zv1.StackSet
60
        recorder                    kube_record.EventRecorder
61
        metricsReporter             *core.MetricsReporter
62
        // HealthReporter receives the "nextCheck" liveness check registered
        // in Run and serves the /healthz endpoint.
        HealthReporter              healthcheck.Handler
63
        routeGroupSupportEnabled    bool
64
        trafficSegmentsEnabled      bool
65
        ingressSourceSwitchTTL      time.Duration
66
        // now returns the current time as a string; NewStackSetController
        // sets it to the package-level now (RFC 3339).
        now                         func() string
67
        // reconcileWorkers limits how many StackSets Run reconciles
        // concurrently.
        reconcileWorkers            int
68
        configMapSupportEnabled     bool
69
        sync.Mutex
70
}
71

72
// stacksetEvent is the message sent over StackSetController.stacksetEvents
// by the informer handlers (add, update, del) and consumed by Run.
type stacksetEvent struct {
73
        // Deleted is set by the delete handler; add and update leave it false.
        Deleted  bool
74
        // StackSet is a deep copy of the object delivered by the informer.
        StackSet *zv1.StackSet
75
}
76

77
// eventedError wraps an error that was already exposed as an event to the user
//
// errorEventf returns errors of this type unchanged, so the same failure is
// not recorded as an event a second time.
78
type eventedError struct {
79
        // err is the original error whose message Error() returns.
        err error
80
}
81

82
func (ee *eventedError) Error() string {
×
83
        return ee.err.Error()
×
84
}
×
85

86
// now returns the current local time formatted as an RFC 3339 timestamp.
func now() string {
	current := time.Now()
	return current.Format(time.RFC3339)
}
×
89

90
// NewStackSetController initializes a new StackSetController.
91
func NewStackSetController(
92
        client clientset.Interface,
93
        controllerID string,
94
        parallelWork int,
95
        backendWeightsAnnotationKey string,
96
        clusterDomains []string,
97
        registry prometheus.Registerer,
98
        interval time.Duration,
99
        routeGroupSupportEnabled bool,
100
        trafficSegmentsEnabled bool,
101
        configMapSupportEnabled bool,
102
        ingressSourceSwitchTTL time.Duration,
103
) (*StackSetController, error) {
1✔
104
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
105
        if err != nil {
1✔
106
                return nil, err
×
107
        }
×
108

109
        return &StackSetController{
1✔
110
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
111
                client:                      client,
1✔
112
                controllerID:                controllerID,
1✔
113
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
114
                clusterDomains:              clusterDomains,
1✔
115
                interval:                    interval,
1✔
116
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
117
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
118
                recorder:                    recorder.CreateEventRecorder(client),
1✔
119
                metricsReporter:             metricsReporter,
1✔
120
                HealthReporter:              healthcheck.NewHandler(),
1✔
121
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
122
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
123
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
124
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
125
                now:                         now,
1✔
126
                reconcileWorkers:            parallelWork,
1✔
127
        }, nil
1✔
128
}
129

130
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
131
        return c.logger.WithFields(map[string]interface{}{
×
132
                "namespace": ssc.StackSet.Namespace,
×
133
                "stackset":  ssc.StackSet.Name,
×
134
        })
×
135
}
×
136

137
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
138
        return c.logger.WithFields(map[string]interface{}{
×
139
                "namespace": ssc.StackSet.Namespace,
×
140
                "stackset":  ssc.StackSet.Name,
×
141
                "stack":     sc.Name(),
×
142
        })
×
143
}
×
144

145
// Run runs the main loop of the StackSetController. Before the loops it
146
// sets up a watcher to watch StackSet resources. The watch will send
147
// changes over a channel which is polled from the main loop.
148
func (c *StackSetController) Run(ctx context.Context) {
×
149
        var nextCheck time.Time
×
150

×
151
        // We're not alive if nextCheck is too far in the past
×
152
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
153
                if time.Since(nextCheck) > 5*c.interval {
×
154
                        return fmt.Errorf("nextCheck too old")
×
155
                }
×
156
                return nil
×
157
        })
158

159
        c.startWatch(ctx)
×
160

×
161
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
162

×
163
        nextCheck = time.Now().Add(-c.interval)
×
164

×
165
        for {
×
166
                select {
×
167
                case <-time.After(time.Until(nextCheck)):
×
168

×
169
                        nextCheck = time.Now().Add(c.interval)
×
170

×
171
                        stackSetContainers, err := c.collectResources(ctx)
×
172
                        if err != nil {
×
173
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
174
                                continue
×
175
                        }
176

177
                        var reconcileGroup errgroup.Group
×
178
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
179
                        for stackset, container := range stackSetContainers {
×
180
                                container := container
×
181
                                stackset := stackset
×
182

×
183
                                reconcileGroup.Go(func() error {
×
184
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
185
                                                err := c.ReconcileStackSet(ctx, container)
×
186
                                                if err != nil {
×
187
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
188
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
189
                                                }
×
190
                                        }
191
                                        return nil
×
192
                                })
193
                        }
194

195
                        err = reconcileGroup.Wait()
×
196
                        if err != nil {
×
197
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
198
                        }
×
199
                        err = c.metricsReporter.Report(stackSetContainers)
×
200
                        if err != nil {
×
201
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
202
                        }
×
203
                case e := <-c.stacksetEvents:
×
204
                        stackset := *e.StackSet
×
205
                        fixupStackSetTypeMeta(&stackset)
×
206

×
207
                        // update/delete existing entry
×
208
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
209
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
210
                                        delete(c.stacksetStore, stackset.UID)
×
211
                                        continue
×
212
                                }
213

214
                                // update stackset entry
215
                                c.stacksetStore[stackset.UID] = stackset
×
216
                                continue
×
217
                        }
218

219
                        // check if stackset should be managed by the controller
220
                        if !c.hasOwnership(&stackset) {
×
221
                                continue
×
222
                        }
223

224
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
225
                        c.stacksetStore[stackset.UID] = stackset
×
226
                case <-ctx.Done():
×
227
                        c.logger.Info("Terminating main controller loop.")
×
228
                        return
×
229
                }
230
        }
231
}
232

233
// injectSegmentAnnotation injects the traffic segment annotation if it's not
234
// already present.
235
//
236
// Only inject the traffic segment annotation if the controller has traffic
237
// segments enabled.
238
func (c *StackSetController) injectSegmentAnnotation(
239
        ctx context.Context,
240
        stackSet *zv1.StackSet,
NEW
241
) bool {
×
NEW
242
        if !c.trafficSegmentsEnabled {
×
NEW
243
                return false
×
NEW
244
        }
×
245

NEW
246
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
NEW
247
                return false
×
NEW
248
        }
×
249

NEW
250
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
NEW
251

×
NEW
252
        _, err := c.client.ZalandoV1().StackSets(
×
NEW
253
                stackSet.Namespace,
×
NEW
254
        ).Update(
×
NEW
255
                ctx,
×
NEW
256
                stackSet,
×
NEW
257
                metav1.UpdateOptions{},
×
NEW
258
        )
×
NEW
259
        if err != nil {
×
NEW
260
                c.logger.Errorf(
×
NEW
261
                        "Failed injecting segment annotation: %v",
×
NEW
262
                        err,
×
NEW
263
                )
×
NEW
264
                return false
×
NEW
265
        }
×
NEW
266
        c.recorder.Eventf(
×
NEW
267
                stackSet,
×
NEW
268
                v1.EventTypeNormal,
×
NEW
269
                "UpdatedStackSet",
×
NEW
270
                "Updated StackSet %s",
×
NEW
271
                stackSet.Name,
×
NEW
272
        )
×
NEW
273

×
NEW
274
        return true
×
275
}
276

277
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
278
// overload the API requests with unnecessary requests
279
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
280
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
281
        for uid, stackset := range c.stacksetStore {
2✔
282
                stackset := stackset
1✔
283

1✔
284
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
285

1✔
286
                // use prescaling logic if enabled with an annotation
1✔
287
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
288
                        resetDelay := defaultResetMinReplicasDelay
1✔
289
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
290
                                resetDelay = resetDelayValue
1✔
291
                        }
1✔
292
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
293
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
294
                        }
1✔
295
                }
296

297
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
298
                if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
1✔
NEW
299
                        stacksetContainer.EnableSegmentTraffic()
×
NEW
300
                }
×
301
                stacksets[uid] = stacksetContainer
1✔
302
        }
303

304
        err := c.collectStacks(ctx, stacksets)
1✔
305
        if err != nil {
1✔
306
                return nil, err
×
307
        }
×
308

309
        err = c.collectIngresses(ctx, stacksets)
1✔
310
        if err != nil {
1✔
311
                return nil, err
×
312
        }
×
313

314
        if c.routeGroupSupportEnabled {
2✔
315
                err = c.collectRouteGroups(ctx, stacksets)
1✔
316
                if err != nil {
1✔
317
                        return nil, err
×
318
                }
×
319
        }
320

321
        err = c.collectDeployments(ctx, stacksets)
1✔
322
        if err != nil {
1✔
323
                return nil, err
×
324
        }
×
325

326
        err = c.collectServices(ctx, stacksets)
1✔
327
        if err != nil {
1✔
328
                return nil, err
×
329
        }
×
330

331
        err = c.collectHPAs(ctx, stacksets)
1✔
332
        if err != nil {
1✔
333
                return nil, err
×
334
        }
×
335

336
        if c.configMapSupportEnabled {
2✔
337
                err = c.collectConfigMaps(ctx, stacksets)
1✔
338
                if err != nil {
1✔
339
                        return nil, err
×
340
                }
×
341
        }
342

343
        return stacksets, nil
1✔
344
}
345

346
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
347
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
348

1✔
349
        if err != nil {
1✔
350
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
351
        }
×
352

353
        for _, i := range ingresses.Items {
2✔
354
                ingress := i
1✔
355
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
356
                        // stackset ingress
1✔
357
                        if s, ok := stacksets[uid]; ok {
2✔
358
                                s.Ingress = &ingress
1✔
359
                                continue
1✔
360
                        }
361

362
                        // stack ingress
363
                        for _, stackset := range stacksets {
2✔
364
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
365
                                        if strings.HasSuffix(
1✔
366
                                                ingress.ObjectMeta.Name,
1✔
367
                                                core.SegmentSuffix,
1✔
368
                                        ) {
2✔
369
                                                // Traffic Segment
1✔
370
                                                s.Resources.IngressSegment = &ingress
1✔
371
                                        } else {
2✔
372
                                                s.Resources.Ingress = &ingress
1✔
373
                                        }
1✔
374
                                        break
1✔
375
                                }
376
                        }
377
                }
378
        }
379
        return nil
1✔
380
}
381

382
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
383
        rgs, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(
1✔
384
                ctx,
1✔
385
                metav1.ListOptions{},
1✔
386
        )
1✔
387
        if err != nil {
1✔
388
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
389
        }
×
390

391
        for _, rg := range rgs.Items {
2✔
392
                routegroup := rg
1✔
393
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
394
                        // stackset routegroups
1✔
395
                        if s, ok := stacksets[uid]; ok {
2✔
396
                                s.RouteGroup = &routegroup
1✔
397
                                continue
1✔
398
                        }
399

400
                        // stack routegroups
401
                        for _, stackset := range stacksets {
2✔
402
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
403
                                        if strings.HasSuffix(
1✔
404
                                                routegroup.ObjectMeta.Name,
1✔
405
                                                core.SegmentSuffix,
1✔
406
                                        ) {
2✔
407
                                                // Traffic Segment
1✔
408
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
409
                                        } else {
2✔
410
                                                s.Resources.RouteGroup = &routegroup
1✔
411
                                        }
1✔
412
                                        break
1✔
413
                                }
414
                        }
415
                }
416
        }
417
        return nil
1✔
418
}
419

420
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
421
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
422
        if err != nil {
1✔
423
                return fmt.Errorf("failed to list Stacks: %v", err)
×
424
        }
×
425

426
        for _, stack := range stacks.Items {
2✔
427
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
428
                        if s, ok := stacksets[uid]; ok {
2✔
429
                                stack := stack
1✔
430
                                fixupStackTypeMeta(&stack)
1✔
431

1✔
432
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
433
                                        Stack: &stack,
1✔
434
                                }
1✔
435
                                continue
1✔
436
                        }
437
                }
438
        }
439
        return nil
1✔
440
}
441

442
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
443
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
444
        if err != nil {
1✔
445
                return fmt.Errorf("failed to list Deployments: %v", err)
×
446
        }
×
447

448
        for _, d := range deployments.Items {
2✔
449
                deployment := d
1✔
450
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
451
                        for _, stackset := range stacksets {
2✔
452
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
453
                                        s.Resources.Deployment = &deployment
1✔
454
                                        break
1✔
455
                                }
456
                        }
457
                }
458
        }
459
        return nil
1✔
460
}
461

462
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
463
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
464
        if err != nil {
1✔
465
                return fmt.Errorf("failed to list Services: %v", err)
×
466
        }
×
467

468
Items:
1✔
469
        for _, s := range services.Items {
2✔
470
                service := s
1✔
471
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
472
                        for _, stackset := range stacksets {
2✔
473
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
474
                                        s.Resources.Service = &service
1✔
475
                                        continue Items
1✔
476
                                }
477

478
                                // service/HPA used to be owned by the deployment for some reason
479
                                for _, stack := range stackset.StackContainers {
2✔
480
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
481
                                                stack.Resources.Service = &service
1✔
482
                                                continue Items
1✔
483
                                        }
484
                                }
485
                        }
486
                }
487
        }
488
        return nil
1✔
489
}
490

491
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
492
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
493
        if err != nil {
1✔
494
                return fmt.Errorf("failed to list HPAs: %v", err)
×
495
        }
×
496

497
Items:
1✔
498
        for _, h := range hpas.Items {
2✔
499
                hpa := h
1✔
500
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
501
                        for _, stackset := range stacksets {
2✔
502
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
503
                                        s.Resources.HPA = &hpa
1✔
504
                                        continue Items
1✔
505
                                }
506

507
                                // service/HPA used to be owned by the deployment for some reason
508
                                for _, stack := range stackset.StackContainers {
2✔
509
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
510
                                                stack.Resources.HPA = &hpa
1✔
511
                                                continue Items
1✔
512
                                        }
513
                                }
514
                        }
515
                }
516
        }
517
        return nil
1✔
518
}
519

520
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
521
        configMaps, err := c.client.CoreV1().ConfigMaps(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
522
        if err != nil {
1✔
523
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
524
        }
×
525

526
        for _, cm := range configMaps.Items {
2✔
527
                configMap := cm
1✔
528
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
529
                        for _, stackset := range stacksets {
2✔
530
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
531
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
532
                                        break
1✔
533
                                }
534
                        }
535
                }
536
        }
537
        return nil
1✔
538
}
539

540
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
541
        if len(objectMeta.OwnerReferences) == 1 {
2✔
542
                return objectMeta.OwnerReferences[0].UID, true
1✔
543
        }
1✔
544
        return "", false
1✔
545
}
546

547
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
548
        switch err.(type) {
×
549
        case *eventedError:
×
550
                // already notified
×
551
                return err
×
552
        default:
×
553
                c.recorder.Eventf(
×
554
                        object,
×
555
                        v1.EventTypeWarning,
×
556
                        reason,
×
557
                        err.Error())
×
558
                return &eventedError{err: err}
×
559
        }
560
}
561

562
// hasOwnership returns true if the controller is the "owner" of the stackset.
563
// Whether it's owner is determined by the value of the
564
// 'stackset-controller.zalando.org/controller' annotation. If the value
565
// matches the controllerID then it owns it, or if the controllerID is
566
// "" and there's no annotation set.
567
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
568
        if stackset.Annotations != nil {
×
569
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
570
                        return owner == c.controllerID
×
571
                }
×
572
        }
573
        return c.controllerID == ""
×
574
}
575

576
func (c *StackSetController) startWatch(ctx context.Context) {
×
577
        informer := cache.NewSharedIndexInformer(
×
578
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
579
                &zv1.StackSet{},
×
580
                0, // skip resync
×
581
                cache.Indexers{},
×
582
        )
×
583

×
584
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
585
                AddFunc:    c.add,
×
586
                UpdateFunc: c.update,
×
587
                DeleteFunc: c.del,
×
588
        })
×
589
        go informer.Run(ctx.Done())
×
590
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
591
                c.logger.Errorf("Timed out waiting for caches to sync")
×
592
                return
×
593
        }
×
594
        c.logger.Info("Synced StackSet watcher")
×
595
}
596

597
func (c *StackSetController) add(obj interface{}) {
×
598
        stackset, ok := obj.(*zv1.StackSet)
×
599
        if !ok {
×
600
                return
×
601
        }
×
602

603
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
604
        c.stacksetEvents <- stacksetEvent{
×
605
                StackSet: stackset.DeepCopy(),
×
606
        }
×
607
}
608

609
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
610
        newStackset, ok := newObj.(*zv1.StackSet)
×
611
        if !ok {
×
612
                return
×
613
        }
×
614

615
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
616
        if !ok {
×
617
                return
×
618
        }
×
619

620
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
621
                newStackset.Namespace,
×
622
                newStackset.Name,
×
623
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
624
        )
×
625

×
626
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
627
        c.stacksetEvents <- stacksetEvent{
×
628
                StackSet: newStackset.DeepCopy(),
×
629
        }
×
630
}
631

632
func (c *StackSetController) del(obj interface{}) {
×
633
        stackset, ok := obj.(*zv1.StackSet)
×
634
        if !ok {
×
635
                return
×
636
        }
×
637

638
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
639
        c.stacksetEvents <- stacksetEvent{
×
640
                StackSet: stackset.DeepCopy(),
×
641
                Deleted:  true,
×
642
        }
×
643
}
644

645
func retryUpdate(updateFn func(retry bool) error) error {
×
646
        retry := false
×
647
        for {
×
648
                err := updateFn(retry)
×
649
                if err != nil {
×
650
                        if errors.IsConflict(err) {
×
651
                                retry = true
×
652
                                continue
×
653
                        }
654
                        return err
×
655
                }
656
                return nil
×
657
        }
658
}
659

660
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
661
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
662
        for _, sc := range ssc.StackContainers {
×
663
                stack := sc.Stack.DeepCopy()
×
664
                status := *sc.GenerateStackStatus()
×
665
                err := retryUpdate(func(retry bool) error {
×
666
                        if retry {
×
667
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
668
                                if err != nil {
×
669
                                        return err
×
670
                                }
×
671
                                stack = updated
×
672
                        }
673
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
674
                                stack.Status = status
×
675
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
676
                                return err
×
677
                        }
×
678
                        return nil
×
679
                })
680
                if err != nil {
×
681
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
682
                }
×
683
        }
684

685
        stackset := ssc.StackSet.DeepCopy()
×
686
        status := *ssc.GenerateStackSetStatus()
×
687
        err := retryUpdate(func(retry bool) error {
×
688
                if retry {
×
689
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
690
                        if err != nil {
×
691
                                return err
×
692
                        }
×
693
                        stackset = updated
×
694
                }
695
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
696
                        stackset.Status = status
×
697
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
698
                        return err
×
699
                }
×
700
                return nil
×
701
        })
702
        if err != nil {
×
703
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
704
        }
×
705
        return nil
×
706
}
707

708
// ReconcileTrafficSegments updates the traffic segments according to the actual
709
// traffic weight of each stack.
710
//
711
// Returns the ordered list of Trafic Segments that need to be updated.
712
func (c *StackSetController) ReconcileTrafficSegments(
713
        ctx context.Context,
714
        ssc *core.StackSetContainer,
NEW
715
) ([]core.TrafficSegment, error) {
×
NEW
716
        // Compute segments
×
NEW
717
        toUpdate, err := ssc.ComputeTrafficSegments()
×
NEW
718
        if err != nil {
×
NEW
719
                return []core.TrafficSegment{},
×
NEW
720
                        c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
NEW
721
        }
×
722

NEW
723
        for _, ts := range toUpdate {
×
NEW
724
                ssc.StackContainers[ts.GetID()].IngressSegmentToUpdate = ts.IngressSegment
×
NEW
725
                ssc.StackContainers[ts.GetID()].RouteGroupSegmentToUpdate = ts.RouteGroupSegment
×
NEW
726
        }
×
727

NEW
728
        return toUpdate, nil
×
729
}
730

731
// CreateCurrentStack creates a new Stack object for the current stack, if needed
732
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
733
        newStack, newStackVersion := ssc.NewStack()
1✔
734
        if newStack == nil {
2✔
735
                return nil
1✔
736
        }
1✔
737

738
        if c.configMapSupportEnabled {
2✔
739
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
740
                if err := validateConfigurationResourceNames(newStack.Stack); err != nil {
1✔
741
                        return err
×
742
                }
×
743
        }
744

745
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
746
        if err != nil {
1✔
747
                return err
×
748
        }
×
749
        fixupStackTypeMeta(created)
1✔
750

1✔
751
        c.recorder.Eventf(
1✔
752
                ssc.StackSet,
1✔
753
                v1.EventTypeNormal,
1✔
754
                "CreatedStack",
1✔
755
                "Created stack %s",
1✔
756
                newStack.Name(),
1✔
757
        )
1✔
758

1✔
759
        // Persist ObservedStackVersion in the status
1✔
760
        updated := ssc.StackSet.DeepCopy()
1✔
761
        updated.Status.ObservedStackVersion = newStackVersion
1✔
762

1✔
763
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
764
        if err != nil {
1✔
765
                return err
×
766
        }
×
767
        fixupStackSetTypeMeta(result)
1✔
768
        ssc.StackSet = result
1✔
769

1✔
770
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
771
                Stack:          created,
1✔
772
                PendingRemoval: false,
1✔
773
                Resources:      core.StackResources{},
1✔
774
        }
1✔
775
        return nil
1✔
776
}
777

778
// CleanupOldStacks deletes stacks that are no longer needed.
779
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
780
        for _, sc := range ssc.StackContainers {
2✔
781
                if !sc.PendingRemoval {
2✔
782
                        continue
1✔
783
                }
784

785
                stack := sc.Stack
1✔
786
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
787
                if err != nil {
1✔
788
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
789
                }
×
790
                c.recorder.Eventf(
1✔
791
                        ssc.StackSet,
1✔
792
                        v1.EventTypeNormal,
1✔
793
                        "DeletedExcessStack",
1✔
794
                        "Deleted excess stack %s",
1✔
795
                        stack.Name)
1✔
796
        }
797

798
        return nil
1✔
799
}
800

801
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
802
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
803
        // Ingress removed, handled outside
1✔
804
        if ingress == nil {
2✔
805
                return existing, nil
1✔
806
        }
1✔
807

808
        if existing == nil {
2✔
809
                if ingress.Annotations == nil {
2✔
810
                        ingress.Annotations = make(map[string]string)
1✔
811
                }
1✔
812
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
813

1✔
814
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
815
                if err != nil {
1✔
816
                        return nil, err
×
817
                }
×
818
                c.recorder.Eventf(
1✔
819
                        stackset,
1✔
820
                        v1.EventTypeNormal,
1✔
821
                        "CreatedIngress",
1✔
822
                        "Created Ingress %s",
1✔
823
                        ingress.Name)
1✔
824
                return createdIng, nil
1✔
825
        }
826

827
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
828
        if existingHaveUpdateTimeStamp {
2✔
829
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
830
        }
1✔
831

832
        // Check if we need to update the Ingress
833
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
834
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
835
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
836
                // add the annotation back after comparing
1✔
837
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
838
                return existing, nil
1✔
839
        }
1✔
840

841
        updated := existing.DeepCopy()
1✔
842
        updated.Spec = ingress.Spec
1✔
843
        if ingress.Annotations != nil {
2✔
844
                updated.Annotations = ingress.Annotations
1✔
845
        } else {
2✔
846
                updated.Annotations = make(map[string]string)
1✔
847
        }
1✔
848
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
849

1✔
850
        updated.Labels = ingress.Labels
1✔
851

1✔
852
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
853
        if err != nil {
1✔
854
                return nil, err
×
855
        }
×
856
        c.recorder.Eventf(
1✔
857
                stackset,
1✔
858
                v1.EventTypeNormal,
1✔
859
                "UpdatedIngress",
1✔
860
                "Updated Ingress %s",
1✔
861
                ingress.Name)
1✔
862
        return createdIngress, nil
1✔
863
}
864

865
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
866
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
867
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
868
                if routegroup == nil {
2✔
869
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
870
                        return nil
1✔
871
                }
1✔
872
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
873
                // The only scenario version we could think of for this is
1✔
874
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
875
                //  in that case, just wait until the RouteGroup has the annotation
1✔
876
                if !ok {
2✔
877
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
878
                        return nil
1✔
879
                }
1✔
880

881
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
882
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
883
                        return nil
1✔
884
                } else if !ready {
3✔
885
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
886
                        return nil
1✔
887
                }
1✔
888
        }
889
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
890
        if err != nil {
1✔
891
                return err
×
892
        }
×
893
        c.recorder.Eventf(
1✔
894
                stackset,
1✔
895
                v1.EventTypeNormal,
1✔
896
                "DeletedIngress",
1✔
897
                "Deleted Ingress %s",
1✔
898
                existing.Namespace)
1✔
899
        return nil
1✔
900
}
901

902
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
903
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
904
        // RouteGroup removed, handled outside
1✔
905
        if rg == nil {
2✔
906
                return existing, nil
1✔
907
        }
1✔
908

909
        // Create new RouteGroup
910
        if existing == nil {
2✔
911
                if rg.Annotations == nil {
2✔
912
                        rg.Annotations = make(map[string]string)
1✔
913
                }
1✔
914
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
915

1✔
916
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
917
                if err != nil {
1✔
918
                        return nil, err
×
919
                }
×
920
                c.recorder.Eventf(
1✔
921
                        stackset,
1✔
922
                        v1.EventTypeNormal,
1✔
923
                        "CreatedRouteGroup",
1✔
924
                        "Created RouteGroup %s",
1✔
925
                        rg.Name)
1✔
926
                return createdRg, nil
1✔
927
        }
928

929
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
930
        if existingHaveUpdateTimeStamp {
2✔
931
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
932
        }
1✔
933

934
        // Check if we need to update the RouteGroup
935
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
936
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
937
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
938
                // add the annotation back after comparing
1✔
939
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
940
                return existing, nil
1✔
941
        }
1✔
942

943
        updated := existing.DeepCopy()
1✔
944
        updated.Spec = rg.Spec
1✔
945
        if rg.Annotations != nil {
1✔
946
                updated.Annotations = rg.Annotations
×
947
        } else {
1✔
948
                updated.Annotations = make(map[string]string)
1✔
949
        }
1✔
950
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
951

1✔
952
        updated.Labels = rg.Labels
1✔
953

1✔
954
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
955
        if err != nil {
1✔
956
                return nil, err
×
957
        }
×
958
        c.recorder.Eventf(
1✔
959
                stackset,
1✔
960
                v1.EventTypeNormal,
1✔
961
                "UpdatedRouteGroup",
1✔
962
                "Updated RouteGroup %s",
1✔
963
                rg.Name)
1✔
964
        return createdRg, nil
1✔
965
}
966

967
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
968
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
969
        if stackset.Spec.Ingress != nil {
2✔
970
                if ingress == nil {
2✔
971
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
972
                        return nil
1✔
973
                }
1✔
974
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
975
                // The only scenario version we could think of for this is
1✔
976
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
977
                //  in that case, just wait until the RouteGroup has the annotation
1✔
978
                if !ok {
2✔
979
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
980
                        return nil
1✔
981
                }
1✔
982

983
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
984
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
985
                        return nil
1✔
986
                } else if !ready {
3✔
987
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
988
                        return nil
1✔
989
                }
1✔
990
        }
991
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
992
        if err != nil {
1✔
993
                return err
×
994
        }
×
995
        c.recorder.Eventf(
1✔
996
                stackset,
1✔
997
                v1.EventTypeNormal,
1✔
998
                "DeletedRouteGroup",
1✔
999
                "Deleted RouteGroup %s",
1✔
1000
                rg.Namespace)
1✔
1001
        return nil
1✔
1002
}
1003

1004
func (c *StackSetController) ReconcileStackSetIngressSources(
1005
        ctx context.Context,
1006
        stackset *zv1.StackSet,
1007
        existingIng *networking.Ingress,
1008
        existingRg *rgv1.RouteGroup,
1009
        generateIng func() (*networking.Ingress, error),
1010
        generateRg func() (*rgv1.RouteGroup, error),
1011
) error {
1✔
1012
        ingress, err := generateIng()
1✔
1013
        if err != nil {
1✔
1014
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1015
        }
×
1016

1017
        // opt-out existingIng creation in case we have an external entity creating existingIng
1018
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1019
        if err != nil {
1✔
1020
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1021
        }
×
1022

1023
        rg, err := generateRg()
1✔
1024
        if err != nil {
1✔
1025
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1026
        }
×
1027

1028
        var appliedRg *rgv1.RouteGroup
1✔
1029
        if c.routeGroupSupportEnabled {
2✔
1030
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1031
                if err != nil {
1✔
1032
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1033
                }
×
1034
        }
1035

1036
        // Ingress removed
1037
        if ingress == nil {
2✔
1038
                if existingIng != nil {
2✔
1039
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1040
                        if err != nil {
1✔
1041
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1042
                        }
×
1043
                }
1044
        }
1045

1046
        // RouteGroup removed
1047
        if rg == nil {
2✔
1048
                if existingRg != nil {
2✔
1049
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1050
                        if err != nil {
1✔
1051
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1052
                        }
×
1053
                }
1054
        }
1055

1056
        return nil
1✔
1057
}
1058

1059
// convertToTrafficSegments removes the central ingress component of the
1060
// StackSet, if and only if the StackSet already has traffic segments.
1061
func (c *StackSetController) convertToTrafficSegments(
1062
        ctx context.Context,
1063
        ssc *core.StackSetContainer,
NEW
1064
) error {
×
NEW
1065
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
NEW
1066
                return nil
×
NEW
1067
        }
×
1068

NEW
1069
        var ingTimestamp, rgTimestamp *metav1.Time
×
NEW
1070
        for _, sc := range ssc.StackContainers {
×
NEW
1071
                // If we find at least one stack with a segment, we can delete the
×
NEW
1072
                // central ingress resources.
×
NEW
1073
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
NEW
1074
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
NEW
1075
                }
×
1076

NEW
1077
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
NEW
1078
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
NEW
1079
                }
×
1080

NEW
1081
                if ingTimestamp != nil && rgTimestamp != nil {
×
NEW
1082
                        break
×
1083
                }
1084
        }
1085

NEW
1086
        if ingTimestamp != nil && ssc.Ingress != nil {
×
NEW
1087
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1088
                        c.logger.Infof(
×
NEW
1089
                                "Not deleting Ingress %s, segments created less than %s ago",
×
NEW
1090
                                ssc.Ingress.Name,
×
NEW
1091
                                c.ingressSourceSwitchTTL,
×
NEW
1092
                        )
×
NEW
1093
                        return nil
×
NEW
1094
                }
×
1095

NEW
1096
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
NEW
1097
                        ctx,
×
NEW
1098
                        ssc.Ingress.Name,
×
NEW
1099
                        metav1.DeleteOptions{},
×
NEW
1100
                )
×
NEW
1101
                if err != nil {
×
NEW
1102
                        return err
×
NEW
1103
                }
×
1104

NEW
1105
                c.recorder.Eventf(
×
NEW
1106
                        ssc.StackSet,
×
NEW
1107
                        v1.EventTypeNormal,
×
NEW
1108
                        "DeletedIngress",
×
NEW
1109
                        "Deleted Ingress %s, StackSet conversion complete",
×
NEW
1110
                        ssc.Ingress.Namespace,
×
NEW
1111
                )
×
NEW
1112

×
NEW
1113
                ssc.Ingress = nil
×
1114
        }
1115

NEW
1116
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
NEW
1117
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1118
                        c.logger.Infof(
×
NEW
1119
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
NEW
1120
                                ssc.RouteGroup.Name,
×
NEW
1121
                                c.ingressSourceSwitchTTL,
×
NEW
1122
                        )
×
NEW
1123
                        return nil
×
NEW
1124
                }
×
1125

NEW
1126
                err := c.client.RouteGroupV1().RouteGroups(
×
NEW
1127
                        ssc.RouteGroup.Namespace,
×
NEW
1128
                ).Delete(
×
NEW
1129
                        ctx,
×
NEW
1130
                        ssc.RouteGroup.Name,
×
NEW
1131
                        metav1.DeleteOptions{},
×
NEW
1132
                )
×
NEW
1133
                if err != nil {
×
NEW
1134
                        return err
×
NEW
1135
                }
×
1136

NEW
1137
                c.recorder.Eventf(
×
NEW
1138
                        ssc.RouteGroup,
×
NEW
1139
                        v1.EventTypeNormal,
×
NEW
1140
                        "DeletedRouteGroup",
×
NEW
1141
                        "Deleted RouteGroup %s, StackSet conversion complete",
×
NEW
1142
                        ssc.RouteGroup.Namespace,
×
NEW
1143
                )
×
NEW
1144

×
NEW
1145
                ssc.RouteGroup = nil
×
1146
        }
1147

NEW
1148
        return nil
×
1149
}
1150

1151
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1152
// of the specified StackSet.
1153
//
1154
// If the StackSet supports traffic segments, the controller won't reconcile the
1155
// central ingress resources. This method is deprecated and will be removed in
1156
// the future.
1157
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
NEW
1158
        if !ssc.SupportsSegmentTraffic() {
×
NEW
1159
                err := c.ReconcileStackSetIngressSources(
×
NEW
1160
                        ctx,
×
NEW
1161
                        ssc.StackSet,
×
NEW
1162
                        ssc.Ingress,
×
NEW
1163
                        ssc.RouteGroup,
×
NEW
1164
                        ssc.GenerateIngress,
×
NEW
1165
                        ssc.GenerateRouteGroup,
×
NEW
1166
                )
×
NEW
1167
                if err != nil {
×
NEW
1168
                        return err
×
NEW
1169
                }
×
NEW
1170
        } else {
×
NEW
1171
                // Convert StackSet to traffic segments, if needed
×
NEW
1172
                err := c.convertToTrafficSegments(ctx, ssc)
×
NEW
1173
                if err != nil {
×
NEW
1174
                        return err
×
NEW
1175
                }
×
1176
        }
1177

1178
        trafficChanges := ssc.TrafficChanges()
×
1179
        if len(trafficChanges) != 0 {
×
1180
                var changeMessages []string
×
1181
                for _, change := range trafficChanges {
×
1182
                        changeMessages = append(changeMessages, change.String())
×
1183
                }
×
1184

1185
                c.recorder.Eventf(
×
1186
                        ssc.StackSet,
×
1187
                        v1.EventTypeNormal,
×
1188
                        "TrafficSwitched",
×
1189
                        "Switched traffic: %s",
×
1190
                        strings.Join(changeMessages, ", "))
×
1191
        }
1192

1193
        return nil
×
1194
}
1195

1196
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1197
        updatedTraffic := generateUpdated()
1✔
1198

1✔
1199
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1200
                return nil
×
1201
        }
×
1202

1203
        updated := existing.DeepCopy()
1✔
1204
        updated.Spec.Traffic = updatedTraffic
1✔
1205

1✔
1206
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1207
        if err != nil {
1✔
1208
                return err
×
1209
        }
×
1210
        c.recorder.Eventf(
1✔
1211
                updated,
1✔
1212
                v1.EventTypeNormal,
1✔
1213
                "UpdatedStackSet",
1✔
1214
                "Updated StackSet %s",
1✔
1215
                updated.Name)
1✔
1216
        return nil
1✔
1217
}
1218

1219
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1220

×
1221
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1222
        if err != nil {
×
1223
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1224
        }
×
1225
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1226
        if err != nil {
×
1227
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1228
        }
×
1229

1230
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1231
        if err != nil {
×
1232
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1233
        }
×
1234

1235
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1236
        if err != nil {
×
1237
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1238
        }
×
1239

1240
        // This is to support both central and segment-based traffic.
NEW
1241
        if ssc.SupportsSegmentTraffic() {
×
NEW
1242
                err = c.ReconcileStackIngress(
×
NEW
1243
                        ctx,
×
NEW
1244
                        sc.Stack,
×
NEW
1245
                        sc.Resources.IngressSegment,
×
NEW
1246
                        sc.GenerateIngressSegment,
×
NEW
1247
                )
×
NEW
1248
                if err != nil {
×
NEW
1249
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
NEW
1250
                }
×
1251

NEW
1252
                sc.IngressSegmentToUpdate = nil
×
1253
        }
1254

1255
        if c.routeGroupSupportEnabled {
×
1256
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1257
                if err != nil {
×
1258
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1259
                }
×
1260

1261
                // This is to support both central and segment-based traffic.
NEW
1262
                if ssc.SupportsSegmentTraffic() {
×
NEW
1263
                        err = c.ReconcileStackRouteGroup(
×
NEW
1264
                                ctx,
×
NEW
1265
                                sc.Stack,
×
NEW
1266
                                sc.Resources.RouteGroupSegment,
×
NEW
1267
                                sc.GenerateRouteGroupSegment,
×
NEW
1268
                        )
×
NEW
1269
                        if err != nil {
×
NEW
1270
                                return c.errorEventf(
×
NEW
1271
                                        sc.Stack,
×
NEW
1272
                                        "FailedManageRouteGroupSegment",
×
NEW
1273
                                        err,
×
NEW
1274
                                )
×
NEW
1275
                        }
×
1276

NEW
1277
                        sc.RouteGroupSegmentToUpdate = nil
×
1278
                }
1279
        }
1280
        return nil
×
1281
}
1282

1283
// ReconcileStackSet reconciles all the things from a stackset
1284
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1285
        defer func() {
×
1286
                if r := recover(); r != nil {
×
1287
                        c.metricsReporter.ReportPanic()
×
1288
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1289
                        err = fmt.Errorf("panic: %v", r)
×
1290
                }
×
1291
        }()
1292

NEW
1293
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
NEW
1294
                // Reconciler handles StackSet in the next loop
×
NEW
1295
                return nil
×
NEW
1296
        }
×
1297

1298
        // Create current stack, if needed. Proceed on errors.
1299
        err = c.CreateCurrentStack(ctx, container)
×
1300
        if err != nil {
×
1301
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1302
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1303
        }
×
1304

1305
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1306
        err = container.UpdateFromResources()
×
1307
        if err != nil {
×
1308
                return err
×
1309
        }
×
1310

1311
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1312
        err = container.ManageTraffic(time.Now())
×
1313
        if err != nil {
×
1314
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1315
                c.recorder.Eventf(
×
1316
                        container.StackSet,
×
1317
                        v1.EventTypeWarning,
×
1318
                        "TrafficNotSwitched",
×
1319
                        "Failed to switch traffic: "+err.Error())
×
1320
        }
×
1321

1322
        // Mark stacks that should be removed
1323
        container.MarkExpiredStacks()
×
1324

×
NEW
1325
        segsInOrder := []core.TrafficSegment{}
×
NEW
1326
        // This is to support both central and segment-based traffic.
×
NEW
1327
        if container.SupportsSegmentTraffic() {
×
NEW
1328
                // Update traffic segments. Proceed on errors.
×
NEW
1329
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
NEW
1330
                if err != nil {
×
NEW
1331
                        err = c.errorEventf(
×
NEW
1332
                                container.StackSet,
×
NEW
1333
                                reasonFailedManageStackSet,
×
NEW
1334
                                err,
×
NEW
1335
                        )
×
NEW
1336
                        c.stacksetLogger(container).Errorf(
×
NEW
1337
                                "Unable to reconcile traffic segments: %v",
×
NEW
1338
                                err,
×
NEW
1339
                        )
×
NEW
1340
                }
×
1341
        }
1342

1343
        // Reconcile stack resources. Proceed on errors.
NEW
1344
        reconciledStacks := map[types.UID]bool{}
×
NEW
1345
        for _, ts := range segsInOrder {
×
NEW
1346
                reconciledStacks[ts.GetID()] = true
×
NEW
1347
                sc := container.StackContainers[ts.GetID()]
×
NEW
1348
                err = c.ReconcileStackResources(ctx, container, sc)
×
NEW
1349
                if err != nil {
×
NEW
1350
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
NEW
1351
                        c.stackLogger(container, sc).Errorf(
×
NEW
1352
                                "Unable to reconcile stack resources: %v",
×
NEW
1353
                                err,
×
NEW
1354
                        )
×
NEW
1355
                }
×
1356
        }
1357

NEW
1358
        for k, sc := range container.StackContainers {
×
NEW
1359
                if reconciledStacks[k] {
×
NEW
1360
                        continue
×
1361
                }
1362

1363
                err = c.ReconcileStackResources(ctx, container, sc)
×
1364
                if err != nil {
×
1365
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1366
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1367
                }
×
1368
        }
1369

1370
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1371
        err = c.ReconcileStackSetResources(ctx, container)
×
1372
        if err != nil {
×
1373
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1374
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1375
        }
×
1376

1377
        // Reconcile desired traffic in the stackset. Proceed on errors.
1378
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1379
        if err != nil {
×
1380
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1381
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1382
        }
×
1383

1384
        // Delete old stacks. Proceed on errors.
1385
        err = c.CleanupOldStacks(ctx, container)
×
1386
        if err != nil {
×
1387
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1388
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1389
        }
×
1390

1391
        // Update statuses.
1392
        err = c.ReconcileStatuses(ctx, container)
×
1393
        if err != nil {
×
1394
                return err
×
1395
        }
×
1396

1397
        return nil
×
1398
}
1399

1400
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1401
// stackset annotation.
1402
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1403
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1404
        if !ok {
2✔
1405
                return 0, false
1✔
1406
        }
1✔
1407
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1408
        if err != nil {
1✔
1409
                return 0, false
×
1410
        }
×
1411
        return resetDelay, true
1✔
1412
}
1413

1414
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1415
        // set TypeMeta manually because of this bug:
1✔
1416
        // https://github.com/kubernetes/client-go/issues/308
1✔
1417
        stackset.APIVersion = core.APIVersion
1✔
1418
        stackset.Kind = core.KindStackSet
1✔
1419
}
1✔
1420

1421
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1422
        // set TypeMeta manually because of this bug:
1✔
1423
        // https://github.com/kubernetes/client-go/issues/308
1✔
1424
        stack.APIVersion = core.APIVersion
1✔
1425
        stack.Kind = core.KindStack
1✔
1426
}
1✔
1427

1428
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1429
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1430
        if err != nil {
2✔
1431
                // wait until there's a valid timestamp on the annotation
1✔
1432
                return false, err
1✔
1433
        }
1✔
1434

1435
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1436
}
1437

1438
// resourceReadyTime reports whether timestamp is set (non-zero) and strictly
// more than ttl has elapsed since it. A zero timestamp is never ready.
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
	if timestamp.IsZero() {
		return false
	}

	return time.Since(timestamp) > ttl
}
1445

1446
// validateConfigurationResourceNames returns an error if any ConfigurationResource name is not prefixed by Stack name.
1447
func validateConfigurationResourceNames(stack *zv1.Stack) error {
1✔
1448
        for _, rsc := range stack.Spec.ConfigurationResources {
2✔
1449
                rscName := rsc.ConfigMapRef.Name
1✔
1450
                if !strings.HasPrefix(rscName, stack.Name) {
1✔
1451
                        return fmt.Errorf("ConfigurationResource name must be prefixed by Stack name. "+
×
1452
                                "ConfigurationResource: %s, Stack: %s", rscName, stack.Name)
×
1453
                }
×
1454
        }
1455

1456
        return nil
1✔
1457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc