• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando-incubator / stackset-controller / 8268314348

13 Mar 2024 04:37PM UTC coverage: 53.536% (+1.2%) from 52.337%
8268314348

Pull #587

github

katyanna
Add unit tests, support PCS update

Signed-off-by: Katyanna Moura <amelie.kn@gmail.com>
Pull Request #587: Support PlatformCredentialsSet versioning

70 of 146 new or added lines in 6 files covered. (47.95%)

1 existing line in 1 file now uncovered.

3225 of 6024 relevant lines covered (53.54%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.46
/controller/stackset.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "runtime/debug"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/go-cmp/cmp"
13
        "github.com/google/go-cmp/cmp/cmpopts"
14
        "github.com/heptiolabs/healthcheck"
15
        "github.com/prometheus/client_golang/prometheus"
16
        log "github.com/sirupsen/logrus"
17
        rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
18
        zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1"
19
        "github.com/zalando-incubator/stackset-controller/pkg/clientset"
20
        "github.com/zalando-incubator/stackset-controller/pkg/core"
21
        "github.com/zalando-incubator/stackset-controller/pkg/recorder"
22
        "golang.org/x/sync/errgroup"
23
        v1 "k8s.io/api/core/v1"
24
        networking "k8s.io/api/networking/v1"
25
        "k8s.io/apimachinery/pkg/api/equality"
26
        "k8s.io/apimachinery/pkg/api/errors"
27
        "k8s.io/apimachinery/pkg/api/resource"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/fields"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/client-go/tools/cache"
33
        kube_record "k8s.io/client-go/tools/record"
34
)
35

36
const (
37
        PrescaleStacksAnnotationKey               = "alpha.stackset-controller.zalando.org/prescale-stacks"
38
        ResetHPAMinReplicasDelayAnnotationKey     = "alpha.stackset-controller.zalando.org/reset-hpa-min-replicas-delay"
39
        StacksetControllerControllerAnnotationKey = "stackset-controller.zalando.org/controller"
40
        ControllerLastUpdatedAnnotationKey        = "stackset-controller.zalando.org/updated-timestamp"
41
        TrafficSegmentsAnnotationKey              = "stackset-controller.zalando.org/use-traffic-segments"
42

43
        reasonFailedManageStackSet = "FailedManageStackSet"
44

45
        defaultResetMinReplicasDelay = 10 * time.Minute
46
)
47

48
var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. ConfigurationResource: %s, Stack: %s"
49

50
// StackSetController is the main controller. It watches for changes to
51
// stackset resources and starts and maintains other controllers per
52
// stackset resource.
53
type StackSetController struct {
54
        logger                      *log.Entry
55
        client                      clientset.Interface
56
        namespace                   string
57
        syncIngressAnnotations      []string
58
        controllerID                string
59
        backendWeightsAnnotationKey string
60
        clusterDomains              []string
61
        interval                    time.Duration
62
        stacksetEvents              chan stacksetEvent
63
        stacksetStore               map[types.UID]zv1.StackSet
64
        recorder                    kube_record.EventRecorder
65
        metricsReporter             *core.MetricsReporter
66
        HealthReporter              healthcheck.Handler
67
        routeGroupSupportEnabled    bool
68
        trafficSegmentsEnabled      bool
69
        annotatedTrafficSegments    bool
70
        ingressSourceSwitchTTL      time.Duration
71
        now                         func() string
72
        reconcileWorkers            int
73
        configMapSupportEnabled     bool
74
        secretSupportEnabled        bool
75
        sync.Mutex
76
}
77

78
type stacksetEvent struct {
79
        Deleted  bool
80
        StackSet *zv1.StackSet
81
}
82

83
// eventedError wraps an error that was already exposed as an event to the user
84
type eventedError struct {
85
        err error
86
}
87

88
func (ee *eventedError) Error() string {
×
89
        return ee.err.Error()
×
90
}
×
91

92
func now() string {
×
93
        return time.Now().Format(time.RFC3339)
×
94
}
×
95

96
// NewStackSetController initializes a new StackSetController.
97
func NewStackSetController(
98
        client clientset.Interface,
99
        namespace string,
100
        controllerID string,
101
        parallelWork int,
102
        backendWeightsAnnotationKey string,
103
        clusterDomains []string,
104
        registry prometheus.Registerer,
105
        interval time.Duration,
106
        routeGroupSupportEnabled bool,
107
        trafficSegmentsEnabled bool,
108
        annotatedTrafficSegments bool,
109
        syncIngressAnnotations []string,
110
        configMapSupportEnabled bool,
111
        secretSupportEnabled bool,
112
        ingressSourceSwitchTTL time.Duration,
113
) (*StackSetController, error) {
1✔
114
        metricsReporter, err := core.NewMetricsReporter(registry)
1✔
115
        if err != nil {
1✔
116
                return nil, err
×
117
        }
×
118

119
        return &StackSetController{
1✔
120
                logger:                      log.WithFields(log.Fields{"controller": "stackset"}),
1✔
121
                client:                      client,
1✔
122
                namespace:                   namespace,
1✔
123
                controllerID:                controllerID,
1✔
124
                backendWeightsAnnotationKey: backendWeightsAnnotationKey,
1✔
125
                clusterDomains:              clusterDomains,
1✔
126
                interval:                    interval,
1✔
127
                stacksetEvents:              make(chan stacksetEvent, 1),
1✔
128
                stacksetStore:               make(map[types.UID]zv1.StackSet),
1✔
129
                recorder:                    recorder.CreateEventRecorder(client),
1✔
130
                metricsReporter:             metricsReporter,
1✔
131
                HealthReporter:              healthcheck.NewHandler(),
1✔
132
                routeGroupSupportEnabled:    routeGroupSupportEnabled,
1✔
133
                trafficSegmentsEnabled:      trafficSegmentsEnabled,
1✔
134
                annotatedTrafficSegments:    annotatedTrafficSegments,
1✔
135
                syncIngressAnnotations:      syncIngressAnnotations,
1✔
136
                ingressSourceSwitchTTL:      ingressSourceSwitchTTL,
1✔
137
                configMapSupportEnabled:     configMapSupportEnabled,
1✔
138
                secretSupportEnabled:        secretSupportEnabled,
1✔
139
                now:                         now,
1✔
140
                reconcileWorkers:            parallelWork,
1✔
141
        }, nil
1✔
142
}
143

144
func (c *StackSetController) stacksetLogger(ssc *core.StackSetContainer) *log.Entry {
×
145
        return c.logger.WithFields(map[string]interface{}{
×
146
                "namespace": ssc.StackSet.Namespace,
×
147
                "stackset":  ssc.StackSet.Name,
×
148
        })
×
149
}
×
150

151
func (c *StackSetController) stackLogger(ssc *core.StackSetContainer, sc *core.StackContainer) *log.Entry {
×
152
        return c.logger.WithFields(map[string]interface{}{
×
153
                "namespace": ssc.StackSet.Namespace,
×
154
                "stackset":  ssc.StackSet.Name,
×
155
                "stack":     sc.Name(),
×
156
        })
×
157
}
×
158

159
// Run runs the main loop of the StackSetController. Before the loops it
160
// sets up a watcher to watch StackSet resources. The watch will send
161
// changes over a channel which is polled from the main loop.
162
func (c *StackSetController) Run(ctx context.Context) {
×
163
        var nextCheck time.Time
×
164

×
165
        // We're not alive if nextCheck is too far in the past
×
166
        c.HealthReporter.AddLivenessCheck("nextCheck", func() error {
×
167
                if time.Since(nextCheck) > 5*c.interval {
×
168
                        return fmt.Errorf("nextCheck too old")
×
169
                }
×
170
                return nil
×
171
        })
172

173
        c.startWatch(ctx)
×
174

×
175
        http.HandleFunc("/healthz", c.HealthReporter.LiveEndpoint)
×
176

×
177
        nextCheck = time.Now().Add(-c.interval)
×
178

×
179
        for {
×
180
                select {
×
181
                case <-time.After(time.Until(nextCheck)):
×
182

×
183
                        nextCheck = time.Now().Add(c.interval)
×
184

×
185
                        stackSetContainers, err := c.collectResources(ctx)
×
186
                        if err != nil {
×
187
                                c.logger.Errorf("Failed to collect resources: %v", err)
×
188
                                continue
×
189
                        }
190

191
                        var reconcileGroup errgroup.Group
×
192
                        reconcileGroup.SetLimit(c.reconcileWorkers)
×
193
                        for stackset, container := range stackSetContainers {
×
194
                                container := container
×
195
                                stackset := stackset
×
196

×
197
                                reconcileGroup.Go(func() error {
×
198
                                        if _, ok := c.stacksetStore[stackset]; ok {
×
199
                                                err := c.ReconcileStackSet(ctx, container)
×
200
                                                if err != nil {
×
201
                                                        c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err)
×
202
                                                        return c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
203
                                                }
×
204
                                        }
205
                                        return nil
×
206
                                })
207
                        }
208

209
                        err = reconcileGroup.Wait()
×
210
                        if err != nil {
×
211
                                c.logger.Errorf("Failed waiting for reconcilers: %v", err)
×
212
                        }
×
213
                        err = c.metricsReporter.Report(stackSetContainers)
×
214
                        if err != nil {
×
215
                                c.logger.Errorf("Failed reporting metrics: %v", err)
×
216
                        }
×
217
                case e := <-c.stacksetEvents:
×
218
                        stackset := *e.StackSet
×
219
                        fixupStackSetTypeMeta(&stackset)
×
220

×
221
                        // update/delete existing entry
×
222
                        if _, ok := c.stacksetStore[stackset.UID]; ok {
×
223
                                if e.Deleted || !c.hasOwnership(&stackset) {
×
224
                                        delete(c.stacksetStore, stackset.UID)
×
225
                                        continue
×
226
                                }
227

228
                                // update stackset entry
229
                                c.stacksetStore[stackset.UID] = stackset
×
230
                                continue
×
231
                        }
232

233
                        // check if stackset should be managed by the controller
234
                        if !c.hasOwnership(&stackset) {
×
235
                                continue
×
236
                        }
237

238
                        c.logger.Infof("Adding entry for StackSet %s/%s", stackset.Namespace, stackset.Name)
×
239
                        c.stacksetStore[stackset.UID] = stackset
×
240
                case <-ctx.Done():
×
241
                        c.logger.Info("Terminating main controller loop.")
×
242
                        return
×
243
                }
244
        }
245
}
246

247
// injectSegmentAnnotation injects the traffic segment annotation if it's not
248
// already present.
249
//
250
// Only inject the traffic segment annotation if the controller has traffic
251
// segments enabled by default, i.e. doesn't matter if the StackSet has the
252
// segment annotation.
253
func (c *StackSetController) injectSegmentAnnotation(
254
        ctx context.Context,
255
        stackSet *zv1.StackSet,
256
) bool {
×
257
        if c.annotatedTrafficSegments {
×
258
                return false
×
259
        }
×
260

261
        if !c.trafficSegmentsEnabled {
×
262
                return false
×
263
        }
×
264

265
        if stackSet.Annotations[TrafficSegmentsAnnotationKey] == "true" {
×
266
                return false
×
267
        }
×
268

269
        stackSet.Annotations[TrafficSegmentsAnnotationKey] = "true"
×
270

×
271
        _, err := c.client.ZalandoV1().StackSets(
×
272
                stackSet.Namespace,
×
273
        ).Update(
×
274
                ctx,
×
275
                stackSet,
×
276
                metav1.UpdateOptions{},
×
277
        )
×
278
        if err != nil {
×
279
                c.logger.Errorf(
×
280
                        "Failed injecting segment annotation: %v",
×
281
                        err,
×
282
                )
×
283
                return false
×
284
        }
×
285
        c.recorder.Eventf(
×
286
                stackSet,
×
287
                v1.EventTypeNormal,
×
288
                "UpdatedStackSet",
×
289
                "Updated StackSet %s. Injected %s annotation",
×
290
                stackSet.Name,
×
291
                TrafficSegmentsAnnotationKey,
×
292
        )
×
293

×
294
        return true
×
295
}
296

297
// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't
298
// overload the API requests with unnecessary requests
299
func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) {
1✔
300
        stacksets := make(map[types.UID]*core.StackSetContainer, len(c.stacksetStore))
1✔
301
        for uid, stackset := range c.stacksetStore {
2✔
302
                stackset := stackset
1✔
303

1✔
304
                reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{})
1✔
305

1✔
306
                // use prescaling logic if enabled with an annotation
1✔
307
                if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok {
2✔
308
                        resetDelay := defaultResetMinReplicasDelay
1✔
309
                        if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok {
2✔
310
                                resetDelay = resetDelayValue
1✔
311
                        }
1✔
312
                        reconciler = &core.PrescalingTrafficReconciler{
1✔
313
                                ResetHPAMinReplicasTimeout: resetDelay,
1✔
314
                        }
1✔
315
                }
316

317
                stacksetContainer := core.NewContainer(&stackset, reconciler, c.backendWeightsAnnotationKey, c.clusterDomains)
1✔
318
                if c.trafficSegmentsEnabled || c.annotatedTrafficSegments {
2✔
319

1✔
320
                        if stackset.Annotations[TrafficSegmentsAnnotationKey] == "true" {
2✔
321
                                stacksetContainer.EnableSegmentTraffic()
1✔
322
                                stacksetContainer.SynchronizeIngressAnnotations(
1✔
323
                                        c.syncIngressAnnotations,
1✔
324
                                )
1✔
325
                        }
1✔
326
                }
327
                stacksets[uid] = stacksetContainer
1✔
328
        }
329

330
        err := c.collectStacks(ctx, stacksets)
1✔
331
        if err != nil {
1✔
332
                return nil, err
×
333
        }
×
334

335
        err = c.collectIngresses(ctx, stacksets)
1✔
336
        if err != nil {
1✔
337
                return nil, err
×
338
        }
×
339

340
        if c.routeGroupSupportEnabled {
2✔
341
                err = c.collectRouteGroups(ctx, stacksets)
1✔
342
                if err != nil {
1✔
343
                        return nil, err
×
344
                }
×
345
        }
346

347
        err = c.collectDeployments(ctx, stacksets)
1✔
348
        if err != nil {
1✔
349
                return nil, err
×
350
        }
×
351

352
        err = c.collectServices(ctx, stacksets)
1✔
353
        if err != nil {
1✔
354
                return nil, err
×
355
        }
×
356

357
        err = c.collectHPAs(ctx, stacksets)
1✔
358
        if err != nil {
1✔
359
                return nil, err
×
360
        }
×
361

362
        if c.configMapSupportEnabled {
2✔
363
                err = c.collectConfigMaps(ctx, stacksets)
1✔
364
                if err != nil {
1✔
365
                        return nil, err
×
366
                }
×
367
        }
368

369
        if c.secretSupportEnabled {
2✔
370
                err = c.collectSecrets(ctx, stacksets)
1✔
371
                if err != nil {
1✔
372
                        return nil, err
×
373
                }
×
374
        }
375

376
        // if c.pcsSupportEnabled {
377
        err = c.collectPlatformCredentialsSet(ctx, stacksets)
1✔
378
        if err != nil {
1✔
NEW
379
                return nil, err
×
NEW
380
        }
×
381
        // }
382

383
        return stacksets, nil
1✔
384
}
385

386
func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
387
        ingresses, err := c.client.NetworkingV1().Ingresses(c.namespace).List(ctx, metav1.ListOptions{})
1✔
388

1✔
389
        if err != nil {
1✔
390
                return fmt.Errorf("failed to list Ingresses: %v", err)
×
391
        }
×
392

393
        for _, i := range ingresses.Items {
2✔
394
                ingress := i
1✔
395
                if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
2✔
396
                        // stackset ingress
1✔
397
                        if s, ok := stacksets[uid]; ok {
2✔
398
                                s.Ingress = &ingress
1✔
399
                                continue
1✔
400
                        }
401

402
                        // stack ingress
403
                        for _, stackset := range stacksets {
2✔
404
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
405
                                        if strings.HasSuffix(
1✔
406
                                                ingress.ObjectMeta.Name,
1✔
407
                                                core.SegmentSuffix,
1✔
408
                                        ) {
2✔
409
                                                // Traffic Segment
1✔
410
                                                s.Resources.IngressSegment = &ingress
1✔
411
                                        } else {
2✔
412
                                                s.Resources.Ingress = &ingress
1✔
413
                                        }
1✔
414
                                        break
1✔
415
                                }
416
                        }
417
                }
418
        }
419
        return nil
1✔
420
}
421

422
func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
423
        rgs, err := c.client.RouteGroupV1().RouteGroups(c.namespace).List(
1✔
424
                ctx,
1✔
425
                metav1.ListOptions{},
1✔
426
        )
1✔
427
        if err != nil {
1✔
428
                return fmt.Errorf("failed to list RouteGroups: %v", err)
×
429
        }
×
430

431
        for _, rg := range rgs.Items {
2✔
432
                routegroup := rg
1✔
433
                if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
2✔
434
                        // stackset routegroups
1✔
435
                        if s, ok := stacksets[uid]; ok {
2✔
436
                                s.RouteGroup = &routegroup
1✔
437
                                continue
1✔
438
                        }
439

440
                        // stack routegroups
441
                        for _, stackset := range stacksets {
2✔
442
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
443
                                        if strings.HasSuffix(
1✔
444
                                                routegroup.ObjectMeta.Name,
1✔
445
                                                core.SegmentSuffix,
1✔
446
                                        ) {
2✔
447
                                                // Traffic Segment
1✔
448
                                                s.Resources.RouteGroupSegment = &routegroup
1✔
449
                                        } else {
2✔
450
                                                s.Resources.RouteGroup = &routegroup
1✔
451
                                        }
1✔
452
                                        break
1✔
453
                                }
454
                        }
455
                }
456
        }
457
        return nil
1✔
458
}
459

460
func (c *StackSetController) collectStacks(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
461
        stacks, err := c.client.ZalandoV1().Stacks(c.namespace).List(ctx, metav1.ListOptions{})
1✔
462
        if err != nil {
1✔
463
                return fmt.Errorf("failed to list Stacks: %v", err)
×
464
        }
×
465

466
        for _, stack := range stacks.Items {
2✔
467
                if uid, ok := getOwnerUID(stack.ObjectMeta); ok {
2✔
468
                        if s, ok := stacksets[uid]; ok {
2✔
469
                                stack := stack
1✔
470
                                fixupStackTypeMeta(&stack)
1✔
471

1✔
472
                                s.StackContainers[stack.UID] = &core.StackContainer{
1✔
473
                                        Stack: &stack,
1✔
474
                                }
1✔
475
                                continue
1✔
476
                        }
477
                }
478
        }
479
        return nil
1✔
480
}
481

482
func (c *StackSetController) collectDeployments(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
483
        deployments, err := c.client.AppsV1().Deployments(c.namespace).List(ctx, metav1.ListOptions{})
1✔
484
        if err != nil {
1✔
485
                return fmt.Errorf("failed to list Deployments: %v", err)
×
486
        }
×
487

488
        for _, d := range deployments.Items {
2✔
489
                deployment := d
1✔
490
                if uid, ok := getOwnerUID(deployment.ObjectMeta); ok {
2✔
491
                        for _, stackset := range stacksets {
2✔
492
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
493
                                        s.Resources.Deployment = &deployment
1✔
494
                                        break
1✔
495
                                }
496
                        }
497
                }
498
        }
499
        return nil
1✔
500
}
501

502
func (c *StackSetController) collectServices(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
503
        services, err := c.client.CoreV1().Services(c.namespace).List(ctx, metav1.ListOptions{})
1✔
504
        if err != nil {
1✔
505
                return fmt.Errorf("failed to list Services: %v", err)
×
506
        }
×
507

508
Items:
1✔
509
        for _, s := range services.Items {
2✔
510
                service := s
1✔
511
                if uid, ok := getOwnerUID(service.ObjectMeta); ok {
2✔
512
                        for _, stackset := range stacksets {
2✔
513
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
514
                                        s.Resources.Service = &service
1✔
515
                                        continue Items
1✔
516
                                }
517

518
                                // service/HPA used to be owned by the deployment for some reason
519
                                for _, stack := range stackset.StackContainers {
2✔
520
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
521
                                                stack.Resources.Service = &service
1✔
522
                                                continue Items
1✔
523
                                        }
524
                                }
525
                        }
526
                }
527
        }
528
        return nil
1✔
529
}
530

531
func (c *StackSetController) collectHPAs(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
1✔
532
        hpas, err := c.client.AutoscalingV2().HorizontalPodAutoscalers(c.namespace).List(ctx, metav1.ListOptions{})
1✔
533
        if err != nil {
1✔
534
                return fmt.Errorf("failed to list HPAs: %v", err)
×
535
        }
×
536

537
Items:
1✔
538
        for _, h := range hpas.Items {
2✔
539
                hpa := h
1✔
540
                if uid, ok := getOwnerUID(hpa.ObjectMeta); ok {
2✔
541
                        for _, stackset := range stacksets {
2✔
542
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
543
                                        s.Resources.HPA = &hpa
1✔
544
                                        continue Items
1✔
545
                                }
546

547
                                // service/HPA used to be owned by the deployment for some reason
548
                                for _, stack := range stackset.StackContainers {
2✔
549
                                        if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid {
2✔
550
                                                stack.Resources.HPA = &hpa
1✔
551
                                                continue Items
1✔
552
                                        }
553
                                }
554
                        }
555
                }
556
        }
557
        return nil
1✔
558
}
559

560
func (c *StackSetController) collectConfigMaps(
561
        ctx context.Context,
562
        stacksets map[types.UID]*core.StackSetContainer,
563
) error {
1✔
564
        configMaps, err := c.client.CoreV1().ConfigMaps(c.namespace).List(ctx, metav1.ListOptions{})
1✔
565
        if err != nil {
1✔
566
                return fmt.Errorf("failed to list ConfigMaps: %v", err)
×
567
        }
×
568

569
        for _, cm := range configMaps.Items {
2✔
570
                configMap := cm
1✔
571
                if uid, ok := getOwnerUID(configMap.ObjectMeta); ok {
2✔
572
                        for _, stackset := range stacksets {
2✔
573
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
574
                                        s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, &configMap)
1✔
575
                                        break
1✔
576
                                }
577
                        }
578
                }
579
        }
580
        return nil
1✔
581
}
582

583
func (c *StackSetController) collectSecrets(
584
        ctx context.Context,
585
        stacksets map[types.UID]*core.StackSetContainer,
586
) error {
1✔
587
        secrets, err := c.client.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{})
1✔
588
        if err != nil {
1✔
589
                return fmt.Errorf("failed to list Secrets: %v", err)
×
590
        }
×
591

592
        for _, sct := range secrets.Items {
2✔
593
                secret := sct
1✔
594
                if uid, ok := getOwnerUID(secret.ObjectMeta); ok {
2✔
595
                        for _, stackset := range stacksets {
2✔
596
                                if s, ok := stackset.StackContainers[uid]; ok {
2✔
597
                                        s.Resources.Secrets = append(s.Resources.Secrets, &secret)
1✔
598
                                        break
1✔
599
                                }
600
                        }
601
                }
602
        }
603
        return nil
1✔
604
}
605

606
func (c *StackSetController) collectPlatformCredentialsSet(
607
        ctx context.Context,
608
        stacksets map[types.UID]*core.StackSetContainer,
609
) error {
1✔
610
        platformCredentialsSets, err := c.client.ZalandoV1().PlatformCredentialsSets(c.namespace).
1✔
611
                List(ctx, metav1.ListOptions{})
1✔
612
        if err != nil {
1✔
NEW
613
                return fmt.Errorf("failed to list PlatformCredentialsSet: %v", err)
×
NEW
614
        }
×
615

616
        for _, platformCredentialsSet := range platformCredentialsSets.Items {
1✔
NEW
617
                pcs := platformCredentialsSet
×
NEW
618
                if uid, ok := getOwnerUID(platformCredentialsSet.ObjectMeta); ok {
×
NEW
619
                        for _, stackset := range stacksets {
×
NEW
620
                                if s, ok := stackset.StackContainers[uid]; ok {
×
NEW
621
                                        s.Resources.PlatformCredentialsSets = append(
×
NEW
622
                                                s.Resources.PlatformCredentialsSets,
×
NEW
623
                                                &pcs,
×
NEW
624
                                        )
×
NEW
625
                                        break
×
626
                                }
627
                        }
628
                }
629
        }
630
        return nil
1✔
631
}
632

633
func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) {
1✔
634
        if len(objectMeta.OwnerReferences) == 1 {
2✔
635
                return objectMeta.OwnerReferences[0].UID, true
1✔
636
        }
1✔
637
        return "", false
1✔
638
}
639

640
func (c *StackSetController) errorEventf(object runtime.Object, reason string, err error) error {
×
641
        switch err.(type) {
×
642
        case *eventedError:
×
643
                // already notified
×
644
                return err
×
645
        default:
×
646
                c.recorder.Eventf(
×
647
                        object,
×
648
                        v1.EventTypeWarning,
×
649
                        reason,
×
650
                        err.Error())
×
651
                return &eventedError{err: err}
×
652
        }
653
}
654

655
// hasOwnership returns true if the controller is the "owner" of the stackset.
656
// Whether it's owner is determined by the value of the
657
// 'stackset-controller.zalando.org/controller' annotation. If the value
658
// matches the controllerID then it owns it, or if the controllerID is
659
// "" and there's no annotation set.
660
func (c *StackSetController) hasOwnership(stackset *zv1.StackSet) bool {
×
661
        if stackset.Annotations != nil {
×
662
                if owner, ok := stackset.Annotations[StacksetControllerControllerAnnotationKey]; ok {
×
663
                        return owner == c.controllerID
×
664
                }
×
665
        }
666
        return c.controllerID == ""
×
667
}
668

669
func (c *StackSetController) startWatch(ctx context.Context) {
×
670
        informer := cache.NewSharedIndexInformer(
×
671
                cache.NewListWatchFromClient(c.client.ZalandoV1().RESTClient(), "stacksets", c.namespace, fields.Everything()),
×
672
                &zv1.StackSet{},
×
673
                0, // skip resync
×
674
                cache.Indexers{},
×
675
        )
×
676

×
677
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
×
678
                AddFunc:    c.add,
×
679
                UpdateFunc: c.update,
×
680
                DeleteFunc: c.del,
×
681
        })
×
682
        go informer.Run(ctx.Done())
×
683
        if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
×
684
                c.logger.Errorf("Timed out waiting for caches to sync")
×
685
                return
×
686
        }
×
687
        c.logger.Info("Synced StackSet watcher")
×
688
}
689

690
func (c *StackSetController) add(obj interface{}) {
×
691
        stackset, ok := obj.(*zv1.StackSet)
×
692
        if !ok {
×
693
                return
×
694
        }
×
695

696
        c.logger.Infof("New StackSet added %s/%s", stackset.Namespace, stackset.Name)
×
697
        c.stacksetEvents <- stacksetEvent{
×
698
                StackSet: stackset.DeepCopy(),
×
699
        }
×
700
}
701

702
func (c *StackSetController) update(oldObj, newObj interface{}) {
×
703
        newStackset, ok := newObj.(*zv1.StackSet)
×
704
        if !ok {
×
705
                return
×
706
        }
×
707

708
        oldStackset, ok := oldObj.(*zv1.StackSet)
×
709
        if !ok {
×
710
                return
×
711
        }
×
712

713
        c.logger.Debugf("StackSet %s/%s changed: %s",
×
714
                newStackset.Namespace,
×
715
                newStackset.Name,
×
716
                cmp.Diff(oldStackset, newStackset, cmpopts.IgnoreUnexported(resource.Quantity{})),
×
717
        )
×
718

×
719
        c.logger.Infof("StackSet updated %s/%s", newStackset.Namespace, newStackset.Name)
×
720
        c.stacksetEvents <- stacksetEvent{
×
721
                StackSet: newStackset.DeepCopy(),
×
722
        }
×
723
}
724

725
func (c *StackSetController) del(obj interface{}) {
×
726
        stackset, ok := obj.(*zv1.StackSet)
×
727
        if !ok {
×
728
                return
×
729
        }
×
730

731
        c.logger.Infof("StackSet deleted %s/%s", stackset.Namespace, stackset.Name)
×
732
        c.stacksetEvents <- stacksetEvent{
×
733
                StackSet: stackset.DeepCopy(),
×
734
                Deleted:  true,
×
735
        }
×
736
}
737

738
func retryUpdate(updateFn func(retry bool) error) error {
×
739
        retry := false
×
740
        for {
×
741
                err := updateFn(retry)
×
742
                if err != nil {
×
743
                        if errors.IsConflict(err) {
×
744
                                retry = true
×
745
                                continue
×
746
                        }
747
                        return err
×
748
                }
749
                return nil
×
750
        }
751
}
752

753
// ReconcileStatuses reconciles the statuses of StackSets and Stacks.
754
func (c *StackSetController) ReconcileStatuses(ctx context.Context, ssc *core.StackSetContainer) error {
×
755
        for _, sc := range ssc.StackContainers {
×
756
                stack := sc.Stack.DeepCopy()
×
757
                status := *sc.GenerateStackStatus()
×
758
                err := retryUpdate(func(retry bool) error {
×
759
                        if retry {
×
760
                                updated, err := c.client.ZalandoV1().Stacks(sc.Namespace()).Get(ctx, stack.Name, metav1.GetOptions{})
×
761
                                if err != nil {
×
762
                                        return err
×
763
                                }
×
764
                                stack = updated
×
765
                        }
766
                        if !equality.Semantic.DeepEqual(status, stack.Status) {
×
767
                                stack.Status = status
×
768
                                _, err := c.client.ZalandoV1().Stacks(sc.Namespace()).UpdateStatus(ctx, stack, metav1.UpdateOptions{})
×
769
                                return err
×
770
                        }
×
771
                        return nil
×
772
                })
773
                if err != nil {
×
774
                        return c.errorEventf(sc.Stack, "FailedUpdateStackStatus", err)
×
775
                }
×
776
        }
777

778
        stackset := ssc.StackSet.DeepCopy()
×
779
        status := *ssc.GenerateStackSetStatus()
×
780
        err := retryUpdate(func(retry bool) error {
×
781
                if retry {
×
782
                        updated, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).Get(ctx, ssc.StackSet.Name, metav1.GetOptions{})
×
783
                        if err != nil {
×
784
                                return err
×
785
                        }
×
786
                        stackset = updated
×
787
                }
788
                if !equality.Semantic.DeepEqual(status, stackset.Status) {
×
789
                        stackset.Status = status
×
790
                        _, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, stackset, metav1.UpdateOptions{})
×
791
                        return err
×
792
                }
×
793
                return nil
×
794
        })
795
        if err != nil {
×
796
                return c.errorEventf(ssc.StackSet, "FailedUpdateStackSetStatus", err)
×
797
        }
×
798
        return nil
×
799
}
800

801
// ReconcileTrafficSegments updates the traffic segments according to the actual
802
// traffic weight of each stack.
803
//
804
// Returns the ordered list of Trafic Segments that need to be updated.
805
func (c *StackSetController) ReconcileTrafficSegments(
806
        ctx context.Context,
807
        ssc *core.StackSetContainer,
808
) ([]types.UID, error) {
×
809
        // Compute segments
×
810
        toUpdate, err := ssc.ComputeTrafficSegments()
×
811
        if err != nil {
×
812
                return nil, c.errorEventf(ssc.StackSet, "FailedManageSegments", err)
×
813
        }
×
814

815
        return toUpdate, nil
×
816
}
817

818
// CreateCurrentStack creates a new Stack object for the current stack, if needed
819
func (c *StackSetController) CreateCurrentStack(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
820
        newStack, newStackVersion := ssc.NewStack()
1✔
821
        if newStack == nil {
2✔
822
                return nil
1✔
823
        }
1✔
824

825
        if c.configMapSupportEnabled || c.secretSupportEnabled {
2✔
826
                // ensure that ConfigurationResources are prefixed by Stack name.
1✔
827
                if err := validateAllConfigurationResourcesNames(newStack.Stack); err != nil {
1✔
828
                        return err
×
829
                }
×
830
        }
831

832
        created, err := c.client.ZalandoV1().Stacks(newStack.Namespace()).Create(ctx, newStack.Stack, metav1.CreateOptions{})
1✔
833
        if err != nil {
1✔
834
                return err
×
835
        }
×
836
        fixupStackTypeMeta(created)
1✔
837

1✔
838
        c.recorder.Eventf(
1✔
839
                ssc.StackSet,
1✔
840
                v1.EventTypeNormal,
1✔
841
                "CreatedStack",
1✔
842
                "Created stack %s",
1✔
843
                newStack.Name(),
1✔
844
        )
1✔
845

1✔
846
        // Persist ObservedStackVersion in the status
1✔
847
        updated := ssc.StackSet.DeepCopy()
1✔
848
        updated.Status.ObservedStackVersion = newStackVersion
1✔
849

1✔
850
        result, err := c.client.ZalandoV1().StackSets(ssc.StackSet.Namespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{})
1✔
851
        if err != nil {
1✔
852
                return err
×
853
        }
×
854
        fixupStackSetTypeMeta(result)
1✔
855
        ssc.StackSet = result
1✔
856

1✔
857
        ssc.StackContainers[created.UID] = &core.StackContainer{
1✔
858
                Stack:          created,
1✔
859
                PendingRemoval: false,
1✔
860
                Resources:      core.StackResources{},
1✔
861
        }
1✔
862
        return nil
1✔
863
}
864

865
// CleanupOldStacks deletes stacks that are no longer needed.
866
func (c *StackSetController) CleanupOldStacks(ctx context.Context, ssc *core.StackSetContainer) error {
1✔
867
        for _, sc := range ssc.StackContainers {
2✔
868
                if !sc.PendingRemoval {
2✔
869
                        continue
1✔
870
                }
871

872
                stack := sc.Stack
1✔
873
                err := c.client.ZalandoV1().Stacks(stack.Namespace).Delete(ctx, stack.Name, metav1.DeleteOptions{})
1✔
874
                if err != nil {
1✔
875
                        return c.errorEventf(ssc.StackSet, "FailedDeleteStack", err)
×
876
                }
×
877
                c.recorder.Eventf(
1✔
878
                        ssc.StackSet,
1✔
879
                        v1.EventTypeNormal,
1✔
880
                        "DeletedExcessStack",
1✔
881
                        "Deleted excess stack %s",
1✔
882
                        stack.Name)
1✔
883
        }
884

885
        return nil
1✔
886
}
887

888
// AddUpdateStackSetIngress reconciles the Ingress but never deletes it, it returns the existing/new Ingress
889
func (c *StackSetController) AddUpdateStackSetIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup, ingress *networking.Ingress) (*networking.Ingress, error) {
1✔
890
        // Ingress removed, handled outside
1✔
891
        if ingress == nil {
2✔
892
                return existing, nil
1✔
893
        }
1✔
894

895
        if existing == nil {
2✔
896
                if ingress.Annotations == nil {
2✔
897
                        ingress.Annotations = make(map[string]string)
1✔
898
                }
1✔
899
                ingress.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
900

1✔
901
                createdIng, err := c.client.NetworkingV1().Ingresses(ingress.Namespace).Create(ctx, ingress, metav1.CreateOptions{})
1✔
902
                if err != nil {
1✔
903
                        return nil, err
×
904
                }
×
905
                c.recorder.Eventf(
1✔
906
                        stackset,
1✔
907
                        v1.EventTypeNormal,
1✔
908
                        "CreatedIngress",
1✔
909
                        "Created Ingress %s",
1✔
910
                        ingress.Name)
1✔
911
                return createdIng, nil
1✔
912
        }
913

914
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
915
        if existingHaveUpdateTimeStamp {
2✔
916
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
917
        }
1✔
918

919
        // Check if we need to update the Ingress
920
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(ingress.Spec, existing.Spec) &&
1✔
921
                equality.Semantic.DeepEqual(ingress.Annotations, existing.Annotations) &&
1✔
922
                equality.Semantic.DeepEqual(ingress.Labels, existing.Labels) {
2✔
923
                // add the annotation back after comparing
1✔
924
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
925
                return existing, nil
1✔
926
        }
1✔
927

928
        updated := existing.DeepCopy()
1✔
929
        updated.Spec = ingress.Spec
1✔
930
        if ingress.Annotations != nil {
2✔
931
                updated.Annotations = ingress.Annotations
1✔
932
        } else {
2✔
933
                updated.Annotations = make(map[string]string)
1✔
934
        }
1✔
935
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
936

1✔
937
        updated.Labels = ingress.Labels
1✔
938

1✔
939
        createdIngress, err := c.client.NetworkingV1().Ingresses(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
940
        if err != nil {
1✔
941
                return nil, err
×
942
        }
×
943
        c.recorder.Eventf(
1✔
944
                stackset,
1✔
945
                v1.EventTypeNormal,
1✔
946
                "UpdatedIngress",
1✔
947
                "Updated Ingress %s",
1✔
948
                ingress.Name)
1✔
949
        return createdIngress, nil
1✔
950
}
951

952
func (c *StackSetController) deleteIngress(ctx context.Context, stackset *zv1.StackSet, existing *networking.Ingress, routegroup *rgv1.RouteGroup) error {
1✔
953
        // Check if a routegroup exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
954
        if stackset.Spec.RouteGroup != nil && c.routeGroupSupportEnabled {
2✔
955
                if routegroup == nil {
2✔
956
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup missing", existing.Name)
1✔
957
                        return nil
1✔
958
                }
1✔
959
                timestamp, ok := routegroup.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
960
                // The only scenario version we could think of for this is
1✔
961
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
962
                //  in that case, just wait until the RouteGroup has the annotation
1✔
963
                if !ok {
2✔
964
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have the %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
965
                        return nil
1✔
966
                }
1✔
967

968
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
969
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s does not have a valid %s annotation yet", existing.Name, routegroup.Name, ControllerLastUpdatedAnnotationKey)
1✔
970
                        return nil
1✔
971
                } else if !ready {
3✔
972
                        c.logger.Infof("Not deleting Ingress %s yet, RouteGroup %s updated less than %s ago", existing.Name, routegroup.Name, c.ingressSourceSwitchTTL)
1✔
973
                        return nil
1✔
974
                }
1✔
975
        }
976
        err := c.client.NetworkingV1().Ingresses(existing.Namespace).Delete(ctx, existing.Name, metav1.DeleteOptions{})
1✔
977
        if err != nil {
1✔
978
                return err
×
979
        }
×
980
        c.recorder.Eventf(
1✔
981
                stackset,
1✔
982
                v1.EventTypeNormal,
1✔
983
                "DeletedIngress",
1✔
984
                "Deleted Ingress %s",
1✔
985
                existing.Namespace)
1✔
986
        return nil
1✔
987
}
988

989
// AddUpdateStackSetRouteGroup reconciles the RouteGroup but never deletes it, it returns the existing/new RouteGroup
990
func (c *StackSetController) AddUpdateStackSetRouteGroup(ctx context.Context, stackset *zv1.StackSet, existing *rgv1.RouteGroup, ingress *networking.Ingress, rg *rgv1.RouteGroup) (*rgv1.RouteGroup, error) {
1✔
991
        // RouteGroup removed, handled outside
1✔
992
        if rg == nil {
2✔
993
                return existing, nil
1✔
994
        }
1✔
995

996
        // Create new RouteGroup
997
        if existing == nil {
2✔
998
                if rg.Annotations == nil {
2✔
999
                        rg.Annotations = make(map[string]string)
1✔
1000
                }
1✔
1001
                rg.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
1002

1✔
1003
                createdRg, err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Create(ctx, rg, metav1.CreateOptions{})
1✔
1004
                if err != nil {
1✔
1005
                        return nil, err
×
1006
                }
×
1007
                c.recorder.Eventf(
1✔
1008
                        stackset,
1✔
1009
                        v1.EventTypeNormal,
1✔
1010
                        "CreatedRouteGroup",
1✔
1011
                        "Created RouteGroup %s",
1✔
1012
                        rg.Name)
1✔
1013
                return createdRg, nil
1✔
1014
        }
1015

1016
        lastUpdateValue, existingHaveUpdateTimeStamp := existing.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
1017
        if existingHaveUpdateTimeStamp {
2✔
1018
                delete(existing.Annotations, ControllerLastUpdatedAnnotationKey)
1✔
1019
        }
1✔
1020

1021
        // Check if we need to update the RouteGroup
1022
        if existingHaveUpdateTimeStamp && equality.Semantic.DeepDerivative(rg.Spec, existing.Spec) &&
1✔
1023
                equality.Semantic.DeepEqual(rg.Annotations, existing.Annotations) &&
1✔
1024
                equality.Semantic.DeepEqual(rg.Labels, existing.Labels) {
2✔
1025
                // add the annotation back after comparing
1✔
1026
                existing.Annotations[ControllerLastUpdatedAnnotationKey] = lastUpdateValue
1✔
1027
                return existing, nil
1✔
1028
        }
1✔
1029

1030
        updated := existing.DeepCopy()
1✔
1031
        updated.Spec = rg.Spec
1✔
1032
        if rg.Annotations != nil {
1✔
1033
                updated.Annotations = rg.Annotations
×
1034
        } else {
1✔
1035
                updated.Annotations = make(map[string]string)
1✔
1036
        }
1✔
1037
        updated.Annotations[ControllerLastUpdatedAnnotationKey] = c.now()
1✔
1038

1✔
1039
        updated.Labels = rg.Labels
1✔
1040

1✔
1041
        createdRg, err := c.client.RouteGroupV1().RouteGroups(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1042
        if err != nil {
1✔
1043
                return nil, err
×
1044
        }
×
1045
        c.recorder.Eventf(
1✔
1046
                stackset,
1✔
1047
                v1.EventTypeNormal,
1✔
1048
                "UpdatedRouteGroup",
1✔
1049
                "Updated RouteGroup %s",
1✔
1050
                rg.Name)
1✔
1051
        return createdRg, nil
1✔
1052
}
1053

1054
func (c *StackSetController) deleteRouteGroup(ctx context.Context, stackset *zv1.StackSet, rg *rgv1.RouteGroup, ingress *networking.Ingress) error {
1✔
1055
        // Check if an ingress exists and if so only delete if it has existed for more than ingressSourceWithTTL time.
1✔
1056
        if stackset.Spec.Ingress != nil {
2✔
1057
                if ingress == nil {
2✔
1058
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress missing", rg.Name)
1✔
1059
                        return nil
1✔
1060
                }
1✔
1061
                timestamp, ok := ingress.Annotations[ControllerLastUpdatedAnnotationKey]
1✔
1062
                // The only scenario version we could think of for this is
1✔
1063
                //  if the RouteGroup was created by an older version of StackSet Controller
1✔
1064
                //  in that case, just wait until the RouteGroup has the annotation
1✔
1065
                if !ok {
2✔
1066
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have the %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1067
                        return nil
1✔
1068
                }
1✔
1069

1070
                if ready, err := resourceReady(timestamp, c.ingressSourceSwitchTTL); err != nil {
2✔
1071
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s does not have a valid %s annotation yet", rg.Name, ingress.Name, ControllerLastUpdatedAnnotationKey)
1✔
1072
                        return nil
1✔
1073
                } else if !ready {
3✔
1074
                        c.logger.Infof("Not deleting RouteGroup %s yet, Ingress %s updated less than %s ago", rg.Name, ingress.Name, c.ingressSourceSwitchTTL)
1✔
1075
                        return nil
1✔
1076
                }
1✔
1077
        }
1078
        err := c.client.RouteGroupV1().RouteGroups(rg.Namespace).Delete(ctx, rg.Name, metav1.DeleteOptions{})
1✔
1079
        if err != nil {
1✔
1080
                return err
×
1081
        }
×
1082
        c.recorder.Eventf(
1✔
1083
                stackset,
1✔
1084
                v1.EventTypeNormal,
1✔
1085
                "DeletedRouteGroup",
1✔
1086
                "Deleted RouteGroup %s",
1✔
1087
                rg.Namespace)
1✔
1088
        return nil
1✔
1089
}
1090

1091
func (c *StackSetController) ReconcileStackSetIngressSources(
1092
        ctx context.Context,
1093
        stackset *zv1.StackSet,
1094
        existingIng *networking.Ingress,
1095
        existingRg *rgv1.RouteGroup,
1096
        generateIng func() (*networking.Ingress, error),
1097
        generateRg func() (*rgv1.RouteGroup, error),
1098
) error {
1✔
1099
        ingress, err := generateIng()
1✔
1100
        if err != nil {
1✔
1101
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1102
        }
×
1103

1104
        // opt-out existingIng creation in case we have an external entity creating existingIng
1105
        appliedIng, err := c.AddUpdateStackSetIngress(ctx, stackset, existingIng, existingRg, ingress)
1✔
1106
        if err != nil {
1✔
1107
                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1108
        }
×
1109

1110
        rg, err := generateRg()
1✔
1111
        if err != nil {
1✔
1112
                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1113
        }
×
1114

1115
        var appliedRg *rgv1.RouteGroup
1✔
1116
        if c.routeGroupSupportEnabled {
2✔
1117
                appliedRg, err = c.AddUpdateStackSetRouteGroup(ctx, stackset, existingRg, appliedIng, rg)
1✔
1118
                if err != nil {
1✔
1119
                        return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1120
                }
×
1121
        }
1122

1123
        // Ingress removed
1124
        if ingress == nil {
2✔
1125
                if existingIng != nil {
2✔
1126
                        err := c.deleteIngress(ctx, stackset, existingIng, appliedRg)
1✔
1127
                        if err != nil {
1✔
1128
                                return c.errorEventf(stackset, "FailedManageIngress", err)
×
1129
                        }
×
1130
                }
1131
        }
1132

1133
        // RouteGroup removed
1134
        if rg == nil {
2✔
1135
                if existingRg != nil {
2✔
1136
                        err := c.deleteRouteGroup(ctx, stackset, existingRg, appliedIng)
1✔
1137
                        if err != nil {
1✔
1138
                                return c.errorEventf(stackset, "FailedManageRouteGroup", err)
×
1139
                        }
×
1140
                }
1141
        }
1142

1143
        return nil
1✔
1144
}
1145

1146
// convertToTrafficSegments removes the central ingress component of the
1147
// StackSet, if and only if the StackSet already has traffic segments.
1148
func (c *StackSetController) convertToTrafficSegments(
1149
        ctx context.Context,
1150
        ssc *core.StackSetContainer,
1151
) error {
×
1152
        if ssc.Ingress == nil && ssc.RouteGroup == nil {
×
1153
                return nil
×
1154
        }
×
1155

1156
        var ingTimestamp, rgTimestamp *metav1.Time
×
1157
        for _, sc := range ssc.StackContainers {
×
1158
                if sc.Stack.Spec.Ingress != nil && sc.Resources.IngressSegment == nil {
×
1159
                        c.logger.Warnf(
×
1160
                                "Not deleting Ingress %s, stack %s doesn't have a segment yet.",
×
1161
                                ssc.Ingress.Name,
×
1162
                                sc.Name(),
×
1163
                        )
×
1164
                        return nil
×
1165
                }
×
1166

1167
                if sc.Stack.Spec.RouteGroup != nil && sc.Resources.RouteGroupSegment == nil {
×
1168
                        c.logger.Warnf(
×
1169
                                "Not deleting RouteGroup %s, stack %s doesn't have a segment yet.",
×
1170
                                ssc.RouteGroup.Name,
×
1171
                                sc.Name(),
×
1172
                        )
×
1173
                        return nil
×
1174
                }
×
1175

1176
                // If we find stacks with a segment, we can delete the
1177
                // central ingress resources.
1178
                if ingTimestamp == nil && sc.Resources.IngressSegment != nil {
×
1179
                        ingTimestamp = &sc.Resources.IngressSegment.CreationTimestamp
×
1180
                }
×
1181

1182
                if rgTimestamp == nil && sc.Resources.RouteGroupSegment != nil {
×
1183
                        rgTimestamp = &sc.Resources.RouteGroupSegment.CreationTimestamp
×
1184
                }
×
1185
        }
1186

1187
        if len(ssc.StackContainers) == 0 {
×
1188
                c.logger.Infof(
×
1189
                        "No stacks found for StackSet %s, safe to delete central "+
×
1190
                        "ingress/routegroup",
×
1191
                        ssc.StackSet.Name,
×
1192
                )
×
1193

×
1194
                // If we don't have any stacks, we can delete the central ingress
×
1195
                // resources
×
1196
                oldEnough := metav1.NewTime(
×
1197
                        time.Now().Add(-c.ingressSourceSwitchTTL-time.Minute),
×
1198
                )
×
1199
                ingTimestamp = &oldEnough
×
1200
                rgTimestamp = &oldEnough
×
1201
        }
×
1202

1203
        if ingTimestamp != nil && ssc.Ingress != nil {
×
1204
                if !resourceReadyTime(ingTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1205
                        c.logger.Infof(
×
1206
                                "Not deleting Ingress %s, segments created less than %s ago",
×
1207
                                ssc.Ingress.Name,
×
1208
                                c.ingressSourceSwitchTTL,
×
1209
                        )
×
1210
                        return nil
×
1211
                }
×
1212

1213
                err := c.client.NetworkingV1().Ingresses(ssc.Ingress.Namespace).Delete(
×
1214
                        ctx,
×
1215
                        ssc.Ingress.Name,
×
1216
                        metav1.DeleteOptions{},
×
1217
                )
×
1218
                if err != nil {
×
1219
                        return err
×
1220
                }
×
1221

1222
                c.recorder.Eventf(
×
1223
                        ssc.StackSet,
×
1224
                        v1.EventTypeNormal,
×
1225
                        "DeletedIngress",
×
1226
                        "Deleted Ingress %s, StackSet conversion to traffic segments complete",
×
1227
                        ssc.Ingress.Namespace,
×
1228
                )
×
1229

×
1230
                ssc.Ingress = nil
×
1231
        }
1232

1233
        if rgTimestamp != nil && ssc.RouteGroup != nil {
×
1234
                if !resourceReadyTime(rgTimestamp.Time, c.ingressSourceSwitchTTL) {
×
1235
                        c.logger.Infof(
×
1236
                                "Not deleting RouteGroup %s, segments created less than %s ago",
×
1237
                                ssc.RouteGroup.Name,
×
1238
                                c.ingressSourceSwitchTTL,
×
1239
                        )
×
1240
                        return nil
×
1241
                }
×
1242

1243
                err := c.client.RouteGroupV1().RouteGroups(
×
1244
                        ssc.RouteGroup.Namespace,
×
1245
                ).Delete(
×
1246
                        ctx,
×
1247
                        ssc.RouteGroup.Name,
×
1248
                        metav1.DeleteOptions{},
×
1249
                )
×
1250
                if err != nil {
×
1251
                        return err
×
1252
                }
×
1253

1254
                c.recorder.Eventf(
×
1255
                        ssc.RouteGroup,
×
1256
                        v1.EventTypeNormal,
×
1257
                        "DeletedRouteGroup",
×
1258
                        "Deleted RouteGroup %s, StackSet conversion to traffic segments complete",
×
1259
                        ssc.RouteGroup.Namespace,
×
1260
                )
×
1261

×
1262
                ssc.RouteGroup = nil
×
1263
        }
1264

1265
        return nil
×
1266
}
1267

1268
// ReconcileStackSetResources reconciles the central Ingress and/or RouteGroup
1269
// of the specified StackSet.
1270
//
1271
// If the StackSet supports traffic segments, the controller won't reconcile the
1272
// central ingress resources. This method is deprecated and will be removed in
1273
// the future.
1274
func (c *StackSetController) ReconcileStackSetResources(ctx context.Context, ssc *core.StackSetContainer) error {
×
1275
        if !ssc.SupportsSegmentTraffic() {
×
1276
                err := c.ReconcileStackSetIngressSources(
×
1277
                        ctx,
×
1278
                        ssc.StackSet,
×
1279
                        ssc.Ingress,
×
1280
                        ssc.RouteGroup,
×
1281
                        ssc.GenerateIngress,
×
1282
                        ssc.GenerateRouteGroup,
×
1283
                )
×
1284
                if err != nil {
×
1285
                        return err
×
1286
                }
×
1287
        } else {
×
1288
                // Convert StackSet to traffic segments, if needed
×
1289
                err := c.convertToTrafficSegments(ctx, ssc)
×
1290
                if err != nil {
×
1291
                        return err
×
1292
                }
×
1293
        }
1294

1295
        trafficChanges := ssc.TrafficChanges()
×
1296
        if len(trafficChanges) != 0 {
×
1297
                var changeMessages []string
×
1298
                for _, change := range trafficChanges {
×
1299
                        changeMessages = append(changeMessages, change.String())
×
1300
                }
×
1301

1302
                c.recorder.Eventf(
×
1303
                        ssc.StackSet,
×
1304
                        v1.EventTypeNormal,
×
1305
                        "TrafficSwitched",
×
1306
                        "Switched traffic: %s",
×
1307
                        strings.Join(changeMessages, ", "))
×
1308
        }
1309

1310
        return nil
×
1311
}
1312

1313
func (c *StackSetController) ReconcileStackSetDesiredTraffic(ctx context.Context, existing *zv1.StackSet, generateUpdated func() []*zv1.DesiredTraffic) error {
1✔
1314
        updatedTraffic := generateUpdated()
1✔
1315

1✔
1316
        if equality.Semantic.DeepEqual(existing.Spec.Traffic, updatedTraffic) {
1✔
1317
                return nil
×
1318
        }
×
1319

1320
        updated := existing.DeepCopy()
1✔
1321
        updated.Spec.Traffic = updatedTraffic
1✔
1322

1✔
1323
        _, err := c.client.ZalandoV1().StackSets(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
1✔
1324
        if err != nil {
1✔
1325
                return err
×
1326
        }
×
1327
        c.recorder.Eventf(
1✔
1328
                updated,
1✔
1329
                v1.EventTypeNormal,
1✔
1330
                "UpdatedStackSet",
1✔
1331
                "Updated StackSet %s",
1✔
1332
                updated.Name)
1✔
1333
        return nil
1✔
1334
}
1335

1336
func (c *StackSetController) ReconcileStackResources(ctx context.Context, ssc *core.StackSetContainer, sc *core.StackContainer) error {
×
1337
        err := c.ReconcileStackIngress(ctx, sc.Stack, sc.Resources.Ingress, sc.GenerateIngress)
×
1338
        if err != nil {
×
1339
                return c.errorEventf(sc.Stack, "FailedManageIngress", err)
×
1340
        }
×
1341

1342
        // This is to support both central and segment-based traffic.
1343
        if ssc.SupportsSegmentTraffic() {
×
1344
                err = c.ReconcileStackIngress(
×
1345
                        ctx,
×
1346
                        sc.Stack,
×
1347
                        sc.Resources.IngressSegment,
×
1348
                        sc.GenerateIngressSegment,
×
1349
                )
×
1350
                if err != nil {
×
1351
                        return c.errorEventf(sc.Stack, "FailedManageIngressSegment", err)
×
1352
                }
×
1353
        }
1354

1355
        if c.routeGroupSupportEnabled {
×
1356
                err = c.ReconcileStackRouteGroup(ctx, sc.Stack, sc.Resources.RouteGroup, sc.GenerateRouteGroup)
×
1357
                if err != nil {
×
1358
                        return c.errorEventf(sc.Stack, "FailedManageRouteGroup", err)
×
1359
                }
×
1360

1361
                // This is to support both central and segment-based traffic.
1362
                if ssc.SupportsSegmentTraffic() {
×
1363
                        err = c.ReconcileStackRouteGroup(
×
1364
                                ctx,
×
1365
                                sc.Stack,
×
1366
                                sc.Resources.RouteGroupSegment,
×
1367
                                sc.GenerateRouteGroupSegment,
×
1368
                        )
×
1369
                        if err != nil {
×
1370
                                return c.errorEventf(
×
1371
                                        sc.Stack,
×
1372
                                        "FailedManageRouteGroupSegment",
×
1373
                                        err,
×
1374
                                )
×
1375
                        }
×
1376
                }
1377
        }
1378

1379
        if c.configMapSupportEnabled {
×
1380
                err := c.ReconcileStackConfigMapRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1381
                if err != nil {
×
1382
                        return c.errorEventf(sc.Stack, "FailedManageConfigMapRefs", err)
×
1383
                }
×
1384
        }
1385

1386
        if c.secretSupportEnabled {
×
1387
                err := c.ReconcileStackSecretRefs(ctx, sc.Stack, sc.UpdateObjectMeta)
×
1388
                if err != nil {
×
1389
                        return c.errorEventf(sc.Stack, "FailedManageSecretRefs", err)
×
1390
                }
×
1391
        }
1392

1393
        // if c.pcsSupportEnabled {
NEW
1394
        err = c.ReconcileStackPlatformCredentialsSets(
×
NEW
1395
                ctx,
×
NEW
1396
                sc.Stack,
×
NEW
1397
                sc.Resources.PlatformCredentialsSets,
×
NEW
1398
                sc.GeneratePlatformCredentialsSet,
×
NEW
1399
        )
×
NEW
1400
        if err != nil {
×
NEW
1401
                return c.errorEventf(sc.Stack, "FailedManagePlatformCredentialsSet", err)
×
NEW
1402
        }
×
1403
        // }
1404

1405
        err = c.ReconcileStackDeployment(ctx, sc.Stack, sc.Resources.Deployment, sc.GenerateDeployment)
×
1406
        if err != nil {
×
1407
                return c.errorEventf(sc.Stack, "FailedManageDeployment", err)
×
1408
        }
×
1409

1410
        hpaGenerator := sc.GenerateHPA
×
1411
        if ssc.SupportsSegmentTraffic() {
×
1412
                hpaGenerator = sc.GenerateHPAToSegment
×
1413
        }
×
1414
        err = c.ReconcileStackHPA(ctx, sc.Stack, sc.Resources.HPA, hpaGenerator)
×
1415
        if err != nil {
×
1416
                return c.errorEventf(sc.Stack, "FailedManageHPA", err)
×
1417
        }
×
1418

1419
        err = c.ReconcileStackService(ctx, sc.Stack, sc.Resources.Service, sc.GenerateService)
×
1420
        if err != nil {
×
1421
                return c.errorEventf(sc.Stack, "FailedManageService", err)
×
1422
        }
×
1423

1424
        return nil
×
1425
}
1426

1427
// ReconcileStackSet reconciles all the things from a stackset
1428
func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *core.StackSetContainer) (err error) {
×
1429
        defer func() {
×
1430
                if r := recover(); r != nil {
×
1431
                        c.metricsReporter.ReportPanic()
×
1432
                        c.stacksetLogger(container).Errorf("Encountered a panic while processing a stackset: %v\n%s", r, debug.Stack())
×
1433
                        err = fmt.Errorf("panic: %v", r)
×
1434
                }
×
1435
        }()
1436

1437
        if c.injectSegmentAnnotation(ctx, container.StackSet) {
×
1438
                // Reconciler handles StackSet in the next loop
×
1439
                return nil
×
1440
        }
×
1441

1442
        // Create current stack, if needed. Proceed on errors.
1443
        err = c.CreateCurrentStack(ctx, container)
×
1444
        if err != nil {
×
1445
                err = c.errorEventf(container.StackSet, "FailedCreateStack", err)
×
1446
                c.stacksetLogger(container).Errorf("Unable to create stack: %v", err)
×
1447
        }
×
1448

1449
        // Update statuses from external resources (ingresses, deployments, etc). Abort on errors.
1450
        err = container.UpdateFromResources()
×
1451
        if err != nil {
×
1452
                return err
×
1453
        }
×
1454

1455
        // Update the stacks with the currently selected traffic reconciler. Proceed on errors.
1456
        err = container.ManageTraffic(time.Now())
×
1457
        if err != nil {
×
1458
                c.stacksetLogger(container).Errorf("Traffic reconciliation failed: %v", err)
×
1459
                c.recorder.Eventf(
×
1460
                        container.StackSet,
×
1461
                        v1.EventTypeWarning,
×
1462
                        "TrafficNotSwitched",
×
1463
                        "Failed to switch traffic: "+err.Error())
×
1464
        }
×
1465

1466
        // Mark stacks that should be removed
1467
        container.MarkExpiredStacks()
×
1468

×
1469
        segsInOrder := []types.UID{}
×
1470
        // This is to support both central and segment-based traffic.
×
1471
        if container.SupportsSegmentTraffic() {
×
1472
                // Update traffic segments. Proceed on errors.
×
1473
                segsInOrder, err = c.ReconcileTrafficSegments(ctx, container)
×
1474
                if err != nil {
×
1475
                        err = c.errorEventf(
×
1476
                                container.StackSet,
×
1477
                                reasonFailedManageStackSet,
×
1478
                                err,
×
1479
                        )
×
1480
                        c.stacksetLogger(container).Errorf(
×
1481
                                "Unable to reconcile traffic segments: %v",
×
1482
                                err,
×
1483
                        )
×
1484
                }
×
1485
        }
1486

1487
        // Reconcile stack resources. Proceed on errors.
1488
        reconciledStacks := map[types.UID]bool{}
×
1489
        for _, id := range segsInOrder {
×
1490
                reconciledStacks[id] = true
×
1491
                sc := container.StackContainers[id]
×
1492
                err = c.ReconcileStackResources(ctx, container, sc)
×
1493
                if err != nil {
×
1494
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1495
                        c.stackLogger(container, sc).Errorf(
×
1496
                                "Unable to reconcile stack resources: %v",
×
1497
                                err,
×
1498
                        )
×
1499
                }
×
1500
        }
1501

1502
        for k, sc := range container.StackContainers {
×
1503
                if reconciledStacks[k] {
×
1504
                        continue
×
1505
                }
1506

1507
                err = c.ReconcileStackResources(ctx, container, sc)
×
1508
                if err != nil {
×
1509
                        err = c.errorEventf(sc.Stack, "FailedManageStack", err)
×
1510
                        c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err)
×
1511
                }
×
1512
        }
1513

1514
        // Reconcile stackset resources (update ingress and/or routegroups). Proceed on errors.
1515
        err = c.ReconcileStackSetResources(ctx, container)
×
1516
        if err != nil {
×
1517
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1518
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err)
×
1519
        }
×
1520

1521
        // Reconcile desired traffic in the stackset. Proceed on errors.
1522
        err = c.ReconcileStackSetDesiredTraffic(ctx, container.StackSet, container.GenerateStackSetTraffic)
×
1523
        if err != nil {
×
1524
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1525
                c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err)
×
1526
        }
×
1527

1528
        // Delete old stacks. Proceed on errors.
1529
        err = c.CleanupOldStacks(ctx, container)
×
1530
        if err != nil {
×
1531
                err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err)
×
1532
                c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err)
×
1533
        }
×
1534

1535
        // Update statuses.
1536
        err = c.ReconcileStatuses(ctx, container)
×
1537
        if err != nil {
×
1538
                return err
×
1539
        }
×
1540

1541
        return nil
×
1542
}
1543

1544
// getResetMinReplicasDelay parses and returns the reset delay if set in the
1545
// stackset annotation.
1546
func getResetMinReplicasDelay(annotations map[string]string) (time.Duration, bool) {
1✔
1547
        resetDelayStr, ok := annotations[ResetHPAMinReplicasDelayAnnotationKey]
1✔
1548
        if !ok {
2✔
1549
                return 0, false
1✔
1550
        }
1✔
1551
        resetDelay, err := time.ParseDuration(resetDelayStr)
1✔
1552
        if err != nil {
1✔
1553
                return 0, false
×
1554
        }
×
1555
        return resetDelay, true
1✔
1556
}
1557

1558
func fixupStackSetTypeMeta(stackset *zv1.StackSet) {
1✔
1559
        // set TypeMeta manually because of this bug:
1✔
1560
        // https://github.com/kubernetes/client-go/issues/308
1✔
1561
        stackset.APIVersion = core.APIVersion
1✔
1562
        stackset.Kind = core.KindStackSet
1✔
1563
}
1✔
1564

1565
func fixupStackTypeMeta(stack *zv1.Stack) {
1✔
1566
        // set TypeMeta manually because of this bug:
1✔
1567
        // https://github.com/kubernetes/client-go/issues/308
1✔
1568
        stack.APIVersion = core.APIVersion
1✔
1569
        stack.Kind = core.KindStack
1✔
1570
}
1✔
1571

1572
func resourceReady(timestamp string, ttl time.Duration) (bool, error) {
1✔
1573
        resourceLastUpdated, err := time.Parse(time.RFC3339, timestamp)
1✔
1574
        if err != nil {
2✔
1575
                // wait until there's a valid timestamp on the annotation
1✔
1576
                return false, err
1✔
1577
        }
1✔
1578

1579
        return resourceReadyTime(resourceLastUpdated, ttl), nil
1✔
1580
}
1581

1582
func resourceReadyTime(timestamp time.Time, ttl time.Duration) bool {
1✔
1583
        if !timestamp.IsZero() && time.Since(timestamp) > ttl {
2✔
1584
                return true
1✔
1585
        }
1✔
1586

1587
        return false
1✔
1588
}
1589

1590
// validateConfigurationResourcesNames returns an error if any ConfigurationResource
1591
// name is not prefixed by Stack name.
1592
func validateAllConfigurationResourcesNames(stack *zv1.Stack) error {
1✔
1593
        for _, rsc := range stack.Spec.ConfigurationResources {
1✔
1594
                if err := validateConfigurationResourceName(stack.Name, rsc.GetName()); err != nil {
×
1595
                        return err
×
1596
                }
×
1597
        }
1598
        return nil
1✔
1599
}
1600

1601
// validateConfigurationResourceName returns an error if specific resource
1602
// name is not prefixed by Stack name.
1603
func validateConfigurationResourceName(stack string, rsc string) error {
1✔
1604
        if !strings.HasPrefix(rsc, stack) {
2✔
1605
                return fmt.Errorf(configurationResourceNameError, rsc, stack)
1✔
1606
        }
1✔
1607
        return nil
1✔
1608
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc