• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / delayqueue / 13009842967

28 Jan 2025 11:53AM UTC coverage: 83.879% (+3.5%) from 80.357%
13009842967

push

github

HDT3213
...

666 of 794 relevant lines covered (83.88%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.86
/delayqueue.go
1
package delayqueue
2

3
import (
4
        "errors"
5
        "fmt"
6
        "log"
7
        "strconv"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/google/uuid"
14
)
15

16
// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
17
type DelayQueue struct {
18
        // name for this Queue. Make sure the name is unique in redis database
19
        name               string
20
        redisCli           RedisCli
21
        cb                 func(string) bool
22
        pendingKey         string // sorted set: message id -> delivery time
23
        readyKey           string // list
24
        unAckKey           string // sorted set: message id -> retry time
25
        retryKey           string // list
26
        retryCountKey      string // hash: message id -> remain retry count
27
        garbageKey         string // set: message id
28
        useHashTag         bool
29
        ticker             *time.Ticker
30
        logger             Logger
31
        close              chan struct{}
32
        running            int32
33
        maxConsumeDuration time.Duration // default 5 seconds
34
        msgTTL             time.Duration // default 1 hour
35
        defaultRetryCount  uint          // default 3
36
        fetchInterval      time.Duration // default 1 second
37
        fetchLimit         uint          // default no limit
38
        fetchCount         int32         // actually running task number
39
        concurrent         uint          // default 1, executed serially
40
        sha1map            map[string]string
41
        sha1mapMu          *sync.RWMutex
42
        scriptPreload      bool
43
        // for batch consume
44
        consumeBuffer chan string
45

46
        eventListener       EventListener
47
        nackRedeliveryDelay time.Duration
48
}
49

50
// NilErr is the sentinel error that stands for a redis nil reply.
// RedisCli implementations return it when a requested key does not exist.
var NilErr = errors.New("nil")
52

53
// RedisCli abstracts the redis client. It only lists the commands that
// DelayQueue needs, not the complete redis command set.
type RedisCli interface {
	// Eval sends a lua script to redis.
	// args should be string, integer or float.
	// It returns string, int64 or []interface{} (elements can be string or int64).
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents the redis command GET.
	// Please return NilErr when there is no such key in redis.
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) (int64, error)
	ZCard(key string) (int64, error)
	ZScore(key string, member string) (float64, error)
	LLen(key string) (int64, error)
	LRem(key string, count int64, value string) (int64, error)

	// Publish is used by the monitor only.
	Publish(channel string, payload string) error
	// Subscribe is used by the monitor only.
	// It returns the payload channel, a subscription closer and an error;
	// the subscription closer should close the payload channel as well.
	Subscribe(channel string) (payloads <-chan string, close func(), err error)

	// ScriptLoad calls the `script load` command.
	ScriptLoad(script string) (string, error)
	// EvalSha runs a preloaded script.
	// If the script is not preloaded, return an error whose message starts with "NOSCRIPT".
	EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
}
87

88
// Logger abstracts the logging backend used by DelayQueue.
// Any type with a Printf method of this shape satisfies it.
type Logger interface {
	Printf(format string, v ...interface{})
}
92

93
// hashTagKeyOpt is the option value produced by UseHashTagKey.
type hashTagKeyOpt int

// CallbackFunc receives and consumes messages.
// It returns true to confirm successful consumption, false to have the message re-delivered.
type CallbackFunc = func(string) bool

// UseHashTagKey returns an option that wraps the key prefix in a redis hash tag,
// so that all keys of the queue are allocated in the same hash slot.
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue.
// WARNING! Adding or removing this option changes the key names, so DelayQueue
// will fail to read data that already exists in redis.
// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
	const enabled hashTagKeyOpt = 1
	return enabled
}
1✔
106

107
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
108
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
109
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
1✔
110
        if name == "" {
1✔
111
                panic("name is required")
×
112
        }
113
        if cli == nil {
1✔
114
                panic("cli is required")
×
115
        }
116
        useHashTag := false
1✔
117
        var callback CallbackFunc = nil
1✔
118
        for _, opt := range opts {
2✔
119
                switch o := opt.(type) {
1✔
120
                case hashTagKeyOpt:
1✔
121
                        useHashTag = true
1✔
122
                case CallbackFunc:
1✔
123
                        callback = o
1✔
124
                }
125
        }
126
        var keyPrefix string
1✔
127
        if useHashTag {
2✔
128
                keyPrefix = "{dp:" + name + "}"
1✔
129
        } else {
2✔
130
                keyPrefix = "dp:" + name
1✔
131
        }
1✔
132
        return &DelayQueue{
1✔
133
                name:               name,
1✔
134
                redisCli:           cli,
1✔
135
                cb:                 callback,
1✔
136
                pendingKey:         keyPrefix + ":pending",
1✔
137
                readyKey:           keyPrefix + ":ready",
1✔
138
                unAckKey:           keyPrefix + ":unack",
1✔
139
                retryKey:           keyPrefix + ":retry",
1✔
140
                retryCountKey:      keyPrefix + ":retry:cnt",
1✔
141
                garbageKey:         keyPrefix + ":garbage",
1✔
142
                useHashTag:         useHashTag,
1✔
143
                close:              nil,
1✔
144
                maxConsumeDuration: 5 * time.Second,
1✔
145
                msgTTL:             time.Hour,
1✔
146
                logger:             log.Default(),
1✔
147
                defaultRetryCount:  3,
1✔
148
                fetchInterval:      time.Second,
1✔
149
                concurrent:         1,
1✔
150
                sha1map:            make(map[string]string),
1✔
151
                sha1mapMu:          &sync.RWMutex{},
1✔
152
                scriptPreload:      true,
1✔
153
        }
1✔
154
}
155

156
// WithCallback set callback for queue to receives and consumes messages
157
// callback returns true to confirm successfully consumed, false to re-deliver this message
158
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
1✔
159
        q.cb = callback
1✔
160
        return q
1✔
161
}
1✔
162

163
// WithLogger customizes logger for queue
164
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
1✔
165
        q.logger = logger
1✔
166
        return q
1✔
167
}
1✔
168

169
// WithFetchInterval customizes the interval at which consumer fetch message from redis
170
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
1✔
171
        q.fetchInterval = d
1✔
172
        return q
1✔
173
}
1✔
174

175
// WithScriptPreload use script load command preload scripts to redis
176
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue {
1✔
177
        q.scriptPreload = flag
1✔
178
        return q
1✔
179
}
1✔
180

181
// WithMaxConsumeDuration customizes max consume duration
182
// If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
183
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
1✔
184
        q.maxConsumeDuration = d
1✔
185
        return q
1✔
186
}
1✔
187

188
// WithFetchLimit limits the max number of processing messages, 0 means no limit
189
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
1✔
190
        q.fetchLimit = limit
1✔
191
        return q
1✔
192
}
1✔
193

194
// WithConcurrent sets the number of concurrent consumers
195
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
1✔
196
        if c == 0 {
1✔
197
                panic("concurrent cannot be 0")
×
198
        }
199
        q.assertNotRunning()
1✔
200
        q.concurrent = c
1✔
201
        return q
1✔
202
}
203

204
// WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
205
// use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
206
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
1✔
207
        q.defaultRetryCount = count
1✔
208
        return q
1✔
209
}
1✔
210

211
// WithNackRedeliveryDelay customizes the interval between redelivery and nack (callback returns false)
212
// If consumption exceeded deadline, the message will be redelivered immediately
213
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue {
1✔
214
        q.nackRedeliveryDelay = d
1✔
215
        return q
1✔
216
}
1✔
217

218
func (q *DelayQueue) genMsgKey(idStr string) string {
1✔
219
        if q.useHashTag {
2✔
220
                return "{dp:" + q.name + "}" + ":msg:" + idStr
1✔
221
        }
1✔
222
        return "dp:" + q.name + ":msg:" + idStr
1✔
223
}
224

225
// retryCountOpt is the option value produced by WithRetryCount.
type retryCountOpt int

// WithRetryCount returns an option that sets the retry count of a single message.
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
	var opt retryCountOpt = retryCountOpt(count)
	return opt
}
1✔
232

233
// msgTTLOpt is the option value produced by WithMsgTTL.
type msgTTLOpt time.Duration

// WithMsgTTL returns an option that sets the ttl of a single message.
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour))
func WithMsgTTL(d time.Duration) interface{} {
	var opt msgTTLOpt = msgTTLOpt(d)
	return opt
}
1✔
240

241
// MessageInfo carries the information needed to trace a message after it has
// been submitted.
type MessageInfo struct {
	id string
}

// ID returns the id of the message.
func (msg *MessageInfo) ID() string {
	id := msg.id
	return id
}
1✔
249

250
// Message states, used for example in InterceptResult.State.
const (
	// StatePending is reported by TryIntercept for a message removed from the pending set.
	StatePending = "pending"
	// StateReady is reported by TryIntercept for a message removed from the ready list.
	StateReady = "ready"
	// StateReadyRetry is not referenced in the visible part of this file;
	// presumably it marks a message waiting in the retry list.
	StateReadyRetry = "ready_to_retry"
	// StateConsuming is not referenced in the visible part of this file;
	// presumably it marks a message that is being consumed.
	StateConsuming = "consuming"
	// StateUnknown is reported by TryIntercept when the message was found in
	// neither the ready list nor the pending set.
	StateUnknown = "unknown"
)
257

258
// SendScheduleMsgV2 submits a message delivered at given time
259
func (q *DelayQueue) SendScheduleMsgV2(payload string, t time.Time, opts ...interface{}) (*MessageInfo, error) {
1✔
260
        // parse options
1✔
261
        retryCount := q.defaultRetryCount
1✔
262
        for _, opt := range opts {
2✔
263
                switch o := opt.(type) {
1✔
264
                case retryCountOpt:
1✔
265
                        retryCount = uint(o)
1✔
266
                case msgTTLOpt:
1✔
267
                        q.msgTTL = time.Duration(o)
1✔
268
                }
269
        }
270
        // generate id
271
        idStr := uuid.Must(uuid.NewRandom()).String()
1✔
272
        now := time.Now()
1✔
273
        // store msg
1✔
274
        msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
1✔
275
        err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
1✔
276
        if err != nil {
1✔
277
                return nil, fmt.Errorf("store msg failed: %v", err)
×
278
        }
×
279
        // store retry count
280
        err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
1✔
281
        if err != nil {
1✔
282
                return nil, fmt.Errorf("store retry count failed: %v", err)
×
283
        }
×
284
        // put to pending
285
        err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
1✔
286
        if err != nil {
1✔
287
                return nil, fmt.Errorf("push to pending failed: %v", err)
×
288
        }
×
289
        q.reportEvent(NewMessageEvent, 1)
1✔
290
        return &MessageInfo{
1✔
291
                id: idStr,
1✔
292
        }, nil
1✔
293
}
294

295
// SendDelayMsg submits a message delivered after given duration
296
func (q *DelayQueue) SendDelayMsgV2(payload string, duration time.Duration, opts ...interface{}) (*MessageInfo, error) {
1✔
297
        t := time.Now().Add(duration)
1✔
298
        return q.SendScheduleMsgV2(payload, t, opts...)
1✔
299
}
1✔
300

301
// SendScheduleMsg submits a message delivered at given time
302
// It is compatible with SendScheduleMsgV2, but does not return MessageInfo
303
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
1✔
304
        _, err := q.SendScheduleMsgV2(payload, t, opts...)
1✔
305
        return err
1✔
306
}
1✔
307

308
// SendDelayMsg submits a message delivered after given duration
309
// It is compatible with SendDelayMsgV2, but does not return MessageInfo
310
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
1✔
311
        t := time.Now().Add(duration)
1✔
312
        return q.SendScheduleMsg(payload, t, opts...)
1✔
313
}
1✔
314

315
// InterceptResult is the outcome of DelayQueue.TryIntercept.
type InterceptResult struct {
	// Intercepted is true when the message was removed before being delivered.
	Intercepted bool
	// State is the state the message was found in (one of the State* constants).
	State string
}
319

320
// TryIntercept trys to intercept a message
321
func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) {
1✔
322
        id := msg.ID()
1✔
323
        // try to intercept at ready
1✔
324
        removed, err := q.redisCli.LRem(q.readyKey, 0, id)
1✔
325
        if err != nil {
1✔
326
                q.logger.Printf("intercept %s from ready failed: %v", id, err)
×
327
        }
×
328
        if removed > 0 {
2✔
329
                _ = q.redisCli.Del([]string{q.genMsgKey(id)})
1✔
330
                _ = q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
331
                return &InterceptResult{
1✔
332
                        Intercepted: true,
1✔
333
                        State:       StateReady,
1✔
334
                }, nil
1✔
335
        }
1✔
336
        // try to intercept at pending
337
        removed, err = q.redisCli.ZRem(q.pendingKey, []string{id})
1✔
338
        if err != nil {
1✔
339
                q.logger.Printf("intercept %s from pending failed: %v", id, err)
×
340
        }
×
341
        if removed > 0 {
2✔
342
                _ = q.redisCli.Del([]string{q.genMsgKey(id)})
1✔
343
                _ = q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
344
                return &InterceptResult{
1✔
345
                        Intercepted: true,
1✔
346
                        State:       StatePending,
1✔
347
                }, nil
1✔
348
        }
1✔
349
        // message may be being consumed or has been successfully consumed
350
        // if the message has been successfully consumed, the following action will cause nothing
351
        // if the message is being consumed,the following action will prevent it from being retried
352
        q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
353
        q.redisCli.LRem(q.retryKey, 0, id)
1✔
354

1✔
355
        return &InterceptResult{
1✔
356
                Intercepted: false,
1✔
357
                State:       StateUnknown,
1✔
358
        }, nil
1✔
359
}
360

361
func (q *DelayQueue) loadScript(script string) (string, error) {
1✔
362
        sha1, err := q.redisCli.ScriptLoad(script)
1✔
363
        if err != nil {
1✔
364
                return "", err
×
365
        }
×
366
        q.sha1mapMu.Lock()
1✔
367
        q.sha1map[script] = sha1
1✔
368
        q.sha1mapMu.Unlock()
1✔
369
        return sha1, nil
1✔
370
}
371

372
func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) {
1✔
373
        if !q.scriptPreload {
2✔
374
                return q.redisCli.Eval(script, keys, args)
1✔
375
        }
1✔
376
        var err error
1✔
377
        q.sha1mapMu.RLock()
1✔
378
        sha1, ok := q.sha1map[script]
1✔
379
        q.sha1mapMu.RUnlock()
1✔
380
        if !ok {
2✔
381
                sha1, err = q.loadScript(script)
1✔
382
                if err != nil {
1✔
383
                        return nil, err
×
384
                }
×
385
        }
386
        result, err := q.redisCli.EvalSha(sha1, keys, args)
1✔
387
        if err == nil {
2✔
388
                return result, err
1✔
389
        }
1✔
390
        // script not loaded, reload it
391
        // It is possible to access a node in the cluster that has no pre-loaded scripts.
392
        if strings.HasPrefix(err.Error(), "NOSCRIPT") {
1✔
393
                sha1, err = q.loadScript(script)
×
394
                if err != nil {
×
395
                        return nil, err
×
396
                }
×
397
                // try again
398
                result, err = q.redisCli.EvalSha(sha1, keys, args)
×
399
        }
400
        return result, err
1✔
401
}
402

403
// pending2ReadyScript atomically moves every message whose score (delivery
// time) is not greater than the current time from pending to ready.
// Ids are pushed in batches of 4000 per LPush call.
// keys: pendingKey, readyKey
// argv: currentTime
// returns: the number of messages moved; nothing when no message is due
const pending2ReadyScript = `
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
if (#msgs == 0) then return end
local args2 = {} -- keys to push into ready
for _,v in ipairs(msgs) do
        table.insert(args2, v) 
    if (#args2 == 4000) then
                redis.call('LPush', KEYS[2], unpack(args2))
                args2 = {}
        end
end
if (#args2 > 0) then 
        redis.call('LPush', KEYS[2], unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
return #msgs
`
424

425
func (q *DelayQueue) pending2Ready() error {
1✔
426
        now := time.Now().Unix()
1✔
427
        keys := []string{q.pendingKey, q.readyKey}
1✔
428
        raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now})
1✔
429
        if err != nil && err != NilErr {
1✔
430
                return fmt.Errorf("pending2ReadyScript failed: %v", err)
×
431
        }
×
432
        count, ok := raw.(int64)
1✔
433
        if ok {
2✔
434
                q.reportEvent(ReadyEvent, int(count))
1✔
435
        }
1✔
436
        return nil
1✔
437
}
438

439
// ready2UnackScript atomically pops one message from a list and adds it to the
// unack sorted set with the given retry time as score.
// keys: readyKey/retryKey, unackKey
// argv: retryTime
// returns: the popped message id; nothing when the list is empty
const ready2UnackScript = `
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
`
448

449
func (q *DelayQueue) ready2Unack() (string, error) {
1✔
450
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
451
        keys := []string{q.readyKey, q.unAckKey}
1✔
452
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime})
1✔
453
        if err == NilErr {
2✔
454
                return "", err
1✔
455
        }
1✔
456
        if err != nil {
1✔
457
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
458
        }
×
459
        str, ok := ret.(string)
1✔
460
        if !ok {
1✔
461
                return "", fmt.Errorf("illegal result: %#v", ret)
×
462
        }
×
463
        q.reportEvent(DeliveredEvent, 1)
1✔
464
        return str, nil
1✔
465
}
466

467
func (q *DelayQueue) retry2Unack() (string, error) {
1✔
468
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
469
        keys := []string{q.retryKey, q.unAckKey}
1✔
470
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
1✔
471
        if err == NilErr {
2✔
472
                return "", NilErr
1✔
473
        }
1✔
474
        if err != nil {
1✔
475
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
476
        }
×
477
        str, ok := ret.(string)
1✔
478
        if !ok {
1✔
479
                return "", fmt.Errorf("illegal result: %#v", ret)
×
480
        }
×
481
        return str, nil
1✔
482
}
483

484
func (q *DelayQueue) callback(idStr string) error {
1✔
485
        payload, err := q.redisCli.Get(q.genMsgKey(idStr))
1✔
486
        if err == NilErr {
1✔
487
                return nil
×
488
        }
×
489
        if err != nil {
1✔
490
                // Is an IO error?
×
491
                return fmt.Errorf("get message payload failed: %v", err)
×
492
        }
×
493
        ack := q.cb(payload)
1✔
494
        if ack {
2✔
495
                err = q.ack(idStr)
1✔
496
        } else {
2✔
497
                err = q.nack(idStr)
1✔
498
        }
1✔
499
        return err
1✔
500
}
501

502
func (q *DelayQueue) ack(idStr string) error {
1✔
503
        atomic.AddInt32(&q.fetchCount, -1)
1✔
504
        _, err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
1✔
505
        if err != nil {
1✔
506
                return fmt.Errorf("remove from unack failed: %v", err)
×
507
        }
×
508
        // msg key has ttl, ignore result of delete
509
        _ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
1✔
510
        _ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
1✔
511
        q.reportEvent(AckEvent, 1)
1✔
512
        return nil
1✔
513
}
514

515
// updateZSetScoreScript update score of a zset member if it exists
516
// KEYS[1]: zset
517
// ARGV[1]: score
518
// ARGV[2]: member
519
const updateZSetScoreScript = `
520
if redis.call('zrank', KEYS[1], ARGV[2]) ~= nil then
521
    return redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
522
else
523
    return 0
524
end
525
`
526

527
func (q *DelayQueue) updateZSetScore(key string, score float64, member string) error {
1✔
528
        scoreStr := strconv.FormatFloat(score, 'f', -1, 64)
1✔
529
        _, err := q.eval(updateZSetScoreScript, []string{key}, []interface{}{scoreStr, member})
1✔
530
        return err
1✔
531
}
1✔
532

533
func (q *DelayQueue) nack(idStr string) error {
1✔
534
        atomic.AddInt32(&q.fetchCount, -1)
1✔
535
        retryTime := float64(time.Now().Add(q.nackRedeliveryDelay).Unix())
1✔
536
        // if message consumption has not reach deadlin (still in unAckKey), then update its retry time
1✔
537
        err := q.updateZSetScore(q.unAckKey, retryTime, idStr)
1✔
538
        if err != nil {
1✔
539
                return fmt.Errorf("negative ack failed: %v", err)
×
540
        }
×
541
        q.reportEvent(NackEvent, 1)
1✔
542
        return nil
1✔
543
}
544

545
// unack2RetryScript atomically processes every message in unack whose score
// (retry time) is not greater than the current time:
//   - a message whose remaining retry count is greater than 0 has the count
//     decremented and is pushed to retry;
//   - any other message has its retry count deleted and is added to garbage.
//
// DelayQueue cannot know which messages are garbage before the script runs, so
// it cannot pass their payload keys in KEYS. The script therefore records them
// in garbageKey instead of deleting the payloads directly.
// Messages are handled in batches of 4000 per HMGet call.
// keys: unackKey, retryCountKey, retryKey, garbageKey
// argv: currentTime
// returns: {retryMsgs, failMsgs}; nothing when no message is due
const unack2RetryScript = `
local unack2retry = function(msgs)
        local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
        local retryMsgs = 0
        local failMsgs = 0
        for i,v in ipairs(retryCounts) do
                local k = msgs[i]
                if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
                        redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
                        redis.call("LPush", KEYS[3], k) -- add to retry
                        retryMsgs = retryMsgs + 1
                else
                        redis.call("HDel", KEYS[2], k) -- del retry count
                        redis.call("SAdd", KEYS[4], k) -- add to garbage
                        failMsgs = failMsgs + 1
                end
        end
        return retryMsgs, failMsgs
end

local retryMsgs = 0
local failMsgs = 0
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
        local d1, d2 = unack2retry(msgs)
        retryMsgs = retryMsgs + d1
        failMsgs = failMsgs + d2
else
        local buf = {}
        for _,v in ipairs(msgs) do
                table.insert(buf, v)
                if #buf == 4000 then
                    local d1, d2 = unack2retry(buf)
                        retryMsgs = retryMsgs + d1
                        failMsgs = failMsgs + d2
                        buf = {}
                end
        end
        if (#buf > 0) then
                local d1, d2 = unack2retry(buf)
                retryMsgs = retryMsgs + d1
                failMsgs = failMsgs + d2
        end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
return {retryMsgs, failMsgs}
`
600

601
func (q *DelayQueue) unack2Retry() error {
1✔
602
        keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
1✔
603
        now := time.Now()
1✔
604
        raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()})
1✔
605
        if err != nil && err != NilErr {
1✔
606
                return fmt.Errorf("unack to retry script failed: %v", err)
×
607
        }
×
608
        infos, ok := raw.([]interface{})
1✔
609
        if ok && len(infos) == 2 {
2✔
610
                retryCount, ok := infos[0].(int64)
1✔
611
                if ok {
2✔
612
                        q.reportEvent(RetryEvent, int(retryCount))
1✔
613
                }
1✔
614
                failCount, ok := infos[1].(int64)
1✔
615
                if ok {
2✔
616
                        q.reportEvent(FinalFailedEvent, int(failCount))
1✔
617
                }
1✔
618
        }
619
        return nil
1✔
620
}
621

622
func (q *DelayQueue) garbageCollect() error {
1✔
623
        msgIds, err := q.redisCli.SMembers(q.garbageKey)
1✔
624
        if err != nil {
1✔
625
                return fmt.Errorf("smembers failed: %v", err)
×
626
        }
×
627
        if len(msgIds) == 0 {
2✔
628
                return nil
1✔
629
        }
1✔
630
        // allow concurrent clean
631
        msgKeys := make([]string, 0, len(msgIds))
1✔
632
        for _, idStr := range msgIds {
2✔
633
                msgKeys = append(msgKeys, q.genMsgKey(idStr))
1✔
634
        }
1✔
635
        err = q.redisCli.Del(msgKeys)
1✔
636
        if err != nil && err != NilErr {
1✔
637
                return fmt.Errorf("del msgs failed: %v", err)
×
638
        }
×
639
        err = q.redisCli.SRem(q.garbageKey, msgIds)
1✔
640
        if err != nil && err != NilErr {
1✔
641
                return fmt.Errorf("remove from garbage key failed: %v", err)
×
642
        }
×
643
        return nil
1✔
644
}
645

646
func (q *DelayQueue) beforeConsume() ([]string, error) {
1✔
647
        // pending to ready
1✔
648
        err := q.pending2Ready()
1✔
649
        if err != nil {
1✔
650
                return nil, err
×
651
        }
×
652
        // ready2Unack
653
        // prioritize new message consumption to avoid avalanches
654
        ids := make([]string, 0, q.fetchLimit)
1✔
655
        var fetchCount int32
1✔
656
        for {
2✔
657
                fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
658
                if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
659
                        break
1✔
660
                }
661
                idStr, err := q.ready2Unack()
1✔
662
                if err == NilErr { // consumed all
2✔
663
                        break
1✔
664
                }
665
                if err != nil {
1✔
666
                        return nil, err
×
667
                }
×
668
                ids = append(ids, idStr)
1✔
669
                atomic.AddInt32(&q.fetchCount, 1)
1✔
670
        }
671
        // retry2Unack
672
        if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
2✔
673
                for {
2✔
674
                        fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
675
                        if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
676
                                break
1✔
677
                        }
678
                        idStr, err := q.retry2Unack()
1✔
679
                        if err == NilErr { // consumed all
2✔
680
                                break
1✔
681
                        }
682
                        if err != nil {
1✔
683
                                return nil, err
×
684
                        }
×
685
                        ids = append(ids, idStr)
1✔
686
                        atomic.AddInt32(&q.fetchCount, 1)
1✔
687
                }
688
        }
689
        return ids, nil
1✔
690
}
691

692
func (q *DelayQueue) afterConsume() error {
1✔
693
        // unack to retry
1✔
694
        err := q.unack2Retry()
1✔
695
        if err != nil {
1✔
696
                return err
×
697
        }
×
698
        err = q.garbageCollect()
1✔
699
        if err != nil {
1✔
700
                return err
×
701
        }
×
702
        return nil
1✔
703
}
704

705
func (q *DelayQueue) setRunning() {
1✔
706
        atomic.StoreInt32(&q.running, 1)
1✔
707
}
1✔
708

709
func (q *DelayQueue) setNotRunning() {
1✔
710
        atomic.StoreInt32(&q.running, 0)
1✔
711
}
1✔
712

713
func (q *DelayQueue) assertNotRunning() {
1✔
714
        running := atomic.LoadInt32(&q.running)
1✔
715
        if running > 0 {
1✔
716
                panic("operation cannot be performed during running")
×
717
        }
718
}
719

720
func (q *DelayQueue) goWithRecover(fn func()) {
1✔
721
        go func() {
2✔
722
                defer func() {
2✔
723
                        if err := recover(); err != nil {
1✔
724
                                q.logger.Printf("panic: %v\n", err)
×
725
                        }
×
726
                }()
727
                fn()
1✔
728
        }()
729
}
730

731
// StartConsume creates a goroutine to consume message from DelayQueue
732
// use `<-done` to wait consumer stopping
733
// If there is no callback set, StartConsume will panic
734
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
1✔
735
        if q.cb == nil {
1✔
736
                panic("this instance has no callback")
×
737
        }
738
        q.close = make(chan struct{}, 1)
1✔
739
        q.setRunning()
1✔
740
        q.ticker = time.NewTicker(q.fetchInterval)
1✔
741
        q.consumeBuffer = make(chan string, q.fetchLimit)
1✔
742
        done0 := make(chan struct{})
1✔
743
        // start worker
1✔
744
        for i := 0; i < int(q.concurrent); i++ {
2✔
745
                q.goWithRecover(func() {
2✔
746
                        for id := range q.consumeBuffer {
2✔
747
                                q.callback(id)
1✔
748
                                q.afterConsume()
1✔
749
                        }
1✔
750
                })
751
        }
752
        // start main loop
753
        go func() {
2✔
754
        tickerLoop:
1✔
755
                for {
2✔
756
                        select {
1✔
757
                        case <-q.ticker.C:
1✔
758
                                ids, err := q.beforeConsume()
1✔
759
                                if err != nil {
1✔
760
                                        log.Printf("consume error: %v", err)
×
761
                                }
×
762
                                q.goWithRecover(func() {
2✔
763
                                        for _, id := range ids {
2✔
764
                                                q.consumeBuffer <- id
1✔
765
                                        }
1✔
766
                                })
767
                        case <-q.close:
1✔
768
                                break tickerLoop
1✔
769
                        }
770
                }
771
                close(done0)
1✔
772
        }()
773
        return done0
1✔
774
}
775

776
// StopConsume stops consumer goroutine
777
func (q *DelayQueue) StopConsume() {
1✔
778
        close(q.close)
1✔
779
        q.setNotRunning()
1✔
780
        if q.ticker != nil {
2✔
781
                q.ticker.Stop()
1✔
782
        }
1✔
783
}
784

785
// GetPendingCount returns the number of pending messages
786
func (q *DelayQueue) GetPendingCount() (int64, error) {
1✔
787
        return q.redisCli.ZCard(q.pendingKey)
1✔
788
}
1✔
789

790
// GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered
791
func (q *DelayQueue) GetReadyCount() (int64, error) {
1✔
792
        return q.redisCli.LLen(q.readyKey)
1✔
793
}
1✔
794

795
// GetProcessingCount returns the number of messages which are being processed
796
func (q *DelayQueue) GetProcessingCount() (int64, error) {
1✔
797
        return q.redisCli.ZCard(q.unAckKey)
1✔
798
}
1✔
799

800
// EventListener which will be called when events occur
801
// This Listener can be used to monitor running status
802
type EventListener interface {
803
        // OnEvent will be called when events occur
804
        OnEvent(*Event)
805
}
806

807
// ListenEvent register a listener which will be called when events occur,
808
// so it can be used to monitor running status
809
//
810
// But It can ONLY receive events from the CURRENT INSTANCE,
811
// if you want to listen to all events in queue, just use Monitor.ListenEvent
812
//
813
// There can be AT MOST ONE EventListener in an DelayQueue instance.
814
// If you are using customized listener, Monitor will stop working
815
func (q *DelayQueue) ListenEvent(listener EventListener) {
1✔
816
        q.eventListener = listener
1✔
817
}
1✔
818

819
// RemoveListener stops reporting events to EventListener
820
func (q *DelayQueue) DisableListener() {
×
821
        q.eventListener = nil
×
822
}
×
823

824
func (q *DelayQueue) reportEvent(code int, count int) {
1✔
825
        listener := q.eventListener // eventListener may be changed during running
1✔
826
        if listener != nil && count > 0 {
2✔
827
                event := &Event{
1✔
828
                        Code:      code,
1✔
829
                        Timestamp: time.Now().Unix(),
1✔
830
                        MsgCount:  count,
1✔
831
                }
1✔
832
                listener.OnEvent(event)
1✔
833
        }
1✔
834
}
835

836
// pubsubListener receives events and reports them through redis pubsub for monitoring
837
type pubsubListener struct {
838
        redisCli   RedisCli
839
        reportChan string
840
}
841

842
// genReportChannel builds the name of the channel used to report the events
// of the queue called name: "dq:<name>:reportEvents".
func genReportChannel(name string) string {
	const (
		prefix = "dq:"
		suffix = ":reportEvents"
	)
	return prefix + name + suffix
}
1✔
845

846
// EnableReport enables reporting to monitor
847
func (q *DelayQueue) EnableReport() {
1✔
848
        reportChan := genReportChannel(q.name)
1✔
849
        q.ListenEvent(&pubsubListener{
1✔
850
                redisCli:   q.redisCli,
1✔
851
                reportChan: reportChan,
1✔
852
        })
1✔
853
}
1✔
854

855
// DisableReport stops reporting to monitor
856
func (q *DelayQueue) DisableReport() {
×
857
        q.DisableListener()
×
858
}
×
859

860
func (l *pubsubListener) OnEvent(event *Event) {
1✔
861
        payload := encodeEvent(event)
1✔
862
        l.redisCli.Publish(l.reportChan, payload)
1✔
863
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc