zalando-incubator / cluster-lifecycle-manager · build 10108145206

26 Jul 2024 08:22AM UTC · coverage: 48.258% (first build)

Pull Request #797: Implement EKS provisioning and support multiple provisioners.
Commit: "Fix kubectl arguments." (gargravarr)
Signed-off-by: gargravarr <rodrigo.gargravarr@gmail.com>

24 of 405 new or added lines in 8 files covered (5.93%)
3019 of 6256 relevant lines covered (48.26%)
14.09 hits per line

Source file: /controller/controller.go (65.0% file coverage)
package controller

import (
	"context"
	"fmt"
	"slices"
	"time"

	log "github.com/sirupsen/logrus"
	"github.com/zalando-incubator/cluster-lifecycle-manager/api"
	"github.com/zalando-incubator/cluster-lifecycle-manager/channel"
	"github.com/zalando-incubator/cluster-lifecycle-manager/config"
	"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/util/command"
	"github.com/zalando-incubator/cluster-lifecycle-manager/provisioner"
	"github.com/zalando-incubator/cluster-lifecycle-manager/registry"
)

const (
	errTypeGeneral           = "https://cluster-lifecycle-manager.zalando.org/problems/general-error"
	errTypeCoalescedProblems = "https://cluster-lifecycle-manager.zalando.org/problems/too-many-problems"
	errorLimit               = 25
)

var (
	statusRequested             = "requested"
	statusReady                 = "ready"
	statusDecommissionRequested = "decommission-requested"
	statusDecommissioned        = "decommissioned"
)

// Options configure the controller when it is initialized.
type Options struct {
	Interval          time.Duration
	AccountFilter     config.IncludeExcludeFilter
	Providers         []string
	DryRun            bool
	ConcurrentUpdates uint
	EnvironmentOrder  []string
}

// Controller defines the main control loop for the cluster-lifecycle-manager.
type Controller struct {
	logger               *log.Entry
	execManager          *command.ExecManager
	registry             registry.Registry
	provisioners         map[provisioner.ProviderID]provisioner.Provisioner
	providers            []string
	channelConfigSourcer channel.ConfigSource
	interval             time.Duration
	dryRun               bool
	clusterList          *ClusterList
	concurrentUpdates    uint
}

// New initializes a new controller.
func New(
	logger *log.Entry,
	execManager *command.ExecManager,
	registry registry.Registry,
	provisioners map[provisioner.ProviderID]provisioner.Provisioner,
	channelConfigSourcer channel.ConfigSource,
	options *Options,
) *Controller {
	return &Controller{
		logger:               logger,
		execManager:          execManager,
		registry:             registry,
		provisioners:         provisioners,
		providers:            options.Providers,
		channelConfigSourcer: channel.NewCachingSource(channelConfigSourcer),
		interval:             options.Interval,
		dryRun:               options.DryRun,
		clusterList:          NewClusterList(options.AccountFilter),
		concurrentUpdates:    options.ConcurrentUpdates,
	}
}

// Run the main controller loop.
func (c *Controller) Run(ctx context.Context) {
	log.Info("Starting main control loop.")

	// Start the update workers
	for i := uint(0); i < c.concurrentUpdates; i++ {
		go c.processWorkerLoop(ctx, i+1)
	}

	// interval starts at zero so the first refresh happens immediately;
	// it is reset to the configured interval on every iteration.
	var interval time.Duration

	// Start the refresh loop
	for {
		select {
		case <-time.After(interval):
			interval = c.interval
			err := c.refresh()
			if err != nil {
				log.Errorf("Failed to refresh cluster list: %s", err)
			}
		case <-ctx.Done():
			log.Info("Terminating main controller loop.")
			return
		}
	}
}

func (c *Controller) processWorkerLoop(ctx context.Context, workerNum uint) {
	for {
		select {
		case <-time.After(c.interval):
			updateCtx, cancelFunc := context.WithCancel(ctx)
			nextCluster := c.clusterList.SelectNext(cancelFunc)
			if nextCluster != nil {
				c.processCluster(updateCtx, workerNum, nextCluster)
			}
			cancelFunc()
		case <-ctx.Done():
			return
		}
	}
}

// refresh refreshes the channel configuration and the cluster list.
func (c *Controller) refresh() error {
	err := c.channelConfigSourcer.Update(context.Background(), c.logger)
	if err != nil {
		return err
	}

	clusters, err := c.registry.ListClusters(
		registry.Filter{
			Providers: c.providers,
		},
	)
	if err != nil {
		return err
	}

	c.clusterList.UpdateAvailable(c.channelConfigSourcer, c.dropUnsupported(clusters))
	return nil
}

// dropUnsupported removes clusters that none of the configured provisioners
// support.
func (c *Controller) dropUnsupported(clusters []*api.Cluster) []*api.Cluster {
	result := make([]*api.Cluster, 0, len(clusters))
	for _, cluster := range clusters {
		supports := false
		for _, provisioner := range c.provisioners {
			if provisioner.Supports(cluster) {
				supports = true
				result = append(result, cluster)
				break
			}
		}

		if !supports {
			log.Debugf("Unsupported cluster: %s", cluster.ID)
			continue
		}
	}
	return result
}

// doProcessCluster checks if an action needs to be taken depending on the
// cluster state and triggers the provisioner accordingly.
func (c *Controller) doProcessCluster(ctx context.Context, logger *log.Entry, clusterInfo *ClusterInfo) (rerr error) {
	cluster := clusterInfo.Cluster
	if cluster.Status == nil {
		cluster.Status = &api.ClusterStatus{}
	}

	// There was an error trying to determine the target configuration, abort
	if clusterInfo.NextError != nil {
		return clusterInfo.NextError
	}

	config, err := clusterInfo.ChannelVersion.Get(ctx, logger)
	if err != nil {
		return err
	}
	defer func() {
		err := config.Delete()
		if err != nil {
			rerr = err
		}
	}()

	provisioner, ok := c.provisioners[provisioner.ProviderID(cluster.Provider)]
	if !ok {
		return fmt.Errorf(
			"cluster %s: unknown provider %q",
			cluster.ID,
			cluster.Provider,
		)
	}

	switch cluster.LifecycleStatus {
	case statusRequested, statusReady:
		cluster.Status.NextVersion = clusterInfo.NextVersion.String()
		if !c.dryRun {
			err = c.registry.UpdateCluster(cluster)
			if err != nil {
				return err
			}
		}

		err = provisioner.Provision(ctx, logger, cluster, config)
		if err != nil {
			return err
		}

		cluster.LifecycleStatus = statusReady
		cluster.Status.LastVersion = cluster.Status.CurrentVersion
		cluster.Status.CurrentVersion = cluster.Status.NextVersion
		cluster.Status.NextVersion = ""
		cluster.Status.Problems = []*api.Problem{}
	case statusDecommissionRequested:
		err = provisioner.Decommission(ctx, logger, cluster)
		if err != nil {
			return err
		}

		cluster.Status.LastVersion = cluster.Status.CurrentVersion
		cluster.Status.CurrentVersion = ""
		cluster.Status.NextVersion = ""
		cluster.Status.Problems = []*api.Problem{}
		cluster.LifecycleStatus = statusDecommissioned
	default:
		return fmt.Errorf("invalid cluster status: %s", cluster.LifecycleStatus)
	}

	return nil
}

// processCluster calls doProcessCluster and handles logging and reporting.
func (c *Controller) processCluster(updateCtx context.Context, workerNum uint, clusterInfo *ClusterInfo) {
	defer c.clusterList.ClusterProcessed(clusterInfo)

	cluster := clusterInfo.Cluster
	clusterLog := c.logger.WithField("cluster", cluster.Alias).WithField("worker", workerNum)

	versionedLog := clusterLog
	if clusterInfo.NextVersion != nil {
		versionedLog = clusterLog.WithField("version", clusterInfo.NextVersion.String())
	}

	versionedLog.Infof("Processing cluster (%s)", cluster.LifecycleStatus)

	err := c.doProcessCluster(updateCtx, clusterLog, clusterInfo)

	// log the error and resolve the special error cases
	if err != nil {
		versionedLog.Errorf("Failed to process cluster: %s", err)

		// treat "provider not supported" as no error
		if err == provisioner.ErrProviderNotSupported {
			err = nil
		}
	} else {
		versionedLog.Infof("Finished processing cluster")
	}

	// update the cluster state in the registry
	if !c.dryRun {
		if err != nil {
			if cluster.Status.Problems == nil {
				cluster.Status.Problems = make([]*api.Problem, 0, 1)
			}
			cluster.Status.Problems = append(cluster.Status.Problems, &api.Problem{
				Title: err.Error(),
				Type:  errTypeGeneral,
			})

			// CompactFunc removes adjacent duplicates, so a cluster that
			// keeps failing with the same error accumulates one problem
			// entry instead of one per attempt.
			cluster.Status.Problems = slices.CompactFunc(cluster.Status.Problems, func(a, b *api.Problem) bool { return *a == *b })

			// Cap the list at errorLimit, replacing the oldest retained
			// entry with a marker noting that problems were coalesced.
			if len(cluster.Status.Problems) > errorLimit {
				cluster.Status.Problems = cluster.Status.Problems[len(cluster.Status.Problems)-errorLimit:]
				cluster.Status.Problems[0] = &api.Problem{
					Type:  errTypeCoalescedProblems,
					Title: "<multiple problems>",
				}
			}
		} else {
			cluster.Status.Problems = []*api.Problem{}
		}
		err = c.registry.UpdateCluster(cluster)
		if err != nil {
			versionedLog.Errorf("Unable to update cluster state: %s", err)
		}
	}
}
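The problem-coalescing step in processCluster relies on the fact that slices.CompactFunc only collapses runs of adjacent equal elements. A minimal standalone sketch of that behavior, using strings instead of *api.Problem for brevity:

package main

import (
	"fmt"
	"slices"
)

func main() {
	// CompactFunc drops *adjacent* duplicates only, mirroring how the
	// controller dedupes the same problem reported on consecutive updates.
	problems := []string{"timeout", "timeout", "timeout", "quota exceeded", "timeout"}
	deduped := slices.CompactFunc(problems, func(a, b string) bool { return a == b })
	fmt.Println(deduped) // [timeout quota exceeded timeout]
}

Because only runs are collapsed, alternating errors still grow the list over time, which is why the errorLimit cap and the coalesced-problems marker exist.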
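For orientation, here is a minimal sketch of how this controller might be wired up from a main package. It is not the repository's actual entry point: the registry, provisioner map, channel source, and exec manager are left as nil placeholders because their constructors live outside this file, and the Options values are hypothetical.

package main

import (
	"context"
	"os/signal"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/zalando-incubator/cluster-lifecycle-manager/channel"
	"github.com/zalando-incubator/cluster-lifecycle-manager/controller"
	"github.com/zalando-incubator/cluster-lifecycle-manager/pkg/util/command"
	"github.com/zalando-incubator/cluster-lifecycle-manager/provisioner"
	"github.com/zalando-incubator/cluster-lifecycle-manager/registry"
)

func main() {
	logger := log.WithField("component", "controller")

	// Placeholders: a real setup constructs a concrete cluster registry,
	// one provisioner per supported provider, a channel config source,
	// and an exec manager before calling controller.New.
	var (
		reg          registry.Registry
		provisioners map[provisioner.ProviderID]provisioner.Provisioner
		source       channel.ConfigSource
		execManager  *command.ExecManager
	)

	ctrl := controller.New(logger, execManager, reg, provisioners, source, &controller.Options{
		Interval:          5 * time.Minute, // hypothetical refresh/update interval
		DryRun:            true,            // process clusters without writing back to the registry
		ConcurrentUpdates: 1,
	})

	// Run blocks until the context is cancelled, e.g. on SIGINT/SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	ctrl.Run(ctx)
}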