• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 26012877262

18 May 2026 04:04AM UTC coverage: 83.319% (-0.09%) from 83.412%
26012877262

push

github

web-flow
fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade (#300)

* fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade

* build: bump version to 4.0.0-RC10

* feat: add rqueue.serialization.property.order property to control JSON field ordering

Introduces RqueueRedisSerializer.PropertyOrder enum (ALPHABETICAL | DECLARATION)
and wires it via rqueue.serialization.property.order (default: ALPHABETICAL).

ALPHABETICAL uses Jackson 3.x alphabetical ordering, the native default for
RQueue 4.x deployments. No configuration change required for new installs.

DECLARATION uses declaration order, matching the Jackson 2.x behaviour of
RQueue 3.x. Set this when upgrading from 3.x with messages still in Redis
queues, as switching while messages are in-flight causes unexpected retries.

The setting is applied in RqueueListenerBaseConfig before any Redis template is
created (overriding RedisUtils providers when DECLARATION is requested), and
flows through RqueueConfig to RqueueInternalPubSubChannel so all serialiser
instances in the application use the same order.

Docs: configuration.md and migrations.md updated with property description,
accepted values, and the 3.x → 4.x migration warning.

Assisted-By: Claude Sonnet 4.6

2627 of 3485 branches covered (75.38%)

Branch coverage included in aggregate %.

13 of 31 new or added lines in 3 files covered. (41.94%)

14 existing lines in 6 files now uncovered.

7847 of 9086 relevant lines covered (86.36%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.2
/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java
1
/*
2
 * Copyright (c) 2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 */
10

11
package com.github.sonus21.rqueue.nats.dao;
12

13
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14
import com.github.sonus21.rqueue.dao.RqueueJobDao;
15
import com.github.sonus21.rqueue.models.db.RqueueJob;
16
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
17
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
18
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
19
import io.nats.client.JetStreamApiException;
20
import io.nats.client.KeyValue;
21
import io.nats.client.api.KeyValueEntry;
22
import java.io.IOException;
23
import java.time.Duration;
24
import java.util.ArrayList;
25
import java.util.Collection;
26
import java.util.Collections;
27
import java.util.List;
28
import java.util.logging.Level;
29
import java.util.logging.Logger;
30
import org.springframework.context.annotation.Conditional;
31
import org.springframework.context.annotation.DependsOn;
32
import org.springframework.stereotype.Repository;
33

34
/**
35
 * NATS-backed {@link RqueueJobDao} using a JetStream KV bucket as the job store. Entries are
36
 * keyed by job id and serialized as JSON. Look-ups by message id walk the bucket
37
 * keys; for the volumes rqueue typically tracks (current in-flight + recent retry history) this
38
 * is acceptable for v1 — the Redis impl uses an explicit reverse index, that's a follow-up here.
39
 *
40
 * <p>The {@code expiry} on save / create is currently best-effort: the bucket is created on the
41
 * first call with the expiry as its TTL. Subsequent saves reuse the same bucket regardless of
42
 * the requested per-key expiry.
43
 */
44
@Repository
45
@Conditional(NatsBackendCondition.class)
46
@DependsOn("natsKvBucketValidator")
47
public class NatsRqueueJobDao implements RqueueJobDao {
48

49
  private static final Logger log = Logger.getLogger(NatsRqueueJobDao.class.getName());
1✔
50
  private static final String BUCKET_NAME = NatsKvBuckets.JOBS;
51

52
  private final NatsProvisioner provisioner;
53
  private final com.github.sonus21.rqueue.serdes.RqueueSerDes serdes;
54

55
  public NatsRqueueJobDao(
56
      NatsProvisioner provisioner, com.github.sonus21.rqueue.serdes.RqueueSerDes serdes) {
1✔
57
    this.provisioner = provisioner;
1✔
58
    this.serdes = serdes;
1✔
59
  }
1✔
60

61
  private KeyValue kv(Duration ttl) throws IOException, JetStreamApiException {
62
    return provisioner.ensureKv(BUCKET_NAME, ttl);
1✔
63
  }
64

65
  @Override
66
  public void createJob(RqueueJob rqueueJob, Duration expiry) {
67
    save(rqueueJob, expiry);
1✔
68
  }
1✔
69

70
  @Override
71
  public void save(RqueueJob rqueueJob, Duration expiry) {
72
    try {
73
      kv(expiry).put(NatsKvKeys.sanitize(rqueueJob.getId()), serialize(rqueueJob));
1✔
74
    } catch (IOException | JetStreamApiException e) {
1✔
75
      log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e);
1✔
76
    }
1✔
77
  }
1✔
78

79
  @Override
80
  public RqueueJob findById(String jobId) {
81
    return loadByKey(NatsKvKeys.sanitize(jobId));
1✔
82
  }
83

84
  @Override
85
  public List<RqueueJob> findJobsByIdIn(Collection<String> jobIds) {
86
    List<RqueueJob> out = new ArrayList<>(jobIds.size());
1✔
87
    for (String id : jobIds) {
1✔
88
      RqueueJob j = findById(id);
1✔
89
      if (j != null) {
1✔
90
        out.add(j);
1✔
91
      }
92
    }
1✔
93
    return out;
1✔
94
  }
95

96
  @Override
97
  public List<RqueueJob> finByMessageId(String messageId) {
98
    if (messageId == null) {
1✔
99
      return Collections.emptyList();
1✔
100
    }
101
    return scanForMessageIds(Collections.singletonList(messageId));
1✔
102
  }
103

104
  @Override
105
  public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
106
    if (messageIds == null || messageIds.isEmpty()) {
1✔
107
      return Collections.emptyList();
1✔
108
    }
109
    return scanForMessageIds(messageIds);
1✔
110
  }
111

112
  @Override
113
  public void delete(String jobId) {
114
    try {
115
      kv(null).delete(NatsKvKeys.sanitize(jobId));
1✔
116
    } catch (IOException | JetStreamApiException e) {
1✔
117
      log.log(Level.WARNING, "delete job " + jobId + " failed", e);
1✔
118
    }
1✔
119
  }
1✔
120

121
  // ---- helpers ----------------------------------------------------------
122

123
  private RqueueJob loadByKey(String key) {
124
    try {
125
      KeyValueEntry entry = kv(null).get(key);
1✔
126
      if (entry == null || entry.getValue() == null) {
1✔
127
        return null;
1✔
128
      }
129
      return deserialize(entry.getValue());
1✔
130
    } catch (IOException | JetStreamApiException e) {
1✔
131
      log.log(Level.WARNING, "loadByKey " + key + " failed", e);
1✔
132
      return null;
1✔
133
    }
134
  }
135

136
  private List<RqueueJob> scanForMessageIds(Collection<String> messageIds) {
137
    try {
138
      List<String> keys = new ArrayList<>(kv(null).keys());
1✔
139
      List<RqueueJob> out = new ArrayList<>();
1✔
140
      for (String k : keys) {
1✔
141
        RqueueJob j = loadByKey(k);
1✔
142
        if (j != null && messageIds.contains(j.getMessageId())) {
1✔
143
          out.add(j);
1✔
144
        }
145
      }
1✔
146
      return out;
1✔
147
    } catch (IOException | JetStreamApiException | InterruptedException e) {
1✔
148
      log.log(Level.WARNING, "scanForMessageIds failed", e);
1✔
149
      if (e instanceof InterruptedException) {
1✔
150
        Thread.currentThread().interrupt();
1✔
151
      }
152
      return Collections.emptyList();
1✔
153
    }
154
  }
155

156
  private byte[] serialize(RqueueJob job) throws IOException {
157
    return serdes.serialize(job);
1✔
158
  }
159

160
  private RqueueJob deserialize(byte[] bytes) {
161
    try {
162
      return serdes.deserialize(bytes, RqueueJob.class);
1✔
163
    } catch (Exception e) {
×
164
      log.log(Level.WARNING, "deserialize RqueueJob failed", e);
×
UNCOV
165
      return null;
×
166
    }
167
  }
168
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc