• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 26012877262

18 May 2026 04:04AM UTC coverage: 83.319% (-0.09%) from 83.412%
26012877262

push

github

web-flow
fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade (#300)

* fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade

* build: bump version to 4.0.0-RC10

* feat: add rqueue.serialization.property.order property to control JSON field ordering

Introduces RqueueRedisSerializer.PropertyOrder enum (ALPHABETICAL | DECLARATION)
and wires it via rqueue.serialization.property.order (default: ALPHABETICAL).

ALPHABETICAL uses Jackson 3.x alphabetical ordering, the native default for
RQueue 4.x deployments. No configuration change required for new installs.

DECLARATION uses declaration order, matching the Jackson 2.x behaviour of
RQueue 3.x. Set this when upgrading from 3.x with messages still in Redis
queues, as switching while messages are in-flight causes unexpected retries.

The setting is applied in RqueueListenerBaseConfig before any Redis template is
created (overriding RedisUtils providers when DECLARATION is requested), and
flows through RqueueConfig to RqueueInternalPubSubChannel so all serialiser
instances in the application use the same order.

Docs: configuration.md and migrations.md updated with property description,
accepted values, and the 3.x → 4.x migration warning.

Assisted-By: Claude Sonnet 4.6

2627 of 3485 branches covered (75.38%)

Branch coverage included in aggregate %.

13 of 31 new or added lines in 3 files covered. (41.94%)

14 existing lines in 6 files now uncovered.

7847 of 9086 relevant lines covered (86.36%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.93
/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java
1
/*
2
 * Copyright (c) 2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.nats.worker;
18

19
import com.github.sonus21.rqueue.config.NatsBackendCondition;
20
import com.github.sonus21.rqueue.models.registry.RqueueWorkerInfo;
21
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
22
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
23
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
24
import com.github.sonus21.rqueue.worker.WorkerRegistryStore;
25
import io.nats.client.JetStreamApiException;
26
import io.nats.client.KeyValue;
27
import io.nats.client.api.KeyValueEntry;
28
import java.io.IOException;
29
import java.time.Duration;
30
import java.util.ArrayList;
31
import java.util.Collection;
32
import java.util.Collections;
33
import java.util.LinkedHashMap;
34
import java.util.List;
35
import java.util.Map;
36
import java.util.logging.Level;
37
import java.util.logging.Logger;
38
import org.springframework.context.annotation.Conditional;
39
import org.springframework.context.annotation.DependsOn;
40
import org.springframework.stereotype.Repository;
41

42
/**
43
 * NATS JetStream KV-backed {@link WorkerRegistryStore}. Uses two buckets so each can carry its
44
 * own bucket-level {@code maxAge}: {@code rqueue-workers} (worker info, TTL = workerTtl) and
45
 * {@code rqueue-worker-heartbeats} (per-(queue, workerId) heartbeats, TTL = queueTtl).
46
 *
47
 * <p>Per-queue heartbeats are stored as flattened keys of the form
48
 * {@code "<sanitizedQueueKey>__<sanitizedWorkerId>"}. Listing heartbeats for a queue iterates
49
 * all bucket keys with the matching {@code <sanitizedQueueKey>__} prefix.
50
 *
51
 * <p>Bucket {@code ttl} (NATS' name for entry max-age) is a one-shot configuration set at bucket
52
 * creation; {@link #refreshQueueTtl(String, Duration)} is therefore a no-op — every write into
53
 * the heartbeat bucket re-establishes the entry's age from zero, which is sufficient given the
54
 * registry rewrites heartbeats on the configured interval.
55
 *
56
 * <p>The first call lazily creates each bucket. {@code ttl} is fixed at bucket creation, so
57
 * existing buckets are reused even if the configured TTL has since changed.
58
 */
59
@Repository
60
@Conditional(NatsBackendCondition.class)
61
@DependsOn("natsKvBucketValidator")
62
public class NatsWorkerRegistryStore implements WorkerRegistryStore {
63

64
  private static final Logger log = Logger.getLogger(NatsWorkerRegistryStore.class.getName());
1✔
65
  private static final String WORKER_BUCKET = NatsKvBuckets.WORKERS;
66
  private static final String HEARTBEAT_BUCKET = NatsKvBuckets.WORKER_HEARTBEATS;
67
  /** Separator used to flatten a {@code (queueKey, workerId)} pair into a single KV key. */
68
  private static final String SEP = "__";
69

70
  private final NatsProvisioner provisioner;
71
  private final com.github.sonus21.rqueue.serdes.RqueueSerDes serdes;
72
  /** Captured on first putWorkerInfo call so worker bucket gets the right maxAge. */
73
  private volatile Duration workerBucketTtl;
74
  /** Captured on first putQueueHeartbeat / refreshQueueTtl so heartbeat bucket gets the right maxAge. */
75
  private volatile Duration heartbeatBucketTtl;
76

77
  public NatsWorkerRegistryStore(
78
      NatsProvisioner provisioner, com.github.sonus21.rqueue.serdes.RqueueSerDes serdes) {
1✔
79
    this.provisioner = provisioner;
1✔
80
    this.serdes = serdes;
1✔
81
  }
1✔
82

83
  @Override
84
  public void putWorkerInfo(String workerKey, RqueueWorkerInfo info, Duration ttl) {
85
    if (workerBucketTtl == null) {
1✔
86
      workerBucketTtl = ttl;
1✔
87
    }
88
    try {
89
      KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
1✔
90
      kv.put(NatsKvKeys.sanitize(workerKey), serialize(info));
1✔
91
    } catch (IOException | JetStreamApiException e) {
1✔
92
      log.log(Level.WARNING, "putWorkerInfo " + workerKey + " failed", e);
1✔
93
    }
1✔
94
  }
1✔
95

96
  @Override
97
  public void deleteWorkerInfo(String workerKey) {
98
    try {
99
      KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
1✔
100
      kv.delete(NatsKvKeys.sanitize(workerKey));
1✔
101
    } catch (IOException | JetStreamApiException e) {
1✔
102
      log.log(Level.WARNING, "deleteWorkerInfo " + workerKey + " failed", e);
1✔
103
    }
1✔
104
  }
1✔
105

106
  @Override
107
  public Map<String, RqueueWorkerInfo> getWorkerInfos(Collection<String> workerKeys) {
108
    if (workerKeys == null || workerKeys.isEmpty()) {
1✔
109
      return Collections.emptyMap();
1✔
110
    }
111
    Map<String, RqueueWorkerInfo> out = new LinkedHashMap<>();
1✔
112
    try {
113
      KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
1✔
114
      for (String key : workerKeys) {
1✔
115
        KeyValueEntry entry = kv.get(NatsKvKeys.sanitize(key));
1✔
116
        if (entry == null || entry.getValue() == null) {
1!
UNCOV
117
          continue;
×
118
        }
119
        RqueueWorkerInfo info = deserialize(entry.getValue());
1✔
120
        if (info != null && info.getWorkerId() != null) {
1!
121
          out.put(info.getWorkerId(), info);
1✔
122
        }
123
      }
1✔
124
    } catch (IOException | JetStreamApiException e) {
×
UNCOV
125
      log.log(Level.WARNING, "getWorkerInfos failed", e);
×
126
    }
1✔
127
    return out;
1✔
128
  }
129

130
  @Override
131
  public void putQueueHeartbeat(String queueKey, String workerId, String metadataJson) {
132
    if (heartbeatBucketTtl == null) {
1✔
133
      // Best-effort default — overwritten by refreshQueueTtl when the registry computes the real
134
      // value.
135
      heartbeatBucketTtl = Duration.ofHours(1);
1✔
136
    }
137
    try {
138
      KeyValue kv = provisioner.ensureKv(HEARTBEAT_BUCKET, heartbeatBucketTtl);
1✔
139
      kv.put(compositeKey(queueKey, workerId), metadataJson.getBytes());
1✔
140
    } catch (IOException | JetStreamApiException e) {
1✔
141
      log.log(Level.WARNING, "putQueueHeartbeat queue=" + queueKey + " failed", e);
1✔
142
    }
1✔
143
  }
1✔
144

145
  @Override
146
  public Map<String, String> getQueueHeartbeats(String queueKey) {
147
    Map<String, String> out = new LinkedHashMap<>();
1✔
148
    try {
149
      KeyValue kv = provisioner.ensureKv(HEARTBEAT_BUCKET, heartbeatBucketTtl);
1✔
150
      String prefix = NatsKvKeys.sanitize(queueKey) + SEP;
1✔
151
      List<String> keys = new ArrayList<>(kv.keys());
1✔
152
      for (String k : keys) {
1✔
153
        if (!k.startsWith(prefix)) {
1✔
154
          continue;
1✔
155
        }
156
        KeyValueEntry entry = kv.get(k);
1✔
157
        if (entry == null || entry.getValue() == null) {
1!
UNCOV
158
          continue;
×
159
        }
160
        String workerId = k.substring(prefix.length());
1✔
161
        out.put(workerId, new String(entry.getValue()));
1✔
162
      }
1✔
163
    } catch (IOException | JetStreamApiException | InterruptedException e) {
1✔
164
      if (e instanceof InterruptedException) {
1✔
165
        Thread.currentThread().interrupt();
1✔
166
      }
167
      log.log(Level.WARNING, "getQueueHeartbeats queue=" + queueKey + " failed", e);
1✔
168
    }
1✔
169
    return out;
1✔
170
  }
171

172
  @Override
173
  public void deleteQueueHeartbeats(String queueKey, String... workerIds) {
174
    if (workerIds == null || workerIds.length == 0) {
1✔
175
      return;
1✔
176
    }
177
    try {
178
      KeyValue kv = provisioner.ensureKv(HEARTBEAT_BUCKET, heartbeatBucketTtl);
1✔
179
      for (String workerId : workerIds) {
1✔
180
        kv.delete(compositeKey(queueKey, workerId));
1✔
181
      }
182
    } catch (IOException | JetStreamApiException e) {
×
UNCOV
183
      log.log(Level.WARNING, "deleteQueueHeartbeats queue=" + queueKey + " failed", e);
×
184
    }
1✔
185
  }
1✔
186

187
  @Override
188
  public void refreshQueueTtl(String queueKey, Duration ttl) {
189
    // NATS KV applies maxAge at the bucket level and resets per-entry age on each write.
190
    // The registry already rewrites heartbeats on its configured interval, so each fresh put
191
    // implicitly resets expiry. We only capture the first observed ttl so the bucket created
192
    // on first use carries the correct maxAge.
193
    if (heartbeatBucketTtl == null) {
1✔
194
      heartbeatBucketTtl = ttl;
1✔
195
    }
196
  }
1✔
197

198
  // ---- helpers ----------------------------------------------------------
199

200
  private static String compositeKey(String queueKey, String workerId) {
201
    return NatsKvKeys.sanitize(queueKey) + SEP + NatsKvKeys.sanitize(workerId);
1✔
202
  }
203

204
  private byte[] serialize(RqueueWorkerInfo info) throws IOException {
205
    return serdes.serialize(info);
1✔
206
  }
207

208
  private RqueueWorkerInfo deserialize(byte[] bytes) {
209
    try {
210
      return serdes.deserialize(bytes, RqueueWorkerInfo.class);
1✔
UNCOV
211
    } catch (Exception e) {
×
UNCOV
212
      log.log(Level.WARNING, "deserialize RqueueWorkerInfo failed", e);
×
UNCOV
213
      return null;
×
214
    }
215
  }
216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc