• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / godis / 4407781310

13 Mar 2023 05:27PM UTC coverage: 75.694% (-0.02%) from 75.716%
4407781310

push

github

finley
bug fix: unlock master status when no slaves

1 of 1 new or added line in 1 file covered. (100.0%)

6107 of 8068 relevant lines covered (75.69%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.06
/database/database.go
1
// Package database is a memory database with redis compatible interface
2
package database
3

4
import (
5
        "github.com/hdt3213/godis/datastruct/dict"
6
        "github.com/hdt3213/godis/datastruct/lock"
7
        "github.com/hdt3213/godis/interface/database"
8
        "github.com/hdt3213/godis/interface/redis"
9
        "github.com/hdt3213/godis/lib/logger"
10
        "github.com/hdt3213/godis/lib/timewheel"
11
        "github.com/hdt3213/godis/redis/protocol"
12
        "strings"
13
        "time"
14
)
15

16
const (
17
        dataDictSize = 1 << 16
18
        ttlDictSize  = 1 << 10
19
        lockerSize   = 1024
20
)
21

22
// DB stores data and execute user's commands
23
type DB struct {
24
        index int
25
        // key -> DataEntity
26
        data dict.Dict
27
        // key -> expireTime (time.Time)
28
        ttlMap dict.Dict
29
        // key -> version(uint32)
30
        versionMap dict.Dict
31

32
        // dict.Dict will ensure concurrent-safety of its method
33
        // use this mutex for complicated command only, eg. rpush, incr ...
34
        locker *lock.Locks
35
        addAof func(CmdLine)
36
}
37

38
// ExecFunc is interface for command executor
39
// args don't include cmd line
40
type ExecFunc func(db *DB, args [][]byte) redis.Reply
41

42
// PreFunc analyses command line when queued command to `multi`
43
// returns related write keys and read keys
44
type PreFunc func(args [][]byte) ([]string, []string)
45

46
// CmdLine is alias for [][]byte, represents a command line
47
type CmdLine = [][]byte
48

49
// UndoFunc returns undo logs for the given command line
50
// execute from head to tail when undo
51
type UndoFunc func(db *DB, args [][]byte) []CmdLine
52

53
// makeDB create DB instance
54
func makeDB() *DB {
1✔
55
        db := &DB{
1✔
56
                data:       dict.MakeConcurrent(dataDictSize),
1✔
57
                ttlMap:     dict.MakeConcurrent(ttlDictSize),
1✔
58
                versionMap: dict.MakeConcurrent(dataDictSize),
1✔
59
                locker:     lock.Make(lockerSize),
1✔
60
                addAof:     func(line CmdLine) {},
2✔
61
        }
62
        return db
1✔
63
}
64

65
// makeBasicDB create DB instance only with basic abilities.
66
// It is not concurrent safe
67
func makeBasicDB() *DB {
1✔
68
        db := &DB{
1✔
69
                data:       dict.MakeSimple(),
1✔
70
                ttlMap:     dict.MakeSimple(),
1✔
71
                versionMap: dict.MakeSimple(),
1✔
72
                locker:     lock.Make(1),
1✔
73
                addAof:     func(line CmdLine) {},
2✔
74
        }
75
        return db
1✔
76
}
77

78
// Exec executes command within one database
79
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {
1✔
80
        // transaction control commands and other commands which cannot execute within transaction
1✔
81
        cmdName := strings.ToLower(string(cmdLine[0]))
1✔
82
        if cmdName == "multi" {
2✔
83
                if len(cmdLine) != 1 {
1✔
84
                        return protocol.MakeArgNumErrReply(cmdName)
×
85
                }
×
86
                return StartMulti(c)
1✔
87
        } else if cmdName == "discard" {
2✔
88
                if len(cmdLine) != 1 {
1✔
89
                        return protocol.MakeArgNumErrReply(cmdName)
×
90
                }
×
91
                return DiscardMulti(c)
1✔
92
        } else if cmdName == "exec" {
2✔
93
                if len(cmdLine) != 1 {
1✔
94
                        return protocol.MakeArgNumErrReply(cmdName)
×
95
                }
×
96
                return execMulti(db, c)
1✔
97
        } else if cmdName == "watch" {
2✔
98
                if !validateArity(-2, cmdLine) {
1✔
99
                        return protocol.MakeArgNumErrReply(cmdName)
×
100
                }
×
101
                return Watch(db, c, cmdLine[1:])
1✔
102
        }
103
        if c != nil && c.InMultiState() {
2✔
104
                return EnqueueCmd(c, cmdLine)
1✔
105
        }
1✔
106

107
        return db.execNormalCommand(cmdLine)
1✔
108
}
109

110
func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {
1✔
111
        cmdName := strings.ToLower(string(cmdLine[0]))
1✔
112
        cmd, ok := cmdTable[cmdName]
1✔
113
        if !ok {
2✔
114
                return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'")
1✔
115
        }
1✔
116
        if !validateArity(cmd.arity, cmdLine) {
2✔
117
                return protocol.MakeArgNumErrReply(cmdName)
1✔
118
        }
1✔
119

120
        prepare := cmd.prepare
1✔
121
        write, read := prepare(cmdLine[1:])
1✔
122
        db.addVersion(write...)
1✔
123
        db.RWLocks(write, read)
1✔
124
        defer db.RWUnLocks(write, read)
1✔
125
        fun := cmd.executor
1✔
126
        return fun(db, cmdLine[1:])
1✔
127
}
128

129
// execWithLock executes normal commands, invoker should provide locks
130
func (db *DB) execWithLock(cmdLine [][]byte) redis.Reply {
1✔
131
        cmdName := strings.ToLower(string(cmdLine[0]))
1✔
132
        cmd, ok := cmdTable[cmdName]
1✔
133
        if !ok {
1✔
134
                return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'")
×
135
        }
×
136
        if !validateArity(cmd.arity, cmdLine) {
1✔
137
                return protocol.MakeArgNumErrReply(cmdName)
×
138
        }
×
139
        fun := cmd.executor
1✔
140
        return fun(db, cmdLine[1:])
1✔
141
}
142

143
func validateArity(arity int, cmdArgs [][]byte) bool {
1✔
144
        argNum := len(cmdArgs)
1✔
145
        if arity >= 0 {
2✔
146
                return argNum == arity
1✔
147
        }
1✔
148
        return argNum >= -arity
1✔
149
}
150

151
/* ---- Data Access ----- */
152

153
// GetEntity returns DataEntity bind to given key
154
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
1✔
155
        raw, ok := db.data.Get(key)
1✔
156
        if !ok {
2✔
157
                return nil, false
1✔
158
        }
1✔
159
        if db.IsExpired(key) {
2✔
160
                return nil, false
1✔
161
        }
1✔
162
        entity, _ := raw.(*database.DataEntity)
1✔
163
        return entity, true
1✔
164
}
165

166
// PutEntity a DataEntity into DB
167
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
1✔
168
        return db.data.Put(key, entity)
1✔
169
}
1✔
170

171
// PutIfExists edit an existing DataEntity
172
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
1✔
173
        return db.data.PutIfExists(key, entity)
1✔
174
}
1✔
175

176
// PutIfAbsent insert an DataEntity only if the key not exists
177
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
1✔
178
        return db.data.PutIfAbsent(key, entity)
1✔
179
}
1✔
180

181
// Remove the given key from db
182
func (db *DB) Remove(key string) {
1✔
183
        db.data.Remove(key)
1✔
184
        db.ttlMap.Remove(key)
1✔
185
        taskKey := genExpireTask(key)
1✔
186
        timewheel.Cancel(taskKey)
1✔
187
}
1✔
188

189
// Removes the given keys from db
190
func (db *DB) Removes(keys ...string) (deleted int) {
1✔
191
        deleted = 0
1✔
192
        for _, key := range keys {
2✔
193
                _, exists := db.data.Get(key)
1✔
194
                if exists {
2✔
195
                        db.Remove(key)
1✔
196
                        deleted++
1✔
197
                }
1✔
198
        }
199
        return deleted
1✔
200
}
201

202
// Flush clean database
203
// deprecated
204
// for test only
205
func (db *DB) Flush() {
1✔
206
        db.data.Clear()
1✔
207
        db.ttlMap.Clear()
1✔
208
        db.locker = lock.Make(lockerSize)
1✔
209
}
1✔
210

211
/* ---- Lock Function ----- */
212

213
// RWLocks lock keys for writing and reading
214
func (db *DB) RWLocks(writeKeys []string, readKeys []string) {
1✔
215
        db.locker.RWLocks(writeKeys, readKeys)
1✔
216
}
1✔
217

218
// RWUnLocks unlock keys for writing and reading
219
func (db *DB) RWUnLocks(writeKeys []string, readKeys []string) {
1✔
220
        db.locker.RWUnLocks(writeKeys, readKeys)
1✔
221
}
1✔
222

223
/* ---- TTL Functions ---- */
224

225
func genExpireTask(key string) string {
1✔
226
        return "expire:" + key
1✔
227
}
1✔
228

229
// Expire sets ttlCmd of key
230
func (db *DB) Expire(key string, expireTime time.Time) {
1✔
231
        db.ttlMap.Put(key, expireTime)
1✔
232
        taskKey := genExpireTask(key)
1✔
233
        timewheel.At(expireTime, taskKey, func() {
2✔
234
                keys := []string{key}
1✔
235
                db.RWLocks(keys, nil)
1✔
236
                defer db.RWUnLocks(keys, nil)
1✔
237
                // check-lock-check, ttl may be updated during waiting lock
1✔
238
                logger.Info("expire " + key)
1✔
239
                rawExpireTime, ok := db.ttlMap.Get(key)
1✔
240
                if !ok {
1✔
241
                        return
×
242
                }
×
243
                expireTime, _ := rawExpireTime.(time.Time)
1✔
244
                expired := time.Now().After(expireTime)
1✔
245
                if expired {
1✔
246
                        db.Remove(key)
×
247
                }
×
248
        })
249
}
250

251
// Persist cancel ttlCmd of key
252
func (db *DB) Persist(key string) {
1✔
253
        db.ttlMap.Remove(key)
1✔
254
        taskKey := genExpireTask(key)
1✔
255
        timewheel.Cancel(taskKey)
1✔
256
}
1✔
257

258
// IsExpired check whether a key is expired
259
func (db *DB) IsExpired(key string) bool {
1✔
260
        rawExpireTime, ok := db.ttlMap.Get(key)
1✔
261
        if !ok {
2✔
262
                return false
1✔
263
        }
1✔
264
        expireTime, _ := rawExpireTime.(time.Time)
1✔
265
        expired := time.Now().After(expireTime)
1✔
266
        if expired {
2✔
267
                db.Remove(key)
1✔
268
        }
1✔
269
        return expired
1✔
270
}
271

272
/* --- add version --- */
273

274
func (db *DB) addVersion(keys ...string) {
1✔
275
        for _, key := range keys {
2✔
276
                versionCode := db.GetVersion(key)
1✔
277
                db.versionMap.Put(key, versionCode+1)
1✔
278
        }
1✔
279
}
280

281
// GetVersion returns version code for given key
282
func (db *DB) GetVersion(key string) uint32 {
1✔
283
        entity, ok := db.versionMap.Get(key)
1✔
284
        if !ok {
2✔
285
                return 0
1✔
286
        }
1✔
287
        return entity.(uint32)
1✔
288
}
289

290
// ForEach traverses all the keys in the database
291
func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
1✔
292
        db.data.ForEach(func(key string, raw interface{}) bool {
2✔
293
                entity, _ := raw.(*database.DataEntity)
1✔
294
                var expiration *time.Time
1✔
295
                rawExpireTime, ok := db.ttlMap.Get(key)
1✔
296
                if ok {
2✔
297
                        expireTime, _ := rawExpireTime.(time.Time)
1✔
298
                        expiration = &expireTime
1✔
299
                }
1✔
300
                return cb(key, entity, expiration)
1✔
301
        })
302
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc