• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 8250881658

12 Mar 2024 02:47PM UTC coverage: 52.178% (-0.2%) from 52.337%
8250881658

push

github

zaklawrencea
Update dependencies (k8s.io/* -> v0.26.14)

Signed-off-by: Zak Lawrence <zaklawrencea@protonmail.com>

0 of 23 new or added lines in 2 files covered. (0.0%)

1 existing line in 1 file now uncovered.

3079 of 5901 relevant lines covered (52.18%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.95
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
49

50
// StackSetController is the main controller. It watches for changes to
51
// stackset resources and starts and maintains other controllers per
52
// stackset resource.
53
type StackSetController struct {
54
        logger                      *log.Entry
55
        client                      clientset.Interface
56
        namespace                   string
57
        syncIngressAnnotations      []string
58
        controllerID                string
59
        backendWeightsAnnotationKey string
60
        clusterDomains              []string
61
        interval                    time.Duration
62
        stacksetEvents              chan stacksetEvent
63
        stacksetStore               map[types.UID]zv1.StackSet
64
        recorder                    kube_record.EventRecorder
65
        metricsReporter             *core.MetricsReporter
66
        HealthReporter              healthcheck.Handler
67
        routeGroupSupportEnabled    bool
68
        trafficSegmentsEnabled      bool
69
        annotatedTrafficSegments    bool
70
        ingressSourceSwitchTTL      time.Duration
71
        now                         func() string
72
        reconcileWorkers            int
73
        configMapSupportEnabled     bool
74
        secretSupportEnabled        bool
75
        sync.Mutex
76
}
77

78
type stacksetEvent struct {
79
        Deleted  bool
80
        StackSet *zv1.StackSet
81
}
82

83
// eventedError wraps an error that was already exposed as an event to the user
84
type eventedError struct {
85
        err error
86
}
87

88
func (ee *eventedError) Error() string {
×
89
        return ee.err.Error()
×
90
}
×
91

92
func now() string {
×
93
        return time.Now().Format(time.RFC3339)
×
94
}
×
95

96
// NewStackSetController initializes a new StackSetController.
97
func NewStackSetController(
98
        client clientset.Interface,
99
        namespace string,
100
        controllerID string,
101
        parallelWork int,
102
        backendWeightsAnnotationKey string,
103
        clusterDomains []string,
104
        registry prometheus.Registerer,
105
        interval time.Duration,
106
        routeGroupSupportEnabled bool,
107
        trafficSegmentsEnabled bool,
108
        annotatedTrafficSegments bool,
109
        syncIngressAnnotations []string,
110
        configMapSupportEnabled bool,
111
        secretSupportEnabled bool,
112
        ingressSourceSwitchTTL time.Duration,
113
) (*StackSetController, error) {
1✔
114
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
115
        if err != nil {
1✔
116
                return nil, err
×
117
        }
×
118

119
        return &StackSetController{
1✔
120
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
121
                client:                      client,
1✔
122
                namespace:                   namespace,
1✔
123
                controllerID:                controllerID,
1✔
124
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
125
                clusterDomains:              clusterDomains,
1✔
126
                interval:                    interval,
1✔
127
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
128
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
129
                recorder:                    recorder.CreateEventRecorder(client),
1✔
130
                metricsReporter:             metricsReporter,
1✔
131
                HealthReporter:              healthcheck.NewHandler(),
1✔
132
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
133
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
134
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
135
                syncIngressAnnotations:      syncIngressAnnotations,
1✔
136
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
137
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
138
                secretSupportEnabled:        secretSupportEnabled,
1✔
139
                now:                         now,
1✔
140
                reconcileWorkers:            parallelWork,
1✔
141
        }, nil
1✔
142
}
143

144
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
145
        return c.logger.WithFields(map[string]interface{}{
×
146
                "namespace": ssc.StackSet.Namespace,
×
147
                "stackset":  ssc.StackSet.Name,
×
148
        })
×
149
}
×
150

151
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
152
        return c.logger.WithFields(map[string]interface{}{
×
153
                "namespace": ssc.StackSet.Namespace,
×
154
                "stackset":  ssc.StackSet.Name,
×
155
                "stack":     sc.Name(),
×
156
        })
×
157
}
×
158

159
// Run runs the main loop of the StackSetController. Before the loops it
160
// sets up a watcher to watch StackSet resources. The watch will send
161
// changes over a channel which is polled from the main loop.
162
func (c *StackSetController) Run(ctx context.Context) {
×
163
        var nextCheck time.Time
×
164

×
165
        // We're not alive if nextCheck is too far in the past
×
166
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
167
                if time.Since(nextCheck) > 5*c.interval {
×
168
                        return fmt.Errorf("nextCheck too old")
×
169
                }
×
170
                return nil
×
171
        })
172

173
        c.startWatch(ctx)
×
174

×
175
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
176

×
177
        nextCheck = time.Now().Add(-c.interval)
×
178

×
179
        for {
×
180
                select {
×
181
                case <-time.After(time.Until(nextCheck)):
×
182

×
183
                        nextCheck = time.Now().Add(c.interval)
×
184

×
185
                        stackSetContainers, err := c.collectResources(ctx)
×
186
                        if err != nil {
×
187
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
188
                                continue
×
189
                        }
190

191
                        var reconcileGroup errgroup.Group
×
192
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
193
                        for stackset, container := range stackSetContainers {
×
194
                                container := container
×
195
                                stackset := stackset
×
196

×
197
                                reconcileGroup.Go(func() error {
×
198
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
199
                                                err := c.ReconcileStackSet(ctx, container)
×
200
                                                if err != nil {
×
201
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
202
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
203
                                                }
×
204
                                        }
205
                                        return nil
×
206
                                })
207
                        }
208

209
                        err = reconcileGroup.Wait()
×
210
                        if err != nil {
×
211
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
212
                        }
×
213
                        err = c.metricsReporter.Report(stackSetContainers)
×
214
                        if err != nil {
×
215
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
216
                        }
×
217
                case e := <-c.stacksetEvents:
×
218
                        stackset := *e.StackSet
×
219
                        fixupStackSetTypeMeta(&stackset)
×
220

×
221
                        // update/delete existing entry
×
222
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
223
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
224
                                        delete(c.stacksetStore, stackset.UID)
×
225
                                        continue
×
226
                                }
227

228
                                // update stackset entry
229
                                c.stacksetStore[stackset.UID] = stackset
×
230
                                continue
×
231
                        }
232

233
                        // check if stackset should be managed by the controller
234
                        if !c.hasOwnership(&stackset) {
×
235
                                continue
×
236
                        }
237

238
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
239
                        c.stacksetStore[stackset.UID] = stackset
×
240
                case <-ctx.Done():
×
241
                        c.logger.Info("Terminating main controller loop.")
×
242
                        return
×
243
                }
244
        }
245
}
246

247
// injectSegmentAnnotation injects the traffic segment annotation if it's not
248
// already present.
249
//
250
// Only inject the traffic segment annotation if the controller has traffic
251
// segments enabled by default, i.e. doesn't matter if the StackSet has the
252
// segment annotation.
253
func (c *StackSetController) injectSegmentAnnotation(
254
        ctx context.Context,
255
        stackSet *zv1.StackSet,
256
) bool {
×
257
        if c.annotatedTrafficSegments {
×
258
                return false
×
259
        }
×
260

261
        if !c.trafficSegmentsEnabled {
×
262
                return false
×
263
        }
×
264

265
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
266
                return false
×
267
        }
×
268

269
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
270

×
271
        _, err := c.client.ZalandoV1().StackSets(
×
272
                stackSet.Namespace,
×
273
        ).Update(
×
274
                ctx,
×
275
                stackSet,
×
276
                metav1.UpdateOptions{},
×
277
        )
×
278
        if err != nil {
×
279
                c.logger.Errorf(
×
280
                        "Failed injecting segment annotation: %v",
×
281
                        err,
×
282
                )
×
283
                return false
×
284
        }
×
285
        c.recorder.Eventf(
×
286
                stackSet,
×
287
                v1.EventTypeNormal,
×
288
                "UpdatedStackSet",
×
289
                "Updated StackSet %s. Injected %s annotation",
×
290
                stackSet.Name,
×
291
                TrafficSegmentsAnnotationKey,
×
292
        )
×
293

×
294
        return true
×
295
}
296

297
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
298
// overload the API requests with unnecessary requests
299
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
300
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
301
        for uid, stackset := range c.stacksetStore {
2✔
302
                stackset := stackset
1✔
303

1✔
304
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
305

1✔
306
                // use prescaling logic if enabled with an annotation
1✔
307
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
308
                        resetDelay := defaultResetMinReplicasDelay
1✔
309
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
310
                                resetDelay = resetDelayValue
1✔
311
                        }
1✔
312
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
313
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
314
                        }
1✔
315
                }
316

317
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
318
                if c.trafficSegmentsEnabled || c.annotatedTrafficSegments {
2✔
319

1✔
320
                        if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
2✔
321
                                stacksetContainer.EnableSegmentTraffic()
1✔
322
                                stacksetContainer.SynchronizeIngressAnnotations(
1✔
323
                                        c.syncIngressAnnotations,
1✔
324
                                )
1✔
325
                        }
1✔
326
                }
327
                stacksets[uid] = stacksetContainer
1✔
328
        }
329

330
        err := c.collectStacks(ctx, stacksets)
1✔
331
        if err != nil {
1✔
332
                return nil, err
×
333
        }
×
334

335
        err = c.collectIngresses(ctx, stacksets)
1✔
336
        if err != nil {
1✔
337
                return nil, err
×
338
        }
×
339

340
        if c.routeGroupSupportEnabled {
2✔
341
                err = c.collectRouteGroups(ctx, stacksets)
1✔
342
                if err != nil {
1✔
343
                        return nil, err
×
344
                }
×
345
        }
346

347
        err = c.collectDeployments(ctx, stacksets)
1✔
348
        if err != nil {
1✔
349
                return nil, err
×
350
        }
×
351

352
        err = c.collectServices(ctx, stacksets)
1✔
353
        if err != nil {
1✔
354
                return nil, err
×
355
        }
×
356

357
        err = c.collectHPAs(ctx, stacksets)
1✔
358
        if err != nil {
1✔
359
                return nil, err
×
360
        }
×
361

362
        if c.configMapSupportEnabled {
2✔
363
                err = c.collectConfigMaps(ctx, stacksets)
1✔
364
                if err != nil {
1✔
365
                        return nil, err
×
366
                }
×
367
        }
368

369
        if c.secretSupportEnabled {
2✔
370
                err = c.collectSecrets(ctx, stacksets)
1✔
371
                if err != nil {
1✔
372
                        return nil, err
×
373
                }
×
374
        }
375

376
        return stacksets, nil
1✔
377
}
378

379
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
380
        ingresses, err := c.client.NetworkingV1().Ingresses(c.namespace).List(ctx, metav1.ListOptions{})
1✔
381

1✔
382
        if err != nil {
1✔
383
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
384
        }
×
385

386
        for _, i := range ingresses.Items {
2✔
387
                ingress := i
1✔
388
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
389
                        // stackset ingress
1✔
390
                        if s, ok := stacksets[uid]; ok {
2✔
391
                                s.Ingress = &ingress
1✔
392
                                continue
1✔
393
                        }
394

395
                        // stack ingress
396
                        for _, stackset := range stacksets {
2✔
397
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
398
                                        if strings.HasSuffix(
1✔
399
                                                ingress.ObjectMeta.Name,
1✔
400
                                                core.SegmentSuffix,
1✔
401
                                        ) {
2✔
402
                                                // Traffic Segment
1✔
403
                                                s.Resources.IngressSegment = &ingress
1✔
404
                                        } else {
2✔
405
                                                s.Resources.Ingress = &ingress
1✔
406
                                        }
1✔
407
                                        break
1✔
408
                                }
409
                        }
410
                }
411
        }
412
        return nil
1✔
413
}
414

415
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
416
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.namespace).List(
1✔
417
                ctx,
1✔
418
                metav1.ListOptions{},
1✔
419
        )
1✔
420
        if err != nil {
1✔
421
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
422
        }
×
423

424
        for _, rg := range rgs.Items {
2✔
425
                routegroup := rg
1✔
426
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
427
                        // stackset routegroups
1✔
428
                        if s, ok := stacksets[uid]; ok {
2✔
429
                                s.RouteGroup = &routegroup
1✔
430
                                continue
1✔
431
                        }
432

433
                        // stack routegroups
434
                        for _, stackset := range stacksets {
2✔
435
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
436
                                        if strings.HasSuffix(
1✔
437
                                                routegroup.ObjectMeta.Name,
1✔
438
                                                core.SegmentSuffix,
1✔
439
                                        ) {
2✔
440
                                                // Traffic Segment
1✔
441
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
442
                                        } else {
2✔
443
                                                s.Resources.RouteGroup = &routegroup
1✔
444
                                        }
1✔
445
                                        break
1✔
446
                                }
447
                        }
448
                }
449
        }
450
        return nil
1✔
451
}
452

453
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
454
        stacks, err := c.client.ZalandoV1().Stacks(c.namespace).List(ctx, metav1.ListOptions{})
1✔
455
        if err != nil {
1✔
456
                return fmt.Errorf("failed to list Stacks: %v", err)
×
457
        }
×
458

459
        for _, stack := range stacks.Items {
2✔
460
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
461
                        if s, ok := stacksets[uid]; ok {
2✔
462
                                stack := stack
1✔
463
                                fixupStackTypeMeta(&stack)
1✔
464

1✔
465
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
466
                                        Stack: &stack,
1✔
467
                                }
1✔
468
                                continue
1✔
469
                        }
470
                }
471
        }
472
        return nil
1✔
473
}
474

475
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
476
        deployments, err := c.client.AppsV1().Deployments(c.namespace).List(ctx, metav1.ListOptions{})
1✔
477
        if err != nil {
1✔
478
                return fmt.Errorf("failed to list Deployments: %v", err)
×
479
        }
×
480

481
        for _, d := range deployments.Items {
2✔
482
                deployment := d
1✔
483
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
484
                        for _, stackset := range stacksets {
2✔
485
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
486
                                        s.Resources.Deployment = &deployment
1✔
487
                                        break
1✔
488
                                }
489
                        }
490
                }
491
        }
492
        return nil
1✔
493
}
494

495
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
496
        services, err := c.client.CoreV1().Services(c.namespace).List(ctx, metav1.ListOptions{})
1✔
497
        if err != nil {
1✔
498
                return fmt.Errorf("failed to list Services: %v", err)
×
499
        }
×
500

501
Items:
1✔
502
        for _, s := range services.Items {
2✔
503
                service := s
1✔
504
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
505
                        for _, stackset := range stacksets {
2✔
506
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
507
                                        s.Resources.Service = &service
1✔
508
                                        continue Items
1✔
509
                                }
510

511
                                // service/HPA used to be owned by the deployment for some reason
512
                                for _, stack := range stackset.StackContainers {
2✔
513
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
514
                                                stack.Resources.Service = &service
1✔
515
                                                continue Items
1✔
516
                                        }
517
                                }
518
                        }
519
                }
520
        }
521
        return nil
1✔
522
}
523

524
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
525
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.namespace).List(ctx, metav1.ListOptions{})
1✔
526
        if err != nil {
1✔
527
                return fmt.Errorf("failed to list HPAs: %v", err)
×
528
        }
×
529

530
Items:
1✔
531
        for _, h := range hpas.Items {
2✔
532
                hpa := h
1✔
533
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
534
                        for _, stackset := range stacksets {
2✔
535
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
536
                                        s.Resources.HPA = &hpa
1✔
537
                                        continue Items
1✔
538
                                }
539

540
                                // service/HPA used to be owned by the deployment for some reason
541
                                for _, stack := range stackset.StackContainers {
2✔
542
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
543
                                                stack.Resources.HPA = &hpa
1✔
544
                                                continue Items
1✔
545
                                        }
546
                                }
547
                        }
548
                }
549
        }
550
        return nil
1✔
551
}
552

553
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
554
        configMaps, err := c.client.CoreV1().ConfigMaps(c.namespace).List(ctx, metav1.ListOptions{})
1✔
555
        if err != nil {
1✔
556
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
557
        }
×
558

559
        for _, cm := range configMaps.Items {
2✔
560
                configMap := cm
1✔
561
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
562
                        for _, stackset := range stacksets {
2✔
563
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
564
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
565
                                        break
1✔
566
                                }
567
                        }
568
                }
569
        }
570
        return nil
1✔
571
}
572

573
func (c *StackSetController) collectSecrets(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
574
        secrets, err := c.client.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{})
1✔
575
        if err != nil {
1✔
576
                return fmt.Errorf("failed to list Secrets: %v", err)
×
577
        }
×
578

579
        for _, sct := range secrets.Items {
2✔
580
                secret := sct
1✔
581
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
582
                        for _, stackset := range stacksets {
2✔
583
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
584
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
585
                                        break
1✔
586
                                }
587
                        }
588
                }
589
        }
590
        return nil
1✔
591
}
592

593
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
594
        if len(objectMeta.OwnerReferences) == 1 {
2✔
595
                return objectMeta.OwnerReferences[0].UID, true
1✔
596
        }
1✔
597
        return "", false
1✔
598
}
599

600
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
601
        switch err.(type) {
×
602
        case *eventedError:
×
603
                // already notified
×
604
                return err
×
605
        default:
×
606
                c.recorder.Eventf(
×
607
                        object,
×
608
                        v1.EventTypeWarning,
×
609
                        reason,
×
610
                        err.Error())
×
611
                return &eventedError{err: err}
×
612
        }
613
}
614

615
// hasOwnership returns true if the controller is the "owner" of the stackset.
616
// Whether it's owner is determined by the value of the
617
// 'stackset-controller.zalando.org/controller' annotation. If the value
618
// matches the controllerID then it owns it, or if the controllerID is
619
// "" and there's no annotation set.
620
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
621
        if stackset.Annotations != nil {
×
622
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
623
                        return owner == c.controllerID
×
624
                }
×
625
        }
626
        return c.controllerID == ""
×
627
}
628

629
func (c *StackSetController) startWatch(ctx context.Context) {
×
630
        informer := cache.NewSharedIndexInformer(
×
631
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.namespace, fields.Everything()),
×
632
                &zv1.StackSet{},
×
633
                0, // skip resync
×
634
                cache.Indexers{},
×
635
        )
×
636

×
NEW
637
        _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
638
                AddFunc:    c.add,
×
639
                UpdateFunc: c.update,
×
640
                DeleteFunc: c.del,
×
641
        })
×
642
        go informer.Run(ctx.Done())
×
643
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
644
                c.logger.Errorf("Timed out waiting for caches to sync")
×
645
                return
×
646
        }
×
647
        c.logger.Info("Synced StackSet watcher")
×
648
}
649

650
func (c *StackSetController) add(obj interface{}) {
×
651
        stackset, ok := obj.(*zv1.StackSet)
×
652
        if !ok {
×
653
                return
×
654
        }
×
655

656
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
657
        c.stacksetEvents <- stacksetEvent{
×
658
                StackSet: stackset.DeepCopy(),
×
659
        }
×
660
}
661

662
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
663
        newStackset, ok := newObj.(*zv1.StackSet)
×
664
        if !ok {
×
665
                return
×
666
        }
×
667

668
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
669
        if !ok {
×
670
                return
×
671
        }
×
672

673
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
674
                newStackset.Namespace,
×
675
                newStackset.Name,
×
676
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
677
        )
×
678

×
679
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
680
        c.stacksetEvents <- stacksetEvent{
×
681
                StackSet: newStackset.DeepCopy(),
×
682
        }
×
683
}
684

685
func (c *StackSetController) del(obj interface{}) {
×
686
        stackset, ok := obj.(*zv1.StackSet)
×
687
        if !ok {
×
688
                return
×
689
        }
×
690

691
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
692
        c.stacksetEvents <- stacksetEvent{
×
693
                StackSet: stackset.DeepCopy(),
×
694
                Deleted:  true,
×
695
        }
×
696
}
697

698
func retryUpdate(updateFn func(retry bool) error) error {
×
699
        retry := false
×
700
        for {
×
701
                err := updateFn(retry)
×
702
                if err != nil {
×
703
                        if errors.IsConflict(err) {
×
704
                                retry = true
×
705
                                continue
×
706
                        }
707
                        return err
×
708
                }
709
                return nil
×
710
        }
711
}
712

713
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
714
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
715
        for _, sc := range ssc.StackContainers {
×
716
                stack := sc.Stack.DeepCopy()
×
717
                status := *sc.GenerateStackStatus()
×
718
                err := retryUpdate(func(retry bool) error {
×
719
                        if retry {
×
720
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
721
                                if err != nil {
×
722
                                        return err
×
723
                                }
×
724
                                stack = updated
×
725
                        }
726
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
727
                                stack.Status = status
×
728
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
729
                                return err
×
730
                        }
×
731
                        return nil
×
732
                })
733
                if err != nil {
×
734
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
735
                }
×
736
        }
737

738
        stackset := ssc.StackSet.DeepCopy()
×
739
        status := *ssc.GenerateStackSetStatus()
×
740
        err := retryUpdate(func(retry bool) error {
×
741
                if retry {
×
742
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
743
                        if err != nil {
×
744
                                return err
×
745
                        }
×
746
                        stackset = updated
×
747
                }
748
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
749
                        stackset.Status = status
×
750
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
751
                        return err
×
752
                }
×
753
                return nil
×
754
        })
755
        if err != nil {
×
756
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
757
        }
×
758
        return nil
×
759
}
760

761
// ReconcileTrafficSegments updates the traffic segments according to the actual
762
// traffic weight of each stack.
763
//
764
// Returns the ordered list of Trafic Segments that need to be updated.
765
func (c *StackSetController) ReconcileTrafficSegments(
766
        ctx context.Context,
767
        ssc *core.StackSetContainer,
768
) ([]types.UID, error) {
×
769
        // Compute segments
×
770
        toUpdate, err := ssc.ComputeTrafficSegments()
×
771
        if err != nil {
×
772
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
773
        }
×
774

775
        return toUpdate, nil
×
776
}
777

778
// CreateCurrentStack creates a new Stack object for the current stack, if needed
779
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
780
        newStack, newStackVersion := ssc.NewStack()
1✔
781
        if newStack == nil {
2✔
782
                return nil
1✔
783
        }
1✔
784

785
        if c.configMapSupportEnabled || c.secretSupportEnabled {
2✔
786
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
787
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
788
                        return err
×
789
                }
×
790
        }
791

792
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
793
        if err != nil {
1✔
794
                return err
×
795
        }
×
796
        fixupStackTypeMeta(created)
1✔
797

1✔
798
        c.recorder.Eventf(
1✔
799
                ssc.StackSet,
1✔
800
                v1.EventTypeNormal,
1✔
801
                "CreatedStack",
1✔
802
                "Created stack %s",
1✔
803
                newStack.Name(),
1✔
804
        )
1✔
805

1✔
806
        // Persist ObservedStackVersion in the status
1✔
807
        updated := ssc.StackSet.DeepCopy()
1✔
808
        updated.Status.ObservedStackVersion = newStackVersion
1✔
809

1✔
810
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
811
        if err != nil {
1✔
812
                return err
×
813
        }
×
814
        fixupStackSetTypeMeta(result)
1✔
815
        ssc.StackSet = result
1✔
816

1✔
817
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
818
                Stack:          created,
1✔
819
                PendingRemoval: false,
1✔
820
                Resources:      core.StackResources{},
1✔
821
        }
1✔
822
        return nil
1✔
823
}
824

825
// CleanupOldStacks deletes stacks that are no longer needed.
826
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
827
        for _, sc := range ssc.StackContainers {
2✔
828
                if !sc.PendingRemoval {
2✔
829
                        continue
1✔
830
                }
831

832
                stack := sc.Stack
1✔
833
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
834
                if err != nil {
1✔
835
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
836
                }
×
837
                c.recorder.Eventf(
1✔
838
                        ssc.StackSet,
1✔
839
                        v1.EventTypeNormal,
1✔
840
                        "DeletedExcessStack",
1✔
841
                        "Deleted excess stack %s",
1✔
842
                        stack.Name)
1✔
843
        }
844

845
        return nil
1✔
846
}
847

848
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
849
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
850
        // Ingress removed, handled outside
1✔
851
        if ingress == nil {
2✔
852
                return existing, nil
1✔
853
        }
1✔
854

855
        if existing == nil {
2✔
856
                if ingress.Annotations == nil {
2✔
857
                        ingress.Annotations = make(map[string]string)
1✔
858
                }
1✔
859
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
860

1✔
861
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
862
                if err != nil {
1✔
863
                        return nil, err
×
864
                }
×
865
                c.recorder.Eventf(
1✔
866
                        stackset,
1✔
867
                        v1.EventTypeNormal,
1✔
868
                        "CreatedIngress",
1✔
869
                        "Created Ingress %s",
1✔
870
                        ingress.Name)
1✔
871
                return createdIng, nil
1✔
872
        }
873

874
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
875
        if existingHaveUpdateTimeStamp {
2✔
876
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
877
        }
1✔
878

879
        // Check if we need to update the Ingress
880
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
881
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
882
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
883
                // add the annotation back after comparing
1✔
884
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
885
                return existing, nil
1✔
886
        }
1✔
887

888
        updated := existing.DeepCopy()
1✔
889
        updated.Spec = ingress.Spec
1✔
890
        if ingress.Annotations != nil {
2✔
891
                updated.Annotations = ingress.Annotations
1✔
892
        } else {
2✔
893
                updated.Annotations = make(map[string]string)
1✔
894
        }
1✔
895
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
896

1✔
897
        updated.Labels = ingress.Labels
1✔
898

1✔
899
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
900
        if err != nil {
1✔
901
                return nil, err
×
902
        }
×
903
        c.recorder.Eventf(
1✔
904
                stackset,
1✔
905
                v1.EventTypeNormal,
1✔
906
                "UpdatedIngress",
1✔
907
                "Updated Ingress %s",
1✔
908
                ingress.Name)
1✔
909
        return createdIngress, nil
1✔
910
}
911

912
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
913
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
914
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
915
                if routegroup == nil {
2✔
916
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
917
                        return nil
1✔
918
                }
1✔
919
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
920
                // The only scenario version we could think of for this is
1✔
921
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
922
                //  in that case, just wait until the RouteGroup has the annotation
1✔
923
                if !ok {
2✔
924
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
925
                        return nil
1✔
926
                }
1✔
927

928
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
929
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
930
                        return nil
1✔
931
                } else if !ready {
3✔
932
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
933
                        return nil
1✔
934
                }
1✔
935
        }
936
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
937
        if err != nil {
1✔
938
                return err
×
939
        }
×
940
        c.recorder.Eventf(
1✔
941
                stackset,
1✔
942
                v1.EventTypeNormal,
1✔
943
                "DeletedIngress",
1✔
944
                "Deleted Ingress %s",
1✔
945
                existing.Namespace)
1✔
946
        return nil
1✔
947
}
948

949
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
950
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
951
        // RouteGroup removed, handled outside
1✔
952
        if rg == nil {
2✔
953
                return existing, nil
1✔
954
        }
1✔
955

956
        // Create new RouteGroup
957
        if existing == nil {
2✔
958
                if rg.Annotations == nil {
2✔
959
                        rg.Annotations = make(map[string]string)
1✔
960
                }
1✔
961
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
962

1✔
963
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
964
                if err != nil {
1✔
965
                        return nil, err
×
966
                }
×
967
                c.recorder.Eventf(
1✔
968
                        stackset,
1✔
969
                        v1.EventTypeNormal,
1✔
970
                        "CreatedRouteGroup",
1✔
971
                        "Created RouteGroup %s",
1✔
972
                        rg.Name)
1✔
973
                return createdRg, nil
1✔
974
        }
975

976
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
977
        if existingHaveUpdateTimeStamp {
2✔
978
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
979
        }
1✔
980

981
        // Check if we need to update the RouteGroup
982
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
983
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
984
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
985
                // add the annotation back after comparing
1✔
986
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
987
                return existing, nil
1✔
988
        }
1✔
989

990
        updated := existing.DeepCopy()
1✔
991
        updated.Spec = rg.Spec
1✔
992
        if rg.Annotations != nil {
1✔
993
                updated.Annotations = rg.Annotations
×
994
        } else {
1✔
995
                updated.Annotations = make(map[string]string)
1✔
996
        }
1✔
997
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
998

1✔
999
        updated.Labels = rg.Labels
1✔
1000

1✔
1001
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1002
        if err != nil {
1✔
1003
                return nil, err
×
1004
        }
×
1005
        c.recorder.Eventf(
1✔
1006
                stackset,
1✔
1007
                v1.EventTypeNormal,
1✔
1008
                "UpdatedRouteGroup",
1✔
1009
                "Updated RouteGroup %s",
1✔
1010
                rg.Name)
1✔
1011
        return createdRg, nil
1✔
1012
}
1013

1014
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
1015
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
1016
        if stackset.Spec.Ingress != nil {
2✔
1017
                if ingress == nil {
2✔
1018
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
1019
                        return nil
1✔
1020
                }
1✔
1021
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
1022
                // The only scenario version we could think of for this is
1✔
1023
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
1024
                //  in that case, just wait until the RouteGroup has the annotation
1✔
1025
                if !ok {
2✔
1026
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1027
                        return nil
1✔
1028
                }
1✔
1029

1030
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
1031
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1032
                        return nil
1✔
1033
                } else if !ready {
3✔
1034
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
1035
                        return nil
1✔
1036
                }
1✔
1037
        }
1038
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
1039
        if err != nil {
1✔
1040
                return err
×
1041
        }
×
1042
        c.recorder.Eventf(
1✔
1043
                stackset,
1✔
1044
                v1.EventTypeNormal,
1✔
1045
                "DeletedRouteGroup",
1✔
1046
                "Deleted RouteGroup %s",
1✔
1047
                rg.Namespace)
1✔
1048
        return nil
1✔
1049
}
1050

1051
func (c *StackSetController) ReconcileStackSetIngressSources(
1052
        ctx context.Context,
1053
        stackset *zv1.StackSet,
1054
        existingIng *networking.Ingress,
1055
        existingRg *rgv1.RouteGroup,
1056
        generateIng func() (*networking.Ingress, error),
1057
        generateRg func() (*rgv1.RouteGroup, error),
1058
) error {
1✔
1059
        ingress, err := generateIng()
1✔
1060
        if err != nil {
1✔
1061
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1062
        }
×
1063

1064
        // opt-out existingIng creation in case we have an external entity creating existingIng
1065
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1066
        if err != nil {
1✔
1067
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1068
        }
×
1069

1070
        rg, err := generateRg()
1✔
1071
        if err != nil {
1✔
1072
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1073
        }
×
1074

1075
        var appliedRg *rgv1.RouteGroup
1✔
1076
        if c.routeGroupSupportEnabled {
2✔
1077
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1078
                if err != nil {
1✔
1079
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1080
                }
×
1081
        }
1082

1083
        // Ingress removed
1084
        if ingress == nil {
2✔
1085
                if existingIng != nil {
2✔
1086
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1087
                        if err != nil {
1✔
1088
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1089
                        }
×
1090
                }
1091
        }
1092

1093
        // RouteGroup removed
1094
        if rg == nil {
2✔
1095
                if existingRg != nil {
2✔
1096
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1097
                        if err != nil {
1✔
1098
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1099
                        }
×
1100
                }
1101
        }
1102

1103
        return nil
1✔
1104
}
1105

1106
// convertToTrafficSegments removes the central ingress component of the
1107
// StackSet, if and only if the StackSet already has traffic segments.
1108
func (c *StackSetController) convertToTrafficSegments(
1109
        ctx context.Context,
1110
        ssc *core.StackSetContainer,
1111
) error {
×
1112
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
1113
                return nil
×
1114
        }
×
1115

1116
        var ingTimestamp, rgTimestamp *metav1.Time
×
1117
        for _, sc := range ssc.StackContainers {
×
1118
                if sc.Stack.Spec.Ingress != nil && sc.Resources.IngressSegment == nil {
×
1119
                        c.logger.Warnf(
×
1120
                                "Not deleting Ingress %s, stack %s doesn't have a segment yet.",
×
1121
                                ssc.Ingress.Name,
×
1122
                                sc.Name(),
×
1123
                        )
×
1124
                        return nil
×
1125
                }
×
1126

1127
                if sc.Stack.Spec.RouteGroup != nil && sc.Resources.RouteGroupSegment == nil {
×
1128
                        c.logger.Warnf(
×
1129
                                "Not deleting RouteGroup %s, stack %s doesn't have a segment yet.",
×
1130
                                ssc.RouteGroup.Name,
×
1131
                                sc.Name(),
×
1132
                        )
×
1133
                        return nil
×
1134
                }
×
1135

1136
                // If we find stacks with a segment, we can delete the
1137
                // central ingress resources.
1138
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
1139
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
1140
                }
×
1141

1142
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
1143
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
1144
                }
×
1145
        }
1146

1147
        if len(ssc.StackContainers) == 0 {
×
1148
                c.logger.Infof(
×
1149
                        "No stacks found for StackSet %s, safe to delete central "+
×
NEW
1150
                                "ingress/routegroup",
×
1151
                        ssc.StackSet.Name,
×
1152
                )
×
1153

×
1154
                // If we don't have any stacks, we can delete the central ingress
×
1155
                // resources
×
1156
                oldEnough := metav1.NewTime(
×
NEW
1157
                        time.Now().Add(-c.ingressSourceSwitchTTL - time.Minute),
×
1158
                )
×
1159
                ingTimestamp = &oldEnough
×
1160
                rgTimestamp = &oldEnough
×
1161
        }
×
1162

1163
        if ingTimestamp != nil && ssc.Ingress != nil {
×
1164
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1165
                        c.logger.Infof(
×
1166
                                "Not deleting Ingress %s, segments created less than %s ago",
×
1167
                                ssc.Ingress.Name,
×
1168
                                c.ingressSourceSwitchTTL,
×
1169
                        )
×
1170
                        return nil
×
1171
                }
×
1172

1173
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
1174
                        ctx,
×
1175
                        ssc.Ingress.Name,
×
1176
                        metav1.DeleteOptions{},
×
1177
                )
×
1178
                if err != nil {
×
1179
                        return err
×
1180
                }
×
1181

1182
                c.recorder.Eventf(
×
1183
                        ssc.StackSet,
×
1184
                        v1.EventTypeNormal,
×
1185
                        "DeletedIngress",
×
1186
                        "Deleted Ingress %s, StackSet conversion to traffic segments complete",
×
1187
                        ssc.Ingress.Namespace,
×
1188
                )
×
1189

×
1190
                ssc.Ingress = nil
×
1191
        }
1192

1193
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
1194
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1195
                        c.logger.Infof(
×
1196
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
1197
                                ssc.RouteGroup.Name,
×
1198
                                c.ingressSourceSwitchTTL,
×
1199
                        )
×
1200
                        return nil
×
1201
                }
×
1202

1203
                err := c.client.RouteGroupV1().RouteGroups(
×
1204
                        ssc.RouteGroup.Namespace,
×
1205
                ).Delete(
×
1206
                        ctx,
×
1207
                        ssc.RouteGroup.Name,
×
1208
                        metav1.DeleteOptions{},
×
1209
                )
×
1210
                if err != nil {
×
1211
                        return err
×
1212
                }
×
1213

1214
                c.recorder.Eventf(
×
1215
                        ssc.RouteGroup,
×
1216
                        v1.EventTypeNormal,
×
1217
                        "DeletedRouteGroup",
×
1218
                        "Deleted RouteGroup %s, StackSet conversion to traffic segments complete",
×
1219
                        ssc.RouteGroup.Namespace,
×
1220
                )
×
1221

×
1222
                ssc.RouteGroup = nil
×
1223
        }
1224

1225
        return nil
×
1226
}
1227

1228
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1229
// of the specified StackSet.
1230
//
1231
// If the StackSet supports traffic segments, the controller won't reconcile the
1232
// central ingress resources. This method is deprecated and will be removed in
1233
// the future.
1234
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
1235
        if !ssc.SupportsSegmentTraffic() {
×
1236
                err := c.ReconcileStackSetIngressSources(
×
1237
                        ctx,
×
1238
                        ssc.StackSet,
×
1239
                        ssc.Ingress,
×
1240
                        ssc.RouteGroup,
×
1241
                        ssc.GenerateIngress,
×
1242
                        ssc.GenerateRouteGroup,
×
1243
                )
×
1244
                if err != nil {
×
1245
                        return err
×
1246
                }
×
1247
        } else {
×
1248
                // Convert StackSet to traffic segments, if needed
×
1249
                err := c.convertToTrafficSegments(ctx, ssc)
×
1250
                if err != nil {
×
1251
                        return err
×
1252
                }
×
1253
        }
1254

1255
        trafficChanges := ssc.TrafficChanges()
×
1256
        if len(trafficChanges) != 0 {
×
1257
                var changeMessages []string
×
1258
                for _, change := range trafficChanges {
×
1259
                        changeMessages = append(changeMessages, change.String())
×
1260
                }
×
1261

1262
                c.recorder.Eventf(
×
1263
                        ssc.StackSet,
×
1264
                        v1.EventTypeNormal,
×
1265
                        "TrafficSwitched",
×
1266
                        "Switched traffic: %s",
×
1267
                        strings.Join(changeMessages, ", "))
×
1268
        }
1269

1270
        return nil
×
1271
}
1272

1273
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1274
        updatedTraffic := generateUpdated()
1✔
1275

1✔
1276
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1277
                return nil
×
1278
        }
×
1279

1280
        updated := existing.DeepCopy()
1✔
1281
        updated.Spec.Traffic = updatedTraffic
1✔
1282

1✔
1283
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1284
        if err != nil {
1✔
1285
                return err
×
1286
        }
×
1287
        c.recorder.Eventf(
1✔
1288
                updated,
1✔
1289
                v1.EventTypeNormal,
1✔
1290
                "UpdatedStackSet",
1✔
1291
                "Updated StackSet %s",
1✔
1292
                updated.Name)
1✔
1293
        return nil
1✔
1294
}
1295

1296
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1297
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1298
        if err != nil {
×
1299
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1300
        }
×
1301

1302
        // This is to support both central and segment-based traffic.
1303
        if ssc.SupportsSegmentTraffic() {
×
1304
                err = c.ReconcileStackIngress(
×
1305
                        ctx,
×
1306
                        sc.Stack,
×
1307
                        sc.Resources.IngressSegment,
×
1308
                        sc.GenerateIngressSegment,
×
1309
                )
×
1310
                if err != nil {
×
1311
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1312
                }
×
1313
        }
1314

1315
        if c.routeGroupSupportEnabled {
×
1316
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1317
                if err != nil {
×
1318
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1319
                }
×
1320

1321
                // This is to support both central and segment-based traffic.
1322
                if ssc.SupportsSegmentTraffic() {
×
1323
                        err = c.ReconcileStackRouteGroup(
×
1324
                                ctx,
×
1325
                                sc.Stack,
×
1326
                                sc.Resources.RouteGroupSegment,
×
1327
                                sc.GenerateRouteGroupSegment,
×
1328
                        )
×
1329
                        if err != nil {
×
1330
                                return c.errorEventf(
×
1331
                                        sc.Stack,
×
1332
                                        "FailedManageRouteGroupSegment",
×
1333
                                        err,
×
1334
                                )
×
1335
                        }
×
1336
                }
1337
        }
1338

1339
        if c.configMapSupportEnabled {
×
1340
                err := c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1341
                if err != nil {
×
1342
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1343
                }
×
1344
        }
1345

1346
        if c.secretSupportEnabled {
×
1347
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1348
                if err != nil {
×
1349
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1350
                }
×
1351
        }
1352

1353
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1354
        if err != nil {
×
1355
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1356
        }
×
1357

1358
        hpaGenerator := sc.GenerateHPA
×
1359
        if ssc.SupportsSegmentTraffic() {
×
1360
                hpaGenerator = sc.GenerateHPAToSegment
×
1361
        }
×
1362
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1363
        if err != nil {
×
1364
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1365
        }
×
1366

1367
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1368
        if err != nil {
×
1369
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1370
        }
×
1371

1372
        return nil
×
1373
}
1374

1375
// ReconcileStackSet reconciles all the things from a stackset
1376
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1377
        defer func() {
×
1378
                if r := recover(); r != nil {
×
1379
                        c.metricsReporter.ReportPanic()
×
1380
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1381
                        err = fmt.Errorf("panic: %v", r)
×
1382
                }
×
1383
        }()
1384

1385
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
1386
                // Reconciler handles StackSet in the next loop
×
1387
                return nil
×
1388
        }
×
1389

1390
        // Create current stack, if needed. Proceed on errors.
1391
        err = c.CreateCurrentStack(ctx, container)
×
1392
        if err != nil {
×
1393
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1394
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1395
        }
×
1396

1397
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1398
        err = container.UpdateFromResources()
×
1399
        if err != nil {
×
1400
                return err
×
1401
        }
×
1402

1403
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1404
        err = container.ManageTraffic(time.Now())
×
1405
        if err != nil {
×
1406
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1407
                c.recorder.Eventf(
×
1408
                        container.StackSet,
×
1409
                        v1.EventTypeWarning,
×
1410
                        "TrafficNotSwitched",
×
1411
                        "Failed to switch traffic: "+err.Error())
×
1412
        }
×
1413

1414
        // Mark stacks that should be removed
1415
        container.MarkExpiredStacks()
×
1416

×
1417
        segsInOrder := []types.UID{}
×
1418
        // This is to support both central and segment-based traffic.
×
1419
        if container.SupportsSegmentTraffic() {
×
1420
                // Update traffic segments. Proceed on errors.
×
1421
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
1422
                if err != nil {
×
1423
                        err = c.errorEventf(
×
1424
                                container.StackSet,
×
1425
                                reasonFailedManageStackSet,
×
1426
                                err,
×
1427
                        )
×
1428
                        c.stacksetLogger(container).Errorf(
×
1429
                                "Unable to reconcile traffic segments: %v",
×
1430
                                err,
×
1431
                        )
×
1432
                }
×
1433
        }
1434

1435
        // Reconcile stack resources. Proceed on errors.
1436
        reconciledStacks := map[types.UID]bool{}
×
1437
        for _, id := range segsInOrder {
×
1438
                reconciledStacks[id] = true
×
1439
                sc := container.StackContainers[id]
×
1440
                err = c.ReconcileStackResources(ctx, container, sc)
×
1441
                if err != nil {
×
1442
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1443
                        c.stackLogger(container, sc).Errorf(
×
1444
                                "Unable to reconcile stack resources: %v",
×
1445
                                err,
×
1446
                        )
×
1447
                }
×
1448
        }
1449

1450
        for k, sc := range container.StackContainers {
×
1451
                if reconciledStacks[k] {
×
1452
                        continue
×
1453
                }
1454

1455
                err = c.ReconcileStackResources(ctx, container, sc)
×
1456
                if err != nil {
×
1457
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1458
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1459
                }
×
1460
        }
1461

1462
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1463
        err = c.ReconcileStackSetResources(ctx, container)
×
1464
        if err != nil {
×
1465
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1466
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1467
        }
×
1468

1469
        // Reconcile desired traffic in the stackset. Proceed on errors.
1470
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1471
        if err != nil {
×
1472
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1473
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1474
        }
×
1475

1476
        // Delete old stacks. Proceed on errors.
1477
        err = c.CleanupOldStacks(ctx, container)
×
1478
        if err != nil {
×
1479
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1480
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1481
        }
×
1482

1483
        // Update statuses.
1484
        err = c.ReconcileStatuses(ctx, container)
×
1485
        if err != nil {
×
1486
                return err
×
1487
        }
×
1488

1489
        return nil
×
1490
}
1491

1492
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1493
// stackset annotation.
1494
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1495
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1496
        if !ok {
2✔
1497
                return 0, false
1✔
1498
        }
1✔
1499
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1500
        if err != nil {
1✔
1501
                return 0, false
×
1502
        }
×
1503
        return resetDelay, true
1✔
1504
}
1505

1506
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1507
        // set TypeMeta manually because of this bug:
1✔
1508
        // https://github.com/kubernetes/client-go/issues/308
1✔
1509
        stackset.APIVersion = core.APIVersion
1✔
1510
        stackset.Kind = core.KindStackSet
1✔
1511
}
1✔
1512

1513
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1514
        // set TypeMeta manually because of this bug:
1✔
1515
        // https://github.com/kubernetes/client-go/issues/308
1✔
1516
        stack.APIVersion = core.APIVersion
1✔
1517
        stack.Kind = core.KindStack
1✔
1518
}
1✔
1519

1520
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1521
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1522
        if err != nil {
2✔
1523
                // wait until there's a valid timestamp on the annotation
1✔
1524
                return false, err
1✔
1525
        }
1✔
1526

1527
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1528
}
1529

1530
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1531
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1532
                return true
1✔
1533
        }
1✔
1534

1535
        return false
1✔
1536
}
1537

1538
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1539
// name is not prefixed by Stack name.
1540
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1541
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
1542
                if err := validateConfigurationResourceName(stack.Name, rsc.GetName()); err != nil {
×
1543
                        return err
×
1544
                }
×
1545
        }
1546
        return nil
1✔
1547
}
1548

1549
// validateConfigurationResourceName returns an error if specific resource
1550
// name is not prefixed by Stack name.
1551
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1552
        if !strings.HasPrefix(rsc, stack) {
2✔
1553
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
1✔
1554
        }
1✔
1555
        return nil
1✔
1556
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc