• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2610

pending completion
2610

push

circleci

web-flow
Fixing #193. (#194)

* Fixing #193.

* added doc

* add a warning log for not running tasks

* wip

* Use hybrid combination of Redis and fixed rate scheduler to avoid job multiplications.

* test fixes

* 25% buffer

* doc updated.

* new user

---------

Co-authored-by: Sonu Kumar <sonu@git>

135 of 135 new or added lines in 5 files covered. (100.0%)

5378 of 5877 relevant lines covered (91.51%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.55
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RedisScheduleTriggerHandler.java
1
/*
2
 * Copyright (c) 2019-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16
package com.github.sonus21.rqueue.core;
17

18
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
19
import com.github.sonus21.rqueue.utils.ThreadUtils;
20
import com.google.common.annotations.VisibleForTesting;
21
import org.slf4j.Logger;
22
import org.springframework.data.redis.connection.Message;
23
import org.springframework.data.redis.connection.MessageListener;
24
import org.springframework.data.redis.listener.ChannelTopic;
25

26
import java.util.HashMap;
27
import java.util.List;
28
import java.util.Map;
29
import java.util.concurrent.ConcurrentHashMap;
30
import java.util.concurrent.Future;
31
import java.util.function.Function;
32

33
class RedisScheduleTriggerHandler {
34

35
  private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
36
  private final RqueueSchedulerConfig rqueueSchedulerConfig;
37
  private final Logger logger;
38
  private final Function<String, Future<?>> scheduler;
39
  private final Function<String, String> channelNameProducer;
40
  private final List<String> queueNames;
41

42
  @VisibleForTesting
43
  Map<String, Long> queueNameToLastRunTime;
44
  @VisibleForTesting
45
  Map<String, Future<?>> queueNameToFuture;
46
  @VisibleForTesting
47
  Map<String, String> channelNameToQueueName;
48
  @VisibleForTesting
49
  MessageListener messageListener;
50

51
  RedisScheduleTriggerHandler(Logger logger,
52
      RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
53
      RqueueSchedulerConfig rqueueSchedulerConfig, List<String> queueNames,
54
      Function<String, Future<?>> scheduler, Function<String, String> channelNameProducer) {
1✔
55
    this.queueNames = queueNames;
1✔
56
    this.rqueueSchedulerConfig = rqueueSchedulerConfig;
1✔
57
    this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
1✔
58
    this.logger = logger;
1✔
59
    this.scheduler = scheduler;
1✔
60
    this.channelNameProducer = channelNameProducer;
1✔
61
  }
1✔
62

63
  void initialize() {
64
    this.messageListener = new MessageSchedulerListener();
1✔
65
    this.channelNameToQueueName = new HashMap<>(queueNames.size());
1✔
66
    this.queueNameToFuture = new ConcurrentHashMap<>(queueNames.size());
1✔
67
    this.queueNameToLastRunTime = new ConcurrentHashMap<>(queueNames.size());
1✔
68
  }
1✔
69

70
  void stop() {
71
    for (String queue : queueNames) {
1✔
72
      stopQueue(queue);
1✔
73
    }
1✔
74
  }
1✔
75

76
  void startQueue(String queueName) {
77
    queueNameToLastRunTime.put(queueName, 0L);
1✔
78
    subscribeToRedisTopic(queueName);
1✔
79
  }
1✔
80

81
  void stopQueue(String queueName) {
82
    Future<?> future = queueNameToFuture.get(queueName);
1✔
83
    ThreadUtils.waitForTermination(logger, future, rqueueSchedulerConfig.getTerminationWaitTime(),
1✔
84
        "An exception occurred while stopping scheduler queue '{}'", queueName);
85
    queueNameToLastRunTime.put(queueName, 0L);
1✔
86
    queueNameToFuture.remove(queueName);
1✔
87
    unsubscribeFromRedis(queueName);
1✔
88
  }
1✔
89

90
  private void unsubscribeFromRedis(String queueName) {
91
    String channelName = channelNameProducer.apply(queueName);
1✔
92
    logger.debug("Queue {} unsubscribe from channel {}", queueName, channelName);
1✔
93
    rqueueRedisListenerContainerFactory.removeMessageListener(messageListener,
1✔
94
        new ChannelTopic(channelName));
95
    channelNameToQueueName.put(channelName, queueName);
1✔
96
  }
1✔
97

98
  private void subscribeToRedisTopic(String queueName) {
99
    String channelName = channelNameProducer.apply(queueName);
1✔
100
    channelNameToQueueName.put(channelName, queueName);
1✔
101
    logger.debug("Queue {} subscribe to channel {}", queueName, channelName);
1✔
102
    rqueueRedisListenerContainerFactory.addMessageListener(messageListener,
1✔
103
        new ChannelTopic(channelName));
104
  }
1✔
105

106
  protected long getMinDelay() {
107
    return rqueueSchedulerConfig.minMessageMoveDelay();
1✔
108
  }
109

110

111
  /**
112
   * This MessageListener listen the event from Redis, its expected that the event should be only
113
   * raised when elements in the ZSET are lagging behind current time.
114
   */
115
  private class MessageSchedulerListener implements MessageListener {
1✔
116

117
    private void schedule(String queueName, long currentTime) {
118
      Future<?> future = queueNameToFuture.get(queueName);
1✔
119
      if (future == null || future.isCancelled() || future.isDone()) {
1✔
120
        queueNameToLastRunTime.put(queueName, currentTime);
1✔
121
        Future<?> newFuture = scheduler.apply(queueName);
1✔
122
        queueNameToFuture.put(queueName, newFuture);
1✔
123
      }
124
    }
1✔
125

126
    private void handleMessage(String queueName, long startTime) {
127
      long currentTime = System.currentTimeMillis();
1✔
128
      if (startTime > currentTime) {
1✔
129
        logger.warn("Received message body is not correct queue: {}, time: {}", queueName,
1✔
130
            startTime);
1✔
131
        return;
1✔
132
      }
133
      long lastRunTime = queueNameToLastRunTime.get(queueName);
1✔
134
      if (currentTime - lastRunTime < getMinDelay()) {
1✔
135
        return;
1✔
136
      }
137
      schedule(queueName, currentTime);
1✔
138
    }
1✔
139

140
    @Override
141
    public void onMessage(Message message, byte[] pattern) {
142
      if (message.getBody().length == 0 || message.getChannel().length == 0) {
1✔
143
        return;
×
144
      }
145
      String body = new String(message.getBody());
1✔
146
      String channel = new String(message.getChannel());
1✔
147
      logger.trace("Body: {} Channel: {}", body, channel);
1✔
148
      try {
149
        long startTime = Long.parseLong(body);
1✔
150
        String queueName = channelNameToQueueName.get(channel);
1✔
151
        if (queueName == null) {
1✔
152
          logger.warn("Unknown channel name {}", channel);
1✔
153
          return;
1✔
154
        }
155
        handleMessage(queueName, startTime);
1✔
156
      } catch (Exception e) {
1✔
157
        logger.error("Error occurred on a channel {}, body: {}", channel, body, e);
1✔
158
      }
1✔
159
    }
1✔
160
  }
161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc