HDT3213 / delayqueue / build 5916655980 (push via github)
20 Aug 2023 09:41AM UTC coverage: 85.246% (+3.5%) from 81.699%
Commit by HDT3213: "fix test suite"
364 of 427 relevant lines covered (85.25%), 0.93 hits per line

Source File: /delayqueue.go (81.27% covered)
package delayqueue

import (
        "errors"
        "fmt"
        "github.com/google/uuid"
        "log"
        "strconv"
        "sync"
        "time"
)

// DelayQueue is a message queue supporting delayed/scheduled delivery based on Redis
type DelayQueue struct {
        // name for this queue. Make sure the name is unique in the Redis database
        name          string
        redisCli      RedisCli
        cb            func(string) bool
        pendingKey    string // sorted set: message id -> delivery time
        readyKey      string // list
        unAckKey      string // sorted set: message id -> retry time
        retryKey      string // list
        retryCountKey string // hash: message id -> remaining retry count
        garbageKey    string // set: message id
        useHashTag    bool
        ticker        *time.Ticker
        logger        *log.Logger
        close         chan struct{}

        maxConsumeDuration time.Duration
        msgTTL             time.Duration
        defaultRetryCount  uint
        fetchInterval      time.Duration
        fetchLimit         uint
        concurrent         uint
}

// NilErr represents the Redis nil reply
var NilErr = errors.New("nil")

// RedisCli is an abstraction of the Redis client; it contains only the commands required by DelayQueue
type RedisCli interface {
        Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float
        Set(key string, value string, expiration time.Duration) error
        Get(key string) (string, error)
        Del(keys []string) error
        HSet(key string, field string, value string) error
        HDel(key string, fields []string) error
        SMembers(key string) ([]string, error)
        SRem(key string, members []string) error
        ZAdd(key string, values map[string]float64) error
        ZRem(key string, fields []string) error
}
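
// This file does not include a RedisCli implementation. A minimal, hypothetical
// adapter sketch backed by github.com/redis/go-redis/v9 could look like the
// following (the type name goRedisCli and field rdb are illustrative only, and
// the remaining interface methods would delegate in the same way):
//
//        type goRedisCli struct {
//                rdb *redis.Client // github.com/redis/go-redis/v9
//        }
//
//        // wrap translates the go-redis nil reply into this package's NilErr sentinel
//        func (c *goRedisCli) wrap(err error) error {
//                if err == redis.Nil {
//                        return NilErr
//                }
//                return err
//        }
//
//        func (c *goRedisCli) Eval(script string, keys []string, args []interface{}) (interface{}, error) {
//                v, err := c.rdb.Eval(context.Background(), script, keys, args...).Result()
//                return v, c.wrap(err)
//        }
//
//        func (c *goRedisCli) Set(key string, value string, expiration time.Duration) error {
//                return c.wrap(c.rdb.Set(context.Background(), key, value, expiration).Err())
//        }
//
//        func (c *goRedisCli) Get(key string) (string, error) {
//                v, err := c.rdb.Get(context.Background(), key).Result()
//                return v, c.wrap(err)
//        }
//
//        func (c *goRedisCli) ZAdd(key string, values map[string]float64) error {
//                members := make([]redis.Z, 0, len(values))
//                for member, score := range values {
//                        members = append(members, redis.Z{Score: score, Member: member})
//                }
//                return c.wrap(c.rdb.ZAdd(context.Background(), key, members...).Err())
//        }
//
//        // Del, HSet, HDel, SMembers, SRem and ZRem follow the same delegation pattern.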

type hashTagKeyOpt int

// UseHashTagKey adds a hash tag to the Redis keys so that all keys of this queue are allocated in the same hash slot.
// If you are using Codis, Aliyun Redis Cluster or Tencent Cloud Redis Cluster, pass this option to NewQueue.
// WARNING: adding or removing this option will cause DelayQueue to fail to read existing data in Redis.
// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
        return hashTagKeyOpt(1)
}
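
// For illustration: a queue created as NewQueue0("example", cli, cb, UseHashTagKey())
// keeps its data under keys such as "{dp:example}:pending" and "{dp:example}:ready",
// which Redis Cluster maps to a single hash slot; without this option the prefix is
// simply "dp:example".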

// NewQueue0 creates a new queue. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish messages.
// The callback returns true to confirm successful consumption. If the callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver the message.
func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue {
        if name == "" {
                panic("name is required")
        }
        if cli == nil {
                panic("cli is required")
        }
        if callback == nil {
                panic("callback is required")
        }
        useHashTag := false
        for _, opt := range opts {
                switch opt.(type) {
                case hashTagKeyOpt:
                        useHashTag = true
                }
        }
        var keyPrefix string
        if useHashTag {
                keyPrefix = "{dp:" + name + "}"
        } else {
                keyPrefix = "dp:" + name
        }
        return &DelayQueue{
                name:               name,
                redisCli:           cli,
                cb:                 callback,
                pendingKey:         keyPrefix + ":pending",
                readyKey:           keyPrefix + ":ready",
                unAckKey:           keyPrefix + ":unack",
                retryKey:           keyPrefix + ":retry",
                retryCountKey:      keyPrefix + ":retry:cnt",
                garbageKey:         keyPrefix + ":garbage",
                useHashTag:         useHashTag,
                close:              make(chan struct{}, 1),
                maxConsumeDuration: 5 * time.Second,
                msgTTL:             time.Hour,
                logger:             log.Default(),
                defaultRetryCount:  3,
                fetchInterval:      time.Second,
                concurrent:         1,
        }
}
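
// A minimal construction sketch (cli stands for any RedisCli implementation,
// for example the hypothetical go-redis adapter sketched above):
//
//        queue := NewQueue0("example", cli, func(payload string) bool {
//                log.Printf("received: %s", payload)
//                return true // true acknowledges the message; false requests redelivery
//        }).WithConcurrent(4).WithDefaultRetryCount(3)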

// WithLogger customizes the logger for the queue
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue {
        q.logger = logger
        return q
}

// WithFetchInterval customizes the interval at which the consumer fetches messages from Redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
        q.fetchInterval = d
        return q
}

// WithMaxConsumeDuration customizes the max consume duration.
// If no acknowledgement is received within maxConsumeDuration after message delivery, DelayQueue will try to deliver this message again.
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
        q.maxConsumeDuration = d
        return q
}

// WithFetchLimit limits the max number of unacknowledged (in-flight) messages
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
        q.fetchLimit = limit
        return q
}

// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
        if c == 0 {
                return q
        }
        q.concurrent = c
        return q
}

// WithDefaultRetryCount customizes the max number of retries; it affects all messages in this queue.
// Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
        q.defaultRetryCount = count
        return q
}

func (q *DelayQueue) genMsgKey(idStr string) string {
        if q.useHashTag {
                return "{dp:" + q.name + "}" + ":msg:" + idStr
        }
        return "dp:" + q.name + ":msg:" + idStr
}

type retryCountOpt int

// WithRetryCount sets the retry count for a message
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
        return retryCountOpt(count)
}

type msgTTLOpt time.Duration

// WithMsgTTL sets the TTL for a message
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithMsgTTL(d time.Duration) interface{} {
        return msgTTLOpt(d)
}

// SendScheduleMsg submits a message to be delivered at the given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
        // parse options
        retryCount := q.defaultRetryCount
        for _, opt := range opts {
                switch o := opt.(type) {
                case retryCountOpt:
                        retryCount = uint(o)
                case msgTTLOpt:
                        q.msgTTL = time.Duration(o)
                }
        }
        // generate id
        idStr := uuid.Must(uuid.NewRandom()).String()
        now := time.Now()
        // store msg
        msgTTL := t.Sub(now) + q.msgTTL // delivery delay + q.msgTTL
        err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
        if err != nil {
                return fmt.Errorf("store msg failed: %v", err)
        }
        // store retry count
        err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
        if err != nil {
                return fmt.Errorf("store retry count failed: %v", err)
        }
        // put to pending
        err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
        if err != nil {
                return fmt.Errorf("push to pending failed: %v", err)
        }
        return nil
}

// SendDelayMsg submits a message to be delivered after the given duration
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
        t := time.Now().Add(duration)
        return q.SendScheduleMsg(payload, t, opts...)
}
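
// A producer-side sketch using the queue constructed above:
//
//        // deliver after a relative delay, with a per-message retry count
//        err := queue.SendDelayMsg("hello", 10*time.Second, WithRetryCount(3))
//        // or deliver at an absolute time, with a custom payload TTL
//        err = queue.SendScheduleMsg("hello", time.Now().Add(time.Hour), WithMsgTTL(2*time.Hour))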

// pending2ReadyScript atomically moves messages from pending to ready
// keys: pendingKey, readyKey
// argv: currentTime
const pending2ReadyScript = `
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- push into ready
for _,v in ipairs(msgs) do
        table.insert(args2, v)
        if (#args2 == 4000) then
                redis.call(unpack(args2))
                args2 = {'LPush', KEYS[2]}
        end
end
if (#args2 > 2) then
        redis.call(unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
`

func (q *DelayQueue) pending2Ready() error {
        now := time.Now().Unix()
        keys := []string{q.pendingKey, q.readyKey}
        _, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
        if err != nil && err != NilErr {
                return fmt.Errorf("pending2ReadyScript failed: %v", err)
        }
        return nil
}

// ready2UnackScript atomically moves messages from ready to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime
const ready2UnackScript = `
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
`

func (q *DelayQueue) ready2Unack() (string, error) {
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
        keys := []string{q.readyKey, q.unAckKey}
        ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime})
        if err == NilErr {
                return "", err
        }
        if err != nil {
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
        }
        str, ok := ret.(string)
        if !ok {
                return "", fmt.Errorf("illegal result: %#v", ret)
        }
        return str, nil
}

// retry2Unack moves a message from the retry list to unack, reusing ready2UnackScript with retryKey as KEYS[1]
func (q *DelayQueue) retry2Unack() (string, error) {
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
        keys := []string{q.retryKey, q.unAckKey}
        ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
        if err == NilErr {
                return "", NilErr
        }
        if err != nil {
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
        }
        str, ok := ret.(string)
        if !ok {
                return "", fmt.Errorf("illegal result: %#v", ret)
        }
        return str, nil
}

// callback fetches the message payload and invokes the user callback, then acknowledges or negatively acknowledges the message according to its return value
func (q *DelayQueue) callback(idStr string) error {
        payload, err := q.redisCli.Get(q.genMsgKey(idStr))
        if err == NilErr {
                return nil
        }
        if err != nil {
                // possibly an IO error
                return fmt.Errorf("get message payload failed: %v", err)
        }
        ack := q.cb(payload)
        if ack {
                err = q.ack(idStr)
        } else {
                err = q.nack(idStr)
        }
        return err
}

// batchCallback calls DelayQueue.callback in batch. Callbacks are executed concurrently according to DelayQueue.concurrent.
// batchCallback must wait until all callbacks have finished; otherwise the actual number of in-flight messages may exceed DelayQueue.fetchLimit.
func (q *DelayQueue) batchCallback(ids []string) {
        if len(ids) == 1 || q.concurrent == 1 {
                for _, id := range ids {
                        err := q.callback(id)
                        if err != nil {
                                q.logger.Printf("consume msg %s failed: %v", id, err)
                        }
                }
                return
        }
        ch := make(chan string, len(ids))
        for _, id := range ids {
                ch <- id
        }
        close(ch)
        wg := sync.WaitGroup{}
        concurrent := int(q.concurrent)
        if concurrent > len(ids) { // more goroutines than messages is no use
                concurrent = len(ids)
        }
        wg.Add(concurrent)
        for i := 0; i < concurrent; i++ {
                go func() {
                        defer wg.Done()
                        for id := range ch {
                                err := q.callback(id)
                                if err != nil {
                                        q.logger.Printf("consume msg %s failed: %v", id, err)
                                }
                        }
                }()
        }
        wg.Wait()
}

// ack acknowledges a message: it is removed from unack and its payload and retry counter are deleted
func (q *DelayQueue) ack(idStr string) error {
        err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
        if err != nil {
                return fmt.Errorf("remove from unack failed: %v", err)
        }
        // msg key has a ttl, so the result of delete can be ignored
        _ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
        _ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
        return nil
}

// nack negatively acknowledges a message so it will be re-delivered
func (q *DelayQueue) nack(idStr string) error {
        // update the retry time to now; unack2Retry will move the message to retry immediately
        err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
                idStr: float64(time.Now().Unix()),
        })
        if err != nil {
                return fmt.Errorf("negative ack failed: %v", err)
        }
        return nil
}

// unack2RetryScript atomically moves messages whose remaining retry count is greater than 0 from unack to retry,
// and moves messages whose retry count has reached 0 from unack to garbage.
// Because DelayQueue cannot determine which messages are garbage before unack2RetryScript is evaluated, it cannot pass their keys to redisCli.Eval.
// Therefore unack2RetryScript moves garbage messages to garbageKey instead of deleting them directly.
// keys: unackKey, retryCountKey, retryKey, garbageKey
// argv: currentTime
const unack2RetryScript = `
local unack2retry = function(msgs)
        local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
        for i,v in ipairs(retryCounts) do
                local k = msgs[i]
                if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
                        redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
                        redis.call("LPush", KEYS[3], k) -- add to retry
                else
                        redis.call("HDel", KEYS[2], k) -- del retry count
                        redis.call("SAdd", KEYS[4], k) -- add to garbage
                end
        end
end

local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
        unack2retry(msgs)
else
        local buf = {}
        for _,v in ipairs(msgs) do
                table.insert(buf, v)
                if #buf == 4000 then
                        unack2retry(buf)
                        buf = {}
                end
        end
        if (#buf > 0) then
                unack2retry(buf)
        end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
`

func (q *DelayQueue) unack2Retry() error {
        keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
        now := time.Now()
        _, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
        if err != nil && err != NilErr {
                return fmt.Errorf("unack to retry script failed: %v", err)
        }
        return nil
}

// garbageCollect deletes the payloads and garbage-set entries of messages that have exhausted their retries
func (q *DelayQueue) garbageCollect() error {
        msgIds, err := q.redisCli.SMembers(q.garbageKey)
        if err != nil {
                return fmt.Errorf("smembers failed: %v", err)
        }
        if len(msgIds) == 0 {
                return nil
        }
        // allow concurrent clean
        msgKeys := make([]string, 0, len(msgIds))
        for _, idStr := range msgIds {
                msgKeys = append(msgKeys, q.genMsgKey(idStr))
        }
        err = q.redisCli.Del(msgKeys)
        if err != nil && err != NilErr {
                return fmt.Errorf("del msgs failed: %v", err)
        }
        err = q.redisCli.SRem(q.garbageKey, msgIds)
        if err != nil && err != NilErr {
                return fmt.Errorf("remove from garbage key failed: %v", err)
        }
        return nil
}

// consume performs one polling round: move due messages from pending to ready, deliver ready messages,
// move timed-out or negatively acknowledged messages from unack to retry (or garbage), collect garbage,
// and finally deliver retry messages.
func (q *DelayQueue) consume() error {
        // pending to ready
        err := q.pending2Ready()
        if err != nil {
                return err
        }
        // consume
        ids := make([]string, 0, q.fetchLimit)
        for {
                idStr, err := q.ready2Unack()
                if err == NilErr { // consumed all
                        break
                }
                if err != nil {
                        return err
                }
                ids = append(ids, idStr)
                if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
                        break
                }
        }
        if len(ids) > 0 {
                q.batchCallback(ids)
        }
        // unack to retry
        err = q.unack2Retry()
        if err != nil {
                return err
        }
        err = q.garbageCollect()
        if err != nil {
                return err
        }
        // retry
        ids = make([]string, 0, q.fetchLimit)
        for {
                idStr, err := q.retry2Unack()
                if err == NilErr { // consumed all
                        break
                }
                if err != nil {
                        return err
                }
                ids = append(ids, idStr)
                if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
                        break
                }
        }
        if len(ids) > 0 {
                q.batchCallback(ids)
        }
        return nil
}

// StartConsume creates a goroutine to consume messages from the DelayQueue.
// Use `<-done` to wait for the consumer to stop.
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
        q.ticker = time.NewTicker(q.fetchInterval)
        done0 := make(chan struct{})
        go func() {
        tickerLoop:
                for {
                        select {
                        case <-q.ticker.C:
                                err := q.consume()
                                if err != nil {
                                        log.Printf("consume error: %v", err)
                                }
                        case <-q.close:
                                break tickerLoop
                        }
                }
                close(done0)
        }()
        return done0
}

// StopConsume stops the consumer goroutine
func (q *DelayQueue) StopConsume() {
        close(q.close)
        if q.ticker != nil {
                q.ticker.Stop()
        }
}
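
// A consumer lifecycle sketch: StartConsume launches the polling goroutine and
// returns a channel that is closed once the consumer has fully stopped.
//
//        done := queue.StartConsume()
//        // ... run until shutdown is requested ...
//        queue.StopConsume()
//        <-done // wait for the consumer goroutine to exit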