
kubernetes-redis-operator / redis-cluster-operator / build 15751580462

19 Jun 2025 06:57 AM UTC · coverage: 65.76% (-1.4%) from 67.141%

Build 15751580462, triggered by a push via GitHub
Committer: serdarkalayci
Commit: implement scale in (relates to #1)

11 of 74 new or added lines in 3 files covered (14.86%).
3 existing lines in 1 file are now uncovered.
895 of 1361 relevant lines covered (65.76%).
0.74 hits per line.
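
The headline figures follow directly from the raw line counts; a quick sketch in Go (a hypothetical helper, not part of the report) reproduces the two percentages:

package main

import "fmt"

// percent reproduces the report's coverage arithmetic: covered lines
// divided by relevant lines, expressed as a percentage.
func percent(covered, relevant float64) float64 {
        return covered / relevant * 100
}

func main() {
        fmt.Printf("%.2f%%\n", percent(895, 1361)) // overall coverage: 65.76%
        fmt.Printf("%.2f%%\n", percent(11, 74))    // new or added lines: 14.86%
}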

Source File: /controller/rediscluster_controller.go (47.01% covered)
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
        "context"
        "fmt"
        "time"

        "github.com/go-logr/logr"
        redisinternal "github.com/kubernetes-redis-operator/redis-cluster-operator/internal/redis"
        "github.com/kubernetes-redis-operator/redis-cluster-operator/internal/utils"
        "github.com/redis/go-redis/v9"
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/util/retry"

        "k8s.io/apimachinery/pkg/runtime"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/log"

        redisclusterv1alpha1 "github.com/kubernetes-redis-operator/redis-cluster-operator/api/v1alpha1"
)

// RedisClusterReconciler reconciles a RedisCluster object
type RedisClusterReconciler struct {
        KubernetesManager IKubernetesManager
        Scheme            *runtime.Scheme
}

//+kubebuilder:rbac:groups=rediscluster.kuro.io,resources=redisclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=rediscluster.kuro.io,resources=redisclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=rediscluster.kuro.io,resources=redisclusters/finalizers,verbs=update
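
// A rough illustration (an assumption about the generated output, not part of
// this file): controller-gen turns the markers above into ClusterRole rules.
// The first marker corresponds roughly to this k8s.io/api/rbac/v1 literal:
//
//      rbacv1.PolicyRule{
//              APIGroups: []string{"rediscluster.kuro.io"},
//              Resources: []string{"redisclusters"},
//              Verbs:     []string{"get", "list", "watch", "create", "update", "patch", "delete"},
//      }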

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RedisCluster object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile
func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        logger := log.FromContext(ctx)

        logger.Info("Reconciling RedisCluster", "cluster", req.Name, "namespace", req.Namespace)

        //region Try to get the RedisCluster object from the cluster to make sure it still exists
        redisCluster, err := r.KubernetesManager.FetchRedisCluster(ctx, req.NamespacedName)

        if err != nil {
                if apierrors.IsNotFound(err) {
                        // The RedisCluster resource is not found on the cluster. It was probably deleted by the user before this reconciliation, so we exit early.
                        logger.Info("RedisCluster not found during reconcile. Probably deleted by user. Exiting early.")
                        return ctrl.Result{}, nil
                }
                // Any other fetch error leaves redisCluster nil, so requeue instead of falling through.
                return r.RequeueError(ctx, "Could not fetch RedisCluster", err)
        }
        //endregion

        //region Try to get the ConfigMap for the RedisCluster
        result, err := r.ensureConfigMap(ctx, logger, redisCluster)
        if err != nil || result.RequeueAfter > 0 {
                return result, err
        }
        //endregion

        //region Ensure Statefulset
        masterSSet, replSSets, result, err := r.ensureStatefulSet(ctx, logger, redisCluster)
        if err != nil || result.RequeueAfter > 0 {
                return result, err
        }
        //endregion

        //region Ensure Service
        result, err = r.ensureService(ctx, logger, redisCluster)
        if err != nil || result.RequeueAfter > 0 {
                return result, err
        }
        //endregion
        clusterNodes := redisinternal.ClusterNodes{}
        if redisCluster.Status.GetInOperationCondition() == nil {
                // The in-operation condition has never been set, so the cluster is still initializing. Record that state and mark the cluster as in operation.
                logger.Info("Cluster is initializing.")
                redisCluster.Status.ClusterState = redisclusterv1alpha1.ClusterStateInitializing
                redisCluster.Status.SetInOperationCondition(true, "ClusterStateInitializing", "Cluster is in initializing state")
                err = r.KubernetesManager.UpdateResourceStatus(ctx, redisCluster)
                if err != nil {
                        return r.RequeueError(ctx, "Could not update RedisCluster condition", err)
                }
        }
        if redisCluster.Status.GetInOperationCondition().Status == metav1.ConditionTrue {
                // If the cluster is in operation, we should ensure that the operation is running correctly.
                logger.Info("Cluster is in operation. Reconciliation will check for the operation to complete.")
                switch redisCluster.Status.ClusterState {
                case redisclusterv1alpha1.ClusterStateScalingOut:
                        // If the cluster is scaling out, we should ensure that the statefulset is scaled out correctly.
                        result, err = r.reconcileScaleCluster(ctx, logger, redisCluster, masterSSet, replSSets)
                        if err != nil || result.RequeueAfter > 0 {
                                return result, err
                        }
                case redisclusterv1alpha1.ClusterStateScalingIn:
                        // If the cluster is scaling in, drain the departing nodes first, then ensure that the statefulset is scaled in correctly.
                        clusterNodes.DrainNodes(ctx, redisCluster)
                        result, err = r.reconcileScaleCluster(ctx, logger, redisCluster, masterSSet, replSSets)
                        if err != nil || result.RequeueAfter > 0 {
                                return result, err
                        }
                case redisclusterv1alpha1.ClusterStateIncreasingReplicas:
                        // If the cluster is increasing replicas, we should ensure that the statefulset is updated correctly.
                case redisclusterv1alpha1.ClusterStateDecreasingReplicas:
                        // If the cluster is decreasing replicas, we should ensure that the statefulset is updated correctly.
                case redisclusterv1alpha1.ClusterStateNormal:
                        // If the cluster state is normal, there should be no operation in progress.
                        logger.Info("Cluster is in normal state, but still marked as in operation. This should not happen. Resetting operation condition.")
                        redisCluster.Status.SetInOperationCondition(false, "ClusterStateNormal", "Cluster is in normal state")
                        err = r.KubernetesManager.UpdateResourceStatus(ctx, redisCluster)
                        if err != nil {
                                return r.RequeueError(ctx, "Could not update RedisCluster status", err)
                        }
                        // Reconcile the cluster again to check if a new operation is needed.
                        return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
                }
        } else {
                // If the cluster is not in operation, we can compare the spec to the current state of the cluster to decide whether a new operation is needed.
                logger.Info("Cluster is not in operation. Continuing with reconciliation to check for potential operations.")
        }

        pods, err := r.KubernetesManager.FetchRedisPods(ctx, redisCluster)
        if err != nil {
                return r.RequeueError(ctx, "Could not fetch pods for redis cluster", err)
        }

        for _, pod := range pods.Items {
                if utils.IsPodReady(&pod) {
                        node, err := redisinternal.NewNode(ctx, &redis.Options{
                                Addr: pod.Status.PodIP + ":6379",
                        }, &pod, redis.NewClient)
                        if err != nil {
                                return r.RequeueError(ctx, "Could not load Redis Client", err)
                        }

                        // Make sure that the node knows about itself.
                        // This is necessary, as the nodes often start up without being able to retrieve their own IP address.
                        err = node.Client.ClusterMeet(ctx, pod.Status.PodIP, "6379").Err()
                        if err != nil {
                                return r.RequeueError(ctx, "Could not let node meet itself", err)
                        }
                        clusterNodes.Nodes = append(clusterNodes.Nodes, node)
                }
        }

        allPodsReady := len(clusterNodes.Nodes) == int(redisCluster.Spec.Masters)
        if !allPodsReady {
                logger.Info("Not all pods are ready. Reconciling again in 10 seconds")
                return ctrl.Result{
                        RequeueAfter: 10 * time.Second,
                }, nil
        }

        if allPodsReady {
                // region Ensure Cluster Meet

                // todo we should check whether a cluster meet is necessary before just spraying cluster meets.
                // This can also be augmented by doing a cluster meet for all ready nodes and ignoring any non-ready ones.
                // If there are more ready pods than nodes needed, we probably have some additional nodes we need to remove.
                // We can forget these additional nodes, as they are probably nodes whose pods got killed.
                logger.Info("Meeting Redis nodes")
                err = clusterNodes.ClusterMeet(ctx)
                if err != nil {
                        return r.RequeueError(ctx, "Could not meet all nodes together", err)
                }
                // We'll wait for 5 seconds to ensure the meet is propagated
                time.Sleep(time.Second * 5)
                // endregion

                logger.Info("Checking Cluster Master Replica Ratio")
                // region Ensure Cluster Replication Ratio
                err = clusterNodes.EnsureClusterReplicationRatio(ctx, redisCluster)
                if err != nil {
                        return r.RequeueError(ctx, "Failed to ensure cluster ratio for cluster", err)
                }
                // endregion

                err = clusterNodes.ReloadNodes(ctx)
                if err != nil {
                        return r.RequeueError(ctx, "Failed to reload node info for cluster", err)
                }

                // region Assign Slots
                logger.Info("Assigning Missing Slots")
                slotsAssignments := clusterNodes.CalculateSlotAssignment()
                for node, slots := range slotsAssignments {
                        if len(slots) == 0 {
                                continue
                        }
                        var slotsInt []int
                        for _, slot := range slots {
                                slotsInt = append(slotsInt, int(slot))
                        }
                        err = node.ClusterAddSlots(ctx, slotsInt...).Err()
                        if err != nil {
                                return r.RequeueError(ctx, "Could not assign node slots", err)
                        }
                }
                // endregion

                logger.Info("Forgetting Failed Nodes No Longer Valid")
                failingNodes, err := clusterNodes.GetFailingNodes(ctx)
                if err != nil {
                        return r.RequeueError(ctx, "could not fetch failing nodes", err)
                }
                for _, node := range failingNodes {
                        err = clusterNodes.ForgetNode(ctx, node)
                        if err != nil {
                                return r.RequeueError(ctx, fmt.Sprintf("could not forget node %s", node.NodeAttributes.ID), err)
                        }
                }

                logger.Info("Balancing Redis Cluster slots")
                err = clusterNodes.BalanceSlots(ctx, redisCluster)
                if err != nil {
                        return r.RequeueError(ctx, "could not balance slots across nodes", err)
                }
                logger.Info("Finished balancing Redis Cluster slots")
        }

        redisCluster.Status.ClusterState = redisclusterv1alpha1.ClusterStateNormal
        redisCluster.Status.SetInOperationCondition(false, "ClusterStateNormal", "Cluster is in normal state")
        err = r.KubernetesManager.UpdateResourceStatus(ctx, redisCluster)
        if err != nil {
                return r.RequeueError(ctx, "Could not update RedisCluster status", err)
        }

        logger.Info("Reconciliation completed successfully for RedisCluster", "cluster", req.Name, "namespace", req.Namespace)
        // Return a result to requeue the reconciliation after 30 seconds
        return ctrl.Result{
                RequeueAfter: 30 * time.Second,
        }, nil
}

func (r *RedisClusterReconciler) RequeueError(ctx context.Context, message string, err error) (ctrl.Result, error) {
        logger := log.FromContext(ctx)
        logger.Error(err, message)
        return ctrl.Result{
                RequeueAfter: 10 * time.Second,
        }, err
}
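
// A minimal sketch (hypothetical helper, assuming controller-runtime's documented
// behaviour): when Reconcile returns a non-nil error, the request is requeued with
// the controller's rate limiter and Result.RequeueAfter is not honoured. A variant
// that wants the fixed 10-second delay to apply would log the error but return nil:
func (r *RedisClusterReconciler) requeueAfterSketch(ctx context.Context, message string, err error) (ctrl.Result, error) {
        log.FromContext(ctx).Error(err, message)
        // A nil error means the RequeueAfter below is respected.
        return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}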

// SetupWithManager sets up the controller with the Manager.
func (r *RedisClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&redisclusterv1alpha1.RedisCluster{}).
                Complete(r)
}
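
// A hedged sketch (hypothetical alternative, not in the original file): because the
// reconciler sets owner references on the ConfigMap, StatefulSets, and Service, the
// builder could also watch those owned resources so that changes to them trigger a
// reconcile, using controller-runtime's Owns():
func (r *RedisClusterReconciler) setupWithManagerOwnsSketch(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&redisclusterv1alpha1.RedisCluster{}).
                Owns(&appsv1.StatefulSet{}). // requeue when owned StatefulSets change
                Owns(&corev1.Service{}).     // requeue when the owned Service changes
                Owns(&corev1.ConfigMap{}).   // requeue when the owned ConfigMap changes
                Complete(r)
}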

type clusterInfo struct {
        redisCluster *redisclusterv1alpha1.RedisCluster
        masterSS     *appsv1.StatefulSet
        replicaSS    []*appsv1.StatefulSet
        masterPods   []*corev1.Pod
        replicaPods  []*corev1.Pod
        cm           *corev1.ConfigMap
        scr          *corev1.Secret
}

func (r *RedisClusterReconciler) ensureConfigMap(ctx context.Context, logger logr.Logger, redisCluster *redisclusterv1alpha1.RedisCluster) (ctrl.Result, error) {
        configMap, err := r.KubernetesManager.FetchConfigmap(ctx, redisCluster)
        if err != nil && !apierrors.IsNotFound(err) {
                logger.Error(err, "error checking the existence of ConfigMap")
                return ctrl.Result{
                        RequeueAfter: 30 * time.Second,
                }, err
        }
        if apierrors.IsNotFound(err) {
                configMap, err = r.KubernetesManager.CreateConfigMap(ctx, redisCluster)
                if err != nil {
                        logger.Error(err, "error creating ConfigMap for RedisCluster")
                        return ctrl.Result{
                                RequeueAfter: 30 * time.Second,
                        }, err
                }
                logger.Info("Created ConfigMap for RedisCluster. Reconciling in 5 seconds.")
                return ctrl.Result{
                        RequeueAfter: 5 * time.Second,
                }, err
        }

        // Set the owner reference of the ConfigMap, retrying on update conflicts.
        err = retry.RetryOnConflict(wait.Backoff{
                Steps:    5,
                Duration: 2 * time.Second,
                Factor:   1.0,
                Jitter:   0.1,
        }, func() error {
                configMap, err = r.KubernetesManager.FetchConfigmap(ctx, redisCluster)
                if err != nil {
                        logger.Error(err, "error finding configMap")
                        return err
                }
                err = ctrl.SetControllerReference(redisCluster, configMap, r.Scheme)
                if err != nil {
                        logger.Error(err, "error setting owner reference of ConfigMap")
                        return err
                }
                err = r.KubernetesManager.UpdateResource(ctx, configMap)
                if err != nil {
                        logger.Error(err, "error updating owner reference of ConfigMap")
                }
                return err
        })
        if err != nil {
                logger.Error(err, "error setting/updating owner reference of ConfigMap")
                return ctrl.Result{
                        RequeueAfter: 10 * time.Second,
                }, err
        }
        return ctrl.Result{}, nil
}
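
// A small refactoring sketch (an assumption, not in the original file): the same
// conflict-retry backoff is spelled out for the ConfigMap above and again for the
// StatefulSets and Service below; it could be hoisted into one package-level value,
// roughly five attempts about two seconds apart:
var ownerRefBackoffSketch = wait.Backoff{
        Steps:    5,               // retry up to five times on conflict
        Duration: 2 * time.Second, // base delay between attempts
        Factor:   1.0,             // constant, non-exponential backoff
        Jitter:   0.1,             // add up to 10% random jitter
}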

func (r *RedisClusterReconciler) ensureStatefulSet(ctx context.Context, logger logr.Logger, redisCluster *redisclusterv1alpha1.RedisCluster) (*appsv1.StatefulSet, []*appsv1.StatefulSet, ctrl.Result, error) {
        masterSSet, replSSets, err := r.KubernetesManager.FetchStatefulsets(ctx, redisCluster)
        if err != nil && !apierrors.IsNotFound(err) {
                logger.Error(err, "Could not check whether statefulset exists due to error.")
                return nil, nil, ctrl.Result{
                        RequeueAfter: 30 * time.Second,
                }, err
        }

        if apierrors.IsNotFound(err) {
                masterSSet, replSSets, err = r.KubernetesManager.CreateStatefulsets(ctx, redisCluster)
                if err != nil {
                        logger.Error(err, "Failed to create Statefulset for RedisCluster")
                        return nil, nil, ctrl.Result{
                                RequeueAfter: 30 * time.Second,
                        }, err
                }
                logger.Info("Created Statefulset for RedisCluster. Reconciling in 5 seconds.")
                return masterSSet, replSSets, ctrl.Result{
                        RequeueAfter: 5 * time.Second,
                }, err
        }

        // Set owner reference for master statefulset
        masterSSet, replSSets, err = r.KubernetesManager.FetchStatefulsets(ctx, redisCluster)
        if err != nil {
                logger.Error(err, "Cannot find statefulsets")
                return nil, nil, ctrl.Result{
                        RequeueAfter: 5 * time.Second,
                }, err
        }
        err = retry.RetryOnConflict(wait.Backoff{
                Steps:    5,
                Duration: 2 * time.Second,
                Factor:   1.0,
                Jitter:   0.1,
        }, func() error {
                err = ctrl.SetControllerReference(redisCluster, masterSSet, r.Scheme)
                if err != nil {
                        logger.Error(err, fmt.Sprintf("Could not set owner name for master statefulset named %s", masterSSet.Name))
                        return err
                }
                err = r.KubernetesManager.UpdateResource(ctx, masterSSet)
                if err != nil {
                        logger.Error(err, fmt.Sprintf("Could not update master statefulset named %s with owner reference", masterSSet.Name))
                        return err
                }
                return err
        })
        // Set owner references for the replica statefulsets as well.
        for _, statefulset := range replSSets {
                err = retry.RetryOnConflict(wait.Backoff{
                        Steps:    5,
                        Duration: 2 * time.Second,
                        Factor:   1.0,
                        Jitter:   0.1,
                }, func() error {
                        err = ctrl.SetControllerReference(redisCluster, statefulset, r.Scheme)
                        if err != nil {
                                logger.Error(err, fmt.Sprintf("Could not set owner name for replica statefulset named %s", statefulset.Name))
                                return err
                        }
                        err = r.KubernetesManager.UpdateResource(ctx, statefulset)
                        if err != nil {
                                logger.Error(err, fmt.Sprintf("Could not update replica statefulset named %s with owner reference", statefulset.Name))
                                return err
                        }
                        return err
                })
        }
        if err != nil {
                logger.Error(err, "Could not set owner reference for statefulset")
                return nil, nil, ctrl.Result{
                        RequeueAfter: 10 * time.Second,
                }, err
        }
        return masterSSet, replSSets, ctrl.Result{}, nil
}

func (r *RedisClusterReconciler) ensureService(ctx context.Context, logger logr.Logger, redisCluster *redisclusterv1alpha1.RedisCluster) (ctrl.Result, error) {
        service, err := r.KubernetesManager.FetchService(ctx, redisCluster)
        if err != nil && !apierrors.IsNotFound(err) {
                logger.Error(err, "Could not check whether service exists due to error.")
                return ctrl.Result{
                        RequeueAfter: 30 * time.Second,
                }, err
        }
        if apierrors.IsNotFound(err) {
                service, err = r.KubernetesManager.CreateService(ctx, redisCluster)
                if err != nil {
                        logger.Error(err, "Failed to create Service for RedisCluster")
                        return ctrl.Result{
                                RequeueAfter: 30 * time.Second,
                        }, err
                }
                logger.Info("Created Service for RedisCluster. Reconciling in 5 seconds.")
                return ctrl.Result{
                        RequeueAfter: 5 * time.Second,
                }, err
        }

        // Set Service owner reference
        err = retry.RetryOnConflict(wait.Backoff{
                Steps:    5,
                Duration: 2 * time.Second,
                Factor:   1.0,
                Jitter:   0.1,
        }, func() error {
                service, err = r.KubernetesManager.FetchService(ctx, redisCluster)
                if err != nil {
                        logger.Error(err, "Cannot find service")
                        return err
                }
                err = ctrl.SetControllerReference(redisCluster, service, r.Scheme)
                if err != nil {
                        logger.Error(err, "Could not set owner reference for service")
                        return err
                }
                err = r.KubernetesManager.UpdateResource(ctx, service)
                if err != nil {
                        logger.Error(err, "Could not update service with owner reference")
                }
                return err
        })
        if err != nil {
                logger.Error(err, "Could not set owner reference for service")
                return ctrl.Result{
                        RequeueAfter: 10 * time.Second,
                }, err
        }
        return ctrl.Result{}, nil
}

// reconcileScaleCluster checks whether the cluster needs to be scaled out or in and updates the statefulset replica counts accordingly.
func (r *RedisClusterReconciler) reconcileScaleCluster(ctx context.Context, logger logr.Logger, redisCluster *redisclusterv1alpha1.RedisCluster, masterSSet *appsv1.StatefulSet, replSSets []*appsv1.StatefulSet) (ctrl.Result, error) {
        replicas := redisCluster.Spec.Masters
        masterSSet.Spec.Replicas = &replicas
        err := r.KubernetesManager.UpdateResource(ctx, masterSSet)
        if err != nil {
                return r.RequeueError(ctx, "Could not update statefulset replicas", err)
        }
        for _, statefulset := range replSSets {
                // Note that this guard only ever grows a replica statefulset: counts below Spec.Masters are raised, larger counts are left untouched.
                if *statefulset.Spec.Replicas < redisCluster.Spec.Masters {
                        statefulset.Spec.Replicas = &replicas
                        err := r.KubernetesManager.UpdateResource(ctx, statefulset)
                        if err != nil {
                                return r.RequeueError(ctx, "Could not update statefulset replicas", err)
                        }
                }
        }
        return ctrl.Result{
                RequeueAfter: 5 * time.Second,
        }, nil
}