• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 15257335499

26 May 2025 03:18PM UTC coverage: 72.716% (+0.7%) from 72.002%
15257335499

push

github

HDT3213
chores for cluster

6 of 16 new or added lines in 7 files covered. (37.5%)

4 existing lines in 3 files now uncovered.

8563 of 11776 relevant lines covered (72.72%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.27
/cluster/core/node_manager.go
1
package core
2

3
import (
4
        "errors"
5
        "fmt"
6
        "math"
7
        "strconv"
8
        "sync"
9
        "time"
10

11
        "github.com/hdt3213/godis/cluster/raft"
12
        "github.com/hdt3213/godis/interface/redis"
13
        "github.com/hdt3213/godis/lib/logger"
14
        "github.com/hdt3213/godis/lib/utils"
15
        "github.com/hdt3213/godis/redis/protocol"
16
)
17

18
const joinClusterCommand = "cluster.join"
19
const migrationChangeRouteCommand = "cluster.migration.changeroute"
20

21
func init() {
	// Register cluster-management commands at package load time so they
	// are routable as soon as the cluster node starts serving.
	RegisterCmd(joinClusterCommand, execJoin)
	RegisterCmd(migrationChangeRouteCommand, execMigrationChangeRoute)
}
25

26
// execJoin handles cluster-join command as raft leader
27
// format: cluster-join redisAddress(advertised), raftAddress, masterId
28
func execJoin(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
1✔
29
        if len(cmdLine) < 3 {
1✔
30
                return protocol.MakeArgNumErrReply(joinClusterCommand)
×
31
        }
×
32
        state := cluster.raftNode.State()
1✔
33
        if state != raft.Leader {
1✔
34
                // I am not leader, forward request to leader
×
35
                leaderConn, err := cluster.BorrowLeaderClient()
×
36
                if err != nil {
×
37
                        return protocol.MakeErrReply(err.Error())
×
38
                }
×
39
                defer cluster.connections.ReturnPeerClient(leaderConn)
×
40
                return leaderConn.Send(cmdLine)
×
41
        }
42
        // self node is leader
43
        redisAddr := string(cmdLine[1])
1✔
44
        raftAddr := string(cmdLine[2])
1✔
45
        err := cluster.raftNode.AddToRaft(redisAddr, raftAddr)
1✔
46
        if err != nil {
1✔
47
                return protocol.MakeErrReply(err.Error())
×
48
        }
×
49
        master := ""
1✔
50
        if len(cmdLine) == 4 {
2✔
51
                master = string(cmdLine[3])
1✔
52
        }
1✔
53
        _, err = cluster.raftNode.Propose(&raft.LogEntry{
1✔
54
                Event: raft.EventJoin,
1✔
55
                JoinTask: &raft.JoinTask{
1✔
56
                        NodeId: redisAddr,
1✔
57
                        Master: master,
1✔
58
                },
1✔
59
        })
1✔
60
        if err != nil {
1✔
61
                // todo: remove the node from raft
×
62
                return protocol.MakeErrReply(err.Error())
×
63
        }
×
64

65
        // join sucees, rebalance node
66
        return protocol.MakeOkReply()
1✔
67
}
68

69
// rebalanceManager serializes rebalance operations: the mutex ensures
// at most one rebalance plan is being generated/executed at a time.
type rebalanceManager struct {
	mu *sync.Mutex
}
72

73
func newRebalanceManager() *rebalanceManager {
1✔
74
        return &rebalanceManager{
1✔
75
                mu: &sync.Mutex{},
1✔
76
        }
1✔
77
}
1✔
78

79
func (cluster *Cluster) doRebalance() {
1✔
80
        cluster.rebalanceManger.mu.Lock()
1✔
81
        defer cluster.rebalanceManger.mu.Unlock()
1✔
82
        pendingTasks, err := cluster.makeRebalancePlan()
1✔
83
        if err != nil {
1✔
84
                logger.Errorf("makeRebalancePlan err: %v", err)
×
85
                return
×
86
        }
×
87
        if len(pendingTasks) == 0 {
1✔
UNCOV
88
                return
×
89
        }
×
90
        logger.Infof("rebalance plan generated, contains %d tasks", len(pendingTasks))
1✔
91

1✔
92
        for _, task := range pendingTasks {
2✔
93
                err := cluster.triggerMigrationTask(task)
1✔
94
                if err != nil {
1✔
95
                        logger.Errorf("triggerMigrationTask err: %v", err)
×
96
                } else {
1✔
97
                        logger.Infof("triggerMigrationTask %s success", task.ID)
1✔
98
                }
1✔
99
        }
100

101
}
102

103
// triggerRebalanceTask start a rebalance task
104
// only leader can do
105
func (cluster *Cluster) triggerMigrationTask(task *raft.MigratingTask) error {
1✔
106
        // propose migration
1✔
107
        _, err := cluster.raftNode.Propose(&raft.LogEntry{
1✔
108
                Event:         raft.EventStartMigrate,
1✔
109
                MigratingTask: task,
1✔
110
        })
1✔
111
        if err != nil {
1✔
112
                return fmt.Errorf("propose EventStartMigrate  %s failed: %v", task.ID, err)
×
113
        }
×
114
        logger.Infof("propose EventStartMigrate %s success", task.ID)
1✔
115

1✔
116
        cmdLine := utils.ToCmdLine(startMigrationCommand, task.ID, task.SrcNode)
1✔
117
        for _, slotId := range task.Slots {
2✔
118
                slotIdStr := strconv.Itoa(int(slotId))
1✔
119
                cmdLine = append(cmdLine, []byte(slotIdStr))
1✔
120
        }
1✔
121
        targetNodeConn, err := cluster.connections.BorrowPeerClient(task.TargetNode)
1✔
122
        if err != nil {
1✔
123
                return err
×
124
        }
×
125
        defer cluster.connections.ReturnPeerClient(targetNodeConn)
1✔
126
        reply := targetNodeConn.Send(cmdLine)
1✔
127
        if protocol.IsOKReply(reply) {
2✔
128
                return nil
1✔
129
        }
1✔
130
        return protocol.MakeErrReply("")
×
131
}
132

133
func (cluster *Cluster) makeRebalancePlan() ([]*raft.MigratingTask, error) {
1✔
134

1✔
135
        var migratings []*raft.MigratingTask
1✔
136
        cluster.raftNode.FSM.WithReadLock(func(fsm *raft.FSM) {
2✔
137
                avgSlot := int(math.Ceil(float64(SlotCount) / float64(len(fsm.MasterSlaves))))
1✔
138
                var exportingNodes []string
1✔
139
                var importingNodes []string
1✔
140
                for _, ms := range fsm.MasterSlaves {
2✔
141
                        nodeId := ms.MasterId
1✔
142
                        nodeSlots := fsm.Node2Slot[nodeId]
1✔
143
                        if len(nodeSlots) > avgSlot+1 {
2✔
144
                                exportingNodes = append(exportingNodes, nodeId)
1✔
145
                        }
1✔
146
                        if len(nodeSlots) < avgSlot-1 {
2✔
147
                                importingNodes = append(importingNodes, nodeId)
1✔
148
                        }
1✔
149
                }
150

151
                importIndex := 0
1✔
152
                exportIndex := 0
1✔
153
                var exportSlots []uint32
1✔
154
                for importIndex < len(importingNodes) && exportIndex < len(exportingNodes) {
2✔
155
                        exportNode := exportingNodes[exportIndex]
1✔
156
                        if len(exportSlots) == 0 {
2✔
157
                                exportNodeSlots := fsm.Node2Slot[exportNode]
1✔
158
                                exportCount := len(exportNodeSlots) - avgSlot
1✔
159
                                exportSlots = exportNodeSlots[0:exportCount]
1✔
160
                        }
1✔
161
                        importNode := importingNodes[importIndex]
1✔
162
                        importNodeCurrentIndex := fsm.Node2Slot[importNode]
1✔
163
                        requirements := avgSlot - len(importNodeCurrentIndex)
1✔
164
                        task := &raft.MigratingTask{
1✔
165
                                ID:         utils.RandString(20),
1✔
166
                                SrcNode:    exportNode,
1✔
167
                                TargetNode: importNode,
1✔
168
                        }
1✔
169
                        if requirements <= len(exportSlots) {
2✔
170
                                // exportSlots 可以提供足够的 slots, importingNode 处理完毕
1✔
171
                                task.Slots = exportSlots[0:requirements]
1✔
172
                                exportSlots = exportSlots[requirements:]
1✔
173
                                importIndex++
1✔
174
                        } else {
1✔
175
                                // exportSlots 无法提供足够的 slots, exportingNode 处理完毕
×
176
                                task.Slots = exportSlots
×
177
                                exportSlots = nil
×
178
                                exportIndex++
×
179
                        }
×
180
                        migratings = append(migratings, task)
1✔
181
                }
182
        })
183
        return migratings, nil
1✔
184
}
185

186
func (cluster *Cluster) waitCommitted(peer string, logIndex uint64) error {
1✔
187
        srcNodeConn, err := cluster.connections.BorrowPeerClient(peer)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        defer cluster.connections.ReturnPeerClient(srcNodeConn)
1✔
192
        var peerIndex uint64
1✔
193
        for i := 0; i < 50; i++ {
2✔
194
                reply := srcNodeConn.Send(utils.ToCmdLine(getCommittedIndexCommand))
1✔
195
                switch reply := reply.(type) {
1✔
196
                case *protocol.IntReply:
1✔
197
                        peerIndex = uint64(reply.Code)
1✔
198
                        if peerIndex >= logIndex {
2✔
199
                                return nil
1✔
200
                        }
1✔
201
                case *protocol.StandardErrReply:
×
202
                        logger.Infof("get committed index failed: %v", reply.Error())
×
203
                default:
×
204
                        logger.Infof("get committed index unknown responseL %+v", reply.ToBytes())
×
205
                }
206
                time.Sleep(time.Millisecond * 100)
1✔
207
        }
208
        return errors.New("wait committed timeout")
×
209
}
210

211
// execMigrationChangeRoute should be executed at the leader.
// It proposes EventFinishMigrate through raft, to change the route to the new node.
// It returns after the proposal has been accepted by the majority and the two related nodes.
// format: cluster.migration.changeroute taskid
func execMigrationChangeRoute(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply(migrationChangeRouteCommand)
	}
	state := cluster.raftNode.State()
	if state != raft.Leader {
		// I am not leader, forward request to leader
		leaderConn, err := cluster.BorrowLeaderClient()
		if err != nil {
			return protocol.MakeErrReply(err.Error())
		}
		defer cluster.connections.ReturnPeerClient(leaderConn)
		return leaderConn.Send(cmdLine)
	}
	taskId := string(cmdLine[1])
	logger.Infof("change route for migration %s", taskId)
	// the task must have been registered by a prior EventStartMigrate
	task := cluster.raftNode.FSM.GetMigratingTask(taskId)
	if task == nil {
		return protocol.MakeErrReply("ERR task not found")
	}
	logger.Infof("change route for migration %s, got task info", taskId)
	// propose the route change through raft
	logIndex, err := cluster.raftNode.Propose(&raft.LogEntry{
		Event:         raft.EventFinishMigrate,
		MigratingTask: task,
	})
	if err != nil {
		return protocol.MakeErrReply("ERR " + err.Error())
	}
	logger.Infof("change route for migration %s, raft proposed", taskId)

	// confirm the 2 related nodes committed this log before acknowledging,
	// so neither src nor target serves stale routes after we return
	err = cluster.waitCommitted(task.SrcNode, logIndex)
	if err != nil {
		return protocol.MakeErrReply("ERR " + err.Error())
	}
	logger.Infof("change route for migration %s, confirm source node finished", taskId)

	err = cluster.waitCommitted(task.TargetNode, logIndex)
	if err != nil {
		return protocol.MakeErrReply("ERR " + err.Error())
	}
	logger.Infof("change route for migration %s, confirm target node finished", taskId)

	return protocol.MakeOkReply()
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc