• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 16088210384

05 Jul 2025 12:40PM UTC coverage: 72.663% (+0.3%) from 72.402%
16088210384

push

github

HDT3213
test and protect edge case

3 of 3 new or added lines in 1 file covered. (100.0%)

114 existing lines in 2 files now uncovered.

8668 of 11929 relevant lines covered (72.66%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.59
/database/server.go
1
package database
2

3
import (
4
        "fmt"
5
        "os"
6
        "runtime/debug"
7
        "strconv"
8
        "strings"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/hdt3213/godis/aof"
13
        "github.com/hdt3213/godis/config"
14
        "github.com/hdt3213/godis/interface/database"
15
        "github.com/hdt3213/godis/interface/redis"
16
        "github.com/hdt3213/godis/lib/logger"
17
        "github.com/hdt3213/godis/lib/utils"
18
        "github.com/hdt3213/godis/pubsub"
19
        "github.com/hdt3213/godis/redis/protocol"
20
)
21

22
var godisVersion = "1.2.8" // do not modify
23

24
// Server is a redis-server with full capabilities including multiple database, rdb loader, replication
25
type Server struct {
26
        dbSet []*atomic.Value // *DB
27

28
        // handle publish/subscribe
29
        hub *pubsub.Hub
30
        // handle aof persistence
31
        persister *aof.Persister
32

33
        // for replication
34
        role         int32
35
        slaveStatus  *slaveStatus
36
        masterStatus *masterStatus
37

38
        // hooks
39
        insertCallback database.KeyEventCallback
40
        deleteCallback database.KeyEventCallback
41

42
        // slow log record
43
        slogLogger *SlowLogger
44
}
45

46
func fileExists(filename string) bool {
1✔
47
        info, err := os.Stat(filename)
1✔
48
        return err == nil && !info.IsDir()
1✔
49
}
1✔
50

51
// NewStandaloneServer creates a standalone redis server, with multi database and all other funtions
52
func NewStandaloneServer() *Server {
1✔
53
        server := &Server{}
1✔
54
        if config.Properties.Databases == 0 {
2✔
55
                config.Properties.Databases = 16
1✔
56
        }
1✔
57
        // creat tmp dir
58
        err := os.MkdirAll(config.GetTmpDir(), os.ModePerm)
1✔
59
        if err != nil {
1✔
UNCOV
60
                panic(fmt.Errorf("create tmp dir failed: %v", err))
×
61
        }
62
        // make db set
63
        server.dbSet = make([]*atomic.Value, config.Properties.Databases)
1✔
64
        for i := range server.dbSet {
2✔
65
                singleDB := makeDB()
1✔
66
                singleDB.index = i
1✔
67
                holder := &atomic.Value{}
1✔
68
                holder.Store(singleDB)
1✔
69
                server.dbSet[i] = holder
1✔
70
        }
1✔
71
        server.hub = pubsub.MakeHub()
1✔
72
        // record aof
1✔
73
        validAof := false
1✔
74
        if config.Properties.AppendOnly {
2✔
75
                validAof = fileExists(config.Properties.AppendFilename)
1✔
76
                aofHandler, err := NewPersister(server,
1✔
77
                        config.Properties.AppendFilename, true, config.Properties.AppendFsync)
1✔
78
                if err != nil {
1✔
UNCOV
79
                        panic(err)
×
80
                }
81
                server.bindPersister(aofHandler)
1✔
82
        }
83
        if config.Properties.RDBFilename != "" && !validAof {
2✔
84
                // load rdb
1✔
85
                err := server.loadRdbFile()
1✔
86
                if err != nil {
2✔
87
                        logger.Error(err)
1✔
88
                }
1✔
89
        }
90
        server.slaveStatus = initReplSlaveStatus()
1✔
91
        server.initMasterStatus()
1✔
92
        server.startReplCron()
1✔
93
        server.role = masterRole // The initialization process does not require atomicity
1✔
94

1✔
95
        // record slow log
1✔
96
        server.slogLogger = NewSlowLogger(config.Properties.SlowLogMaxLen, config.Properties.SlowLogSlowerThan)
1✔
97

1✔
98
        return server
1✔
99
}
100

101
// Exec executes command
102
// parameter `cmdLine` contains command and its arguments, for example: "set key value"
103
func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
1✔
104
        defer func() {
2✔
105
                if err := recover(); err != nil {
1✔
UNCOV
106
                        logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
×
UNCOV
107
                        result = &protocol.UnknownErrReply{}
×
UNCOV
108
                }
×
109
        }()
110
        // Record the start time of command execution
111
        GodisExecCommandStartUnixTime := time.Now()
1✔
112

1✔
113
        cmdName := strings.ToLower(string(cmdLine[0]))
1✔
114
        // ping
1✔
115
        if cmdName == "ping" {
2✔
116
                return Ping(c, cmdLine[1:])
1✔
117
        }
1✔
118
        // authenticate
119
        if cmdName == "auth" {
2✔
120
                return Auth(c, cmdLine[1:])
1✔
121
        }
1✔
122
        if !isAuthenticated(c) {
2✔
123
                return protocol.MakeErrReply("NOAUTH Authentication required")
1✔
124
        }
1✔
125
        // info
126
        if cmdName == "info" {
2✔
127
                return Info(server, cmdLine[1:])
1✔
128
        }
1✔
129

130
        // slowlog
131
        if cmdName == "slowlog" {
1✔
UNCOV
132
                return server.slogLogger.HandleSlowlogCommand(cmdLine)
×
UNCOV
133
        }
×
134

135
        if cmdName == "dbsize" {
2✔
136
                return DbSize(c, server)
1✔
137
        }
1✔
138
        if cmdName == "slaveof" {
2✔
139
                if c != nil && c.InMultiState() {
1✔
140
                        return protocol.MakeErrReply("cannot use slave of database within multi")
×
141
                }
×
142
                if len(cmdLine) != 3 {
1✔
UNCOV
143
                        return protocol.MakeArgNumErrReply("SLAVEOF")
×
UNCOV
144
                }
×
145
                return server.execSlaveOf(c, cmdLine[1:])
1✔
146
        } else if cmdName == "command" {
2✔
147
                return execCommand(cmdLine[1:])
1✔
148
        }
1✔
149

150
        // read only slave
151
        role := atomic.LoadInt32(&server.role)
1✔
152
        if role == slaveRole && !c.IsMaster() {
2✔
153
                // only allow read only command, forbid all special commands except `auth` and `slaveof`
1✔
154
                if !isReadOnlyCommand(cmdName) {
1✔
155
                        return protocol.MakeErrReply("READONLY You can't write against a read only slave.")
×
156
                }
×
157
        }
158

159
        // special commands which cannot execute within transaction
160
        if cmdName == "subscribe" {
1✔
UNCOV
161
                if len(cmdLine) < 2 {
×
162
                        return protocol.MakeArgNumErrReply("subscribe")
×
163
                }
×
UNCOV
164
                return pubsub.Subscribe(server.hub, c, cmdLine[1:])
×
165
        } else if cmdName == "publish" {
1✔
UNCOV
166
                return pubsub.Publish(server.hub, cmdLine[1:])
×
167
        } else if cmdName == "unsubscribe" {
1✔
UNCOV
168
                return pubsub.UnSubscribe(server.hub, c, cmdLine[1:])
×
169
        } else if cmdName == "bgrewriteaof" {
1✔
170
                if !config.Properties.AppendOnly {
×
UNCOV
171
                        return protocol.MakeErrReply("AppendOnly is false, you can't rewrite aof file")
×
172
                }
×
173
                // aof.go imports router.go, router.go cannot import BGRewriteAOF from aof.go
UNCOV
174
                return BGRewriteAOF(server, cmdLine[1:])
×
175
        } else if cmdName == "rewriteaof" {
2✔
176
                if !config.Properties.AppendOnly {
1✔
UNCOV
177
                        return protocol.MakeErrReply("AppendOnly is false, you can't rewrite aof file")
×
178
                }
×
179
                return RewriteAOF(server, cmdLine[1:])
1✔
180
        } else if cmdName == "flushall" {
2✔
181
                return server.flushAll()
1✔
182
        } else if cmdName == "flushdb" {
3✔
183
                if !validateArity(1, cmdLine) {
1✔
184
                        return protocol.MakeArgNumErrReply(cmdName)
×
185
                }
×
186
                if c.InMultiState() {
1✔
UNCOV
187
                        return protocol.MakeErrReply("ERR command 'FlushDB' cannot be used in MULTI")
×
UNCOV
188
                }
×
189
                return server.execFlushDB(c.GetDBIndex())
1✔
190
        } else if cmdName == "save" {
2✔
191
                return SaveRDB(server, cmdLine[1:])
1✔
192
        } else if cmdName == "bgsave" {
2✔
193
                return BGSaveRDB(server, cmdLine[1:])
×
194
        } else if cmdName == "select" {
2✔
195
                if c != nil && c.InMultiState() {
1✔
UNCOV
196
                        return protocol.MakeErrReply("cannot select database within multi")
×
UNCOV
197
                }
×
198
                if len(cmdLine) != 2 {
1✔
UNCOV
199
                        return protocol.MakeArgNumErrReply("select")
×
UNCOV
200
                }
×
201
                return execSelect(c, server, cmdLine[1:])
1✔
202
        } else if cmdName == "copy" {
2✔
203
                if len(cmdLine) < 3 {
1✔
204
                        return protocol.MakeArgNumErrReply("copy")
×
UNCOV
205
                }
×
206
                return execCopy(server, c, cmdLine[1:])
1✔
207
        } else if cmdName == "replconf" {
1✔
UNCOV
208
                return server.execReplConf(c, cmdLine[1:])
×
209
        } else if cmdName == "psync" {
2✔
210
                return server.execPSync(c, cmdLine[1:])
1✔
211
        }
1✔
212
        // todo: support multi database transaction
213

214
        // normal commands
215
        dbIndex := c.GetDBIndex()
1✔
216
        selectedDB, errReply := server.selectDB(dbIndex)
1✔
217
        if errReply != nil {
1✔
UNCOV
218
                return errReply
×
UNCOV
219
        }
×
220

221
        exec := selectedDB.Exec(c, cmdLine)
1✔
222
        // Record slow query logs
1✔
223
        server.slogLogger.Record(GodisExecCommandStartUnixTime, cmdLine, c.Name())
1✔
224
        return exec
1✔
225
}
226

227
// AfterClientClose does some clean after client close connection
UNCOV
228
func (server *Server) AfterClientClose(c redis.Connection) {
×
229
        pubsub.UnsubscribeAll(server.hub, c)
×
230
}
×
231

232
// Close graceful shutdown database
233
func (server *Server) Close() {
1✔
234
        // stop slaveStatus first
1✔
235
        server.slaveStatus.close()
1✔
236
        if server.persister != nil {
2✔
237
                server.persister.Close()
1✔
238
        }
1✔
239
        server.stopMaster()
1✔
240
}
241

242
func execSelect(c redis.Connection, mdb *Server, args [][]byte) redis.Reply {
1✔
243
        dbIndex, err := strconv.Atoi(string(args[0]))
1✔
244
        if err != nil {
1✔
245
                return protocol.MakeErrReply("ERR invalid DB index")
×
246
        }
×
247
        if dbIndex >= len(mdb.dbSet) || dbIndex < 0 {
1✔
UNCOV
248
                return protocol.MakeErrReply("ERR DB index is out of range")
×
UNCOV
249
        }
×
250
        c.SelectDB(dbIndex)
1✔
251
        return protocol.MakeOkReply()
1✔
252
}
253

254
func (server *Server) execFlushDB(dbIndex int) redis.Reply {
1✔
255
        if server.persister != nil {
2✔
256
                server.persister.SaveCmdLine(dbIndex, utils.ToCmdLine("FlushDB"))
1✔
257
        }
1✔
258
        return server.flushDB(dbIndex)
1✔
259
}
260

261
// flushDB flushes the selected database
262
func (server *Server) flushDB(dbIndex int) redis.Reply {
1✔
263
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
UNCOV
264
                return protocol.MakeErrReply("ERR DB index is out of range")
×
UNCOV
265
        }
×
266
        newDB := makeDB()
1✔
267
        server.loadDB(dbIndex, newDB)
1✔
268
        return &protocol.OkReply{}
1✔
269
}
270

271
func (server *Server) loadDB(dbIndex int, newDB *DB) redis.Reply {
1✔
272
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
UNCOV
273
                return protocol.MakeErrReply("ERR DB index is out of range")
×
UNCOV
274
        }
×
275
        oldDB := server.mustSelectDB(dbIndex)
1✔
276
        newDB.index = dbIndex
1✔
277
        newDB.addAof = oldDB.addAof // inherit oldDB
1✔
278
        server.dbSet[dbIndex].Store(newDB)
1✔
279
        return &protocol.OkReply{}
1✔
280
}
281

282
// flushAll flushes all databases.
283
func (server *Server) flushAll() redis.Reply {
1✔
284
        for i := range server.dbSet {
2✔
285
                server.flushDB(i)
1✔
286
        }
1✔
287
        if server.persister != nil {
1✔
UNCOV
288
                server.persister.SaveCmdLine(0, utils.ToCmdLine("FlushAll"))
×
UNCOV
289
        }
×
290
        return &protocol.OkReply{}
1✔
291
}
292

293
// selectDB returns the database with the given index, or an error if the index is out of range.
294
func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) {
1✔
295
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
UNCOV
296
                return nil, protocol.MakeErrReply("ERR DB index is out of range")
×
297
        }
×
298
        return server.dbSet[dbIndex].Load().(*DB), nil
1✔
299
}
300

301
// mustSelectDB is like selectDB, but panics if an error occurs.
302
func (server *Server) mustSelectDB(dbIndex int) *DB {
1✔
303
        selectedDB, err := server.selectDB(dbIndex)
1✔
304
        if err != nil {
1✔
305
                panic(err)
×
306
        }
307
        return selectedDB
1✔
308
}
309

310
// ForEach traverses all the keys in the given database
311
func (server *Server) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
1✔
312
        server.mustSelectDB(dbIndex).ForEach(cb)
1✔
313
}
1✔
314

315
// GetEntity returns the data entity to the given key
316
func (server *Server) GetEntity(dbIndex int, key string) (*database.DataEntity, bool) {
×
UNCOV
317
        return server.mustSelectDB(dbIndex).GetEntity(key)
×
UNCOV
318
}
×
319

320
func (server *Server) GetExpiration(dbIndex int, key string) *time.Time {
×
321
        raw, ok := server.mustSelectDB(dbIndex).ttlMap.Get(key)
×
322
        if !ok {
×
UNCOV
323
                return nil
×
UNCOV
324
        }
×
325
        expireTime, _ := raw.(time.Time)
×
326
        return &expireTime
×
327
}
328

329
// ExecMulti executes multi commands transaction Atomically and Isolated
330
func (server *Server) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
×
331
        selectedDB, errReply := server.selectDB(conn.GetDBIndex())
×
332
        if errReply != nil {
×
UNCOV
333
                return errReply
×
UNCOV
334
        }
×
335
        return selectedDB.ExecMulti(conn, watching, cmdLines)
×
336
}
337

338
// RWLocks lock keys for writing and reading
339
func (server *Server) RWLocks(dbIndex int, writeKeys []string, readKeys []string) {
×
340
        server.mustSelectDB(dbIndex).RWLocks(writeKeys, readKeys)
×
UNCOV
341
}
×
342

343
// RWUnLocks unlock keys for writing and reading
344
func (server *Server) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) {
×
345
        server.mustSelectDB(dbIndex).RWUnLocks(writeKeys, readKeys)
×
346
}
×
347

348
// GetUndoLogs return rollback commands
UNCOV
349
func (server *Server) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
×
UNCOV
350
        return server.mustSelectDB(dbIndex).GetUndoLogs(cmdLine)
×
UNCOV
351
}
×
352

353
// ExecWithLock executes normal commands, invoker should provide locks
354
func (server *Server) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply {
×
UNCOV
355
        db, errReply := server.selectDB(conn.GetDBIndex())
×
UNCOV
356
        if errReply != nil {
×
UNCOV
357
                return errReply
×
UNCOV
358
        }
×
UNCOV
359
        return db.execWithLock(cmdLine)
×
360
}
361

362
// BGRewriteAOF asynchronously rewrites Append-Only-File
UNCOV
363
func BGRewriteAOF(db *Server, args [][]byte) redis.Reply {
×
UNCOV
364
        go db.persister.Rewrite()
×
365
        return protocol.MakeStatusReply("Background append only file rewriting started")
×
366
}
×
367

368
// RewriteAOF start Append-Only-File rewriting and blocked until it finished
369
func RewriteAOF(db *Server, args [][]byte) redis.Reply {
1✔
370
        err := db.persister.Rewrite()
1✔
371
        if err != nil {
1✔
UNCOV
372
                return protocol.MakeErrReply(err.Error())
×
UNCOV
373
        }
×
374
        return protocol.MakeOkReply()
1✔
375
}
376

377
// SaveRDB start RDB writing and blocked until it finished
378
func SaveRDB(db *Server, args [][]byte) redis.Reply {
1✔
379
        if db.persister == nil {
1✔
380
                return protocol.MakeErrReply("please enable aof before using save")
×
381
        }
×
382
        rdbFilename := config.Properties.RDBFilename
1✔
383
        if rdbFilename == "" {
1✔
UNCOV
384
                rdbFilename = "dump.rdb"
×
385
        }
×
386
        err := db.persister.GenerateRDB(rdbFilename)
1✔
387
        if err != nil {
1✔
388
                return protocol.MakeErrReply(err.Error())
×
389
        }
×
390
        return protocol.MakeOkReply()
1✔
391
}
392

393
// BGSaveRDB asynchronously save RDB
394
func BGSaveRDB(db *Server, args [][]byte) redis.Reply {
×
UNCOV
395
        if db.persister == nil {
×
UNCOV
396
                return protocol.MakeErrReply("please enable aof before using save")
×
UNCOV
397
        }
×
UNCOV
398
        go func() {
×
UNCOV
399
                defer func() {
×
UNCOV
400
                        if err := recover(); err != nil {
×
UNCOV
401
                                logger.Error(err)
×
UNCOV
402
                        }
×
403
                }()
UNCOV
404
                rdbFilename := config.Properties.RDBFilename
×
UNCOV
405
                if rdbFilename == "" {
×
UNCOV
406
                        rdbFilename = "dump.rdb"
×
UNCOV
407
                }
×
UNCOV
408
                err := db.persister.GenerateRDB(rdbFilename)
×
UNCOV
409
                if err != nil {
×
UNCOV
410
                        logger.Error(err)
×
UNCOV
411
                }
×
412
        }()
UNCOV
413
        return protocol.MakeStatusReply("Background saving started")
×
414
}
415

416
// GetDBSize returns keys count and ttl key count
417
func (server *Server) GetDBSize(dbIndex int) (int, int) {
1✔
418
        db := server.mustSelectDB(dbIndex)
1✔
419
        return db.data.Len(), db.ttlMap.Len()
1✔
420
}
1✔
421

422
func (server *Server) startReplCron() {
1✔
423
        go func(mdb *Server) {
2✔
424
                ticker := time.Tick(time.Second * 10)
1✔
425
                for range ticker {
2✔
426
                        mdb.slaveCron()
1✔
427
                        mdb.masterCron()
1✔
428
                }
1✔
429
        }(server)
430
}
431

432
// GetAvgTTL Calculate the average expiration time of keys
433
func (server *Server) GetAvgTTL(dbIndex, randomKeyCount int) int64 {
×
434
        var ttlCount int64
×
435
        db := server.mustSelectDB(dbIndex)
×
436
        keys := db.data.RandomKeys(randomKeyCount)
×
437
        for _, k := range keys {
×
438
                t := time.Now()
×
UNCOV
439
                rawExpireTime, ok := db.ttlMap.Get(k)
×
UNCOV
440
                if !ok {
×
UNCOV
441
                        continue
×
442
                }
443
                expireTime, _ := rawExpireTime.(time.Time)
×
444
                // if the key has already reached its expiration time during calculation, ignore it
×
445
                if expireTime.Sub(t).Microseconds() > 0 {
×
446
                        ttlCount += expireTime.Sub(t).Microseconds()
×
447
                }
×
448
        }
UNCOV
449
        return ttlCount / int64(len(keys))
×
450
}
451

UNCOV
452
func (server *Server) SetKeyInsertedCallback(cb database.KeyEventCallback) {
×
UNCOV
453
        server.insertCallback = cb
×
UNCOV
454
        for i := range server.dbSet {
×
UNCOV
455
                db := server.mustSelectDB(i)
×
UNCOV
456
                db.insertCallback = cb
×
UNCOV
457
        }
×
458

459
}
460

UNCOV
461
func (server *Server) SetKeyDeletedCallback(cb database.KeyEventCallback) {
×
UNCOV
462
        server.deleteCallback = cb
×
UNCOV
463
        for i := range server.dbSet {
×
UNCOV
464
                db := server.mustSelectDB(i)
×
UNCOV
465
                db.deleteCallback = cb
×
UNCOV
466
        }
×
467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc