• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 15238419529

25 May 2025 01:32PM UTC coverage: 72.019% (-3.7%) from 75.704%
15238419529

push

github

HDT3213
update github actions go version

1 of 1 new or added line in 1 file covered. (100.0%)

1149 existing lines in 29 files now uncovered.

8473 of 11765 relevant lines covered (72.02%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.52
/database/replication_slave.go
1
package database
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "io/ioutil"
9
        "net"
10
        "os"
11
        "strconv"
12
        "strings"
13
        "sync"
14
        "sync/atomic"
15
        "time"
16

17
        "github.com/hdt3213/godis/aof"
18
        "github.com/hdt3213/godis/config"
19
        "github.com/hdt3213/godis/interface/redis"
20
        "github.com/hdt3213/godis/lib/logger"
21
        rdb "github.com/hdt3213/rdb/parser"
22
        "github.com/hdt3213/godis/lib/utils"
23
        "github.com/hdt3213/godis/redis/connection"
24
        "github.com/hdt3213/godis/redis/parser"
25
        "github.com/hdt3213/godis/redis/protocol"
26
)
27

28
const (
29
        masterRole = iota
30
        slaveRole
31
)
32

33
type slaveStatus struct {
34
        mutex  sync.Mutex
35
        ctx    context.Context
36
        cancel context.CancelFunc
37

38
        // configVersion stands for the version of slaveStatus config. Any change of master host/port will cause configVersion increment
39
        // If configVersion change has been found during slaveStatus current slaveStatus procedure will stop.
40
        // It is designed to abort a running slaveStatus procedure
41
        configVersion int32
42

43
        masterHost string
44
        masterPort int
45

46
        masterConn   net.Conn
47
        masterChan   <-chan *parser.Payload
48
        replId       string
49
        replOffset   int64
50
        lastRecvTime time.Time
51
        running      sync.WaitGroup
52
}
53

54
var configChangedErr = errors.New("slaveStatus config changed")
55

56
func initReplSlaveStatus() *slaveStatus {
1✔
57
        return &slaveStatus{}
1✔
58
}
1✔
59

60
func (server *Server) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply {
1✔
61
        if strings.ToLower(string(args[0])) == "no" &&
1✔
62
                strings.ToLower(string(args[1])) == "one" {
2✔
63
                server.slaveOfNone()
1✔
64
                return protocol.MakeOkReply()
1✔
65
        }
1✔
66
        host := string(args[0])
1✔
67
        port, err := strconv.Atoi(string(args[1]))
1✔
68
        if err != nil {
1✔
69
                return protocol.MakeErrReply("ERR value is not an integer or out of range")
×
UNCOV
70
        }
×
71
        server.slaveStatus.mutex.Lock()
1✔
72
        atomic.StoreInt32(&server.role, slaveRole)
1✔
73
        server.slaveStatus.masterHost = host
1✔
74
        server.slaveStatus.masterPort = port
1✔
75
        atomic.AddInt32(&server.slaveStatus.configVersion, 1)
1✔
76
        server.slaveStatus.mutex.Unlock()
1✔
77
        go server.setupMaster()
1✔
78
        return protocol.MakeOkReply()
1✔
79
}
80

81
func (server *Server) slaveOfNone() {
1✔
82
        server.slaveStatus.mutex.Lock()
1✔
83
        defer server.slaveStatus.mutex.Unlock()
1✔
84
        server.slaveStatus.masterHost = ""
1✔
85
        server.slaveStatus.masterPort = 0
1✔
86
        server.slaveStatus.replId = ""
1✔
87
        server.slaveStatus.replOffset = -1
1✔
88
        server.slaveStatus.stopSlaveWithMutex()
1✔
89
        server.role = masterRole
1✔
90
}
1✔
91

92
// stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF
93
// invoker should have slaveStatus mutex
94
func (repl *slaveStatus) stopSlaveWithMutex() {
1✔
95
        // update configVersion to stop connectWithMaster and fullSync
1✔
96
        atomic.AddInt32(&repl.configVersion, 1)
1✔
97
        // send cancel to receiveAOF
1✔
98
        if repl.cancel != nil {
2✔
99
                repl.cancel()
1✔
100
                repl.running.Wait()
1✔
101
        }
1✔
102
        repl.ctx = context.Background()
1✔
103
        repl.cancel = nil
1✔
104
        if repl.masterConn != nil {
2✔
105
                _ = repl.masterConn.Close() // parser.ParseStream will close masterChan
1✔
106
        }
1✔
107
        repl.masterConn = nil
1✔
108
        repl.masterChan = nil
1✔
109
}
110

111
func (repl *slaveStatus) close() error {
1✔
112
        repl.mutex.Lock()
1✔
113
        defer repl.mutex.Unlock()
1✔
114
        repl.stopSlaveWithMutex()
1✔
115
        return nil
1✔
116
}
1✔
117

118
// setupMaster connects to master and starts full sync
119
func (server *Server) setupMaster() {
1✔
120
        defer func() {
2✔
121
                if err := recover(); err != nil {
1✔
UNCOV
122
                        logger.Error(err)
×
UNCOV
123
                }
×
124
        }()
125
        var configVersion int32
1✔
126
        ctx, cancel := context.WithCancel(context.Background())
1✔
127
        server.slaveStatus.mutex.Lock()
1✔
128
        server.slaveStatus.ctx = ctx
1✔
129
        server.slaveStatus.cancel = cancel
1✔
130
        configVersion = server.slaveStatus.configVersion
1✔
131
        server.slaveStatus.mutex.Unlock()
1✔
132
        isFullReSync, err := server.connectWithMaster(configVersion)
1✔
133
        if err != nil {
1✔
134
                // connect failed, abort master
×
135
                logger.Error(err)
×
136
                server.slaveOfNone()
×
UNCOV
137
                return
×
UNCOV
138
        }
×
139
        if isFullReSync {
2✔
140
                err = server.loadMasterRDB(configVersion)
1✔
141
                if err != nil {
1✔
142
                        // load failed, abort master
×
143
                        logger.Error(err)
×
144
                        server.slaveOfNone()
×
UNCOV
145
                        return
×
UNCOV
146
                }
×
147
        }
148
        err = server.receiveAOF(ctx, configVersion)
1✔
149
        if err != nil {
2✔
150
                // full sync failed, abort
1✔
151
                logger.Error(err)
1✔
152
                return
1✔
153
        }
1✔
154
}
155

156
// connectWithMaster finishes handshake with master
157
// returns: isFullReSync, error
158
func (server *Server) connectWithMaster(configVersion int32) (bool, error) {
1✔
159
        addr := server.slaveStatus.masterHost + ":" + strconv.Itoa(server.slaveStatus.masterPort)
1✔
160
        conn, err := net.Dial("tcp", addr)
1✔
161
        if err != nil {
1✔
162
                server.slaveOfNone() // abort
×
UNCOV
163
                return false, errors.New("connect master failed " + err.Error())
×
UNCOV
164
        }
×
165
        masterChan := parser.ParseStream(conn)
1✔
166

1✔
167
        // ping
1✔
168
        pingCmdLine := utils.ToCmdLine("ping")
1✔
169
        pingReq := protocol.MakeMultiBulkReply(pingCmdLine)
1✔
170
        _, err = conn.Write(pingReq.ToBytes())
1✔
171
        if err != nil {
1✔
UNCOV
172
                return false, errors.New("send failed " + err.Error())
×
UNCOV
173
        }
×
174
        pingResp := <-masterChan
1✔
175
        if pingResp.Err != nil {
1✔
UNCOV
176
                return false, errors.New("read response failed: " + pingResp.Err.Error())
×
177
        }
×
178
        switch reply := pingResp.Data.(type) {
1✔
179
        case *protocol.StandardErrReply:
×
180
                if !strings.HasPrefix(reply.Error(), "NOAUTH") &&
×
181
                        !strings.HasPrefix(reply.Error(), "NOPERM") &&
×
182
                        !strings.HasPrefix(reply.Error(), "ERR operation not permitted") {
×
183
                        logger.Error("Error reply to PING from master: " + string(reply.ToBytes()))
×
184
                        server.slaveOfNone() // abort
×
UNCOV
185
                        return false, nil
×
UNCOV
186
                }
×
187
        }
188

189
        // just to reduce duplication of code
190
        sendCmdToMaster := func(conn net.Conn, cmdLine CmdLine, masterChan <-chan *parser.Payload) error {
2✔
191
                req := protocol.MakeMultiBulkReply(cmdLine)
1✔
192
                _, err := conn.Write(req.ToBytes())
1✔
193
                if err != nil {
1✔
194
                        server.slaveOfNone() // abort
×
UNCOV
195
                        return errors.New("send failed " + err.Error())
×
UNCOV
196
                }
×
197
                resp := <-masterChan
1✔
198
                if resp.Err != nil {
1✔
199
                        server.slaveOfNone() // abort
×
UNCOV
200
                        return errors.New("read response failed: " + resp.Err.Error())
×
201
                }
×
202
                if !protocol.IsOKReply(resp.Data) {
1✔
203
                        server.slaveOfNone() // abort
×
UNCOV
204
                        return errors.New("unexpected auth response: " + string(resp.Data.ToBytes()))
×
UNCOV
205
                }
×
206
                return nil
1✔
207
        }
208

209
        // auth
210
        if config.Properties.MasterAuth != "" {
1✔
211
                authCmdLine := utils.ToCmdLine("auth", config.Properties.MasterAuth)
×
212
                err = sendCmdToMaster(conn, authCmdLine, masterChan)
×
213
                if err != nil {
×
UNCOV
214
                        return false, err
×
UNCOV
215
                }
×
216
        }
217

218
        // announce port
219
        var port int
1✔
220
        if config.Properties.SlaveAnnouncePort != 0 {
1✔
UNCOV
221
                port = config.Properties.SlaveAnnouncePort
×
222
        } else {
1✔
223
                port = config.Properties.Port
1✔
224
        }
1✔
225
        portCmdLine := utils.ToCmdLine("REPLCONF", "listening-port", strconv.Itoa(port))
1✔
226
        err = sendCmdToMaster(conn, portCmdLine, masterChan)
1✔
227
        if err != nil {
1✔
UNCOV
228
                return false, err
×
UNCOV
229
        }
×
230

231
        // announce ip
232
        if config.Properties.SlaveAnnounceIP != "" {
1✔
233
                ipCmdLine := utils.ToCmdLine("REPLCONF", "ip-address", config.Properties.SlaveAnnounceIP)
×
234
                err = sendCmdToMaster(conn, ipCmdLine, masterChan)
×
235
                if err != nil {
×
UNCOV
236
                        return false, err
×
UNCOV
237
                }
×
238
        }
239

240
        // announce capacity
241
        capaCmdLine := utils.ToCmdLine("REPLCONF", "capa", "psync2")
1✔
242
        err = sendCmdToMaster(conn, capaCmdLine, masterChan)
1✔
243
        if err != nil {
1✔
UNCOV
244
                return false, err
×
UNCOV
245
        }
×
246

247
        // update connection
248
        server.slaveStatus.mutex.Lock()
1✔
249
        defer server.slaveStatus.mutex.Unlock()
1✔
250
        if server.slaveStatus.configVersion != configVersion {
1✔
251
                // slaveStatus conf changed during connecting and waiting mutex
×
UNCOV
252
                return false, configChangedErr
×
UNCOV
253
        }
×
254
        server.slaveStatus.masterConn = conn
1✔
255
        server.slaveStatus.masterChan = masterChan
1✔
256
        server.slaveStatus.lastRecvTime = time.Now()
1✔
257
        return server.psyncHandshake()
1✔
258
}
259

260
// psyncHandshake send `psync` to master and sync repl-id/offset with master
261
// invoker should provide with slaveStatus.mutex
262
func (server *Server) psyncHandshake() (bool, error) {
1✔
263
        replId := "?"
1✔
264
        var replOffset int64 = -1
1✔
265
        if server.slaveStatus.replId != "" {
2✔
266
                replId = server.slaveStatus.replId
1✔
267
                replOffset = server.slaveStatus.replOffset
1✔
268
        }
1✔
269
        psyncCmdLine := utils.ToCmdLine("psync", replId, strconv.FormatInt(replOffset, 10))
1✔
270
        psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
1✔
271
        _, err := server.slaveStatus.masterConn.Write(psyncReq.ToBytes())
1✔
272
        if err != nil {
1✔
UNCOV
273
                return false, errors.New("send failed " + err.Error())
×
UNCOV
274
        }
×
275
        return server.parsePsyncHandshake()
1✔
276
}
277

278
func (server *Server) parsePsyncHandshake() (bool, error) {
1✔
279
        var err error
1✔
280
        psyncPayload := <-server.slaveStatus.masterChan
1✔
281
        if psyncPayload.Err != nil {
1✔
UNCOV
282
                return false, errors.New("read response failed: " + psyncPayload.Err.Error())
×
283
        }
×
284
        psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply)
1✔
285
        if !ok {
1✔
UNCOV
286
                return false, errors.New("illegal payload header not a status reply: " + string(psyncPayload.Data.ToBytes()))
×
UNCOV
287
        }
×
288
        headers := strings.Split(psyncHeader.Status, " ")
1✔
289
        if len(headers) != 3 && len(headers) != 2 {
1✔
UNCOV
290
                return false, errors.New("illegal payload header: " + psyncHeader.Status)
×
UNCOV
291
        }
×
292

293
        logger.Info("receive psync header from master")
1✔
294
        var isFullReSync bool
1✔
295
        if headers[0] == "FULLRESYNC" {
2✔
296
                logger.Info("full re-sync with master")
1✔
297
                server.slaveStatus.replId = headers[1]
1✔
298
                server.slaveStatus.replOffset, err = strconv.ParseInt(headers[2], 10, 64)
1✔
299
                isFullReSync = true
1✔
300
        } else if headers[0] == "CONTINUE" {
3✔
301
                logger.Info("continue partial sync")
1✔
302
                server.slaveStatus.replId = headers[1]
1✔
303
                isFullReSync = false
1✔
304
        } else {
1✔
UNCOV
305
                return false, errors.New("illegal psync resp: " + psyncHeader.Status)
×
UNCOV
306
        }
×
307

308
        if err != nil {
1✔
UNCOV
309
                return false, errors.New("get illegal repl offset: " + headers[2])
×
UNCOV
310
        }
×
311
        logger.Info(fmt.Sprintf("repl id: %s, current offset: %d", server.slaveStatus.replId, server.slaveStatus.replOffset))
1✔
312
        return isFullReSync, nil
1✔
313
}
314

315
func makeRdbLoader(upgradeAof bool) (*Server, string, error) {
1✔
316
        rdbLoader := MakeAuxiliaryServer()
1✔
317
        if !upgradeAof {
1✔
UNCOV
318
                return rdbLoader, "", nil
×
UNCOV
319
        }
×
320
        // make aof handler to generate new aof file during loading rdb
321
        newAofFile, err := ioutil.TempFile("", "*.aof")
1✔
322
        if err != nil {
1✔
UNCOV
323
                return nil, "", fmt.Errorf("create temp rdb failed: %v", err)
×
UNCOV
324
        }
×
325
        newAofFilename := newAofFile.Name()
1✔
326
        aofHandler, err := NewPersister(rdbLoader, newAofFilename, false, aof.FsyncNo)
1✔
327
        if err != nil {
1✔
UNCOV
328
                return nil, "", err
×
UNCOV
329
        }
×
330
        rdbLoader.bindPersister(aofHandler)
1✔
331
        return rdbLoader, newAofFilename, nil
1✔
332
}
333

334
// loadMasterRDB downloads rdb after handshake has been done
335
func (server *Server) loadMasterRDB(configVersion int32) error {
1✔
336
        rdbPayload := <-server.slaveStatus.masterChan
1✔
337
        if rdbPayload.Err != nil {
1✔
UNCOV
338
                return errors.New("read response failed: " + rdbPayload.Err.Error())
×
UNCOV
339
        }
×
340
        rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply)
1✔
341
        if !ok {
1✔
UNCOV
342
                return errors.New("illegal payload header: " + string(rdbPayload.Data.ToBytes()))
×
343
        }
×
344

345
        logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg)))
1✔
346
        rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg))
1✔
347

1✔
348
        rdbLoader, newAofFilename, err := makeRdbLoader(config.Properties.AppendOnly)
1✔
349
        if err != nil {
1✔
UNCOV
350
                return err
×
UNCOV
351
        }
×
352
        err = rdbLoader.LoadRDB(rdbDec)
1✔
353
        if err != nil {
1✔
354
                return errors.New("dump rdb failed: " + err.Error())
×
355
        }
×
356

357
        server.slaveStatus.mutex.Lock()
1✔
358
        defer server.slaveStatus.mutex.Unlock()
1✔
359
        if server.slaveStatus.configVersion != configVersion {
1✔
UNCOV
360
                // slaveStatus conf changed during connecting and waiting mutex
×
UNCOV
361
                return configChangedErr
×
UNCOV
362
        }
×
363
        for i, h := range rdbLoader.dbSet {
2✔
364
                newDB := h.Load().(*DB)
1✔
365
                server.loadDB(i, newDB)
1✔
366
        }
1✔
367

368
        if config.Properties.AppendOnly {
2✔
369
                // use new aof file
1✔
370
                server.persister.Close()
1✔
371
                err = os.Rename(newAofFilename, config.Properties.AppendFilename)
1✔
372
                if err != nil {
1✔
UNCOV
373
                        return err
×
UNCOV
374
                }
×
375
                persister, err := NewPersister(server, config.Properties.AppendFilename, false, config.Properties.AppendFsync)
1✔
376
                if err != nil {
1✔
UNCOV
377
                        return err
×
UNCOV
378
                }
×
379
                server.bindPersister(persister)
1✔
380
        }
381

382
        return nil
1✔
383
}
384

385
func (server *Server) receiveAOF(ctx context.Context, configVersion int32) error {
1✔
386
        conn := connection.NewConn(server.slaveStatus.masterConn)
1✔
387
        conn.SetMaster()
1✔
388
        server.slaveStatus.running.Add(1)
1✔
389
        defer server.slaveStatus.running.Done()
1✔
390
        for {
2✔
391
                select {
1✔
392
                case payload, open := <-server.slaveStatus.masterChan:
1✔
393
                        if !open {
1✔
394
                                return errors.New("master channel unexpected close")
×
395
                        }
×
396
                        if payload.Err != nil {
2✔
397
                                return payload.Err
1✔
398
                        }
1✔
399
                        cmdLine, ok := payload.Data.(*protocol.MultiBulkReply)
1✔
400
                        if !ok {
1✔
UNCOV
401
                                return errors.New("unexpected payload: " + string(payload.Data.ToBytes()))
×
UNCOV
402
                        }
×
403
                        server.slaveStatus.mutex.Lock()
1✔
404
                        if server.slaveStatus.configVersion != configVersion {
1✔
UNCOV
405
                                // slaveStatus conf changed during connecting and waiting mutex
×
UNCOV
406
                                return configChangedErr
×
UNCOV
407
                        }
×
408
                        server.Exec(conn, cmdLine.Args)
1✔
409
                        n := len(cmdLine.ToBytes()) // todo: directly get size from socket
1✔
410
                        server.slaveStatus.replOffset += int64(n)
1✔
411
                        server.slaveStatus.lastRecvTime = time.Now()
1✔
412
                        logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d, %s",
1✔
413
                                n, server.slaveStatus.replOffset, strconv.Quote(string(cmdLine.ToBytes()))))
1✔
414
                        server.slaveStatus.mutex.Unlock()
1✔
415
                case <-ctx.Done():
1✔
416
                        _ = conn.Close()
1✔
417
                        return nil
1✔
418
                }
419
        }
420
}
421

422
func (server *Server) slaveCron() {
1✔
423
        repl := server.slaveStatus
1✔
424
        if repl.masterConn == nil {
2✔
425
                return
1✔
426
        }
1✔
427

428
        // check master timeout
429
        replTimeout := 60 * time.Second
1✔
430
        if config.Properties.ReplTimeout != 0 {
2✔
431
                replTimeout = time.Duration(config.Properties.ReplTimeout) * time.Second
1✔
432
        }
1✔
433
        minLastRecvTime := time.Now().Add(-replTimeout)
1✔
434
        if repl.lastRecvTime.Before(minLastRecvTime) {
2✔
435
                // reconnect with master
1✔
436
                err := server.reconnectWithMaster()
1✔
437
                if err != nil {
1✔
438
                        logger.Error("send failed " + err.Error())
×
439
                }
×
440
                return
1✔
441
        }
442
        // send ack to master
UNCOV
443
        err := repl.sendAck2Master()
×
UNCOV
444
        if err != nil {
×
UNCOV
445
                logger.Error("send failed " + err.Error())
×
UNCOV
446
        }
×
447
}
448

449
// Send a REPLCONF ACK command to the master to inform it about the current processed offset
450
func (repl *slaveStatus) sendAck2Master() error {
1✔
451
        psyncCmdLine := utils.ToCmdLine("REPLCONF", "ACK",
1✔
452
                strconv.FormatInt(repl.replOffset, 10))
1✔
453
        psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
1✔
454
        _, err := repl.masterConn.Write(psyncReq.ToBytes())
1✔
455
        // logger.Info("send ack to master")
1✔
456
        return err
1✔
457
}
1✔
458

459
func (server *Server) reconnectWithMaster() error {
1✔
460
        logger.Info("reconnecting with master")
1✔
461
        server.slaveStatus.mutex.Lock()
1✔
462
        defer server.slaveStatus.mutex.Unlock()
1✔
463
        server.slaveStatus.stopSlaveWithMutex()
1✔
464
        go server.setupMaster()
1✔
465
        return nil
1✔
466
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc