• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 17861088256

19 Sep 2025 02:22PM UTC coverage: 18.853% (-0.5%) from 19.354%
17861088256

Pull #59

github

chideat
chore: update release pipeline and dependencies
Pull Request #59: Enhanced Stability, Pause Logic, and Sentinel Improvements

88 of 1118 new or added lines in 42 files covered. (7.87%)

62 existing lines in 12 files now uncovered.

3881 of 20586 relevant lines covered (18.85%)

0.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.21
/internal/controller/user_controller.go
1
/*
2
Copyright 2024 chideat.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "slices"
24
        "strings"
25
        "time"
26

27
        "github.com/chideat/valkey-operator/api/core"
28
        "github.com/chideat/valkey-operator/api/v1alpha1"
29
        "github.com/chideat/valkey-operator/internal/builder"
30
        "github.com/chideat/valkey-operator/internal/config"
31
        "github.com/chideat/valkey-operator/internal/controller/user"
32
        "github.com/chideat/valkey-operator/internal/util"
33
        security "github.com/chideat/valkey-operator/pkg/security/password"
34
        tuser "github.com/chideat/valkey-operator/pkg/types/user"
35

36
        corev1 "k8s.io/api/core/v1"
37
        "k8s.io/apimachinery/pkg/api/errors"
38
        "k8s.io/apimachinery/pkg/runtime"
39
        "k8s.io/apimachinery/pkg/types"
40
        "k8s.io/client-go/tools/record"
41
        "k8s.io/client-go/util/retry"
42
        ctrl "sigs.k8s.io/controller-runtime"
43
        "sigs.k8s.io/controller-runtime/pkg/client"
44
        "sigs.k8s.io/controller-runtime/pkg/controller"
45
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
46
        "sigs.k8s.io/controller-runtime/pkg/event"
47
        "sigs.k8s.io/controller-runtime/pkg/log"
48
        "sigs.k8s.io/controller-runtime/pkg/predicate"
49
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
50
)
51

52
const (
53
        UserFinalizer = "buf.red/user-finalizer"
54
)
55

56
// UserReconciler reconciles a User object
57
type UserReconciler struct {
58
        client.Client
59
        Scheme *runtime.Scheme
60

61
        EventRecorder record.EventRecorder
62
        Handler       *user.UserHandler
63
}
64

65
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users,verbs=get;list;watch;create;update;patch;delete
66
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/status,verbs=get;update;patch
67
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/finalizers,verbs=update
68

69
// Reconcile is part of the main kubernetes reconciliation loop which aims to
70
// move the current state of the cluster closer to the desired state.
71
func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
72
        logger := log.FromContext(ctx).WithName("User").WithValues("target", req.String())
1✔
73

1✔
74
        instance := v1alpha1.User{}
1✔
75
        if err := r.Client.Get(ctx, req.NamespacedName, &instance); err != nil {
1✔
UNCOV
76
                logger.Error(err, "get valkey user failed")
×
NEW
77
                return reconcile.Result{}, client.IgnoreNotFound(err)
×
78
        } else if instance.GetDeletionTimestamp() != nil {
1✔
NEW
79
                if slices.Contains([]string{tuser.DefaultOperatorUserName}, instance.Spec.Username) {
×
NEW
80
                        switch instance.Spec.Arch {
×
NEW
81
                        case core.ValkeyReplica, core.ValkeyFailover:
×
NEW
82
                                rf := &v1alpha1.Failover{}
×
NEW
83
                                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Spec.InstanceName}, rf); err != nil {
×
NEW
84
                                        if !errors.IsNotFound(err) {
×
NEW
85
                                                logger.Error(err, "get instance failed", "name", instance.Name)
×
NEW
86
                                                return ctrl.Result{}, err
×
NEW
87
                                        }
×
NEW
88
                                } else {
×
NEW
89
                                        if rf.GetDeletionTimestamp() != nil {
×
NEW
90
                                                logger.Info("failover is deleting, skip remove finalizer", "name", instance.Spec.InstanceName)
×
NEW
91
                                                return ctrl.Result{RequeueAfter: time.Second * 10}, nil
×
NEW
92
                                        }
×
93
                                        // this should not happen, but we still return a requeue result
NEW
94
                                        return ctrl.Result{RequeueAfter: time.Minute}, nil
×
95
                                }
NEW
96
                        case core.ValkeyCluster:
×
NEW
97
                                cluster := &v1alpha1.Cluster{}
×
NEW
98
                                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: instance.Spec.InstanceName}, cluster); err != nil {
×
NEW
99
                                        if !errors.IsNotFound(err) {
×
NEW
100
                                                logger.Error(err, "get instance failed", "name", instance.Name)
×
NEW
101
                                                return ctrl.Result{}, err
×
NEW
102
                                        }
×
NEW
103
                                } else {
×
NEW
104
                                        if cluster.GetDeletionTimestamp() != nil {
×
NEW
105
                                                logger.Info("instance is deleting, skip remove finalizer", "name", instance.Spec.InstanceName)
×
NEW
106
                                                return ctrl.Result{RequeueAfter: time.Second * 10}, nil
×
NEW
107
                                        }
×
108
                                        // this should not happen, but we still return a requeue result
NEW
109
                                        return ctrl.Result{RequeueAfter: time.Minute}, nil
×
110
                                }
111
                        }
112
                }
113

UNCOV
114
                if err := r.Handler.Delete(ctx, instance, logger); err != nil {
×
115
                        if instance.Status.Message != err.Error() {
×
116
                                instance.Status.Phase = v1alpha1.UserFail
×
117
                                instance.Status.Message = fmt.Sprintf("clean user failed with error %s", err.Error())
×
118
                                if err := r.Client.Status().Update(ctx, &instance); err != nil {
×
119
                                        logger.Error(err, "update user status failed", "instance", req.NamespacedName)
×
120
                                        return ctrl.Result{}, err
×
121
                                }
×
122
                        }
123
                        return ctrl.Result{RequeueAfter: time.Second * 10}, err
×
124
                }
125

NEW
126
                for _, name := range instance.Spec.PasswordSecrets {
×
NEW
127
                        if name == "" {
×
NEW
128
                                continue
×
129
                        }
NEW
130
                        secret := &corev1.Secret{}
×
NEW
131
                        if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: name}, secret); err != nil {
×
NEW
132
                                if errors.IsNotFound(err) {
×
NEW
133
                                        logger.Info("secret not found, skip remove finalizer", "name", name)
×
NEW
134
                                        continue
×
135
                                }
NEW
136
                                logger.Error(err, "get secret failed", "secret name", name)
×
NEW
137
                                return ctrl.Result{}, err
×
138
                        }
139

NEW
140
                        if slices.Contains(secret.GetFinalizers(), UserFinalizer) {
×
NEW
141
                                controllerutil.RemoveFinalizer(secret, UserFinalizer)
×
NEW
142
                                if err := r.Update(ctx, secret); err != nil {
×
NEW
143
                                        logger.Error(err, "remove finalizer from secret failed", "secret name", name)
×
NEW
144
                                        return ctrl.Result{}, err
×
NEW
145
                                }
×
146
                        }
147
                }
148

NEW
149
                logger.Info("RemoveFinalizer", "instance", req.NamespacedName)
×
NEW
150
                controllerutil.RemoveFinalizer(&instance, UserFinalizer)
×
NEW
151
                if err := r.Update(ctx, &instance); err != nil {
×
NEW
152
                        logger.Error(err, "remove finalizer failed", "instance", req.NamespacedName)
×
NEW
153
                        return ctrl.Result{RequeueAfter: time.Second * 10}, nil
×
154
                }
×
155
                return ctrl.Result{}, nil
×
156
        }
157

158
        vkName := instance.Spec.InstanceName
1✔
159
        switch instance.Spec.Arch {
1✔
160
        case core.ValkeySentinel, core.ValkeyReplica:
×
161
                rf := &v1alpha1.Failover{}
×
162
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, rf); err != nil {
×
163
                        if errors.IsNotFound(err) {
×
164
                                logger.Error(err, "instance not found", "name", vkName)
×
165
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
166
                        }
×
167
                        logger.Error(err, "get failover failed", "name", instance.Name)
×
168
                        return ctrl.Result{}, err
×
169
                }
170
        case core.ValkeyCluster:
1✔
171
                cluster := &v1alpha1.Cluster{}
1✔
172
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, cluster); err != nil {
1✔
173
                        if errors.IsNotFound(err) {
×
174
                                logger.Error(err, "instance not found", "name", vkName)
×
175
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
176
                        }
×
177
                        logger.Error(err, "get cluster instance failed", "name", instance.Name)
×
178
                        return ctrl.Result{}, err
×
179
                }
180
        }
181

182
        // verify user password
183
        for _, name := range instance.Spec.PasswordSecrets {
1✔
184
                if name == "" {
×
185
                        continue
×
186
                }
187
                secret := &corev1.Secret{}
×
NEW
188
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: name}, secret); err != nil {
×
189
                        logger.Error(err, "get secret failed", "secret name", name)
×
190
                        instance.Status.Message = err.Error()
×
191
                        instance.Status.Phase = v1alpha1.UserFail
×
192
                        if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
193
                                logger.Error(e, "update User status to Fail failed")
×
194
                        }
×
195
                        return ctrl.Result{}, err
×
196
                } else if err := security.PasswordValidate(string(secret.Data["password"]), 8, 32); err != nil {
×
197
                        if instance.Spec.AccountType != v1alpha1.SystemAccount {
×
198
                                instance.Status.Message = err.Error()
×
199
                                instance.Status.Phase = v1alpha1.UserFail
×
200
                                if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
201
                                        logger.Error(e, "update User status to Fail failed")
×
202
                                }
×
203
                                return ctrl.Result{}, err
×
204
                        }
205
                }
206

207
                if secret.GetLabels() == nil {
×
208
                        secret.SetLabels(map[string]string{})
×
209
                }
×
210
                if secret.Labels[builder.InstanceNameLabelKey] != vkName ||
×
NEW
211
                        len(secret.GetOwnerReferences()) == 0 || secret.OwnerReferences[0].UID != instance.GetUID() ||
×
NEW
212
                        controllerutil.ContainsFinalizer(secret, UserFinalizer) {
×
213

×
214
                        secret.Labels[builder.ManagedByLabelKey] = config.AppName
×
215
                        secret.Labels[builder.InstanceNameLabelKey] = vkName
×
216
                        secret.OwnerReferences = util.BuildOwnerReferences(&instance)
×
NEW
217
                        controllerutil.AddFinalizer(secret, UserFinalizer)
×
NEW
218
                        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
219
                                return r.Update(ctx, secret)
×
220
                        }); err != nil {
×
221
                                logger.Error(err, "update secret owner failed", "secret", secret.Name)
×
222
                                instance.Status.Message = err.Error()
×
223
                                instance.Status.Phase = v1alpha1.UserFail
×
224
                                return ctrl.Result{RequeueAfter: time.Second * 5}, r.Client.Status().Update(ctx, &instance)
×
225
                        }
×
226
                }
227
        }
228

229
        if err := r.Handler.Do(ctx, instance, logger); err != nil {
2✔
230
                if strings.Contains(err.Error(), "instance is not ready") ||
1✔
231
                        strings.Contains(err.Error(), "node not ready") ||
1✔
232
                        strings.Contains(err.Error(), "user not operator") ||
1✔
233
                        strings.Contains(err.Error(), "ERR unknown command `ACL`") {
1✔
234
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
235
                        instance.Status.Message = err.Error()
×
236
                        instance.Status.Phase = v1alpha1.UserPending
×
237
                        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
238
                                logger.Error(err, "update User status to Pending failed")
×
239
                        }
×
240
                        return ctrl.Result{RequeueAfter: time.Second * 15}, nil
×
241
                }
242

243
                instance.Status.Message = err.Error()
1✔
244
                instance.Status.Phase = v1alpha1.UserFail
1✔
245
                logger.Error(err, "user reconcile failed")
1✔
246
                if err := r.updateUserStatus(ctx, &instance); err != nil {
1✔
247
                        logger.Error(err, "update User status to Fail failed")
×
248
                }
×
249
                return reconcile.Result{RequeueAfter: time.Second * 10}, nil
1✔
250
        }
251
        instance.Status.Phase = v1alpha1.UserReady
×
252
        instance.Status.Message = ""
×
253
        logger.V(3).Info("user reconcile success")
×
254
        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
255
                logger.Error(err, "update User status to Success failed")
×
256
                return reconcile.Result{RequeueAfter: time.Second * 10}, err
×
257
        }
×
258
        if !controllerutil.ContainsFinalizer(&instance, UserFinalizer) {
×
259
                controllerutil.AddFinalizer(&instance, UserFinalizer)
×
260
                if err := r.updateUser(ctx, &instance); err != nil {
×
261
                        logger.Error(err, "update finalizer user failed")
×
262
                        return ctrl.Result{}, err
×
263
                }
×
264
        }
265

266
        return ctrl.Result{}, nil
×
267
}
268

269
func (r *UserReconciler) updateUserStatus(ctx context.Context, inst *v1alpha1.User) error {
1✔
270
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
271
                var oldUser v1alpha1.User
1✔
272
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
1✔
273
                        return err
×
274
                }
×
275
                inst.ResourceVersion = oldUser.ResourceVersion
1✔
276
                return r.Status().Update(ctx, inst)
1✔
277
        })
278
}
279

280
func (r *UserReconciler) updateUser(ctx context.Context, inst *v1alpha1.User) error {
×
281
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
282
                var oldUser v1alpha1.User
×
283
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
×
284
                        return err
×
285
                }
×
286
                inst.ResourceVersion = oldUser.ResourceVersion
×
287
                return r.Update(ctx, inst)
×
288
        })
289
}
290

291
// SetupWithManager sets up the controller with the Manager.
292
func (r *UserReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
293
        return ctrl.NewControllerManagedBy(mgr).
×
294
                For(&v1alpha1.User{}).
×
295
                Owns(&corev1.Secret{}).
×
296
                WithEventFilter(generationChangedFilter()).
×
297
                WithOptions(controller.Options{MaxConcurrentReconciles: 8}).
×
298
                Complete(r)
×
299
}
×
300

301
func generationChangedFilter() predicate.Predicate {
×
302
        return predicate.Funcs{
×
303
                UpdateFunc: func(e event.UpdateEvent) bool {
×
304
                        if e.ObjectOld == nil {
×
305
                                return false
×
306
                        }
×
307
                        if e.ObjectNew == nil {
×
308
                                return false
×
309
                        }
×
310
                        if reflect.TypeOf(e.ObjectNew).String() == reflect.TypeOf(&corev1.Secret{}).String() {
×
311
                                return true
×
312
                        }
×
313
                        // Ignore updates to CR status in which case metadata.Generation does not change
314
                        return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
×
315
                },
316
        }
317
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc