• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 13214360144

08 Feb 2025 09:25AM UTC coverage: 13.534%. First build
13214360144

push

github

web-flow
feat: added features for user, added suite tests (#2)

* chore: updated github actions

* feat: added user reconcile, added suite tests

* fix: upgrade x/net for high cve

* fix: update test command

* chore: updated github actions

* chore: updated docker file

108 of 501 new or added lines in 31 files covered. (21.56%)

3048 of 22521 relevant lines covered (13.53%)

0.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.05
/internal/controller/user/handler.go
1
package user
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7

8
        "github.com/chideat/valkey-operator/pkg/types"
9
        "github.com/chideat/valkey-operator/pkg/types/user"
10
        "github.com/go-logr/logr"
11
        "k8s.io/apimachinery/pkg/api/errors"
12
        "k8s.io/client-go/tools/record"
13

14
        "github.com/chideat/valkey-operator/api/core"
15
        "github.com/chideat/valkey-operator/api/v1alpha1"
16
        "github.com/chideat/valkey-operator/internal/builder/aclbuilder"
17
        "github.com/chideat/valkey-operator/internal/valkey/cluster"
18
        "github.com/chideat/valkey-operator/internal/valkey/failover"
19
        "github.com/chideat/valkey-operator/pkg/kubernetes"
20
)
21

22
type UserHandler struct {
23
        k8sClient     kubernetes.ClientSet
24
        eventRecorder record.EventRecorder
25
        logger        logr.Logger
26
}
27

28
func NewUserHandler(k8sservice kubernetes.ClientSet, eventRecorder record.EventRecorder, logger logr.Logger) *UserHandler {
1✔
29
        return &UserHandler{
1✔
30
                k8sClient:     k8sservice,
1✔
31
                eventRecorder: eventRecorder,
1✔
32
                logger:        logger.WithName("UserHandler"),
1✔
33
        }
1✔
34
}
1✔
35

NEW
36
func (r *UserHandler) Delete(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
×
NEW
37
        logger.V(3).Info("delete user", "user instance name", inst.Name,
×
NEW
38
                "instance", inst.Spec.InstanceName, "type", inst.Spec.Arch)
×
NEW
39
        if inst.Spec.Username == user.DefaultUserName || inst.Spec.Username == user.DefaultOperatorUserName {
×
NEW
40
                return nil
×
NEW
41
        }
×
42

NEW
43
        vkName := inst.Spec.InstanceName
×
NEW
44
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
×
NEW
45
        if configMap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName); err != nil {
×
NEW
46
                if !errors.IsNotFound(err) {
×
NEW
47
                        logger.Error(err, "delete user from configmap failed")
×
NEW
48
                        return err
×
NEW
49
                }
×
NEW
50
        } else if _, ok := configMap.Data[inst.Spec.Username]; ok {
×
NEW
51
                delete(configMap.Data, inst.Spec.Username)
×
NEW
52
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configMap); err != nil {
×
NEW
53
                        logger.Error(err, "delete user from configmap failed", "configmap", cmName)
×
NEW
54
                        return err
×
NEW
55
                }
×
56
        }
57

NEW
58
        switch inst.Spec.Arch {
×
NEW
59
        case core.ValkeyCluster:
×
NEW
60
                logger.V(3).Info("cluster", "instance", vkName)
×
NEW
61
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
×
NEW
62
                if errors.IsNotFound(err) {
×
NEW
63
                        return nil
×
NEW
64
                } else if err != nil {
×
NEW
65
                        return err
×
NEW
66
                }
×
67

NEW
68
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
×
NEW
69
                if err != nil {
×
NEW
70
                        return err
×
NEW
71
                }
×
NEW
72
                if !rcm.IsReady() {
×
NEW
73
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
NEW
74
                        return fmt.Errorf("instance is not ready")
×
NEW
75
                }
×
76

NEW
77
                for _, node := range rcm.Nodes() {
×
NEW
78
                        err := node.Setup(ctx, []interface{}{"ACL", "DELUSER", inst.Spec.Username})
×
NEW
79
                        if err != nil {
×
NEW
80
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
NEW
81
                                return err
×
NEW
82
                        }
×
NEW
83
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
84
                }
NEW
85
        case core.ValkeyFailover, core.ValkeyReplica:
×
NEW
86
                logger.V(3).Info("sentinel", "instane", vkName)
×
NEW
87
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
NEW
88
                if errors.IsNotFound(err) {
×
NEW
89
                        return nil
×
NEW
90
                } else if err != nil {
×
NEW
91
                        return err
×
NEW
92
                }
×
93

NEW
94
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
NEW
95
                if err != nil {
×
NEW
96
                        return err
×
NEW
97
                }
×
NEW
98
                if !rfm.IsReady() {
×
NEW
99
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
NEW
100
                        return fmt.Errorf("instance is not ready")
×
NEW
101
                }
×
NEW
102
                for _, node := range rfm.Nodes() {
×
NEW
103
                        err := node.Setup(ctx, []interface{}{"ACL", "DELUSER", inst.Spec.Username})
×
NEW
104
                        if err != nil {
×
NEW
105
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
NEW
106
                                return err
×
NEW
107
                        }
×
NEW
108
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
109
                }
110
        }
NEW
111
        return nil
×
112
}
113

114
func (r *UserHandler) Do(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
1✔
115
        if inst.Annotations == nil {
2✔
116
                inst.Annotations = map[string]string{}
1✔
117
        }
1✔
118

119
        passwords := []string{}
1✔
120
        userPassword := &user.Password{}
1✔
121
        for _, secretName := range inst.Spec.PasswordSecrets {
1✔
NEW
122
                secret, err := r.k8sClient.GetSecret(ctx, inst.Namespace, secretName)
×
NEW
123
                if err != nil {
×
NEW
124
                        return err
×
NEW
125
                }
×
NEW
126
                passwords = append(passwords, string(secret.Data["password"]))
×
NEW
127
                userPassword = &user.Password{
×
NEW
128
                        SecretName: secretName,
×
NEW
129
                }
×
130
        }
131

132
        logger.V(3).Info("reconcile user", "user name", inst.Name, "type", inst.Spec.Arch)
1✔
133
        vkName := inst.Spec.InstanceName
1✔
134
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
1✔
135

1✔
136
        switch inst.Spec.Arch {
1✔
137
        case core.ValkeyCluster:
1✔
138
                logger.V(3).Info("cluster", "instance", vkName)
1✔
139
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
1✔
140
                if err != nil {
1✔
NEW
141
                        return err
×
NEW
142
                }
×
143

144
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
1✔
145
                if err != nil {
1✔
NEW
146
                        return err
×
NEW
147
                }
×
148
                if !rcm.IsReady() {
1✔
NEW
149
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
NEW
150
                        return fmt.Errorf("instance is not ready")
×
NEW
151
                }
×
152

153
                aclRules := inst.Spec.AclRules
1✔
154
                rule, err := user.NewRule(inst.Spec.AclRules)
1✔
155
                if err != nil {
1✔
NEW
156
                        logger.V(3).Info("rule parse failed", "rule", inst.Spec.AclRules)
×
NEW
157
                        return err
×
NEW
158
                }
×
159
                rule = types.PatchClusterClientRequiredRules(rule)
1✔
160
                aclRules = rule.Encode()
1✔
161

1✔
162
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, aclRules, userPassword)
1✔
163
                if err != nil {
1✔
NEW
164
                        return err
×
NEW
165
                }
×
166
                info, err := json.Marshal(userObj)
1✔
167
                if err != nil {
1✔
NEW
168
                        return err
×
NEW
169
                }
×
170

171
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
1✔
172
                if err != nil {
2✔
173
                        return err
1✔
174
                }
1✔
NEW
175
                configmap.Data[inst.Spec.Username] = string(info)
×
NEW
176

×
NEW
177
                if inst.Spec.AccountType != v1alpha1.System {
×
NEW
178
                        for _, node := range rcm.Nodes() {
×
NEW
179
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, aclRules)
×
NEW
180
                                if err != nil {
×
NEW
181
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
NEW
182
                                        return err
×
NEW
183
                                }
×
NEW
184
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
185
                        }
NEW
186
                } else {
×
NEW
187
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
NEW
188
                }
×
189

NEW
190
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
NEW
191
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
NEW
192
                        return err
×
NEW
193
                }
×
NEW
194
        case core.ValkeyFailover, core.ValkeyReplica:
×
NEW
195
                logger.V(3).Info("sentinel", "instance", vkName)
×
NEW
196
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
NEW
197
                if err != nil {
×
NEW
198
                        return err
×
NEW
199
                }
×
NEW
200
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
NEW
201
                if err != nil {
×
NEW
202
                        return err
×
NEW
203
                }
×
NEW
204
                if !rfm.IsReady() {
×
NEW
205
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
NEW
206
                        return fmt.Errorf("instance is not ready")
×
NEW
207
                }
×
NEW
208
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
×
NEW
209
                if err != nil {
×
NEW
210
                        return err
×
NEW
211
                }
×
NEW
212
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, inst.Spec.AclRules, userPassword)
×
NEW
213
                if err != nil {
×
NEW
214
                        return err
×
NEW
215
                }
×
NEW
216
                info, err := json.Marshal(userObj)
×
NEW
217
                if err != nil {
×
NEW
218
                        return err
×
NEW
219
                }
×
NEW
220
                configmap.Data[inst.Spec.Username] = string(info)
×
NEW
221

×
NEW
222
                if inst.Spec.AccountType != v1alpha1.System {
×
NEW
223
                        for _, node := range rfm.Nodes() {
×
NEW
224
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, inst.Spec.AclRules)
×
NEW
225
                                if err != nil {
×
NEW
226
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
NEW
227
                                        return err
×
NEW
228
                                }
×
NEW
229
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
230
                        }
NEW
231
                } else {
×
NEW
232
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
NEW
233
                }
×
234

NEW
235
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
NEW
236
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
NEW
237
                        return err
×
NEW
238
                }
×
239
        }
NEW
240
        return nil
×
241
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc