• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 26012877262

18 May 2026 04:04AM UTC coverage: 83.319% (-0.09%) from 83.412%
26012877262

push

github

web-flow
fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade (#300)

* fix: restore Jackson 2.x property order in RqueueRedisSerializer to prevent stale processing-queue entries after 3.x → 4.x upgrade

* build: bump version to 4.0.0-RC10

* feat: add rqueue.serialization.property.order property to control JSON field ordering

Introduces RqueueRedisSerializer.PropertyOrder enum (ALPHABETICAL | DECLARATION)
and wires it via rqueue.serialization.property.order (default: ALPHABETICAL).

ALPHABETICAL uses Jackson 3.x alphabetical ordering, the native default for
RQueue 4.x deployments. No configuration change required for new installs.

DECLARATION uses declaration order, matching the Jackson 2.x behaviour of
RQueue 3.x. Set this when upgrading from 3.x with messages still in Redis
queues, as switching while messages are in-flight causes unexpected retries.

The setting is applied in RqueueListenerBaseConfig before any Redis template is
created (overriding RedisUtils providers when DECLARATION is requested), and
flows through RqueueConfig to RqueueInternalPubSubChannel so all serialiser
instances in the application use the same order.

Docs: configuration.md and migrations.md updated with property description,
accepted values, and the 3.x → 4.x migration warning.

Assisted-By: Claude Sonnet 4.6

2627 of 3485 branches covered (75.38%)

Branch coverage included in aggregate %.

13 of 31 new or added lines in 3 files covered. (41.94%)

14 existing lines in 6 files now uncovered.

7847 of 9086 relevant lines covered (86.36%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.75
/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ThreadUtils.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.utils;
18

19
import java.util.ArrayList;
20
import java.util.Collection;
21
import java.util.List;
22
import java.util.concurrent.CancellationException;
23
import java.util.concurrent.ExecutionException;
24
import java.util.concurrent.Future;
25
import java.util.concurrent.ScheduledFuture;
26
import java.util.concurrent.TimeUnit;
27
import java.util.concurrent.TimeoutException;
28
import org.slf4j.Logger;
29
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
30
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
31

32
public final class ThreadUtils {
33

34
  // Utility holder exposing only static helpers; the private constructor prevents instantiation.
  private ThreadUtils() {}
35

36
  public static ThreadPoolTaskScheduler createTaskScheduler(
37
      int poolSize, String threadPrefix, int terminationTime) {
38
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
1✔
39
    scheduler.setBeanName(threadPrefix.substring(0, threadPrefix.length() - 1));
1✔
40
    scheduler.setPoolSize(poolSize);
1✔
41
    scheduler.setThreadNamePrefix(threadPrefix);
1✔
42
    scheduler.setAwaitTerminationSeconds(terminationTime);
1✔
43
    scheduler.setRemoveOnCancelPolicy(true);
1✔
44
    scheduler.afterPropertiesSet();
1✔
45
    return scheduler;
1✔
46
  }
47

48
  public static ThreadPoolTaskExecutor createTaskExecutor(
49
      String beanName, String threadPrefix, int corePoolSize, int maxPoolSize, int queueCapacity) {
50
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
1✔
51
    threadPoolTaskExecutor.setThreadNamePrefix(threadPrefix);
1✔
52
    threadPoolTaskExecutor.setBeanName(beanName);
1✔
53
    if (corePoolSize > 0) {
1✔
54
      threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
1✔
55
      threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
1✔
56
      threadPoolTaskExecutor.setMaxPoolSize(Math.max(corePoolSize, maxPoolSize));
1✔
57
      threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
1✔
58
    }
59
    threadPoolTaskExecutor.afterPropertiesSet();
1✔
60
    return threadPoolTaskExecutor;
1✔
61
  }
62

63
  private static void waitForShutdown(
64
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
65
    boolean completedOrCancelled = future.isCancelled() || future.isDone();
1!
66
    if (completedOrCancelled) {
1!
67
      return;
×
68
    }
69
    try {
70
      future.get(waitTimeInMillis, TimeUnit.MILLISECONDS);
1✔
UNCOV
71
    } catch (ExecutionException | TimeoutException | CancellationException e) {
×
UNCOV
72
      log.debug(msg, msgParams, e);
×
73
    } catch (InterruptedException e) {
×
74
      Thread.currentThread().interrupt();
×
75
    }
1✔
76
  }
1✔
77

78
  public static void waitForTermination(
79
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
80
    if (future == null || future.isCancelled() || future.isDone()) {
1!
81
      return;
1✔
82
    }
83
    if (future instanceof ScheduledFuture) {
1✔
84
      ScheduledFuture<?> f = (ScheduledFuture<?>) future;
1✔
85
      if (f.getDelay(TimeUnit.MILLISECONDS) > Constants.MIN_DELAY) {
1✔
86
        f.cancel(false);
1✔
87
        return;
1✔
88
      }
89
    }
90
    waitForShutdown(log, future, waitTimeInMillis, msg, msgParams);
1✔
91
  }
1✔
92

93
  public static boolean waitForWorkerTermination(
94
      final Collection<QueueThreadPool> queueThreadPools, long waitTime) {
95
    long endTime = System.currentTimeMillis() + waitTime;
1✔
96
    List<QueueThreadPool> remaining = new ArrayList<>(queueThreadPools);
1✔
97
    while (System.currentTimeMillis() < endTime && !remaining.isEmpty()) {
1✔
98
      List<QueueThreadPool> newRemaining = new ArrayList<>();
1✔
99
      for (QueueThreadPool queueThreadPool : remaining) {
1✔
100
        if (!queueThreadPool.allTasksCompleted()) {
1✔
101
          newRemaining.add(queueThreadPool);
1✔
102
        }
103
      }
1✔
104
      if (!newRemaining.isEmpty()) {
1✔
105
        TimeoutUtils.sleep(10);
1✔
106
      }
107
      remaining = newRemaining;
1✔
108
    }
1✔
109
    return remaining.isEmpty();
1✔
110
  }
111

112
  public static String getWorkerName(String name) {
113
    String camelCase = StringUtils.getBeanName(name);
1✔
114
    return camelCase + "Listener";
1✔
115
  }
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc