• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25621442036

10 May 2026 06:06AM UTC coverage: 83.337% (-0.06%) from 83.396%
25621442036

Pull #297

github

web-flow
Merge 151ec0fb3 into a868dcde0
Pull Request #297: Nats scheduling fix

2625 of 3487 branches covered (75.28%)

Branch coverage included in aggregate %.

129 of 180 new or added lines in 12 files covered. (71.67%)

11 existing lines in 3 files now uncovered.

7833 of 9062 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.16
/rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsAggregatorService.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.metrics;
18

19
import com.github.sonus21.rqueue.common.RqueueLockManager;
20
import com.github.sonus21.rqueue.config.RqueueConfig;
21
import com.github.sonus21.rqueue.config.RqueueWebConfig;
22
import com.github.sonus21.rqueue.core.RqueueMessage;
23
import com.github.sonus21.rqueue.dao.RqueueQStatsDao;
24
import com.github.sonus21.rqueue.listener.QueueDetail;
25
import com.github.sonus21.rqueue.models.aggregator.QueueEvents;
26
import com.github.sonus21.rqueue.models.aggregator.TasksStat;
27
import com.github.sonus21.rqueue.models.db.MessageMetadata;
28
import com.github.sonus21.rqueue.models.db.QueueStatistics;
29
import com.github.sonus21.rqueue.models.enums.MessageStatus;
30
import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent;
31
import com.github.sonus21.rqueue.utils.Constants;
32
import com.github.sonus21.rqueue.utils.DateTimeUtils;
33
import com.github.sonus21.rqueue.utils.ThreadUtils;
34
import com.github.sonus21.rqueue.utils.TimeoutUtils;
35
import java.time.Duration;
36
import java.time.LocalDate;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.HashMap;
40
import java.util.List;
41
import java.util.Map;
42
import java.util.Map.Entry;
43
import java.util.UUID;
44
import java.util.concurrent.BlockingQueue;
45
import java.util.concurrent.ConcurrentHashMap;
46
import java.util.concurrent.Future;
47
import java.util.concurrent.LinkedBlockingQueue;
48
import java.util.concurrent.TimeUnit;
49
import lombok.extern.slf4j.Slf4j;
50
import org.springframework.beans.factory.DisposableBean;
51
import org.springframework.beans.factory.annotation.Autowired;
52
import org.springframework.context.ApplicationListener;
53
import org.springframework.context.SmartLifecycle;
54
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
55
import org.springframework.stereotype.Component;
56
import org.springframework.util.CollectionUtils;
57

58
@Component
59
@Slf4j
1✔
60
public class RqueueMetricsAggregatorService
61
    implements ApplicationListener<RqueueExecutionEvent>, DisposableBean, SmartLifecycle {
62

63
  private final RqueueConfig rqueueConfig;
64
  private final RqueueWebConfig rqueueWebConfig;
65
  private final RqueueLockManager rqueueLockManager;
66
  private final RqueueQStatsDao rqueueQStatsDao;
67
  private final Object lifecycleMgr = new Object();
1✔
68
  private final Object aggregatorLock = new Object();
1✔
69
  private volatile boolean running = false;
1✔
70
  private ThreadPoolTaskScheduler taskExecutor;
71
  private Map<String, QueueEvents> queueNameToEvents;
72
  private BlockingQueue<QueueEvents> queue;
73
  private List<Future<?>> eventAggregatorTasks;
74

75
  /**
   * Creates the aggregator service. Only the collaborators are stored here; the worker threads,
   * the pending-event map and the work queue are created later by {@link #start()}.
   *
   * @param rqueueConfig used for the producer check, the queue statistics key and the lock key
   * @param rqueueWebConfig source of the aggregation settings (thread count, wait times, limits)
   * @param rqueueLockManager used to acquire and release the per-queue statistics lock
   * @param rqueueQStatsDao used to load and save {@link QueueStatistics}
   */
  @Autowired
  public RqueueMetricsAggregatorService(
      RqueueConfig rqueueConfig,
      RqueueWebConfig rqueueWebConfig,
      RqueueLockManager rqueueLockManager,
      RqueueQStatsDao rqueueQStatsDao) {
    // Persistence and locking collaborators.
    this.rqueueQStatsDao = rqueueQStatsDao;
    this.rqueueLockManager = rqueueLockManager;
    // Configuration collaborators.
    this.rqueueWebConfig = rqueueWebConfig;
    this.rqueueConfig = rqueueConfig;
  }
1✔
86

87
  @Override
88
  public void destroy() throws Exception {
89
    log.info("Destroying task aggregator");
1✔
90
    stop();
1✔
91
    if (this.taskExecutor != null) {
1!
92
      this.taskExecutor.destroy();
1✔
93
    }
94
  }
1✔
95

96
  @Override
97
  public void start() {
98
    log.info("Starting task aggregation {}", RqueueConfig.getBrokerId());
1✔
99
    synchronized (lifecycleMgr) {
1✔
100
      running = true;
1✔
101
      if (!rqueueWebConfig.isCollectListenerStats()) {
1!
102
        return;
×
103
      }
104
      if (rqueueConfig.isProducer()) {
1!
105
        return;
×
106
      }
107
      this.eventAggregatorTasks = new ArrayList<>();
1✔
108
      this.queueNameToEvents = new ConcurrentHashMap<>();
1✔
109
      this.queue = new LinkedBlockingQueue<>();
1✔
110
      int threadCount = rqueueWebConfig.getStatsAggregatorThreadCount();
1✔
111
      this.taskExecutor = ThreadUtils.createTaskScheduler(threadCount, "RqueueTaskAggregator-", 30);
1✔
112
      for (int i = 0; i < threadCount; i++) {
1✔
113
        EventAggregator eventAggregator = new EventAggregator();
1✔
114
        eventAggregatorTasks.add(this.taskExecutor.submit(eventAggregator));
1✔
115
      }
116
      this.taskExecutor.scheduleAtFixedRate(
1✔
117
          new SweepJob(), Duration.ofSeconds(rqueueWebConfig.getAggregateEventWaitTimeInSecond()));
1✔
118
      lifecycleMgr.notifyAll();
1✔
119
    }
1✔
120
  }
1✔
121

122
  private boolean processingRequired(QueueEvents queueEvents) {
123
    return queueEvents.processingRequired(
1✔
124
        rqueueWebConfig.getAggregateEventWaitTimeInSecond(),
1✔
125
        rqueueWebConfig.getAggregateEventCount());
1✔
126
  }
127

128
  private void waitForRunningTaskToStop() {
129
    if (!CollectionUtils.isEmpty(eventAggregatorTasks)) {
1!
130
      for (Future<?> future : eventAggregatorTasks) {
1✔
131
        ThreadUtils.waitForTermination(
1✔
132
            log,
133
            future,
134
            rqueueWebConfig.getAggregateShutdownWaitTime(),
1✔
135
            "Aggregator task termination");
136
      }
1✔
137
    }
138
  }
1✔
139

140
  @Override
141
  public void stop() {
142
    log.info("Stopping task aggregation {}", RqueueConfig.getBrokerId());
1✔
143
    synchronized (lifecycleMgr) {
1✔
144
      synchronized (aggregatorLock) {
1✔
145
        if (!CollectionUtils.isEmpty(queueNameToEvents)) {
1!
146
          Collection<QueueEvents> queueEvents = queueNameToEvents.values();
1✔
147
          queue.addAll(queueEvents);
1✔
148
          queueEvents.clear();
1✔
149
        }
150
        aggregatorLock.notifyAll();
1✔
151
      }
1✔
152
      running = false;
1✔
153
      waitForRunningTaskToStop();
1✔
154
      lifecycleMgr.notifyAll();
1✔
155
    }
1✔
156
  }
1✔
157

158
  @Override
159
  public boolean isRunning() {
160
    synchronized (lifecycleMgr) {
×
161
      return this.running;
×
162
    }
163
  }
164

165
  @Override
166
  public void onApplicationEvent(RqueueExecutionEvent event) {
167
    synchronized (aggregatorLock) {
1✔
168
      if (log.isTraceEnabled()) {
1!
169
        log.trace("Event {}", event);
×
170
      }
171
      QueueDetail queueDetail = (QueueDetail) event.getSource();
1✔
172
      String queueName = queueDetail.getName();
1✔
173
      QueueEvents queueEvents = queueNameToEvents.get(queueName);
1✔
174
      if (queueEvents == null) {
1✔
175
        queueEvents = new QueueEvents(event);
1✔
176
      } else {
177
        queueEvents.addEvent(event);
1✔
178
      }
179
      if (processingRequired(queueEvents)) {
1✔
180
        if (log.isTraceEnabled()) {
1!
181
          log.trace("Adding events to the queue");
×
182
        }
183
        queue.add(queueEvents);
1✔
184
        queueNameToEvents.remove(queueName);
1✔
185
      } else {
186
        queueNameToEvents.put(queueName, queueEvents);
1✔
187
      }
188
      aggregatorLock.notifyAll();
1✔
189
    }
1✔
190
  }
1✔
191

192
  /**
   * Always opts in to automatic startup.
   *
   * @return {@code true}
   */
  @Override
193
  public boolean isAutoStartup() {
194
    return true;
×
195
  }
196

197
  @Override
198
  public void stop(Runnable callback) {
199
    stop();
×
200
    callback.run();
×
201
  }
×
202

203
  /**
   * Lifecycle phase of this component.
   *
   * <p>Per Spring's {@code SmartLifecycle} contract, components with the highest phase value are
   * started last and stopped first.
   *
   * @return {@link Integer#MAX_VALUE}
   */
  @Override
204
  public int getPhase() {
205
    return Integer.MAX_VALUE;
×
206
  }
207

208
  class SweepJob implements Runnable {
1✔
209

210
    @Override
211
    public void run() {
UNCOV
212
      if (log.isDebugEnabled()) {
×
213
        log.debug("Checking pending events.");
×
214
      }
UNCOV
215
      synchronized (aggregatorLock) {
×
UNCOV
216
        List<String> queuesToSweep = new ArrayList<>();
×
UNCOV
217
        for (Entry<String, QueueEvents> entry : queueNameToEvents.entrySet()) {
×
218
          QueueEvents queueEvents = entry.getValue();
×
219
          String queueName = entry.getKey();
×
220
          if (processingRequired(queueEvents)) {
×
221
            queue.add(queueEvents);
×
222
            queuesToSweep.add(queueName);
×
223
          }
224
        }
×
UNCOV
225
        for (String queueName : queuesToSweep) {
×
226
          queueNameToEvents.remove(queueName);
×
227
        }
×
UNCOV
228
        aggregatorLock.notifyAll();
×
UNCOV
229
      }
×
UNCOV
230
    }
×
231
  }
232

233
  private class EventAggregator implements Runnable {
1✔
234

235
    private void aggregate(RqueueExecutionEvent event, TasksStat stat) {
236
      MessageMetadata messageMetadata = event.getJob().getMessageMetadata();
1✔
237
      RqueueMessage rqueueMessage = event.getJob().getRqueueMessage();
1✔
238
      MessageStatus messageStatus = messageMetadata.getStatus();
1✔
239
      if (MessageStatus.DISCARDED.equals(messageStatus)) {
1✔
240
        stat.discarded += 1;
1✔
241
      } else if (MessageStatus.SUCCESSFUL.equals(messageStatus)) {
1✔
242
        stat.success += 1;
1✔
243
      } else if (MessageStatus.MOVED_TO_DLQ.equals(messageStatus)) {
1!
244
        stat.movedToDlq += 1;
1✔
245
      }
246
      if (rqueueMessage.getFailureCount() > 0) {
1✔
247
        stat.retried += 1;
1✔
248
      }
249
      stat.minExecution = Math.min(stat.minExecution, messageMetadata.getTotalExecutionTime());
1✔
250
      stat.maxExecution = Math.max(stat.maxExecution, messageMetadata.getTotalExecutionTime());
1✔
251
      stat.jobCount += 1;
1✔
252
      stat.totalExecutionTime += messageMetadata.getTotalExecutionTime();
1✔
253
    }
1✔
254

255
    private void saveAggregateData(
256
        Map<LocalDate, TasksStat> localDateTasksStatMap, String queueStatKey) {
257
      QueueStatistics queueStatistics = rqueueQStatsDao.findById(queueStatKey);
1✔
258
      if (queueStatistics == null) {
1!
259
        queueStatistics = new QueueStatistics(queueStatKey);
1✔
260
      }
261
      LocalDate today = DateTimeUtils.today();
1✔
262
      queueStatistics.updateTime();
1✔
263
      for (Entry<LocalDate, TasksStat> entry : localDateTasksStatMap.entrySet()) {
1✔
264
        queueStatistics.update(entry.getValue(), entry.getKey().toString());
1✔
265
      }
1✔
266
      queueStatistics.pruneStats(today, rqueueWebConfig.getHistoryDay());
1✔
267
      rqueueQStatsDao.save(queueStatistics);
1✔
268
    }
1✔
269

270
    private Map<LocalDate, TasksStat> aggregate(QueueEvents events) {
271
      List<RqueueExecutionEvent> executionEvents = events.rqueueExecutionEvents;
1✔
272
      RqueueExecutionEvent queueRqueueExecutionEvent = executionEvents.get(0);
1✔
273
      Map<LocalDate, TasksStat> localDateTasksStatMap = new HashMap<>();
1✔
274
      for (RqueueExecutionEvent event : executionEvents) {
1✔
275
        LocalDate date = DateTimeUtils.localDateFromMilli(queueRqueueExecutionEvent.getTimestamp());
1✔
276
        TasksStat stat = localDateTasksStatMap.getOrDefault(date, new TasksStat());
1✔
277
        aggregate(event, stat);
1✔
278
        localDateTasksStatMap.put(date, stat);
1✔
279
      }
1✔
280
      return localDateTasksStatMap;
1✔
281
    }
282

283
    private void processEvents(QueueEvents events) {
284
      List<RqueueExecutionEvent> queueRqueueExecutionEvents = events.rqueueExecutionEvents;
1✔
285
      if (!CollectionUtils.isEmpty(queueRqueueExecutionEvents)) {
1!
286
        RqueueExecutionEvent queueRqueueExecutionEvent = queueRqueueExecutionEvents.get(0);
1✔
287
        QueueDetail queueDetail = (QueueDetail) queueRqueueExecutionEvent.getSource();
1✔
288
        String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueDetail.getName());
1✔
289
        String lockKey = rqueueConfig.getLockKey(queueStatKey);
1✔
290
        String lockValue = UUID.randomUUID().toString();
1✔
291
        Map<LocalDate, TasksStat> localDateTasksStatMap = aggregate(events);
1✔
292

293
        try {
294
          if (rqueueLockManager.acquireLock(
1!
295
              lockKey,
296
              lockValue,
297
              Duration.ofMillis(rqueueWebConfig.getAggregateEventLockDurationInMs()))) {
1✔
298
            saveAggregateData(localDateTasksStatMap, queueStatKey);
1✔
299
          } else {
300
            log.debug(
×
301
                "queue:{}, aggregate job is unable to acquire lock", queueDetail.getQueueName());
×
302
            TimeoutUtils.sleep(Constants.ONE_MILLI);
×
303
            queue.add(events);
×
304
          }
305
        } finally {
306
          rqueueLockManager.releaseLock(lockKey, lockValue);
1✔
307
        }
308
      }
309
    }
1✔
310

311
    @Override
312
    public void run() {
313
      while (running) {
1✔
314
        QueueEvents events = null;
1✔
315
        try {
316
          if (log.isTraceEnabled()) {
1!
317
            log.trace("Aggregating queue stats");
×
318
          }
319
          events =
1✔
320
              queue.poll(rqueueWebConfig.getAggregateShutdownWaitTime() / 2, TimeUnit.MILLISECONDS);
1✔
321
          if (events == null) {
1✔
322
            continue;
1✔
323
          }
324
          processEvents(events);
1✔
325
        } catch (InterruptedException e) {
×
326
          Thread.currentThread().interrupt();
×
327
        } catch (Exception e) {
×
328
          // unprocessed events
329
          if (events != null) {
×
330
            queue.add(events);
×
331
          }
332
          log.error("Error in aggregator job ", e);
×
333
          TimeoutUtils.sleepLog(Constants.ONE_MILLI, false);
×
334
        }
1✔
335
      }
1✔
336
    }
1✔
337
  }
338
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc