• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600722838

09 May 2026 12:06PM UTC coverage: 83.396% (-5.3%) from 88.677%
25600722838

push

github

web-flow
Nats v2 web (#295)

* ci: compile main sources in coverage_report job

The coverage_report job was producing an effectively empty
jacocoTestReport.xml (3.4KB vs ~1.1MB locally) because no .class files
existed when coverageReportOnly ran — the job checked out source code
and downloaded .exec artifacts, but never compiled. JaCoCo's report
generator skips packages/classes it cannot resolve, so the merged XML
ended up with only <sessioninfo> entries and no <package> elements.

That made coverallsJacoco silently no-op via the
"source file set empty, skipping" branch in CoverallsReporter, so
"Push coverage to Coveralls" reported success without uploading.

Verified by downloading the coverage-report artifact from a recent run
and comparing its XML structure against a local build's report.

Assisted-By: Claude Code

* nats-web: implement pause / soft-delete admin ops and capability-aware Q-detail

Replace the all-stub `NatsRqueueUtilityService` with real impls for the operations
JetStream can model: `pauseUnpauseQueue` persists the `paused` flag on `QueueConfig`
in the queue-config KV bucket and notifies the local listener container so the poller
stops dispatching; `deleteMessage` is a soft delete via `MessageMetadataService`
(stream message persists, dashboard hides via the metadata flag); `getDataType`
reports `STREAM`. `moveMessage`, `enqueueMessage`, and `makeEmpty` deliberately
remain "not supported" — there is no JetStream primitive for those.

Update `RqueueQDetailServiceImpl.getRunningTasks` / `getScheduledTasks` to return
header-only tables when the broker capabilities suppress those sections, instead of
emitting zero rows or 501s on NATS.

20 new unit tests cover the pause/delete paths and lock in the still-unsupported
operations. Updates `nats-task.md` / `nats-task-v2.md` to reflect what landed.

Assisted-By: Claude Code

* nats-web: capability-aware nav / charts and stream-based peek

End-to-end browser-tested the NATS dashboard and shipped the t... (continued)

2566 of 3407 branches covered (75.32%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

312 existing lines in 38 files now uncovered.

7715 of 8921 relevant lines covered (86.48%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.0
/rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java
1
/*
2
 * Copyright (c) 2019-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.converter;
18

19
import static org.springframework.util.Assert.notNull;
20

21
import com.github.sonus21.rqueue.serdes.RqJacksonSerDes;
22
import com.github.sonus21.rqueue.serdes.RqJacksonTypeFactory;
23
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
24
import com.github.sonus21.rqueue.serdes.RqueueTypeFactory;
25
import com.github.sonus21.rqueue.serdes.SerializationUtils;
26
import com.github.sonus21.rqueue.serdes.TypeEnvelop;
27
import java.lang.reflect.Field;
28
import java.lang.reflect.TypeVariable;
29
import java.nio.charset.StandardCharsets;
30
import java.util.Collection;
31
import java.util.List;
32
import lombok.AllArgsConstructor;
33
import lombok.Getter;
34
import lombok.NoArgsConstructor;
35
import lombok.Setter;
36
import lombok.extern.slf4j.Slf4j;
37
import org.springframework.messaging.Message;
38
import org.springframework.messaging.MessageHeaders;
39
import org.springframework.messaging.converter.SmartMessageConverter;
40
import org.springframework.messaging.support.GenericMessage;
41
import tools.jackson.core.JacksonException;
42
import tools.jackson.core.JsonGenerator;
43
import tools.jackson.core.JsonParser;
44
import tools.jackson.databind.DeserializationContext;
45
import tools.jackson.databind.ObjectMapper;
46
import tools.jackson.databind.SerializationContext;
47
import tools.jackson.databind.annotation.JsonDeserialize;
48
import tools.jackson.databind.annotation.JsonSerialize;
49
import tools.jackson.databind.deser.std.StdDeserializer;
50
import tools.jackson.databind.ser.std.StdSerializer;
51

52
/**
53
 * A converter to turn the payload of a {@link Message} from serialized form to a typed String and
54
 * vice versa. Supports {@link List} and single-level generic envelope types (e.g. {@code Event<T>})
55
 * where type parameters are non-generic and can be resolved from non-null field values.
56
 */
57
@Slf4j
1✔
58
public class GenericMessageConverter implements SmartMessageConverter {
59

60
  private final SmartMessageSerDes smartMessageSerDes;
61

62
  public GenericMessageConverter() {
1✔
63
    this.smartMessageSerDes =
1✔
64
        new SmartMessageSerDes(SerializationUtils.getSerDes(), SerializationUtils.getTypeFactory());
1✔
65
  }
1✔
66

UNCOV
67
  public GenericMessageConverter(ObjectMapper objectMapper) {
×
UNCOV
68
    notNull(objectMapper, "objectMapper cannot be null");
×
UNCOV
69
    this.smartMessageSerDes = new SmartMessageSerDes(
×
70
        new RqJacksonSerDes(objectMapper), new RqJacksonTypeFactory(objectMapper));
UNCOV
71
  }
×
72

UNCOV
73
  public GenericMessageConverter(RqueueSerDes serDes, RqueueTypeFactory typeFactory) {
×
UNCOV
74
    notNull(serDes, "serDes cannot be null");
×
UNCOV
75
    notNull(typeFactory, "typeFactory cannot be null");
×
UNCOV
76
    this.smartMessageSerDes = new SmartMessageSerDes(serDes, typeFactory);
×
UNCOV
77
  }
×
78

79
  /**
80
   * Create a {@link Message} whose payload is the result of converting the given payload Object to
81
   * serialized form. It ignores all headers components.
82
   *
83
   * <p>If the converter cannot perform the conversion, it return {@code null}.
84
   *
85
   * @param payload the Object to convert
86
   * @param headers optional headers for the message (may be {@code null})
87
   * @return the new message, or {@code null}
88
   */
89
  @Override
90
  public Message<?> toMessage(Object payload, MessageHeaders headers) {
91
    log.trace("Payload: {} headers: {}", payload, headers);
1✔
92
    String message = smartMessageSerDes.serialize(payload);
1✔
93
    if (message == null) {
1✔
94
      return null;
1✔
95
    }
96
    return new GenericMessage<>(message);
1✔
97
  }
98

99
  @Override
100
  public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
101
    log.trace("Message: {} class: {} hint: {}", message, targetClass, conversionHint);
1✔
102
    return fromMessage(message, targetClass);
1✔
103
  }
104

105
  @Override
106
  public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
107
    log.trace("Payload: {} headers: {} hint: {}", payload, headers, conversionHint);
1✔
108
    return toMessage(payload, headers);
1✔
109
  }
110

111
  /**
112
   * Convert the payload of a {@link Message} from a serialized form to a typed Object of type
113
   * stored in message it self.
114
   *
115
   * <p>If the converter cannot perform the conversion it returns {@code null}.
116
   *
117
   * @param message     the input message
118
   * @param targetClass the target class for the conversion
119
   * @return the result of the conversion, or {@code null} if the converter cannot perform the
120
   * conversion.
121
   */
122
  @Override
123
  public Object fromMessage(Message<?> message, Class<?> targetClass) {
124
    log.trace("Message: {} class: {}", message, targetClass);
1✔
125
    String payload;
126
    try {
127
      payload = (String) message.getPayload();
1✔
128
    } catch (ClassCastException e) {
1✔
129
      return null;
1✔
130
    }
1✔
131
    return smartMessageSerDes.deserialize(payload);
1✔
132
  }
133

134
  @Getter
135
  @Setter
136
  @NoArgsConstructor
137
  @AllArgsConstructor
138
  private static class Msg {
139

140
    @JsonSerialize(using = Utf8BytesSerializer.class)
141
    @JsonDeserialize(using = Utf8BytesDeserializer.class)
142
    private byte[] msg;
143

144
    private String name;
145
  }
146

147
  private static class Utf8BytesSerializer extends StdSerializer<byte[]> {
148

149
    Utf8BytesSerializer() {
150
      super(byte[].class);
1✔
151
    }
1✔
152

153
    @Override
154
    public void serialize(byte[] value, JsonGenerator gen, SerializationContext ctx)
155
        throws JacksonException {
156
      gen.writeString(new String(value, StandardCharsets.UTF_8));
1✔
157
    }
1✔
158
  }
159

160
  private static class Utf8BytesDeserializer extends StdDeserializer<byte[]> {
161

162
    Utf8BytesDeserializer() {
163
      super(byte[].class);
1✔
164
    }
1✔
165

166
    @Override
167
    public byte[] deserialize(JsonParser p, DeserializationContext ctx) throws JacksonException {
168
      String text = p.getString();
1✔
169
      if (text == null) {
1!
UNCOV
170
        return null;
×
171
      }
172
      return text.getBytes(StandardCharsets.UTF_8);
1✔
173
    }
174
  }
175

176
  public static class SmartMessageSerDes {
177

178
    private final RqueueSerDes serDes;
179
    private final RqueueTypeFactory typeFactory;
180

181
    public SmartMessageSerDes(RqueueSerDes serDes, RqueueTypeFactory typeFactory) {
1✔
182
      this.serDes = serDes;
1✔
183
      this.typeFactory = typeFactory;
1✔
184
    }
1✔
185

186
    private String[] splitClassNames(String name) {
187
      return name.split("#");
1✔
188
    }
189

190
    private String getClassNameForCollection(String name, Collection<?> payload) {
191
      if (payload instanceof List) {
1✔
192
        if (payload.isEmpty()) {
1✔
193
          return null;
1✔
194
        }
195
        Object firstItem = ((List<?>) payload).get(0);
1✔
196
        // Only support non-generic item classes in lists to avoid ambiguous encoding
197
        if (firstItem.getClass().getTypeParameters().length > 0) {
1✔
198
          return null;
1✔
199
        }
200
        String itemClassName = getClassName(firstItem);
1✔
201
        if (itemClassName == null) {
1!
UNCOV
202
          return null;
×
203
        }
204
        return name + '#' + itemClassName;
1✔
205
      }
206
      return null;
1✔
207
    }
208

209
    private Class<?> resolveTypeVariable(Class<?> clazz, TypeVariable<?> tv, Object payload) {
210
      // TypeVariable instances are scoped to the class that declares them, so
211
      // field.getGenericType().equals(tv) can only match fields declared on clazz itself.
212
      // Superclass fields reference their own TypeVariable instances, which are distinct objects.
213
      for (Field field : clazz.getDeclaredFields()) {
1✔
214
        if (field.getGenericType().equals(tv)) {
1✔
215
          field.setAccessible(true);
1✔
216
          try {
217
            Object value = field.get(payload);
1✔
218
            if (value != null) {
1✔
219
              return value.getClass();
1✔
220
            }
UNCOV
221
          } catch (IllegalAccessException e) {
×
UNCOV
222
            log.debug("Cannot access field {}", field.getName(), e);
×
223
          }
1✔
224
        }
225
      }
226
      return null;
1✔
227
    }
228

229
    private String getGenericFieldBasedClassName(Class<?> clazz, Object payload) {
230
      TypeVariable<?>[] typeVariables = clazz.getTypeParameters();
1✔
231
      if (typeVariables.length == 0) {
1✔
232
        return clazz.getName();
1✔
233
      }
234
      StringBuilder sb = new StringBuilder(clazz.getName());
1✔
235
      for (TypeVariable<?> tv : typeVariables) {
1✔
236
        Class<?> resolved = resolveTypeVariable(clazz, tv, payload);
1✔
237
        if (resolved == null || resolved.getTypeParameters().length > 0) {
1!
238
          return null;
1✔
239
        }
240
        sb.append('#').append(resolved.getName());
1✔
241
      }
242
      return sb.toString();
1✔
243
    }
244

245
    private String getClassName(Object payload) {
246
      Class<?> payloadClass = payload.getClass();
1✔
247
      String name = payloadClass.getName();
1✔
248
      if (payload instanceof Collection) {
1✔
249
        return getClassNameForCollection(name, (Collection<?>) payload);
1✔
250
      }
251
      return getGenericFieldBasedClassName(payloadClass, payload);
1✔
252
    }
253

254
    private TypeEnvelop getTargetType(Msg msg) throws ClassNotFoundException {
255
      String[] classNames = splitClassNames(msg.getName());
1✔
256
      if (classNames.length == 1) {
1✔
257
        Class<?> c = Thread.currentThread().getContextClassLoader().loadClass(msg.getName());
1✔
258
        return typeFactory.create(c);
1✔
259
      }
260
      Class<?> envelopeClass =
261
          Thread.currentThread().getContextClassLoader().loadClass(classNames[0]);
1✔
262
      Class<?>[] classes = new Class<?>[classNames.length - 1];
1✔
263
      for (int i = 1; i < classNames.length; i++) {
1✔
264
        classes[i - 1] = Thread.currentThread().getContextClassLoader().loadClass(classNames[i]);
1✔
265
      }
266
      return typeFactory.create(envelopeClass, classes);
1✔
267
    }
268

269
    public Object deserialize(String payload) {
270
      try {
271
        if (SerializationUtils.isJson(payload)) {
1✔
272
          Msg msg = serDes.deserialize(payload, Msg.class);
1✔
273
          TypeEnvelop type = getTargetType(msg);
1✔
274
          return serDes.deserialize(msg.msg, type);
1✔
275
        }
276
      } catch (Exception e) {
1✔
277
        log.debug("Deserialization of message {} failed", payload, e);
1✔
278
      }
1✔
279
      return null;
1✔
280
    }
281

282
    public <T> T deserialize(byte[] payload, Class<T> clazz) {
UNCOV
283
      if (SerializationUtils.isEmpty(payload)) {
×
UNCOV
284
        return null;
×
285
      }
286
      try {
UNCOV
287
        return serDes.deserialize(payload, clazz);
×
UNCOV
288
      } catch (Exception e) {
×
UNCOV
289
        log.debug("Deserialization of message {} failed", new String(payload), e);
×
290
      }
UNCOV
291
      return null;
×
292
    }
293

294
    public String serialize(Object payload) {
295
      String name = getClassName(payload);
1✔
296
      if (name == null) {
1✔
297
        return null;
1✔
298
      }
299
      try {
300
        byte[] msg = serDes.serialize(payload);
1✔
301
        Msg message = new Msg(msg, name);
1✔
302
        return serDes.serializeAsString(message);
1✔
UNCOV
303
      } catch (Exception e) {
×
UNCOV
304
        log.debug("Serialisation failed", e);
×
UNCOV
305
        return null;
×
306
      }
307
    }
308
  }
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc