• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 15238419529

25 May 2025 01:32PM UTC coverage: 72.019% (-3.7%) from 75.704%
15238419529

push

github

HDT3213
update github actions go version

1 of 1 new or added line in 1 file covered. (100.0%)

1149 existing lines in 29 files now uncovered.

8473 of 11765 relevant lines covered (72.02%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.52
/database/server.go
1
package database
2

3
import (
4
        "fmt"
5
        "os"
6
        "runtime/debug"
7
        "strconv"
8
        "strings"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/hdt3213/godis/aof"
13
        "github.com/hdt3213/godis/config"
14
        "github.com/hdt3213/godis/interface/database"
15
        "github.com/hdt3213/godis/interface/redis"
16
        "github.com/hdt3213/godis/lib/logger"
17
        "github.com/hdt3213/godis/lib/utils"
18
        "github.com/hdt3213/godis/pubsub"
19
        "github.com/hdt3213/godis/redis/protocol"
20
)
21

22
var godisVersion = "1.2.8" // do not modify
23

24
// Server is a redis-server with full capabilities including multiple database, rdb loader, replication
25
type Server struct {
26
        dbSet []*atomic.Value // *DB
27

28
        // handle publish/subscribe
29
        hub *pubsub.Hub
30
        // handle aof persistence
31
        persister *aof.Persister
32

33
        // for replication
34
        role         int32
35
        slaveStatus  *slaveStatus
36
        masterStatus *masterStatus
37

38
        // hooks
39
        insertCallback database.KeyEventCallback
40
        deleteCallback database.KeyEventCallback
41
}
42

43
func fileExists(filename string) bool {
1✔
44
        info, err := os.Stat(filename)
1✔
45
        return err == nil && !info.IsDir()
1✔
46
}
1✔
47

48
// NewStandaloneServer creates a standalone redis server, with multi database and all other funtions
49
func NewStandaloneServer() *Server {
1✔
50
        server := &Server{}
1✔
51
        if config.Properties.Databases == 0 {
2✔
52
                config.Properties.Databases = 16
1✔
53
        }
1✔
54
        // creat tmp dir
55
        err := os.MkdirAll(config.GetTmpDir(), os.ModePerm)
1✔
56
        if err != nil {
1✔
UNCOV
57
                panic(fmt.Errorf("create tmp dir failed: %v", err))
×
58
        }
59
        // make db set
60
        server.dbSet = make([]*atomic.Value, config.Properties.Databases)
1✔
61
        for i := range server.dbSet {
2✔
62
                singleDB := makeDB()
1✔
63
                singleDB.index = i
1✔
64
                holder := &atomic.Value{}
1✔
65
                holder.Store(singleDB)
1✔
66
                server.dbSet[i] = holder
1✔
67
        }
1✔
68
        server.hub = pubsub.MakeHub()
1✔
69
        // record aof
1✔
70
        validAof := false
1✔
71
        if config.Properties.AppendOnly {
2✔
72
                validAof = fileExists(config.Properties.AppendFilename)
1✔
73
                aofHandler, err := NewPersister(server,
1✔
74
                        config.Properties.AppendFilename, true, config.Properties.AppendFsync)
1✔
75
                if err != nil {
1✔
UNCOV
76
                        panic(err)
×
77
                }
78
                server.bindPersister(aofHandler)
1✔
79
        }
80
        if config.Properties.RDBFilename != "" && !validAof {
2✔
81
                // load rdb
1✔
82
                err := server.loadRdbFile()
1✔
83
                if err != nil {
2✔
84
                        logger.Error(err)
1✔
85
                }
1✔
86
        }
87
        server.slaveStatus = initReplSlaveStatus()
1✔
88
        server.initMasterStatus()
1✔
89
        server.startReplCron()
1✔
90
        server.role = masterRole // The initialization process does not require atomicity
1✔
91
        return server
1✔
92
}
93

94
// Exec executes command
95
// parameter `cmdLine` contains command and its arguments, for example: "set key value"
96
func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
1✔
97
        defer func() {
2✔
98
                if err := recover(); err != nil {
1✔
99
                        logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
×
UNCOV
100
                        result = &protocol.UnknownErrReply{}
×
101
                }
×
102
        }()
103

104
        cmdName := strings.ToLower(string(cmdLine[0]))
1✔
105
        // ping
1✔
106
        if cmdName == "ping" {
2✔
107
                return Ping(c, cmdLine[1:])
1✔
108
        }
1✔
109
        // authenticate
110
        if cmdName == "auth" {
2✔
111
                return Auth(c, cmdLine[1:])
1✔
112
        }
1✔
113
        if !isAuthenticated(c) {
2✔
114
                return protocol.MakeErrReply("NOAUTH Authentication required")
1✔
115
        }
1✔
116
        // info
117
        if cmdName == "info" {
2✔
118
                return Info(server, cmdLine[1:])
1✔
119
        }
1✔
120
        if cmdName == "dbsize" {
2✔
121
                return DbSize(c, server)
1✔
122
        }
1✔
123
        if cmdName == "slaveof" {
2✔
124
                if c != nil && c.InMultiState() {
1✔
UNCOV
125
                        return protocol.MakeErrReply("cannot use slave of database within multi")
×
126
                }
×
127
                if len(cmdLine) != 3 {
1✔
UNCOV
128
                        return protocol.MakeArgNumErrReply("SLAVEOF")
×
UNCOV
129
                }
×
130
                return server.execSlaveOf(c, cmdLine[1:])
1✔
131
        } else if cmdName == "command" {
2✔
132
                return execCommand(cmdLine[1:])
1✔
133
        }
1✔
134

135
        // read only slave
136
        role := atomic.LoadInt32(&server.role)
1✔
137
        if role == slaveRole && !c.IsMaster() {
2✔
138
                // only allow read only command, forbid all special commands except `auth` and `slaveof`
1✔
139
                if !isReadOnlyCommand(cmdName) {
1✔
UNCOV
140
                        return protocol.MakeErrReply("READONLY You can't write against a read only slave.")
×
UNCOV
141
                }
×
142
        }
143

144
        // special commands which cannot execute within transaction
145
        if cmdName == "subscribe" {
1✔
146
                if len(cmdLine) < 2 {
×
147
                        return protocol.MakeArgNumErrReply("subscribe")
×
UNCOV
148
                }
×
149
                return pubsub.Subscribe(server.hub, c, cmdLine[1:])
×
150
        } else if cmdName == "publish" {
1✔
UNCOV
151
                return pubsub.Publish(server.hub, cmdLine[1:])
×
152
        } else if cmdName == "unsubscribe" {
1✔
UNCOV
153
                return pubsub.UnSubscribe(server.hub, c, cmdLine[1:])
×
154
        } else if cmdName == "bgrewriteaof" {
1✔
155
                if !config.Properties.AppendOnly {
×
UNCOV
156
                        return protocol.MakeErrReply("AppendOnly is false, you can't rewrite aof file")
×
UNCOV
157
                }
×
158
                // aof.go imports router.go, router.go cannot import BGRewriteAOF from aof.go
UNCOV
159
                return BGRewriteAOF(server, cmdLine[1:])
×
160
        } else if cmdName == "rewriteaof" {
2✔
161
                if !config.Properties.AppendOnly {
1✔
UNCOV
162
                        return protocol.MakeErrReply("AppendOnly is false, you can't rewrite aof file")
×
UNCOV
163
                }
×
164
                return RewriteAOF(server, cmdLine[1:])
1✔
165
        } else if cmdName == "flushall" {
2✔
166
                return server.flushAll()
1✔
167
        } else if cmdName == "flushdb" {
3✔
168
                if !validateArity(1, cmdLine) {
1✔
169
                        return protocol.MakeArgNumErrReply(cmdName)
×
UNCOV
170
                }
×
171
                if c.InMultiState() {
1✔
UNCOV
172
                        return protocol.MakeErrReply("ERR command 'FlushDB' cannot be used in MULTI")
×
UNCOV
173
                }
×
174
                return server.execFlushDB(c.GetDBIndex())
1✔
175
        } else if cmdName == "save" {
2✔
176
                return SaveRDB(server, cmdLine[1:])
1✔
177
        } else if cmdName == "bgsave" {
2✔
UNCOV
178
                return BGSaveRDB(server, cmdLine[1:])
×
179
        } else if cmdName == "select" {
2✔
180
                if c != nil && c.InMultiState() {
1✔
UNCOV
181
                        return protocol.MakeErrReply("cannot select database within multi")
×
UNCOV
182
                }
×
183
                if len(cmdLine) != 2 {
1✔
UNCOV
184
                        return protocol.MakeArgNumErrReply("select")
×
UNCOV
185
                }
×
186
                return execSelect(c, server, cmdLine[1:])
1✔
187
        } else if cmdName == "copy" {
2✔
188
                if len(cmdLine) < 3 {
1✔
UNCOV
189
                        return protocol.MakeArgNumErrReply("copy")
×
UNCOV
190
                }
×
191
                return execCopy(server, c, cmdLine[1:])
1✔
192
        } else if cmdName == "replconf" {
1✔
UNCOV
193
                return server.execReplConf(c, cmdLine[1:])
×
194
        } else if cmdName == "psync" {
2✔
195
                return server.execPSync(c, cmdLine[1:])
1✔
196
        }
1✔
197
        // todo: support multi database transaction
198

199
        // normal commands
200
        dbIndex := c.GetDBIndex()
1✔
201
        selectedDB, errReply := server.selectDB(dbIndex)
1✔
202
        if errReply != nil {
1✔
203
                return errReply
×
UNCOV
204
        }
×
205
        return selectedDB.Exec(c, cmdLine)
1✔
206
}
207

208
// AfterClientClose does some clean after client close connection
UNCOV
209
func (server *Server) AfterClientClose(c redis.Connection) {
×
UNCOV
210
        pubsub.UnsubscribeAll(server.hub, c)
×
211
}
×
212

213
// Close graceful shutdown database
214
func (server *Server) Close() {
1✔
215
        // stop slaveStatus first
1✔
216
        server.slaveStatus.close()
1✔
217
        if server.persister != nil {
2✔
218
                server.persister.Close()
1✔
219
        }
1✔
220
        server.stopMaster()
1✔
221
}
222

223
func execSelect(c redis.Connection, mdb *Server, args [][]byte) redis.Reply {
1✔
224
        dbIndex, err := strconv.Atoi(string(args[0]))
1✔
225
        if err != nil {
1✔
226
                return protocol.MakeErrReply("ERR invalid DB index")
×
UNCOV
227
        }
×
228
        if dbIndex >= len(mdb.dbSet) || dbIndex < 0 {
1✔
UNCOV
229
                return protocol.MakeErrReply("ERR DB index is out of range")
×
UNCOV
230
        }
×
231
        c.SelectDB(dbIndex)
1✔
232
        return protocol.MakeOkReply()
1✔
233
}
234

235
func (server *Server) execFlushDB(dbIndex int) redis.Reply {
1✔
236
        if server.persister != nil {
2✔
237
                server.persister.SaveCmdLine(dbIndex, utils.ToCmdLine("FlushDB"))
1✔
238
        }
1✔
239
        return server.flushDB(dbIndex)
1✔
240
}
241

242
// flushDB flushes the selected database
243
func (server *Server) flushDB(dbIndex int) redis.Reply {
1✔
244
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
UNCOV
245
                return protocol.MakeErrReply("ERR DB index is out of range")
×
UNCOV
246
        }
×
247
        newDB := makeDB()
1✔
248
        server.loadDB(dbIndex, newDB)
1✔
249
        return &protocol.OkReply{}
1✔
250
}
251

252
func (server *Server) loadDB(dbIndex int, newDB *DB) redis.Reply {
1✔
253
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
254
                return protocol.MakeErrReply("ERR DB index is out of range")
×
255
        }
×
256
        oldDB := server.mustSelectDB(dbIndex)
1✔
257
        newDB.index = dbIndex
1✔
258
        newDB.addAof = oldDB.addAof // inherit oldDB
1✔
259
        server.dbSet[dbIndex].Store(newDB)
1✔
260
        return &protocol.OkReply{}
1✔
261
}
262

263
// flushAll flushes all databases.
264
func (server *Server) flushAll() redis.Reply {
1✔
265
        for i := range server.dbSet {
2✔
266
                server.flushDB(i)
1✔
267
        }
1✔
268
        if server.persister != nil {
1✔
UNCOV
269
                server.persister.SaveCmdLine(0, utils.ToCmdLine("FlushAll"))
×
270
        }
×
271
        return &protocol.OkReply{}
1✔
272
}
273

274
// selectDB returns the database with the given index, or an error if the index is out of range.
275
func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) {
1✔
276
        if dbIndex >= len(server.dbSet) || dbIndex < 0 {
1✔
277
                return nil, protocol.MakeErrReply("ERR DB index is out of range")
×
278
        }
×
279
        return server.dbSet[dbIndex].Load().(*DB), nil
1✔
280
}
281

282
// mustSelectDB is like selectDB, but panics if an error occurs.
283
func (server *Server) mustSelectDB(dbIndex int) *DB {
1✔
284
        selectedDB, err := server.selectDB(dbIndex)
1✔
285
        if err != nil {
1✔
286
                panic(err)
×
287
        }
288
        return selectedDB
1✔
289
}
290

291
// ForEach traverses all the keys in the given database
292
func (server *Server) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
1✔
293
        server.mustSelectDB(dbIndex).ForEach(cb)
1✔
294
}
1✔
295

296
// GetEntity returns the data entity to the given key
UNCOV
297
func (server *Server) GetEntity(dbIndex int, key string) (*database.DataEntity, bool) {
×
UNCOV
298
        return server.mustSelectDB(dbIndex).GetEntity(key)
×
UNCOV
299
}
×
300

301
func (server *Server) GetExpiration(dbIndex int, key string) *time.Time {
×
302
        raw, ok := server.mustSelectDB(dbIndex).ttlMap.Get(key)
×
UNCOV
303
        if !ok {
×
UNCOV
304
                return nil
×
305
        }
×
306
        expireTime, _ := raw.(time.Time)
×
UNCOV
307
        return &expireTime
×
308
}
309

310
// ExecMulti executes multi commands transaction Atomically and Isolated
UNCOV
311
func (server *Server) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
×
UNCOV
312
        selectedDB, errReply := server.selectDB(conn.GetDBIndex())
×
UNCOV
313
        if errReply != nil {
×
UNCOV
314
                return errReply
×
315
        }
×
316
        return selectedDB.ExecMulti(conn, watching, cmdLines)
×
317
}
318

319
// RWLocks lock keys for writing and reading
320
func (server *Server) RWLocks(dbIndex int, writeKeys []string, readKeys []string) {
×
321
        server.mustSelectDB(dbIndex).RWLocks(writeKeys, readKeys)
×
322
}
×
323

324
// RWUnLocks unlock keys for writing and reading
325
func (server *Server) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) {
×
326
        server.mustSelectDB(dbIndex).RWUnLocks(writeKeys, readKeys)
×
327
}
×
328

329
// GetUndoLogs return rollback commands
330
func (server *Server) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
×
331
        return server.mustSelectDB(dbIndex).GetUndoLogs(cmdLine)
×
332
}
×
333

334
// ExecWithLock executes normal commands, invoker should provide locks
UNCOV
335
func (server *Server) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply {
×
UNCOV
336
        db, errReply := server.selectDB(conn.GetDBIndex())
×
UNCOV
337
        if errReply != nil {
×
UNCOV
338
                return errReply
×
UNCOV
339
        }
×
UNCOV
340
        return db.execWithLock(cmdLine)
×
341
}
342

343
// BGRewriteAOF asynchronously rewrites Append-Only-File
UNCOV
344
func BGRewriteAOF(db *Server, args [][]byte) redis.Reply {
×
UNCOV
345
        go db.persister.Rewrite()
×
UNCOV
346
        return protocol.MakeStatusReply("Background append only file rewriting started")
×
UNCOV
347
}
×
348

349
// RewriteAOF start Append-Only-File rewriting and blocked until it finished
350
func RewriteAOF(db *Server, args [][]byte) redis.Reply {
1✔
351
        err := db.persister.Rewrite()
1✔
352
        if err != nil {
1✔
UNCOV
353
                return protocol.MakeErrReply(err.Error())
×
UNCOV
354
        }
×
355
        return protocol.MakeOkReply()
1✔
356
}
357

358
// SaveRDB start RDB writing and blocked until it finished
359
func SaveRDB(db *Server, args [][]byte) redis.Reply {
1✔
360
        if db.persister == nil {
1✔
UNCOV
361
                return protocol.MakeErrReply("please enable aof before using save")
×
UNCOV
362
        }
×
363
        rdbFilename := config.Properties.RDBFilename
1✔
364
        if rdbFilename == "" {
1✔
UNCOV
365
                rdbFilename = "dump.rdb"
×
UNCOV
366
        }
×
367
        err := db.persister.GenerateRDB(rdbFilename)
1✔
368
        if err != nil {
1✔
UNCOV
369
                return protocol.MakeErrReply(err.Error())
×
UNCOV
370
        }
×
371
        return protocol.MakeOkReply()
1✔
372
}
373

374
// BGSaveRDB asynchronously save RDB
UNCOV
375
func BGSaveRDB(db *Server, args [][]byte) redis.Reply {
×
UNCOV
376
        if db.persister == nil {
×
UNCOV
377
                return protocol.MakeErrReply("please enable aof before using save")
×
UNCOV
378
        }
×
UNCOV
379
        go func() {
×
UNCOV
380
                defer func() {
×
UNCOV
381
                        if err := recover(); err != nil {
×
UNCOV
382
                                logger.Error(err)
×
UNCOV
383
                        }
×
384
                }()
UNCOV
385
                rdbFilename := config.Properties.RDBFilename
×
UNCOV
386
                if rdbFilename == "" {
×
UNCOV
387
                        rdbFilename = "dump.rdb"
×
UNCOV
388
                }
×
UNCOV
389
                err := db.persister.GenerateRDB(rdbFilename)
×
UNCOV
390
                if err != nil {
×
UNCOV
391
                        logger.Error(err)
×
UNCOV
392
                }
×
393
        }()
UNCOV
394
        return protocol.MakeStatusReply("Background saving started")
×
395
}
396

397
// GetDBSize returns keys count and ttl key count
398
func (server *Server) GetDBSize(dbIndex int) (int, int) {
1✔
399
        db := server.mustSelectDB(dbIndex)
1✔
400
        return db.data.Len(), db.ttlMap.Len()
1✔
401
}
1✔
402

403
func (server *Server) startReplCron() {
1✔
404
        go func(mdb *Server) {
2✔
405
                ticker := time.Tick(time.Second * 10)
1✔
406
                for range ticker {
2✔
407
                        mdb.slaveCron()
1✔
408
                        mdb.masterCron()
1✔
409
                }
1✔
410
        }(server)
411
}
412

413
// GetAvgTTL Calculate the average expiration time of keys
UNCOV
414
func (server *Server) GetAvgTTL(dbIndex, randomKeyCount int) int64 {
×
UNCOV
415
        var ttlCount int64
×
UNCOV
416
        db := server.mustSelectDB(dbIndex)
×
UNCOV
417
        keys := db.data.RandomKeys(randomKeyCount)
×
UNCOV
418
        for _, k := range keys {
×
UNCOV
419
                t := time.Now()
×
UNCOV
420
                rawExpireTime, ok := db.ttlMap.Get(k)
×
UNCOV
421
                if !ok {
×
UNCOV
422
                        continue
×
423
                }
UNCOV
424
                expireTime, _ := rawExpireTime.(time.Time)
×
UNCOV
425
                // if the key has already reached its expiration time during calculation, ignore it
×
UNCOV
426
                if expireTime.Sub(t).Microseconds() > 0 {
×
UNCOV
427
                        ttlCount += expireTime.Sub(t).Microseconds()
×
UNCOV
428
                }
×
429
        }
UNCOV
430
        return ttlCount / int64(len(keys))
×
431
}
432

UNCOV
433
func (server *Server) SetKeyInsertedCallback(cb database.KeyEventCallback) {
×
UNCOV
434
        server.insertCallback = cb
×
UNCOV
435
        for i := range server.dbSet {
×
UNCOV
436
                db := server.mustSelectDB(i)
×
UNCOV
437
                db.insertCallback = cb
×
UNCOV
438
        }
×
439

440
}
441

UNCOV
442
func (server *Server) SetKeyDeletedCallback(cb database.KeyEventCallback) {
×
UNCOV
443
        server.deleteCallback = cb
×
UNCOV
444
        for i := range server.dbSet {
×
UNCOV
445
                db := server.mustSelectDB(i)
×
UNCOV
446
                db.deleteCallback = cb
×
UNCOV
447
        }
×
448
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc