• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 14401089424

11 Apr 2025 10:26AM UTC coverage: 13.642% (+3.4%) from 10.284%
14401089424

Pull #18

github

chideat
fix: updated package versions
Pull Request #18: chore: added more unit tests

67 of 245 new or added lines in 21 files covered. (27.35%)

6 existing lines in 1 file now uncovered.

2730 of 20012 relevant lines covered (13.64%)

0.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/controller/user/handler.go
1
/*
2
Copyright 2024 chideat.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package user
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "fmt"
23

24
        "github.com/chideat/valkey-operator/pkg/types"
25
        "github.com/chideat/valkey-operator/pkg/types/user"
26
        "github.com/go-logr/logr"
27
        "k8s.io/apimachinery/pkg/api/errors"
28
        "k8s.io/client-go/tools/record"
29

30
        "github.com/chideat/valkey-operator/api/core"
31
        "github.com/chideat/valkey-operator/api/v1alpha1"
32
        "github.com/chideat/valkey-operator/internal/builder/aclbuilder"
33
        "github.com/chideat/valkey-operator/internal/valkey/cluster"
34
        "github.com/chideat/valkey-operator/internal/valkey/failover"
35
        "github.com/chideat/valkey-operator/pkg/kubernetes"
36
)
37

38
type UserHandler struct {
39
        k8sClient     kubernetes.ClientSet
40
        eventRecorder record.EventRecorder
41
        logger        logr.Logger
42
}
43

44
func NewUserHandler(k8sservice kubernetes.ClientSet, eventRecorder record.EventRecorder, logger logr.Logger) *UserHandler {
×
45
        return &UserHandler{
×
46
                k8sClient:     k8sservice,
×
47
                eventRecorder: eventRecorder,
×
48
                logger:        logger.WithName("UserHandler"),
×
49
        }
×
50
}
×
51

52
func (r *UserHandler) Delete(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
×
53
        logger.V(3).Info("delete user", "user instance name", inst.Name,
×
54
                "instance", inst.Spec.InstanceName, "type", inst.Spec.Arch)
×
55
        if inst.Spec.Username == user.DefaultUserName || inst.Spec.Username == user.DefaultOperatorUserName {
×
56
                return nil
×
57
        }
×
58

59
        vkName := inst.Spec.InstanceName
×
60
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
×
61
        if configMap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName); err != nil {
×
62
                if !errors.IsNotFound(err) {
×
63
                        logger.Error(err, "delete user from configmap failed")
×
64
                        return err
×
65
                }
×
66
        } else if _, ok := configMap.Data[inst.Spec.Username]; ok {
×
67
                delete(configMap.Data, inst.Spec.Username)
×
68
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configMap); err != nil {
×
69
                        logger.Error(err, "delete user from configmap failed", "configmap", cmName)
×
70
                        return err
×
71
                }
×
72
        }
73

74
        switch inst.Spec.Arch {
×
75
        case core.ValkeyCluster:
×
76
                logger.V(3).Info("cluster", "instance", vkName)
×
77
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
×
78
                if errors.IsNotFound(err) {
×
79
                        return nil
×
80
                } else if err != nil {
×
81
                        return err
×
82
                }
×
83

84
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
×
85
                if err != nil {
×
86
                        return err
×
87
                }
×
88
                if !rcm.IsReady() {
×
89
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
90
                        return fmt.Errorf("instance is not ready")
×
91
                }
×
92

93
                for _, node := range rcm.Nodes() {
×
94
                        err := node.Setup(ctx, []any{"ACL", "DELUSER", inst.Spec.Username})
×
95
                        if err != nil {
×
96
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
97
                                return err
×
98
                        }
×
99
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
100
                }
101
        case core.ValkeyFailover, core.ValkeyReplica:
×
102
                logger.V(3).Info("sentinel", "instane", vkName)
×
103
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
104
                if errors.IsNotFound(err) {
×
105
                        return nil
×
106
                } else if err != nil {
×
107
                        return err
×
108
                }
×
109

110
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
111
                if err != nil {
×
112
                        return err
×
113
                }
×
114
                if !rfm.IsReady() {
×
115
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
116
                        return fmt.Errorf("instance is not ready")
×
117
                }
×
118
                for _, node := range rfm.Nodes() {
×
119
                        err := node.Setup(ctx, []any{"ACL", "DELUSER", inst.Spec.Username})
×
120
                        if err != nil {
×
121
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
122
                                return err
×
123
                        }
×
124
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
125
                }
126
        }
127
        return nil
×
128
}
129

130
func (r *UserHandler) Do(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
×
131
        if inst.Annotations == nil {
×
132
                inst.Annotations = map[string]string{}
×
133
        }
×
134

135
        passwords := []string{}
×
136
        userPassword := &user.Password{}
×
137
        for _, secretName := range inst.Spec.PasswordSecrets {
×
138
                secret, err := r.k8sClient.GetSecret(ctx, inst.Namespace, secretName)
×
139
                if err != nil {
×
140
                        return err
×
141
                }
×
142
                passwords = append(passwords, string(secret.Data["password"]))
×
143
                userPassword = &user.Password{
×
144
                        SecretName: secretName,
×
145
                }
×
146
        }
147

148
        logger.V(3).Info("reconcile user", "user name", inst.Name, "type", inst.Spec.Arch)
×
149
        vkName := inst.Spec.InstanceName
×
150
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
×
151

×
152
        switch inst.Spec.Arch {
×
153
        case core.ValkeyCluster:
×
154
                logger.V(3).Info("cluster", "instance", vkName)
×
155
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
×
156
                if err != nil {
×
157
                        return err
×
158
                }
×
159

160
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
×
161
                if err != nil {
×
162
                        return err
×
163
                }
×
164
                if !rcm.IsReady() {
×
165
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
166
                        return fmt.Errorf("instance is not ready")
×
167
                }
×
168

169
                rule, err := user.NewRule(inst.Spec.AclRules)
×
170
                if err != nil {
×
171
                        logger.V(3).Info("rule parse failed", "rule", inst.Spec.AclRules)
×
172
                        return err
×
173
                }
×
174
                rule = types.PatchClusterClientRequiredRules(rule)
×
NEW
175
                aclRules := rule.Encode()
×
176

×
177
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, aclRules, userPassword)
×
178
                if err != nil {
×
179
                        return err
×
180
                }
×
181
                info, err := json.Marshal(userObj)
×
182
                if err != nil {
×
183
                        return err
×
184
                }
×
185

186
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
×
187
                if err != nil {
×
188
                        return err
×
189
                }
×
190
                configmap.Data[inst.Spec.Username] = string(info)
×
191

×
192
                if inst.Spec.AccountType != v1alpha1.SystemAccount {
×
193
                        for _, node := range rcm.Nodes() {
×
194
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, aclRules)
×
195
                                if err != nil {
×
196
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
197
                                        return err
×
198
                                }
×
199
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
200
                        }
201
                } else {
×
202
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
203
                }
×
204

205
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
206
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
207
                        return err
×
208
                }
×
209
        case core.ValkeyFailover, core.ValkeyReplica:
×
210
                logger.V(3).Info("sentinel", "instance", vkName)
×
211
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
212
                if err != nil {
×
213
                        return err
×
214
                }
×
215
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
216
                if err != nil {
×
217
                        return err
×
218
                }
×
219
                if !rfm.IsReady() {
×
220
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
221
                        return fmt.Errorf("instance is not ready")
×
222
                }
×
223
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
×
224
                if err != nil {
×
225
                        return err
×
226
                }
×
227
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, inst.Spec.AclRules, userPassword)
×
228
                if err != nil {
×
229
                        return err
×
230
                }
×
231
                info, err := json.Marshal(userObj)
×
232
                if err != nil {
×
233
                        return err
×
234
                }
×
235
                configmap.Data[inst.Spec.Username] = string(info)
×
236

×
237
                if inst.Spec.AccountType != v1alpha1.SystemAccount {
×
238
                        for _, node := range rfm.Nodes() {
×
239
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, inst.Spec.AclRules)
×
240
                                if err != nil {
×
241
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
242
                                        return err
×
243
                                }
×
244
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
245
                        }
246
                } else {
×
247
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
248
                }
×
249

250
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
251
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
252
                        return err
×
253
                }
×
254
        }
255
        return nil
×
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc