• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 14173979426

31 Mar 2025 02:26PM UTC coverage: 10.247% (-3.3%) from 13.534%
14173979426

push

github

web-flow
feat: added webhooks

168 of 2201 new or added lines in 67 files covered. (7.63%)

1254 existing lines in 61 files now uncovered.

2074 of 20241 relevant lines covered (10.25%)

0.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.13
/internal/controller/user_controller.go
1
/*
2
Copyright 2024 chideat.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "reflect"
22
        "strings"
23
        "time"
24

25
        "github.com/chideat/valkey-operator/api/core"
26
        "github.com/chideat/valkey-operator/api/v1alpha1"
27
        "github.com/chideat/valkey-operator/internal/builder"
28
        "github.com/chideat/valkey-operator/internal/config"
29
        "github.com/chideat/valkey-operator/internal/controller/user"
30
        "github.com/chideat/valkey-operator/internal/util"
31
        "github.com/chideat/valkey-operator/pkg/kubernetes"
32
        security "github.com/chideat/valkey-operator/pkg/security/password"
33

34
        corev1 "k8s.io/api/core/v1"
35
        "k8s.io/apimachinery/pkg/api/errors"
36
        "k8s.io/apimachinery/pkg/runtime"
37
        "k8s.io/apimachinery/pkg/types"
38
        "k8s.io/client-go/tools/record"
39
        "k8s.io/client-go/util/retry"
40
        ctrl "sigs.k8s.io/controller-runtime"
41
        "sigs.k8s.io/controller-runtime/pkg/client"
42
        "sigs.k8s.io/controller-runtime/pkg/controller"
43
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
44
        "sigs.k8s.io/controller-runtime/pkg/event"
45
        "sigs.k8s.io/controller-runtime/pkg/log"
46
        "sigs.k8s.io/controller-runtime/pkg/predicate"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48
)
49

50
// UserFinalizer is the finalizer Reconcile attaches to a User once it has been
// reconciled successfully and strips again when the User is being deleted.
const UserFinalizer = "buf.red/user-finalizer"
53

54
// UserReconciler reconciles a User object
55
type UserReconciler struct {
56
        client.Client
57
        Scheme *runtime.Scheme
58

59
        K8sClient     kubernetes.ClientSet
60
        EventRecorder record.EventRecorder
61
        Handler       *user.UserHandler
62
}
63

64
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users,verbs=get;list;watch;create;update;patch;delete
65
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/status,verbs=get;update;patch
66
// +kubebuilder:rbac:groups=valkey.buf.red,resources=users/finalizers,verbs=update
67

68
// Reconcile is part of the main kubernetes reconciliation loop which aims to
69
// move the current state of the cluster closer to the desired state.
70
func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
71
        logger := log.FromContext(ctx).WithName("User").WithValues("target", req.String())
1✔
72

1✔
73
        instance := v1alpha1.User{}
1✔
74
        err := r.Client.Get(ctx, req.NamespacedName, &instance)
1✔
75
        if err != nil {
1✔
NEW
76
                logger.Error(err, "get valkey user failed")
×
77
                if errors.IsNotFound(err) {
×
78
                        return reconcile.Result{}, nil
×
79
                }
×
80
                return reconcile.Result{}, err
×
81
        }
82

83
        isMarkedToBeDeleted := instance.GetDeletionTimestamp() != nil
1✔
84
        if isMarkedToBeDeleted {
1✔
85
                logger.Info("remove finalizer")
×
86
                controllerutil.RemoveFinalizer(&instance, UserFinalizer)
×
87
                if err := r.Update(ctx, &instance); err != nil {
×
88
                        return ctrl.Result{}, err
×
89
                }
×
90
                return ctrl.Result{}, nil
×
91
        }
92

93
        vkName := instance.Spec.InstanceName
1✔
94
        switch instance.Spec.Arch {
1✔
95
        case core.ValkeySentinel, core.ValkeyReplica:
×
96
                rf := &v1alpha1.Failover{}
×
97
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, rf); err != nil {
×
98
                        if errors.IsNotFound(err) {
×
99
                                logger.Error(err, "instance not found", "name", vkName)
×
100
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
101
                        }
×
102
                        logger.Error(err, "get failover failed", "name", instance.Name)
×
103
                        return ctrl.Result{}, err
×
104
                }
105
        case core.ValkeyCluster:
1✔
106
                cluster := &v1alpha1.Cluster{}
1✔
107
                if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: vkName}, cluster); err != nil {
1✔
108
                        if errors.IsNotFound(err) {
×
109
                                logger.Error(err, "instance not found", "name", vkName)
×
110
                                return ctrl.Result{Requeue: true}, r.Delete(ctx, &instance)
×
111
                        }
×
112
                        logger.Error(err, "get cluster instance failed", "name", instance.Name)
×
113
                        return ctrl.Result{}, err
×
114
                }
115
        }
116

117
        // verify user password
118
        for _, name := range instance.Spec.PasswordSecrets {
1✔
119
                if name == "" {
×
120
                        continue
×
121
                }
122
                secret := &corev1.Secret{}
×
123
                if err := r.Get(ctx, types.NamespacedName{
×
124
                        Namespace: instance.Namespace,
×
125
                        Name:      name,
×
126
                }, secret); err != nil {
×
127
                        logger.Error(err, "get secret failed", "secret name", name)
×
128
                        instance.Status.Message = err.Error()
×
129
                        instance.Status.Phase = v1alpha1.UserFail
×
130
                        if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
131
                                logger.Error(e, "update User status to Fail failed")
×
132
                        }
×
133
                        return ctrl.Result{}, err
×
134
                } else if err := security.PasswordValidate(string(secret.Data["password"]), 8, 32); err != nil {
×
NEW
135
                        if instance.Spec.AccountType != v1alpha1.SystemAccount {
×
136
                                instance.Status.Message = err.Error()
×
137
                                instance.Status.Phase = v1alpha1.UserFail
×
138
                                if e := r.Client.Status().Update(ctx, &instance); e != nil {
×
139
                                        logger.Error(e, "update User status to Fail failed")
×
140
                                }
×
141
                                return ctrl.Result{}, err
×
142
                        }
143
                }
144

145
                if secret.GetLabels() == nil {
×
146
                        secret.SetLabels(map[string]string{})
×
147
                }
×
148
                if secret.Labels[builder.InstanceNameLabelKey] != vkName ||
×
149
                        len(secret.GetOwnerReferences()) == 0 || secret.OwnerReferences[0].UID != instance.GetUID() {
×
150

×
151
                        secret.Labels[builder.ManagedByLabelKey] = config.AppName
×
152
                        secret.Labels[builder.InstanceNameLabelKey] = vkName
×
153
                        secret.OwnerReferences = util.BuildOwnerReferences(&instance)
×
154
                        if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
155
                                return r.K8sClient.UpdateSecret(ctx, instance.Namespace, secret)
×
156
                        }); err != nil {
×
157
                                logger.Error(err, "update secret owner failed", "secret", secret.Name)
×
158
                                instance.Status.Message = err.Error()
×
159
                                instance.Status.Phase = v1alpha1.UserFail
×
160
                                return ctrl.Result{RequeueAfter: time.Second * 5}, r.Client.Status().Update(ctx, &instance)
×
161
                        }
×
162
                }
163
        }
164

165
        if err := r.Handler.Do(ctx, instance, logger); err != nil {
2✔
166
                if strings.Contains(err.Error(), "instance is not ready") ||
1✔
167
                        strings.Contains(err.Error(), "node not ready") ||
1✔
168
                        strings.Contains(err.Error(), "user not operator") ||
1✔
169
                        strings.Contains(err.Error(), "ERR unknown command `ACL`") {
1✔
170
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
171
                        instance.Status.Message = err.Error()
×
172
                        instance.Status.Phase = v1alpha1.UserPending
×
173
                        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
174
                                logger.Error(err, "update User status to Pending failed")
×
175
                        }
×
176
                        return ctrl.Result{RequeueAfter: time.Second * 15}, nil
×
177
                }
178

179
                instance.Status.Message = err.Error()
1✔
180
                instance.Status.Phase = v1alpha1.UserFail
1✔
181
                logger.Error(err, "user reconcile failed")
1✔
182
                if err := r.updateUserStatus(ctx, &instance); err != nil {
1✔
183
                        logger.Error(err, "update User status to Fail failed")
×
184
                }
×
185
                return reconcile.Result{RequeueAfter: time.Second * 10}, nil
1✔
186
        }
187
        instance.Status.Phase = v1alpha1.UserSuccess
×
188
        instance.Status.Message = ""
×
189
        logger.V(3).Info("user reconcile success")
×
190
        if err := r.updateUserStatus(ctx, &instance); err != nil {
×
191
                logger.Error(err, "update User status to Success failed")
×
192
                return reconcile.Result{RequeueAfter: time.Second * 10}, err
×
193
        }
×
194
        if !controllerutil.ContainsFinalizer(&instance, UserFinalizer) {
×
195
                controllerutil.AddFinalizer(&instance, UserFinalizer)
×
196
                if err := r.updateUser(ctx, &instance); err != nil {
×
197
                        logger.Error(err, "update finalizer user failed")
×
198
                        return ctrl.Result{}, err
×
199
                }
×
200
        }
201

202
        return ctrl.Result{}, nil
×
203
}
204

205
func (r *UserReconciler) updateUserStatus(ctx context.Context, inst *v1alpha1.User) error {
1✔
206
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
207
                var oldUser v1alpha1.User
1✔
208
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
1✔
209
                        return err
×
210
                }
×
211
                inst.ResourceVersion = oldUser.ResourceVersion
1✔
212
                return r.Status().Update(ctx, inst)
1✔
213
        })
214
}
215

216
func (r *UserReconciler) updateUser(ctx context.Context, inst *v1alpha1.User) error {
×
217
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
218
                var oldUser v1alpha1.User
×
219
                if err := r.Get(ctx, types.NamespacedName{Namespace: inst.Namespace, Name: inst.Name}, &oldUser); err != nil {
×
220
                        return err
×
221
                }
×
222
                inst.ResourceVersion = oldUser.ResourceVersion
×
223
                return r.Update(ctx, inst)
×
224
        })
225
}
226

227
// SetupWithManager sets up the controller with the Manager.
228
func (r *UserReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
229
        return ctrl.NewControllerManagedBy(mgr).
×
230
                For(&v1alpha1.User{}).
×
231
                Owns(&corev1.Secret{}).
×
232
                WithEventFilter(generationChangedFilter()).
×
233
                WithOptions(controller.Options{MaxConcurrentReconciles: 8}).
×
234
                Complete(r)
×
235
}
×
236

237
func generationChangedFilter() predicate.Predicate {
×
238
        return predicate.Funcs{
×
239
                UpdateFunc: func(e event.UpdateEvent) bool {
×
240
                        if e.ObjectOld == nil {
×
241
                                return false
×
242
                        }
×
243
                        if e.ObjectNew == nil {
×
244
                                return false
×
245
                        }
×
246
                        if reflect.TypeOf(e.ObjectNew).String() == reflect.TypeOf(&corev1.Secret{}).String() {
×
247
                                return true
×
248
                        }
×
249
                        // Ignore updates to CR status in which case metadata.Generation does not change
250
                        return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
×
251
                },
252
        }
253
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc