• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 6743896547

01 Nov 2023 03:07PM UTC coverage: 73.518% (+0.3%) from 73.218%
6743896547

Pull #520

github

katyanna
Check ownerReference on reconcile tests
Pull Request #520: Reference ConfigMap for per stack versioning

110 of 137 new or added lines in 4 files covered. (80.29%)

2 existing lines in 1 file now uncovered.

2357 of 3206 relevant lines covered (73.52%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.7
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
	// PrescaleStacksAnnotationKey is the StackSet annotation whose mere
	// presence selects the prescaling traffic reconciler.
	PrescaleStacksAnnotationKey = "alpha.stackset-controller.zalando.org/prescale-stacks"

	// ResetHPAMinReplicasDelayAnnotationKey is the StackSet annotation for
	// overriding the default delay before HPA min replicas are reset.
	// NOTE(review): presumably parsed by getResetMinReplicasDelay - confirm.
	ResetHPAMinReplicasDelayAnnotationKey = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"

	// StacksetControllerControllerAnnotationKey is the StackSet annotation
	// naming the controller ID that owns the StackSet.
	StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"

	// ControllerLastUpdatedAnnotationKey is the annotation in which the
	// controller stores the timestamp of its last update to a resource.
	ControllerLastUpdatedAnnotationKey = "stackset-controller.zalando.org/updated-timestamp"

	// reasonFailedManageStackSet is the event reason emitted when
	// reconciling a StackSet fails.
	reasonFailedManageStackSet = "FailedManageStackSet"

	// defaultResetMinReplicasDelay is the reset delay used for prescaling
	// when the StackSet does not provide a usable override.
	defaultResetMinReplicasDelay = 10 * time.Minute
)
46

47
// StackSetController is the main controller. It watches for changes to
48
// stackset resources and starts and maintains other controllers per
49
// stackset resource.
50
type StackSetController struct {
51
        logger                      *log.Entry
52
        client                      clientset.Interface
53
        controllerID                string
54
        backendWeightsAnnotationKey string
55
        clusterDomains              []string
56
        interval                    time.Duration
57
        stacksetEvents              chan stacksetEvent
58
        stacksetStore               map[types.UID]zv1.StackSet
59
        recorder                    kube_record.EventRecorder
60
        metricsReporter             *core.MetricsReporter
61
        HealthReporter              healthcheck.Handler
62
        routeGroupSupportEnabled    bool
63
        ingressSourceSwitchTTL      time.Duration
64
        now                         func() string
65
        reconcileWorkers            int
66
        configMapSupportEnabled     bool
67
        sync.Mutex
68
}
69

70
type stacksetEvent struct {
71
        Deleted  bool
72
        StackSet *zv1.StackSet
73
}
74

75
// eventedError marks an error that has already been surfaced to the user as
// an event, so that it is not reported a second time.
type eventedError struct {
	err error
}

// Error returns the message of the wrapped error unchanged.
func (e *eventedError) Error() string {
	wrapped := e.err
	return wrapped.Error()
}
×
83

84
// now returns the current time formatted as an RFC 3339 timestamp.
func now() string {
	current := time.Now()
	return current.Format(time.RFC3339)
}
×
87

88
// NewStackSetController initializes a new StackSetController.
89
func NewStackSetController(
90
        client clientset.Interface,
91
        controllerID string,
92
        parallelWork int,
93
        backendWeightsAnnotationKey string,
94
        clusterDomains []string,
95
        registry prometheus.Registerer,
96
        interval time.Duration,
97
        routeGroupSupportEnabled bool,
98
        ingressSourceSwitchTTL time.Duration,
99
        configMapSupportEnabled bool,
100
) (*StackSetController, error) {
1✔
101
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
102
        if err != nil {
1✔
103
                return nil, err
×
104
        }
×
105

106
        return &StackSetController{
1✔
107
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
108
                client:                      client,
1✔
109
                controllerID:                controllerID,
1✔
110
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
111
                clusterDomains:              clusterDomains,
1✔
112
                interval:                    interval,
1✔
113
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
114
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
115
                recorder:                    recorder.CreateEventRecorder(client),
1✔
116
                metricsReporter:             metricsReporter,
1✔
117
                HealthReporter:              healthcheck.NewHandler(),
1✔
118
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
119
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
120
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
121
                now:                         now,
1✔
122
                reconcileWorkers:            parallelWork,
1✔
123
        }, nil
1✔
124
}
125

126
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
127
        return c.logger.WithFields(map[string]interface{}{
×
128
                "namespace": ssc.StackSet.Namespace,
×
129
                "stackset":  ssc.StackSet.Name,
×
130
        })
×
131
}
×
132

133
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
134
        return c.logger.WithFields(map[string]interface{}{
×
135
                "namespace": ssc.StackSet.Namespace,
×
136
                "stackset":  ssc.StackSet.Name,
×
137
                "stack":     sc.Name(),
×
138
        })
×
139
}
×
140

141
// Run runs the main loop of the StackSetController. Before the loops it
142
// sets up a watcher to watch StackSet resources. The watch will send
143
// changes over a channel which is polled from the main loop.
144
func (c *StackSetController) Run(ctx context.Context) {
×
145
        var nextCheck time.Time
×
146

×
147
        // We're not alive if nextCheck is too far in the past
×
148
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
149
                if time.Since(nextCheck) > 5*c.interval {
×
150
                        return fmt.Errorf("nextCheck too old")
×
151
                }
×
152
                return nil
×
153
        })
154

155
        c.startWatch(ctx)
×
156

×
157
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
158

×
159
        nextCheck = time.Now().Add(-c.interval)
×
160

×
161
        for {
×
162
                select {
×
163
                case <-time.After(time.Until(nextCheck)):
×
164

×
165
                        nextCheck = time.Now().Add(c.interval)
×
166

×
167
                        stackSetContainers, err := c.collectResources(ctx)
×
168
                        if err != nil {
×
169
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
170
                                continue
×
171
                        }
172

173
                        var reconcileGroup errgroup.Group
×
174
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
175
                        for stackset, container := range stackSetContainers {
×
176
                                container := container
×
177
                                stackset := stackset
×
178

×
179
                                reconcileGroup.Go(func() error {
×
180
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
181
                                                err := c.ReconcileStackSet(ctx, container)
×
182
                                                if err != nil {
×
183
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
184
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
185
                                                }
×
186
                                        }
187
                                        return nil
×
188
                                })
189
                        }
190

191
                        err = reconcileGroup.Wait()
×
192
                        if err != nil {
×
193
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
194
                        }
×
195
                        err = c.metricsReporter.Report(stackSetContainers)
×
196
                        if err != nil {
×
197
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
198
                        }
×
199
                case e := <-c.stacksetEvents:
×
200
                        stackset := *e.StackSet
×
201
                        fixupStackSetTypeMeta(&stackset)
×
202

×
203
                        // update/delete existing entry
×
204
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
205
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
206
                                        delete(c.stacksetStore, stackset.UID)
×
207
                                        continue
×
208
                                }
209

210
                                // update stackset entry
211
                                c.stacksetStore[stackset.UID] = stackset
×
212
                                continue
×
213
                        }
214

215
                        // check if stackset should be managed by the controller
216
                        if !c.hasOwnership(&stackset) {
×
217
                                continue
×
218
                        }
219

220
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
221
                        c.stacksetStore[stackset.UID] = stackset
×
222
                case <-ctx.Done():
×
223
                        c.logger.Info("Terminating main controller loop.")
×
224
                        return
×
225
                }
226
        }
227
}
228

229
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
230
// overload the API requests with unnecessary requests
231
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
232
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
233
        for uid, stackset := range c.stacksetStore {
2✔
234
                stackset := stackset
1✔
235

1✔
236
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
237

1✔
238
                // use prescaling logic if enabled with an annotation
1✔
239
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
240
                        resetDelay := defaultResetMinReplicasDelay
1✔
241
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
242
                                resetDelay = resetDelayValue
1✔
243
                        }
1✔
244
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
245
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
246
                        }
1✔
247
                }
248

249
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
250
                stacksets[uid] = stacksetContainer
1✔
251
        }
252

253
        err := c.collectStacks(ctx, stacksets)
1✔
254
        if err != nil {
1✔
255
                return nil, err
×
256
        }
×
257

258
        err = c.collectIngresses(ctx, stacksets)
1✔
259
        if err != nil {
1✔
260
                return nil, err
×
261
        }
×
262

263
        if c.routeGroupSupportEnabled {
2✔
264
                err = c.collectRouteGroups(ctx, stacksets)
1✔
265
                if err != nil {
1✔
266
                        return nil, err
×
267
                }
×
268
        }
269

270
        err = c.collectDeployments(ctx, stacksets)
1✔
271
        if err != nil {
1✔
272
                return nil, err
×
273
        }
×
274

275
        err = c.collectServices(ctx, stacksets)
1✔
276
        if err != nil {
1✔
277
                return nil, err
×
278
        }
×
279

280
        err = c.collectHPAs(ctx, stacksets)
1✔
281
        if err != nil {
1✔
282
                return nil, err
×
283
        }
×
284

285
        if c.configMapSupportEnabled {
2✔
286
                err = c.collectConfigMaps(ctx, stacksets)
1✔
287
                if err != nil {
1✔
NEW
288
                        return nil, err
×
NEW
289
                }
×
290
        }
291

292
        return stacksets, nil
1✔
293
}
294

295
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
296
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
297
        if err != nil {
1✔
298
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
299
        }
×
300

301
Items:
1✔
302
        for _, i := range ingresses.Items {
2✔
303
                ingress := i
1✔
304
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
305
                        // stackset ingress
1✔
306
                        if s, ok := stacksets[uid]; ok {
2✔
307
                                s.Ingress = &ingress
1✔
308
                                continue
1✔
309
                        }
310

311
                        // stack ingress
312
                        for _, stackset := range stacksets {
2✔
313
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
314
                                        s.Resources.Ingress = &ingress
1✔
315
                                        continue Items
1✔
316
                                }
317
                        }
318
                }
319
        }
320
        return nil
1✔
321
}
322

323
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
324
        routegroups, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
325
        if err != nil {
1✔
326
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
327
        }
×
328

329
Items:
1✔
330
        for _, rg := range routegroups.Items {
2✔
331
                routegroup := rg
1✔
332
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
333
                        // stackset routegroups
1✔
334
                        if s, ok := stacksets[uid]; ok {
2✔
335
                                s.RouteGroup = &routegroup
1✔
336
                                continue
1✔
337
                        }
338

339
                        // stack routegroups
340
                        for _, stackset := range stacksets {
2✔
341
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
342
                                        s.Resources.RouteGroup = &routegroup
1✔
343
                                        continue Items
1✔
344
                                }
345
                        }
346
                }
347
        }
348
        return nil
1✔
349
}
350

351
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
352
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
353
        if err != nil {
1✔
354
                return fmt.Errorf("failed to list Stacks: %v", err)
×
355
        }
×
356

357
        for _, stack := range stacks.Items {
2✔
358
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
359
                        if s, ok := stacksets[uid]; ok {
2✔
360
                                stack := stack
1✔
361
                                fixupStackTypeMeta(&stack)
1✔
362

1✔
363
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
364
                                        Stack: &stack,
1✔
365
                                }
1✔
366
                                continue
1✔
367
                        }
368
                }
369
        }
370
        return nil
1✔
371
}
372

373
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
374
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
375
        if err != nil {
1✔
376
                return fmt.Errorf("failed to list Deployments: %v", err)
×
377
        }
×
378

379
        for _, d := range deployments.Items {
2✔
380
                deployment := d
1✔
381
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
382
                        for _, stackset := range stacksets {
2✔
383
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
384
                                        s.Resources.Deployment = &deployment
1✔
385
                                        break
1✔
386
                                }
387
                        }
388
                }
389
        }
390
        return nil
1✔
391
}
392

393
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
394
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
395
        if err != nil {
1✔
396
                return fmt.Errorf("failed to list Services: %v", err)
×
397
        }
×
398

399
Items:
1✔
400
        for _, s := range services.Items {
2✔
401
                service := s
1✔
402
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
403
                        for _, stackset := range stacksets {
2✔
404
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
405
                                        s.Resources.Service = &service
1✔
406
                                        continue Items
1✔
407
                                }
408

409
                                // service/HPA used to be owned by the deployment for some reason
410
                                for _, stack := range stackset.StackContainers {
2✔
411
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
412
                                                stack.Resources.Service = &service
1✔
413
                                                continue Items
1✔
414
                                        }
415
                                }
416
                        }
417
                }
418
        }
419
        return nil
1✔
420
}
421

422
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
423
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
424
        if err != nil {
1✔
425
                return fmt.Errorf("failed to list HPAs: %v", err)
×
426
        }
×
427

428
Items:
1✔
429
        for _, h := range hpas.Items {
2✔
430
                hpa := h
1✔
431
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
432
                        for _, stackset := range stacksets {
2✔
433
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
434
                                        s.Resources.HPA = &hpa
1✔
435
                                        continue Items
1✔
436
                                }
437

438
                                // service/HPA used to be owned by the deployment for some reason
439
                                for _, stack := range stackset.StackContainers {
2✔
440
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
441
                                                stack.Resources.HPA = &hpa
1✔
442
                                                continue Items
1✔
443
                                        }
444
                                }
445
                        }
446
                }
447
        }
448
        return nil
1✔
449
}
450

451
func (c *StackSetController) collectConfigMaps(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
452
        configMaps, err := c.client.CoreV1().ConfigMaps(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
453
        if err != nil {
1✔
NEW
454
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
NEW
455
        }
×
456

457
        for _, cm := range configMaps.Items {
2✔
458
                configMap := cm
1✔
459
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
460
                        for _, stackset := range stacksets {
2✔
461
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
462
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
463
                                        break
1✔
464
                                }
465
                        }
466
                }
467
        }
468
        return nil
1✔
469
}
470

471
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
472
        if len(objectMeta.OwnerReferences) == 1 {
2✔
473
                return objectMeta.OwnerReferences[0].UID, true
1✔
474
        }
1✔
475
        return "", false
1✔
476
}
477

478
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
479
        switch err.(type) {
×
480
        case *eventedError:
×
481
                // already notified
×
482
                return err
×
483
        default:
×
484
                c.recorder.Eventf(
×
485
                        object,
×
486
                        v1.EventTypeWarning,
×
487
                        reason,
×
488
                        err.Error())
×
489
                return &eventedError{err: err}
×
490
        }
491
}
492

493
// hasOwnership returns true if the controller is the "owner" of the stackset.
494
// Whether it's owner is determined by the value of the
495
// 'stackset-controller.zalando.org/controller' annotation. If the value
496
// matches the controllerID then it owns it, or if the controllerID is
497
// "" and there's no annotation set.
498
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
499
        if stackset.Annotations != nil {
×
500
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
501
                        return owner == c.controllerID
×
502
                }
×
503
        }
504
        return c.controllerID == ""
×
505
}
506

507
func (c *StackSetController) startWatch(ctx context.Context) {
×
508
        informer := cache.NewSharedIndexInformer(
×
509
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
510
                &zv1.StackSet{},
×
511
                0, // skip resync
×
512
                cache.Indexers{},
×
513
        )
×
514

×
515
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
516
                AddFunc:    c.add,
×
517
                UpdateFunc: c.update,
×
518
                DeleteFunc: c.del,
×
519
        })
×
520
        go informer.Run(ctx.Done())
×
521
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
522
                c.logger.Errorf("Timed out waiting for caches to sync")
×
523
                return
×
524
        }
×
525
        c.logger.Info("Synced StackSet watcher")
×
526
}
527

528
func (c *StackSetController) add(obj interface{}) {
×
529
        stackset, ok := obj.(*zv1.StackSet)
×
530
        if !ok {
×
531
                return
×
532
        }
×
533

534
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
535
        c.stacksetEvents <- stacksetEvent{
×
536
                StackSet: stackset.DeepCopy(),
×
537
        }
×
538
}
539

540
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
541
        newStackset, ok := newObj.(*zv1.StackSet)
×
542
        if !ok {
×
543
                return
×
544
        }
×
545

546
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
547
        if !ok {
×
548
                return
×
549
        }
×
550

551
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
552
                newStackset.Namespace,
×
553
                newStackset.Name,
×
554
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
555
        )
×
556

×
557
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
558
        c.stacksetEvents <- stacksetEvent{
×
559
                StackSet: newStackset.DeepCopy(),
×
560
        }
×
561
}
562

563
func (c *StackSetController) del(obj interface{}) {
×
564
        stackset, ok := obj.(*zv1.StackSet)
×
565
        if !ok {
×
566
                return
×
567
        }
×
568

569
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
570
        c.stacksetEvents <- stacksetEvent{
×
571
                StackSet: stackset.DeepCopy(),
×
572
                Deleted:  true,
×
573
        }
×
574
}
575

576
func retryUpdate(updateFn func(retry bool) error) error {
×
577
        retry := false
×
578
        for {
×
579
                err := updateFn(retry)
×
580
                if err != nil {
×
581
                        if errors.IsConflict(err) {
×
582
                                retry = true
×
583
                                continue
×
584
                        }
585
                        return err
×
586
                }
587
                return nil
×
588
        }
589
}
590

591
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
592
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
593
        for _, sc := range ssc.StackContainers {
×
594
                stack := sc.Stack.DeepCopy()
×
595
                status := *sc.GenerateStackStatus()
×
596
                err := retryUpdate(func(retry bool) error {
×
597
                        if retry {
×
598
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
599
                                if err != nil {
×
600
                                        return err
×
601
                                }
×
602
                                stack = updated
×
603
                        }
604
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
605
                                stack.Status = status
×
606
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
607
                                return err
×
608
                        }
×
609
                        return nil
×
610
                })
611
                if err != nil {
×
612
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
613
                }
×
614
        }
615

616
        stackset := ssc.StackSet.DeepCopy()
×
617
        status := *ssc.GenerateStackSetStatus()
×
618
        err := retryUpdate(func(retry bool) error {
×
619
                if retry {
×
620
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
621
                        if err != nil {
×
622
                                return err
×
623
                        }
×
624
                        stackset = updated
×
625
                }
626
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
627
                        stackset.Status = status
×
628
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
629
                        return err
×
630
                }
×
631
                return nil
×
632
        })
UNCOV
633
        if err != nil {
×
634
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
635
        }
×
636
        return nil
×
637
}
638

639
// CreateCurrentStack creates a new Stack object for the current stack, if needed
640
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
641
        newStack, newStackVersion := ssc.NewStack()
1✔
642
        if newStack == nil {
2✔
643
                return nil
1✔
644
        }
1✔
645

646
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
647
        if err != nil {
1✔
648
                return err
×
649
        }
×
650
        fixupStackTypeMeta(created)
1✔
651

1✔
652
        c.recorder.Eventf(
1✔
653
                ssc.StackSet,
1✔
654
                v1.EventTypeNormal,
1✔
655
                "CreatedStack",
1✔
656
                "Created stack %s",
1✔
657
                newStack.Name())
1✔
658

1✔
659
        // Persist ObservedStackVersion in the status
1✔
660
        updated := ssc.StackSet.DeepCopy()
1✔
661
        updated.Status.ObservedStackVersion = newStackVersion
1✔
662

1✔
663
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
664
        if err != nil {
1✔
665
                return err
×
666
        }
×
667
        fixupStackSetTypeMeta(result)
1✔
668
        ssc.StackSet = result
1✔
669

1✔
670
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
671
                Stack:          created,
1✔
672
                PendingRemoval: false,
1✔
673
                Resources:      core.StackResources{},
1✔
674
        }
1✔
675
        return nil
1✔
676
}
677

678
// CleanupOldStacks deletes stacks that are no longer needed.
679
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
680
        for _, sc := range ssc.StackContainers {
2✔
681
                if !sc.PendingRemoval {
2✔
682
                        continue
1✔
683
                }
684

685
                stack := sc.Stack
1✔
686
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
687
                if err != nil {
1✔
688
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
689
                }
×
690
                c.recorder.Eventf(
1✔
691
                        ssc.StackSet,
1✔
692
                        v1.EventTypeNormal,
1✔
693
                        "DeletedExcessStack",
1✔
694
                        "Deleted excess stack %s",
1✔
695
                        stack.Name)
1✔
696
        }
697

698
        return nil
1✔
699
}
700

701
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
702
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
703
        // Ingress removed, handled outside
1✔
704
        if ingress == nil {
2✔
705
                return existing, nil
1✔
706
        }
1✔
707

708
        // Create new Ingress
709
        if existing == nil {
2✔
710
                if ingress.Annotations == nil {
2✔
711
                        ingress.Annotations = make(map[string]string)
1✔
712
                }
1✔
713
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
714

1✔
715
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
716
                if err != nil {
1✔
717
                        return nil, err
×
718
                }
×
719
                c.recorder.Eventf(
1✔
720
                        stackset,
1✔
721
                        v1.EventTypeNormal,
1✔
722
                        "CreatedIngress",
1✔
723
                        "Created Ingress %s",
1✔
724
                        ingress.Name)
1✔
725
                return createdIng, nil
1✔
726
        }
727

728
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
729
        if existingHaveUpdateTimeStamp {
2✔
730
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
731
        }
1✔
732

733
        // Check if we need to update the Ingress
734
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
735
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
736
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
737
                // add the annotation back after comparing
1✔
738
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
739
                return existing, nil
1✔
740
        }
1✔
741

742
        updated := existing.DeepCopy()
1✔
743
        updated.Spec = ingress.Spec
1✔
744
        if ingress.Annotations != nil {
2✔
745
                updated.Annotations = ingress.Annotations
1✔
746
        } else {
2✔
747
                updated.Annotations = make(map[string]string)
1✔
748
        }
1✔
749
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
750

1✔
751
        updated.Labels = ingress.Labels
1✔
752

1✔
753
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
754
        if err != nil {
1✔
755
                return nil, err
×
756
        }
×
757
        c.recorder.Eventf(
1✔
758
                stackset,
1✔
759
                v1.EventTypeNormal,
1✔
760
                "UpdatedIngress",
1✔
761
                "Updated Ingress %s",
1✔
762
                ingress.Name)
1✔
763
        return createdIngress, nil
1✔
764
}
765

766
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
767
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
768
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
769
                if routegroup == nil {
2✔
770
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
771
                        return nil
1✔
772
                }
1✔
773
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
774
                // The only scenario version we could think of for this is
1✔
775
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
776
                //  in that case, just wait until the RouteGroup has the annotation
1✔
777
                if !ok {
2✔
778
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
779
                        return nil
1✔
780
                }
1✔
781

782
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
783
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
784
                        return nil
1✔
785
                } else if !ready {
3✔
786
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
787
                        return nil
1✔
788
                }
1✔
789
        }
790
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
791
        if err != nil {
1✔
792
                return err
×
793
        }
×
794
        c.recorder.Eventf(
1✔
795
                stackset,
1✔
796
                v1.EventTypeNormal,
1✔
797
                "DeletedIngress",
1✔
798
                "Deleted Ingress %s",
1✔
799
                existing.Namespace)
1✔
800
        return nil
1✔
801
}
802

803
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
804
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
805
        // RouteGroup removed, handled outside
1✔
806
        if rg == nil {
2✔
807
                return existing, nil
1✔
808
        }
1✔
809

810
        // Create new RouteGroup
811
        if existing == nil {
2✔
812
                if rg.Annotations == nil {
2✔
813
                        rg.Annotations = make(map[string]string)
1✔
814
                }
1✔
815
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
816

1✔
817
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
818
                if err != nil {
1✔
819
                        return nil, err
×
820
                }
×
821
                c.recorder.Eventf(
1✔
822
                        stackset,
1✔
823
                        v1.EventTypeNormal,
1✔
824
                        "CreatedRouteGroup",
1✔
825
                        "Created RouteGroup %s",
1✔
826
                        rg.Name)
1✔
827
                return createdRg, nil
1✔
828
        }
829

830
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
831
        if existingHaveUpdateTimeStamp {
2✔
832
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
833
        }
1✔
834

835
        // Check if we need to update the RouteGroup
836
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
837
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
838
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
839
                // add the annotation back after comparing
1✔
840
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
841
                return existing, nil
1✔
842
        }
1✔
843

844
        updated := existing.DeepCopy()
1✔
845
        updated.Spec = rg.Spec
1✔
846
        if rg.Annotations != nil {
1✔
847
                updated.Annotations = rg.Annotations
×
848
        } else {
1✔
849
                updated.Annotations = make(map[string]string)
1✔
850
        }
1✔
851
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
852

1✔
853
        updated.Labels = rg.Labels
1✔
854

1✔
855
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
856
        if err != nil {
1✔
857
                return nil, err
×
858
        }
×
859
        c.recorder.Eventf(
1✔
860
                stackset,
1✔
861
                v1.EventTypeNormal,
1✔
862
                "UpdatedRouteGroup",
1✔
863
                "Updated RouteGroup %s",
1✔
864
                rg.Name)
1✔
865
        return createdRg, nil
1✔
866
}
867

868
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
869
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
870
        if stackset.Spec.Ingress != nil {
2✔
871
                if ingress == nil {
2✔
872
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
873
                        return nil
1✔
874
                }
1✔
875
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
876
                // The only scenario version we could think of for this is
1✔
877
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
878
                //  in that case, just wait until the RouteGroup has the annotation
1✔
879
                if !ok {
2✔
880
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
881
                        return nil
1✔
882
                }
1✔
883

884
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
885
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
886
                        return nil
1✔
887
                } else if !ready {
3✔
888
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
889
                        return nil
1✔
890
                }
1✔
891
        }
892
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
893
        if err != nil {
1✔
894
                return err
×
895
        }
×
896
        c.recorder.Eventf(
1✔
897
                stackset,
1✔
898
                v1.EventTypeNormal,
1✔
899
                "DeletedRouteGroup",
1✔
900
                "Deleted RouteGroup %s",
1✔
901
                rg.Namespace)
1✔
902
        return nil
1✔
903
}
904

905
func (c *StackSetController) ReconcileStackSetIngressSources(
906
        ctx context.Context,
907
        stackset *zv1.StackSet,
908
        existingIng *networking.Ingress,
909
        existingRg *rgv1.RouteGroup,
910
        generateIng func() (*networking.Ingress, error),
911
        generateRg func() (*rgv1.RouteGroup, error),
912
) error {
1✔
913
        ingress, err := generateIng()
1✔
914
        if err != nil {
1✔
915
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
916
        }
×
917

918
        // opt-out existingIng creation in case we have an external entity creating existingIng
919
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
920
        if err != nil {
1✔
921
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
922
        }
×
923

924
        rg, err := generateRg()
1✔
925
        if err != nil {
1✔
926
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
927
        }
×
928

929
        var appliedRg *rgv1.RouteGroup
1✔
930
        if c.routeGroupSupportEnabled {
2✔
931
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
932
                if err != nil {
1✔
933
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
934
                }
×
935
        }
936

937
        // Ingress removed
938
        if ingress == nil {
2✔
939
                if existingIng != nil {
2✔
940
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
941
                        if err != nil {
1✔
942
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
943
                        }
×
944
                }
945
        }
946

947
        // RouteGroup removed
948
        if rg == nil {
2✔
949
                if existingRg != nil {
2✔
950
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
951
                        if err != nil {
1✔
952
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
953
                        }
×
954
                }
955
        }
956

957
        return nil
1✔
958
}
959

960
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
961
        err := c.ReconcileStackSetIngressSources(ctx, ssc.StackSet, ssc.Ingress, ssc.RouteGroup, ssc.GenerateIngress, ssc.GenerateRouteGroup)
×
962
        if err != nil {
×
963
                return err
×
964
        }
×
965

966
        trafficChanges := ssc.TrafficChanges()
×
967
        if len(trafficChanges) != 0 {
×
968
                var changeMessages []string
×
969
                for _, change := range trafficChanges {
×
970
                        changeMessages = append(changeMessages, change.String())
×
971
                }
×
972

973
                c.recorder.Eventf(
×
974
                        ssc.StackSet,
×
975
                        v1.EventTypeNormal,
×
976
                        "TrafficSwitched",
×
977
                        "Switched traffic: %s",
×
978
                        strings.Join(changeMessages, ", "))
×
979
        }
980

981
        return nil
×
982
}
983

984
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
985
        updatedTraffic := generateUpdated()
1✔
986

1✔
987
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
988
                return nil
×
989
        }
×
990

991
        updated := existing.DeepCopy()
1✔
992
        updated.Spec.Traffic = updatedTraffic
1✔
993

1✔
994
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
995
        if err != nil {
1✔
996
                return err
×
997
        }
×
998
        c.recorder.Eventf(
1✔
999
                updated,
1✔
1000
                v1.EventTypeNormal,
1✔
1001
                "UpdatedStackSet",
1✔
1002
                "Updated StackSet %s",
1✔
1003
                updated.Name)
1✔
1004
        return nil
1✔
1005
}
1006

1007
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
NEW
1008
        if c.configMapSupportEnabled {
×
NEW
1009
                err := c.ReconcileStackConfigMap(ctx, sc.Stack, sc.Resources.ConfigMaps, sc.GenerateConfigMap)
×
NEW
1010
                if err != nil {
×
NEW
1011
                        return c.errorEventf(sc.Stack, "FailedManageConfigMap", err)
×
NEW
1012
                }
×
1013
        }
1014

1015
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1016
        if err != nil {
×
1017
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1018
        }
×
1019
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
1020
        if err != nil {
×
1021
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1022
        }
×
1023

1024
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1025
        if err != nil {
×
1026
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1027
        }
×
1028

1029
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1030
        if err != nil {
×
1031
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1032
        }
×
1033

1034
        if c.routeGroupSupportEnabled {
×
1035
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1036
                if err != nil {
×
1037
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1038
                }
×
1039
        }
UNCOV
1040
        return nil
×
1041
}
1042

1043
// ReconcileStackSet reconciles all the things from a stackset
1044
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1045
        defer func() {
×
1046
                if r := recover(); r != nil {
×
1047
                        c.metricsReporter.ReportPanic()
×
1048
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1049
                        err = fmt.Errorf("panic: %v", r)
×
1050
                }
×
1051
        }()
1052

1053
        // Create current stack, if needed. Proceed on errors.
1054
        err = c.CreateCurrentStack(ctx, container)
×
1055
        if err != nil {
×
1056
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1057
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1058
        }
×
1059

1060
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1061
        err = container.UpdateFromResources()
×
1062
        if err != nil {
×
1063
                return err
×
1064
        }
×
1065

1066
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1067
        err = container.ManageTraffic(time.Now())
×
1068
        if err != nil {
×
1069
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1070
                c.recorder.Eventf(
×
1071
                        container.StackSet,
×
1072
                        v1.EventTypeWarning,
×
1073
                        "TrafficNotSwitched",
×
1074
                        "Failed to switch traffic: "+err.Error())
×
1075
        }
×
1076

1077
        // Mark stacks that should be removed
1078
        container.MarkExpiredStacks()
×
1079

×
1080
        // Reconcile stack resources. Proceed on errors.
×
1081
        for _, sc := range container.StackContainers {
×
1082
                err = c.ReconcileStackResources(ctx, container, sc)
×
1083
                if err != nil {
×
1084
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1085
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1086
                }
×
1087
        }
1088

1089
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1090
        err = c.ReconcileStackSetResources(ctx, container)
×
1091
        if err != nil {
×
1092
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1093
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1094
        }
×
1095

1096
        // Reconcile desired traffic in the stackset. Proceed on errors.
1097
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1098
        if err != nil {
×
1099
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1100
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1101
        }
×
1102

1103
        // Delete old stacks. Proceed on errors.
1104
        err = c.CleanupOldStacks(ctx, container)
×
1105
        if err != nil {
×
1106
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1107
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1108
        }
×
1109

1110
        // Update statuses.
1111
        err = c.ReconcileStatuses(ctx, container)
×
1112
        if err != nil {
×
1113
                return err
×
1114
        }
×
1115

1116
        return nil
×
1117
}
1118

1119
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1120
// stackset annotation.
1121
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1122
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1123
        if !ok {
2✔
1124
                return 0, false
1✔
1125
        }
1✔
1126
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1127
        if err != nil {
1✔
1128
                return 0, false
×
1129
        }
×
1130
        return resetDelay, true
1✔
1131
}
1132

1133
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1134
        // set TypeMeta manually because of this bug:
1✔
1135
        // https://github.com/kubernetes/client-go/issues/308
1✔
1136
        stackset.APIVersion = core.APIVersion
1✔
1137
        stackset.Kind = core.KindStackSet
1✔
1138
}
1✔
1139

1140
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1141
        // set TypeMeta manually because of this bug:
1✔
1142
        // https://github.com/kubernetes/client-go/issues/308
1✔
1143
        stack.APIVersion = core.APIVersion
1✔
1144
        stack.Kind = core.KindStack
1✔
1145
}
1✔
1146

1147
// resourceReady reports whether the RFC 3339 timestamp lies more than ttl in
// the past. An unparsable timestamp yields (false, err); a zero time or a
// timestamp that is not yet older than ttl yields (false, nil).
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
	lastUpdated, err := time.Parse(time.RFC3339, timestamp)
	switch {
	case err != nil:
		// Not ready until the annotation holds a valid timestamp.
		return false, err
	case lastUpdated.IsZero():
		return false, nil
	}

	return time.Since(lastUpdated) > ttl, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc