• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

HDT3213 / godis / 15238419529

25 May 2025 01:32PM UTC coverage: 72.019% (-3.7%) from 75.704%
15238419529

push

github

HDT3213
update github actions go version

1 of 1 new or added line in 1 file covered. (100.0%)

1149 existing lines in 29 files now uncovered.

8473 of 11765 relevant lines covered (72.02%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.81
/database/replication_master.go
1
package database
2

3
import (
4
        "errors"
5
        "fmt"
6
        "io"
7
        "io/ioutil"
8
        "os"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        "github.com/hdt3213/godis/interface/redis"
15
        "github.com/hdt3213/godis/lib/logger"
16
        "github.com/hdt3213/godis/lib/sync/atomic"
17
        "github.com/hdt3213/godis/lib/utils"
18
        "github.com/hdt3213/godis/redis/protocol"
19
)
20

21
const (
22
        slaveStateHandShake = uint8(iota)
23
        slaveStateWaitSaveEnd
24
        slaveStateSendingRDB
25
        slaveStateOnline
26
)
27

28
const (
29
        bgSaveIdle = uint8(iota)
30
        bgSaveRunning
31
        bgSaveFinish
32
)
33

34
const (
35
        slaveCapacityNone = 0
36
        slaveCapacityEOF  = 1 << iota
37
        slaveCapacityPsync2
38
)
39

40
// slaveClient stores slave status in the view of master
41
type slaveClient struct {
42
        conn         redis.Connection
43
        state        uint8
44
        offset       int64
45
        lastAckTime  time.Time
46
        announceIp   string
47
        announcePort int
48
        capacity     uint8
49
}
50

51
// aofListener is currently only responsible for updating the backlog
52
type replBacklog struct {
53
        buf           []byte
54
        beginOffset   int64
55
        currentOffset int64
56
}
57

58
func (backlog *replBacklog) appendBytes(bin []byte) {
1✔
59
        backlog.buf = append(backlog.buf, bin...)
1✔
60
        backlog.currentOffset += int64(len(bin))
1✔
61
}
1✔
62

63
func (backlog *replBacklog) getSnapshot() ([]byte, int64) {
1✔
64
        return backlog.buf[:], backlog.currentOffset
1✔
65
}
1✔
66

67
func (backlog *replBacklog) getSnapshotAfter(beginOffset int64) ([]byte, int64) {
1✔
68
        beg := beginOffset - backlog.beginOffset
1✔
69
        return backlog.buf[beg:], backlog.currentOffset
1✔
70
}
1✔
71

72
func (backlog *replBacklog) isValidOffset(offset int64) bool {
1✔
73
        return offset >= backlog.beginOffset && offset < backlog.currentOffset
1✔
74
}
1✔
75

76
type masterStatus struct {
77
        mu           sync.RWMutex
78
        replId       string
79
        backlog      *replBacklog
80
        slaveMap     map[redis.Connection]*slaveClient
81
        waitSlaves   map[*slaveClient]struct{}
82
        onlineSlaves map[*slaveClient]struct{}
83
        bgSaveState  uint8
84
        rdbFilename  string
85
        aofListener  *replAofListener
86
        rewriting    atomic.Boolean
87
}
88

89
// bgSaveForReplication does bg-save and send rdb to waiting slaves
90
func (server *Server) bgSaveForReplication() {
1✔
91
        go func() {
2✔
92
                defer func() {
2✔
93
                        if e := recover(); e != nil {
1✔
UNCOV
94
                                logger.Errorf("panic: %v", e)
×
UNCOV
95
                        }
×
96
                }()
97
                if err := server.saveForReplication(); err != nil {
1✔
UNCOV
98
                        logger.Errorf("save for replication error: %v", err)
×
UNCOV
99
                }
×
100
        }()
101

102
}
103

104
// saveForReplication does bg-save and send rdb to waiting slaves
105
func (server *Server) saveForReplication() error {
1✔
106
        rdbFile, err := ioutil.TempFile("", "*.rdb")
1✔
107
        if err != nil {
1✔
UNCOV
108
                return fmt.Errorf("create temp rdb failed: %v", err)
×
UNCOV
109
        }
×
110
        rdbFilename := rdbFile.Name()
1✔
111
        server.masterStatus.mu.Lock()
1✔
112
        server.masterStatus.bgSaveState = bgSaveRunning
1✔
113
        server.masterStatus.rdbFilename = rdbFilename // todo: can reuse config.Properties.RDBFilename?
1✔
114
        aofListener := &replAofListener{
1✔
115
                mdb:     server,
1✔
116
                backlog: server.masterStatus.backlog,
1✔
117
        }
1✔
118
        server.masterStatus.aofListener = aofListener
1✔
119
        server.masterStatus.mu.Unlock()
1✔
120

1✔
121
        err = server.persister.GenerateRDBForReplication(rdbFilename, aofListener, nil)
1✔
122
        if err != nil {
1✔
UNCOV
123
                return err
×
UNCOV
124
        }
×
125
        aofListener.readyToSend = true
1✔
126

1✔
127
        // change bgSaveState and get waitSlaves for sending
1✔
128
        waitSlaves := make(map[*slaveClient]struct{})
1✔
129
        server.masterStatus.mu.Lock()
1✔
130
        server.masterStatus.bgSaveState = bgSaveFinish
1✔
131
        for slave := range server.masterStatus.waitSlaves {
2✔
132
                waitSlaves[slave] = struct{}{}
1✔
133
        }
1✔
134
        server.masterStatus.waitSlaves = nil
1✔
135
        server.masterStatus.mu.Unlock()
1✔
136

1✔
137
        // send rdb to waiting slaves
1✔
138
        for slave := range waitSlaves {
2✔
139
                err = server.masterFullReSyncWithSlave(slave)
1✔
140
                if err != nil {
1✔
UNCOV
141
                        server.removeSlave(slave)
×
UNCOV
142
                        logger.Errorf("masterFullReSyncWithSlave error: %v", err)
×
UNCOV
143
                        continue
×
144
                }
145
        }
146
        return nil
1✔
147
}
148

149
func (server *Server) rewriteRDB() error {
1✔
150
        rdbFile, err := ioutil.TempFile("", "*.rdb")
1✔
151
        if err != nil {
1✔
UNCOV
152
                return fmt.Errorf("create temp rdb failed: %v", err)
×
UNCOV
153
        }
×
154
        rdbFilename := rdbFile.Name()
1✔
155
        newBacklog := &replBacklog{}
1✔
156
        aofListener := &replAofListener{
1✔
157
                backlog: newBacklog,
1✔
158
                mdb:     server,
1✔
159
        }
1✔
160
        hook := func() {
2✔
161
                // pausing aof first, then lock masterStatus.
1✔
162
                // use the same order as replAofListener to avoid dead lock
1✔
163
                server.masterStatus.mu.Lock()
1✔
164
                defer server.masterStatus.mu.Unlock()
1✔
165
                newBacklog.beginOffset = server.masterStatus.backlog.currentOffset
1✔
166
        }
1✔
167
        err = server.persister.GenerateRDBForReplication(rdbFilename, aofListener, hook)
1✔
168
        if err != nil { // wait rdb result
1✔
UNCOV
169
                return err
×
UNCOV
170
        }
×
171
        server.masterStatus.mu.Lock()
1✔
172
        server.masterStatus.rdbFilename = rdbFilename
1✔
173
        server.masterStatus.backlog = newBacklog
1✔
174
        server.persister.RemoveListener(server.masterStatus.aofListener)
1✔
175
        server.masterStatus.aofListener = aofListener
1✔
176
        server.masterStatus.mu.Unlock()
1✔
177
        // It is ok to know that new backlog is ready later, so we change readyToSend without sync
1✔
178
        // But setting readyToSend=true must after new backlog is really ready (that means master.mu.Unlock)
1✔
179
        aofListener.readyToSend = true
1✔
180
        return nil
1✔
181
}
182

183
// masterFullReSyncWithSlave send replication header, rdb file and all backlogs to slave
184
func (server *Server) masterFullReSyncWithSlave(slave *slaveClient) error {
1✔
185
        // write replication header
1✔
186
        header := "+FULLRESYNC " + server.masterStatus.replId + " " +
1✔
187
                strconv.FormatInt(server.masterStatus.backlog.beginOffset, 10) + protocol.CRLF
1✔
188
        _, err := slave.conn.Write([]byte(header))
1✔
189
        if err != nil {
1✔
UNCOV
190
                return fmt.Errorf("write replication header to slave failed: %v", err)
×
UNCOV
191
        }
×
192
        // send rdb
193
        rdbFile, err := os.Open(server.masterStatus.rdbFilename)
1✔
194
        if err != nil {
1✔
UNCOV
195
                return fmt.Errorf("open rdb file %s for replication error: %v", server.masterStatus.rdbFilename, err)
×
UNCOV
196
        }
×
197
        slave.state = slaveStateSendingRDB
1✔
198
        rdbInfo, _ := os.Stat(server.masterStatus.rdbFilename)
1✔
199
        rdbSize := rdbInfo.Size()
1✔
200
        rdbHeader := "$" + strconv.FormatInt(rdbSize, 10) + protocol.CRLF
1✔
201
        _, err = slave.conn.Write([]byte(rdbHeader))
1✔
202
        if err != nil {
1✔
UNCOV
203
                return fmt.Errorf("write rdb header to slave failed: %v", err)
×
204
        }
×
205
        _, err = io.Copy(slave.conn, rdbFile)
1✔
206
        if err != nil {
1✔
UNCOV
207
                return fmt.Errorf("write rdb file to slave failed: %v", err)
×
UNCOV
208
        }
×
209

210
        // send backlog
211
        server.masterStatus.mu.RLock()
1✔
212
        backlog, currentOffset := server.masterStatus.backlog.getSnapshot()
1✔
213
        server.masterStatus.mu.RUnlock()
1✔
214
        _, err = slave.conn.Write(backlog)
1✔
215
        if err != nil {
1✔
UNCOV
216
                return fmt.Errorf("full resync write backlog to slave failed: %v", err)
×
UNCOV
217
        }
×
218

219
        // set slave as online
220
        server.setSlaveOnline(slave, currentOffset)
1✔
221
        return nil
1✔
222
}
223

224
var cannotPartialSync = errors.New("cannot do partial sync")
225

226
func (server *Server) masterTryPartialSyncWithSlave(slave *slaveClient, replId string, slaveOffset int64) error {
1✔
227
        server.masterStatus.mu.RLock()
1✔
228
        if replId != server.masterStatus.replId {
1✔
UNCOV
229
                server.masterStatus.mu.RUnlock()
×
230
                return cannotPartialSync
×
231
        }
×
232
        if !server.masterStatus.backlog.isValidOffset(slaveOffset) {
1✔
UNCOV
233
                server.masterStatus.mu.RUnlock()
×
UNCOV
234
                return cannotPartialSync
×
UNCOV
235
        }
×
236
        backlog, currentOffset := server.masterStatus.backlog.getSnapshotAfter(slaveOffset)
1✔
237
        server.masterStatus.mu.RUnlock()
1✔
238

1✔
239
        // send replication header
1✔
240
        header := "+CONTINUE " + server.masterStatus.replId + protocol.CRLF
1✔
241
        _, err := slave.conn.Write([]byte(header))
1✔
242
        if err != nil {
1✔
UNCOV
243
                return fmt.Errorf("write replication header to slave failed: %v", err)
×
UNCOV
244
        }
×
245
        // send backlog
246
        _, err = slave.conn.Write(backlog)
1✔
247
        if err != nil {
1✔
UNCOV
248
                return fmt.Errorf("partial resync write backlog to slave failed: %v", err)
×
UNCOV
249
        }
×
250

251
        // set slave online
252
        server.setSlaveOnline(slave, currentOffset)
1✔
253
        return nil
1✔
254
}
255

256
// masterSendUpdatesToSlave only sends data to online slaves after bgSave is finished
257
// if bgSave is running, updates will be sent after the saving finished
258
func (server *Server) masterSendUpdatesToSlave() error {
1✔
259
        onlineSlaves := make(map[*slaveClient]struct{})
1✔
260
        server.masterStatus.mu.RLock()
1✔
261
        beginOffset := server.masterStatus.backlog.beginOffset
1✔
262
        backlog, currentOffset := server.masterStatus.backlog.getSnapshot()
1✔
263
        for slave := range server.masterStatus.onlineSlaves {
2✔
264
                onlineSlaves[slave] = struct{}{}
1✔
265
        }
1✔
266
        server.masterStatus.mu.RUnlock()
1✔
267
        for slave := range onlineSlaves {
2✔
268
                slaveBeginOffset := slave.offset - beginOffset
1✔
269
                _, err := slave.conn.Write(backlog[slaveBeginOffset:])
1✔
270
                if err != nil {
2✔
271
                        logger.Errorf("send updates backlog to slave failed: %v", err)
1✔
272
                        server.removeSlave(slave)
1✔
273
                        continue
1✔
274
                }
275
                slave.offset = currentOffset
1✔
276
        }
277
        return nil
1✔
278
}
279

280
func (server *Server) execPSync(c redis.Connection, args [][]byte) redis.Reply {
1✔
281
        replId := string(args[0])
1✔
282
        replOffset, err := strconv.ParseInt(string(args[1]), 10, 64)
1✔
283
        if err != nil {
1✔
UNCOV
284
                return protocol.MakeErrReply("ERR value is not an integer or out of range")
×
UNCOV
285
        }
×
286
        server.masterStatus.mu.Lock()
1✔
287
        defer server.masterStatus.mu.Unlock()
1✔
288
        slave := server.masterStatus.slaveMap[c]
1✔
289
        if slave == nil {
2✔
290
                slave = &slaveClient{
1✔
291
                        conn: c,
1✔
292
                }
1✔
293
                c.SetSlave()
1✔
294
                server.masterStatus.slaveMap[c] = slave
1✔
295
        }
1✔
296
        if server.masterStatus.bgSaveState == bgSaveIdle {
2✔
297
                slave.state = slaveStateWaitSaveEnd
1✔
298
                server.masterStatus.waitSlaves[slave] = struct{}{}
1✔
299
                server.bgSaveForReplication()
1✔
300
        } else if server.masterStatus.bgSaveState == bgSaveRunning {
2✔
UNCOV
301
                slave.state = slaveStateWaitSaveEnd
×
UNCOV
302
                server.masterStatus.waitSlaves[slave] = struct{}{}
×
303
        } else if server.masterStatus.bgSaveState == bgSaveFinish {
2✔
304
                go func() {
2✔
305
                        defer func() {
2✔
306
                                if e := recover(); e != nil {
1✔
UNCOV
307
                                        logger.Errorf("panic: %v", e)
×
UNCOV
308
                                }
×
309
                        }()
310
                        err := server.masterTryPartialSyncWithSlave(slave, replId, replOffset)
1✔
311
                        if err == nil {
2✔
312
                                return
1✔
313
                        }
1✔
314
                        if err != cannotPartialSync {
×
315
                                server.removeSlave(slave)
×
UNCOV
316
                                logger.Errorf("masterTryPartialSyncWithSlave error: %v", err)
×
317
                                return
×
318
                        }
×
319
                        // assert err == cannotPartialSync
320
                        if err := server.masterFullReSyncWithSlave(slave); err != nil {
×
321
                                server.removeSlave(slave)
×
UNCOV
322
                                logger.Errorf("masterFullReSyncWithSlave error: %v", err)
×
UNCOV
323
                                return
×
UNCOV
324
                        }
×
325
                }()
326
        }
327
        return &protocol.NoReply{}
1✔
328
}
329

330
func (server *Server) execReplConf(c redis.Connection, args [][]byte) redis.Reply {
×
331
        if len(args)%2 != 0 {
×
332
                return protocol.MakeSyntaxErrReply()
×
333
        }
×
334
        server.masterStatus.mu.RLock()
×
335
        slave := server.masterStatus.slaveMap[c]
×
336
        server.masterStatus.mu.RUnlock()
×
337
        for i := 0; i < len(args); i += 2 {
×
338
                key := strings.ToLower(string(args[i]))
×
339
                value := string(args[i+1])
×
340
                switch key {
×
341
                case "ack":
×
342
                        offset, err := strconv.ParseInt(value, 10, 64)
×
343
                        if err != nil {
×
344
                                return protocol.MakeErrReply("ERR value is not an integer or out of range")
×
345
                        }
×
UNCOV
346
                        slave.offset = offset
×
UNCOV
347
                        slave.lastAckTime = time.Now()
×
348
                        return &protocol.NoReply{}
×
349
                }
350
        }
UNCOV
351
        return protocol.MakeOkReply()
×
352
}
353

354
func (server *Server) removeSlave(slave *slaveClient) {
1✔
355
        server.masterStatus.mu.Lock()
1✔
356
        defer server.masterStatus.mu.Unlock()
1✔
357
        _ = slave.conn.Close()
1✔
358
        delete(server.masterStatus.slaveMap, slave.conn)
1✔
359
        delete(server.masterStatus.waitSlaves, slave)
1✔
360
        delete(server.masterStatus.onlineSlaves, slave)
1✔
361
        logger.Info("disconnect with slave " + slave.conn.Name())
1✔
362
}
1✔
363

364
func (server *Server) setSlaveOnline(slave *slaveClient, currentOffset int64) {
1✔
365
        server.masterStatus.mu.Lock()
1✔
366
        defer server.masterStatus.mu.Unlock()
1✔
367
        slave.state = slaveStateOnline
1✔
368
        slave.offset = currentOffset
1✔
369
        server.masterStatus.onlineSlaves[slave] = struct{}{}
1✔
370
}
1✔
371

372
var pingBytes = protocol.MakeMultiBulkReply(utils.ToCmdLine("ping")).ToBytes()
373

374
const maxBacklogSize = 10 * 1024 * 1024 // 10MB
375

376
func (server *Server) masterCron() {
1✔
377
        server.masterStatus.mu.Lock()
1✔
378
        if len(server.masterStatus.slaveMap) == 0 { // no slaves, do nothing
2✔
379
                server.masterStatus.mu.Unlock()
1✔
380
                return
1✔
381
        }
1✔
382
        if server.masterStatus.bgSaveState == bgSaveFinish {
2✔
383
                server.masterStatus.backlog.appendBytes(pingBytes)
1✔
384
        }
1✔
385
        backlogSize := len(server.masterStatus.backlog.buf)
1✔
386
        server.masterStatus.mu.Unlock()
1✔
387
        if err := server.masterSendUpdatesToSlave(); err != nil {
1✔
388
                logger.Errorf("masterSendUpdatesToSlave error: %v", err)
×
389
        }
×
390
        if backlogSize > maxBacklogSize && !server.masterStatus.rewriting.Get() {
1✔
391
                go func() {
×
392
                        server.masterStatus.rewriting.Set(true)
×
393
                        defer server.masterStatus.rewriting.Set(false)
×
UNCOV
394
                        if err := server.rewriteRDB(); err != nil {
×
UNCOV
395
                                server.masterStatus.rewriting.Set(false)
×
UNCOV
396
                                logger.Errorf("rewrite error: %v", err)
×
UNCOV
397
                        }
×
398
                }()
399
        }
400
}
401

402
// replAofListener is an implementation for aof.Listener
403
type replAofListener struct {
404
        mdb         *Server
405
        backlog     *replBacklog // may NOT be mdb.masterStatus.backlog
406
        readyToSend bool
407
}
408

409
func (listener *replAofListener) Callback(cmdLines []CmdLine) {
1✔
410
        listener.mdb.masterStatus.mu.Lock()
1✔
411
        for _, cmdLine := range cmdLines {
2✔
412
                reply := protocol.MakeMultiBulkReply(cmdLine)
1✔
413
                listener.backlog.appendBytes(reply.ToBytes())
1✔
414
        }
1✔
415
        listener.mdb.masterStatus.mu.Unlock()
1✔
416
        // listener could receive updates generated during rdb saving in progress
1✔
417
        // Do not send updates to slave before rdb saving is finished
1✔
418
        if listener.readyToSend {
2✔
419
                if err := listener.mdb.masterSendUpdatesToSlave(); err != nil {
1✔
UNCOV
420
                        logger.Errorf("masterSendUpdatesToSlave after receive aof error: %v", err)
×
UNCOV
421
                }
×
422
        }
423
}
424

425
func (server *Server) initMasterStatus() {
1✔
426
        server.masterStatus = &masterStatus{
1✔
427
                mu:           sync.RWMutex{},
1✔
428
                replId:       utils.RandHexString(40),
1✔
429
                backlog:      &replBacklog{},
1✔
430
                slaveMap:     make(map[redis.Connection]*slaveClient),
1✔
431
                waitSlaves:   make(map[*slaveClient]struct{}),
1✔
432
                onlineSlaves: make(map[*slaveClient]struct{}),
1✔
433
                bgSaveState:  bgSaveIdle,
1✔
434
                rdbFilename:  "",
1✔
435
        }
1✔
436
}
1✔
437

438
func (server *Server) stopMaster() {
1✔
439
        server.masterStatus.mu.Lock()
1✔
440
        defer server.masterStatus.mu.Unlock()
1✔
441

1✔
442
        // disconnect with slave
1✔
443
        for _, slave := range server.masterStatus.slaveMap {
1✔
444
                _ = slave.conn.Close()
×
UNCOV
445
                delete(server.masterStatus.slaveMap, slave.conn)
×
UNCOV
446
                delete(server.masterStatus.waitSlaves, slave)
×
UNCOV
447
                delete(server.masterStatus.onlineSlaves, slave)
×
UNCOV
448
        }
×
449

450
        // clean master status
451
        if server.persister != nil {
2✔
452
                server.persister.RemoveListener(server.masterStatus.aofListener)
1✔
453
        }
1✔
454
        _ = os.Remove(server.masterStatus.rdbFilename)
1✔
455
        server.masterStatus.rdbFilename = ""
1✔
456
        server.masterStatus.replId = ""
1✔
457
        server.masterStatus.backlog = &replBacklog{}
1✔
458
        server.masterStatus.slaveMap = make(map[redis.Connection]*slaveClient)
1✔
459
        server.masterStatus.waitSlaves = make(map[*slaveClient]struct{})
1✔
460
        server.masterStatus.onlineSlaves = make(map[*slaveClient]struct{})
1✔
461
        server.masterStatus.bgSaveState = bgSaveIdle
1✔
462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc