HDT3213 / delayqueue, build 12848542532

19 Jan 2025 12:08AM UTC. Coverage: 82.739% (+0.07%), up from 82.665%.
Triggered by a push to GitHub by HDT3213: "support NackRedeliveryDelay".

5 of 5 new or added lines in 1 file covered (100.0%).
4 existing lines in 1 file are now uncovered (the panic-recovery logging in goWithRecover and the DisableListener/DisableReport declarations).
580 of 701 relevant lines covered (82.74%).
0.89 hits per line.

Source file: /delayqueue.go, 84.69% covered.
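
The commit in this build adds the NackRedeliveryDelay option (see WithNackRedeliveryDelay in the listing below). Before the source, a rough usage sketch: the module import path and the newRedisCli helper, which stands in for any concrete RedisCli implementation, are assumptions and not part of the file shown here.

package main

import (
	"log"
	"time"

	"github.com/hdt3213/delayqueue" // assumed module path
)

func main() {
	// newRedisCli is a hypothetical constructor for any delayqueue.RedisCli
	// implementation (for example a go-redis based adapter, sketched later).
	var cli delayqueue.RedisCli = newRedisCli()

	q := delayqueue.NewQueue0("example", cli, delayqueue.CallbackFunc(func(payload string) bool {
		log.Println("received:", payload)
		return false // nack: ask the queue to redeliver this message
	})).WithNackRedeliveryDelay(10 * time.Second) // nacked messages become eligible for redelivery after ~10s

	done := q.StartConsume()
	_ = q.SendDelayMsg("hello", time.Second)

	time.Sleep(15 * time.Second)
	q.StopConsume()
	<-done
}
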
package delayqueue

import (
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)

// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
type DelayQueue struct {
	// name for this Queue. Make sure the name is unique within the redis database
	name               string
	redisCli           RedisCli
	cb                 func(string) bool
	pendingKey         string // sorted set: message id -> delivery time
	readyKey           string // list
	unAckKey           string // sorted set: message id -> retry time
	retryKey           string // list
	retryCountKey      string // hash: message id -> remaining retry count
	garbageKey         string // set: message id
	useHashTag         bool
	ticker             *time.Ticker
	logger             Logger
	close              chan struct{}
	running            int32
	maxConsumeDuration time.Duration // default 5 seconds
	msgTTL             time.Duration // default 1 hour
	defaultRetryCount  uint          // default 3
	fetchInterval      time.Duration // default 1 second
	fetchLimit         uint          // default no limit
	fetchCount         int32         // number of tasks actually running
	concurrent         uint          // default 1, executed serially
	sha1map            map[string]string
	sha1mapMu          *sync.RWMutex
	scriptPreload      bool
	// for batch consume
	consumeBuffer chan string

	eventListener       EventListener
	nackRedeliveryDelay time.Duration
}

// NilErr represents redis nil
var NilErr = errors.New("nil")

// RedisCli is an abstraction of the redis client; it declares only the commands DelayQueue requires, not all commands
type RedisCli interface {
	// Eval sends a lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents the redis command GET
	// please return NilErr when there is no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
	ZCard(key string) (int64, error)
	LLen(key string) (int64, error)

	// Publish is used for monitoring only
	Publish(channel string, payload string) error
	// Subscribe is used for monitoring only
	// returns: payload channel, subscription closer, error; the subscription closer should close the payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)

	// ScriptLoad calls the `SCRIPT LOAD` command
	ScriptLoad(script string) (string, error)
	// EvalSha runs preloaded scripts
	// If the script has not been preloaded, please return an error whose message starts with "NOSCRIPT"
	EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
}
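
RedisCli is the only coupling point to a concrete redis client. Below is a partial, illustrative adapter over go-redis v9 showing how a few of the methods and the NilErr convention might be implemented; the adapter type, its package, and the error mapping are assumptions about one possible integration, not code from this repository. The remaining methods follow the same pattern.

package redisadapter

import (
	"context"
	"time"

	"github.com/hdt3213/delayqueue" // assumed module path
	"github.com/redis/go-redis/v9"
)

// GoRedisCli adapts a go-redis client to the subset of delayqueue.RedisCli shown here.
type GoRedisCli struct {
	inner *redis.Client
}

// wrapErr maps go-redis' "key does not exist" sentinel to the package's NilErr.
func wrapErr(err error) error {
	if err == redis.Nil {
		return delayqueue.NilErr
	}
	return err
}

func (c *GoRedisCli) Set(key string, value string, expiration time.Duration) error {
	return c.inner.Set(context.Background(), key, value, expiration).Err()
}

func (c *GoRedisCli) Get(key string) (string, error) {
	s, err := c.inner.Get(context.Background(), key).Result()
	return s, wrapErr(err)
}

func (c *GoRedisCli) Eval(script string, keys []string, args []interface{}) (interface{}, error) {
	v, err := c.inner.Eval(context.Background(), script, keys, args...).Result()
	return v, wrapErr(err)
}

func (c *GoRedisCli) EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) {
	// a "NOSCRIPT ..." error is passed through unchanged so DelayQueue can reload the script
	v, err := c.inner.EvalSha(context.Background(), sha1, keys, args...).Result()
	return v, wrapErr(err)
}

func (c *GoRedisCli) ScriptLoad(script string) (string, error) {
	return c.inner.ScriptLoad(context.Background(), script).Result()
}

// Del, HSet, HDel, SMembers, SRem, ZAdd, ZRem, ZCard, LLen, Publish and Subscribe
// would be implemented in the same style to satisfy the full interface.
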
// Logger is an abstraction of the logging system
type Logger interface {
	Printf(format string, v ...interface{})
}

type hashTagKeyOpt int

// CallbackFunc receives and consumes messages
// returns true to confirm the message was successfully consumed, false to re-deliver this message
type CallbackFunc = func(string) bool

// UseHashTagKey adds a hashtag to redis keys to ensure all keys of this queue are allocated to the same hash slot.
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
// WARNING! Changing (adding or removing) this option will cause DelayQueue to fail to read existing data in redis
// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
	return hashTagKeyOpt(1)
}

// NewQueue0 creates a new queue; use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish messages
// callback returns true to confirm successful consumption. If callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver this message
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
	if name == "" {
		panic("name is required")
	}
	if cli == nil {
		panic("cli is required")
	}
	useHashTag := false
	var callback CallbackFunc = nil
	for _, opt := range opts {
		switch o := opt.(type) {
		case hashTagKeyOpt:
			useHashTag = true
		case CallbackFunc:
			callback = o
		}
	}
	var keyPrefix string
	if useHashTag {
		keyPrefix = "{dp:" + name + "}"
	} else {
		keyPrefix = "dp:" + name
	}
	return &DelayQueue{
		name:               name,
		redisCli:           cli,
		cb:                 callback,
		pendingKey:         keyPrefix + ":pending",
		readyKey:           keyPrefix + ":ready",
		unAckKey:           keyPrefix + ":unack",
		retryKey:           keyPrefix + ":retry",
		retryCountKey:      keyPrefix + ":retry:cnt",
		garbageKey:         keyPrefix + ":garbage",
		useHashTag:         useHashTag,
		close:              nil,
		maxConsumeDuration: 5 * time.Second,
		msgTTL:             time.Hour,
		logger:             log.Default(),
		defaultRetryCount:  3,
		fetchInterval:      time.Second,
		concurrent:         1,
		sha1map:            make(map[string]string),
		sha1mapMu:          &sync.RWMutex{},
		scriptPreload:      true,
	}
}

// WithCallback sets the callback the queue uses to receive and consume messages
// callback returns true to confirm the message was successfully consumed, false to re-deliver this message
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
	q.cb = callback
	return q
}

// WithLogger customizes the logger for the queue
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
	q.logger = logger
	return q
}

// WithFetchInterval customizes the interval at which the consumer fetches messages from redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
	q.fetchInterval = d
	return q
}

// WithScriptPreload uses the SCRIPT LOAD command to preload the lua scripts to redis
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue {
	q.scriptPreload = flag
	return q
}

// WithMaxConsumeDuration customizes the max consume duration
// If no acknowledgement is received within maxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
	q.maxConsumeDuration = d
	return q
}

// WithFetchLimit limits the max number of in-flight messages, 0 means no limit
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
	q.fetchLimit = limit
	return q
}

// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
	if c == 0 {
		panic("concurrent cannot be 0")
	}
	q.assertNotRunning()
	q.concurrent = c
	return q
}

// WithDefaultRetryCount customizes the max number of retries; it affects all messages in this queue
// use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
	q.defaultRetryCount = count
	return q
}

// WithNackRedeliveryDelay customizes the delay between a nack (callback returns false) and redelivery
// If consumption exceeds the deadline (maxConsumeDuration), the message is still redelivered immediately
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue {
	q.nackRedeliveryDelay = d
	return q
}

func (q *DelayQueue) genMsgKey(idStr string) string {
	if q.useHashTag {
		return "{dp:" + q.name + "}" + ":msg:" + idStr
	}
	return "dp:" + q.name + ":msg:" + idStr
}

type retryCountOpt int

// WithRetryCount sets the retry count for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
	return retryCountOpt(count)
}

type msgTTLOpt time.Duration

// WithMsgTTL sets the ttl for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithMsgTTL(d time.Duration) interface{} {
	return msgTTLOpt(d)
}

// SendScheduleMsg submits a message to be delivered at the given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
	// parse options
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	// generate id
	idStr := uuid.Must(uuid.NewRandom()).String()
	now := time.Now()
	// store msg
	msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
	err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
	if err != nil {
		return fmt.Errorf("store msg failed: %v", err)
	}
	// store retry count
	err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
	if err != nil {
		return fmt.Errorf("store retry count failed: %v", err)
	}
	// put to pending
	err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
	if err != nil {
		return fmt.Errorf("push to pending failed: %v", err)
	}
	q.reportEvent(NewMessageEvent, 1)
	return nil
}

// SendDelayMsg submits a message to be delivered after the given duration
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
	t := time.Now().Add(duration)
	return q.SendScheduleMsg(payload, t, opts...)
}

280
func (q *DelayQueue) loadScript(script string) (string, error) {
1✔
281
        sha1, err := q.redisCli.ScriptLoad(script)
1✔
282
        if err != nil {
1✔
283
                return "", err
×
284
        }
×
285
        q.sha1mapMu.Lock()
1✔
286
        q.sha1map[script] = sha1
1✔
287
        q.sha1mapMu.Unlock()
1✔
288
        return sha1, nil
1✔
289
}
290

291
func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) {
1✔
292
        if !q.scriptPreload {
1✔
293
                return q.redisCli.Eval(script, keys, args)
×
294
        }
×
295
        var err error
1✔
296
        q.sha1mapMu.RLock()
1✔
297
        sha1, ok := q.sha1map[script]
1✔
298
        q.sha1mapMu.RUnlock()
1✔
299
        if !ok {
2✔
300
                sha1, err = q.loadScript(script)
1✔
301
                if err != nil {
1✔
302
                        return nil, err
×
303
                }
×
304
        }
305
        result, err := q.redisCli.EvalSha(sha1, keys, args)
1✔
306
        if err == nil {
2✔
307
                return result, err
1✔
308
        }
1✔
309
        // script not loaded, reload it
310
        // It is possible to access a node in the cluster that has no pre-loaded scripts.
311
        if strings.HasPrefix(err.Error(), "NOSCRIPT") {
2✔
312
                sha1, err = q.loadScript(script)
1✔
313
                if err != nil {
1✔
314
                        return nil, err
×
315
                }
×
316
                // try again
317
                result, err = q.redisCli.EvalSha(sha1, keys, args)
1✔
318
        }
319
        return result, err
1✔
320
}
321

322
// pending2ReadyScript atomically moves messages from pending to ready
323
// keys: pendingKey, readyKey
324
// argv: currentTime
325
// returns: ready message number
326
const pending2ReadyScript = `
327
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
328
if (#msgs == 0) then return end
329
local args2 = {} -- keys to push into ready
330
for _,v in ipairs(msgs) do
331
        table.insert(args2, v) 
332
    if (#args2 == 4000) then
333
                redis.call('LPush', KEYS[2], unpack(args2))
334
                args2 = {}
335
        end
336
end
337
if (#args2 > 0) then 
338
        redis.call('LPush', KEYS[2], unpack(args2))
339
end
340
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
341
return #msgs
342
`
343

344
func (q *DelayQueue) pending2Ready() error {
1✔
345
        now := time.Now().Unix()
1✔
346
        keys := []string{q.pendingKey, q.readyKey}
1✔
347
        raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now})
1✔
348
        if err != nil && err != NilErr {
1✔
349
                return fmt.Errorf("pending2ReadyScript failed: %v", err)
×
350
        }
×
351
        count, ok := raw.(int64)
1✔
352
        if ok {
2✔
353
                q.reportEvent(ReadyEvent, int(count))
1✔
354
        }
1✔
355
        return nil
1✔
356
}
357

358
// ready2UnackScript atomically moves messages from ready to unack
359
// keys: readyKey/retryKey, unackKey
360
// argv: retryTime
361
const ready2UnackScript = `
362
local msg = redis.call('RPop', KEYS[1])
363
if (not msg) then return end
364
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
365
return msg
366
`
367

368
func (q *DelayQueue) ready2Unack() (string, error) {
1✔
369
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
370
        keys := []string{q.readyKey, q.unAckKey}
1✔
371
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime})
1✔
372
        if err == NilErr {
2✔
373
                return "", err
1✔
374
        }
1✔
375
        if err != nil {
1✔
376
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
377
        }
×
378
        str, ok := ret.(string)
1✔
379
        if !ok {
1✔
380
                return "", fmt.Errorf("illegal result: %#v", ret)
×
381
        }
×
382
        q.reportEvent(DeliveredEvent, 1)
1✔
383
        return str, nil
1✔
384
}
385

386
func (q *DelayQueue) retry2Unack() (string, error) {
1✔
387
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
388
        keys := []string{q.retryKey, q.unAckKey}
1✔
389
        ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
1✔
390
        if err == NilErr {
2✔
391
                return "", NilErr
1✔
392
        }
1✔
393
        if err != nil {
1✔
394
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
395
        }
×
396
        str, ok := ret.(string)
1✔
397
        if !ok {
1✔
398
                return "", fmt.Errorf("illegal result: %#v", ret)
×
399
        }
×
400
        return str, nil
1✔
401
}
402

403
func (q *DelayQueue) callback(idStr string) error {
1✔
404
        payload, err := q.redisCli.Get(q.genMsgKey(idStr))
1✔
405
        if err == NilErr {
2✔
406
                return nil
1✔
407
        }
1✔
408
        if err != nil {
1✔
409
                // Is an IO error?
×
410
                return fmt.Errorf("get message payload failed: %v", err)
×
411
        }
×
412
        ack := q.cb(payload)
1✔
413
        if ack {
2✔
414
                err = q.ack(idStr)
1✔
415
        } else {
2✔
416
                err = q.nack(idStr)
1✔
417
        }
1✔
418
        return err
1✔
419
}
420

421
func (q *DelayQueue) ack(idStr string) error {
1✔
422
        atomic.AddInt32(&q.fetchCount, -1)
1✔
423
        err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
1✔
424
        if err != nil {
1✔
425
                return fmt.Errorf("remove from unack failed: %v", err)
×
426
        }
×
427
        // msg key has ttl, ignore result of delete
428
        _ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
1✔
429
        _ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
1✔
430
        q.reportEvent(AckEvent, 1)
1✔
431
        return nil
1✔
432
}
433

434
func (q *DelayQueue) nack(idStr string) error {
1✔
435
        atomic.AddInt32(&q.fetchCount, -1)
1✔
436
        // update retry time as now, unack2Retry will move it to retry immediately
1✔
437
        err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
1✔
438
                idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
1✔
439
        })
1✔
440
        if err != nil {
1✔
441
                return fmt.Errorf("negative ack failed: %v", err)
×
442
        }
×
443
        q.reportEvent(NackEvent, 1)
1✔
444
        return nil
1✔
445
}
446

447
// unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0,
448
// and moves messages from unack to garbage which  retry count is 0
449
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
450
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
451
// keys: unackKey, retryCountKey, retryKey, garbageKey
452
// argv: currentTime
453
// returns: {retryMsgs, failMsgs}
454
const unack2RetryScript = `
455
local unack2retry = function(msgs)
456
        local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
457
        local retryMsgs = 0
458
        local failMsgs = 0
459
        for i,v in ipairs(retryCounts) do
460
                local k = msgs[i]
461
                if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
462
                        redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
463
                        redis.call("LPush", KEYS[3], k) -- add to retry
464
                        retryMsgs = retryMsgs + 1
465
                else
466
                        redis.call("HDel", KEYS[2], k) -- del retry count
467
                        redis.call("SAdd", KEYS[4], k) -- add to garbage
468
                        failMsgs = failMsgs + 1
469
                end
470
        end
471
        return retryMsgs, failMsgs
472
end
473

474
local retryMsgs = 0
475
local failMsgs = 0
476
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
477
if (#msgs == 0) then return end
478
if #msgs < 4000 then
479
        local d1, d2 = unack2retry(msgs)
480
        retryMsgs = retryMsgs + d1
481
        failMsgs = failMsgs + d2
482
else
483
        local buf = {}
484
        for _,v in ipairs(msgs) do
485
                table.insert(buf, v)
486
                if #buf == 4000 then
487
                    local d1, d2 = unack2retry(buf)
488
                        retryMsgs = retryMsgs + d1
489
                        failMsgs = failMsgs + d2
490
                        buf = {}
491
                end
492
        end
493
        if (#buf > 0) then
494
                local d1, d2 = unack2retry(buf)
495
                retryMsgs = retryMsgs + d1
496
                failMsgs = failMsgs + d2
497
        end
498
end
499
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
500
return {retryMsgs, failMsgs}
501
`
502

503
func (q *DelayQueue) unack2Retry() error {
1✔
504
        keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
1✔
505
        now := time.Now()
1✔
506
        raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()})
1✔
507
        if err != nil && err != NilErr {
1✔
508
                return fmt.Errorf("unack to retry script failed: %v", err)
×
509
        }
×
510
        infos, ok := raw.([]interface{})
1✔
511
        if ok && len(infos) == 2 {
2✔
512
                retryCount, ok := infos[0].(int64)
1✔
513
                if ok {
2✔
514
                        q.reportEvent(RetryEvent, int(retryCount))
1✔
515
                }
1✔
516
                failCount, ok := infos[1].(int64)
1✔
517
                if ok {
2✔
518
                        q.reportEvent(FinalFailedEvent, int(failCount))
1✔
519
                }
1✔
520
        }
521
        return nil
1✔
522
}
523

524
func (q *DelayQueue) garbageCollect() error {
1✔
525
        msgIds, err := q.redisCli.SMembers(q.garbageKey)
1✔
526
        if err != nil {
1✔
527
                return fmt.Errorf("smembers failed: %v", err)
×
528
        }
×
529
        if len(msgIds) == 0 {
2✔
530
                return nil
1✔
531
        }
1✔
532
        // allow concurrent clean
533
        msgKeys := make([]string, 0, len(msgIds))
1✔
534
        for _, idStr := range msgIds {
2✔
535
                msgKeys = append(msgKeys, q.genMsgKey(idStr))
1✔
536
        }
1✔
537
        err = q.redisCli.Del(msgKeys)
1✔
538
        if err != nil && err != NilErr {
1✔
539
                return fmt.Errorf("del msgs failed: %v", err)
×
540
        }
×
541
        err = q.redisCli.SRem(q.garbageKey, msgIds)
1✔
542
        if err != nil && err != NilErr {
1✔
543
                return fmt.Errorf("remove from garbage key failed: %v", err)
×
544
        }
×
545
        return nil
1✔
546
}
547

548
func (q *DelayQueue) beforeConsume() ([]string, error) {
1✔
549
        // pending to ready
1✔
550
        err := q.pending2Ready()
1✔
551
        if err != nil {
1✔
552
                return nil, err
×
553
        }
×
554
        // ready2Unack
555
        // prioritize new message consumption to avoid avalanches
556
        ids := make([]string, 0, q.fetchLimit)
1✔
557
        var fetchCount int32
1✔
558
        for {
2✔
559
                fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
560
                if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
561
                        break
1✔
562
                }
563
                idStr, err := q.ready2Unack()
1✔
564
                if err == NilErr { // consumed all
2✔
565
                        break
1✔
566
                }
567
                if err != nil {
1✔
568
                        return nil, err
×
569
                }
×
570
                ids = append(ids, idStr)
1✔
571
                atomic.AddInt32(&q.fetchCount, 1)
1✔
572
        }
573
        // retry2Unack
574
        if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
2✔
575
                for {
2✔
576
                        fetchCount = atomic.LoadInt32(&q.fetchCount)
1✔
577
                        if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
2✔
578
                                break
1✔
579
                        }
580
                        idStr, err := q.retry2Unack()
1✔
581
                        if err == NilErr { // consumed all
2✔
582
                                break
1✔
583
                        }
584
                        if err != nil {
1✔
585
                                return nil, err
×
586
                        }
×
587
                        ids = append(ids, idStr)
1✔
588
                        atomic.AddInt32(&q.fetchCount, 1)
1✔
589
                }
590
        }
591
        return ids, nil
1✔
592
}
593

594
func (q *DelayQueue) afterConsume() error {
1✔
595
        // unack to retry
1✔
596
        err := q.unack2Retry()
1✔
597
        if err != nil {
1✔
598
                return err
×
599
        }
×
600
        err = q.garbageCollect()
1✔
601
        if err != nil {
1✔
602
                return err
×
603
        }
×
604
        return nil
1✔
605
}
606

607
func (q *DelayQueue) setRunning() {
1✔
608
        atomic.StoreInt32(&q.running, 1)
1✔
609
}
1✔
610

611
func (q *DelayQueue) setNotRunning() {
1✔
612
        atomic.StoreInt32(&q.running, 0)
1✔
613
}
1✔
614

615
func (q *DelayQueue) assertNotRunning() {
1✔
616
        running := atomic.LoadInt32(&q.running)
1✔
617
        if running > 0 {
1✔
618
                panic("operation cannot be performed during running")
×
619
        }
620
}
621

622
func (q *DelayQueue) goWithRecover(fn func()) {
1✔
623
        go func() {
2✔
624
                defer func() {
2✔
625
                        if err := recover(); err != nil {
1✔
UNCOV
626
                                q.logger.Printf("panic: %v\n", err)
×
UNCOV
627
                        }
×
628
                }()
629
                fn()
1✔
630
        }()
631
}
632

633
// StartConsume creates a goroutine to consume message from DelayQueue
634
// use `<-done` to wait consumer stopping
635
// If there is no callback set, StartConsume will panic
636
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
1✔
637
        if q.cb == nil {
1✔
638
                panic("this instance has no callback")
×
639
        }
640
        q.close = make(chan struct{}, 1)
1✔
641
        q.setRunning()
1✔
642
        q.ticker = time.NewTicker(q.fetchInterval)
1✔
643
        q.consumeBuffer = make(chan string, q.fetchLimit)
1✔
644
        done0 := make(chan struct{})
1✔
645
        // start worker
1✔
646
        for i := 0; i < int(q.concurrent); i++ {
2✔
647
                q.goWithRecover(func() {
2✔
648
                        for id := range q.consumeBuffer {
2✔
649
                                q.callback(id)
1✔
650
                                q.afterConsume()
1✔
651
                        }
1✔
652
                })
653
        }
654
        // start main loop
655
        go func() {
2✔
656
        tickerLoop:
1✔
657
                for {
2✔
658
                        select {
1✔
659
                        case <-q.ticker.C:
1✔
660
                                ids, err := q.beforeConsume()
1✔
661
                                if err != nil {
1✔
662
                                        log.Printf("consume error: %v", err)
×
663
                                }
×
664
                                q.goWithRecover(func() {
2✔
665
                                        for _, id := range ids {
2✔
666
                                                q.consumeBuffer <- id
1✔
667
                                        }
1✔
668
                                })
669
                        case <-q.close:
1✔
670
                                break tickerLoop
1✔
671
                        }
672
                }
673
                close(done0)
1✔
674
        }()
675
        return done0
1✔
676
}
677

678
// StopConsume stops consumer goroutine
679
func (q *DelayQueue) StopConsume() {
1✔
680
        close(q.close)
1✔
681
        q.setNotRunning()
1✔
682
        if q.ticker != nil {
2✔
683
                q.ticker.Stop()
1✔
684
        }
1✔
685
}
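
StartConsume returns a channel that is closed once the main loop exits, so a clean shutdown is StopConsume followed by waiting on that channel. A minimal sketch, with queue construction assumed elsewhere:

package example

import (
	"os"
	"os/signal"

	"github.com/hdt3213/delayqueue" // assumed module path
)

// runUntilInterrupt consumes messages until the process receives an interrupt signal.
func runUntilInterrupt(q *delayqueue.DelayQueue) {
	done := q.StartConsume()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig

	q.StopConsume() // stops the fetch ticker and signals the main loop to exit
	<-done          // wait for the main loop goroutine to finish
}
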
// GetPendingCount returns the number of pending messages
func (q *DelayQueue) GetPendingCount() (int64, error) {
	return q.redisCli.ZCard(q.pendingKey)
}

// GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
func (q *DelayQueue) GetReadyCount() (int64, error) {
	return q.redisCli.LLen(q.readyKey)
}

// GetProcessingCount returns the number of messages which are being processed
func (q *DelayQueue) GetProcessingCount() (int64, error) {
	return q.redisCli.ZCard(q.unAckKey)
}
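
These three counters read the underlying redis structures directly (pending sorted set, ready list, unack sorted set), so they are cheap enough to poll for monitoring. An illustrative polling loop, assuming a running queue:

package example

import (
	"log"
	"time"

	"github.com/hdt3213/delayqueue" // assumed module path
)

// logQueueDepth prints queue depth at a fixed interval until the process exits.
func logQueueDepth(q *delayqueue.DelayQueue, interval time.Duration) {
	for range time.Tick(interval) {
		pending, _ := q.GetPendingCount()       // scheduled, not yet due
		ready, _ := q.GetReadyCount()           // due, waiting for delivery
		processing, _ := q.GetProcessingCount() // delivered, waiting for ack
		log.Printf("pending=%d ready=%d processing=%d", pending, ready, processing)
	}
}
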
// EventListener will be called when events occur
// This listener can be used to monitor the queue's running status
type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}

// ListenEvent registers a listener which will be called when events occur,
// so it can be used to monitor the running status
//
// But it can ONLY receive events from the CURRENT INSTANCE;
// if you want to listen to all events in the queue, use Monitor.ListenEvent instead
//
// There can be AT MOST ONE EventListener in a DelayQueue instance.
// If you are using a customized listener, Monitor will stop working
func (q *DelayQueue) ListenEvent(listener EventListener) {
	q.eventListener = listener
}
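
A custom EventListener only observes events produced by this instance. A sketch of a listener that counts acked and nacked messages; the Event fields come from reportEvent below, while the AckEvent/NackEvent codes are referenced by this file but defined elsewhere in the package:

package example

import (
	"sync/atomic"

	"github.com/hdt3213/delayqueue" // assumed module path
)

// countingListener tallies acked and nacked messages reported by one DelayQueue instance.
type countingListener struct {
	acks  int64
	nacks int64
}

func (l *countingListener) OnEvent(e *delayqueue.Event) {
	switch e.Code {
	case delayqueue.AckEvent:
		atomic.AddInt64(&l.acks, int64(e.MsgCount))
	case delayqueue.NackEvent:
		atomic.AddInt64(&l.nacks, int64(e.MsgCount))
	}
}

// Registering it replaces any listener installed by EnableReport:
//   q.ListenEvent(&countingListener{})
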
// DisableListener stops reporting events to the EventListener
func (q *DelayQueue) DisableListener() {
	q.eventListener = nil
}

func (q *DelayQueue) reportEvent(code int, count int) {
	listener := q.eventListener // eventListener may be changed while running
	if listener != nil && count > 0 {
		event := &Event{
			Code:      code,
			Timestamp: time.Now().Unix(),
			MsgCount:  count,
		}
		listener.OnEvent(event)
	}
}

// pubsubListener receives events and reports them through redis pubsub for monitoring
type pubsubListener struct {
	redisCli   RedisCli
	reportChan string
}

func genReportChannel(name string) string {
	return "dq:" + name + ":reportEvents"
}

// EnableReport enables reporting to the monitor
func (q *DelayQueue) EnableReport() {
	reportChan := genReportChannel(q.name)
	q.ListenEvent(&pubsubListener{
		redisCli:   q.redisCli,
		reportChan: reportChan,
	})
}

// DisableReport stops reporting to the monitor
func (q *DelayQueue) DisableReport() {
	q.DisableListener()
}

func (l *pubsubListener) OnEvent(event *Event) {
	payload := encodeEvent(event)
	l.redisCli.Publish(l.reportChan, payload)
}