• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25934725140

15 May 2026 06:32PM UTC coverage: 83.412% (-0.01%) from 83.425%
25934725140

push

github

web-flow
Add global retry limit (#298)

* Add global retry limit

Assisted-By: Codex

* Apply Palantir Java Format

* Bump version to RC9

Assisted-By: Codex

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

2629 of 3489 branches covered (75.35%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 2 files covered. (100.0%)

6 existing lines in 2 files now uncovered.

7840 of 9062 relevant lines covered (86.52%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.54
/rqueue-core/src/main/java/com/github/sonus21/rqueue/worker/RqueueWorkerRegistryImpl.java
1
/*
2
 * Copyright (c) 2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.worker;
18

19
import com.github.sonus21.rqueue.config.RqueueConfig;
20
import com.github.sonus21.rqueue.core.EndpointRegistry;
21
import com.github.sonus21.rqueue.listener.QueueDetail;
22
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
23
import com.github.sonus21.rqueue.models.registry.RqueueWorkerInfo;
24
import com.github.sonus21.rqueue.models.registry.RqueueWorkerPollerMetadata;
25
import com.github.sonus21.rqueue.models.registry.RqueueWorkerPollerView;
26
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
27
import com.github.sonus21.rqueue.serdes.SerializationUtils;
28
import com.github.sonus21.rqueue.utils.DateTimeUtils;
29
import com.github.sonus21.rqueue.utils.QueueThreadPool;
30
import com.github.sonus21.rqueue.utils.StringUtils;
31
import java.lang.management.ManagementFactory;
32
import java.net.InetAddress;
33
import java.net.UnknownHostException;
34
import java.time.Duration;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.Comparator;
39
import java.util.LinkedHashMap;
40
import java.util.List;
41
import java.util.Map;
42
import java.util.concurrent.ConcurrentHashMap;
43
import lombok.extern.slf4j.Slf4j;
44
import org.springframework.context.ApplicationListener;
45
import org.springframework.util.CollectionUtils;
46

47
/**
48
 * Backend-agnostic worker registry. All heartbeat scheduling, in-memory bookkeeping, and view
49
 * assembly lives here; storage is delegated to a {@link WorkerRegistryStore}, of which Redis
50
 * and NATS JetStream KV provide concrete implementations.
51
 */
52
@Slf4j
1✔
53
public class RqueueWorkerRegistryImpl
54
    implements RqueueWorkerRegistry, ApplicationListener<RqueueBootstrapEvent> {
55
  private final RqueueSerDes serDes = SerializationUtils.getSerDes();
1✔
56
  private final RqueueConfig rqueueConfig;
57
  private final WorkerRegistryStore store;
58
  private final String workerId;
59
  private final String host;
60
  private final String pid;
61
  private final long startedAt;
62
  private final Map<String, Long> lastMessageAtByQueue = new ConcurrentHashMap<>();
1✔
63
  private final Map<String, Long> lastPollAtByQueue = new ConcurrentHashMap<>();
1✔
64
  private final Map<String, Long> lastQueueHeartbeatAt = new ConcurrentHashMap<>();
1✔
65
  private final Map<String, Long> lastQueueTtlRefreshAt = new ConcurrentHashMap<>();
1✔
66
  private final Map<String, Long> lastCapacityExhaustedAtByQueue = new ConcurrentHashMap<>();
1✔
67
  private final Map<String, Long> capacityExhaustedCountByQueue = new ConcurrentHashMap<>();
1✔
68
  private volatile long lastWorkerHeartbeatAt;
69

70
  /**
   * Creates a registry backed by the given configuration and store.
   *
   * <p>The worker identity (broker id, host name, process id) and the start timestamp are
   * captured once here and reused whenever this instance publishes its worker info.
   *
   * @param rqueueConfig configuration consulted for registry keys, intervals and TTLs
   * @param store backend used to persist worker info and queue heartbeats
   */
  public RqueueWorkerRegistryImpl(RqueueConfig rqueueConfig, WorkerRegistryStore store) {
    // Collaborators.
    this.rqueueConfig = rqueueConfig;
    this.store = store;

    // Identity of this worker process.
    this.workerId = RqueueConfig.getBrokerId();
    this.host = getHostName();
    this.pid = getPid();

    // Taken last, after the identity lookups have completed.
    this.startedAt = System.currentTimeMillis();
  }
1✔
78

79
  @Override
80
  public void recordQueuePoll(
81
      QueueDetail queueDetail, QueueThreadPool queueThreadPool, boolean messageReceived) {
82
    if (!rqueueConfig.isWorkerRegistryEnabled()) {
1!
83
      return;
×
84
    }
85
    String trackingKey = consumerTrackingKey(queueDetail);
1✔
86
    long now = System.currentTimeMillis();
1✔
87
    if (messageReceived) {
1✔
88
      lastMessageAtByQueue.put(trackingKey, now);
1✔
89
    }
90
    lastPollAtByQueue.put(trackingKey, now);
1✔
91
    refreshWorkerInfoIfRequired(now);
1✔
92
    if (!queueHeartbeatRequired(trackingKey, now)) {
1✔
93
      return;
1✔
94
    }
95
    publishHeartbeat(
1✔
96
        registryQueueName(queueDetail),
1✔
97
        trackingKey,
98
        queueThreadPool,
99
        now,
100
        queueDetail.resolvedConsumerName());
1✔
101
  }
1✔
102

103
  @Override
104
  public void recordQueueCapacityExhausted(
105
      QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
106
    if (!rqueueConfig.isWorkerRegistryEnabled()) {
1!
107
      return;
×
108
    }
109
    String trackingKey = consumerTrackingKey(queueDetail);
1✔
110
    long now = System.currentTimeMillis();
1✔
111
    refreshWorkerInfoIfRequired(now);
1✔
112
    lastCapacityExhaustedAtByQueue.put(trackingKey, now);
1✔
113
    capacityExhaustedCountByQueue.compute(trackingKey, (key, count) -> {
1✔
114
      if (count == null) {
1✔
115
        return 1L;
1✔
116
      }
117
      if (count == Long.MAX_VALUE) {
1!
118
        return Long.MAX_VALUE;
×
119
      }
120
      return count + 1L;
1✔
121
    });
122
    if (!queueHeartbeatRequired(trackingKey, now)) {
1!
123
      return;
1✔
124
    }
UNCOV
125
    publishHeartbeat(
×
UNCOV
126
        registryQueueName(queueDetail),
×
127
        trackingKey,
128
        queueThreadPool,
129
        now,
UNCOV
130
        queueDetail.resolvedConsumerName());
×
UNCOV
131
  }
×
132

133
  private void publishHeartbeat(
134
      String registryQueueName,
135
      String trackingKey,
136
      QueueThreadPool queueThreadPool,
137
      long now,
138
      String consumerName) {
139
    RqueueWorkerPollerMetadata metadata = buildMetadata(trackingKey, queueThreadPool, consumerName);
1✔
140
    try {
141
      String queueKey = rqueueConfig.getWorkerRegistryQueueKey(registryQueueName);
1✔
142
      store.putQueueHeartbeat(
1✔
143
          queueKey, heartbeatSubKey(consumerName), serDes.serializeAsString(metadata));
1✔
144
      refreshQueueTtlIfRequired(registryQueueName, trackingKey, now);
1✔
145
      lastQueueHeartbeatAt.put(trackingKey, now);
1✔
146
    } catch (Exception e) {
×
147
      log.warn("Worker registry serialization failed for queue {}", trackingKey, e);
×
148
    }
1✔
149
  }
1✔
150

151
  @Override
152
  public List<RqueueWorkerPollerView> getQueueWorkers(String queueName) {
153
    if (!rqueueConfig.isWorkerRegistryEnabled()) {
1!
154
      return Collections.emptyList();
×
155
    }
156
    String queueKey = rqueueConfig.getWorkerRegistryQueueKey(queueName);
1✔
157
    Map<String, String> rawEntries = store.getQueueHeartbeats(queueKey);
1✔
158
    if (CollectionUtils.isEmpty(rawEntries)) {
1!
159
      return Collections.emptyList();
×
160
    }
161
    long now = System.currentTimeMillis();
1✔
162
    long staleAfter = 2 * rqueueConfig.getWorkerRegistryQueueHeartbeatInterval().toMillis();
1✔
163
    // Key = KV sub-key (may be "workerId__consumerName"); value = deserialized metadata.
164
    Map<String, RqueueWorkerPollerMetadata> metadataBySubKey = new LinkedHashMap<>();
1✔
165
    List<String> toDelete = new ArrayList<>();
1✔
166
    for (Map.Entry<String, String> entry : rawEntries.entrySet()) {
1✔
167
      try {
168
        RqueueWorkerPollerMetadata metadata =
1✔
169
            serDes.deserialize(entry.getValue(), RqueueWorkerPollerMetadata.class);
1✔
170
        if (metadata == null || metadata.getWorkerId() == null) {
1!
171
          toDelete.add(entry.getKey());
×
172
          continue;
×
173
        }
174
        // Lazy cleanup for entries that are far older than the queue retention window.
175
        if (now - metadata.getLastPollAt()
1✔
176
            > rqueueConfig.getWorkerRegistryQueueTtl().toMillis()) {
1!
177
          toDelete.add(entry.getKey());
×
178
          continue;
×
179
        }
180
        metadataBySubKey.put(entry.getKey(), metadata);
1✔
181
      } catch (Exception e) {
×
182
        log.warn("Worker registry deserialization failed for queue {}", queueName, e);
×
183
        toDelete.add(entry.getKey());
×
184
      }
1✔
185
    }
1✔
186
    if (!toDelete.isEmpty()) {
1!
187
      store.deleteQueueHeartbeats(queueKey, toDelete.toArray(new String[0]));
×
188
    }
189
    if (metadataBySubKey.isEmpty()) {
1!
190
      return Collections.emptyList();
×
191
    }
192
    // Collect unique bare workerIds from the metadata bodies (not from KV sub-keys, which may
193
    // be composite "workerId__consumerName" when multiple consumers share a queue).
194
    java.util.Set<String> bareWorkerIds = new java.util.LinkedHashSet<>();
1✔
195
    for (RqueueWorkerPollerMetadata m : metadataBySubKey.values()) {
1✔
196
      bareWorkerIds.add(m.getWorkerId());
1✔
197
    }
1✔
198
    Map<String, RqueueWorkerInfo> workerInfoById = loadWorkerInfo(bareWorkerIds);
1✔
199
    List<RqueueWorkerPollerView> rows = new ArrayList<>();
1✔
200
    for (Map.Entry<String, RqueueWorkerPollerMetadata> entry : metadataBySubKey.entrySet()) {
1✔
201
      RqueueWorkerPollerMetadata metadata = entry.getValue();
1✔
202
      String bareWorkerId = metadata.getWorkerId();
1✔
203
      RqueueWorkerInfo workerInfo = workerInfoById.get(bareWorkerId);
1✔
204
      long lastActivityAt = Math.max(
1✔
205
          metadata.getLastPollAt(),
1✔
206
          metadata.getLastCapacityExhaustedAt() == null
1!
207
              ? 0
1✔
208
              : metadata.getLastCapacityExhaustedAt());
×
209
      boolean stale = now - lastActivityAt > staleAfter || workerInfo == null;
1!
210
      rows.add(RqueueWorkerPollerView.builder()
1✔
211
          .queue(queueName)
1✔
212
          .workerId(bareWorkerId)
1✔
213
          .consumerName(metadata.getConsumerName())
1✔
214
          .host(workerInfo == null ? "unknown" : workerInfo.getHost())
1!
215
          .pid(workerInfo == null ? "" : workerInfo.getPid())
1!
216
          .status(stale ? "STALE" : "ACTIVE")
1!
217
          .lastPollAt(metadata.getLastPollAt())
1✔
218
          .lastPollAge(
1✔
219
              formatAge(now, metadata.getLastPollAt() == 0 ? null : metadata.getLastPollAt()))
1!
220
          .lastMessageAt(metadata.getLastMessageAt())
1✔
221
          .lastMessageAge(formatAge(now, metadata.getLastMessageAt()))
1✔
222
          .lastCapacityExhaustedAt(metadata.getLastCapacityExhaustedAt())
1✔
223
          .lastCapacityExhaustedAge(formatAge(now, metadata.getLastCapacityExhaustedAt()))
1✔
224
          .capacityExhaustedCount(metadata.getCapacityExhaustedCount())
1✔
225
          .build());
1✔
226
    }
1✔
227
    rows.sort(Comparator.comparingLong(RqueueWorkerPollerView::getLastPollAt).reversed());
1✔
228
    return rows;
1✔
229
  }
230

231
  @Override
232
  public void onApplicationEvent(RqueueBootstrapEvent event) {
233
    if (!rqueueConfig.isWorkerRegistryEnabled()) {
1!
234
      return;
×
235
    }
236
    if (event.isStartup()) {
1✔
237
      refreshWorkerInfo(System.currentTimeMillis());
1✔
238
    } else if (event.isShutdown()) {
1!
239
      cleanup();
1✔
240
    }
241
  }
1✔
242

243
  private void refreshWorkerInfoIfRequired(long now) {
244
    if (now - lastWorkerHeartbeatAt
1✔
245
        < rqueueConfig.getWorkerRegistryWorkerHeartbeatInterval().toMillis()) {
1✔
246
      return;
1✔
247
    }
248
    refreshWorkerInfo(now);
1✔
249
  }
1✔
250

251
  private void refreshWorkerInfo(long now) {
252
    RqueueWorkerInfo workerInfo = RqueueWorkerInfo.builder()
1✔
253
        .workerId(workerId)
1✔
254
        .host(host)
1✔
255
        .pid(pid)
1✔
256
        .version(rqueueConfig.getLibVersion())
1✔
257
        .startedAt(startedAt)
1✔
258
        .lastSeenAt(now)
1✔
259
        .build();
1✔
260
    store.putWorkerInfo(
1✔
261
        rqueueConfig.getWorkerRegistryKey(workerId),
1✔
262
        workerInfo,
263
        rqueueConfig.getWorkerRegistryWorkerTtl());
1✔
264
    lastWorkerHeartbeatAt = now;
1✔
265
  }
1✔
266

267
  private boolean queueHeartbeatRequired(String queueName, long now) {
268
    Long lastHeartbeat = lastQueueHeartbeatAt.get(queueName);
1✔
269
    if (lastHeartbeat == null) {
1✔
270
      return true;
1✔
271
    }
272
    return now - lastHeartbeat
1✔
273
        >= rqueueConfig.getWorkerRegistryQueueHeartbeatInterval().toMillis();
1✔
274
  }
275

276
  private void refreshQueueTtlIfRequired(String registryQueueName, String trackingKey, long now) {
277
    Duration ttl = rqueueConfig.getWorkerRegistryQueueTtl();
1✔
278
    long refreshIntervalInMillis = Math.max(1000L, ttl.toMillis() / 2);
1✔
279
    Long lastRefreshAt = lastQueueTtlRefreshAt.get(trackingKey);
1✔
280
    if (lastRefreshAt != null && now - lastRefreshAt < refreshIntervalInMillis) {
1!
281
      return;
1✔
282
    }
283
    store.refreshQueueTtl(rqueueConfig.getWorkerRegistryQueueKey(registryQueueName), ttl);
1✔
284
    lastQueueTtlRefreshAt.put(trackingKey, now);
1✔
285
  }
1✔
286

287
  private void cleanup() {
288
    store.deleteWorkerInfo(rqueueConfig.getWorkerRegistryKey(workerId));
1✔
289
    for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
1✔
290
      String consumerName = queueDetail.resolvedConsumerName();
1✔
291
      store.deleteQueueHeartbeats(
1✔
292
          rqueueConfig.getWorkerRegistryQueueKey(registryQueueName(queueDetail)),
1✔
293
          heartbeatSubKey(consumerName));
1✔
294
    }
1✔
295
    lastMessageAtByQueue.clear();
1✔
296
    lastPollAtByQueue.clear();
1✔
297
    lastQueueHeartbeatAt.clear();
1✔
298
    lastQueueTtlRefreshAt.clear();
1✔
299
    lastCapacityExhaustedAtByQueue.clear();
1✔
300
    capacityExhaustedCountByQueue.clear();
1✔
301
    lastWorkerHeartbeatAt = 0L;
1✔
302
  }
1✔
303

304
  private RqueueWorkerPollerMetadata buildMetadata(
305
      String registryQueueName, QueueThreadPool queueThreadPool, String consumerName) {
306
    return RqueueWorkerPollerMetadata.builder()
1✔
307
        .workerId(workerId)
1✔
308
        .consumerName(consumerName)
1✔
309
        .lastPollAt(lastPollAtByQueue.getOrDefault(registryQueueName, 0L))
1✔
310
        .lastMessageAt(lastMessageAtByQueue.get(registryQueueName))
1✔
311
        .lastCapacityExhaustedAt(lastCapacityExhaustedAtByQueue.get(registryQueueName))
1✔
312
        .capacityExhaustedCount(capacityExhaustedCountByQueue.getOrDefault(registryQueueName, 0L))
1✔
313
        .build();
1✔
314
  }
315

316
  private Map<String, RqueueWorkerInfo> loadWorkerInfo(Collection<String> workerIds) {
317
    List<String> keys = new ArrayList<>(workerIds.size());
1✔
318
    for (String workerId : workerIds) {
1✔
319
      keys.add(rqueueConfig.getWorkerRegistryKey(workerId));
1✔
320
    }
1✔
321
    Map<String, RqueueWorkerInfo> result = store.getWorkerInfos(keys);
1✔
322
    return result == null ? Collections.emptyMap() : result;
1!
323
  }
324

325
  private static String formatAge(long now, Long time) {
326
    if (time == null || time == 0) {
1!
327
      return "";
1✔
328
    }
329
    return DateTimeUtils.milliToHumanRepresentation(now - time);
1✔
330
  }
331

332
  /**
333
   * Returns the KV sub-key used to store this worker's heartbeat for a queue. When two
334
   * consumers share the same queue name (e.g. two {@code @RqueueListener} methods on the same
335
   * queue with different {@code consumerName}s), the consumer name is appended so each consumer
336
   * gets its own independent heartbeat entry rather than overwriting the other's.
337
   */
338
  private String heartbeatSubKey(String consumerName) {
339
    if (consumerName == null || consumerName.isEmpty()) {
1!
340
      return workerId;
×
341
    }
342
    return workerId + "__" + consumerName;
1✔
343
  }
344

345
  /**
346
   * Returns the queue name used as the KV bucket prefix for heartbeats. Always the bare queue
347
   * name (no consumer suffix) so that {@link #getQueueWorkers(String)} can find all consumers
348
   * of a queue under a single prefix scan.
349
   */
350
  private static String registryQueueName(QueueDetail queueDetail) {
351
    if (queueDetail.isSystemGenerated() && !StringUtils.isEmpty(queueDetail.getPriorityGroup())) {
1!
352
      return queueDetail.getPriorityGroup();
1✔
353
    }
354
    return queueDetail.getName();
1✔
355
  }
356

357
  /**
358
   * Returns the key used for all in-memory tracking maps (poll timestamps, heartbeat throttle,
359
   * TTL refresh throttle, capacity exhaustion counts). When multiple {@code @RqueueListener}
360
   * methods target the same queue with different consumer names, each consumer needs its own
361
   * independent tracking entry — otherwise the first heartbeat stamps the shared key and
362
   * suppresses all subsequent consumers during the throttle window.
363
   */
364
  private static String consumerTrackingKey(QueueDetail queueDetail) {
365
    String base = registryQueueName(queueDetail);
1✔
366
    String cn = queueDetail.resolvedConsumerName();
1✔
367
    return (cn != null && !cn.isEmpty()) ? base + "#" + cn : base;
1!
368
  }
369

370
  private static String getPid() {
371
    String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
1✔
372
    int index = runtimeName.indexOf('@');
1✔
373
    if (index == -1) {
1!
374
      return runtimeName;
×
375
    }
376
    return runtimeName.substring(0, index);
1✔
377
  }
378

379
  private static String getHostName() {
380
    try {
381
      return InetAddress.getLocalHost().getHostName();
1✔
382
    } catch (UnknownHostException e) {
×
383
      return "unknown";
×
384
    }
385
  }
386
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc