• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 7062472472

01 Dec 2023 03:53PM UTC coverage: 73.254% (+0.07%) from 73.186%
7062472472

push

github

web-flow
Reference ConfigMap for per stack versioning (#520)

Currently Stacks share the same configuration resources, which may lead to confusion and incidents. As a first step on having one configuration resource dedicated to each stack, this Pull Request implements ConfigMaps versioned by reference. Other resources will follow in future Pull Requests.

The expected behaviour is that given the a ConfigMap named in the <stack-name>-<configmap-name> pattern is referenced under ConfigurationResources on the Stack definition, Stackset Controller identifies the deployed ConfigMap and updates it with the Stack info on ownerReferences.

81 of 109 new or added lines in 4 files covered. (74.31%)

2 existing lines in 1 file now uncovered.

2328 of 3178 relevant lines covered (73.25%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.83
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41

42
        reasonFailedManageStackSet = "FailedManageStackSet"
43

44
        defaultResetMinReplicasDelay = 10 * time.Minute
45
)
46

47
// StackSetController is the main controller. It watches for changes to
48
// stackset resources and starts and maintains other controllers per
49
// stackset resource.
50
type StackSetController struct {
51
        logger                      *log.Entry
52
        client                      clientset.Interface
53
        controllerID                string
54
        backendWeightsAnnotationKey string
55
        clusterDomains              []string
56
        interval                    time.Duration
57
        stacksetEvents              chan stacksetEvent
58
        stacksetStore               map[types.UID]zv1.StackSet
59
        recorder                    kube_record.EventRecorder
60
        metricsReporter             *core.MetricsReporter
61
        HealthReporter              healthcheck.Handler
62
        routeGroupSupportEnabled    bool
63
        ingressSourceSwitchTTL      time.Duration
64
        now                         func() string
65
        reconcileWorkers            int
66
        configMapSupportEnabled     bool
67
        sync.Mutex
68
}
69

70
type stacksetEvent struct {
71
        Deleted  bool
72
        StackSet *zv1.StackSet
73
}
74

75
// eventedError wraps an error that was already exposed as an event to the user
76
type eventedError struct {
77
        err error
78
}
79

80
func (ee *eventedError) Error() string {
×
81
        return ee.err.Error()
×
82
}
×
83

84
func now() string {
×
85
        return time.Now().Format(time.RFC3339)
×
86
}
×
87

88
// NewStackSetController initializes a new StackSetController.
89
func NewStackSetController(
90
        client clientset.Interface,
91
        controllerID string,
92
        parallelWork int,
93
        backendWeightsAnnotationKey string,
94
        clusterDomains []string,
95
        registry prometheus.Registerer,
96
        interval time.Duration,
97
        routeGroupSupportEnabled bool,
98
        ingressSourceSwitchTTL time.Duration,
99
        configMapSupportEnabled bool,
100
) (*StackSetController, error) {
1✔
101
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
102
        if err != nil {
1✔
103
                return nil, err
×
104
        }
×
105

106
        return &StackSetController{
1✔
107
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
108
                client:                      client,
1✔
109
                controllerID:                controllerID,
1✔
110
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
111
                clusterDomains:              clusterDomains,
1✔
112
                interval:                    interval,
1✔
113
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
114
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
115
                recorder:                    recorder.CreateEventRecorder(client),
1✔
116
                metricsReporter:             metricsReporter,
1✔
117
                HealthReporter:              healthcheck.NewHandler(),
1✔
118
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
119
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
120
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
121
                now:                         now,
1✔
122
                reconcileWorkers:            parallelWork,
1✔
123
        }, nil
1✔
124
}
125

126
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
127
        return c.logger.WithFields(map[string]interface{}{
×
128
                "namespace": ssc.StackSet.Namespace,
×
129
                "stackset":  ssc.StackSet.Name,
×
130
        })
×
131
}
×
132

133
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
134
        return c.logger.WithFields(map[string]interface{}{
×
135
                "namespace": ssc.StackSet.Namespace,
×
136
                "stackset":  ssc.StackSet.Name,
×
137
                "stack":     sc.Name(),
×
138
        })
×
139
}
×
140

141
// Run runs the main loop of the StackSetController. Before the loops it
142
// sets up a watcher to watch StackSet resources. The watch will send
143
// changes over a channel which is polled from the main loop.
144
func (c *StackSetController) Run(ctx context.Context) {
×
145
        var nextCheck time.Time
×
146

×
147
        // We're not alive if nextCheck is too far in the past
×
148
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
149
                if time.Since(nextCheck) > 5*c.interval {
×
150
                        return fmt.Errorf("nextCheck too old")
×
151
                }
×
152
                return nil
×
153
        })
154

155
        c.startWatch(ctx)
×
156

×
157
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
158

×
159
        nextCheck = time.Now().Add(-c.interval)
×
160

×
161
        for {
×
162
                select {
×
163
                case <-time.After(time.Until(nextCheck)):
×
164

×
165
                        nextCheck = time.Now().Add(c.interval)
×
166

×
167
                        stackSetContainers, err := c.collectResources(ctx)
×
168
                        if err != nil {
×
169
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
170
                                continue
×
171
                        }
172

173
                        var reconcileGroup errgroup.Group
×
174
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
175
                        for stackset, container := range stackSetContainers {
×
176
                                container := container
×
177
                                stackset := stackset
×
178

×
179
                                reconcileGroup.Go(func() error {
×
180
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
181
                                                err := c.ReconcileStackSet(ctx, container)
×
182
                                                if err != nil {
×
183
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
184
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
185
                                                }
×
186
                                        }
187
                                        return nil
×
188
                                })
189
                        }
190

191
                        err = reconcileGroup.Wait()
×
192
                        if err != nil {
×
193
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
194
                        }
×
195
                        err = c.metricsReporter.Report(stackSetContainers)
×
196
                        if err != nil {
×
197
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
198
                        }
×
199
                case e := <-c.stacksetEvents:
×
200
                        stackset := *e.StackSet
×
201
                        fixupStackSetTypeMeta(&stackset)
×
202

×
203
                        // update/delete existing entry
×
204
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
205
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
206
                                        delete(c.stacksetStore, stackset.UID)
×
207
                                        continue
×
208
                                }
209

210
                                // update stackset entry
211
                                c.stacksetStore[stackset.UID] = stackset
×
212
                                continue
×
213
                        }
214

215
                        // check if stackset should be managed by the controller
216
                        if !c.hasOwnership(&stackset) {
×
217
                                continue
×
218
                        }
219

220
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
221
                        c.stacksetStore[stackset.UID] = stackset
×
222
                case <-ctx.Done():
×
223
                        c.logger.Info("Terminating main controller loop.")
×
224
                        return
×
225
                }
226
        }
227
}
228

229
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
230
// overload the API requests with unnecessary requests
231
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
232
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
233
        for uid, stackset := range c.stacksetStore {
2✔
234
                stackset := stackset
1✔
235

1✔
236
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
237

1✔
238
                // use prescaling logic if enabled with an annotation
1✔
239
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
240
                        resetDelay := defaultResetMinReplicasDelay
1✔
241
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
242
                                resetDelay = resetDelayValue
1✔
243
                        }
1✔
244
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
245
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
246
                        }
1✔
247
                }
248

249
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
250
                stacksets[uid] = stacksetContainer
1✔
251
        }
252

253
        err := c.collectStacks(ctx, stacksets)
1✔
254
        if err != nil {
1✔
255
                return nil, err
×
256
        }
×
257

258
        err = c.collectIngresses(ctx, stacksets)
1✔
259
        if err != nil {
1✔
260
                return nil, err
×
261
        }
×
262

263
        if c.routeGroupSupportEnabled {
2✔
264
                err = c.collectRouteGroups(ctx, stacksets)
1✔
265
                if err != nil {
1✔
266
                        return nil, err
×
267
                }
×
268
        }
269

270
        err = c.collectDeployments(ctx, stacksets)
1✔
271
        if err != nil {
1✔
272
                return nil, err
×
273
        }
×
274

275
        err = c.collectServices(ctx, stacksets)
1✔
276
        if err != nil {
1✔
277
                return nil, err
×
278
        }
×
279

280
        err = c.collectHPAs(ctx, stacksets)
1✔
281
        if err != nil {
1✔
282
                return nil, err
×
283
        }
×
284

285
        if c.configMapSupportEnabled {
2✔
286
                err = c.collectConfigMaps(ctx, stacksets)
1✔
287
                if err != nil {
1✔
NEW
288
                        return nil, err
×
NEW
289
                }
×
290
        }
291

292
        return stacksets, nil
1✔
293
}
294

295
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
296
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
297
        if err != nil {
1✔
298
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
299
        }
×
300

301
Items:
1✔
302
        for _, i := range ingresses.Items {
2✔
303
                ingress := i
1✔
304
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
305
                        // stackset ingress
1✔
306
                        if s, ok := stacksets[uid]; ok {
2✔
307
                                s.Ingress = &ingress
1✔
308
                                continue
1✔
309
                        }
310

311
                        // stack ingress
312
                        for _, stackset := range stacksets {
2✔
313
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
314
                                        s.Resources.Ingress = &ingress
1✔
315
                                        continue Items
1✔
316
                                }
317
                        }
318
                }
319
        }
320
        return nil
1✔
321
}
322

323
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
324
        routegroups, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
325
        if err != nil {
1✔
326
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
327
        }
×
328

329
Items:
1✔
330
        for _, rg := range routegroups.Items {
2✔
331
                routegroup := rg
1✔
332
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
333
                        // stackset routegroups
1✔
334
                        if s, ok := stacksets[uid]; ok {
2✔
335
                                s.RouteGroup = &routegroup
1✔
336
                                continue
1✔
337
                        }
338

339
                        // stack routegroups
340
                        for _, stackset := range stacksets {
2✔
341
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
342
                                        s.Resources.RouteGroup = &routegroup
1✔
343
                                        continue Items
1✔
344
                                }
345
                        }
346
                }
347
        }
348
        return nil
1✔
349
}
350

351
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
352
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
353
        if err != nil {
1✔
354
                return fmt.Errorf("failed to list Stacks: %v", err)
×
355
        }
×
356

357
        for _, stack := range stacks.Items {
2✔
358
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
359
                        if s, ok := stacksets[uid]; ok {
2✔
360
                                stack := stack
1✔
361
                                fixupStackTypeMeta(&stack)
1✔
362

1✔
363
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
364
                                        Stack: &stack,
1✔
365
                                }
1✔
366
                                continue
1✔
367
                        }
368
                }
369
        }
370
        return nil
1✔
371
}
372

373
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
374
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
375
        if err != nil {
1✔
376
                return fmt.Errorf("failed to list Deployments: %v", err)
×
377
        }
×
378

379
        for _, d := range deployments.Items {
2✔
380
                deployment := d
1✔
381
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
382
                        for _, stackset := range stacksets {
2✔
383
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
384
                                        s.Resources.Deployment = &deployment
1✔
385
                                        break
1✔
386
                                }
387
                        }
388
                }
389
        }
390
        return nil
1✔
391
}
392

393
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
394
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
395
        if err != nil {
1✔
396
                return fmt.Errorf("failed to list Services: %v", err)
×
397
        }
×
398

399
Items:
1✔
400
        for _, s := range services.Items {
2✔
401
                service := s
1✔
402
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
403
                        for _, stackset := range stacksets {
2✔
404
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
405
                                        s.Resources.Service = &service
1✔
406
                                        continue Items
1✔
407
                                }
408

409
                                // service/HPA used to be owned by the deployment for some reason
410
                                for _, stack := range stackset.StackContainers {
2✔
411
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
412
                                                stack.Resources.Service = &service
1✔
413
                                                continue Items
1✔
414
                                        }
415
                                }
416
                        }
417
                }
418
        }
419
        return nil
1✔
420
}
421

422
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
423
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
424
        if err != nil {
1✔
425
                return fmt.Errorf("failed to list HPAs: %v", err)
×
426
        }
×
427

428
Items:
1✔
429
        for _, h := range hpas.Items {
2✔
430
                hpa := h
1✔
431
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
432
                        for _, stackset := range stacksets {
2✔
433
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
434
                                        s.Resources.HPA = &hpa
1✔
435
                                        continue Items
1✔
436
                                }
437

438
                                // service/HPA used to be owned by the deployment for some reason
439
                                for _, stack := range stackset.StackContainers {
2✔
440
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
441
                                                stack.Resources.HPA = &hpa
1✔
442
                                                continue Items
1✔
443
                                        }
444
                                }
445
                        }
446
                }
447
        }
448
        return nil
1✔
449
}
450

451
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
452
        configMaps, err := c.client.CoreV1().ConfigMaps(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
453
        if err != nil {
1✔
NEW
454
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
NEW
455
        }
×
456

457
        for _, cm := range configMaps.Items {
2✔
458
                configMap := cm
1✔
459
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
460
                        for _, stackset := range stacksets {
2✔
461
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
462
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
463
                                        break
1✔
464
                                }
465
                        }
466
                }
467
        }
468
        return nil
1✔
469
}
470

471
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
472
        if len(objectMeta.OwnerReferences) == 1 {
2✔
473
                return objectMeta.OwnerReferences[0].UID, true
1✔
474
        }
1✔
475
        return "", false
1✔
476
}
477

478
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
479
        switch err.(type) {
×
480
        case *eventedError:
×
481
                // already notified
×
482
                return err
×
483
        default:
×
484
                c.recorder.Eventf(
×
485
                        object,
×
486
                        v1.EventTypeWarning,
×
487
                        reason,
×
488
                        err.Error())
×
489
                return &eventedError{err: err}
×
490
        }
491
}
492

493
// hasOwnership returns true if the controller is the "owner" of the stackset.
494
// Whether it's owner is determined by the value of the
495
// 'stackset-controller.zalando.org/controller' annotation. If the value
496
// matches the controllerID then it owns it, or if the controllerID is
497
// "" and there's no annotation set.
498
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
499
        if stackset.Annotations != nil {
×
500
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
501
                        return owner == c.controllerID
×
502
                }
×
503
        }
504
        return c.controllerID == ""
×
505
}
506

507
func (c *StackSetController) startWatch(ctx context.Context) {
×
508
        informer := cache.NewSharedIndexInformer(
×
509
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
510
                &zv1.StackSet{},
×
511
                0, // skip resync
×
512
                cache.Indexers{},
×
513
        )
×
514

×
515
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
516
                AddFunc:    c.add,
×
517
                UpdateFunc: c.update,
×
518
                DeleteFunc: c.del,
×
519
        })
×
520
        go informer.Run(ctx.Done())
×
521
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
522
                c.logger.Errorf("Timed out waiting for caches to sync")
×
523
                return
×
524
        }
×
525
        c.logger.Info("Synced StackSet watcher")
×
526
}
527

528
func (c *StackSetController) add(obj interface{}) {
×
529
        stackset, ok := obj.(*zv1.StackSet)
×
530
        if !ok {
×
531
                return
×
532
        }
×
533

534
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
535
        c.stacksetEvents <- stacksetEvent{
×
536
                StackSet: stackset.DeepCopy(),
×
537
        }
×
538
}
539

540
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
541
        newStackset, ok := newObj.(*zv1.StackSet)
×
542
        if !ok {
×
543
                return
×
544
        }
×
545

546
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
547
        if !ok {
×
548
                return
×
549
        }
×
550

551
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
552
                newStackset.Namespace,
×
553
                newStackset.Name,
×
554
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
555
        )
×
556

×
557
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
558
        c.stacksetEvents <- stacksetEvent{
×
559
                StackSet: newStackset.DeepCopy(),
×
560
        }
×
561
}
562

563
func (c *StackSetController) del(obj interface{}) {
×
564
        stackset, ok := obj.(*zv1.StackSet)
×
565
        if !ok {
×
566
                return
×
567
        }
×
568

569
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
570
        c.stacksetEvents <- stacksetEvent{
×
571
                StackSet: stackset.DeepCopy(),
×
572
                Deleted:  true,
×
573
        }
×
574
}
575

576
func retryUpdate(updateFn func(retry bool) error) error {
×
577
        retry := false
×
578
        for {
×
579
                err := updateFn(retry)
×
580
                if err != nil {
×
581
                        if errors.IsConflict(err) {
×
582
                                retry = true
×
583
                                continue
×
584
                        }
585
                        return err
×
586
                }
587
                return nil
×
588
        }
589
}
590

591
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
592
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
593
        for _, sc := range ssc.StackContainers {
×
594
                stack := sc.Stack.DeepCopy()
×
595
                status := *sc.GenerateStackStatus()
×
596
                err := retryUpdate(func(retry bool) error {
×
597
                        if retry {
×
598
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
599
                                if err != nil {
×
600
                                        return err
×
601
                                }
×
602
                                stack = updated
×
603
                        }
604
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
605
                                stack.Status = status
×
606
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
607
                                return err
×
608
                        }
×
609
                        return nil
×
610
                })
611
                if err != nil {
×
612
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
613
                }
×
614
        }
615

616
        stackset := ssc.StackSet.DeepCopy()
×
617
        status := *ssc.GenerateStackSetStatus()
×
618
        err := retryUpdate(func(retry bool) error {
×
619
                if retry {
×
620
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
621
                        if err != nil {
×
622
                                return err
×
623
                        }
×
624
                        stackset = updated
×
625
                }
626
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
627
                        stackset.Status = status
×
628
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
629
                        return err
×
630
                }
×
631
                return nil
×
632
        })
UNCOV
633
        if err != nil {
×
634
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
635
        }
×
636
        return nil
×
637
}
638

639
// CreateCurrentStack creates a new Stack object for the current stack, if needed
640
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
641
        newStack, newStackVersion := ssc.NewStack()
1✔
642
        if newStack == nil {
2✔
643
                return nil
1✔
644
        }
1✔
645

646
        if c.configMapSupportEnabled {
2✔
647
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
648
                if err := validateConfigurationResourceNames(newStack.Stack); err != nil {
1✔
NEW
649
                        return err
×
NEW
650
                }
×
651
        }
652

653
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
654
        if err != nil {
1✔
655
                return err
×
656
        }
×
657
        fixupStackTypeMeta(created)
1✔
658

1✔
659
        c.recorder.Eventf(
1✔
660
                ssc.StackSet,
1✔
661
                v1.EventTypeNormal,
1✔
662
                "CreatedStack",
1✔
663
                "Created stack %s",
1✔
664
                newStack.Name())
1✔
665

1✔
666
        // Persist ObservedStackVersion in the status
1✔
667
        updated := ssc.StackSet.DeepCopy()
1✔
668
        updated.Status.ObservedStackVersion = newStackVersion
1✔
669

1✔
670
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
671
        if err != nil {
1✔
672
                return err
×
673
        }
×
674
        fixupStackSetTypeMeta(result)
1✔
675
        ssc.StackSet = result
1✔
676

1✔
677
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
678
                Stack:          created,
1✔
679
                PendingRemoval: false,
1✔
680
                Resources:      core.StackResources{},
1✔
681
        }
1✔
682
        return nil
1✔
683
}
684

685
// CleanupOldStacks deletes stacks that are no longer needed.
686
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
687
        for _, sc := range ssc.StackContainers {
2✔
688
                if !sc.PendingRemoval {
2✔
689
                        continue
1✔
690
                }
691

692
                stack := sc.Stack
1✔
693
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
694
                if err != nil {
1✔
695
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
696
                }
×
697
                c.recorder.Eventf(
1✔
698
                        ssc.StackSet,
1✔
699
                        v1.EventTypeNormal,
1✔
700
                        "DeletedExcessStack",
1✔
701
                        "Deleted excess stack %s",
1✔
702
                        stack.Name)
1✔
703
        }
704

705
        return nil
1✔
706
}
707

708
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
709
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
710
        // Ingress removed, handled outside
1✔
711
        if ingress == nil {
2✔
712
                return existing, nil
1✔
713
        }
1✔
714

715
        // Create new Ingress
716
        if existing == nil {
2✔
717
                if ingress.Annotations == nil {
2✔
718
                        ingress.Annotations = make(map[string]string)
1✔
719
                }
1✔
720
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
721

1✔
722
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
723
                if err != nil {
1✔
724
                        return nil, err
×
725
                }
×
726
                c.recorder.Eventf(
1✔
727
                        stackset,
1✔
728
                        v1.EventTypeNormal,
1✔
729
                        "CreatedIngress",
1✔
730
                        "Created Ingress %s",
1✔
731
                        ingress.Name)
1✔
732
                return createdIng, nil
1✔
733
        }
734

735
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
736
        if existingHaveUpdateTimeStamp {
2✔
737
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
738
        }
1✔
739

740
        // Check if we need to update the Ingress
741
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
742
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
743
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
744
                // add the annotation back after comparing
1✔
745
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
746
                return existing, nil
1✔
747
        }
1✔
748

749
        updated := existing.DeepCopy()
1✔
750
        updated.Spec = ingress.Spec
1✔
751
        if ingress.Annotations != nil {
2✔
752
                updated.Annotations = ingress.Annotations
1✔
753
        } else {
2✔
754
                updated.Annotations = make(map[string]string)
1✔
755
        }
1✔
756
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
757

1✔
758
        updated.Labels = ingress.Labels
1✔
759

1✔
760
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
761
        if err != nil {
1✔
762
                return nil, err
×
763
        }
×
764
        c.recorder.Eventf(
1✔
765
                stackset,
1✔
766
                v1.EventTypeNormal,
1✔
767
                "UpdatedIngress",
1✔
768
                "Updated Ingress %s",
1✔
769
                ingress.Name)
1✔
770
        return createdIngress, nil
1✔
771
}
772

773
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
774
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
775
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
776
                if routegroup == nil {
2✔
777
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
778
                        return nil
1✔
779
                }
1✔
780
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
781
                // The only scenario version we could think of for this is
1✔
782
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
783
                //  in that case, just wait until the RouteGroup has the annotation
1✔
784
                if !ok {
2✔
785
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
786
                        return nil
1✔
787
                }
1✔
788

789
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
790
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
791
                        return nil
1✔
792
                } else if !ready {
3✔
793
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
794
                        return nil
1✔
795
                }
1✔
796
        }
797
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
798
        if err != nil {
1✔
799
                return err
×
800
        }
×
801
        c.recorder.Eventf(
1✔
802
                stackset,
1✔
803
                v1.EventTypeNormal,
1✔
804
                "DeletedIngress",
1✔
805
                "Deleted Ingress %s",
1✔
806
                existing.Namespace)
1✔
807
        return nil
1✔
808
}
809

810
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
811
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
812
        // RouteGroup removed, handled outside
1✔
813
        if rg == nil {
2✔
814
                return existing, nil
1✔
815
        }
1✔
816

817
        // Create new RouteGroup
818
        if existing == nil {
2✔
819
                if rg.Annotations == nil {
2✔
820
                        rg.Annotations = make(map[string]string)
1✔
821
                }
1✔
822
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
823

1✔
824
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
825
                if err != nil {
1✔
826
                        return nil, err
×
827
                }
×
828
                c.recorder.Eventf(
1✔
829
                        stackset,
1✔
830
                        v1.EventTypeNormal,
1✔
831
                        "CreatedRouteGroup",
1✔
832
                        "Created RouteGroup %s",
1✔
833
                        rg.Name)
1✔
834
                return createdRg, nil
1✔
835
        }
836

837
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
838
        if existingHaveUpdateTimeStamp {
2✔
839
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
840
        }
1✔
841

842
        // Check if we need to update the RouteGroup
843
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
844
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
845
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
846
                // add the annotation back after comparing
1✔
847
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
848
                return existing, nil
1✔
849
        }
1✔
850

851
        updated := existing.DeepCopy()
1✔
852
        updated.Spec = rg.Spec
1✔
853
        if rg.Annotations != nil {
1✔
854
                updated.Annotations = rg.Annotations
×
855
        } else {
1✔
856
                updated.Annotations = make(map[string]string)
1✔
857
        }
1✔
858
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
859

1✔
860
        updated.Labels = rg.Labels
1✔
861

1✔
862
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
863
        if err != nil {
1✔
864
                return nil, err
×
865
        }
×
866
        c.recorder.Eventf(
1✔
867
                stackset,
1✔
868
                v1.EventTypeNormal,
1✔
869
                "UpdatedRouteGroup",
1✔
870
                "Updated RouteGroup %s",
1✔
871
                rg.Name)
1✔
872
        return createdRg, nil
1✔
873
}
874

875
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
876
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
877
        if stackset.Spec.Ingress != nil {
2✔
878
                if ingress == nil {
2✔
879
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
880
                        return nil
1✔
881
                }
1✔
882
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
883
                // The only scenario version we could think of for this is
1✔
884
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
885
                //  in that case, just wait until the RouteGroup has the annotation
1✔
886
                if !ok {
2✔
887
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
888
                        return nil
1✔
889
                }
1✔
890

891
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
892
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
893
                        return nil
1✔
894
                } else if !ready {
3✔
895
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
896
                        return nil
1✔
897
                }
1✔
898
        }
899
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
900
        if err != nil {
1✔
901
                return err
×
902
        }
×
903
        c.recorder.Eventf(
1✔
904
                stackset,
1✔
905
                v1.EventTypeNormal,
1✔
906
                "DeletedRouteGroup",
1✔
907
                "Deleted RouteGroup %s",
1✔
908
                rg.Namespace)
1✔
909
        return nil
1✔
910
}
911

912
func (c *StackSetController) ReconcileStackSetIngressSources(
913
        ctx context.Context,
914
        stackset *zv1.StackSet,
915
        existingIng *networking.Ingress,
916
        existingRg *rgv1.RouteGroup,
917
        generateIng func() (*networking.Ingress, error),
918
        generateRg func() (*rgv1.RouteGroup, error),
919
) error {
1✔
920
        ingress, err := generateIng()
1✔
921
        if err != nil {
1✔
922
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
923
        }
×
924

925
        // opt-out existingIng creation in case we have an external entity creating existingIng
926
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
927
        if err != nil {
1✔
928
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
929
        }
×
930

931
        rg, err := generateRg()
1✔
932
        if err != nil {
1✔
933
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
934
        }
×
935

936
        var appliedRg *rgv1.RouteGroup
1✔
937
        if c.routeGroupSupportEnabled {
2✔
938
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
939
                if err != nil {
1✔
940
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
941
                }
×
942
        }
943

944
        // Ingress removed
945
        if ingress == nil {
2✔
946
                if existingIng != nil {
2✔
947
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
948
                        if err != nil {
1✔
949
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
950
                        }
×
951
                }
952
        }
953

954
        // RouteGroup removed
955
        if rg == nil {
2✔
956
                if existingRg != nil {
2✔
957
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
958
                        if err != nil {
1✔
959
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
960
                        }
×
961
                }
962
        }
963

964
        return nil
1✔
965
}
966

967
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
968
        err := c.ReconcileStackSetIngressSources(ctx, ssc.StackSet, ssc.Ingress, ssc.RouteGroup, ssc.GenerateIngress, ssc.GenerateRouteGroup)
×
969
        if err != nil {
×
970
                return err
×
971
        }
×
972

973
        trafficChanges := ssc.TrafficChanges()
×
974
        if len(trafficChanges) != 0 {
×
975
                var changeMessages []string
×
976
                for _, change := range trafficChanges {
×
977
                        changeMessages = append(changeMessages, change.String())
×
978
                }
×
979

980
                c.recorder.Eventf(
×
981
                        ssc.StackSet,
×
982
                        v1.EventTypeNormal,
×
983
                        "TrafficSwitched",
×
984
                        "Switched traffic: %s",
×
985
                        strings.Join(changeMessages, ", "))
×
986
        }
987

988
        return nil
×
989
}
990

991
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
992
        updatedTraffic := generateUpdated()
1✔
993

1✔
994
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
995
                return nil
×
996
        }
×
997

998
        updated := existing.DeepCopy()
1✔
999
        updated.Spec.Traffic = updatedTraffic
1✔
1000

1✔
1001
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1002
        if err != nil {
1✔
1003
                return err
×
1004
        }
×
1005
        c.recorder.Eventf(
1✔
1006
                updated,
1✔
1007
                v1.EventTypeNormal,
1✔
1008
                "UpdatedStackSet",
1✔
1009
                "Updated StackSet %s",
1✔
1010
                updated.Name)
1✔
1011
        return nil
1✔
1012
}
1013

1014
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
NEW
1015
        if c.configMapSupportEnabled {
×
NEW
1016
                err := c.ReconcileStackConfigMap(ctx, sc.Stack, sc.Resources.ConfigMaps, sc.UpdateObjectMeta)
×
NEW
1017
                if err != nil {
×
NEW
1018
                        return c.errorEventf(sc.Stack, "FailedManageConfigMap", err)
×
NEW
1019
                }
×
1020
        }
1021

1022
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1023
        if err != nil {
×
1024
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1025
        }
×
1026
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1027
        if err != nil {
×
1028
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1029
        }
×
1030

1031
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1032
        if err != nil {
×
1033
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1034
        }
×
1035

1036
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1037
        if err != nil {
×
1038
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1039
        }
×
1040

1041
        if c.routeGroupSupportEnabled {
×
1042
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1043
                if err != nil {
×
1044
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1045
                }
×
1046
        }
UNCOV
1047
        return nil
×
1048
}
1049

1050
// ReconcileStackSet reconciles all the things from a stackset
1051
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1052
        defer func() {
×
1053
                if r := recover(); r != nil {
×
1054
                        c.metricsReporter.ReportPanic()
×
1055
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1056
                        err = fmt.Errorf("panic: %v", r)
×
1057
                }
×
1058
        }()
1059

1060
        // Create current stack, if needed. Proceed on errors.
1061
        err = c.CreateCurrentStack(ctx, container)
×
1062
        if err != nil {
×
1063
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1064
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1065
        }
×
1066

1067
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1068
        err = container.UpdateFromResources()
×
1069
        if err != nil {
×
1070
                return err
×
1071
        }
×
1072

1073
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1074
        err = container.ManageTraffic(time.Now())
×
1075
        if err != nil {
×
1076
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1077
                c.recorder.Eventf(
×
1078
                        container.StackSet,
×
1079
                        v1.EventTypeWarning,
×
1080
                        "TrafficNotSwitched",
×
1081
                        "Failed to switch traffic: "+err.Error())
×
1082
        }
×
1083

1084
        // Mark stacks that should be removed
1085
        container.MarkExpiredStacks()
×
1086

×
1087
        // Reconcile stack resources. Proceed on errors.
×
1088
        for _, sc := range container.StackContainers {
×
1089
                err = c.ReconcileStackResources(ctx, container, sc)
×
1090
                if err != nil {
×
1091
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1092
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1093
                }
×
1094
        }
1095

1096
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1097
        err = c.ReconcileStackSetResources(ctx, container)
×
1098
        if err != nil {
×
1099
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1100
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1101
        }
×
1102

1103
        // Reconcile desired traffic in the stackset. Proceed on errors.
1104
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1105
        if err != nil {
×
1106
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1107
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1108
        }
×
1109

1110
        // Delete old stacks. Proceed on errors.
1111
        err = c.CleanupOldStacks(ctx, container)
×
1112
        if err != nil {
×
1113
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1114
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1115
        }
×
1116

1117
        // Update statuses.
1118
        err = c.ReconcileStatuses(ctx, container)
×
1119
        if err != nil {
×
1120
                return err
×
1121
        }
×
1122

1123
        return nil
×
1124
}
1125

1126
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1127
// stackset annotation.
1128
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1129
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1130
        if !ok {
2✔
1131
                return 0, false
1✔
1132
        }
1✔
1133
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1134
        if err != nil {
1✔
1135
                return 0, false
×
1136
        }
×
1137
        return resetDelay, true
1✔
1138
}
1139

1140
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1141
        // set TypeMeta manually because of this bug:
1✔
1142
        // https://github.com/kubernetes/client-go/issues/308
1✔
1143
        stackset.APIVersion = core.APIVersion
1✔
1144
        stackset.Kind = core.KindStackSet
1✔
1145
}
1✔
1146

1147
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1148
        // set TypeMeta manually because of this bug:
1✔
1149
        // https://github.com/kubernetes/client-go/issues/308
1✔
1150
        stack.APIVersion = core.APIVersion
1✔
1151
        stack.Kind = core.KindStack
1✔
1152
}
1✔
1153

1154
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1155
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1156
        if err != nil {
2✔
1157
                // wait until there's a valid timestamp on the annotation
1✔
1158
                return false, err
1✔
1159
        }
1✔
1160

1161
        if !resourceLastUpdated.IsZero() && time.Since(resourceLastUpdated) > ttl {
2✔
1162
                return true, nil
1✔
1163
        }
1✔
1164

1165
        return false, nil
1✔
1166
}
1167

1168
// validateConfigurationResourceNames returns an error if any ConfigurationResource name is not prefixed by Stack name.
1169
func validateConfigurationResourceNames(stack *zv1.Stack) error {
1✔
1170
        for _, rsc := range stack.Spec.ConfigurationResources {
2✔
1171
                rscName := rsc.ConfigMapRef.Name
1✔
1172
                if !strings.HasPrefix(rscName, stack.Name) {
1✔
NEW
1173
                        return fmt.Errorf("ConfigurationResource name must be prefixed by Stack name. "+
×
NEW
1174
                                "ConfigurationResource: %s, Stack: %s", rscName, stack.Name)
×
NEW
1175
                }
×
1176
        }
1177

1178
        return nil
1✔
1179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc