• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 25600409404

09 May 2026 11:49AM UTC coverage: 83.347%. First build
25600409404

Pull #295

github

web-flow
Merge ace5e3c2a into 9757517ae
Pull Request #295: Nats v2 web

2564 of 3407 branches covered (75.26%)

Branch coverage included in aggregate %.

795 of 1072 new or added lines in 22 files covered. (74.16%)

7711 of 8921 relevant lines covered (86.44%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.41
/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java
1
/*
2
 * Copyright (c) 2020-2026 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.core;
18

19
import com.github.sonus21.rqueue.exception.OverrideException;
20
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
21
import com.github.sonus21.rqueue.listener.QueueDetail;
22
import com.github.sonus21.rqueue.utils.PriorityUtils;
23
import java.util.ArrayList;
24
import java.util.Comparator;
25
import java.util.HashMap;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.function.Function;
29
import java.util.stream.Collectors;
30

31
/**
32
 * Stores all registered queue details
33
 *
34
 * <p>this stores all the queue details mapped from {@link
35
 * com.github.sonus21.rqueue.annotation.RqueueListener}.
36
 */
37
public final class EndpointRegistry {
38

39
  private static final Object lock = new Object();
1✔
40
  // composite key: "queueName" for single-consumer, "queueName##consumerName" for multi-consumer
41
  private static final Map<String, QueueDetail> registry = new HashMap<>();
1✔
42
  // first registered QueueDetail per queue name — used by enqueue / management operations
43
  private static final Map<String, QueueDetail> primaryByName = new HashMap<>();
1✔
44

45
  private static String compositeKey(QueueDetail qd) {
46
    String cn = qd.getConsumerName();
1✔
47
    return (cn != null && !cn.isEmpty()) ? qd.getName() + "##" + cn : qd.getName();
1!
48
  }
49

50
  private EndpointRegistry() {}
51

52
  /**
53
   * Get QueueDetail for the given queue. If queue is having priority than it should be called with
54
   * {@link com.github.sonus21.rqueue.utils.PriorityUtils#getQueueNameForPriority(String, String)}.
55
   *
56
   * @param queueName queue name
57
   * @return queue detail
58
   * @throws QueueDoesNotExist this error is thrown when queue is not registered.
59
   * @see #get(String, String)
60
   */
61
  public static QueueDetail get(String queueName) {
62
    QueueDetail queueDetail = primaryByName.get(queueName);
1✔
63
    if (queueDetail == null) {
1✔
64
      throw new QueueDoesNotExist(queueName);
1✔
65
    }
66
    return queueDetail;
1✔
67
  }
68

69
  /**
70
   * Get QueueDetail for the given queue, with priority
71
   *
72
   * @param queueName queue name
73
   * @param priority  priority of this queue like critical, high
74
   * @return queue detail
75
   * @throws QueueDoesNotExist this error is thrown when queue is not registered.
76
   */
77
  public static QueueDetail get(String queueName, String priority) {
78
    QueueDetail queueDetail =
1✔
79
        primaryByName.get(PriorityUtils.getQueueNameForPriority(queueName, priority));
1✔
80
    if (queueDetail == null) {
1!
81
      throw new QueueDoesNotExist(queueName);
×
82
    }
83
    return queueDetail;
1✔
84
  }
85

86
  public static void register(QueueDetail queueDetail) {
87
    synchronized (lock) {
1✔
88
      String ck = compositeKey(queueDetail);
1✔
89
      if (registry.containsKey(ck)) {
1!
90
        throw new OverrideException(queueDetail.getName());
×
91
      }
92
      registry.put(ck, queueDetail);
1✔
93
      primaryByName.putIfAbsent(queueDetail.getName(), queueDetail);
1✔
94
      lock.notifyAll();
1✔
95
    }
1✔
96
  }
1✔
97

98
  public static void delete() {
99
    synchronized (lock) {
1✔
100
      registry.clear();
1✔
101
      primaryByName.clear();
1✔
102
      lock.notifyAll();
1✔
103
    }
1✔
104
  }
1✔
105

106
  public static List<String> getActiveQueues() {
107
    synchronized (lock) {
1✔
108
      List<String> queues = primaryByName.values().stream()
1✔
109
          .filter(QueueDetail::isActive)
1✔
110
          .map(QueueDetail::getName)
1✔
111
          .collect(Collectors.toList());
1✔
112
      lock.notifyAll();
1✔
113
      return queues;
1✔
114
    }
115
  }
116

117
  public static List<QueueDetail> getActiveQueueDetails() {
118
    synchronized (lock) {
1✔
119
      List<QueueDetail> queueDetails =
1✔
120
          registry.values().stream().filter(QueueDetail::isActive).collect(Collectors.toList());
1✔
121
      lock.notifyAll();
1✔
122
      return queueDetails;
1✔
123
    }
124
  }
125

126
  public static Map<String, QueueDetail> getActiveQueueMap() {
127
    synchronized (lock) {
1✔
128
      Map<String, QueueDetail> queueDetails = primaryByName.values().stream()
1✔
129
          .filter(QueueDetail::isActive)
1✔
130
          .collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
1✔
131
      lock.notifyAll();
1✔
132
      return queueDetails;
1✔
133
    }
134
  }
135

136
  public static String toStr() {
137
    StringBuilder builder = new StringBuilder();
×
138
    synchronized (lock) {
×
139
      List<QueueDetail> queueDetails = new ArrayList<>(registry.values());
×
140
      queueDetails.sort(Comparator.comparing(QueueDetail::getName));
×
141
      for (QueueDetail q : queueDetails) {
×
142
        builder.append(q.toString());
×
143
        builder.append("\n");
×
144
      }
×
145
      lock.notifyAll();
×
146
    }
×
147
    return builder.toString();
×
148
  }
149

150
  public static int getActiveQueueCount() {
151
    return getActiveQueues().size();
1✔
152
  }
153

154
  public static int getRegisteredQueueCount() {
155
    return registry.size();
1✔
156
  }
157

158
  /**
159
   * Returns every {@link QueueDetail} registered under the given queue name, including all
160
   * {@code @RqueueListener} methods that share the same backing storage. Used by the
161
   * dashboard to render one subscriber row per handler. Returns an empty list when no
162
   * detail is registered.
163
   */
164
  public static List<QueueDetail> getAllForQueue(String queueName) {
165
    if (queueName == null) {
1!
NEW
166
      return new ArrayList<>();
×
167
    }
168
    synchronized (lock) {
1✔
169
      List<QueueDetail> matches = registry.values().stream()
1✔
170
          .filter(qd -> queueName.equals(qd.getName()))
1✔
171
          .sorted(Comparator.comparing(qd -> {
1✔
NEW
172
            String cn = qd.getConsumerName();
×
NEW
173
            return cn == null ? "" : cn;
×
174
          }))
175
          .collect(Collectors.toList());
1✔
176
      lock.notifyAll();
1✔
177
      return matches;
1✔
178
    }
179
  }
180
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc