
topfreegames / maestro / 6301877509

25 Sep 2023 04:02PM UTC, coverage: 78.06% (-0.1%) from 78.177%

Commit: Fix maestro v9 (#600)

* Upgrade datadog client
* Add keepalive to check for broken TCP connections
* Bump go to 1.17

7998 of 10246 relevant lines covered (78.06%)

89.48 hits per line


Source file: /controller/utils.go (75.97% covered)
// maestro
// https://github.com/topfreegames/maestro
//
// Licensed under the MIT license:
// http://www.opensource.org/licenses/mit-license
// Copyright © 2017 Top Free Games <backend@tfgco.com>

package controller

import (
        "context"
        "errors"
        "fmt"
        "math"
        "math/rand"
        "strings"
        "sync"
        "time"

        "github.com/sirupsen/logrus"
        "github.com/spf13/viper"
        "github.com/topfreegames/maestro/models"
        "github.com/topfreegames/maestro/reporters"
        yaml "gopkg.in/yaml.v2"
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/client-go/kubernetes"

        redisLock "github.com/bsm/redis-lock"
        clockinterfaces "github.com/topfreegames/extensions/clock/interfaces"
        pginterfaces "github.com/topfreegames/extensions/pg/interfaces"
        "github.com/topfreegames/extensions/redis"
        redisinterfaces "github.com/topfreegames/extensions/redis/interfaces"
        maestroErrors "github.com/topfreegames/maestro/errors"
        reportersConstants "github.com/topfreegames/maestro/reporters/constants"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// SegmentAndReplacePods acts when a scheduler rolling update is needed.
// It segments the list of current pods into chunks sized by maxSurge (a percentage of the list)
// and replaces them with new ones.
func SegmentAndReplacePods(
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        db pginterfaces.DB,
        redisClient redisinterfaces.RedisClient,
        willTimeoutAt time.Time,
        configYAML *models.ConfigYAML,
        pods []*models.Pod,
        scheduler *models.Scheduler,
        operationManager *models.OperationManager,
        cancelPollingPeriod time.Duration,
        maxSurge, goroutinePoolSize int,
) (timeoutErr, cancelErr, err error) {
        rand.Seed(time.Now().UnixNano())
        schedulerName := scheduler.Name
        l := logger.WithFields(logrus.Fields{
                "source":    "SegmentAndReplacePods",
                "scheduler": schedulerName,
        })

        ctx, cancel := context.WithDeadline(context.Background(), willTimeoutAt)
        defer cancel()

        inRollingUpdate := operationManager != nil

        if inRollingUpdate {
                go watchOperation(ctx, cancel, logger, operationManager, cancelPollingPeriod)
        }

        // segment pods in chunks
        podChunks := segmentPods(pods, maxSurge)

        for i, chunk := range podChunks {
                l.Debugf("updating chunk %d: %v", i, names(chunk))

                // replace chunk
                err := replacePodsAndWait(
                        ctx,
                        logger,
                        roomManager,
                        mr,
                        clientset,
                        db,
                        redisClient,
                        configYAML,
                        chunk,
                        scheduler,
                        inRollingUpdate,
                        goroutinePoolSize,
                )

                if err != nil {
                        l.WithError(err).Error("error replacing chunk of pods")
                        return nil, nil, err
                }
        }

        cancelReason := ctx.Err()

        if cancelReason == context.DeadlineExceeded {
                timeoutErr = errors.New("timedout waiting rooms to be replaced, rolled back")
                l.WithError(timeoutErr).Error("operation timed out while replacing chunk of pods")
        } else if cancelReason == context.Canceled {
                cancelErr = errors.New("operation was canceled, rolled back")
                l.WithError(cancelErr).Error("operation canceled while error replacing chunk of pods")
        }

        return timeoutErr, cancelErr, err
}
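
// A minimal usage sketch of SegmentAndReplacePods, assuming a caller that has
// already loaded the scheduler, its config and its pods; the argument values
// below are illustrative, not taken from the repository:
//
//      timeoutErr, cancelErr, err := SegmentAndReplacePods(
//              logger, roomManager, mr, clientset, db, redisClient,
//              time.Now().Add(10*time.Minute), // willTimeoutAt bounds the whole update
//              configYAML, pods, scheduler,
//              operationManager, // non-nil marks a rolling update and enables cancel polling
//              5*time.Second,    // cancelPollingPeriod
//              25,               // maxSurge, a percentage of the pod list per chunk
//              10,               // goroutinePoolSize workers per chunk
//      )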

func watchOperation(
        ctx context.Context,
        cancel func(),
        logger logrus.FieldLogger,
        operationManager *models.OperationManager,
        cancelPollingPeriod time.Duration,
) {
        canceled, err := operationManager.WasCanceled()
        if err != nil {
                logger.WithError(err).Warn("error reading operation status from redis")
        }
        if canceled && err == nil {
                cancel()
                return
        }

        ticker := time.NewTicker(cancelPollingPeriod)
        defer ticker.Stop()
        for {
                select {
                case <-ticker.C:
                        canceled, err := operationManager.WasCanceled()
                        if err != nil {
                                logger.WithError(err).Warn("error reading operation status from redis")
                                continue
                        }
                        if canceled {
                                cancel()
                                return
                        }
                case <-ctx.Done():
                        return
                }
        }
}

func replacePodsAndWait(
        ctx context.Context,
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        db pginterfaces.DB,
        redisClient redisinterfaces.RedisClient,
        configYAML *models.ConfigYAML,
        podsChunk []*models.Pod,
        scheduler *models.Scheduler,
        inRollingUpdate bool,
        goroutinePoolSize int,
) (err error) {
        logger.Debug("starting to replace pods with new ones")

        childCtx, cancel := context.WithCancel(ctx)

        errChan := make(chan error, goroutinePoolSize)

        pods := make(chan *models.Pod, len(podsChunk))
        for _, pod := range podsChunk {
                pods <- pod
        }
        close(pods)

        var wg sync.WaitGroup
        logger.Infof("starting %d in-memory workers to replace %d pods", goroutinePoolSize, len(podsChunk))
        for i := 0; i < goroutinePoolSize; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        err := replacePodWorker(
                                childCtx,
                                logger,
                                roomManager,
                                mr,
                                clientset,
                                db,
                                redisClient,
                                configYAML,
                                scheduler,
                                pods,
                                inRollingUpdate,
                        )
                        // if there's an error in any of the workers, cancel the operation
                        if err != nil {
                                cancel()
                                errChan <- err
                        }
                }()
        }

        wg.Wait()

        select {
        case err = <-errChan:
                logger.WithError(err).Debug("error replacing pods")
        default:
                logger.WithError(err).Trace("replacePodsAndWait finished successfully")
        }

        return err
}
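
// The function above follows a common Go fan-out pattern: the chunk is loaded
// into a buffered channel that is closed up front, each of the
// goroutinePoolSize workers drains it until empty, and errChan is buffered so
// the first failing worker can report its error and cancel() the remaining
// workers without blocking on send.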

func replacePodWorker(
        ctx context.Context,
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        db pginterfaces.DB,
        redisClient redisinterfaces.RedisClient,
        configYAML *models.ConfigYAML,
        scheduler *models.Scheduler,
        pods <-chan *models.Pod,
        inRollingUpdate bool,
) error {
        for {
                select {
                case pod, ok := <-pods:
                        // this case is executed even after the pods channel is closed, so we need this check
                        if !ok {
                                return nil
                        }
                        canceled, err := createNewRemoveOldPod(
                                ctx,
                                logger,
                                roomManager,
                                mr,
                                clientset,
                                db,
                                redisClient,
                                configYAML,
                                scheduler,
                                pod,
                                inRollingUpdate,
                        )

                        if err != nil {
                                return err
                        }

                        if canceled {
                                return nil
                        }

                        logger.Infof("pods remaining to replace: %d", len(pods))

                        if len(pods) == 0 {
                                return nil
                        }
                case <-ctx.Done():
                        return nil
                }
        }
}

func createNewRemoveOldPod(
        ctx context.Context,
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        db pginterfaces.DB,
        redisClient redisinterfaces.RedisClient,
        configYAML *models.ConfigYAML,
        scheduler *models.Scheduler,
        pod *models.Pod,
        inRollingUpdate bool,
) (canceled bool, err error) {
        logger.Debug("creating pod")

        // create new pod
        newPod, err := roomManager.Create(logger, mr, redisClient,
                db, clientset, configYAML, scheduler)

        if err != nil {
                logger.WithError(err).Errorf("error creating pod")
                return false, err
        }

        // wait for new pod to be created
        canceled, err = waitCreatingPods(
                ctx, logger, clientset, redisClient, configYAML,
                []v1.Pod{*newPod}, mr)
        if canceled || err != nil {
                logger.Errorf("error waiting for pod to be created")
                return canceled, err
        }

        canceled, err = DeletePodsAndWait(
                ctx,
                reportersConstants.ReasonUpdate,
                logger,
                roomManager,
                mr,
                clientset,
                redisClient,
                configYAML,
                []*models.Pod{pod},
        )

        if err != nil && !strings.Contains(err.Error(), "redis") {
                return false, nil
        }

        if canceled {
                return true, nil
        }

        // Remove invalid rooms redis keys if in a rolling update operation
        // in order to track progress correctly
        if inRollingUpdate {
                err = models.RemoveInvalidRooms(redisClient, mr, configYAML.Name, []string{pod.Name})
                if err != nil {
                        logger.WithError(err).Warnf("error removing room %s from invalidRooms redis key during rolling update", pod.Name)
                }
        }

        return false, nil
}

// DeletePodsAndWait deletes a list of pods
func DeletePodsAndWait(
        ctx context.Context,
        reason string,
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        redisClient redisinterfaces.RedisClient,
        configYAML *models.ConfigYAML,
        pods []*models.Pod,
) (canceled bool, err error) {

        for _, pod := range pods {
                logger.Debugf("deleting pod %s", pod.Name)
                err = DeletePodAndRoom(logger, roomManager, mr, clientset, redisClient,
                        configYAML, pod.Name, reason)
                if err != nil && !strings.Contains(err.Error(), "redis") {
                        logger.WithError(err).Errorf("error deleting pod %s", pod.Name)
                        return false, nil
                }
        }

        // wait for old pods to be deleted
        // we assume that maxSurge == maxUnavailable as we can't set maxUnavailable yet
        // so for every pod created in a chunk one is deleted right after it
        canceled = waitTerminatingPods(ctx, logger, clientset, redisClient, configYAML.Name, pods, mr)
        if canceled {
                return canceled, nil
        }

        return false, nil
}

// DBRollback performs a rollback on a scheduler config in the database
func DBRollback(
        ctx context.Context,
        logger logrus.FieldLogger,
        mr *models.MixedMetricsReporter,
        db pginterfaces.DB,
        redisClient *redis.Client,
        failedConfigYAML *models.ConfigYAML,
        oldConfigYAML *models.ConfigYAML,
        clock clockinterfaces.Clock,
        scheduler *models.Scheduler,
        config *viper.Viper,
        oldVersion string,
) (err error) {
        eventRollbackTags := map[string]interface{}{
                "name": scheduler.Name,
                "game": scheduler.Game,
        }
        err = scheduler.UpdateVersionStatus(db)
        if err != nil {
                reporters.Report(reportersConstants.EventSchedulerRollbackError, eventRollbackTags)
                return err
        }

        // create new major version to rollback
        scheduler.NextMajorVersion()
        scheduler.RollingUpdateStatus = rollbackStatus(oldVersion)
        err = saveConfigYAML(
                ctx,
                logger,
                mr,
                db,
                oldConfigYAML,
                scheduler,
                *failedConfigYAML,
                config,
                oldVersion,
        )
        if err != nil {
                reporters.Report(reportersConstants.EventSchedulerRollbackError, eventRollbackTags)
                return err
        }

        err = scheduler.UpdateVersionStatus(db)
        if err != nil {
                reporters.Report(reportersConstants.EventSchedulerRollbackError, eventRollbackTags)
                return err
        }

        return nil
}

func waitTerminatingPods(
        ctx context.Context,
        l logrus.FieldLogger,
        clientset kubernetes.Interface,
        redisClient redisinterfaces.RedisClient,
        namespace string,
        deletedPods []*models.Pod,
        mr *models.MixedMetricsReporter,
) (wasCanceled bool) {
        logger := l.WithFields(logrus.Fields{
                "source":    "controller.waitTerminatingPods",
                "scheduler": namespace,
        })

        logger.Debugf("waiting for pods to terminate: %#v", names(deletedPods))

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for {
                exit := true

                // the go select statement chooses "randomly" when two branches are available, so it
                // can make select ignore ctx.Done() for iterations. This check prevents entering the
                // select if the context was already cancelled.
                if ctx.Err() != nil {
                        logger.Warn("operation canceled/timedout waiting for rooms to be removed")
                        return true
                }

                select {
                case <-ctx.Done():
                        logger.Warn("operation canceled/timedout waiting for rooms to be removed")
                        return true
                case <-ticker.C:
                        for _, pod := range deletedPods {
                                p, err := models.GetPodFromRedis(redisClient, mr, pod.Name, namespace)
                                if err != nil {
                                        logger.
                                                WithError(err).
                                                WithField("pod", pod.Name).
                                                Info("error getting pod")
                                        exit = false
                                        break
                                }

                                if p != nil {
                                        if pod.IsTerminating {
                                                logger.WithField("pod", pod.Name).Debugf("pod is terminating")
                                                exit = false
                                                break
                                        }

                                        logger.WithField("pod", pod.Name).Debugf("pod still exists, deleting again")
                                        err = mr.WithSegment(models.SegmentPod, func() error {
                                                return clientset.CoreV1().Pods(namespace).Delete(pod.Name, deleteOptions)
                                        })
                                        exit = false
                                        break
                                }
                        }
                }

                if exit {
                        logger.Info("terminating pods were successfully removed")
                        break
                }
        }

        return false
}

func waitCreatingPods(
        ctx context.Context,
        l logrus.FieldLogger,
        clientset kubernetes.Interface,
        redisClient redisinterfaces.RedisClient,
        config *models.ConfigYAML,
        createdPods []v1.Pod,
        mr *models.MixedMetricsReporter,
) (canceled bool, err error) {
        logger := l.WithFields(logrus.Fields{
                "source":    "controller.waitCreatingPods",
                "scheduler": config.Name,
        })

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        var retryNo []int
        for range createdPods {
                retryNo = append(retryNo, 0)
        }
        backoffStart := time.Duration(1 * time.Second)

        for {
                exit := true

                // the go select statement chooses "randomly" when two branches are available, so it
                // can make select ignore ctx.Done() for iterations. This check prevents entering the
                // select if the context was already cancelled.
                if ctx.Err() != nil {
                        logger.Warn("operation canceled/timeout waiting for rooms to be created")
                        return true, nil
                }

                select {
                case <-ctx.Done():
                        logger.Warn("operation canceled/timeout waiting for rooms to be created")
                        return true, nil
                case <-ticker.C:
                        for i, pod := range createdPods {
                                createdPod, err := models.GetPodFromRedis(redisClient, mr, pod.GetName(), config.Name)
                                if err != nil {
                                        logger.
                                                WithError(err).
                                                WithField("pod", pod.GetName()).
                                                Error("error getting pod")
                                        exit = false
                                        break
                                }

                                if createdPod == nil {
                                        // apply exponential backoff
                                        retryNo[i]++
                                        backoff := exponentialBackoff(backoffStart, retryNo[i])

                                        exit = false
                                        logger.
                                                WithError(err).
                                                WithField("pod", pod.GetName()).
                                                Errorf("error creating pod, recreating in %s (retry %d)", backoff, retryNo[i])

                                        pod.ResourceVersion = ""
                                        err = mr.WithSegment(models.SegmentPod, func() error {
                                                var err error
                                                _, err = clientset.CoreV1().Pods(config.Name).Create(&pod)
                                                time.Sleep(backoff)
                                                return err
                                        })
                                        if err != nil {
                                                logger.
                                                        WithError(err).
                                                        WithField("pod", pod.GetName()).
                                                        Errorf("error recreating pod")
                                        }
                                        break
                                } else {
                                        retryNo[i] = 0
                                }

                                if models.IsUnitTest(createdPod) {
                                        break
                                }

                                if err != nil && !strings.Contains(err.Error(), "not found") {
                                        logger.
                                                WithError(err).
                                                WithField("pod", pod.GetName()).
                                                Error("error getting pod")
                                        exit = false
                                        break
                                }

                                if createdPod.Status.Phase != v1.PodRunning {
                                        podErr := models.ValidatePodWaitingState(createdPod)
                                        if config.PreventRoomsCreationWithError && podErr != nil {
                                                l.WithError(podErr).Error("pod has error, aborting the pods creation")
                                                return false, podErr
                                        }

                                        isPending, reason, message := models.PodPending(createdPod)
                                        if isPending && strings.Contains(message, models.PodNotFitsHostPorts) {
                                                l.WithFields(logrus.Fields{
                                                        "pod":     createdPod.Name,
                                                        "reason":  reason,
                                                        "message": message,
                                                }).Error("pod's host port is not available in any node of the pool, watcher will delete it soon")
                                                continue
                                        } else {
                                                l.WithFields(logrus.Fields{
                                                        "pod":     createdPod.Name,
                                                        "pending": isPending,
                                                        "reason":  reason,
                                                        "message": message,
                                                }).Debug("pod is not running yet")
                                                exit = false
                                                break
                                        }
                                }

                                if !models.IsPodReady(createdPod) || !models.IsRoomReadyOrOccupied(logger, redisClient, config.Name, createdPod.Name) {
                                        logger.WithField("pod", createdPod.Name).Debug("pod not ready yet, waiting...")
                                        err = models.ValidatePodWaitingState(createdPod)

                                        if config.PreventRoomsCreationWithError && err != nil {
                                                logger.WithField("pod", pod.GetName()).WithError(err).Error("invalid pod waiting state")
                                                return false, err
                                        }

                                        exit = false
                                        break
                                }
                        }
                }

                if exit {
                        logger.Info("creating pods are successfully running")
                        break
                }
        }

        return false, nil
}

// DeletePodAndRoom deletes the pod and removes the room from redis
func DeletePodAndRoom(
        logger logrus.FieldLogger,
        roomManager models.RoomManager,
        mr *models.MixedMetricsReporter,
        clientset kubernetes.Interface,
        redisClient redisinterfaces.RedisClient,
        configYaml *models.ConfigYAML,
        name, reason string,
) error {
        var pod *models.Pod
        err := mr.WithSegment(models.SegmentPod, func() error {
                var err error
                pod, err = models.NewPod(name, nil, configYaml, redisClient, mr)
                return err
        })
        if err != nil {
                return err
        }

        err = roomManager.Delete(logger, mr, clientset, redisClient, configYaml,
                pod.Name, reason)
        if err != nil && !strings.Contains(err.Error(), "not found") {
                logger.
                        WithField("roomName", pod.Name).
                        WithError(err).
                        Error("error removing pod from kube")
                return err
        }

        room := models.NewRoom(pod.Name, configYaml.Name)
        err = room.ClearAll(redisClient, mr)
        if err != nil {
                logger.
                        WithField("roomName", pod.Name).
                        WithError(err).
                        Error("error removing room info from redis")
                return err
        }

        return nil
}

func segmentPods(pods []*models.Pod, maxSurge int) [][]*models.Pod {
        if len(pods) == 0 {
                return make([][]*models.Pod, 0)
        }

        totalLength := len(pods)
        chunkLength := chunkLength(pods, maxSurge)
        chunks := nChunks(pods, chunkLength)
        podChunks := make([][]*models.Pod, chunks)

        for i := range podChunks {
                start := i * chunkLength
                end := start + chunkLength
                if end > totalLength {
                        end = totalLength
                }

                podChunks[i] = pods[start:end]
        }

        return podChunks
}

func chunkLength(pods []*models.Pod, maxSurge int) int {
        denominator := 100.0 / float64(maxSurge)
        lenPods := float64(len(pods))
        return int(math.Ceil(lenPods / denominator))
}

func nChunks(pods []*models.Pod, chunkLength int) int {
        return int(math.Ceil(float64(len(pods)) / float64(chunkLength)))
}
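
// A worked example of the chunking arithmetic above, with illustrative
// numbers: for 10 pods and maxSurge = 25 (a percentage), denominator =
// 100/25 = 4, chunkLength = ceil(10/4) = 3 and nChunks = ceil(10/3) = 4, so
// segmentPods yields chunks of sizes 3, 3, 3 and 1, replaced one at a time.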

func names(pods []*models.Pod) []string {
        names := make([]string, len(pods))
        for i, pod := range pods {
                names[i] = pod.Name
        }
        return names
}

func waitForPods(
        timeout time.Duration,
        clientset kubernetes.Interface,
        redisClient redisinterfaces.RedisClient,
        config *models.ConfigYAML,
        pods []*v1.Pod,
        l logrus.FieldLogger,
        mr *models.MixedMetricsReporter,
) error {
        timeoutTimer := time.NewTimer(timeout)
        defer timeoutTimer.Stop()
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()

        var retryNo []int
        for range pods {
                retryNo = append(retryNo, 0)
        }
        backoffStart := time.Duration(500 * time.Millisecond)

        for {
                exit := true
                select {
                case <-timeoutTimer.C:
                        msg := "timeout waiting for rooms to be created"
                        l.Error(msg)
                        return errors.New(msg)
                case <-ticker.C:
                        for i := range pods {
                                if pods[i] != nil {
                                        var pod *models.Pod
                                        pod, err := models.GetPodFromRedis(redisClient, mr, pods[i].GetName(), config.Name)
                                        if err != nil || pod == nil {
                                                // apply exponential backoff
                                                retryNo[i]++
                                                backoff := exponentialBackoff(backoffStart, retryNo[i])

                                                // the pod does not exist (not even in Pending or ContainerCreating state), so create it again
                                                exit = false
                                                l.WithError(err).Infof("error creating pod %s, recreating in %s (retry %d)", pods[i].GetName(), backoff, retryNo[i])

                                                pods[i].ResourceVersion = ""
                                                err = mr.WithSegment(models.SegmentPod, func() error {
                                                        _, err = clientset.CoreV1().Pods(config.Name).Create(pods[i])
                                                        time.Sleep(backoff)
                                                        return err
                                                })
                                                if err != nil {
                                                        l.WithError(err).Errorf("error recreating pod %s", pods[i].GetName())
                                                }
                                        } else {
                                                if models.IsUnitTest(pod) {
                                                        break
                                                }

                                                retryNo[i] = 0

                                                if pod.Status.Phase != v1.PodRunning {
                                                        podErr := models.ValidatePodWaitingState(pod)
                                                        if config.PreventRoomsCreationWithError && podErr != nil {
                                                                l.WithError(podErr).Error("pod has error, aborting the pods creation")
                                                                return podErr
                                                        }

                                                        isPending, reason, message := models.PodPending(pod)
                                                        if isPending && strings.Contains(message, models.PodNotFitsHostPorts) {
                                                                l.WithFields(logrus.Fields{
                                                                        "reason":  reason,
                                                                        "message": message,
                                                                }).Error("pod's host port is not available in any node of the pool, watcher will delete it soon")
                                                                continue
                                                        } else {
                                                                l.WithFields(logrus.Fields{
                                                                        "pod":     pod.Name,
                                                                        "pending": isPending,
                                                                        "reason":  reason,
                                                                        "message": message,
                                                                }).Warn("pod is not running yet")
                                                                exit = false
                                                                break
                                                        }
                                                }

                                                if !models.IsPodReady(pod) {
                                                        exit = false
                                                        break
                                                }
                                        }
                                }
                        }
                        l.Debug("scaling scheduler...")
                }
                if exit {
                        l.Info("finished scaling scheduler")
                        break
                }
        }
        return nil
}

func hasNotReadyPods(
        config *viper.Viper,
        redisClient redisinterfaces.RedisClient,
        namespace string,
        checkPodsWithError bool,
        mr *models.MixedMetricsReporter,
) (bool, error) {
        var pods map[string]*models.Pod
        err := mr.WithSegment(models.SegmentPod, func() error {
                var err error
                pods, err = models.GetPodMapFromRedis(config, redisClient, mr, namespace)
                return err
        })
        if err != nil {
                return false, maestroErrors.NewKubernetesError("error when listing pods", err)
        }

        for _, pod := range pods {
                if pod.Status.Phase == v1.PodPending {
                        return true, nil
                }

                podErr := models.ValidatePodWaitingState(pod)
                if checkPodsWithError && podErr != nil {
                        return true, nil
                }
        }

        return false, nil
}

func getSchedulersAndGlobalPortRanges(
        db pginterfaces.DB,
        redis redisinterfaces.RedisClient,
        log logrus.FieldLogger,
) (ranges map[string]*models.PortRange, err error) {
        log = log.WithField("operation", "controller.getSchedulersPortRanges")

        ranges = map[string]*models.PortRange{}

        log.Debug("listing schedulers")
        names, err := models.ListSchedulersNames(db)
        if err != nil {
                log.WithError(err).Error("error listing schedulers from db")
                return ranges, err
        }

        log.Debug("loading schedulers")
        schedulers, err := models.LoadSchedulers(db, names)
        if err != nil {
                log.WithError(err).Error("error loading schedulers from db")
                return ranges, err
        }

        log.Debug("unmarshaling config yamls")
        for _, scheduler := range schedulers {
                configYaml, err := models.NewConfigYAML(scheduler.YAML)
                if err != nil {
                        log.WithError(err).Errorf("failed to unmarshal scheduler %s", scheduler.Name)
                        return nil, err
                }

                if configYaml.PortRange.IsSet() {
                        ranges[scheduler.Name] = configYaml.PortRange
                }
        }

        log.Debug("getting global port range")
        start, end, err := models.GetGlobalPortRange(redis)
        if err != nil {
                log.WithError(err).Error("failed to get global port range from redis")
                return ranges, err
        }

        log.Debug("successfully got port ranges")
        ranges[models.Global] = &models.PortRange{
                Start: start,
                End:   end,
        }

        return ranges, nil
}

func checkPortRange(
        oldConfig, newConfig *models.ConfigYAML,
        log logrus.FieldLogger,
        db pginterfaces.DB,
        redis redisinterfaces.RedisClient,
) (changedPortRange bool, err error) {
        isCreatingScheduler := oldConfig == nil

        if isCreatingScheduler {
                if !newConfig.PortRange.IsSet() {
                        return false, nil
                }
        } else {
                if !oldConfig.PortRange.IsSet() && !newConfig.PortRange.IsSet() {
                        return false, nil
                }

                if oldConfig.PortRange.IsSet() && !newConfig.PortRange.IsSet() {
                        return true, nil
                }

                if oldConfig.PortRange.Equals(newConfig.PortRange) {
                        log.Info("old scheduler contains new port range, skipping port check")
                        return false, nil
                }

                if !newConfig.PortRange.IsValid() {
                        return false, errors.New("port range is invalid")
                }
        }

        log.Info("update changed ports pool, getting all used ports range")
        ranges, err := getSchedulersAndGlobalPortRanges(db, redis, log)
        if err != nil {
                return true, err
        }

        log.WithField("pool", newConfig.PortRange.String()).Info("checking if new pool has intersection with other ones")
        for schedulerName, portRange := range ranges {
                if schedulerName == newConfig.Name {
                        continue
                }
                if portRange.HasIntersection(newConfig.PortRange) {
                        return true, fmt.Errorf("scheduler trying to use ports used by pool '%s'", schedulerName)
                }
        }

        return true, nil
}
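
// Branch summary of checkPortRange, derived from the branches above:
//
//      create, new range unset      -> (false, nil), nothing to check
//      update, both ranges unset    -> (false, nil)
//      update, old set, new unset   -> (true, nil), the pool changed
//      update, ranges equal         -> (false, nil), port check skipped
//      any other change             -> validate the new range and reject
//                                      intersections with other pools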

// SetScalingAmount checks the max and min limits and adjusts the amount to scale accordingly
func SetScalingAmount(
        logger logrus.FieldLogger,
        mr *models.MixedMetricsReporter,
        db pginterfaces.DB,
        redisClient redisinterfaces.RedisClient,
        scheduler *models.Scheduler,
        max, min, amount int,
        isScaleDown bool,
) (int, error) {
        currentRooms, err := models.GetRoomsCountByStatus(redisClient, scheduler.Name)
        if err != nil {
                return 0, err
        }

        if isScaleDown {
                return setScaleDownAmount(logger, amount, currentRooms.Available(), max, min), nil
        }

        return setScaleUpAmount(logger, amount, currentRooms.Available(), max, min), nil
}

func setScaleUpAmount(logger logrus.FieldLogger, amount, currentRooms, max, min int) int {
        if max > 0 {
                if currentRooms >= max {
                        logger.Warn("scale already at max. Not scaling up any rooms")
                        return 0
                }

                if currentRooms+amount > max {
                        logger.Warnf("amount to scale is higher than max. Maestro will scale up to the max of %d", max)
                        return max - currentRooms
                }
        }

        if currentRooms+amount < min {
                logger.Warnf("amount to scale is lower than min. Maestro will scale up to the min of %d", min)
                return min - currentRooms
        }

        return amount
}

func setScaleDownAmount(logger logrus.FieldLogger, amount, currentRooms, max, min int) int {
        if min > 0 {
                if currentRooms <= min {
                        logger.Warn("scale already at min. Not scaling down any rooms")
                        return 0
                }

                if currentRooms-amount < min {
                        logger.Warnf("amount to scale is lower than min. Maestro will scale down to the min of %d", min)
                        return currentRooms - min
                }
        }

        if max > 0 && currentRooms-amount > max {
                logger.Warnf("amount to scale is lower than max. Maestro will scale down to the max of %d", max)
                return currentRooms - max
        }

        return amount
}
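
// A worked example of the clamping in the two helpers above, with
// illustrative numbers: for max = 10, min = 2 and 9 available rooms,
// setScaleUpAmount clamps amount = 5 down to max-currentRooms = 1; with
// 3 available rooms, setScaleDownAmount clamps amount = 5 down to
// currentRooms-min = 1, keeping the room count inside [min, max].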

func validateMetricsTrigger(configYAML *models.ConfigYAML, logger logrus.FieldLogger) error {
        // scale-up triggers: resource-based policies require the matching requests field
        for _, trigger := range configYAML.AutoScaling.Up.MetricsTrigger {
                if trigger.Type == models.CPUAutoScalingPolicyType {
                        if (configYAML.Requests == nil || configYAML.Requests.CPU == "") && len(configYAML.Containers) == 0 {
                                logger.Error("must set requests.cpu in order to use cpu autoscaling")
                                return fmt.Errorf("must set requests.cpu in order to use cpu autoscaling")
                        }
                        for _, container := range configYAML.Containers {
                                if container.Requests == nil || container.Requests.CPU == "" {
                                        logger.Error("must set requests.cpu in order to use cpu autoscaling")
                                        return fmt.Errorf("must set requests.cpu in order to use cpu autoscaling")
                                }
                        }
                }

                if trigger.Type == models.MemAutoScalingPolicyType {
                        if (configYAML.Requests == nil || configYAML.Requests.Memory == "") && len(configYAML.Containers) == 0 {
                                logger.Error("must set requests.memory in order to use mem autoscaling")
                                return fmt.Errorf("must set requests.memory in order to use mem autoscaling")
                        }
                        for _, container := range configYAML.Containers {
                                if container.Requests == nil || container.Requests.Memory == "" {
                                        logger.Error("must set requests.memory in order to use mem autoscaling")
                                        return fmt.Errorf("must set requests.memory in order to use mem autoscaling")
                                }
                        }
                }
        }

        // scale-down triggers: same checks as above
        for _, trigger := range configYAML.AutoScaling.Down.MetricsTrigger {
                if trigger.Type == models.CPUAutoScalingPolicyType {
                        if (configYAML.Requests == nil || configYAML.Requests.CPU == "") && len(configYAML.Containers) == 0 {
                                logger.Error("must set requests.cpu in order to use cpu autoscaling")
                                return fmt.Errorf("must set requests.cpu in order to use cpu autoscaling")
                        }
                        for _, container := range configYAML.Containers {
                                if container.Requests == nil || container.Requests.CPU == "" {
                                        logger.Error("must set requests.cpu in order to use cpu autoscaling")
                                        return fmt.Errorf("must set requests.cpu in order to use cpu autoscaling")
                                }
                        }
                }

                if trigger.Type == models.MemAutoScalingPolicyType {
                        if (configYAML.Requests == nil || configYAML.Requests.Memory == "") && len(configYAML.Containers) == 0 {
                                logger.Error("must set requests.memory in order to use mem autoscaling")
                                return fmt.Errorf("must set requests.memory in order to use mem autoscaling")
                        }
                        for _, container := range configYAML.Containers {
                                if container.Requests == nil || container.Requests.Memory == "" {
                                        logger.Error("must set requests.memory in order to use mem autoscaling")
                                        return fmt.Errorf("must set requests.memory in order to use mem autoscaling")
                                }
                        }
                }
        }
        return nil
}
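
// A sketch of a scheduler config fragment that satisfies the CPU-trigger
// check above, assuming the usual yaml field names for requests (the exact
// yaml tags live in the models package):
//
//      autoscaling:
//        up:
//          metricsTrigger:
//            - type: cpu
//      requests:
//        cpu: "100m"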

func schedulerAndConfigFromName(
        mr *models.MixedMetricsReporter,
        db pginterfaces.DB,
        schedulerName string,
) (
        *models.Scheduler,
        models.ConfigYAML,
        error,
) {
        var configYaml models.ConfigYAML
        scheduler := models.NewScheduler(schedulerName, "", "")
        err := mr.WithSegment(models.SegmentSelect, func() error {
                return scheduler.Load(db)
        })
        if err != nil {
                return nil, configYaml, maestroErrors.NewDatabaseError(err)
        }

        // check that the scheduler to update indeed exists
        if scheduler.YAML == "" {
                msg := fmt.Sprintf("scheduler %s not found, create it first", schedulerName)
                return nil, configYaml, maestroErrors.NewValidationFailedError(errors.New(msg))
        }
        err = yaml.Unmarshal([]byte(scheduler.YAML), &configYaml)
        if err != nil {
                return nil, configYaml, err
        }
        return scheduler, configYaml, nil
}

func validateConfig(logger logrus.FieldLogger, configYAML *models.ConfigYAML, maxSurge int) error {
        if maxSurge <= 0 {
                return errors.New("invalid parameter: maxsurge must be greater than 0")
        }

        if configYAML.AutoScaling.Max > 0 && configYAML.AutoScaling.Min > configYAML.AutoScaling.Max {
                return errors.New("invalid parameter: autoscaling max must be greater than min")
        }

        // if using resource scaling (cpu, mem) requests must be set
        return validateMetricsTrigger(configYAML, logger)
}

// LoadScheduler loads a scheduler from DB with its YAML config
func LoadScheduler(
        mr *models.MixedMetricsReporter,
        db pginterfaces.DB,
        schedulerOrNil *models.Scheduler,
        schedulerName string,
) (*models.Scheduler, models.ConfigYAML, error) {
        var scheduler *models.Scheduler
        var configYAML models.ConfigYAML
        var err error
        if schedulerOrNil != nil {
                scheduler = schedulerOrNil
                err = yaml.Unmarshal([]byte(scheduler.YAML), &configYAML)
                if err != nil {
                        return scheduler, configYAML, err
                }
        } else {
                scheduler, configYAML, err = schedulerAndConfigFromName(mr, db, schedulerName)
                if err != nil {
                        return scheduler, configYAML, err
                }
        }
        return scheduler, configYAML, nil
}
1139

1140
// AcquireLock acquires a lock defined by its lockKey
1141
func AcquireLock(
1142
        ctx context.Context,
1143
        logger logrus.FieldLogger,
1144
        redisClient *redis.Client,
1145
        config *viper.Viper,
1146
        operationManager *models.OperationManager,
1147
        lockKey, schedulerName string,
1148
) (lock *redisLock.Lock, canceled bool, err error) {
25✔
1149
        timeoutSec := config.GetInt("updateTimeoutSeconds")
25✔
1150
        lockTimeoutMS := config.GetInt("watcher.lockTimeoutMs")
25✔
1151
        timeoutDur := time.Duration(timeoutSec) * time.Second
25✔
1152
        ticker := time.NewTicker(2 * time.Second)
25✔
1153

25✔
1154
        // guarantee that downScaling and config locks doesn't timeout before update times out.
25✔
1155
        // otherwise it can result in all pods dying during a rolling update that is destined to timeout
25✔
1156
        if (lockKey == models.GetSchedulerDownScalingLockKey(config.GetString("watcher.lockKey"), schedulerName) ||
25✔
1157
                lockKey == models.GetSchedulerConfigLockKey(config.GetString("watcher.lockKey"), schedulerName)) &&
25✔
1158
                lockTimeoutMS < timeoutSec*1000 {
43✔
1159
                lockTimeoutMS = (timeoutSec + 1) * 1000
18✔
1160
        }
18✔
1161

1162
        defer ticker.Stop()
25✔
1163
        timeout := time.NewTimer(timeoutDur)
25✔
1164
        defer timeout.Stop()
25✔
1165

25✔
1166
        l := logger.WithFields(logrus.Fields{
25✔
1167
                "source":    "AcquireLock",
25✔
1168
                "scheduler": schedulerName,
25✔
1169
        })
25✔
1170

25✔
1171
        for {
50✔
1172
                exit := false
25✔
1173
                lock, err = redisClient.EnterCriticalSection(
25✔
1174
                        redisClient.Trace(ctx),
25✔
1175
                        lockKey,
25✔
1176
                        time.Duration(lockTimeoutMS)*time.Millisecond,
25✔
1177
                        0, 0,
25✔
1178
                )
25✔
1179
                select {
25✔
1180
                case <-timeout.C:
2✔
1181
                        l.Warn("timeout while wating for redis lock")
2✔
1182
                        return nil, false, errors.New("timeout while wating for redis lock")
2✔
1183
                case <-ticker.C:
23✔
1184
                        if operationManager != nil {
39✔
1185
                                canceled, err := operationManager.WasCanceled()
16✔
1186
                                if canceled && err == nil {
17✔
1187
                                        l.Warn("operation was canceled")
1✔
1188
                                        return nil, true, nil
1✔
1189
                                }
1✔
1190
                        }
1191

1192
                        if err != nil {
23✔
1193
                                l.WithError(err).Error("error getting watcher lock")
1✔
1194
                                return nil, false, err
1✔
1195
                        }
1✔
1196

1197
                        if lock == nil {
21✔
1198
                                l.Warnf("unable to get watcher %s lock %s, maybe some other process has it", schedulerName, lockKey)
×
1199
                                break
×
1200
                        }
1201

1202
                        if lock.IsLocked() {
42✔
1203
                                exit = true
21✔
1204
                                break
21✔
1205
                        }
1206
                }
1207
                if exit {
42✔
1208
                        l.Debugf("acquired lock %s", lockKey)
21✔
1209
                        break
21✔
1210
                }
1211
        }
1212

1213
        return lock, false, err
21✔
1214
}
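
// Illustrative usage sketch (editor's addition, not part of the original
// source): a caller would typically pair AcquireLock with a deferred
// ReleaseLock (defined below). The surrounding variables (ctx, logger,
// redisClient, config, operationManager, schedulerName) are assumed to
// exist at the call site.
//
//	lockKey := models.GetSchedulerConfigLockKey(config.GetString("watcher.lockKey"), schedulerName)
//	lock, canceled, err := AcquireLock(ctx, logger, redisClient, config, operationManager, lockKey, schedulerName)
//	if err != nil {
//		return err
//	}
//	if canceled {
//		return nil // operation was canceled while waiting for the lock
//	}
//	defer ReleaseLock(logger, redisClient, lock, schedulerName)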
1215

1216
// AcquireLockOnce tries to acquire a lock defined by its lockKey only once
1217
// If the lock is already held by another process, it just returns an error
1218
func AcquireLockOnce(
1219
        ctx context.Context,
1220
        logger logrus.FieldLogger,
1221
        redisClient *redis.Client,
1222
        config *viper.Viper,
1223
        lockKey string,
1224
        schedulerName string,
1225
) (*redisLock.Lock, error) {
×
1226
        l := logger.WithFields(logrus.Fields{
×
1227
                "source":    "AcquireLockOnce",
×
1228
                "scheduler": schedulerName,
×
1229
        })
×
1230

×
1231
        lockTimeoutMS := config.GetInt("watcher.lockTimeoutMs")
×
1232

×
1233
        lock, err := redisClient.EnterCriticalSection(
×
1234
                redisClient.Trace(ctx),
×
1235
                lockKey,
×
1236
                time.Duration(lockTimeoutMS)*time.Millisecond,
×
1237
                0, 0,
×
1238
        )
×
1239

×
1240
        if err != nil {
×
1241
                l.WithError(err).Error("error acquiring lock")
×
1242
                return nil, err
×
1243
        }
×
1244

1245
        if lock == nil {
×
1246
                l.Warnf("unable to acquire scheduler %s lock %s, maybe some other process has it", schedulerName, lockKey)
×
1247
                return nil, fmt.Errorf("unable to acquire scheduler %s lock %s, maybe some other process has it", schedulerName, lockKey)
×
1248
        }
×
1249

1250
        if lock.IsLocked() {
×
1251
                l.Debugf("acquired lock %s", lockKey)
×
1252
        }
×
1253

1254
        return lock, err
×
1255
}
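
// Illustrative sketch (editor's addition): unlike AcquireLock above, which
// polls every two seconds until it acquires the lock or times out,
// AcquireLockOnce makes a single attempt and surfaces contention as an
// error, so callers can fail fast instead of blocking:
//
//	lock, err := AcquireLockOnce(ctx, logger, redisClient, config, lockKey, schedulerName)
//	if err != nil {
//		return err // lock is held elsewhere or redis failed; no retry
//	}
//	defer ReleaseLock(logger, redisClient, lock, schedulerName)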
1256

1257
// ReleaseLock releases a lock defined by its lockKey
1258
func ReleaseLock(
1259
        logger logrus.FieldLogger,
1260
        redisClient *redis.Client,
1261
        lock *redisLock.Lock,
1262
        schedulerName string,
1263
) {
25✔
1264
        l := logger.WithFields(logrus.Fields{
25✔
1265
                "source":    "ReleaseLock",
25✔
1266
                "scheduler": schedulerName,
25✔
1267
        })
25✔
1268

25✔
1269
        if lock != nil {
46✔
1270
                err := redisClient.LeaveCriticalSection(lock)
21✔
1271
                if err != nil {
21✔
1272
                        l.WithError(err).Error("error releasing lock. Either wait or remove it manually from redis")
×
1273
                } else {
21✔
1274
                        l.Debug("lock released")
21✔
1275
                }
21✔
1276
        } else {
4✔
1277
                l.Debug("lock is nil. No lock to release")
4✔
1278
        }
4✔
1279
}
1280

1281
func saveConfigYAML(
1282
        ctx context.Context,
1283
        logger logrus.FieldLogger,
1284
        mr *models.MixedMetricsReporter,
1285
        db pginterfaces.DB,
1286
        configYAML *models.ConfigYAML,
1287
        scheduler *models.Scheduler,
1288
        oldConfig models.ConfigYAML,
1289
        config *viper.Viper,
1290
        oldVersion string,
1291
) error {
15✔
1292
        l := logger.WithFields(logrus.Fields{
15✔
1293
                "source":    "saveConfigYAML",
15✔
1294
                "scheduler": configYAML.Name,
15✔
1295
        })
15✔
1296
        maxVersions := config.GetInt("schedulers.versions.toKeep")
15✔
1297

15✔
1298
        l.Debug("updating configYAML on database")
15✔
1299

15✔
1300
        // Update new config on DB
15✔
1301
        configBytes, err := yaml.Marshal(configYAML)
15✔
1302
        if err != nil {
15✔
1303
                return err
×
1304
        }
×
1305
        yamlString := string(configBytes)
15✔
1306
        scheduler.Game = configYAML.Game
15✔
1307
        scheduler.YAML = yamlString
15✔
1308

15✔
1309
        if string(oldConfig.ToYAML()) != string(configYAML.ToYAML()) {
30✔
1310
                err = mr.WithSegment(models.SegmentUpdate, func() error {
30✔
1311
                        created, err := scheduler.UpdateVersion(db, maxVersions, oldVersion)
15✔
1312
                        if !created {
15✔
1313
                                return err
×
1314
                        }
×
1315
                        if err != nil {
15✔
1316
                                l.WithError(err).Error("error on operation on scheduler_verions table. But the newest one was created.")
×
1317
                        }
×
1318
                        return nil
15✔
1319
                })
1320
                if err != nil {
15✔
1321
                        l.WithError(err).Error("failed to update scheduler on database")
×
1322
                        return err
×
1323
                }
×
1324

1325
                l.Info("updated configYaml on database")
15✔
1326
        } else {
×
1327
                l.Info("config yaml is the same, skipping")
×
1328
        }
×
1329

1330
        return nil
15✔
1331
}
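
// Illustrative note (editor's addition): saveConfigYAML writes a new version
// only when the marshaled YAML differs from the previous config, and
// UpdateVersion prunes history down to schedulers.versions.toKeep entries.
// A hypothetical viper config retaining ten versions:
//
//	schedulers:
//	  versions:
//	    toKeep: 10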
1332

1333
// ListCurrentPods returns a list of kubernetes pods
1334
func ListCurrentPods(
1335
        mr *models.MixedMetricsReporter,
1336
        clientset kubernetes.Interface,
1337
        schedulerName string,
1338
) (*v1.PodList, error) {
×
1339
        var kubePods *v1.PodList
×
1340
        err := mr.WithSegment(models.SegmentPod, func() error {
×
1341
                var err error
×
1342
                kubePods, err = clientset.CoreV1().Pods(schedulerName).List(metav1.ListOptions{
×
1343
                        LabelSelector: labels.Set{}.AsSelector().String(),
×
1344
                        FieldSelector: fields.Everything().String(),
×
1345
                })
×
1346
                return err
×
1347
        })
×
1348
        if err != nil {
×
1349
                return nil, maestroErrors.NewKubernetesError("error when listing pods", err)
×
1350
        }
×
1351
        return kubePods, nil
×
1352
}
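
// Illustrative usage sketch (editor's addition); the loop body is
// hypothetical:
//
//	kubePods, err := ListCurrentPods(mr, clientset, schedulerName)
//	if err != nil {
//		return err
//	}
//	for _, pod := range kubePods.Items {
//		logger.Debugf("pod %s is in phase %s", pod.GetName(), pod.Status.Phase)
//	}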
1353

1354
func deleteSchedulerHelper(
1355
        logger logrus.FieldLogger,
1356
        mr *models.MixedMetricsReporter,
1357
        db pginterfaces.DB,
1358
        redisClient redisinterfaces.RedisClient,
1359
        clientset kubernetes.Interface,
1360
        scheduler *models.Scheduler,
1361
        namespace *models.Namespace,
1362
        timeoutSec int,
1363
) error {
8✔
1364
        var err error
8✔
1365
        if scheduler.ID != "" {
9✔
1366
                scheduler.State = models.StateTerminating
1✔
1367
                scheduler.StateLastChangedAt = time.Now().Unix()
1✔
1368
                if scheduler.LastScaleOpAt == 0 {
2✔
1369
                        scheduler.LastScaleOpAt = 1
1✔
1370
                }
1✔
1371
                err = mr.WithSegment(models.SegmentUpdate, func() error {
2✔
1372
                        return scheduler.Update(db)
1✔
1373
                })
1✔
1374
                if err != nil {
2✔
1375
                        logger.WithError(err).Error("failed to update scheduler state")
1✔
1376
                        return err
1✔
1377
                }
1✔
1378
        }
1379

1380
        configYAML, _ := models.NewConfigYAML(scheduler.YAML)
7✔
1381
        // Delete pods and wait for graceful termination before deleting the namespace
7✔
1382
        err = mr.WithSegment(models.SegmentPod, func() error {
14✔
1383
                return namespace.DeletePods(clientset, redisClient, mr, scheduler)
7✔
1384
        })
7✔
1385
        if err != nil {
7✔
1386
                logger.WithError(err).Error("failed to delete namespace pods")
×
1387
                return err
×
1388
        }
×
1389
        timeoutPods := time.NewTimer(time.Duration(2*configYAML.ShutdownTimeout) * time.Second)
7✔
1390
        defer timeoutPods.Stop()
7✔
1391
        ticker := time.NewTicker(1 * time.Second)
7✔
1392
        defer ticker.Stop()
7✔
1393

7✔
1394
        time.Sleep(10 * time.Nanosecond) // This negligible sleep avoids a race condition
7✔
1395
        exit := false
7✔
1396
        for !exit {
14✔
1397
                select {
7✔
1398
                case <-timeoutPods.C:
1✔
1399
                        return errors.New("timeout deleting scheduler pods")
1✔
1400
                case <-ticker.C:
6✔
1401
                        var podCount int
6✔
1402
                        listErr := mr.WithSegment(models.SegmentPod, func() error {
12✔
1403
                                var err error
6✔
1404
                                podCount, err = models.GetPodCountFromRedis(redisClient, mr, scheduler.Name)
6✔
1405
                                return err
6✔
1406
                        })
6✔
1407
                        if listErr != nil {
6✔
1408
                                logger.WithError(listErr).Error("error listing pods")
×
1409
                        } else if podCount == 0 {
12✔
1410
                                exit = true
6✔
1411
                        }
6✔
1412
                        logger.Debug("deleting scheduler pods")
6✔
1413
                }
1414
        }
1415

1416
        err = mr.WithSegment(models.SegmentNamespace, func() error {
12✔
1417
                return namespace.Delete(clientset)
6✔
1418
        })
6✔
1419
        if err != nil {
6✔
1420
                logger.WithError(err).Error("failed to delete namespace while deleting scheduler")
×
1421
                return err
×
1422
        }
×
1423
        timeoutNamespace := time.NewTimer(time.Duration(timeoutSec) * time.Second)
6✔
1424
        defer timeoutNamespace.Stop()
6✔
1425

6✔
1426
        time.Sleep(10 * time.Nanosecond) // This negligible sleep avoids a race condition
6✔
1427
        exit = false
6✔
1428
        for !exit {
12✔
1429
                select {
6✔
1430
                case <-timeoutNamespace.C:
1✔
1431
                        return errors.New("timeout deleting namespace")
1✔
1432
                default:
5✔
1433
                        exists, existsErr := namespace.Exists(clientset)
5✔
1434
                        if existsErr != nil {
5✔
1435
                                logger.WithError(existsErr).Error("error checking namespace existence")
×
1436
                        } else if !exists {
10✔
1437
                                exit = true
5✔
1438
                        }
5✔
1439
                        logger.Debug("deleting scheduler namespace")
5✔
1440
                        time.Sleep(time.Duration(1) * time.Second)
5✔
1441
                }
1442
        }
1443

1444
        // Deleting from the DB must be the last operation: if kubernetes
1445
        // fails to delete the pods, the watcher will recreate them
1446
        // and keep the last state
1447
        err = mr.WithSegment(models.SegmentDelete, func() error {
10✔
1448
                return scheduler.Delete(db)
5✔
1449
        })
5✔
1450
        if err != nil {
6✔
1451
                logger.WithError(err).Error("failed to delete scheduler from database while deleting scheduler")
1✔
1452
                return err
1✔
1453
        }
1✔
1454

1455
        reporters.Report(reportersConstants.EventSchedulerDelete, map[string]interface{}{
4✔
1456
                "name": scheduler.Name,
4✔
1457
                "game": scheduler.Game,
4✔
1458
        })
4✔
1459

4✔
1460
        return nil
4✔
1461
}
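
// Worked timing example (editor's illustration): with ShutdownTimeout: 30 in
// the scheduler YAML and timeoutSec = 300, the pod-deletion loop above polls
// redis once per second for up to 2*30 = 60 seconds, and the namespace loop
// then checks kubernetes once per second for up to 300 seconds before
// returning a timeout error.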
1462

1463
func exponentialBackoff(backoffStart time.Duration, retryNo int) time.Duration {
4✔
1464
        min := 1
4✔
1465
        max := int(math.Pow(2, float64(retryNo)))
4✔
1466
        k := rand.Intn(max-min+1) + min // +1 avoids a panic from Intn(0) when retryNo == 0 and makes the upper bound inclusive
4✔
1467

4✔
1468
        return time.Duration(k * int(backoffStart))
4✔
1469
}
4✔
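
// Worked example (editor's illustration): with backoffStart = 100ms and
// retryNo = 3, max = 2^3 = 8, so k is drawn uniformly from [1, 8] and the
// returned delay is between 100ms and 800ms; each further retry doubles the
// upper bound.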