
HDT3213 / delayqueue / 13690045745

06 Mar 2025 02:12AM UTC coverage: 83.96% (-0.3%) from 84.211%
Pull Request #18: Fix: WithLogger doesn't work when cause consume error

0 of 1 new or added line in 1 file covered. (0.0%)

2 existing lines in 1 file now uncovered.

670 of 798 relevant lines covered (83.96%)

0.9 hits per line

Source File

/delayqueue.go (84.99% covered)
1
package delayqueue
2

3
import (
4
        "errors"
5
        "fmt"
6
        "log"
7
        "strconv"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/google/uuid"
14
)
15

16
// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
17
type DelayQueue struct {
18
        // name for this Queue. Make sure the name is unique in the redis database
19
        name               string
20
        redisCli           RedisCli
21
        cb                 func(string) bool
22
        pendingKey         string // sorted set: message id -> delivery time
23
        readyKey           string // list
24
        unAckKey           string // sorted set: message id -> retry time
25
        retryKey           string // list
26
        retryCountKey      string // hash: message id -> remain retry count
27
        garbageKey         string // set: message id
28
        useHashTag         bool
29
        ticker             *time.Ticker
30
        logger             Logger
31
        close              chan struct{}
32
        running            int32
33
        maxConsumeDuration time.Duration // default 5 seconds
34
        msgTTL             time.Duration // default 1 hour
35
        defaultRetryCount  uint          // default 3
36
        fetchInterval      time.Duration // default 1 second
37
        fetchLimit         uint          // default no limit
38
        fetchCount         int32         // number of tasks currently being processed
39
        concurrent         uint          // default 1, executed serially
40
        sha1map            map[string]string
41
        sha1mapMu          *sync.RWMutex
42
        scriptPreload      bool
43
        // for batch consume
44
        consumeBuffer chan string
45

46
        eventListener       EventListener
47
        nackRedeliveryDelay time.Duration
48
}
49

50
// NilErr represents redis nil
51
var NilErr = errors.New("nil")
52

53
// RedisCli is an abstraction of the redis client; only the commands listed here are required, not all redis commands
54
type RedisCli interface {
55
        // Eval sends lua script to redis
56
        // args should be string, integer or float
57
        // returns string, int64, []interface{} (elements can be string or int64)
58
        Eval(script string, keys []string, args []interface{}) (interface{}, error)
59
        Set(key string, value string, expiration time.Duration) error
60
        // Get represents redis command GET
61
        // please return NilErr when there is no such key in redis
62
        Get(key string) (string, error)
63
        Del(keys []string) error
64
        HSet(key string, field string, value string) error
65
        HDel(key string, fields []string) error
66
        SMembers(key string) ([]string, error)
67
        SRem(key string, members []string) error
68
        ZAdd(key string, values map[string]float64) error
69
        ZRem(key string, fields []string) (int64, error)
70
        ZCard(key string) (int64, error)
71
        ZScore(key string, member string) (float64, error)
72
        LLen(key string) (int64, error)
73
        LRem(key string, count int64, value string) (int64, error)
74

75
        // Publish used for monitor only
76
        Publish(channel string, payload string) error
77
        // Subscribe used for monitor only
78
        // returns: payload channel, subscription closer, error; the subscription closer should close the payload channel as well
79
        Subscribe(channel string) (payloads <-chan string, close func(), err error)
80

81
        // ScriptLoad call `script load` command
82
        ScriptLoad(script string) (string, error)
83
        // EvalSha run preload scripts
84
        // If the script has not been preloaded, please return an error whose message starts with "NOSCRIPT"
85
        EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
86
}
87

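Any redis client can be plugged in by implementing this interface. Below is a minimal sketch (not part of delayqueue.go) of two adapter methods over github.com/redis/go-redis/v9, written as if inside this package so NilErr resolves; the goRedisWrapper name is illustrative and the remaining methods are omitted. The key detail is translating redis.Nil into NilErr so Get behaves as documented.

type goRedisWrapper struct {
        rdb *redis.Client // import "github.com/redis/go-redis/v9" and "context"
}

func (w *goRedisWrapper) Get(key string) (string, error) {
        val, err := w.rdb.Get(context.Background(), key).Result()
        if err == redis.Nil {
                return "", NilErr // report "no such key" the way DelayQueue expects
        }
        return val, err
}

func (w *goRedisWrapper) Eval(script string, keys []string, args []interface{}) (interface{}, error) {
        return w.rdb.Eval(context.Background(), script, keys, args...).Result()
}
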
88
// Logger is an abstraction of logging system
89
type Logger interface {
90
        Printf(format string, v ...interface{})
91
}
92

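Any value with a matching Printf method satisfies Logger, so *log.Logger works as-is; a thin adapter can add queue-specific tagging. A sketch (the queueLogger name is illustrative, imports of log and os assumed):

type queueLogger struct {
        inner *log.Logger
}

func (l *queueLogger) Printf(format string, v ...interface{}) {
        l.inner.Printf("[delayqueue] "+format, v...)
}

// usage: q.WithLogger(&queueLogger{inner: log.New(os.Stderr, "", log.LstdFlags)})
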
93
type hashTagKeyOpt int
94
type prefixOpt string
95

96
// CallbackFunc receives and consumes messages
97
// returns true to confirm successful consumption, false to re-deliver this message
98
type CallbackFunc = func(string) bool
99

100
// UseHashTagKey adds a hash tag to redis keys to ensure all keys of this queue are allocated in the same hash slot.
101
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
102
// WARNING! Adding or removing this option will cause DelayQueue to fail to read existing data in redis
103
// see more:  https://redis.io/docs/reference/cluster-spec/#hash-tags
104
func UseHashTagKey() interface{} {
1✔
105
        return hashTagKeyOpt(1)
1✔
106
}
1✔
107

108
// UseCustomPrefix customizes the key prefix, replacing the default prefix "dp"
109
func UseCustomPrefix(prefix string) interface{} {
1✔
110
        return prefixOpt(prefix)
1✔
111
}
1✔
112

113
// NewQueue0 creates a new queue; use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish messages
114
// callback returns true to confirm successful consumption. If callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver this message
115
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
1✔
116
        if name == "" {
1✔
117
                panic("name is required")
×
118
        }
119
        if cli == nil {
1✔
120
                panic("cli is required")
×
121
        }
122
        prefix := "dp"
1✔
123
        useHashTag := false
1✔
124
        var callback CallbackFunc = nil
1✔
125
        for _, opt := range opts {
2✔
126
                switch o := opt.(type) {
1✔
127
                case hashTagKeyOpt:
1✔
128
                        useHashTag = true
1✔
129
                case prefixOpt:
1✔
130
                        prefix = string(o)
1✔
131
                case CallbackFunc:
1✔
132
                        callback = o
1✔
133
                }
134
        }
135
        keyPrefix := prefix + ":" + name
1✔
136
        if useHashTag {
2✔
137
                keyPrefix = "{" + keyPrefix + "}"
1✔
138
        } 
1✔
139
        return &DelayQueue{
1✔
140
                name:               name,
1✔
141
                redisCli:           cli,
1✔
142
                cb:                 callback,
1✔
143
                pendingKey:         keyPrefix + ":pending",
1✔
144
                readyKey:           keyPrefix + ":ready",
1✔
145
                unAckKey:           keyPrefix + ":unack",
1✔
146
                retryKey:           keyPrefix + ":retry",
1✔
147
                retryCountKey:      keyPrefix + ":retry:cnt",
1✔
148
                garbageKey:         keyPrefix + ":garbage",
1✔
149
                useHashTag:         useHashTag,
1✔
150
                close:              nil,
1✔
151
                maxConsumeDuration: 5 * time.Second,
1✔
152
                msgTTL:             time.Hour,
1✔
153
                logger:             log.Default(),
1✔
154
                defaultRetryCount:  3,
1✔
155
                fetchInterval:      time.Second,
1✔
156
                concurrent:         1,
1✔
157
                sha1map:            make(map[string]string),
1✔
158
                sha1mapMu:          &sync.RWMutex{},
1✔
159
                scriptPreload:      true,
1✔
160
        }
1✔
161
}
162

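For illustration, a queue might be assembled from these pieces roughly as follows (a sketch, not part of this file; redisCli stands for any RedisCli implementation and the package is assumed to be imported as delayqueue):

q := delayqueue.NewQueue0(
        "example-queue",                     // queue name, must be unique within the redis database
        redisCli,                            // any RedisCli implementation
        delayqueue.UseCustomPrefix("myapp"), // keys become myapp:example-queue:...
        delayqueue.UseHashTagKey(),          // only needed on proxy-based clusters such as Codis
        delayqueue.CallbackFunc(func(payload string) bool {
                log.Printf("got message: %s", payload)
                return true // ack; return false to redeliver
        }),
)
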
163
// WithCallback sets the callback used to receive and consume messages
164
// callback returns true to confirm successful consumption, false to re-deliver this message
165
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
1✔
166
        q.cb = callback
1✔
167
        return q
1✔
168
}
1✔
169

170
// WithLogger customizes logger for queue
171
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
1✔
172
        q.logger = logger
1✔
173
        return q
1✔
174
}
1✔
175

176
// WithFetchInterval customizes the interval at which the consumer fetches messages from redis
177
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
1✔
178
        q.fetchInterval = d
1✔
179
        return q
1✔
180
}
1✔
181

182
// WithScriptPreload uses the SCRIPT LOAD command to preload scripts into redis
183
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue {
1✔
184
        q.scriptPreload = flag
1✔
185
        return q
1✔
186
}
1✔
187

188
// WithMaxConsumeDuration customizes max consume duration
189
// If no acknowledgement is received within maxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
190
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
1✔
191
        q.maxConsumeDuration = d
1✔
192
        return q
1✔
193
}
1✔
194

195
// WithFetchLimit limits the maximum number of messages being processed at once; 0 means no limit
196
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
1✔
197
        q.fetchLimit = limit
1✔
198
        return q
1✔
199
}
1✔
200

201
// WithConcurrent sets the number of concurrent consumers
202
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
1✔
203
        if c == 0 {
1✔
204
                panic("concurrent cannot be 0")
×
205
        }
206
        q.assertNotRunning()
1✔
207
        q.concurrent = c
1✔
208
        return q
1✔
209
}
210

211
// WithDefaultRetryCount customizes the maximum number of retries; it affects all messages in this queue
212
// use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to set the retry count of a particular message
213
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
1✔
214
        q.defaultRetryCount = count
1✔
215
        return q
1✔
216
}
1✔
217

218
// WithNackRedeliveryDelay customizes the delay between a nack (callback returns false) and redelivery
219
// If consumption exceeds the deadline, the message is redelivered immediately
220
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue {
1✔
221
        q.nackRedeliveryDelay = d
1✔
222
        return q
1✔
223
}
1✔
224

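Since every With* setter returns the queue itself, configuration is typically chained before starting the consumer. A sketch with arbitrary values (imports of log, os and time assumed):

q.WithLogger(log.New(os.Stderr, "dq ", log.LstdFlags)).
        WithConcurrent(4).                         // 4 worker goroutines
        WithFetchInterval(500 * time.Millisecond). // poll redis twice per second
        WithFetchLimit(20).                        // at most 20 in-flight messages
        WithMaxConsumeDuration(10 * time.Second).  // redeliver if not acked within 10s
        WithDefaultRetryCount(5).                  // retry each message up to 5 times
        WithNackRedeliveryDelay(3 * time.Second)   // wait 3s before redelivering a nacked message
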
225
func (q *DelayQueue) genMsgKey(idStr string) string {
1✔
226
        if q.useHashTag {
2✔
227
                return "{dp:" + q.name + "}" + ":msg:" + idStr
1✔
228
        }
1✔
229
        return "dp:" + q.name + ":msg:" + idStr
1✔
230
}
231

232
type retryCountOpt int
233

234
// WithRetryCount sets the retry count for a message
235
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
236
func WithRetryCount(count int) interface{} {
1✔
237
        return retryCountOpt(count)
1✔
238
}
1✔
239

240
type msgTTLOpt time.Duration
241

242
// WithMsgTTL sets the TTL for a message
243
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
244
func WithMsgTTL(d time.Duration) interface{} {
1✔
245
        return msgTTLOpt(d)
1✔
246
}
1✔
247

248
// MessageInfo stores information to trace a message
249
type MessageInfo struct {
250
        id string
251
}
252

253
func (msg *MessageInfo) ID() string {
1✔
254
        return msg.id
1✔
255
}
1✔
256

257
const (
258
        StatePending    = "pending"
259
        StateReady      = "ready"
260
        StateReadyRetry = "ready_to_retry"
261
        StateConsuming  = "consuming"
262
        StateUnknown    = "unknown"
263
)
264

265
// SendScheduleMsgV2 submits a message delivered at given time
266
func (q *DelayQueue) SendScheduleMsgV2(payload string, t time.Time, opts ...interface{}) (*MessageInfo, error) {
1✔
267
        // parse options
1✔
268
        retryCount := q.defaultRetryCount
1✔
269
        for _, opt := range opts {
2✔
270
                switch o := opt.(type) {
1✔
271
                case retryCountOpt:
1✔
272
                        retryCount = uint(o)
1✔
273
                case msgTTLOpt:
1✔
274
                        q.msgTTL = time.Duration(o)
1✔
275
                }
276
        }
277
        // generate id
278
        idStr := uuid.Must(uuid.NewRandom()).String()
1✔
279
        now := time.Now()
1✔
280
        // store msg
1✔
281
        msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
1✔
282
        err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
1✔
283
        if err != nil {
1✔
284
                return nil, fmt.Errorf("store msg failed: %v", err)
×
285
        }
×
286
        // store retry count
287
        err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
1✔
288
        if err != nil {
1✔
289
                return nil, fmt.Errorf("store retry count failed: %v", err)
×
290
        }
×
291
        // put to pending
292
        err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
1✔
293
        if err != nil {
1✔
294
                return nil, fmt.Errorf("push to pending failed: %v", err)
×
295
        }
×
296
        q.reportEvent(NewMessageEvent, 1)
1✔
297
        return &MessageInfo{
1✔
298
                id: idStr,
1✔
299
        }, nil
1✔
300
}
301

302
// SendDelayMsgV2 submits a message delivered after the given duration
303
func (q *DelayQueue) SendDelayMsgV2(payload string, duration time.Duration, opts ...interface{}) (*MessageInfo, error) {
1✔
304
        t := time.Now().Add(duration)
1✔
305
        return q.SendScheduleMsgV2(payload, t, opts...)
1✔
306
}
1✔
307

308
// SendScheduleMsg submits a message delivered at given time
309
// It is compatible with SendScheduleMsgV2, but does not return MessageInfo
310
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
1✔
311
        _, err := q.SendScheduleMsgV2(payload, t, opts...)
1✔
312
        return err
1✔
313
}
1✔
314

315
// SendDelayMsg submits a message delivered after given duration
316
// It is compatible with SendDelayMsgV2, but does not return MessageInfo
317
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
1✔
318
        t := time.Now().Add(duration)
1✔
319
        return q.SendScheduleMsg(payload, t, opts...)
1✔
320
}
1✔
321

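A sketch of the producer side, combining the send variants with the per-message options shown above (payload contents are arbitrary, import of log assumed):

// deliver after a delay, allowing up to 5 retries for this particular message
err := q.SendDelayMsg("order-123", 10*time.Minute, delayqueue.WithRetryCount(5))
if err != nil {
        log.Fatal(err)
}

// deliver at an absolute time and keep the returned id for later interception
msg, err := q.SendScheduleMsgV2("invoice-42", time.Now().Add(time.Hour),
        delayqueue.WithMsgTTL(2*time.Hour)) // payload is kept for delivery time + 2h
if err != nil {
        log.Fatal(err)
}
log.Printf("scheduled message %s", msg.ID())
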
322
type InterceptResult struct {
323
        Intercepted bool
324
        State       string
325
}
326

327
// TryIntercept tries to intercept a message
328
func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) {
1✔
329
        id := msg.ID()
1✔
330
        // try to intercept at ready
1✔
331
        removed, err := q.redisCli.LRem(q.readyKey, 0, id)
1✔
332
        if err != nil {
1✔
333
                q.logger.Printf("intercept %s from ready failed: %v", id, err)
×
334
        }
×
335
        if removed > 0 {
2✔
336
                _ = q.redisCli.Del([]string{q.genMsgKey(id)})
1✔
337
                _ = q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
338
                return &InterceptResult{
1✔
339
                        Intercepted: true,
1✔
340
                        State:       StateReady,
1✔
341
                }, nil
1✔
342
        }
1✔
343
        // try to intercept at pending
344
        removed, err = q.redisCli.ZRem(q.pendingKey, []string{id})
1✔
345
        if err != nil {
1✔
346
                q.logger.Printf("intercept %s from pending failed: %v", id, err)
×
347
        }
×
348
        if removed > 0 {
2✔
349
                _ = q.redisCli.Del([]string{q.genMsgKey(id)})
1✔
350
                _ = q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
351
                return &InterceptResult{
1✔
352
                        Intercepted: true,
1✔
353
                        State:       StatePending,
1✔
354
                }, nil
1✔
355
        }
1✔
356
        // the message may be in the middle of consumption or already successfully consumed
357
        // if the message has been successfully consumed, the following action will have no effect
358
        // if the message is being consumed, the following action will prevent it from being retried
359
        q.redisCli.HDel(q.retryCountKey, []string{id})
1✔
360
        q.redisCli.LRem(q.retryKey, 0, id)
1✔
361

1✔
362
        return &InterceptResult{
1✔
363
                Intercepted: false,
1✔
364
                State:       StateUnknown,
1✔
365
        }, nil
1✔
366
}
367

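TryIntercept cancels a message that has not been consumed yet, using the MessageInfo returned by the V2 send methods. A sketch (import of log assumed):

msg, err := q.SendDelayMsgV2("cancellable-job", time.Hour)
if err != nil {
        log.Fatal(err)
}
// later, before the message is consumed, withdraw it
res, err := q.TryIntercept(msg)
if err != nil {
        log.Fatal(err)
}
if res.Intercepted {
        log.Printf("message %s removed while %s", msg.ID(), res.State)
} else {
        // already delivered or consumed; at least it will not be retried any more
        log.Printf("could not intercept %s, state: %s", msg.ID(), res.State)
}
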
368
func (q *DelayQueue) loadScript(script string) (string, error) {
1✔
369
        sha1, err := q.redisCli.ScriptLoad(script)
1✔
370
        if err != nil {
1✔
371
                return "", err
×
372
        }
×
373
        q.sha1mapMu.Lock()
1✔
374
        q.sha1map[script] = sha1
1✔
375
        q.sha1mapMu.Unlock()
1✔
376
        return sha1, nil
1✔
377
}
378

379
func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) {
1✔
380
        if !q.scriptPreload {
2✔
381
                return q.redisCli.Eval(script, keys, args)
1✔
382
        }
1✔
383
        var err error
1✔
384
        q.sha1mapMu.RLock()
1✔
385
        sha1, ok := q.sha1map[script]
1✔
386
        q.sha1mapMu.RUnlock()
1✔
387
        if !ok {
2✔
388
                sha1, err = q.loadScript(script)
1✔
389
                if err != nil {
1✔
390
                        return nil, err
×
391
                }
×
392
        }
393
        result, err := q.redisCli.EvalSha(sha1, keys, args)
1✔
394
        if err == nil {
2✔
395
                return result, err
1✔
396
        }
1✔
397
        // script not loaded, reload it
398
        // It is possible to access a node in the cluster that has no pre-loaded scripts.
399
        if strings.HasPrefix(err.Error(), "NOSCRIPT") {
1✔
400
                sha1, err = q.loadScript(script)
×
401
                if err != nil {
×
402
                        return nil, err
×
403
                }
×
404
                // try again
405
                result, err = q.redisCli.EvalSha(sha1, keys, args)
×
406
        }
407
        return result, err
1✔
408
}
409

410
// pending2ReadyScript atomically moves messages from pending to ready
411
// keys: pendingKey, readyKey
412
// argv: currentTime
413
// returns: ready message number
414
const pending2ReadyScript = `
415
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
416
if (#msgs == 0) then return end
417
local args2 = {} -- keys to push into ready
418
for _,v in ipairs(msgs) do
419
        table.insert(args2, v) 
420
    if (#args2 == 4000) then
421
                redis.call('LPush', KEYS[2], unpack(args2))
422
                args2 = {}
423
        end
424
end
425
if (#args2 > 0) then 
426
        redis.call('LPush', KEYS[2], unpack(args2))
427
end
428
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
429
return #msgs
430
`
431

432
func (q *DelayQueue) pending2Ready() error {
1✔
433
        now := time.Now().Unix()
1✔
434
        keys := []string{q.pendingKey, q.readyKey}
1✔
435
        raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now})
1✔
436
        if err != nil && err != NilErr {
1✔
437
                return fmt.Errorf("pending2ReadyScript failed: %v", err)
×
438
        }
×
439
        count, ok := raw.(int64)
1✔
440
        if ok {
2✔
441
                q.reportEvent(ReadyEvent, int(count))
1✔
442
        }
1✔
443
        return nil
1✔
444
}
445

446
// ready2UnackScript atomically moves messages from ready to unack
447
// keys: readyKey/retryKey, unackKey
448
// argv: retryTime
449
const ready2UnackScript = `
450
local msg = redis.call('RPop', KEYS[1])
451
if (not msg) then return end
452
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
453
return msg
454
`
455

456
func (q *DelayQueue) ready2Unack() (string, error) {
1✔
457
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
458
        keys := []string{q.readyKey, q.unAckKey}
1✔
459
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime})
1✔
460
        if err == NilErr {
2✔
461
                return "", err
1✔
462
        }
1✔
463
        if err != nil {
1✔
464
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
465
        }
×
466
        str, ok := ret.(string)
1✔
467
        if !ok {
1✔
468
                return "", fmt.Errorf("illegal result: %#v", ret)
×
469
        }
×
470
        q.reportEvent(DeliveredEvent, 1)
1✔
471
        return str, nil
1✔
472
}
473

474
func (q *DelayQueue) retry2Unack() (string, error) {
1✔
475
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
476
        keys := []string{q.retryKey, q.unAckKey}
1✔
477
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
1✔
478
        if err == NilErr {
2✔
479
                return "", NilErr
1✔
480
        }
1✔
481
        if err != nil {
1✔
482
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
483
        }
×
484
        str, ok := ret.(string)
1✔
485
        if !ok {
1✔
486
                return "", fmt.Errorf("illegal result: %#v", ret)
×
487
        }
×
488
        return str, nil
1✔
489
}
490

491
func (q *DelayQueue) callback(idStr string) error {
1✔
492
        payload, err := q.redisCli.Get(q.genMsgKey(idStr))
1✔
493
        if err == NilErr {
1✔
UNCOV
494
                return nil
×
UNCOV
495
        }
×
496
        if err != nil {
1✔
497
                // Is an IO error?
×
498
                return fmt.Errorf("get message payload failed: %v", err)
×
499
        }
×
500
        ack := q.cb(payload)
1✔
501
        if ack {
2✔
502
                err = q.ack(idStr)
1✔
503
        } else {
2✔
504
                err = q.nack(idStr)
1✔
505
        }
1✔
506
        return err
1✔
507
}
508

509
func (q *DelayQueue) ack(idStr string) error {
1✔
510
        atomic.AddInt32(&q.fetchCount, -1)
1✔
511
        _, err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
1✔
512
        if err != nil {
1✔
513
                return fmt.Errorf("remove from unack failed: %v", err)
×
514
        }
×
515
        // msg key has ttl, ignore result of delete
516
        _ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
1✔
517
        _ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
1✔
518
        q.reportEvent(AckEvent, 1)
1✔
519
        return nil
1✔
520
}
521

522
// updateZSetScoreScript updates the score of a zset member if it exists
523
// KEYS[1]: zset
524
// ARGV[1]: score
525
// ARGV[2]: member
526
const updateZSetScoreScript = `
527
if redis.call('zrank', KEYS[1], ARGV[2]) ~= false then -- a missing member yields false, not nil
528
    return redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
529
else
530
    return 0
531
end
532
`
533

534
func (q *DelayQueue) updateZSetScore(key string, score float64, member string) error {
1✔
535
        scoreStr := strconv.FormatFloat(score, 'f', -1, 64)
1✔
536
        _, err := q.eval(updateZSetScoreScript, []string{key}, []interface{}{scoreStr, member})
1✔
537
        return err
1✔
538
}
1✔
539

540
func (q *DelayQueue) nack(idStr string) error {
1✔
541
        atomic.AddInt32(&q.fetchCount, -1)
1✔
542
        retryTime := float64(time.Now().Add(q.nackRedeliveryDelay).Unix())
1✔
543
        // if message consumption has not reached the deadline (still in unAckKey), then update its retry time
1✔
544
        err := q.updateZSetScore(q.unAckKey, retryTime, idStr)
1✔
545
        if err != nil {
1✔
546
                return fmt.Errorf("negative ack failed: %v", err)
×
547
        }
×
548
        q.reportEvent(NackEvent, 1)
1✔
549
        return nil
1✔
550
}
551

552
// unack2RetryScript atomically moves messages whose remaining retry count is greater than 0 from unack to retry,
553
// and moves messages whose retry count has reached 0 from unack to garbage
554
// Because DelayQueue cannot determine which messages are garbage before evaluating unack2RetryScript, it cannot pass their keys to redisCli.Eval
555
// Therefore unack2RetryScript moves garbage messages to garbageKey instead of deleting them directly
556
// keys: unackKey, retryCountKey, retryKey, garbageKey
557
// argv: currentTime
558
// returns: {retryMsgs, failMsgs}
559
const unack2RetryScript = `
560
local unack2retry = function(msgs)
561
        local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
562
        local retryMsgs = 0
563
        local failMsgs = 0
564
        for i,v in ipairs(retryCounts) do
565
                local k = msgs[i]
566
                if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
567
                        redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
568
                        redis.call("LPush", KEYS[3], k) -- add to retry
569
                        retryMsgs = retryMsgs + 1
570
                else
571
                        redis.call("HDel", KEYS[2], k) -- del retry count
572
                        redis.call("SAdd", KEYS[4], k) -- add to garbage
573
                        failMsgs = failMsgs + 1
574
                end
575
        end
576
        return retryMsgs, failMsgs
577
end
578

579
local retryMsgs = 0
580
local failMsgs = 0
581
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
582
if (#msgs == 0) then return end
583
if #msgs < 4000 then
584
        local d1, d2 = unack2retry(msgs)
585
        retryMsgs = retryMsgs + d1
586
        failMsgs = failMsgs + d2
587
else
588
        local buf = {}
589
        for _,v in ipairs(msgs) do
590
                table.insert(buf, v)
591
                if #buf == 4000 then
592
                    local d1, d2 = unack2retry(buf)
593
                        retryMsgs = retryMsgs + d1
594
                        failMsgs = failMsgs + d2
595
                        buf = {}
596
                end
597
        end
598
        if (#buf > 0) then
599
                local d1, d2 = unack2retry(buf)
600
                retryMsgs = retryMsgs + d1
601
                failMsgs = failMsgs + d2
602
        end
603
end
604
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
605
return {retryMsgs, failMsgs}
606
`
607

608
func (q *DelayQueue) unack2Retry() error {
1✔
609
        keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
1✔
610
        now := time.Now()
1✔
611
        raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()})
1✔
612
        if err != nil && err != NilErr {
1✔
613
                return fmt.Errorf("unack to retry script failed: %v", err)
×
614
        }
×
615
        infos, ok := raw.([]interface{})
1✔
616
        if ok && len(infos) == 2 {
2✔
617
                retryCount, ok := infos[0].(int64)
1✔
618
                if ok {
2✔
619
                        q.reportEvent(RetryEvent, int(retryCount))
1✔
620
                }
1✔
621
                failCount, ok := infos[1].(int64)
1✔
622
                if ok {
2✔
623
                        q.reportEvent(FinalFailedEvent, int(failCount))
1✔
624
                }
1✔
625
        }
626
        return nil
1✔
627
}
628

629
func (q *DelayQueue) garbageCollect() error {
1✔
630
        msgIds, err := q.redisCli.SMembers(q.garbageKey)
1✔
631
        if err != nil {
1✔
632
                return fmt.Errorf("smembers failed: %v", err)
×
633
        }
×
634
        if len(msgIds) == 0 {
2✔
635
                return nil
1✔
636
        }
1✔
637
        // allow concurrent clean
638
        msgKeys := make([]string, 0, len(msgIds))
1✔
639
        for _, idStr := range msgIds {
2✔
640
                msgKeys = append(msgKeys, q.genMsgKey(idStr))
1✔
641
        }
1✔
642
        err = q.redisCli.Del(msgKeys)
1✔
643
        if err != nil && err != NilErr {
1✔
644
                return fmt.Errorf("del msgs failed: %v", err)
×
645
        }
×
646
        err = q.redisCli.SRem(q.garbageKey, msgIds)
1✔
647
        if err != nil && err != NilErr {
1✔
648
                return fmt.Errorf("remove from garbage key failed: %v", err)
×
649
        }
×
650
        return nil
1✔
651
}
652

653
func (q *DelayQueue) beforeConsume() ([]string, error) {
1✔
654
        // pending to ready
1✔
655
        err := q.pending2Ready()
1✔
656
        if err != nil {
1✔
657
                return nil, err
×
658
        }
×
659
        // ready2Unack
660
        // prioritize new message consumption to avoid avalanches
661
        ids := make([]string, 0, q.fetchLimit)
1✔
662
        var fetchCount int32
1✔
663
        for {
2✔
664
                fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
665
                if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
666
                        break
1✔
667
                }
668
                idStr, err := q.ready2Unack()
1✔
669
                if err == NilErr { // consumed all
2✔
670
                        break
1✔
671
                }
672
                if err != nil {
1✔
673
                        return nil, err
×
674
                }
×
675
                ids = append(ids, idStr)
1✔
676
                atomic.AddInt32(&q.fetchCount, 1)
1✔
677
        }
678
        // retry2Unack
679
        if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
2✔
680
                for {
2✔
681
                        fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
682
                        if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
683
                                break
1✔
684
                        }
685
                        idStr, err := q.retry2Unack()
1✔
686
                        if err == NilErr { // consumed all
2✔
687
                                break
1✔
688
                        }
689
                        if err != nil {
1✔
690
                                return nil, err
×
691
                        }
×
692
                        ids = append(ids, idStr)
1✔
693
                        atomic.AddInt32(&q.fetchCount, 1)
1✔
694
                }
695
        }
696
        return ids, nil
1✔
697
}
698

699
func (q *DelayQueue) afterConsume() error {
1✔
700
        // unack to retry
1✔
701
        err := q.unack2Retry()
1✔
702
        if err != nil {
1✔
703
                return err
×
704
        }
×
705
        err = q.garbageCollect()
1✔
706
        if err != nil {
1✔
707
                return err
×
708
        }
×
709
        return nil
1✔
710
}
711

712
func (q *DelayQueue) setRunning() {
1✔
713
        atomic.StoreInt32(&q.running, 1)
1✔
714
}
1✔
715

716
func (q *DelayQueue) setNotRunning() {
1✔
717
        atomic.StoreInt32(&q.running, 0)
1✔
718
}
1✔
719

720
func (q *DelayQueue) assertNotRunning() {
1✔
721
        running := atomic.LoadInt32(&q.running)
1✔
722
        if running > 0 {
1✔
723
                panic("operation cannot be performed during running")
×
724
        }
725
}
726

727
func (q *DelayQueue) goWithRecover(fn func()) {
1✔
728
        go func() {
2✔
729
                defer func() {
2✔
730
                        if err := recover(); err != nil {
1✔
731
                                q.logger.Printf("panic: %v\n", err)
×
732
                        }
×
733
                }()
734
                fn()
1✔
735
        }()
736
}
737

738
// StartConsume creates a goroutine to consume messages from DelayQueue
739
// use `<-done` to wait for the consumer to stop
740
// If there is no callback set, StartConsume will panic
741
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
1✔
742
        if q.cb == nil {
1✔
743
                panic("this instance has no callback")
×
744
        }
745
        q.close = make(chan struct{}, 1)
1✔
746
        q.setRunning()
1✔
747
        q.ticker = time.NewTicker(q.fetchInterval)
1✔
748
        q.consumeBuffer = make(chan string, q.fetchLimit)
1✔
749
        done0 := make(chan struct{})
1✔
750
        // start worker
1✔
751
        for i := 0; i < int(q.concurrent); i++ {
2✔
752
                q.goWithRecover(func() {
2✔
753
                        for id := range q.consumeBuffer {
2✔
754
                                q.callback(id)
1✔
755
                                q.afterConsume()
1✔
756
                        }
1✔
757
                })
758
        }
759
        // start main loop
760
        go func() {
2✔
761
        tickerLoop:
1✔
762
                for {
2✔
763
                        select {
1✔
764
                        case <-q.ticker.C:
1✔
765
                                ids, err := q.beforeConsume()
1✔
766
                                if err != nil {
1✔
NEW
767
                                        q.logger.Printf("consume error: %v", err)
×
768
                                }
×
769
                                q.goWithRecover(func() {
2✔
770
                                        for _, id := range ids {
2✔
771
                                                q.consumeBuffer <- id
1✔
772
                                        }
1✔
773
                                })
774
                        case <-q.close:
1✔
775
                                break tickerLoop
1✔
776
                        }
777
                }
778
                close(done0)
1✔
779
        }()
780
        return done0
1✔
781
}
782

783
// StopConsume stops consumer goroutine
784
func (q *DelayQueue) StopConsume() {
1✔
785
        close(q.close)
1✔
786
        q.setNotRunning()
1✔
787
        if q.ticker != nil {
2✔
788
                q.ticker.Stop()
1✔
789
        }
1✔
790
}
791

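A consumer is typically started once and stopped on shutdown; receiving from done blocks until the main loop exits. A sketch (signal handling is shown only for illustration; imports of os, os/signal and syscall assumed):

done := q.StartConsume()

// stop on SIGINT/SIGTERM
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig

q.StopConsume()
<-done // wait until the consumer main loop has exited
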
792
// GetPendingCount returns the number of pending messages
793
func (q *DelayQueue) GetPendingCount() (int64, error) {
1✔
794
        return q.redisCli.ZCard(q.pendingKey)
1✔
795
}
1✔
796

797
// GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
798
func (q *DelayQueue) GetReadyCount() (int64, error) {
1✔
799
        return q.redisCli.LLen(q.readyKey)
1✔
800
}
1✔
801

802
// GetProcessingCount returns the number of messages which are being processed
803
func (q *DelayQueue) GetProcessingCount() (int64, error) {
1✔
804
        return q.redisCli.ZCard(q.unAckKey)
1✔
805
}
1✔
806

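The three counters map onto the pending sorted set, the ready list, and the unack sorted set, which makes them convenient for a simple metrics loop. A sketch (errors ignored for brevity, imports of log and time assumed):

go func() {
        for range time.Tick(time.Minute) {
                pending, _ := q.GetPendingCount()       // scheduled, not yet due
                ready, _ := q.GetReadyCount()           // due, waiting for delivery
                processing, _ := q.GetProcessingCount() // delivered, waiting for ack
                log.Printf("delayqueue depth: pending=%d ready=%d processing=%d",
                        pending, ready, processing)
        }
}()
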
807
// EventListener will be called when events occur
808
// This Listener can be used to monitor running status
809
type EventListener interface {
810
        // OnEvent will be called when events occur
811
        OnEvent(*Event)
812
}
813

814
// ListenEvent registers a listener which will be called when events occur,
815
// so it can be used to monitor running status
816
//
817
// However, it can ONLY receive events from the CURRENT INSTANCE;
818
// if you want to listen to all events in the queue, use Monitor.ListenEvent instead
819
//
820
// There can be AT MOST ONE EventListener in a DelayQueue instance.
821
// If you are using a customized listener, Monitor will stop working
822
func (q *DelayQueue) ListenEvent(listener EventListener) {
1✔
823
        q.eventListener = listener
1✔
824
}
1✔
825

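A sketch of a custom EventListener that only counts acks and nacks (not part of this file; the statsListener name is illustrative, the package is assumed to be imported as delayqueue, and the Event codes are defined elsewhere in the package; import of sync/atomic assumed):

type statsListener struct {
        acks  int64
        nacks int64
}

func (l *statsListener) OnEvent(e *delayqueue.Event) {
        switch e.Code {
        case delayqueue.AckEvent:
                atomic.AddInt64(&l.acks, int64(e.MsgCount))
        case delayqueue.NackEvent:
                atomic.AddInt64(&l.nacks, int64(e.MsgCount))
        }
}

// usage: q.ListenEvent(&statsListener{})
// note: this replaces the listener installed by EnableReport, so Monitor reporting stops.
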
826
// DisableListener stops reporting events to the EventListener
827
func (q *DelayQueue) DisableListener() {
×
828
        q.eventListener = nil
×
829
}
×
830

831
func (q *DelayQueue) reportEvent(code int, count int) {
1✔
832
        listener := q.eventListener // eventListener may be changed during running
1✔
833
        if listener != nil && count > 0 {
2✔
834
                event := &Event{
1✔
835
                        Code:      code,
1✔
836
                        Timestamp: time.Now().Unix(),
1✔
837
                        MsgCount:  count,
1✔
838
                }
1✔
839
                listener.OnEvent(event)
1✔
840
        }
1✔
841
}
842

843
// pubsubListener receives events and reports them through redis pubsub for monitoring
844
type pubsubListener struct {
845
        redisCli   RedisCli
846
        reportChan string
847
}
848

849
func genReportChannel(name string) string {
1✔
850
        return "dq:" + name + ":reportEvents"
1✔
851
}
1✔
852

853
// EnableReport enables reporting to monitor
854
func (q *DelayQueue) EnableReport() {
1✔
855
        reportChan := genReportChannel(q.name)
1✔
856
        q.ListenEvent(&pubsubListener{
1✔
857
                redisCli:   q.redisCli,
1✔
858
                reportChan: reportChan,
1✔
859
        })
1✔
860
}
1✔
861

862
// DisableReport stops reporting to monitor
863
func (q *DelayQueue) DisableReport() {
×
864
        q.DisableListener()
×
865
}
×
866

867
func (l *pubsubListener) OnEvent(event *Event) {
1✔
868
        payload := encodeEvent(event)
1✔
869
        l.redisCli.Publish(l.reportChan, payload)
1✔
870
}
1✔