• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDT3213 / delayqueue / 11071804419

27 Sep 2024 01:43PM UTC coverage: 83.856% (+1.2%) from 82.698%
11071804419

push

github

HDT3213
fix test suite

535 of 638 relevant lines covered (83.86%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.07
/delayqueue.go
1
package delayqueue
2

3
import (
4
        "errors"
5
        "fmt"
6
        "log"
7
        "strconv"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/google/uuid"
12
)
13

14
// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
type DelayQueue struct {
	// name for this Queue. Make sure the name is unique in redis database
	name     string
	redisCli RedisCli
	// cb consumes a message payload; returns true to ack, false to re-deliver
	cb         func(string) bool
	pendingKey string // sorted set: message id -> delivery time
	readyKey   string // list
	unAckKey   string // sorted set: message id -> retry time
	retryKey   string // list
	retryCountKey string // hash: message id -> remain retry count
	garbageKey    string // set: message id
	useHashTag    bool   // wrap the key prefix in {hashtag} for redis cluster slot affinity
	ticker        *time.Ticker
	logger        *log.Logger
	close         chan struct{} // closed by StopConsume to stop the consume loop
	running       int32         // 1 while consuming; accessed via sync/atomic only
	maxConsumeDuration time.Duration // default 5 seconds
	msgTTL             time.Duration // default 1 hour
	defaultRetryCount  uint          // default 3
	fetchInterval      time.Duration // default 1 second
	fetchLimit         uint          // default no limit
	fetchCount         int32         // actually running task number
	concurrent         uint          // default 1, executed serially

	// for batch consume
	consumeBuffer chan string

	eventListener EventListener
}
44

45
// NilErr represents redis nil
// RedisCli implementations must return it wherever redis replies nil (e.g. GET on a missing key).
var NilErr = errors.New("nil")
47

48
// RedisCli is abstraction for redis client, required commands only not all commands
type RedisCli interface {
	// Eval sends lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	// Set stores a string value with the given expiration
	Set(key string, value string, expiration time.Duration) error
	// Get represents redis command GET
	// please return NilErr when no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
	ZCard(key string) (int64, error)
	LLen(key string) (int64, error)

	// Publish used for monitor only
	Publish(channel string, payload string) error
	// Subscribe used for monitor only
	// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)
}
74

75
// hashTagKeyOpt is the private option type produced by UseHashTagKey.
type hashTagKeyOpt int

// CallbackFunc receives and consumes messages
// returns true to confirm successfully consumed, false to re-deliver this message
type CallbackFunc = func(string) bool
80

81
// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
82
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
83
// WARNING! Changing (add or remove) this option will cause DelayQueue failing to read existed data in redis
84
// see more:  https://redis.io/docs/reference/cluster-spec/#hash-tags
85
func UseHashTagKey() interface{} {
1✔
86
        return hashTagKeyOpt(1)
1✔
87
}
1✔
88

89
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
90
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
91
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
1✔
92
        if name == "" {
1✔
93
                panic("name is required")
×
94
        }
95
        if cli == nil {
1✔
96
                panic("cli is required")
×
97
        }
98
        useHashTag := false
1✔
99
        var callback CallbackFunc = nil
1✔
100
        for _, opt := range opts {
2✔
101
                switch o := opt.(type) {
1✔
102
                case hashTagKeyOpt:
1✔
103
                        useHashTag = true
1✔
104
                case CallbackFunc:
1✔
105
                        callback = o
1✔
106
                }
107
        }
108
        var keyPrefix string
1✔
109
        if useHashTag {
2✔
110
                keyPrefix = "{dp:" + name + "}"
1✔
111
        } else {
2✔
112
                keyPrefix = "dp:" + name
1✔
113
        }
1✔
114
        return &DelayQueue{
1✔
115
                name:               name,
1✔
116
                redisCli:           cli,
1✔
117
                cb:                 callback,
1✔
118
                pendingKey:         keyPrefix + ":pending",
1✔
119
                readyKey:           keyPrefix + ":ready",
1✔
120
                unAckKey:           keyPrefix + ":unack",
1✔
121
                retryKey:           keyPrefix + ":retry",
1✔
122
                retryCountKey:      keyPrefix + ":retry:cnt",
1✔
123
                garbageKey:         keyPrefix + ":garbage",
1✔
124
                useHashTag:         useHashTag,
1✔
125
                close:              nil,
1✔
126
                maxConsumeDuration: 5 * time.Second,
1✔
127
                msgTTL:             time.Hour,
1✔
128
                logger:             log.Default(),
1✔
129
                defaultRetryCount:  3,
1✔
130
                fetchInterval:      time.Second,
1✔
131
                concurrent:         1,
1✔
132
        }
1✔
133
}
134

135
// WithCallback set callback for queue to receives and consumes messages
// callback returns true to confirm successfully consumed, false to re-deliver this message
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
	q.cb = callback
	return q
}
141

142
// WithLogger customizes logger for queue
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue {
	q.logger = logger
	return q
}
147

148
// WithFetchInterval customizes the interval at which consumer fetch message from redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
	q.fetchInterval = d
	return q
}
153

154
// WithMaxConsumeDuration customizes max consume duration
// If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
	q.maxConsumeDuration = d
	return q
}
160

161
// WithFetchLimit limits the max number of processing messages, 0 means no limit
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
	q.fetchLimit = limit
	return q
}
166

167
// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
	if c == 0 {
		panic("concurrent cannot be 0")
	}
	// worker count cannot change while StartConsume is running
	q.assertNotRunning()
	q.concurrent = c
	return q
}
176

177
// WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
// use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
	q.defaultRetryCount = count
	return q
}
183

184
func (q *DelayQueue) genMsgKey(idStr string) string {
1✔
185
        if q.useHashTag {
2✔
186
                return "{dp:" + q.name + "}" + ":msg:" + idStr
1✔
187
        }
1✔
188
        return "dp:" + q.name + ":msg:" + idStr
1✔
189
}
190

191
// retryCountOpt is the private option type produced by WithRetryCount.
type retryCountOpt int

// WithRetryCount set retry count for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
// NOTE(review): SendScheduleMsg converts this to uint, so a negative count
// becomes a huge value — confirm callers never pass negatives.
func WithRetryCount(count int) interface{} {
	return retryCountOpt(count)
}
198

199
// msgTTLOpt is the private option type produced by WithMsgTTL.
type msgTTLOpt time.Duration

// WithMsgTTL set ttl for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour))
func WithMsgTTL(d time.Duration) interface{} {
	return msgTTLOpt(d)
}
206

207
// SendScheduleMsg submits a message delivered at given time
208
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
1✔
209
        // parse options
1✔
210
        retryCount := q.defaultRetryCount
1✔
211
        for _, opt := range opts {
2✔
212
                switch o := opt.(type) {
1✔
213
                case retryCountOpt:
1✔
214
                        retryCount = uint(o)
1✔
215
                case msgTTLOpt:
1✔
216
                        q.msgTTL = time.Duration(o)
1✔
217
                }
218
        }
219
        // generate id
220
        idStr := uuid.Must(uuid.NewRandom()).String()
1✔
221
        now := time.Now()
1✔
222
        // store msg
1✔
223
        msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
1✔
224
        err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
1✔
225
        if err != nil {
1✔
226
                return fmt.Errorf("store msg failed: %v", err)
×
227
        }
×
228
        // store retry count
229
        err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
1✔
230
        if err != nil {
1✔
231
                return fmt.Errorf("store retry count failed: %v", err)
×
232
        }
×
233
        // put to pending
234
        err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
1✔
235
        if err != nil {
1✔
236
                return fmt.Errorf("push to pending failed: %v", err)
×
237
        }
×
238
        q.reportEvent(NewMessageEvent, 1)
1✔
239
        return nil
1✔
240
}
241

242
// SendDelayMsg submits a message delivered after given duration
243
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
1✔
244
        t := time.Now().Add(duration)
1✔
245
        return q.SendScheduleMsg(payload, t, opts...)
1✔
246
}
1✔
247

248
// pending2ReadyScript atomically moves messages from pending to ready
// it pushes ids to the ready list in chunks of 4000 to stay under lua unpack limits
// keys: pendingKey, readyKey
// argv: currentTime (unix seconds; all ids scored <= it are due)
// returns: ready message number
const pending2ReadyScript = `
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get ready msg
if (#msgs == 0) then return end
local args2 = {} -- keys to push into ready
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
    if (#args2 == 4000) then
		redis.call('LPush', KEYS[2], unpack(args2))
		args2 = {}
	end
end
if (#args2 > 0) then 
	redis.call('LPush', KEYS[2], unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from pending
return #msgs
`
269

270
func (q *DelayQueue) pending2Ready() error {
1✔
271
        now := time.Now().Unix()
1✔
272
        keys := []string{q.pendingKey, q.readyKey}
1✔
273
        raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
1✔
274
        if err != nil && err != NilErr {
1✔
275
                return fmt.Errorf("pending2ReadyScript failed: %v", err)
×
276
        }
×
277
        count, ok := raw.(int64)
1✔
278
        if ok {
2✔
279
                q.reportEvent(ReadyEvent, int(count))
1✔
280
        }
1✔
281
        return nil
1✔
282
}
283

284
// ready2UnackScript atomically moves messages from ready to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime (unix seconds used as the unack score, i.e. the retry deadline)
// returns: the popped message id, or nil when the source list is empty
const ready2UnackScript = `
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
`
293

294
func (q *DelayQueue) ready2Unack() (string, error) {
1✔
295
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
296
        keys := []string{q.readyKey, q.unAckKey}
1✔
297
        ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime})
1✔
298
        if err == NilErr {
2✔
299
                return "", err
1✔
300
        }
1✔
301
        if err != nil {
1✔
302
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
303
        }
×
304
        str, ok := ret.(string)
1✔
305
        if !ok {
1✔
306
                return "", fmt.Errorf("illegal result: %#v", ret)
×
307
        }
×
308
        q.reportEvent(DeliveredEvent, 1)
1✔
309
        return str, nil
1✔
310
}
311

312
func (q *DelayQueue) retry2Unack() (string, error) {
1✔
313
        retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
1✔
314
        keys := []string{q.retryKey, q.unAckKey}
1✔
315
        ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
1✔
316
        if err == NilErr {
2✔
317
                return "", NilErr
1✔
318
        }
1✔
319
        if err != nil {
1✔
320
                return "", fmt.Errorf("ready2UnackScript failed: %v", err)
×
321
        }
×
322
        str, ok := ret.(string)
1✔
323
        if !ok {
1✔
324
                return "", fmt.Errorf("illegal result: %#v", ret)
×
325
        }
×
326
        return str, nil
1✔
327
}
328

329
func (q *DelayQueue) callback(idStr string) error {
1✔
330
        payload, err := q.redisCli.Get(q.genMsgKey(idStr))
1✔
331
        if err == NilErr {
1✔
332
                return nil
×
333
        }
×
334
        if err != nil {
1✔
335
                // Is an IO error?
×
336
                return fmt.Errorf("get message payload failed: %v", err)
×
337
        }
×
338
        ack := q.cb(payload)
1✔
339
        if ack {
2✔
340
                err = q.ack(idStr)
1✔
341
        } else {
2✔
342
                err = q.nack(idStr)
1✔
343
        }
1✔
344
        return err
1✔
345
}
346

347
// ack acknowledges successful consumption: removes the message from unack and
// deletes its payload and retry counter.
func (q *DelayQueue) ack(idStr string) error {
	// release the fetch slot taken in beforeConsume
	atomic.AddInt32(&q.fetchCount, -1)
	err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
	_ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
	q.reportEvent(AckEvent, 1)
	return nil
}
359

360
// nack reports failed consumption: the message stays in unack with its score
// reset to now so that unack2Retry re-queues it immediately.
func (q *DelayQueue) nack(idStr string) error {
	// release the fetch slot taken in beforeConsume
	atomic.AddInt32(&q.fetchCount, -1)
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
		idStr: float64(time.Now().Unix()),
	})
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	q.reportEvent(NackEvent, 1)
	return nil
}
372

373
// unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0,
374
// and moves messages from unack to garbage which  retry count is 0
375
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
376
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
377
// keys: unackKey, retryCountKey, retryKey, garbageKey
378
// argv: currentTime
379
// returns: {retryMsgs, failMsgs}
380
const unack2RetryScript = `
381
local unack2retry = function(msgs)
382
        local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
383
        local retryMsgs = 0
384
        local failMsgs = 0
385
        for i,v in ipairs(retryCounts) do
386
                local k = msgs[i]
387
                if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
388
                        redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
389
                        redis.call("LPush", KEYS[3], k) -- add to retry
390
                        retryMsgs = retryMsgs + 1
391
                else
392
                        redis.call("HDel", KEYS[2], k) -- del retry count
393
                        redis.call("SAdd", KEYS[4], k) -- add to garbage
394
                        failMsgs = failMsgs + 1
395
                end
396
        end
397
        return retryMsgs, failMsgs
398
end
399

400
local retryMsgs = 0
401
local failMsgs = 0
402
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- get retry msg
403
if (#msgs == 0) then return end
404
if #msgs < 4000 then
405
        local d1, d2 = unack2retry(msgs)
406
        retryMsgs = retryMsgs + d1
407
        failMsgs = failMsgs + d2
408
else
409
        local buf = {}
410
        for _,v in ipairs(msgs) do
411
                table.insert(buf, v)
412
                if #buf == 4000 then
413
                    local d1, d2 = unack2retry(buf)
414
                        retryMsgs = retryMsgs + d1
415
                        failMsgs = failMsgs + d2
416
                        buf = {}
417
                end
418
        end
419
        if (#buf > 0) then
420
                local d1, d2 = unack2retry(buf)
421
                retryMsgs = retryMsgs + d1
422
                failMsgs = failMsgs + d2
423
        end
424
end
425
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- remove msgs from unack
426
return {retryMsgs, failMsgs}
427
`
428

429
func (q *DelayQueue) unack2Retry() error {
1✔
430
        keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
1✔
431
        now := time.Now()
1✔
432
        raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
1✔
433
        if err != nil && err != NilErr {
1✔
434
                return fmt.Errorf("unack to retry script failed: %v", err)
×
435
        }
×
436
        infos, ok := raw.([]interface{})
1✔
437
        if ok && len(infos) == 2 {
2✔
438
                retryCount, ok := infos[0].(int64)
1✔
439
                if ok {
2✔
440
                        q.reportEvent(RetryEvent, int(retryCount))
1✔
441
                }
1✔
442
                failCount, ok := infos[1].(int64)
1✔
443
                if ok {
2✔
444
                        q.reportEvent(FinalFailedEvent, int(failCount))
1✔
445
                }
1✔
446
        }
447
        return nil
1✔
448
}
449

450
func (q *DelayQueue) garbageCollect() error {
1✔
451
        msgIds, err := q.redisCli.SMembers(q.garbageKey)
1✔
452
        if err != nil {
1✔
453
                return fmt.Errorf("smembers failed: %v", err)
×
454
        }
×
455
        if len(msgIds) == 0 {
2✔
456
                return nil
1✔
457
        }
1✔
458
        // allow concurrent clean
459
        msgKeys := make([]string, 0, len(msgIds))
1✔
460
        for _, idStr := range msgIds {
2✔
461
                msgKeys = append(msgKeys, q.genMsgKey(idStr))
1✔
462
        }
1✔
463
        err = q.redisCli.Del(msgKeys)
1✔
464
        if err != nil && err != NilErr {
1✔
465
                return fmt.Errorf("del msgs failed: %v", err)
×
466
        }
×
467
        err = q.redisCli.SRem(q.garbageKey, msgIds)
1✔
468
        if err != nil && err != NilErr {
1✔
469
                return fmt.Errorf("remove from garbage key failed: %v", err)
×
470
        }
×
471
        return nil
1✔
472
}
473

474
// beforeConsume promotes due messages and fetches a batch of message ids for
// the workers, honoring fetchLimit as a cap on in-flight messages.
func (q *DelayQueue) beforeConsume() ([]string, error) {
	// pending to ready
	err := q.pending2Ready()
	if err != nil {
		return nil, err
	}
	// ready2Unack
	// prioritize new message consumption to avoid avalanches
	ids := make([]string, 0, q.fetchLimit)
	var fetchCount int32
	for {
		fetchCount = atomic.LoadInt32(&q.fetchCount)
		// stop once in-flight messages reach fetchLimit (0 = unlimited)
		if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
			break
		}
		idStr, err := q.ready2Unack()
		if err == NilErr { // consumed all
			break
		}
		if err != nil {
			return nil, err
		}
		ids = append(ids, idStr)
		// each fetched id holds one fetch slot until ack/nack releases it
		atomic.AddInt32(&q.fetchCount, 1)
	}
	// retry2Unack
	// only fill the remaining capacity with retried messages
	if fetchCount < int32(q.fetchLimit) || q.fetchLimit == 0 {
		for {
			fetchCount = atomic.LoadInt32(&q.fetchCount)
			if q.fetchLimit > 0 && fetchCount >= int32(q.fetchLimit) {
				break
			}
			idStr, err := q.retry2Unack()
			if err == NilErr { // consumed all
				break
			}
			if err != nil {
				return nil, err
			}
			ids = append(ids, idStr)
			atomic.AddInt32(&q.fetchCount, 1)
		}
	}
	return ids, nil
}
519

520
func (q *DelayQueue) afterConsume() error {
1✔
521
        // unack to retry
1✔
522
        err := q.unack2Retry()
1✔
523
        if err != nil {
1✔
524
                return err
×
525
        }
×
526
        err = q.garbageCollect()
1✔
527
        if err != nil {
1✔
528
                return err
×
529
        }
×
530
        return nil
1✔
531
}
532

533
// setRunning marks the queue as consuming; checked by assertNotRunning.
func (q *DelayQueue) setRunning() {
	atomic.StoreInt32(&q.running, 1)
}

// setNotRunning clears the running flag set by setRunning.
func (q *DelayQueue) setNotRunning() {
	atomic.StoreInt32(&q.running, 0)
}
540

541
// assertNotRunning panics if the queue is currently consuming; configuration
// setters use it to forbid changes while StartConsume is active.
func (q *DelayQueue) assertNotRunning() {
	running := atomic.LoadInt32(&q.running)
	if running > 0 {
		panic("operation cannot be performed during running")
	}
}
547

548
func (q *DelayQueue)goWithRecover(fn func()) {
1✔
549
        go func ()  {
2✔
550
                defer func ()  {
2✔
551
                        if err := recover(); err != nil {
2✔
552
                                q.logger.Printf("panic: %v\n", err)
1✔
553
                        }
1✔
554
                }()
555
                fn()
1✔
556
        }()
557
}
558

559
// StartConsume creates a goroutine to consume message from DelayQueue
560
// use `<-done` to wait consumer stopping
561
// If there is no callback set, StartConsume will panic
562
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
1✔
563
        if q.cb == nil {
1✔
564
                panic("this instance has no callback")
×
565
        }
566
        q.close = make(chan struct{}, 1)
1✔
567
        q.setRunning()
1✔
568
        q.ticker = time.NewTicker(q.fetchInterval)
1✔
569
        q.consumeBuffer = make(chan string, q.fetchLimit)
1✔
570
        done0 := make(chan struct{})
1✔
571
        // start worker
1✔
572
        for i := 0; i < int(q.concurrent); i++ {
2✔
573
                q.goWithRecover(func() {
2✔
574
                        for id := range q.consumeBuffer {
2✔
575
                                q.callback(id)
1✔
576
                                q.afterConsume()
1✔
577
                        }
1✔
578
                })
579
        }
580
        // start main loop
581
        go func() {
2✔
582
        tickerLoop:
1✔
583
                for {
2✔
584
                        select {
1✔
585
                        case <-q.ticker.C:
1✔
586
                                ids, err := q.beforeConsume()
1✔
587
                                if err != nil {
1✔
588
                                        log.Printf("consume error: %v", err)
×
589
                                }
×
590
                                q.goWithRecover(func() {
2✔
591
                                        for _, id := range ids {
2✔
592
                                                q.consumeBuffer <- id
1✔
593
                                        }
1✔
594
                                })
595
                        case <-q.close:
1✔
596
                                break tickerLoop
1✔
597
                        }
598
                }
599
                close(done0)
1✔
600
        }()
601
        return done0
1✔
602
}
603

604
// StopConsume stops consumer goroutine
func (q *DelayQueue) StopConsume() {
	close(q.close)
	q.setNotRunning()
	if q.ticker != nil {
		q.ticker.Stop()
	}
	// NOTE(review): a dispatcher goroutine spawned by StartConsume may still be
	// sending into consumeBuffer when it is closed here; such a send panics and
	// is swallowed by goWithRecover's recover, dropping the remaining batch.
	// Confirm whether a drain/handshake is needed before closing.
	close(q.consumeBuffer)
}
613

614
// GetPendingCount returns the number of pending messages
func (q *DelayQueue) GetPendingCount() (int64, error) {
	return q.redisCli.ZCard(q.pendingKey)
}
618

619
// GetReadyCount returns the number of messages which have arrived delivery time but have not been delivered yet
func (q *DelayQueue) GetReadyCount() (int64, error) {
	return q.redisCli.LLen(q.readyKey)
}
623

624
// GetProcessingCount returns the number of messages which are being processed
func (q *DelayQueue) GetProcessingCount() (int64, error) {
	return q.redisCli.ZCard(q.unAckKey)
}
628

629
// EventListener which will be called when events occur
// This Listener can be used to monitor running status
type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}
635

636
// ListenEvent register a listener which will be called when events occur,
// so it can be used to monitor running status
//
// But It can ONLY receive events from the CURRENT INSTANCE,
// if you want to listen to all events in queue, just use Monitor.ListenEvent
//
// There can be AT MOST ONE EventListener in an DelayQueue instance.
// If you are using customized listener, Monitor will stop working
func (q *DelayQueue) ListenEvent(listener EventListener) {
	q.eventListener = listener
}
647

648
// DisableListener stops reporting events to EventListener
func (q *DelayQueue) DisableListener() {
	q.eventListener = nil
}
652

653
func (q *DelayQueue) reportEvent(code int, count int) {
1✔
654
        listener := q.eventListener // eventListener may be changed during running
1✔
655
        if listener != nil && count > 0 {
2✔
656
                event := &Event{
1✔
657
                        Code:      code,
1✔
658
                        Timestamp: time.Now().Unix(),
1✔
659
                        MsgCount:  count,
1✔
660
                }
1✔
661
                listener.OnEvent(event)
1✔
662
        }
1✔
663
}
664

665
// pubsubListener receives events and reports them through redis pubsub for monitoring
type pubsubListener struct {
	redisCli   RedisCli
	reportChan string // pubsub channel name, see genReportChannel
}
670

671
// genReportChannel returns the redis pubsub channel name used to report
// monitoring events for the queue with the given name.
func genReportChannel(name string) string {
	return fmt.Sprintf("dq:%s:reportEvents", name)
}
674

675
// EnableReport enables reporting to monitor
// It installs a pubsubListener, replacing any previously registered EventListener.
func (q *DelayQueue) EnableReport() {
	reportChan := genReportChannel(q.name)
	q.ListenEvent(&pubsubListener{
		redisCli:   q.redisCli,
		reportChan: reportChan,
	})
}
683

684
// DisableReport stops reporting to monitor
func (q *DelayQueue) DisableReport() {
	q.DisableListener()
}
688

689
// OnEvent implements EventListener by publishing the encoded event to the
// report channel. Publish errors are ignored: monitoring is best-effort.
func (l *pubsubListener) OnEvent(event *Event) {
	payload := encodeEvent(event)
	l.redisCli.Publish(l.reportChan, payload)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc