• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 15440127187

04 Jun 2025 10:37AM UTC coverage: 19.366% (+9.1%) from 10.284%
15440127187

push

github

web-flow
chore: added more unit tests (#18)

fix bugs of create valkey instance or users

220 of 588 new or added lines in 46 files covered. (37.41%)

11 existing lines in 6 files now uncovered.

3880 of 20035 relevant lines covered (19.37%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.24
/internal/controller/user_controller.go
1
/*
2
Copyright 2024 chideat.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "reflect"
23
        "strings"
24
        "time"
25

26
        "github.com/chideat/valkey-operator/api/core"
27
        "github.com/chideat/valkey-operator/api/v1alpha1"
28
        "github.com/chideat/valkey-operator/internal/builder"
29
        "github.com/chideat/valkey-operator/internal/config"
30
        "github.com/chideat/valkey-operator/internal/controller/user"
31
        "github.com/chideat/valkey-operator/internal/util"
32
        security "github.com/chideat/valkey-operator/pkg/security/password"
33

34
        corev1 "k8s.io/api/core/v1"
35
        "k8s.io/apimachinery/pkg/api/errors"
36
        "k8s.io/apimachinery/pkg/runtime"
37
        "k8s.io/apimachinery/pkg/types"
38
        "k8s.io/client-go/tools/record"
39
        "k8s.io/client-go/util/retry"
40
        ctrl "sigs.k8s.io/controller-runtime"
41
        "sigs.k8s.io/controller-runtime/pkg/client"
42
        "sigs.k8s.io/controller-runtime/pkg/controller"
43
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
44
        "sigs.k8s.io/controller-runtime/pkg/event"
45
        "sigs.k8s.io/controller-runtime/pkg/log"
46
        "sigs.k8s.io/controller-runtime/pkg/predicate"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48
)
49

50
// UserFinalizer is the finalizer UserReconciler puts on a User after a
// successful reconcile and removes again once Handler.Delete has succeeded
// for a User that is being deleted.
const UserFinalizer = "buf.red/user-finalizer"
53

54
// UserReconciler reconciles a User object
55
type UserReconciler struct {
56
        client.Client
57
        Scheme *runtime.Scheme
58

59
        EventRecorder record.EventRecorder
60
        Handler       *user.UserHandler
61
}
62

63
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users,verbs=get;list;watch;create;update;patch;delete
64
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/status,verbs=get;update;patch
65
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/finalizers,verbs=update
66

67
// Reconcile is part of the main kubernetes reconciliation loop which aims to
68
// move the current state of the cluster closer to the desired state.
69
func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
70
        logger := log.FromContext(ctx).WithName("User").WithValues("target", req.String())
1✔
71

1✔
72
        instance := v1alpha1.User{}
1✔
73
        err := r.Client.Get(ctx, req.NamespacedName, &instance)
1✔
74
        if err != nil {
1✔
75
                if errors.IsNotFound(err) {
×
76
                        return reconcile.Result{}, nil
×
77
                }
×
NEW
78
                logger.Error(err, "get valkey user failed")
×
UNCOV
79
                return reconcile.Result{}, err
×
80
        }
81

82
        if instance.GetDeletionTimestamp() != nil {
1✔
NEW
83
                logger.Info("user is being deleted", "instance", req.NamespacedName)
×
NEW
84
                if err := r.Handler.Delete(ctx, instance, logger); err != nil {
×
NEW
85
                        if instance.Status.Message != err.Error() {
×
NEW
86
                                instance.Status.Phase = v1alpha1.UserFail
×
NEW
87
                                instance.Status.Message = fmt.Sprintf("clean user failed with error %s", err.Error())
×
NEW
88
                                if err := r.Client.Status().Update(ctx, &instance); err != nil {
×
NEW
89
                                        logger.Error(err, "update user status failed", "instance", req.NamespacedName)
×
NEW
90
                                        return ctrl.Result{}, err
×
NEW
91
                                }
×
92
                        }
NEW
93
                        return ctrl.Result{RequeueAfter: time.Second * 10}, err
×
NEW
94
                } else {
×
NEW
95
                        controllerutil.RemoveFinalizer(&instance, UserFinalizer)
×
NEW
96
                        if err := r.Update(ctx, &instance); err != nil {
×
NEW
97
                                logger.Error(err, "remove finalizer failed", "instance", req.NamespacedName)
×
NEW
98
                                return ctrl.Result{RequeueAfter: time.Second * 10}, nil
×
NEW
99
                        }
×
100
                }
101
                return ctrl.Result{}, nil
×
102
        }
103

104
        vkName := instance.Spec.InstanceName
1✔
105
        switch instance.Spec.Arch {
1✔
106
        case core.ValkeySentinel, core.ValkeyReplica:
×
107
                rf := &v1alpha1.Failover{}
×
108
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, rf); err != nil {
×
109
                        if errors.IsNotFound(err) {
×
110
                                logger.Error(err, "instance not found", "name", vkName)
×
111
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
112
                        }
×
113
                        logger.Error(err, "get failover failed", "name", instance.Name)
×
114
                        return ctrl.Result{}, err
×
115
                }
116
        case core.ValkeyCluster:
1✔
117
                cluster := &v1alpha1.Cluster{}
1✔
118
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, cluster); err != nil {
1✔
119
                        if errors.IsNotFound(err) {
×
120
                                logger.Error(err, "instance not found", "name", vkName)
×
121
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
122
                        }
×
123
                        logger.Error(err, "get cluster instance failed", "name", instance.Name)
×
124
                        return ctrl.Result{}, err
×
125
                }
126
        }
127

128
        // verify user password
129
        for _, name := range instance.Spec.PasswordSecrets {
1✔
130
                if name == "" {
×
131
                        continue
×
132
                }
133
                secret := &corev1.Secret{}
×
134
                if err := r.Get(ctx, types.NamespacedName{
×
135
                        Namespace: instance.Namespace,
×
136
                        Name:      name,
×
137
                }, secret); err != nil {
×
138
                        logger.Error(err, "get secret failed", "secret name", name)
×
139
                        instance.Status.Message = err.Error()
×
140
                        instance.Status.Phase = v1alpha1.UserFail
×
141
                        if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
142
                                logger.Error(e, "update User status to Fail failed")
×
143
                        }
×
144
                        return ctrl.Result{}, err
×
145
                } else if err := security.PasswordValidate(string(secret.Data["password"]), 8, 32); err != nil {
×
146
                        if instance.Spec.AccountType != v1alpha1.SystemAccount {
×
147
                                instance.Status.Message = err.Error()
×
148
                                instance.Status.Phase = v1alpha1.UserFail
×
149
                                if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
150
                                        logger.Error(e, "update User status to Fail failed")
×
151
                                }
×
152
                                return ctrl.Result{}, err
×
153
                        }
154
                }
155

156
                if secret.GetLabels() == nil {
×
157
                        secret.SetLabels(map[string]string{})
×
158
                }
×
159
                if secret.Labels[builder.InstanceNameLabelKey] != vkName ||
×
160
                        len(secret.GetOwnerReferences()) == 0 || secret.OwnerReferences[0].UID != instance.GetUID() {
×
161

×
162
                        secret.Labels[builder.ManagedByLabelKey] = config.AppName
×
163
                        secret.Labels[builder.InstanceNameLabelKey] = vkName
×
164
                        secret.OwnerReferences = util.BuildOwnerReferences(&instance)
×
165
                        if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
NEW
166
                                return r.Update(ctx, secret)
×
167
                        }); err != nil {
×
168
                                logger.Error(err, "update secret owner failed", "secret", secret.Name)
×
169
                                instance.Status.Message = err.Error()
×
170
                                instance.Status.Phase = v1alpha1.UserFail
×
171
                                return ctrl.Result{RequeueAfter: time.Second * 5}, r.Client.Status().Update(ctx, &instance)
×
172
                        }
×
173
                }
174
        }
175

176
        if err := r.Handler.Do(ctx, instance, logger); err != nil {
2✔
177
                if strings.Contains(err.Error(), "instance is not ready") ||
1✔
178
                        strings.Contains(err.Error(), "node not ready") ||
1✔
179
                        strings.Contains(err.Error(), "user not operator") ||
1✔
180
                        strings.Contains(err.Error(), "ERR unknown command `ACL`") {
1✔
181
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
182
                        instance.Status.Message = err.Error()
×
183
                        instance.Status.Phase = v1alpha1.UserPending
×
184
                        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
185
                                logger.Error(err, "update User status to Pending failed")
×
186
                        }
×
187
                        return ctrl.Result{RequeueAfter: time.Second * 15}, nil
×
188
                }
189

190
                instance.Status.Message = err.Error()
1✔
191
                instance.Status.Phase = v1alpha1.UserFail
1✔
192
                logger.Error(err, "user reconcile failed")
1✔
193
                if err := r.updateUserStatus(ctx, &instance); err != nil {
1✔
194
                        logger.Error(err, "update User status to Fail failed")
×
195
                }
×
196
                return reconcile.Result{RequeueAfter: time.Second * 10}, nil
1✔
197
        }
NEW
198
        instance.Status.Phase = v1alpha1.UserReady
×
199
        instance.Status.Message = ""
×
200
        logger.V(3).Info("user reconcile success")
×
201
        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
202
                logger.Error(err, "update User status to Success failed")
×
203
                return reconcile.Result{RequeueAfter: time.Second * 10}, err
×
204
        }
×
205
        if !controllerutil.ContainsFinalizer(&instance, UserFinalizer) {
×
206
                controllerutil.AddFinalizer(&instance, UserFinalizer)
×
207
                if err := r.updateUser(ctx, &instance); err != nil {
×
208
                        logger.Error(err, "update finalizer user failed")
×
209
                        return ctrl.Result{}, err
×
210
                }
×
211
        }
212

213
        return ctrl.Result{}, nil
×
214
}
215

216
func (r *UserReconciler) updateUserStatus(ctx context.Context, inst *v1alpha1.User) error {
1✔
217
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
218
                var oldUser v1alpha1.User
1✔
219
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
1✔
220
                        return err
×
221
                }
×
222
                inst.ResourceVersion = oldUser.ResourceVersion
1✔
223
                return r.Status().Update(ctx, inst)
1✔
224
        })
225
}
226

227
func (r *UserReconciler) updateUser(ctx context.Context, inst *v1alpha1.User) error {
×
228
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
229
                var oldUser v1alpha1.User
×
230
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
×
231
                        return err
×
232
                }
×
233
                inst.ResourceVersion = oldUser.ResourceVersion
×
234
                return r.Update(ctx, inst)
×
235
        })
236
}
237

238
// SetupWithManager sets up the controller with the Manager.
239
func (r *UserReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
240
        return ctrl.NewControllerManagedBy(mgr).
×
241
                For(&v1alpha1.User{}).
×
242
                Owns(&corev1.Secret{}).
×
243
                WithEventFilter(generationChangedFilter()).
×
244
                WithOptions(controller.Options{MaxConcurrentReconciles: 8}).
×
245
                Complete(r)
×
246
}
×
247

248
func generationChangedFilter() predicate.Predicate {
×
249
        return predicate.Funcs{
×
250
                UpdateFunc: func(e event.UpdateEvent) bool {
×
251
                        if e.ObjectOld == nil {
×
252
                                return false
×
253
                        }
×
254
                        if e.ObjectNew == nil {
×
255
                                return false
×
256
                        }
×
257
                        if reflect.TypeOf(e.ObjectNew).String() == reflect.TypeOf(&corev1.Secret{}).String() {
×
258
                                return true
×
259
                        }
×
260
                        // Ignore updates to CR status in which case metadata.Generation does not change
261
                        return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
×
262
                },
263
        }
264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc