• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 6052535528

01 Sep 2023 05:56PM UTC coverage: 71.558% (-1.5%) from 73.025%
6052535528

Pull #529

github

linki
add basic resource templates reconciliation code with stub implementation
Pull Request #529: Add basic resource templates reconciliation code with stub implementation

64 of 64 new or added lines in 5 files covered. (100.0%)

2209 of 3087 relevant lines covered (71.56%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.15
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
        // PrescaleStacksAnnotationKey is the StackSet annotation whose mere
        // presence makes collectResources pick the prescaling traffic reconciler.
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
        // ResetHPAMinReplicasDelayAnnotationKey presumably overrides
        // defaultResetMinReplicasDelay via getResetMinReplicasDelay — confirm
        // against that helper, which is defined elsewhere.
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
        // StacksetControllerControllerAnnotationKey names the controller that
        // owns a StackSet; hasOwnership compares its value to the controller ID.
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
        // ControllerLastUpdatedAnnotationKey holds the timestamp the controller
        // writes on the Ingresses it creates.
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"

        // reasonFailedManageStackSet is the event reason used when reconciling
        // a StackSet fails in the main loop.
        reasonFailedManageStackSet = "FailedManageStackSet"

        // defaultResetMinReplicasDelay is the prescaling reset delay used when
        // the StackSet does not supply its own value.
        defaultResetMinReplicasDelay = 10 * time.Minute
)
46

47
// StackSetController is the main controller. It watches for changes to
// stackset resources and starts and maintains other controllers per
// stackset resource.
type StackSetController struct {
        // logger is the base logger; the constructor tags it with controller=stackset.
        logger                      *log.Entry
        // client is the clientset used for all Kubernetes API calls.
        client                      clientset.Interface
        // controllerID is compared against the controller annotation in hasOwnership.
        controllerID                string
        // backendWeightsAnnotationKey and clusterDomains are handed to
        // core.NewContainer for every StackSet.
        backendWeightsAnnotationKey string
        clusterDomains              []string
        // interval is the time between reconciliation passes; the liveness
        // check fails once the schedule is more than 5*interval overdue.
        interval                    time.Duration
        // stacksetEvents carries informer notifications (add/update/del) to Run.
        stacksetEvents              chan stacksetEvent
        // stacksetStore holds the StackSets owned by this controller, keyed by UID.
        stacksetStore               map[types.UID]zv1.StackSet
        // recorder publishes Kubernetes events.
        recorder                    kube_record.EventRecorder
        // metricsReporter is invoked after every reconciliation pass.
        metricsReporter             *core.MetricsReporter
        // HealthReporter backs the /healthz endpoint registered in Run.
        HealthReporter              healthcheck.Handler
        // routeGroupSupportEnabled gates the collection of RouteGroups.
        routeGroupSupportEnabled    bool
        // NOTE(review): resourceTemplatesEnabled and ingressSourceSwitchTTL are
        // only stored in the visible part of the file; their consumers are elsewhere.
        resourceTemplatesEnabled    bool
        ingressSourceSwitchTTL      time.Duration
        // now returns the current timestamp as a string; the constructor sets
        // it to the package-level now (RFC 3339).
        now                         func() string
        // reconcileWorkers caps how many StackSets are reconciled concurrently.
        reconcileWorkers            int
        // NOTE(review): no use of the embedded mutex is visible in this part of the file.
        sync.Mutex
}
69

70
// stacksetEvent is the message the informer handlers send to the main loop
// whenever a StackSet is added, updated or deleted.
type stacksetEvent struct {
        // Deleted is set only by the delete handler.
        Deleted  bool
        // StackSet is a deep copy of the object the informer reported.
        StackSet *zv1.StackSet
}
74

75
// eventedError wraps an error that was already exposed as an event to the user
type eventedError struct {
        err error
}

// Error returns the message of the wrapped error unchanged.
func (ee *eventedError) Error() string {
        return ee.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can look
// through an eventedError at the original cause.
func (ee *eventedError) Unwrap() error {
        return ee.err
}
×
83

84
// now returns the current time rendered as an RFC 3339 timestamp.
func now() string {
        current := time.Now()
        return current.Format(time.RFC3339)
}
×
87

88
// NewStackSetController initializes a new StackSetController.
89
func NewStackSetController(client clientset.Interface, controllerID string, parallelWork int, backendWeightsAnnotationKey string, clusterDomains []string, registry prometheus.Registerer, interval time.Duration, routeGroupSupportEnabled bool, resourceTemplatesEnabled bool, ingressSourceSwitchTTL time.Duration) (*StackSetController, error) {
1✔
90
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
91
        if err != nil {
1✔
92
                return nil, err
×
93
        }
×
94

95
        return &StackSetController{
1✔
96
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
97
                client:                      client,
1✔
98
                controllerID:                controllerID,
1✔
99
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
100
                clusterDomains:              clusterDomains,
1✔
101
                interval:                    interval,
1✔
102
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
103
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
104
                recorder:                    recorder.CreateEventRecorder(client),
1✔
105
                metricsReporter:             metricsReporter,
1✔
106
                HealthReporter:              healthcheck.NewHandler(),
1✔
107
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
108
                resourceTemplatesEnabled:    resourceTemplatesEnabled,
1✔
109
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
110
                now:                         now,
1✔
111
                reconcileWorkers:            parallelWork,
1✔
112
        }, nil
1✔
113
}
114

115
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
116
        return c.logger.WithFields(map[string]interface{}{
×
117
                "namespace": ssc.StackSet.Namespace,
×
118
                "stackset":  ssc.StackSet.Name,
×
119
        })
×
120
}
×
121

122
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
123
        return c.logger.WithFields(map[string]interface{}{
×
124
                "namespace": ssc.StackSet.Namespace,
×
125
                "stackset":  ssc.StackSet.Name,
×
126
                "stack":     sc.Name(),
×
127
        })
×
128
}
×
129

130
// Run runs the main loop of the StackSetController. Before the loops it
131
// sets up a watcher to watch StackSet resources. The watch will send
132
// changes over a channel which is polled from the main loop.
133
func (c *StackSetController) Run(ctx context.Context) {
×
134
        var nextCheck time.Time
×
135

×
136
        // We're not alive if nextCheck is too far in the past
×
137
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
138
                if time.Since(nextCheck) > 5*c.interval {
×
139
                        return fmt.Errorf("nextCheck too old")
×
140
                }
×
141
                return nil
×
142
        })
143

144
        c.startWatch(ctx)
×
145

×
146
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
147

×
148
        nextCheck = time.Now().Add(-c.interval)
×
149

×
150
        for {
×
151
                select {
×
152
                case <-time.After(time.Until(nextCheck)):
×
153

×
154
                        nextCheck = time.Now().Add(c.interval)
×
155

×
156
                        stackSetContainers, err := c.collectResources(ctx)
×
157
                        if err != nil {
×
158
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
159
                                continue
×
160
                        }
161

162
                        var reconcileGroup errgroup.Group
×
163
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
164
                        for stackset, container := range stackSetContainers {
×
165
                                container := container
×
166
                                stackset := stackset
×
167

×
168
                                reconcileGroup.Go(func() error {
×
169
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
170
                                                err := c.ReconcileStackSet(ctx, container)
×
171
                                                if err != nil {
×
172
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
173
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
174
                                                }
×
175
                                        }
176
                                        return nil
×
177
                                })
178
                        }
179

180
                        err = reconcileGroup.Wait()
×
181
                        if err != nil {
×
182
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
183
                        }
×
184
                        err = c.metricsReporter.Report(stackSetContainers)
×
185
                        if err != nil {
×
186
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
187
                        }
×
188
                case e := <-c.stacksetEvents:
×
189
                        stackset := *e.StackSet
×
190
                        fixupStackSetTypeMeta(&stackset)
×
191

×
192
                        // update/delete existing entry
×
193
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
194
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
195
                                        delete(c.stacksetStore, stackset.UID)
×
196
                                        continue
×
197
                                }
198

199
                                // update stackset entry
200
                                c.stacksetStore[stackset.UID] = stackset
×
201
                                continue
×
202
                        }
203

204
                        // check if stackset should be managed by the controller
205
                        if !c.hasOwnership(&stackset) {
×
206
                                continue
×
207
                        }
208

209
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
210
                        c.stacksetStore[stackset.UID] = stackset
×
211
                case <-ctx.Done():
×
212
                        c.logger.Info("Terminating main controller loop.")
×
213
                        return
×
214
                }
215
        }
216
}
217

218
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
219
// overload the API requests with unnecessary requests
220
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
221
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
222
        for uid, stackset := range c.stacksetStore {
2✔
223
                stackset := stackset
1✔
224

1✔
225
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
226

1✔
227
                // use prescaling logic if enabled with an annotation
1✔
228
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
229
                        resetDelay := defaultResetMinReplicasDelay
1✔
230
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
231
                                resetDelay = resetDelayValue
1✔
232
                        }
1✔
233
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
234
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
235
                        }
1✔
236
                }
237

238
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
239
                stacksets[uid] = stacksetContainer
1✔
240
        }
241

242
        err := c.collectStacks(ctx, stacksets)
1✔
243
        if err != nil {
1✔
244
                return nil, err
×
245
        }
×
246

247
        err = c.collectIngresses(ctx, stacksets)
1✔
248
        if err != nil {
1✔
249
                return nil, err
×
250
        }
×
251

252
        if c.routeGroupSupportEnabled {
2✔
253
                err = c.collectRouteGroups(ctx, stacksets)
1✔
254
                if err != nil {
1✔
255
                        return nil, err
×
256
                }
×
257
        }
258

259
        err = c.collectDeployments(ctx, stacksets)
1✔
260
        if err != nil {
1✔
261
                return nil, err
×
262
        }
×
263

264
        err = c.collectServices(ctx, stacksets)
1✔
265
        if err != nil {
1✔
266
                return nil, err
×
267
        }
×
268

269
        err = c.collectHPAs(ctx, stacksets)
1✔
270
        if err != nil {
1✔
271
                return nil, err
×
272
        }
×
273

274
        return stacksets, nil
1✔
275
}
276

277
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
278
        ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
279
        if err != nil {
1✔
280
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
281
        }
×
282

283
Items:
1✔
284
        for _, i := range ingresses.Items {
2✔
285
                ingress := i
1✔
286
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
287
                        // stackset ingress
1✔
288
                        if s, ok := stacksets[uid]; ok {
2✔
289
                                s.Ingress = &ingress
1✔
290
                                continue
1✔
291
                        }
292

293
                        // stack ingress
294
                        for _, stackset := range stacksets {
2✔
295
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
296
                                        s.Resources.Ingress = &ingress
1✔
297
                                        continue Items
1✔
298
                                }
299
                        }
300
                }
301
        }
302
        return nil
1✔
303
}
304

305
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
306
        routegroups, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
307
        if err != nil {
1✔
308
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
309
        }
×
310

311
Items:
1✔
312
        for _, rg := range routegroups.Items {
2✔
313
                routegroup := rg
1✔
314
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
315
                        // stackset routegroups
1✔
316
                        if s, ok := stacksets[uid]; ok {
2✔
317
                                s.RouteGroup = &routegroup
1✔
318
                                continue
1✔
319
                        }
320

321
                        // stack routegroups
322
                        for _, stackset := range stacksets {
2✔
323
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
324
                                        s.Resources.RouteGroup = &routegroup
1✔
325
                                        continue Items
1✔
326
                                }
327
                        }
328
                }
329
        }
330
        return nil
1✔
331
}
332

333
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
334
        stacks, err := c.client.ZalandoV1().Stacks(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
335
        if err != nil {
1✔
336
                return fmt.Errorf("failed to list Stacks: %v", err)
×
337
        }
×
338

339
        for _, stack := range stacks.Items {
2✔
340
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
341
                        if s, ok := stacksets[uid]; ok {
2✔
342
                                stack := stack
1✔
343
                                fixupStackTypeMeta(&stack)
1✔
344

1✔
345
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
346
                                        Stack: &stack,
1✔
347
                                }
1✔
348
                                continue
1✔
349
                        }
350
                }
351
        }
352
        return nil
1✔
353
}
354

355
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
356
        deployments, err := c.client.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
357
        if err != nil {
1✔
358
                return fmt.Errorf("failed to list Deployments: %v", err)
×
359
        }
×
360

361
        for _, d := range deployments.Items {
2✔
362
                deployment := d
1✔
363
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
364
                        for _, stackset := range stacksets {
2✔
365
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
366
                                        s.Resources.Deployment = &deployment
1✔
367
                                        break
1✔
368
                                }
369
                        }
370
                }
371
        }
372
        return nil
1✔
373
}
374

375
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
376
        services, err := c.client.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
377
        if err != nil {
1✔
378
                return fmt.Errorf("failed to list Services: %v", err)
×
379
        }
×
380

381
Items:
1✔
382
        for _, s := range services.Items {
2✔
383
                service := s
1✔
384
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
385
                        for _, stackset := range stacksets {
2✔
386
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
387
                                        s.Resources.Service = &service
1✔
388
                                        continue Items
1✔
389
                                }
390

391
                                // service/HPA used to be owned by the deployment for some reason
392
                                for _, stack := range stackset.StackContainers {
2✔
393
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
394
                                                stack.Resources.Service = &service
1✔
395
                                                continue Items
1✔
396
                                        }
397
                                }
398
                        }
399
                }
400
        }
401
        return nil
1✔
402
}
403

404
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
405
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
1✔
406
        if err != nil {
1✔
407
                return fmt.Errorf("failed to list HPAs: %v", err)
×
408
        }
×
409

410
Items:
1✔
411
        for _, h := range hpas.Items {
2✔
412
                hpa := h
1✔
413
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
414
                        for _, stackset := range stacksets {
2✔
415
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
416
                                        s.Resources.HPA = &hpa
1✔
417
                                        continue Items
1✔
418
                                }
419

420
                                // service/HPA used to be owned by the deployment for some reason
421
                                for _, stack := range stackset.StackContainers {
2✔
422
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
423
                                                stack.Resources.HPA = &hpa
1✔
424
                                                continue Items
1✔
425
                                        }
426
                                }
427
                        }
428
                }
429
        }
430
        return nil
1✔
431
}
432

433
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
434
        if len(objectMeta.OwnerReferences) == 1 {
2✔
435
                return objectMeta.OwnerReferences[0].UID, true
1✔
436
        }
1✔
437
        return "", false
1✔
438
}
439

440
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
441
        switch err.(type) {
×
442
        case *eventedError:
×
443
                // already notified
×
444
                return err
×
445
        default:
×
446
                c.recorder.Eventf(
×
447
                        object,
×
448
                        v1.EventTypeWarning,
×
449
                        reason,
×
450
                        err.Error())
×
451
                return &eventedError{err: err}
×
452
        }
453
}
454

455
// hasOwnership returns true if the controller is the "owner" of the stackset.
456
// Whether it's owner is determined by the value of the
457
// 'stackset-controller.zalando.org/controller' annotation. If the value
458
// matches the controllerID then it owns it, or if the controllerID is
459
// "" and there's no annotation set.
460
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
461
        if stackset.Annotations != nil {
×
462
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
463
                        return owner == c.controllerID
×
464
                }
×
465
        }
466
        return c.controllerID == ""
×
467
}
468

469
func (c *StackSetController) startWatch(ctx context.Context) {
×
470
        informer := cache.NewSharedIndexInformer(
×
471
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", v1.NamespaceAll, fields.Everything()),
×
472
                &zv1.StackSet{},
×
473
                0, // skip resync
×
474
                cache.Indexers{},
×
475
        )
×
476

×
477
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
478
                AddFunc:    c.add,
×
479
                UpdateFunc: c.update,
×
480
                DeleteFunc: c.del,
×
481
        })
×
482
        go informer.Run(ctx.Done())
×
483
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
484
                c.logger.Errorf("Timed out waiting for caches to sync")
×
485
                return
×
486
        }
×
487
        c.logger.Info("Synced StackSet watcher")
×
488
}
489

490
func (c *StackSetController) add(obj interface{}) {
×
491
        stackset, ok := obj.(*zv1.StackSet)
×
492
        if !ok {
×
493
                return
×
494
        }
×
495

496
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
497
        c.stacksetEvents <- stacksetEvent{
×
498
                StackSet: stackset.DeepCopy(),
×
499
        }
×
500
}
501

502
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
503
        newStackset, ok := newObj.(*zv1.StackSet)
×
504
        if !ok {
×
505
                return
×
506
        }
×
507

508
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
509
        if !ok {
×
510
                return
×
511
        }
×
512

513
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
514
                newStackset.Namespace,
×
515
                newStackset.Name,
×
516
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
517
        )
×
518

×
519
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
520
        c.stacksetEvents <- stacksetEvent{
×
521
                StackSet: newStackset.DeepCopy(),
×
522
        }
×
523
}
524

525
func (c *StackSetController) del(obj interface{}) {
×
526
        stackset, ok := obj.(*zv1.StackSet)
×
527
        if !ok {
×
528
                return
×
529
        }
×
530

531
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
532
        c.stacksetEvents <- stacksetEvent{
×
533
                StackSet: stackset.DeepCopy(),
×
534
                Deleted:  true,
×
535
        }
×
536
}
537

538
func retryUpdate(updateFn func(retry bool) error) error {
×
539
        retry := false
×
540
        for {
×
541
                err := updateFn(retry)
×
542
                if err != nil {
×
543
                        if errors.IsConflict(err) {
×
544
                                retry = true
×
545
                                continue
×
546
                        }
547
                        return err
×
548
                }
549
                return nil
×
550
        }
551
}
552

553
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
554
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
555
        for _, sc := range ssc.StackContainers {
×
556
                stack := sc.Stack.DeepCopy()
×
557
                status := *sc.GenerateStackStatus()
×
558
                err := retryUpdate(func(retry bool) error {
×
559
                        if retry {
×
560
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
561
                                if err != nil {
×
562
                                        return err
×
563
                                }
×
564
                                stack = updated
×
565
                        }
566
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
567
                                stack.Status = status
×
568
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
569
                                return err
×
570
                        }
×
571
                        return nil
×
572
                })
573
                if err != nil {
×
574
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
575
                }
×
576
        }
577

578
        stackset := ssc.StackSet.DeepCopy()
×
579
        status := *ssc.GenerateStackSetStatus()
×
580
        err := retryUpdate(func(retry bool) error {
×
581
                if retry {
×
582
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
583
                        if err != nil {
×
584
                                return err
×
585
                        }
×
586
                        stackset = updated
×
587
                }
588
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
589
                        stackset.Status = status
×
590
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
591
                        return err
×
592
                }
×
593
                return nil
×
594
        })
595

596
        if err != nil {
×
597
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
598
        }
×
599
        return nil
×
600
}
601

602
// CreateCurrentStack creates a new Stack object for the current stack, if needed
603
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
604
        newStack, newStackVersion := ssc.NewStack()
1✔
605
        if newStack == nil {
2✔
606
                return nil
1✔
607
        }
1✔
608

609
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
610
        if err != nil {
1✔
611
                return err
×
612
        }
×
613
        fixupStackTypeMeta(created)
1✔
614

1✔
615
        c.recorder.Eventf(
1✔
616
                ssc.StackSet,
1✔
617
                v1.EventTypeNormal,
1✔
618
                "CreatedStack",
1✔
619
                "Created stack %s",
1✔
620
                newStack.Name())
1✔
621

1✔
622
        // Persist ObservedStackVersion in the status
1✔
623
        updated := ssc.StackSet.DeepCopy()
1✔
624
        updated.Status.ObservedStackVersion = newStackVersion
1✔
625

1✔
626
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
627
        if err != nil {
1✔
628
                return err
×
629
        }
×
630
        fixupStackSetTypeMeta(result)
1✔
631
        ssc.StackSet = result
1✔
632

1✔
633
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
634
                Stack:          created,
1✔
635
                PendingRemoval: false,
1✔
636
                Resources:      core.StackResources{},
1✔
637
        }
1✔
638
        return nil
1✔
639
}
640

641
// CleanupOldStacks deletes stacks that are no longer needed.
642
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
643
        for _, sc := range ssc.StackContainers {
2✔
644
                if !sc.PendingRemoval {
2✔
645
                        continue
1✔
646
                }
647

648
                stack := sc.Stack
1✔
649
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
650
                if err != nil {
1✔
651
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
652
                }
×
653
                c.recorder.Eventf(
1✔
654
                        ssc.StackSet,
1✔
655
                        v1.EventTypeNormal,
1✔
656
                        "DeletedExcessStack",
1✔
657
                        "Deleted excess stack %s",
1✔
658
                        stack.Name)
1✔
659
        }
660

661
        return nil
1✔
662
}
663

664
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
665
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
666

1✔
667
        // Ingress removed, handled outside
1✔
668
        if ingress == nil {
2✔
669
                return existing, nil
1✔
670
        }
1✔
671

672
        // Create new Ingress
673
        if existing == nil {
2✔
674
                if ingress.Annotations == nil {
2✔
675
                        ingress.Annotations = make(map[string]string)
1✔
676
                }
1✔
677
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
678

1✔
679
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
680
                if err != nil {
1✔
681
                        return nil, err
×
682
                }
×
683
                c.recorder.Eventf(
1✔
684
                        stackset,
1✔
685
                        v1.EventTypeNormal,
1✔
686
                        "CreatedIngress",
1✔
687
                        "Created Ingress %s",
1✔
688
                        ingress.Name)
1✔
689
                return createdIng, nil
1✔
690
        }
691

692
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
693
        if existingHaveUpdateTimeStamp {
2✔
694
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
695
        }
1✔
696

697
        // Check if we need to update the Ingress
698
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
699
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
700
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
701
                // add the annotation back after comparing
1✔
702
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
703
                return existing, nil
1✔
704
        }
1✔
705

706
        updated := existing.DeepCopy()
1✔
707
        updated.Spec = ingress.Spec
1✔
708
        if ingress.Annotations != nil {
2✔
709
                updated.Annotations = ingress.Annotations
1✔
710
        } else {
2✔
711
                updated.Annotations = make(map[string]string)
1✔
712
        }
1✔
713
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
714

1✔
715
        updated.Labels = ingress.Labels
1✔
716

1✔
717
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
718
        if err != nil {
1✔
719
                return nil, err
×
720
        }
×
721
        c.recorder.Eventf(
1✔
722
                stackset,
1✔
723
                v1.EventTypeNormal,
1✔
724
                "UpdatedIngress",
1✔
725
                "Updated Ingress %s",
1✔
726
                ingress.Name)
1✔
727
        return createdIngress, nil
1✔
728
}
729

730
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
731
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
732
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
733
                if routegroup == nil {
2✔
734
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
735
                        return nil
1✔
736
                }
1✔
737
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
738
                // The only scenario version we could think of for this is
1✔
739
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
740
                //  in that case, just wait until the RouteGroup has the annotation
1✔
741
                if !ok {
2✔
742
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
743
                        return nil
1✔
744
                }
1✔
745

746
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
747
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
748
                        return nil
1✔
749
                } else if !ready {
3✔
750
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
751
                        return nil
1✔
752
                }
1✔
753
        }
754
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
755
        if err != nil {
1✔
756
                return err
×
757
        }
×
758
        c.recorder.Eventf(
1✔
759
                stackset,
1✔
760
                v1.EventTypeNormal,
1✔
761
                "DeletedIngress",
1✔
762
                "Deleted Ingress %s",
1✔
763
                existing.Namespace)
1✔
764
        return nil
1✔
765
}
766

767
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
768
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
769
        // RouteGroup removed, handled outside
1✔
770
        if rg == nil {
2✔
771
                return existing, nil
1✔
772
        }
1✔
773

774
        // Create new RouteGroup
775
        if existing == nil {
2✔
776
                if rg.Annotations == nil {
2✔
777
                        rg.Annotations = make(map[string]string)
1✔
778
                }
1✔
779
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
780

1✔
781
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
782
                if err != nil {
1✔
783
                        return nil, err
×
784
                }
×
785
                c.recorder.Eventf(
1✔
786
                        stackset,
1✔
787
                        v1.EventTypeNormal,
1✔
788
                        "CreatedRouteGroup",
1✔
789
                        "Created RouteGroup %s",
1✔
790
                        rg.Name)
1✔
791
                return createdRg, nil
1✔
792
        }
793

794
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
795
        if existingHaveUpdateTimeStamp {
2✔
796
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
797
        }
1✔
798

799
        // Check if we need to update the RouteGroup
800
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
801
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
802
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
803
                // add the annotation back after comparing
1✔
804
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
805
                return existing, nil
1✔
806
        }
1✔
807

808
        updated := existing.DeepCopy()
1✔
809
        updated.Spec = rg.Spec
1✔
810
        if rg.Annotations != nil {
1✔
811
                updated.Annotations = rg.Annotations
×
812
        } else {
1✔
813
                updated.Annotations = make(map[string]string)
1✔
814
        }
1✔
815
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
816

1✔
817
        updated.Labels = rg.Labels
1✔
818

1✔
819
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
820
        if err != nil {
1✔
821
                return nil, err
×
822
        }
×
823
        c.recorder.Eventf(
1✔
824
                stackset,
1✔
825
                v1.EventTypeNormal,
1✔
826
                "UpdatedRouteGroup",
1✔
827
                "Updated RouteGroup %s",
1✔
828
                rg.Name)
1✔
829
        return createdRg, nil
1✔
830
}
831

832
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
833
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
834
        if stackset.Spec.Ingress != nil {
2✔
835
                if ingress == nil {
2✔
836
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
837
                        return nil
1✔
838
                }
1✔
839
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
840
                // The only scenario version we could think of for this is
1✔
841
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
842
                //  in that case, just wait until the RouteGroup has the annotation
1✔
843
                if !ok {
2✔
844
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
845
                        return nil
1✔
846
                }
1✔
847

848
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
849
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
850
                        return nil
1✔
851
                } else if !ready {
3✔
852
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
853
                        return nil
1✔
854
                }
1✔
855
        }
856
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
857
        if err != nil {
1✔
858
                return err
×
859
        }
×
860
        c.recorder.Eventf(
1✔
861
                stackset,
1✔
862
                v1.EventTypeNormal,
1✔
863
                "DeletedRouteGroup",
1✔
864
                "Deleted RouteGroup %s",
1✔
865
                rg.Namespace)
1✔
866
        return nil
1✔
867
}
868

869
func (c *StackSetController) ReconcileStackSetIngressSources(
870
        ctx context.Context,
871
        stackset *zv1.StackSet,
872
        existingIng *networking.Ingress,
873
        existingRg *rgv1.RouteGroup,
874
        generateIng func() (*networking.Ingress, error),
875
        generateRg func() (*rgv1.RouteGroup, error),
876
) error {
1✔
877
        ingress, err := generateIng()
1✔
878
        if err != nil {
1✔
879
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
880
        }
×
881

882
        // opt-out existingIng creation in case we have an external entity creating existingIng
883
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
884
        if err != nil {
1✔
885
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
886
        }
×
887

888
        rg, err := generateRg()
1✔
889
        if err != nil {
1✔
890
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
891
        }
×
892

893
        var appliedRg *rgv1.RouteGroup
1✔
894
        if c.routeGroupSupportEnabled {
2✔
895
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
896
                if err != nil {
1✔
897
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
898
                }
×
899
        }
900

901
        // Ingress removed
902
        if ingress == nil {
2✔
903
                if existingIng != nil {
2✔
904
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
905
                        if err != nil {
1✔
906
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
907
                        }
×
908
                }
909
        }
910

911
        // RouteGroup removed
912
        if rg == nil {
2✔
913
                if existingRg != nil {
2✔
914
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
915
                        if err != nil {
1✔
916
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
917
                        }
×
918
                }
919
        }
920

921
        return nil
1✔
922
}
923

924
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
925
        err := c.ReconcileStackSetIngressSources(ctx, ssc.StackSet, ssc.Ingress, ssc.RouteGroup, ssc.GenerateIngress, ssc.GenerateRouteGroup)
×
926
        if err != nil {
×
927
                return err
×
928
        }
×
929

930
        trafficChanges := ssc.TrafficChanges()
×
931
        if len(trafficChanges) != 0 {
×
932
                var changeMessages []string
×
933
                for _, change := range trafficChanges {
×
934
                        changeMessages = append(changeMessages, change.String())
×
935
                }
×
936

937
                c.recorder.Eventf(
×
938
                        ssc.StackSet,
×
939
                        v1.EventTypeNormal,
×
940
                        "TrafficSwitched",
×
941
                        "Switched traffic: %s",
×
942
                        strings.Join(changeMessages, ", "))
×
943
        }
944

945
        return nil
×
946
}
947

948
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
949
        updatedTraffic := generateUpdated()
1✔
950

1✔
951
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
952
                return nil
×
953
        }
×
954

955
        updated := existing.DeepCopy()
1✔
956
        updated.Spec.Traffic = updatedTraffic
1✔
957

1✔
958
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
959
        if err != nil {
1✔
960
                return err
×
961
        }
×
962
        c.recorder.Eventf(
1✔
963
                updated,
1✔
964
                v1.EventTypeNormal,
1✔
965
                "UpdatedStackSet",
1✔
966
                "Updated StackSet %s",
1✔
967
                updated.Name)
1✔
968
        return nil
1✔
969
}
970

971
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
972

×
973
        err := c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
974
        if err != nil {
×
975
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
976
        }
×
977
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, sc.GenerateHPA)
×
978
        if err != nil {
×
979
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
980
        }
×
981

982
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
983
        if err != nil {
×
984
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
985
        }
×
986

987
        err = c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
988
        if err != nil {
×
989
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
990
        }
×
991

992
        if c.routeGroupSupportEnabled {
×
993
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
994
                if err != nil {
×
995
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
996
                }
×
997
        }
998

999
        if c.resourceTemplatesEnabled {
×
1000
                err = c.ReconcileStackResourceTemplates(ctx, sc.Stack, sc.Resources.ResourceTemplates, sc.GenerateResourceTemplates)
×
1001
                if err != nil {
×
1002
                        return c.errorEventf(sc.Stack, "FailedManageResourceTemplates", err)
×
1003
                }
×
1004
        }
1005

1006
        return nil
×
1007
}
1008

1009
// ReconcileStackSet reconciles all the things from a stackset
1010
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1011
        defer func() {
×
1012
                if r := recover(); r != nil {
×
1013
                        c.metricsReporter.ReportPanic()
×
1014
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1015
                        err = fmt.Errorf("panic: %v", r)
×
1016
                }
×
1017
        }()
1018

1019
        // Create current stack, if needed. Proceed on errors.
1020
        err = c.CreateCurrentStack(ctx, container)
×
1021
        if err != nil {
×
1022
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1023
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1024
        }
×
1025

1026
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1027
        err = container.UpdateFromResources()
×
1028
        if err != nil {
×
1029
                return err
×
1030
        }
×
1031

1032
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1033
        err = container.ManageTraffic(time.Now())
×
1034
        if err != nil {
×
1035
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1036
                c.recorder.Eventf(
×
1037
                        container.StackSet,
×
1038
                        v1.EventTypeWarning,
×
1039
                        "TrafficNotSwitched",
×
1040
                        "Failed to switch traffic: "+err.Error())
×
1041
        }
×
1042

1043
        // Mark stacks that should be removed
1044
        container.MarkExpiredStacks()
×
1045

×
1046
        // Reconcile stack resources. Proceed on errors.
×
1047
        for _, sc := range container.StackContainers {
×
1048
                err = c.ReconcileStackResources(ctx, container, sc)
×
1049
                if err != nil {
×
1050
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1051
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1052
                }
×
1053
        }
1054

1055
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1056
        err = c.ReconcileStackSetResources(ctx, container)
×
1057
        if err != nil {
×
1058
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1059
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1060
        }
×
1061

1062
        // Reconcile desired traffic in the stackset. Proceed on errors.
1063
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1064
        if err != nil {
×
1065
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1066
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1067
        }
×
1068

1069
        // Delete old stacks. Proceed on errors.
1070
        err = c.CleanupOldStacks(ctx, container)
×
1071
        if err != nil {
×
1072
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1073
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1074
        }
×
1075

1076
        // Update statuses.
1077
        err = c.ReconcileStatuses(ctx, container)
×
1078
        if err != nil {
×
1079
                return err
×
1080
        }
×
1081

1082
        return nil
×
1083
}
1084

1085
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1086
// stackset annotation.
1087
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1088
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1089
        if !ok {
2✔
1090
                return 0, false
1✔
1091
        }
1✔
1092
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1093
        if err != nil {
1✔
1094
                return 0, false
×
1095
        }
×
1096
        return resetDelay, true
1✔
1097
}
1098

1099
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1100
        // set TypeMeta manually because of this bug:
1✔
1101
        // https://github.com/kubernetes/client-go/issues/308
1✔
1102
        stackset.APIVersion = core.APIVersion
1✔
1103
        stackset.Kind = core.KindStackSet
1✔
1104
}
1✔
1105

1106
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1107
        // set TypeMeta manually because of this bug:
1✔
1108
        // https://github.com/kubernetes/client-go/issues/308
1✔
1109
        stack.APIVersion = core.APIVersion
1✔
1110
        stack.Kind = core.KindStack
1✔
1111
}
1✔
1112

1113
// resourceReady reports whether the RFC 3339 `timestamp` lies more than `ttl`
// in the past.
//
// It returns (false, err) when the timestamp cannot be parsed, so callers can
// keep waiting for a valid value. A timestamp that parses to the zero time is
// never considered ready.
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
	lastUpdated, err := time.Parse(time.RFC3339, timestamp)
	if err != nil {
		// No valid timestamp yet.
		return false, err
	}

	if lastUpdated.IsZero() {
		return false, nil
	}

	return time.Since(lastUpdated) > ttl, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc