• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 26012877262

18 May 2026 04:04AM UTC coverage: 83.319% (-0.09%) from 83.412%
26012877262

push

github

web-flow
fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade (#300)

* fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade

* build: bump version to 4.0.0-RC10

* feat: add rqueue.serialization.property.order property to control JSON field ordering

Introduces RqueueRedisSerializer.PropertyOrder enum (ALPHABETICAL | DECLARATION)
and wires it via rqueue.serialization.property.order (default: ALPHABETICAL).

ALPHABETICAL uses Jackson 3.x alphabetical ordering, the native default for
RQueue 4.x deployments. No configuration change required for new installs.

DECLARATION uses declaration order, matching the Jackson 2.x behaviour of
RQueue 3.x. Set this when upgrading from 3.x with messages still in Redis
queues, as switching while messages are in-flight causes unexpected retries.

The setting is applied in RqueueListenerBaseConfig before any Redis template is
created (overriding RedisUtils providers when DECLARATION is requested), and
flows through RqueueConfig to RqueueInternalPubSubChannel so all serialiser
instances in the application use the same order.

Docs: configuration.md and migrations.md updated with property description,
accepted values, and the 3.x → 4.x migration warning.

Assisted-By: Claude Sonnet 4.6

2627 of 3485 branches covered (75.38%)

Branch coverage included in aggregate %.

13 of 31 new or added lines in 3 files covered. (41.94%)

14 existing lines in 6 files now uncovered.

7847 of 9086 relevant lines covered (86.36%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java
1
/*
2
 * Copyright (c) 2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 */
10

11
package com.github.sonus21.rqueue.nats.dao;
12

13
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
15
import com.github.sonus21.rqueue.models.db.QueueConfig;
16
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
17
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
18
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
19
import io.nats.client.JetStreamApiException;
20
import io.nats.client.KeyValue;
21
import io.nats.client.api.KeyValueEntry;
22
import java.io.IOException;
23
import java.util.ArrayList;
24
import java.util.Collection;
25
import java.util.List;
26
import java.util.concurrent.ConcurrentHashMap;
27
import java.util.logging.Level;
28
import java.util.logging.Logger;
29
import org.springframework.context.annotation.Conditional;
30
import org.springframework.context.annotation.DependsOn;
31
import org.springframework.stereotype.Repository;
32

33
/**
34
 * NATS-backed {@link RqueueSystemConfigDao} using a JetStream KV bucket as the queue-config
35
 * store. Entries are keyed by {@link QueueConfig#getName()} and serialized via standard Java
36
 * serialization, matching the Redis impl which also relies on
37
 * {@link com.github.sonus21.rqueue.models.SerializableBase}.
38
 *
39
 * <p>An in-process cache mirrors the Redis impl's {@code byCachedXxx} methods for parity.
40
 * {@link #clearCacheByName(String)} evicts; {@link #saveQConfig(QueueConfig)} keeps the cache
41
 * in sync.
42
 */
43
@Repository
44
@Conditional(NatsBackendCondition.class)
45
@DependsOn("natsKvBucketValidator")
46
public class NatsRqueueSystemConfigDao implements RqueueSystemConfigDao {
47

48
  private static final Logger log = Logger.getLogger(NatsRqueueSystemConfigDao.class.getName());
1✔
49
  private static final String BUCKET_NAME = NatsKvBuckets.QUEUE_CONFIG;
50

51
  private final NatsProvisioner provisioner;
52
  private final com.github.sonus21.rqueue.serdes.RqueueSerDes serdes;
53
  private final ConcurrentHashMap<String, QueueConfig> cache = new ConcurrentHashMap<>();
1✔
54

55
  public NatsRqueueSystemConfigDao(
56
      NatsProvisioner provisioner, com.github.sonus21.rqueue.serdes.RqueueSerDes serdes) {
1✔
57
    this.provisioner = provisioner;
1✔
58
    this.serdes = serdes;
1✔
59
  }
1✔
60

61
  private KeyValue kv() throws IOException, JetStreamApiException {
62
    return provisioner.ensureKv(BUCKET_NAME, null);
1✔
63
  }
64

65
  @Override
66
  public QueueConfig getConfigByName(String name) {
67
    return getConfigByName(name, true);
1✔
68
  }
69

70
  @Override
71
  public QueueConfig getConfigByName(String name, boolean cached) {
72
    if (cached) {
1✔
73
      QueueConfig hit = cache.get(name);
1✔
74
      if (hit != null) {
1✔
75
        return hit;
1✔
76
      }
77
    }
78
    QueueConfig loaded = loadByKey(NatsKvKeys.sanitize(name));
1✔
79
    if (loaded != null) {
1✔
80
      cache.put(name, loaded);
1✔
81
    }
82
    return loaded;
1✔
83
  }
84

85
  @Override
86
  public QueueConfig getQConfig(String id, boolean cached) {
87
    if (cached) {
1✔
88
      for (QueueConfig hit : cache.values()) {
1✔
89
        if (id != null && id.equals(hit.getId())) {
1!
90
          return hit;
1✔
91
        }
UNCOV
92
      }
×
93
    }
94
    return scanForId(id);
1✔
95
  }
96

97
  @Override
98
  public List<QueueConfig> getConfigByNames(Collection<String> names) {
99
    List<QueueConfig> out = new ArrayList<>(names.size());
1✔
100
    for (String n : names) {
1✔
101
      QueueConfig c = getConfigByName(n);
1✔
102
      if (c != null) {
1✔
103
        out.add(c);
1✔
104
      }
105
    }
1✔
106
    return out;
1✔
107
  }
108

109
  @Override
110
  public List<QueueConfig> findAllQConfig(Collection<String> ids) {
111
    List<QueueConfig> out = new ArrayList<>(ids.size());
1✔
112
    for (String id : ids) {
1✔
113
      QueueConfig c = getQConfig(id, true);
1✔
114
      if (c != null) {
1✔
115
        out.add(c);
1✔
116
      }
117
    }
1✔
118
    return out;
1✔
119
  }
120

121
  @Override
122
  public void saveQConfig(QueueConfig queueConfig) {
123
    try {
124
      kv().put(NatsKvKeys.sanitize(queueConfig.getName()), serialize(queueConfig));
1✔
125
      cache.put(queueConfig.getName(), queueConfig);
1✔
126
    } catch (IOException | JetStreamApiException e) {
1✔
127
      log.log(Level.WARNING, "saveQConfig " + queueConfig.getName() + " failed", e);
1✔
128
    }
1✔
129
  }
1✔
130

131
  @Override
132
  public void saveAllQConfig(List<QueueConfig> newConfigs) {
133
    for (QueueConfig c : newConfigs) {
1✔
134
      saveQConfig(c);
1✔
135
    }
1✔
136
  }
1✔
137

138
  @Override
139
  public void clearCacheByName(String name) {
140
    cache.remove(name);
1✔
141
  }
1✔
142

143
  // ---- helpers ----------------------------------------------------------
144

145
  private QueueConfig loadByKey(String key) {
146
    try {
147
      KeyValueEntry entry = kv().get(key);
1✔
148
      if (entry == null || entry.getValue() == null) {
1!
149
        return null;
1✔
150
      }
151
      return deserialize(entry.getValue());
1✔
152
    } catch (IOException | JetStreamApiException e) {
1✔
153
      log.log(Level.WARNING, "loadByKey " + key + " failed", e);
1✔
154
      return null;
1✔
155
    }
156
  }
157

158
  private QueueConfig scanForId(String id) {
159
    if (id == null) {
1✔
160
      return null;
1✔
161
    }
162
    try {
163
      List<String> keys = new ArrayList<>(kv().keys());
1✔
164
      for (String k : keys) {
1✔
165
        QueueConfig c = loadByKey(k);
1✔
166
        if (c != null && id.equals(c.getId())) {
1!
167
          return c;
1✔
168
        }
169
      }
1✔
170
      return null;
1✔
171
    } catch (IOException | JetStreamApiException | InterruptedException e) {
1✔
172
      log.log(Level.WARNING, "scanForId " + id + " failed", e);
1✔
173
      if (e instanceof InterruptedException) {
1✔
174
        Thread.currentThread().interrupt();
1✔
175
      }
176
      return null;
1✔
177
    }
178
  }
179

180
  private byte[] serialize(QueueConfig c) throws IOException {
181
    return serdes.serialize(c);
1✔
182
  }
183

184
  private QueueConfig deserialize(byte[] bytes) {
185
    try {
186
      return serdes.deserialize(bytes, QueueConfig.class);
1✔
187
    } catch (Exception e) {
×
188
      log.log(Level.WARNING, "deserialize QueueConfig failed", e);
×
UNCOV
189
      return null;
×
190
    }
191
  }
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc