
HDT3213 / delayqueue, build 11175840384
04 Oct 2024 07:28AM UTC. Coverage: 82.665% (-1.2%) from 83.856%
Trigger: push (github), commit by HDT3213: "support custom logger"

1 of 1 new or added line in 1 file covered (100.0%).
84 existing lines in 2 files now uncovered.
577 of 698 relevant lines covered (82.66%).
0.89 hits per line.

Source File: /delayqueue.go (84.58% covered)
package delayqueue

import (
	"errors"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
)

// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
type DelayQueue struct {
	// name for this Queue. Make sure the name is unique in the redis database
	name               string
	redisCli           RedisCli
	cb                 func(string) bool
	pendingKey         string // sorted set: message id -> delivery time
	readyKey           string // list
	unAckKey           string // sorted set: message id -> retry time
	retryKey           string // list
	retryCountKey      string // hash: message id -> remaining retry count
	garbageKey         string // set: message id
	useHashTag         bool
	ticker             *time.Ticker
	logger             Logger
	close              chan struct{}
	running            int32
	maxConsumeDuration time.Duration // default 5 seconds
	msgTTL             time.Duration // default 1 hour
	defaultRetryCount  uint          // default 3
	fetchInterval      time.Duration // default 1 second
	fetchLimit         uint          // default no limit
	fetchCount         int32         // number of tasks currently being processed
	concurrent         uint          // default 1, executed serially
	sha1map            map[string]string
	sha1mapMu          *sync.RWMutex
	scriptPreload      bool
	// for batch consume
	consumeBuffer chan string

	eventListener EventListener
}

// NilErr represents redis nil
var NilErr = errors.New("nil")

// RedisCli is an abstraction of the redis client; only the commands listed below are required, not all redis commands
type RedisCli interface {
	// Eval sends a lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents the redis command GET
	// please return NilErr when there is no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
	ZCard(key string) (int64, error)
	LLen(key string) (int64, error)

	// Publish is used by the monitor only
	Publish(channel string, payload string) error
	// Subscribe is used by the monitor only
	// returns: payload channel, subscription closer, error; the subscription closer should close the payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)

	// ScriptLoad calls the `SCRIPT LOAD` command
	ScriptLoad(script string) (string, error)
	// EvalSha runs preloaded scripts
	// If the script has not been preloaded, please return an error whose message starts with "NOSCRIPT"
	EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
}
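The RedisCli interface keeps DelayQueue decoupled from any particular redis driver. As an illustration only, an adapter over github.com/redis/go-redis/v9 might start roughly as sketched below; the package, struct, constructor wiring and error mapping are hypothetical and not part of delayqueue.go, the project may already ship its own wrapper, and the remaining interface methods would follow the same pattern.

// hypothetical adapter sketch, shown as its own package for self-containment
package redisadapter

import (
	"context"
	"errors"
	"time"

	"github.com/redis/go-redis/v9"
)

// nilErr stands in for the delayqueue package's NilErr; a real adapter must return
// that exact value so comparisons like `err == NilErr` in the queue keep working.
var nilErr = errors.New("nil")

type goRedisCli struct {
	rdb *redis.Client
	ctx context.Context
}

func (c *goRedisCli) Set(key string, value string, expiration time.Duration) error {
	return c.rdb.Set(c.ctx, key, value, expiration).Err()
}

func (c *goRedisCli) Get(key string) (string, error) {
	v, err := c.rdb.Get(c.ctx, key).Result()
	if errors.Is(err, redis.Nil) {
		return "", nilErr // translate the driver's "no such key" sentinel
	}
	return v, err
}

func (c *goRedisCli) Eval(script string, keys []string, args []interface{}) (interface{}, error) {
	return c.rdb.Eval(c.ctx, script, keys, args...).Result()
}

// Del, HSet, HDel, SMembers, SRem, ZAdd, ZRem, ZCard, LLen, Publish, Subscribe,
// ScriptLoad and EvalSha would be wrapped in the same way.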
// Logger is an abstraction of the logging system
type Logger interface {
	Printf(format string, v ...interface{})
}

type hashTagKeyOpt int

// CallbackFunc receives and consumes messages
// it returns true to confirm the message was consumed successfully, or false to re-deliver it
type CallbackFunc = func(string) bool

// UseHashTagKey adds a hash tag to redis keys to ensure all keys of this queue are allocated to the same hash slot.
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
// WARNING! Adding or removing this option will cause DelayQueue to fail to read existing data in redis
// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
	return hashTagKeyOpt(1)
}

// NewQueue0 creates a new queue; use DelayQueue.StartConsume to consume messages and DelayQueue.SendScheduleMsg to publish them
// callback returns true to confirm successful consumption. If callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver this message
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
	if name == "" {
		panic("name is required")
	}
	if cli == nil {
		panic("cli is required")
	}
	useHashTag := false
	var callback CallbackFunc = nil
	for _, opt := range opts {
		switch o := opt.(type) {
		case hashTagKeyOpt:
			useHashTag = true
		case CallbackFunc:
			callback = o
		}
	}
	var keyPrefix string
	if useHashTag {
		keyPrefix = "{dp:" + name + "}"
	} else {
		keyPrefix = "dp:" + name
	}
	return &DelayQueue{
		name:               name,
		redisCli:           cli,
		cb:                 callback,
		pendingKey:         keyPrefix + ":pending",
		readyKey:           keyPrefix + ":ready",
		unAckKey:           keyPrefix + ":unack",
		retryKey:           keyPrefix + ":retry",
		retryCountKey:      keyPrefix + ":retry:cnt",
		garbageKey:         keyPrefix + ":garbage",
		useHashTag:         useHashTag,
		close:              nil,
		maxConsumeDuration: 5 * time.Second,
		msgTTL:             time.Hour,
		logger:             log.Default(),
		defaultRetryCount:  3,
		fetchInterval:      time.Second,
		concurrent:         1,
		sha1map:            make(map[string]string),
		sha1mapMu:          &sync.RWMutex{},
		scriptPreload:      true,
	}
}

// WithCallback sets the callback the queue uses to receive and consume messages
// callback returns true to confirm the message was consumed successfully, or false to re-deliver it
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
	q.cb = callback
	return q
}

// WithLogger customizes the logger for the queue
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue {
	q.logger = logger
	return q
}

// WithFetchInterval customizes the interval at which the consumer fetches messages from redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
	q.fetchInterval = d
	return q
}

// WithScriptPreload controls whether scripts are preloaded to redis with the SCRIPT LOAD command
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue {
	q.scriptPreload = flag
	return q
}

// WithMaxConsumeDuration customizes the max consume duration
// If no acknowledgement is received within maxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
	q.maxConsumeDuration = d
	return q
}

// WithFetchLimit limits the max number of messages being processed; 0 means no limit
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
	q.fetchLimit = limit
	return q
}

// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
	if c == 0 {
		panic("concurrent cannot be 0")
	}
	q.assertNotRunning()
	q.concurrent = c
	return q
}

// WithDefaultRetryCount customizes the max number of retries; it affects all messages in this queue
// use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
	q.defaultRetryCount = count
	return q
}

func (q *DelayQueue) genMsgKey(idStr string) string {
	if q.useHashTag {
		return "{dp:" + q.name + "}" + ":msg:" + idStr
	}
	return "dp:" + q.name + ":msg:" + idStr
}

type retryCountOpt int

// WithRetryCount sets the retry count for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
	return retryCountOpt(count)
}

type msgTTLOpt time.Duration

// WithMsgTTL sets the ttl for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithMsgTTL(d time.Duration) interface{} {
	return msgTTLOpt(d)
}

// SendScheduleMsg submits a message to be delivered at the given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
	// parse options
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	// generate id
	idStr := uuid.Must(uuid.NewRandom()).String()
	now := time.Now()
	// store msg
	msgTTL := t.Sub(now) + q.msgTTL // delivery time + q.msgTTL
	err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
	if err != nil {
		return fmt.Errorf("store msg failed: %v", err)
	}
	// store retry count
	err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
	if err != nil {
		return fmt.Errorf("store retry count failed: %v", err)
	}
	// put to pending
	err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
	if err != nil {
		return fmt.Errorf("push to pending failed: %v", err)
	}
	q.reportEvent(NewMessageEvent, 1)
	return nil
}

// SendDelayMsg submits a message to be delivered after the given duration
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
	t := time.Now().Add(duration)
	return q.SendScheduleMsg(payload, t, opts...)
}

func (q *DelayQueue) loadScript(script string) (string, error) {
	sha1, err := q.redisCli.ScriptLoad(script)
	if err != nil {
		return "", err
	}
	q.sha1mapMu.Lock()
	q.sha1map[script] = sha1
	q.sha1mapMu.Unlock()
	return sha1, nil
}

func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) {
	if !q.scriptPreload {
		return q.redisCli.Eval(script, keys, args)
	}
	var err error
	q.sha1mapMu.RLock()
	sha1, ok := q.sha1map[script]
	q.sha1mapMu.RUnlock()
	if !ok {
		sha1, err = q.loadScript(script)
		if err != nil {
			return nil, err
		}
	}
	result, err := q.redisCli.EvalSha(sha1, keys, args)
	if err == nil {
		return result, err
	}
	// script not loaded, reload it
	// It is possible to reach a node in the cluster that has no preloaded scripts.
	if strings.HasPrefix(err.Error(), "NOSCRIPT") {
		sha1, err = q.loadScript(script)
		if err != nil {
			return nil, err
		}
		// try again
		result, err = q.redisCli.EvalSha(sha1, keys, args)
	}
	return result, err
}

// pending2ReadyScript atomically moves messages from pending to ready
// keys: pendingKey, readyKey
// argv: currentTime
// returns: number of ready messages
const pending2ReadyScript = `
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
if (#msgs == 0) then return end
local args2 = {} -- keys to push into ready
for _,v in ipairs(msgs) do
	table.insert(args2, v)
	if (#args2 == 4000) then
		redis.call('LPush', KEYS[2], unpack(args2))
		args2 = {}
	end
end
if (#args2 > 0) then
	redis.call('LPush', KEYS[2], unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
return #msgs
`

func (q *DelayQueue) pending2Ready() error {
	now := time.Now().Unix()
	keys := []string{q.pendingKey, q.readyKey}
	raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now})
	if err != nil && err != NilErr {
		return fmt.Errorf("pending2ReadyScript failed: %v", err)
	}
	count, ok := raw.(int64)
	if ok {
		q.reportEvent(ReadyEvent, int(count))
	}
	return nil
}

// ready2UnackScript atomically moves messages from ready to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime
const ready2UnackScript = `
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
`

func (q *DelayQueue) ready2Unack() (string, error) {
	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	keys := []string{q.readyKey, q.unAckKey}
	ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime})
	if err == NilErr {
		return "", err
	}
	if err != nil {
		return "", fmt.Errorf("ready2UnackScript failed: %v", err)
	}
	str, ok := ret.(string)
	if !ok {
		return "", fmt.Errorf("illegal result: %#v", ret)
	}
	q.reportEvent(DeliveredEvent, 1)
	return str, nil
}

func (q *DelayQueue) retry2Unack() (string, error) {
	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	keys := []string{q.retryKey, q.unAckKey}
	ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
	if err == NilErr {
		return "", NilErr
	}
	if err != nil {
		return "", fmt.Errorf("ready2UnackScript failed: %v", err)
	}
	str, ok := ret.(string)
	if !ok {
		return "", fmt.Errorf("illegal result: %#v", ret)
	}
	return str, nil
}

func (q *DelayQueue) callback(idStr string) error {
	payload, err := q.redisCli.Get(q.genMsgKey(idStr))
	if err == NilErr {
		return nil
	}
	if err != nil {
		// Is this an IO error?
		return fmt.Errorf("get message payload failed: %v", err)
	}
	ack := q.cb(payload)
	if ack {
		err = q.ack(idStr)
	} else {
		err = q.nack(idStr)
	}
	return err
}

func (q *DelayQueue) ack(idStr string) error {
	atomic.AddInt32(&q.fetchCount, -1)
	err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// the msg key has a ttl, so ignore the result of delete
	_ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
	_ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
	q.reportEvent(AckEvent, 1)
	return nil
}

func (q *DelayQueue) nack(idStr string) error {
	atomic.AddInt32(&q.fetchCount, -1)
	// update the retry time to now, so unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
		idStr: float64(time.Now().Unix()),
	})
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	q.reportEvent(NackEvent, 1)
	return nil
}

// unack2RetryScript atomically moves messages whose remaining retry count is greater than 0 from unack to retry,
// and moves messages whose retry count has reached 0 from unack to garbage
// Because DelayQueue cannot determine which messages are garbage before evaluating unack2RetryScript, it cannot pass their keys to redisCli.Eval
// Therefore unack2RetryScript moves garbage messages to garbageKey instead of deleting them directly
// keys: unackKey, retryCountKey, retryKey, garbageKey
// argv: currentTime
// returns: {retryMsgs, failMsgs}
const unack2RetryScript = `
local unack2retry = function(msgs)
	local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
	local retryMsgs = 0
	local failMsgs = 0
	for i,v in ipairs(retryCounts) do
		local k = msgs[i]
		if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
			redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
			redis.call("LPush", KEYS[3], k) -- add to retry
			retryMsgs = retryMsgs + 1
		else
			redis.call("HDel", KEYS[2], k) -- del retry count
			redis.call("SAdd", KEYS[4], k) -- add to garbage
			failMsgs = failMsgs + 1
		end
	end
	return retryMsgs, failMsgs
end

local retryMsgs = 0
local failMsgs = 0
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
	local d1, d2 = unack2retry(msgs)
	retryMsgs = retryMsgs + d1
	failMsgs = failMsgs + d2
else
	local buf = {}
	for _,v in ipairs(msgs) do
		table.insert(buf, v)
		if #buf == 4000 then
			local d1, d2 = unack2retry(buf)
			retryMsgs = retryMsgs + d1
			failMsgs = failMsgs + d2
			buf = {}
		end
	end
	if (#buf > 0) then
		local d1, d2 = unack2retry(buf)
		retryMsgs = retryMsgs + d1
		failMsgs = failMsgs + d2
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
return {retryMsgs, failMsgs}
`

func (q *DelayQueue) unack2Retry() error {
	keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
	now := time.Now()
	raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()})
	if err != nil && err != NilErr {
		return fmt.Errorf("unack to retry script failed: %v", err)
	}
	infos, ok := raw.([]interface{})
	if ok && len(infos) == 2 {
		retryCount, ok := infos[0].(int64)
		if ok {
			q.reportEvent(RetryEvent, int(retryCount))
		}
		failCount, ok := infos[1].(int64)
		if ok {
			q.reportEvent(FinalFailedEvent, int(failCount))
		}
	}
	return nil
}

func (q *DelayQueue) garbageCollect() error {
	msgIds, err := q.redisCli.SMembers(q.garbageKey)
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(msgKeys)
	if err != nil && err != NilErr {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(q.garbageKey, msgIds)
	if err != nil && err != NilErr {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

func (q *DelayQueue) beforeConsume() ([]string, error) {
	// pending to ready
	err := q.pending2Ready()
	if err != nil {
		return nil, err
	}
	// ready2Unack
	// prioritize new message consumption to avoid avalanches
	ids := make([]string, 0, q.fetchLimit)
	var fetchCount int32
	for {
		fetchCount = atomic.LoadInt32(&q.fetchCount)
		if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
			break
		}
		idStr, err := q.ready2Unack()
		if err == NilErr { // consumed all
			break
		}
		if err != nil {
			return nil, err
		}
		ids = append(ids, idStr)
		atomic.AddInt32(&q.fetchCount, 1)
	}
	// retry2Unack
	if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
		for {
			fetchCount = atomic.LoadInt32(&q.fetchCount)
			if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
				break
			}
			idStr, err := q.retry2Unack()
			if err == NilErr { // consumed all
				break
			}
			if err != nil {
				return nil, err
			}
			ids = append(ids, idStr)
			atomic.AddInt32(&q.fetchCount, 1)
		}
	}
	return ids, nil
}

func (q *DelayQueue) afterConsume() error {
	// unack to retry
	err := q.unack2Retry()
	if err != nil {
		return err
	}
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	return nil
}

func (q *DelayQueue) setRunning() {
	atomic.StoreInt32(&q.running, 1)
}

func (q *DelayQueue) setNotRunning() {
	atomic.StoreInt32(&q.running, 0)
}

func (q *DelayQueue) assertNotRunning() {
	running := atomic.LoadInt32(&q.running)
	if running > 0 {
		panic("operation cannot be performed during running")
	}
}

func (q *DelayQueue) goWithRecover(fn func()) {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				q.logger.Printf("panic: %v\n", err)
			}
		}()
		fn()
	}()
}

// StartConsume creates a goroutine to consume messages from DelayQueue
// use `<-done` to wait for the consumer to stop
// If no callback has been set, StartConsume panics
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
	if q.cb == nil {
		panic("this instance has no callback")
	}
	q.close = make(chan struct{}, 1)
	q.setRunning()
	q.ticker = time.NewTicker(q.fetchInterval)
	q.consumeBuffer = make(chan string, q.fetchLimit)
	done0 := make(chan struct{})
	// start workers
	for i := 0; i < int(q.concurrent); i++ {
		q.goWithRecover(func() {
			for id := range q.consumeBuffer {
				q.callback(id)
				q.afterConsume()
			}
		})
	}
	// start main loop
	go func() {
	tickerLoop:
		for {
			select {
			case <-q.ticker.C:
				ids, err := q.beforeConsume()
				if err != nil {
					log.Printf("consume error: %v", err)
				}
				q.goWithRecover(func() {
					for _, id := range ids {
						q.consumeBuffer <- id
					}
				})
			case <-q.close:
				break tickerLoop
			}
		}
		close(done0)
	}()
	return done0
}

// StopConsume stops the consumer goroutine
func (q *DelayQueue) StopConsume() {
	close(q.close)
	q.setNotRunning()
	if q.ticker != nil {
		q.ticker.Stop()
	}
	close(q.consumeBuffer)
}

// GetPendingCount returns the number of pending messages
func (q *DelayQueue) GetPendingCount() (int64, error) {
	return q.redisCli.ZCard(q.pendingKey)
}

// GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
func (q *DelayQueue) GetReadyCount() (int64, error) {
	return q.redisCli.LLen(q.readyKey)
}

// GetProcessingCount returns the number of messages which are being processed
func (q *DelayQueue) GetProcessingCount() (int64, error) {
	return q.redisCli.ZCard(q.unAckKey)
}

// EventListener will be called when events occur
// This listener can be used to monitor running status
type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}

// ListenEvent registers a listener which will be called when events occur,
// so it can be used to monitor running status
//
// But it can ONLY receive events from the CURRENT INSTANCE;
// if you want to listen to all events in the queue, use Monitor.ListenEvent instead
//
// There can be AT MOST ONE EventListener in a DelayQueue instance.
// If you are using a customized listener, Monitor will stop working
func (q *DelayQueue) ListenEvent(listener EventListener) {
	q.eventListener = listener
}

// DisableListener stops reporting events to the EventListener
func (q *DelayQueue) DisableListener() {
	q.eventListener = nil
}

func (q *DelayQueue) reportEvent(code int, count int) {
	listener := q.eventListener // eventListener may be changed while running
	if listener != nil && count > 0 {
		event := &Event{
			Code:      code,
			Timestamp: time.Now().Unix(),
			MsgCount:  count,
		}
		listener.OnEvent(event)
	}
}

// pubsubListener receives events and reports them through redis pubsub for monitoring
type pubsubListener struct {
	redisCli   RedisCli
	reportChan string
}

func genReportChannel(name string) string {
	return "dq:" + name + ":reportEvents"
}

// EnableReport enables reporting to monitor
func (q *DelayQueue) EnableReport() {
	reportChan := genReportChannel(q.name)
	q.ListenEvent(&pubsubListener{
		redisCli:   q.redisCli,
		reportChan: reportChan,
	})
}

// DisableReport stops reporting to monitor
func (q *DelayQueue) DisableReport() {
	q.DisableListener()
}

func (l *pubsubListener) OnEvent(event *Event) {
	payload := encodeEvent(event)
	l.redisCli.Publish(l.reportChan, payload)
}
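As an illustrative sketch only (not part of delayqueue.go), a custom EventListener might tally delivery outcomes. The Event field names and event codes below are the ones referenced by reportEvent above; the counter type and its use are hypothetical, written as if inside this package:

// countingListener is a hypothetical EventListener that tallies acked and permanently failed messages.
type countingListener struct {
	acked  int64
	failed int64
}

func (l *countingListener) OnEvent(e *Event) {
	switch e.Code {
	case AckEvent:
		atomic.AddInt64(&l.acked, int64(e.MsgCount))
	case FinalFailedEvent:
		atomic.AddInt64(&l.failed, int64(e.MsgCount))
	}
}

// usage: q.ListenEvent(&countingListener{})
// Registering a custom listener replaces the pubsub listener installed by EnableReport.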
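Some hedged usage sketches follow. They are not part of the covered source file: queue names, payloads, durations and the example function names are made up for illustration, and the snippets are written as if inside the delayqueue package so the identifiers above need no import qualifier.

// Example (hypothetical): constructing and tuning a queue.
// rdb can be any implementation of the RedisCli interface, such as the adapter sketched earlier.
func exampleNewQueue(rdb RedisCli, myLogger Logger) *DelayQueue {
	q := NewQueue0("example-queue", rdb, CallbackFunc(func(payload string) bool {
		// return true to acknowledge; returning false triggers re-delivery
		return true
	}))
	// On Codis or cloud redis clusters, pass UseHashTagKey() as an extra option to NewQueue0
	// so that all of the queue's keys land in the same hash slot.
	return q.
		WithLogger(myLogger).                      // any type with Printf, e.g. *log.Logger
		WithConcurrent(4).                         // four consumer goroutines
		WithFetchLimit(20).                        // at most 20 unacked messages in flight
		WithFetchInterval(500 * time.Millisecond). // poll redis twice per second
		WithMaxConsumeDuration(10 * time.Second).  // re-deliver if not acked within 10s
		WithDefaultRetryCount(5)
}

// Example (hypothetical): publishing messages with per-message options.
func examplePublish(q *DelayQueue) error {
	// deliver after a relative delay, allowing at most 3 retries for this message
	if err := q.SendDelayMsg("order-created:1001", time.Minute, WithRetryCount(3)); err != nil {
		return err
	}
	// deliver at an absolute time, keeping the payload in redis for an extra hour after delivery
	return q.SendScheduleMsg("order-timeout:1001", time.Now().Add(30*time.Minute), WithMsgTTL(time.Hour))
}

// Example (hypothetical): running the consumer until a shutdown signal arrives.
func exampleConsume(q *DelayQueue, shutdown <-chan struct{}) {
	done := q.StartConsume()
	<-shutdown      // e.g. SIGTERM handling in the host application
	q.StopConsume() // stops the ticker and closes the consume buffer
	<-done          // wait for the main loop goroutine to exit
}

// Example (hypothetical): reading queue depth, e.g. from a periodic health check.
func exampleQueueDepth(q *DelayQueue) {
	pending, _ := q.GetPendingCount()       // scheduled, not yet moved to ready
	ready, _ := q.GetReadyCount()           // due, waiting for delivery to a consumer
	processing, _ := q.GetProcessingCount() // delivered, awaiting ack
	log.Printf("pending=%d ready=%d processing=%d", pending, ready, processing)
}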