• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 7207467831

14 Dec 2023 10:17AM UTC coverage: 70.969% (-2.3%) from 73.254%
7207467831

Pull #495

github

gargravarr
Add missing ConfigMap code.
Pull Request #495: Add support for traffic segments.

318 of 562 new or added lines in 7 files covered. (56.58%)

1 existing line in 1 file now uncovered.

2606 of 3672 relevant lines covered (70.97%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.17
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
// StackSetController is the main controller. It watches for changes to
49
// stackset resources and starts and maintains other controllers per
50
// stackset resource.
51
type StackSetController struct {
52
        logger                      *log.Entry
53
        client                      clientset.Interface
54
        controllerID                string
55
        backendWeightsAnnotationKey string
56
        clusterDomains              []string
57
        interval                    time.Duration
58
        stacksetEvents              chan stacksetEvent
59
        stacksetStore               map[types.UID]zv1.StackSet
60
        recorder                    kube_record.EventRecorder
61
        metricsReporter             *core.MetricsReporter
62
        HealthReporter              healthcheck.Handler
63
        routeGroupSupportEnabled    bool
64
        trafficSegmentsEnabled      bool
65
        annotatedTrafficSegments    bool
66
        ingressSourceSwitchTTL      time.Duration
67
        now                         func() string
68
        reconcileWorkers            int
69
        configMapSupportEnabled     bool
70
        sync.Mutex
71
}
72

73
type stacksetEvent struct {
74
        Deleted  bool
75
        StackSet *zv1.StackSet
76
}
77

78
// eventedError wraps an error that was already exposed as an event to the user
79
type eventedError struct {
80
        err error
81
}
82

83
func (ee *eventedError) Error() string {
×
84
        return ee.err.Error()
×
85
}
×
86

87
func now() string {
×
88
        return time.Now().Format(time.RFC3339)
×
89
}
×
90

91
// NewStackSetController initializes a new StackSetController.
92
func NewStackSetController(
93
        client clientset.Interface,
94
        controllerID string,
95
        parallelWork int,
96
        backendWeightsAnnotationKey string,
97
        clusterDomains []string,
98
        registry prometheus.Registerer,
99
        interval time.Duration,
100
        routeGroupSupportEnabled bool,
101
        trafficSegmentsEnabled bool,
102
        annotatedTrafficSegments bool,
103
        configMapSupportEnabled bool,
104
        ingressSourceSwitchTTL time.Duration,
105
) (*StackSetController, error) {
1✔
106
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
107
        if err != nil {
1✔
108
                return nil, err
×
109
        }
×
110

111
        return &StackSetController{
1✔
112
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
113
                client:                      client,
1✔
114
                controllerID:                controllerID,
1✔
115
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
116
                clusterDomains:              clusterDomains,
1✔
117
                interval:                    interval,
1✔
118
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
119
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
120
                recorder:                    recorder.CreateEventRecorder(client),
1✔
121
                metricsReporter:             metricsReporter,
1✔
122
                HealthReporter:              healthcheck.NewHandler(),
1✔
123
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
124
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
125
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
126
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
127
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
128
                now:                         now,
1✔
129
                reconcileWorkers:            parallelWork,
1✔
130
        }, nil
1✔
131
}
132

133
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
134
        return c.logger.WithFields(map[string]interface{}{
×
135
                "namespace": ssc.StackSet.Namespace,
×
136
                "stackset":  ssc.StackSet.Name,
×
137
        })
×
138
}
×
139

140
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
141
        return c.logger.WithFields(map[string]interface{}{
×
142
                "namespace": ssc.StackSet.Namespace,
×
143
                "stackset":  ssc.StackSet.Name,
×
144
                "stack":     sc.Name(),
×
145
        })
×
146
}
×
147

148
// Run runs the main loop of the StackSetController. Before the loops it
149
// sets up a watcher to watch StackSet resources. The watch will send
150
// changes over a channel which is polled from the main loop.
151
func (c *StackSetController) Run(ctx context.Context) {
×
152
        var nextCheck time.Time
×
153

×
154
        // We're not alive if nextCheck is too far in the past
×
155
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
156
                if time.Since(nextCheck) > 5*c.interval {
×
157
                        return fmt.Errorf("nextCheck too old")
×
158
                }
×
159
                return nil
×
160
        })
161

162
        c.startWatch(ctx)
×
163

×
164
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
165

×
166
        nextCheck = time.Now().Add(-c.interval)
×
167

×
168
        for {
×
169
                select {
×
170
                case <-time.After(time.Until(nextCheck)):
×
171

×
172
                        nextCheck = time.Now().Add(c.interval)
×
173

×
174
                        stackSetContainers, err := c.collectResources(ctx)
×
175
                        if err != nil {
×
176
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
177
                                continue
×
178
                        }
179

180
                        var reconcileGroup errgroup.Group
×
181
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
182
                        for stackset, container := range stackSetContainers {
×
183
                                container := container
×
184
                                stackset := stackset
×
185

×
186
                                reconcileGroup.Go(func() error {
×
187
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
188
                                                err := c.ReconcileStackSet(ctx, container)
×
189
                                                if err != nil {
×
190
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
191
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
192
                                                }
×
193
                                        }
194
                                        return nil
×
195
                                })
196
                        }
197

198
                        err = reconcileGroup.Wait()
×
199
                        if err != nil {
×
200
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
201
                        }
×
202
                        err = c.metricsReporter.Report(stackSetContainers)
×
203
                        if err != nil {
×
204
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
205
                        }
×
206
                case e := <-c.stacksetEvents:
×
207
                        stackset := *e.StackSet
×
208
                        fixupStackSetTypeMeta(&stackset)
×
209

×
210
                        // update/delete existing entry
×
211
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
212
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
213
                                        delete(c.stacksetStore, stackset.UID)
×
214
                                        continue
×
215
                                }
216

217
                                // update stackset entry
218
                                c.stacksetStore[stackset.UID] = stackset
×
219
                                continue
×
220
                        }
221

222
                        // check if stackset should be managed by the controller
223
                        if !c.hasOwnership(&stackset) {
×
224
                                continue
×
225
                        }
226

227
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
228
                        c.stacksetStore[stackset.UID] = stackset
×
229
                case <-ctx.Done():
×
230
                        c.logger.Info("Terminating main controller loop.")
×
231
                        return
×
232
                }
233
        }
234
}
235

236
// injectSegmentAnnotation injects the traffic segment annotation if it's not
237
// already present.
238
//
239
// Only inject the traffic segment annotation if the controller has traffic
240
// segments enabled by default, i.e. doesn't matter if the StackSet has the
241
// segment annotation.
242
func (c *StackSetController) injectSegmentAnnotation(
243
        ctx context.Context,
244
        stackSet *zv1.StackSet,
NEW
245
) bool {
×
NEW
246
        if !c.trafficSegmentsEnabled {
×
NEW
247
                return false
×
NEW
248
        }
×
249

NEW
250
        if c.annotatedTrafficSegments {
×
NEW
251
                return false
×
NEW
252
        }
×
253

NEW
254
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
NEW
255
                return false
×
NEW
256
        }
×
257

NEW
258
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
NEW
259

×
NEW
260
        _, err := c.client.ZalandoV1().StackSets(
×
NEW
261
                stackSet.Namespace,
×
NEW
262
        ).Update(
×
NEW
263
                ctx,
×
NEW
264
                stackSet,
×
NEW
265
                metav1.UpdateOptions{},
×
NEW
266
        )
×
NEW
267
        if err != nil {
×
NEW
268
                c.logger.Errorf(
×
NEW
269
                        "Failed injecting segment annotation: %v",
×
NEW
270
                        err,
×
NEW
271
                )
×
NEW
272
                return false
×
NEW
273
        }
×
NEW
274
        c.recorder.Eventf(
×
NEW
275
                stackSet,
×
NEW
276
                v1.EventTypeNormal,
×
NEW
277
                "UpdatedStackSet",
×
NEW
278
                "Updated StackSet %s. Injected %s annotation",
×
NEW
279
                stackSet.Name,
×
NEW
280
                TrafficSegmentsAnnotationKey,
×
NEW
281
        )
×
NEW
282

×
NEW
283
        return true
×
284
}
285

286
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
287
// overload the API requests with unnecessary requests
288
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
289
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
290
        for uid, stackset := range c.stacksetStore {
2✔
291
                stackset := stackset
1✔
292

1✔
293
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
294

1✔
295
                // use prescaling logic if enabled with an annotation
1✔
296
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
297
                        resetDelay := defaultResetMinReplicasDelay
1✔
298
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
299
                                resetDelay = resetDelayValue
1✔
300
                        }
1✔
301
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
302
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
303
                        }
1✔
304
                }
305

306
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
307
                if c.trafficSegmentsEnabled &&
1✔
308
                        stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
1✔
NEW
309

×
NEW
310
                        stacksetContainer.EnableSegmentTraffic()
×
NEW
311
                }
×
312
                stacksets[uid] = stacksetContainer
1✔
313
        }
314

315
        err := c.collectStacks(ctx, stacksets)
1✔
316
        if err != nil {
1✔
317
                return nil, err
×
318
        }
×
319

320
        err = c.collectIngresses(ctx, stacksets)
1✔
321
        if err != nil {
1✔
322
                return nil, err
×
323
        }
×
324

325
        if c.routeGroupSupportEnabled {
2✔
326
                err = c.collectRouteGroups(ctx, stacksets)
1✔
327
                if err != nil {
1✔
328
                        return nil, err
×
329
                }
×
330
        }
331

332
        err = c.collectDeployments(ctx, stacksets)
1✔
333
        if err != nil {
1✔
334
                return nil, err
×
335
        }
×
336

337
        err = c.collectServices(ctx, stacksets)
1✔
338
        if err != nil {
1✔
339
                return nil, err
×
340
        }
×
341

342
        err = c.collectHPAs(ctx, stacksets)
1✔
343
        if err != nil {
1✔
344
                return nil, err
×
345
        }
×
346

347
        if c.configMapSupportEnabled {
2✔
348
                err = c.collectConfigMaps(ctx, stacksets)
1✔
349
                if err != nil {
1✔
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        return stacksets, nil
1✔
355
}
356

357
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
358
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
359

1✔
360
        if err != nil {
1✔
361
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
362
        }
×
363

364
        for _, i := range ingresses.Items {
2✔
365
                ingress := i
1✔
366
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
367
                        // stackset ingress
1✔
368
                        if s, ok := stacksets[uid]; ok {
2✔
369
                                s.Ingress = &ingress
1✔
370
                                continue
1✔
371
                        }
372

373
                        // stack ingress
374
                        for _, stackset := range stacksets {
2✔
375
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
376
                                        if strings.HasSuffix(
1✔
377
                                                ingress.ObjectMeta.Name,
1✔
378
                                                core.SegmentSuffix,
1✔
379
                                        ) {
2✔
380
                                                // Traffic Segment
1✔
381
                                                s.Resources.IngressSegment = &ingress
1✔
382
                                        } else {
2✔
383
                                                s.Resources.Ingress = &ingress
1✔
384
                                        }
1✔
385
                                        break
1✔
386
                                }
387
                        }
388
                }
389
        }
390
        return nil
1✔
391
}
392

393
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
394
        rgs, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(
1✔
395
                ctx,
1✔
396
                metav1.ListOptions{},
1✔
397
        )
1✔
398
        if err != nil {
1✔
399
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
400
        }
×
401

402
        for _, rg := range rgs.Items {
2✔
403
                routegroup := rg
1✔
404
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
405
                        // stackset routegroups
1✔
406
                        if s, ok := stacksets[uid]; ok {
2✔
407
                                s.RouteGroup = &routegroup
1✔
408
                                continue
1✔
409
                        }
410

411
                        // stack routegroups
412
                        for _, stackset := range stacksets {
2✔
413
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
414
                                        if strings.HasSuffix(
1✔
415
                                                routegroup.ObjectMeta.Name,
1✔
416
                                                core.SegmentSuffix,
1✔
417
                                        ) {
2✔
418
                                                // Traffic Segment
1✔
419
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
420
                                        } else {
2✔
421
                                                s.Resources.RouteGroup = &routegroup
1✔
422
                                        }
1✔
423
                                        break
1✔
424
                                }
425
                        }
426
                }
427
        }
428
        return nil
1✔
429
}
430

431
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
432
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
433
        if err != nil {
1✔
434
                return fmt.Errorf("failed to list Stacks: %v", err)
×
435
        }
×
436

437
        for _, stack := range stacks.Items {
2✔
438
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
439
                        if s, ok := stacksets[uid]; ok {
2✔
440
                                stack := stack
1✔
441
                                fixupStackTypeMeta(&stack)
1✔
442

1✔
443
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
444
                                        Stack: &stack,
1✔
445
                                }
1✔
446
                                continue
1✔
447
                        }
448
                }
449
        }
450
        return nil
1✔
451
}
452

453
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
454
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
455
        if err != nil {
1✔
456
                return fmt.Errorf("failed to list Deployments: %v", err)
×
457
        }
×
458

459
        for _, d := range deployments.Items {
2✔
460
                deployment := d
1✔
461
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
462
                        for _, stackset := range stacksets {
2✔
463
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
464
                                        s.Resources.Deployment = &deployment
1✔
465
                                        break
1✔
466
                                }
467
                        }
468
                }
469
        }
470
        return nil
1✔
471
}
472

473
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
474
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
475
        if err != nil {
1✔
476
                return fmt.Errorf("failed to list Services: %v", err)
×
477
        }
×
478

479
Items:
1✔
480
        for _, s := range services.Items {
2✔
481
                service := s
1✔
482
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
483
                        for _, stackset := range stacksets {
2✔
484
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
485
                                        s.Resources.Service = &service
1✔
486
                                        continue Items
1✔
487
                                }
488

489
                                // service/HPA used to be owned by the deployment for some reason
490
                                for _, stack := range stackset.StackContainers {
2✔
491
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
492
                                                stack.Resources.Service = &service
1✔
493
                                                continue Items
1✔
494
                                        }
495
                                }
496
                        }
497
                }
498
        }
499
        return nil
1✔
500
}
501

502
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
503
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
504
        if err != nil {
1✔
505
                return fmt.Errorf("failed to list HPAs: %v", err)
×
506
        }
×
507

508
Items:
1✔
509
        for _, h := range hpas.Items {
2✔
510
                hpa := h
1✔
511
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
512
                        for _, stackset := range stacksets {
2✔
513
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
514
                                        s.Resources.HPA = &hpa
1✔
515
                                        continue Items
1✔
516
                                }
517

518
                                // service/HPA used to be owned by the deployment for some reason
519
                                for _, stack := range stackset.StackContainers {
2✔
520
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
521
                                                stack.Resources.HPA = &hpa
1✔
522
                                                continue Items
1✔
523
                                        }
524
                                }
525
                        }
526
                }
527
        }
528
        return nil
1✔
529
}
530

531
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
532
        configMaps, err := c.client.CoreV1().ConfigMaps(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
533
        if err != nil {
1✔
534
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
535
        }
×
536

537
        for _, cm := range configMaps.Items {
2✔
538
                configMap := cm
1✔
539
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
540
                        for _, stackset := range stacksets {
2✔
541
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
542
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
543
                                        break
1✔
544
                                }
545
                        }
546
                }
547
        }
548
        return nil
1✔
549
}
550

551
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
552
        if len(objectMeta.OwnerReferences) == 1 {
2✔
553
                return objectMeta.OwnerReferences[0].UID, true
1✔
554
        }
1✔
555
        return "", false
1✔
556
}
557

558
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
559
        switch err.(type) {
×
560
        case *eventedError:
×
561
                // already notified
×
562
                return err
×
563
        default:
×
564
                c.recorder.Eventf(
×
565
                        object,
×
566
                        v1.EventTypeWarning,
×
567
                        reason,
×
568
                        err.Error())
×
569
                return &eventedError{err: err}
×
570
        }
571
}
572

573
// hasOwnership returns true if the controller is the "owner" of the stackset.
574
// Whether it's owner is determined by the value of the
575
// 'stackset-controller.zalando.org/controller' annotation. If the value
576
// matches the controllerID then it owns it, or if the controllerID is
577
// "" and there's no annotation set.
578
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
579
        if stackset.Annotations != nil {
×
580
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
581
                        return owner == c.controllerID
×
582
                }
×
583
        }
584
        return c.controllerID == ""
×
585
}
586

587
func (c *StackSetController) startWatch(ctx context.Context) {
×
588
        informer := cache.NewSharedIndexInformer(
×
589
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
590
                &zv1.StackSet{},
×
591
                0, // skip resync
×
592
                cache.Indexers{},
×
593
        )
×
594

×
595
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
596
                AddFunc:    c.add,
×
597
                UpdateFunc: c.update,
×
598
                DeleteFunc: c.del,
×
599
        })
×
600
        go informer.Run(ctx.Done())
×
601
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
602
                c.logger.Errorf("Timed out waiting for caches to sync")
×
603
                return
×
604
        }
×
605
        c.logger.Info("Synced StackSet watcher")
×
606
}
607

608
func (c *StackSetController) add(obj interface{}) {
×
609
        stackset, ok := obj.(*zv1.StackSet)
×
610
        if !ok {
×
611
                return
×
612
        }
×
613

614
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
615
        c.stacksetEvents <- stacksetEvent{
×
616
                StackSet: stackset.DeepCopy(),
×
617
        }
×
618
}
619

620
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
621
        newStackset, ok := newObj.(*zv1.StackSet)
×
622
        if !ok {
×
623
                return
×
624
        }
×
625

626
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
627
        if !ok {
×
628
                return
×
629
        }
×
630

631
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
632
                newStackset.Namespace,
×
633
                newStackset.Name,
×
634
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
635
        )
×
636

×
637
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
638
        c.stacksetEvents <- stacksetEvent{
×
639
                StackSet: newStackset.DeepCopy(),
×
640
        }
×
641
}
642

643
func (c *StackSetController) del(obj interface{}) {
×
644
        stackset, ok := obj.(*zv1.StackSet)
×
645
        if !ok {
×
646
                return
×
647
        }
×
648

649
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
650
        c.stacksetEvents <- stacksetEvent{
×
651
                StackSet: stackset.DeepCopy(),
×
652
                Deleted:  true,
×
653
        }
×
654
}
655

656
func retryUpdate(updateFn func(retry bool) error) error {
×
657
        retry := false
×
658
        for {
×
659
                err := updateFn(retry)
×
660
                if err != nil {
×
661
                        if errors.IsConflict(err) {
×
662
                                retry = true
×
663
                                continue
×
664
                        }
665
                        return err
×
666
                }
667
                return nil
×
668
        }
669
}
670

671
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
672
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
673
        for _, sc := range ssc.StackContainers {
×
674
                stack := sc.Stack.DeepCopy()
×
675
                status := *sc.GenerateStackStatus()
×
676
                err := retryUpdate(func(retry bool) error {
×
677
                        if retry {
×
678
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
679
                                if err != nil {
×
680
                                        return err
×
681
                                }
×
682
                                stack = updated
×
683
                        }
684
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
685
                                stack.Status = status
×
686
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
687
                                return err
×
688
                        }
×
689
                        return nil
×
690
                })
691
                if err != nil {
×
692
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
693
                }
×
694
        }
695

696
        stackset := ssc.StackSet.DeepCopy()
×
697
        status := *ssc.GenerateStackSetStatus()
×
698
        err := retryUpdate(func(retry bool) error {
×
699
                if retry {
×
700
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
701
                        if err != nil {
×
702
                                return err
×
703
                        }
×
704
                        stackset = updated
×
705
                }
706
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
707
                        stackset.Status = status
×
708
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
709
                        return err
×
710
                }
×
711
                return nil
×
712
        })
713
        if err != nil {
×
714
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
715
        }
×
716
        return nil
×
717
}
718

719
// ReconcileTrafficSegments updates the traffic segments according to the actual
720
// traffic weight of each stack.
721
//
722
// Returns the ordered list of Trafic Segments that need to be updated.
723
func (c *StackSetController) ReconcileTrafficSegments(
724
        ctx context.Context,
725
        ssc *core.StackSetContainer,
NEW
726
) ([]types.UID, error) {
×
NEW
727
        // Compute segments
×
NEW
728
        toUpdate, err := ssc.ComputeTrafficSegments()
×
NEW
729
        if err != nil {
×
NEW
730
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
NEW
731
        }
×
732

NEW
733
        return toUpdate, nil
×
734
}
735

736
// CreateCurrentStack creates a new Stack object for the current stack, if needed
737
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
738
        newStack, newStackVersion := ssc.NewStack()
1✔
739
        if newStack == nil {
2✔
740
                return nil
1✔
741
        }
1✔
742

743
        if c.configMapSupportEnabled {
2✔
744
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
745
                if err := validateConfigurationResourceNames(newStack.Stack); err != nil {
1✔
746
                        return err
×
747
                }
×
748
        }
749

750
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
751
        if err != nil {
1✔
752
                return err
×
753
        }
×
754
        fixupStackTypeMeta(created)
1✔
755

1✔
756
        c.recorder.Eventf(
1✔
757
                ssc.StackSet,
1✔
758
                v1.EventTypeNormal,
1✔
759
                "CreatedStack",
1✔
760
                "Created stack %s",
1✔
761
                newStack.Name(),
1✔
762
        )
1✔
763

1✔
764
        // Persist ObservedStackVersion in the status
1✔
765
        updated := ssc.StackSet.DeepCopy()
1✔
766
        updated.Status.ObservedStackVersion = newStackVersion
1✔
767

1✔
768
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
769
        if err != nil {
1✔
770
                return err
×
771
        }
×
772
        fixupStackSetTypeMeta(result)
1✔
773
        ssc.StackSet = result
1✔
774

1✔
775
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
776
                Stack:          created,
1✔
777
                PendingRemoval: false,
1✔
778
                Resources:      core.StackResources{},
1✔
779
        }
1✔
780
        return nil
1✔
781
}
782

783
// CleanupOldStacks deletes stacks that are no longer needed.
784
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
785
        for _, sc := range ssc.StackContainers {
2✔
786
                if !sc.PendingRemoval {
2✔
787
                        continue
1✔
788
                }
789

790
                stack := sc.Stack
1✔
791
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
792
                if err != nil {
1✔
793
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
794
                }
×
795
                c.recorder.Eventf(
1✔
796
                        ssc.StackSet,
1✔
797
                        v1.EventTypeNormal,
1✔
798
                        "DeletedExcessStack",
1✔
799
                        "Deleted excess stack %s",
1✔
800
                        stack.Name)
1✔
801
        }
802

803
        return nil
1✔
804
}
805

806
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
807
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
808
        // Ingress removed, handled outside
1✔
809
        if ingress == nil {
2✔
810
                return existing, nil
1✔
811
        }
1✔
812

813
        if existing == nil {
2✔
814
                if ingress.Annotations == nil {
2✔
815
                        ingress.Annotations = make(map[string]string)
1✔
816
                }
1✔
817
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
818

1✔
819
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
820
                if err != nil {
1✔
821
                        return nil, err
×
822
                }
×
823
                c.recorder.Eventf(
1✔
824
                        stackset,
1✔
825
                        v1.EventTypeNormal,
1✔
826
                        "CreatedIngress",
1✔
827
                        "Created Ingress %s",
1✔
828
                        ingress.Name)
1✔
829
                return createdIng, nil
1✔
830
        }
831

832
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
833
        if existingHaveUpdateTimeStamp {
2✔
834
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
835
        }
1✔
836

837
        // Check if we need to update the Ingress
838
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
839
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
840
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
841
                // add the annotation back after comparing
1✔
842
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
843
                return existing, nil
1✔
844
        }
1✔
845

846
        updated := existing.DeepCopy()
1✔
847
        updated.Spec = ingress.Spec
1✔
848
        if ingress.Annotations != nil {
2✔
849
                updated.Annotations = ingress.Annotations
1✔
850
        } else {
2✔
851
                updated.Annotations = make(map[string]string)
1✔
852
        }
1✔
853
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
854

1✔
855
        updated.Labels = ingress.Labels
1✔
856

1✔
857
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
858
        if err != nil {
1✔
859
                return nil, err
×
860
        }
×
861
        c.recorder.Eventf(
1✔
862
                stackset,
1✔
863
                v1.EventTypeNormal,
1✔
864
                "UpdatedIngress",
1✔
865
                "Updated Ingress %s",
1✔
866
                ingress.Name)
1✔
867
        return createdIngress, nil
1✔
868
}
869

870
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
871
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
872
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
873
                if routegroup == nil {
2✔
874
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
875
                        return nil
1✔
876
                }
1✔
877
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
878
                // The only scenario version we could think of for this is
1✔
879
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
880
                //  in that case, just wait until the RouteGroup has the annotation
1✔
881
                if !ok {
2✔
882
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
883
                        return nil
1✔
884
                }
1✔
885

886
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
887
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
888
                        return nil
1✔
889
                } else if !ready {
3✔
890
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
891
                        return nil
1✔
892
                }
1✔
893
        }
894
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
895
        if err != nil {
1✔
896
                return err
×
897
        }
×
898
        c.recorder.Eventf(
1✔
899
                stackset,
1✔
900
                v1.EventTypeNormal,
1✔
901
                "DeletedIngress",
1✔
902
                "Deleted Ingress %s",
1✔
903
                existing.Namespace)
1✔
904
        return nil
1✔
905
}
906

907
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
908
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
909
        // RouteGroup removed, handled outside
1✔
910
        if rg == nil {
2✔
911
                return existing, nil
1✔
912
        }
1✔
913

914
        // Create new RouteGroup
915
        if existing == nil {
2✔
916
                if rg.Annotations == nil {
2✔
917
                        rg.Annotations = make(map[string]string)
1✔
918
                }
1✔
919
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
920

1✔
921
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
922
                if err != nil {
1✔
923
                        return nil, err
×
924
                }
×
925
                c.recorder.Eventf(
1✔
926
                        stackset,
1✔
927
                        v1.EventTypeNormal,
1✔
928
                        "CreatedRouteGroup",
1✔
929
                        "Created RouteGroup %s",
1✔
930
                        rg.Name)
1✔
931
                return createdRg, nil
1✔
932
        }
933

934
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
935
        if existingHaveUpdateTimeStamp {
2✔
936
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
937
        }
1✔
938

939
        // Check if we need to update the RouteGroup
940
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
941
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
942
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
943
                // add the annotation back after comparing
1✔
944
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
945
                return existing, nil
1✔
946
        }
1✔
947

948
        updated := existing.DeepCopy()
1✔
949
        updated.Spec = rg.Spec
1✔
950
        if rg.Annotations != nil {
1✔
951
                updated.Annotations = rg.Annotations
×
952
        } else {
1✔
953
                updated.Annotations = make(map[string]string)
1✔
954
        }
1✔
955
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
956

1✔
957
        updated.Labels = rg.Labels
1✔
958

1✔
959
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
960
        if err != nil {
1✔
961
                return nil, err
×
962
        }
×
963
        c.recorder.Eventf(
1✔
964
                stackset,
1✔
965
                v1.EventTypeNormal,
1✔
966
                "UpdatedRouteGroup",
1✔
967
                "Updated RouteGroup %s",
1✔
968
                rg.Name)
1✔
969
        return createdRg, nil
1✔
970
}
971

972
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
973
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
974
        if stackset.Spec.Ingress != nil {
2✔
975
                if ingress == nil {
2✔
976
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
977
                        return nil
1✔
978
                }
1✔
979
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
980
                // The only scenario version we could think of for this is
1✔
981
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
982
                //  in that case, just wait until the RouteGroup has the annotation
1✔
983
                if !ok {
2✔
984
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
985
                        return nil
1✔
986
                }
1✔
987

988
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
989
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
990
                        return nil
1✔
991
                } else if !ready {
3✔
992
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
993
                        return nil
1✔
994
                }
1✔
995
        }
996
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
997
        if err != nil {
1✔
998
                return err
×
999
        }
×
1000
        c.recorder.Eventf(
1✔
1001
                stackset,
1✔
1002
                v1.EventTypeNormal,
1✔
1003
                "DeletedRouteGroup",
1✔
1004
                "Deleted RouteGroup %s",
1✔
1005
                rg.Namespace)
1✔
1006
        return nil
1✔
1007
}
1008

1009
func (c *StackSetController) ReconcileStackSetIngressSources(
1010
        ctx context.Context,
1011
        stackset *zv1.StackSet,
1012
        existingIng *networking.Ingress,
1013
        existingRg *rgv1.RouteGroup,
1014
        generateIng func() (*networking.Ingress, error),
1015
        generateRg func() (*rgv1.RouteGroup, error),
1016
) error {
1✔
1017
        ingress, err := generateIng()
1✔
1018
        if err != nil {
1✔
1019
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1020
        }
×
1021

1022
        // opt-out existingIng creation in case we have an external entity creating existingIng
1023
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1024
        if err != nil {
1✔
1025
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1026
        }
×
1027

1028
        rg, err := generateRg()
1✔
1029
        if err != nil {
1✔
1030
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1031
        }
×
1032

1033
        var appliedRg *rgv1.RouteGroup
1✔
1034
        if c.routeGroupSupportEnabled {
2✔
1035
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1036
                if err != nil {
1✔
1037
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1038
                }
×
1039
        }
1040

1041
        // Ingress removed
1042
        if ingress == nil {
2✔
1043
                if existingIng != nil {
2✔
1044
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1045
                        if err != nil {
1✔
1046
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1047
                        }
×
1048
                }
1049
        }
1050

1051
        // RouteGroup removed
1052
        if rg == nil {
2✔
1053
                if existingRg != nil {
2✔
1054
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1055
                        if err != nil {
1✔
1056
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1057
                        }
×
1058
                }
1059
        }
1060

1061
        return nil
1✔
1062
}
1063

1064
// convertToTrafficSegments removes the central ingress component of the
1065
// StackSet, if and only if the StackSet already has traffic segments.
1066
func (c *StackSetController) convertToTrafficSegments(
1067
        ctx context.Context,
1068
        ssc *core.StackSetContainer,
NEW
1069
) error {
×
NEW
1070
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
NEW
1071
                return nil
×
NEW
1072
        }
×
1073

NEW
1074
        var ingTimestamp, rgTimestamp *metav1.Time
×
NEW
1075
        for _, sc := range ssc.StackContainers {
×
NEW
1076
                // If we find at least one stack with a segment, we can delete the
×
NEW
1077
                // central ingress resources.
×
NEW
1078
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
NEW
1079
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
NEW
1080
                }
×
1081

NEW
1082
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
NEW
1083
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
NEW
1084
                }
×
1085

NEW
1086
                if ingTimestamp != nil && rgTimestamp != nil {
×
NEW
1087
                        break
×
1088
                }
1089
        }
1090

NEW
1091
        if ingTimestamp != nil && ssc.Ingress != nil {
×
NEW
1092
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1093
                        c.logger.Infof(
×
NEW
1094
                                "Not deleting Ingress %s, segments created less than %s ago",
×
NEW
1095
                                ssc.Ingress.Name,
×
NEW
1096
                                c.ingressSourceSwitchTTL,
×
NEW
1097
                        )
×
NEW
1098
                        return nil
×
NEW
1099
                }
×
1100

NEW
1101
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
NEW
1102
                        ctx,
×
NEW
1103
                        ssc.Ingress.Name,
×
NEW
1104
                        metav1.DeleteOptions{},
×
NEW
1105
                )
×
NEW
1106
                if err != nil {
×
NEW
1107
                        return err
×
NEW
1108
                }
×
1109

NEW
1110
                c.recorder.Eventf(
×
NEW
1111
                        ssc.StackSet,
×
NEW
1112
                        v1.EventTypeNormal,
×
NEW
1113
                        "DeletedIngress",
×
NEW
1114
                        "Deleted Ingress %s, StackSet conversion to traffic segments complete",
×
NEW
1115
                        ssc.Ingress.Namespace,
×
NEW
1116
                )
×
NEW
1117

×
NEW
1118
                ssc.Ingress = nil
×
1119
        }
1120

NEW
1121
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
NEW
1122
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
NEW
1123
                        c.logger.Infof(
×
NEW
1124
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
NEW
1125
                                ssc.RouteGroup.Name,
×
NEW
1126
                                c.ingressSourceSwitchTTL,
×
NEW
1127
                        )
×
NEW
1128
                        return nil
×
NEW
1129
                }
×
1130

NEW
1131
                err := c.client.RouteGroupV1().RouteGroups(
×
NEW
1132
                        ssc.RouteGroup.Namespace,
×
NEW
1133
                ).Delete(
×
NEW
1134
                        ctx,
×
NEW
1135
                        ssc.RouteGroup.Name,
×
NEW
1136
                        metav1.DeleteOptions{},
×
NEW
1137
                )
×
NEW
1138
                if err != nil {
×
NEW
1139
                        return err
×
NEW
1140
                }
×
1141

NEW
1142
                c.recorder.Eventf(
×
NEW
1143
                        ssc.RouteGroup,
×
NEW
1144
                        v1.EventTypeNormal,
×
NEW
1145
                        "DeletedRouteGroup",
×
NEW
1146
                        "Deleted RouteGroup %s, StackSet conversion to traffic segments complete",
×
NEW
1147
                        ssc.RouteGroup.Namespace,
×
NEW
1148
                )
×
NEW
1149

×
NEW
1150
                ssc.RouteGroup = nil
×
1151
        }
1152

NEW
1153
        return nil
×
1154
}
1155

1156
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1157
// of the specified StackSet.
1158
//
1159
// If the StackSet supports traffic segments, the controller won't reconcile the
1160
// central ingress resources. This method is deprecated and will be removed in
1161
// the future.
1162
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
NEW
1163
        if !ssc.SupportsSegmentTraffic() {
×
NEW
1164
                err := c.ReconcileStackSetIngressSources(
×
NEW
1165
                        ctx,
×
NEW
1166
                        ssc.StackSet,
×
NEW
1167
                        ssc.Ingress,
×
NEW
1168
                        ssc.RouteGroup,
×
NEW
1169
                        ssc.GenerateIngress,
×
NEW
1170
                        ssc.GenerateRouteGroup,
×
NEW
1171
                )
×
NEW
1172
                if err != nil {
×
NEW
1173
                        return err
×
NEW
1174
                }
×
NEW
1175
        } else {
×
NEW
1176
                // Convert StackSet to traffic segments, if needed
×
NEW
1177
                err := c.convertToTrafficSegments(ctx, ssc)
×
NEW
1178
                if err != nil {
×
NEW
1179
                        return err
×
NEW
1180
                }
×
1181
        }
1182

1183
        trafficChanges := ssc.TrafficChanges()
×
1184
        if len(trafficChanges) != 0 {
×
1185
                var changeMessages []string
×
1186
                for _, change := range trafficChanges {
×
1187
                        changeMessages = append(changeMessages, change.String())
×
1188
                }
×
1189

1190
                c.recorder.Eventf(
×
1191
                        ssc.StackSet,
×
1192
                        v1.EventTypeNormal,
×
1193
                        "TrafficSwitched",
×
1194
                        "Switched traffic: %s",
×
1195
                        strings.Join(changeMessages, ", "))
×
1196
        }
1197

1198
        return nil
×
1199
}
1200

1201
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1202
        updatedTraffic := generateUpdated()
1✔
1203

1✔
1204
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1205
                return nil
×
1206
        }
×
1207

1208
        updated := existing.DeepCopy()
1✔
1209
        updated.Spec.Traffic = updatedTraffic
1✔
1210

1✔
1211
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1212
        if err != nil {
1✔
1213
                return err
×
1214
        }
×
1215
        c.recorder.Eventf(
1✔
1216
                updated,
1✔
1217
                v1.EventTypeNormal,
1✔
1218
                "UpdatedStackSet",
1✔
1219
                "Updated StackSet %s",
1✔
1220
                updated.Name)
1✔
1221
        return nil
1✔
1222
}
1223

1224
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1225
        if c.configMapSupportEnabled {
×
1226
                err := c.ReconcileStackConfigMap(ctx, sc.Stack, sc.Resources.ConfigMaps, sc.UpdateObjectMeta)
×
1227
                if err != nil {
×
1228
                        return c.errorEventf(sc.Stack, "FailedManageConfigMap", err)
×
1229
                }
×
1230
        }
1231

1232
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1233
        if err != nil {
×
1234
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1235
        }
×
1236
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1237
        if err != nil {
×
1238
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1239
        }
×
1240

1241
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1242
        if err != nil {
×
1243
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1244
        }
×
1245

1246
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1247
        if err != nil {
×
1248
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1249
        }
×
1250

1251
        // This is to support both central and segment-based traffic.
NEW
1252
        if ssc.SupportsSegmentTraffic() {
×
NEW
1253
                err = c.ReconcileStackIngress(
×
NEW
1254
                        ctx,
×
NEW
1255
                        sc.Stack,
×
NEW
1256
                        sc.Resources.IngressSegment,
×
NEW
1257
                        sc.GenerateIngressSegment,
×
NEW
1258
                )
×
NEW
1259
                if err != nil {
×
NEW
1260
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
NEW
1261
                }
×
1262
        }
1263

1264
        if c.routeGroupSupportEnabled {
×
1265
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1266
                if err != nil {
×
1267
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1268
                }
×
1269

1270
                // This is to support both central and segment-based traffic.
NEW
1271
                if ssc.SupportsSegmentTraffic() {
×
NEW
1272
                        err = c.ReconcileStackRouteGroup(
×
NEW
1273
                                ctx,
×
NEW
1274
                                sc.Stack,
×
NEW
1275
                                sc.Resources.RouteGroupSegment,
×
NEW
1276
                                sc.GenerateRouteGroupSegment,
×
NEW
1277
                        )
×
NEW
1278
                        if err != nil {
×
NEW
1279
                                return c.errorEventf(
×
NEW
1280
                                        sc.Stack,
×
NEW
1281
                                        "FailedManageRouteGroupSegment",
×
NEW
1282
                                        err,
×
NEW
1283
                                )
×
NEW
1284
                        }
×
1285
                }
1286
        }
1287
        return nil
×
1288
}
1289

1290
// ReconcileStackSet reconciles all the things from a stackset
1291
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1292
        defer func() {
×
1293
                if r := recover(); r != nil {
×
1294
                        c.metricsReporter.ReportPanic()
×
1295
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1296
                        err = fmt.Errorf("panic: %v", r)
×
1297
                }
×
1298
        }()
1299

NEW
1300
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
NEW
1301
                // Reconciler handles StackSet in the next loop
×
NEW
1302
                return nil
×
NEW
1303
        }
×
1304

1305
        // Create current stack, if needed. Proceed on errors.
1306
        err = c.CreateCurrentStack(ctx, container)
×
1307
        if err != nil {
×
1308
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1309
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1310
        }
×
1311

1312
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1313
        err = container.UpdateFromResources()
×
1314
        if err != nil {
×
1315
                return err
×
1316
        }
×
1317

1318
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1319
        err = container.ManageTraffic(time.Now())
×
1320
        if err != nil {
×
1321
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1322
                c.recorder.Eventf(
×
1323
                        container.StackSet,
×
1324
                        v1.EventTypeWarning,
×
1325
                        "TrafficNotSwitched",
×
1326
                        "Failed to switch traffic: "+err.Error())
×
1327
        }
×
1328

1329
        // Mark stacks that should be removed
1330
        container.MarkExpiredStacks()
×
1331

×
NEW
1332
        segsInOrder := []types.UID{}
×
NEW
1333
        // This is to support both central and segment-based traffic.
×
NEW
1334
        if container.SupportsSegmentTraffic() {
×
NEW
1335
                // Update traffic segments. Proceed on errors.
×
NEW
1336
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
NEW
1337
                if err != nil {
×
NEW
1338
                        err = c.errorEventf(
×
NEW
1339
                                container.StackSet,
×
NEW
1340
                                reasonFailedManageStackSet,
×
NEW
1341
                                err,
×
NEW
1342
                        )
×
NEW
1343
                        c.stacksetLogger(container).Errorf(
×
NEW
1344
                                "Unable to reconcile traffic segments: %v",
×
NEW
1345
                                err,
×
NEW
1346
                        )
×
NEW
1347
                }
×
1348
        }
1349

1350
        // Reconcile stack resources. Proceed on errors.
NEW
1351
        reconciledStacks := map[types.UID]bool{}
×
NEW
1352
        for _, id := range segsInOrder {
×
NEW
1353
                reconciledStacks[id] = true
×
NEW
1354
                sc := container.StackContainers[id]
×
NEW
1355
                err = c.ReconcileStackResources(ctx, container, sc)
×
NEW
1356
                if err != nil {
×
NEW
1357
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
NEW
1358
                        c.stackLogger(container, sc).Errorf(
×
NEW
1359
                                "Unable to reconcile stack resources: %v",
×
NEW
1360
                                err,
×
NEW
1361
                        )
×
NEW
1362
                }
×
1363
        }
1364

NEW
1365
        for k, sc := range container.StackContainers {
×
NEW
1366
                if reconciledStacks[k] {
×
NEW
1367
                        continue
×
1368
                }
1369

1370
                err = c.ReconcileStackResources(ctx, container, sc)
×
1371
                if err != nil {
×
1372
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1373
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1374
                }
×
1375
        }
1376

1377
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1378
        err = c.ReconcileStackSetResources(ctx, container)
×
1379
        if err != nil {
×
1380
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1381
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1382
        }
×
1383

1384
        // Reconcile desired traffic in the stackset. Proceed on errors.
1385
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1386
        if err != nil {
×
1387
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1388
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1389
        }
×
1390

1391
        // Delete old stacks. Proceed on errors.
1392
        err = c.CleanupOldStacks(ctx, container)
×
1393
        if err != nil {
×
1394
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1395
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1396
        }
×
1397

1398
        // Update statuses.
1399
        err = c.ReconcileStatuses(ctx, container)
×
1400
        if err != nil {
×
1401
                return err
×
1402
        }
×
1403

1404
        return nil
×
1405
}
1406

1407
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1408
// stackset annotation.
1409
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1410
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1411
        if !ok {
2✔
1412
                return 0, false
1✔
1413
        }
1✔
1414
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1415
        if err != nil {
1✔
1416
                return 0, false
×
1417
        }
×
1418
        return resetDelay, true
1✔
1419
}
1420

1421
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1422
        // set TypeMeta manually because of this bug:
1✔
1423
        // https://github.com/kubernetes/client-go/issues/308
1✔
1424
        stackset.APIVersion = core.APIVersion
1✔
1425
        stackset.Kind = core.KindStackSet
1✔
1426
}
1✔
1427

1428
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1429
        // set TypeMeta manually because of this bug:
1✔
1430
        // https://github.com/kubernetes/client-go/issues/308
1✔
1431
        stack.APIVersion = core.APIVersion
1✔
1432
        stack.Kind = core.KindStack
1✔
1433
}
1✔
1434

1435
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1436
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1437
        if err != nil {
2✔
1438
                // wait until there's a valid timestamp on the annotation
1✔
1439
                return false, err
1✔
1440
        }
1✔
1441

1442
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1443
}
1444

1445
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1446
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1447
                return true
1✔
1448
        }
1✔
1449

1450
        return false
1✔
1451
}
1452

1453
// validateConfigurationResourceNames returns an error if any ConfigurationResource name is not prefixed by Stack name.
1454
func validateConfigurationResourceNames(stack *zv1.Stack) error {
1✔
1455
        for _, rsc := range stack.Spec.ConfigurationResources {
2✔
1456
                rscName := rsc.ConfigMapRef.Name
1✔
1457
                if !strings.HasPrefix(rscName, stack.Name) {
1✔
1458
                        return fmt.Errorf("ConfigurationResource name must be prefixed by Stack name. "+
×
1459
                                "ConfigurationResource: %s, Stack: %s", rscName, stack.Name)
×
1460
                }
×
1461
        }
1462

1463
        return nil
1✔
1464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc