• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chideat / valkey-operator / 14173979426

31 Mar 2025 02:26PM UTC coverage: 10.247% (-3.3%) from 13.534%
14173979426

push

github

web-flow
feat: added webhooks

168 of 2201 new or added lines in 67 files covered. (7.63%)

1254 existing lines in 61 files now uncovered.

2074 of 20241 relevant lines covered (10.25%)

0.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/controller/user/handler.go
1
/*
2
Copyright 2024 chideat.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package user
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "fmt"
23

24
        "github.com/chideat/valkey-operator/pkg/types"
25
        "github.com/chideat/valkey-operator/pkg/types/user"
26
        "github.com/go-logr/logr"
27
        "k8s.io/apimachinery/pkg/api/errors"
28
        "k8s.io/client-go/tools/record"
29

30
        "github.com/chideat/valkey-operator/api/core"
31
        "github.com/chideat/valkey-operator/api/v1alpha1"
32
        "github.com/chideat/valkey-operator/internal/builder/aclbuilder"
33
        "github.com/chideat/valkey-operator/internal/valkey/cluster"
34
        "github.com/chideat/valkey-operator/internal/valkey/failover"
35
        "github.com/chideat/valkey-operator/pkg/kubernetes"
36
)
37

38
type UserHandler struct {
39
        k8sClient     kubernetes.ClientSet
40
        eventRecorder record.EventRecorder
41
        logger        logr.Logger
42
}
43

UNCOV
44
func NewUserHandler(k8sservice kubernetes.ClientSet, eventRecorder record.EventRecorder, logger logr.Logger) *UserHandler {
×
UNCOV
45
        return &UserHandler{
×
UNCOV
46
                k8sClient:     k8sservice,
×
UNCOV
47
                eventRecorder: eventRecorder,
×
UNCOV
48
                logger:        logger.WithName("UserHandler"),
×
UNCOV
49
        }
×
UNCOV
50
}
×
51

52
func (r *UserHandler) Delete(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
×
53
        logger.V(3).Info("delete user", "user instance name", inst.Name,
×
54
                "instance", inst.Spec.InstanceName, "type", inst.Spec.Arch)
×
55
        if inst.Spec.Username == user.DefaultUserName || inst.Spec.Username == user.DefaultOperatorUserName {
×
56
                return nil
×
57
        }
×
58

59
        vkName := inst.Spec.InstanceName
×
60
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
×
61
        if configMap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName); err != nil {
×
62
                if !errors.IsNotFound(err) {
×
63
                        logger.Error(err, "delete user from configmap failed")
×
64
                        return err
×
65
                }
×
66
        } else if _, ok := configMap.Data[inst.Spec.Username]; ok {
×
67
                delete(configMap.Data, inst.Spec.Username)
×
68
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configMap); err != nil {
×
69
                        logger.Error(err, "delete user from configmap failed", "configmap", cmName)
×
70
                        return err
×
71
                }
×
72
        }
73

74
        switch inst.Spec.Arch {
×
75
        case core.ValkeyCluster:
×
76
                logger.V(3).Info("cluster", "instance", vkName)
×
77
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
×
78
                if errors.IsNotFound(err) {
×
79
                        return nil
×
80
                } else if err != nil {
×
81
                        return err
×
82
                }
×
83

84
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
×
85
                if err != nil {
×
86
                        return err
×
87
                }
×
88
                if !rcm.IsReady() {
×
89
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
90
                        return fmt.Errorf("instance is not ready")
×
91
                }
×
92

93
                for _, node := range rcm.Nodes() {
×
NEW
94
                        err := node.Setup(ctx, []any{"ACL", "DELUSER", inst.Spec.Username})
×
95
                        if err != nil {
×
96
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
97
                                return err
×
98
                        }
×
99
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
100
                }
101
        case core.ValkeyFailover, core.ValkeyReplica:
×
102
                logger.V(3).Info("sentinel", "instane", vkName)
×
103
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
104
                if errors.IsNotFound(err) {
×
105
                        return nil
×
106
                } else if err != nil {
×
107
                        return err
×
108
                }
×
109

110
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
111
                if err != nil {
×
112
                        return err
×
113
                }
×
114
                if !rfm.IsReady() {
×
115
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
116
                        return fmt.Errorf("instance is not ready")
×
117
                }
×
118
                for _, node := range rfm.Nodes() {
×
NEW
119
                        err := node.Setup(ctx, []any{"ACL", "DELUSER", inst.Spec.Username})
×
120
                        if err != nil {
×
121
                                logger.Error(err, "acl del user failed", "node", node.GetName())
×
122
                                return err
×
123
                        }
×
124
                        logger.V(3).Info("acl del user success", "node", node.GetName())
×
125
                }
126
        }
127
        return nil
×
128
}
129

UNCOV
130
func (r *UserHandler) Do(ctx context.Context, inst v1alpha1.User, logger logr.Logger) error {
×
UNCOV
131
        if inst.Annotations == nil {
×
UNCOV
132
                inst.Annotations = map[string]string{}
×
UNCOV
133
        }
×
134

UNCOV
135
        passwords := []string{}
×
UNCOV
136
        userPassword := &user.Password{}
×
UNCOV
137
        for _, secretName := range inst.Spec.PasswordSecrets {
×
138
                secret, err := r.k8sClient.GetSecret(ctx, inst.Namespace, secretName)
×
139
                if err != nil {
×
140
                        return err
×
141
                }
×
142
                passwords = append(passwords, string(secret.Data["password"]))
×
143
                userPassword = &user.Password{
×
144
                        SecretName: secretName,
×
145
                }
×
146
        }
147

UNCOV
148
        logger.V(3).Info("reconcile user", "user name", inst.Name, "type", inst.Spec.Arch)
×
UNCOV
149
        vkName := inst.Spec.InstanceName
×
UNCOV
150
        cmName := aclbuilder.GenerateACLConfigMapName(inst.Spec.Arch, vkName)
×
UNCOV
151

×
UNCOV
152
        switch inst.Spec.Arch {
×
UNCOV
153
        case core.ValkeyCluster:
×
UNCOV
154
                logger.V(3).Info("cluster", "instance", vkName)
×
UNCOV
155
                rc, err := r.k8sClient.GetCluster(ctx, inst.Namespace, vkName)
×
UNCOV
156
                if err != nil {
×
157
                        return err
×
158
                }
×
159

UNCOV
160
                rcm, err := cluster.NewCluster(ctx, r.k8sClient, r.eventRecorder, rc, logger)
×
UNCOV
161
                if err != nil {
×
162
                        return err
×
163
                }
×
UNCOV
164
                if !rcm.IsReady() {
×
165
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
166
                        return fmt.Errorf("instance is not ready")
×
167
                }
×
168

UNCOV
169
                aclRules := inst.Spec.AclRules
×
UNCOV
170
                rule, err := user.NewRule(inst.Spec.AclRules)
×
UNCOV
171
                if err != nil {
×
172
                        logger.V(3).Info("rule parse failed", "rule", inst.Spec.AclRules)
×
173
                        return err
×
174
                }
×
UNCOV
175
                rule = types.PatchClusterClientRequiredRules(rule)
×
UNCOV
176
                aclRules = rule.Encode()
×
UNCOV
177

×
UNCOV
178
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, aclRules, userPassword)
×
UNCOV
179
                if err != nil {
×
180
                        return err
×
181
                }
×
UNCOV
182
                info, err := json.Marshal(userObj)
×
UNCOV
183
                if err != nil {
×
184
                        return err
×
185
                }
×
186

UNCOV
187
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
×
UNCOV
188
                if err != nil {
×
UNCOV
189
                        return err
×
UNCOV
190
                }
×
191
                configmap.Data[inst.Spec.Username] = string(info)
×
192

×
NEW
193
                if inst.Spec.AccountType != v1alpha1.SystemAccount {
×
194
                        for _, node := range rcm.Nodes() {
×
195
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, aclRules)
×
196
                                if err != nil {
×
197
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
198
                                        return err
×
199
                                }
×
200
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
201
                        }
202
                } else {
×
203
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
204
                }
×
205

206
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
207
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
208
                        return err
×
209
                }
×
210
        case core.ValkeyFailover, core.ValkeyReplica:
×
211
                logger.V(3).Info("sentinel", "instance", vkName)
×
212
                rf, err := r.k8sClient.GetFailover(ctx, inst.Namespace, vkName)
×
213
                if err != nil {
×
214
                        return err
×
215
                }
×
216
                rfm, err := failover.NewFailover(ctx, r.k8sClient, r.eventRecorder, rf, logger)
×
217
                if err != nil {
×
218
                        return err
×
219
                }
×
220
                if !rfm.IsReady() {
×
221
                        logger.V(3).Info("instance is not ready", "instance", vkName)
×
222
                        return fmt.Errorf("instance is not ready")
×
223
                }
×
224
                configmap, err := r.k8sClient.GetConfigMap(ctx, inst.Namespace, cmName)
×
225
                if err != nil {
×
226
                        return err
×
227
                }
×
228
                userObj, err := types.NewUserFromValkeyUser(inst.Spec.Username, inst.Spec.AclRules, userPassword)
×
229
                if err != nil {
×
230
                        return err
×
231
                }
×
232
                info, err := json.Marshal(userObj)
×
233
                if err != nil {
×
234
                        return err
×
235
                }
×
236
                configmap.Data[inst.Spec.Username] = string(info)
×
237

×
NEW
238
                if inst.Spec.AccountType != v1alpha1.SystemAccount {
×
239
                        for _, node := range rfm.Nodes() {
×
240
                                _, err := node.SetACLUser(ctx, inst.Spec.Username, passwords, inst.Spec.AclRules)
×
241
                                if err != nil {
×
242
                                        logger.Error(err, "acl set user failed", "node", node.GetName())
×
243
                                        return err
×
244
                                }
×
245
                                logger.V(3).Info("acl set user success", "node", node.GetName())
×
246
                        }
247
                } else {
×
248
                        logger.V(3).Info("skip system account online update", "username", inst.Spec.Username)
×
249
                }
×
250

251
                if err := r.k8sClient.UpdateConfigMap(ctx, inst.Namespace, configmap); err != nil {
×
252
                        logger.Error(err, "update configmap failed", "configmap", configmap.Name)
×
253
                        return err
×
254
                }
×
255
        }
256
        return nil
×
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc