• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2222

25 Sep 2025 02:30PM UTC coverage: 95.522% (-0.05%) from 95.571%
#2222

push

github

web-flow
Merge pull request #1433 from nats-io/prioritized

(2.12) Prioritized Consumer Support

36 of 41 new or added lines in 10 files covered. (87.8%)

4 existing lines in 2 files now uncovered.

12137 of 12706 relevant lines covered (95.52%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.61
/src/main/java/io/nats/client/impl/NatsDispatcher.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Dispatcher;
17
import io.nats.client.MessageHandler;
18
import io.nats.client.Subscription;
19

20
import java.time.Duration;
21
import java.util.Map;
22
import java.util.concurrent.ConcurrentHashMap;
23
import java.util.concurrent.Future;
24
import java.util.concurrent.atomic.AtomicBoolean;
25

26
import static io.nats.client.support.Validator.*;
27

28
class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable {
29

30
    protected final MessageQueue incoming;
31
    protected final MessageHandler defaultHandler;
32

33
    protected Future<Boolean> thread;
34
    protected final AtomicBoolean running;
35
    protected final AtomicBoolean started;
36

37
    protected String id;
38

39
    // This tracks subscriptions made with the default handlers
40
    // There can only be one default handler subscription for any given subject
41
    protected final Map<String, NatsSubscription> subWithDefaultHandlerBySubject;
42

43
    // This tracks subscriptions made with non-default handlers
44
    protected final Map<String, NatsSubscription> subWithNonDefaultHandlerBySid;
45

46
    // There can be multiple non default handlers for any given subject, this track them
47
    protected final Map<String, Map<String, NatsSubscription>> subsBySidNonDefaultHandlersBySubject;
48

49
    // This tracks the non-default handler by sid
50
    protected final Map<String, MessageHandler> nonDefaultHandlerBySid;
51

52
    protected final Duration waitForMessage;
53

54
    NatsDispatcher(NatsConnection conn, MessageHandler handler) {
55
        super(conn);
1✔
56
        this.defaultHandler = handler;
1✔
57
        this.incoming = new MessageQueue(true, conn.getOptions().getRequestCleanupInterval());
1✔
58
        this.subWithDefaultHandlerBySubject = new ConcurrentHashMap<>();
1✔
59
        this.subWithNonDefaultHandlerBySid = new ConcurrentHashMap<>();
1✔
60
        this.subsBySidNonDefaultHandlersBySubject = new ConcurrentHashMap<>();
1✔
61
        this.nonDefaultHandlerBySid = new ConcurrentHashMap<>();
1✔
62
        this.running = new AtomicBoolean(false);
1✔
63
        this.started = new AtomicBoolean(false);
1✔
64
        this.waitForMessage = Duration.ofMinutes(5); // This can be long since we aren't doing anything
1✔
65
    }
1✔
66

67
    @Override
68
    public void start(String id) {
69
        internalStart(id, true);
1✔
70
    }
1✔
71

72
    @SuppressWarnings("SameParameterValue")
73
    protected void internalStart(String id, boolean threaded) {
74
        if (!started.get()) {
1✔
75
            this.id = id;
1✔
76
            this.running.set(true);
1✔
77
            this.started.set(true);
1✔
78
            if (threaded) {
1✔
79
                thread = connection.getExecutor().submit(this, Boolean.TRUE);
1✔
80
            }
81
        }
82
    }
1✔
83

84
    boolean breakRunLoop() {
85
        return this.incoming.isDrained();
1✔
86
    }
87

88
    public void run() {
89
        try {
90
            while (running.get() && !Thread.interrupted()) {
1✔
91
                NatsMessage msg = this.incoming.pop(this.waitForMessage);
1✔
92
                if (msg != null) {
1✔
93
                    NatsSubscription sub = msg.getNatsSubscription();
1✔
94
                    if (sub != null && sub.isActive()) {
1✔
95
                        MessageHandler handler = nonDefaultHandlerBySid.get(sub.getSID());
1✔
96
                        if (handler == null) {
1✔
97
                            handler = defaultHandler;
1✔
98
                        }
99
                        // A dispatcher can have a null defaultHandler. You can't subscribe without a handler,
100
                        // but messages might come in while the dispatcher is being closed or after unsubscribe
101
                        // and the [non-default] handler has already been removed from subscriptionHandlers
102
                        if (handler != null) {
1✔
103
                            sub.incrementDeliveredCount();
1✔
104
                            this.incrementDeliveredCount();
1✔
105

106
                            try {
107
                                handler.onMessage(msg);
1✔
108
                            } catch (Exception exp) {
1✔
109
                                connection.processException(exp);
1✔
110
                            } catch (Error err) {
1✔
111
                                connection.processException(new Exception(err));
1✔
112
                            }
1✔
113

114
                            if (sub.reachedUnsubLimit()) {
1✔
115
                                connection.invalidate(sub);
1✔
116
                            }
117
                        }
118
                    }
119
                }
120

121
                if (breakRunLoop()) {
1✔
122
                    return;
1✔
123
                }
124
            }
1✔
125
        }
126
        catch (InterruptedException exp) {
×
127
            if (this.running.get()){
×
128
                this.connection.processException(exp);
×
129
            } //otherwise we did it
130
            Thread.currentThread().interrupt();
×
131
        }
132
        finally {
133
            this.running.set(false);
1✔
134
            this.thread = null;
1✔
135
        }
136
    }
1✔
137

138
    void stop(boolean unsubscribeAll) {
139
        this.running.set(false);
1✔
140
        this.incoming.pause();
1✔
141

142
        if (this.thread != null) {
1✔
143
            try {
144
                if (!this.thread.isCancelled()) {
1✔
145
                    this.thread.cancel(true);
1✔
146
                }
UNCOV
147
            } catch (Exception exp) {
×
148
                // let it go
149
            }
1✔
150
        }
151

152
        if (unsubscribeAll) {
1✔
153
            subWithDefaultHandlerBySubject.forEach((subject, sub) -> connection.unsubscribe(sub, -1));
1✔
154
            subWithNonDefaultHandlerBySid.forEach((sid, sub) -> connection.unsubscribe(sub, -1));
1✔
155
        }
156

157
        subWithDefaultHandlerBySubject.clear();
1✔
158
        subWithNonDefaultHandlerBySid.clear();
1✔
159
        subsBySidNonDefaultHandlersBySubject.clear();
1✔
160
        nonDefaultHandlerBySid.clear();
1✔
161
    }
1✔
162

163
    public boolean isActive() {
164
        return this.running.get();
1✔
165
    }
166

167
    String getId() {
168
        return id;
1✔
169
    }
170

171
    MessageQueue getMessageQueue() {
172
        return incoming;
1✔
173
    }
174

175
    MessageHandler getNonDefaultHandlerBySid(String sid) {
176
        return nonDefaultHandlerBySid.get(sid);
1✔
177
    }
178

179
    boolean hasNoSubs() {
180
        return subWithDefaultHandlerBySubject.isEmpty() && subWithNonDefaultHandlerBySid.isEmpty();
1✔
181
    }
182

183
    void resendSubscriptions() {
184
        this.subWithDefaultHandlerBySubject.forEach((subject, sub) ->
1✔
185
            connection.sendSubscriptionMessage(sub.getSID(), subject, sub.getQueueName(), true));
1✔
186
        this.subWithNonDefaultHandlerBySid.forEach((sid, sub) ->
1✔
187
            connection.sendSubscriptionMessage(sid, sub.getSubject(), sub.getQueueName(), true));
1✔
188
    }
1✔
189

190
    // Remove this sub from all of our tracking maps.
191
    // Instead of logic to figure where the sub is in the first place,
192
    //   we try all tracking maps, with guard rails.
193
    // It's possible multiple threads/workflow could be hitting this,
194
    //   but all the maps are ConcurrentHashMap, we're safe.
195
    // For the case when we do find the sub's subject mapped to the default handler,
196
    //   we double-check that what is mapped is the same sub, again mostly a code guard.
197
    void remove(NatsSubscription sub) {
198
        // remove from all maps
199
        // lots of code guards here instead of checking where the sub might be
200
        String sid = sub.getSID();
1✔
201
        NatsSubscription defaultSub = subWithDefaultHandlerBySubject.get(sub.getSubject());
1✔
202
        if (defaultSub != null && defaultSub.getSID().equals(sid)) {
1✔
203
            subWithDefaultHandlerBySubject.remove(sub.getSubject());
1✔
204
        }
205
        subWithNonDefaultHandlerBySid.remove(sid);
1✔
206
        nonDefaultHandlerBySid.remove(sid);
1✔
207
        Map<String, NatsSubscription> subsBySid = subsBySidNonDefaultHandlersBySubject.get(sub.getSubject());
1✔
208
        if (subsBySid != null) {
1✔
209
            // it could be null, I know it's weird
210
            subsBySid.remove(sid);
1✔
211
            if (subsBySid.isEmpty()) {
1✔
212
                // if there are no more for the subject, we can remove the entry
213
                // from the map to avoid empty entries/memory leak
214
                subsBySidNonDefaultHandlersBySubject.remove(sub.getSubject());
1✔
215
            }
216
        }
217
    }
1✔
218

219
    public Dispatcher subscribe(String subject) {
220
        if (defaultHandler == null) {
1✔
221
            throw new IllegalStateException("Dispatcher was made without a default handler.");
1✔
222
        }
223
        validateSubject(subject, true);
1✔
224
        this.subscribeImplCore(subject, null, null);
1✔
225
        return this;
1✔
226
    }
227

228
    NatsSubscription subscribeReturningSubscription(String subject) {
229
        validateSubject(subject, true);
1✔
230
        return this.subscribeImplCore(subject, null, null);
1✔
231
    }
232

233
    public Subscription subscribe(String subject, MessageHandler handler) {
234
        validateSubject(subject, true);
1✔
235
        required(handler, "Handler");
1✔
236
        return this.subscribeImplCore(subject, null, handler);
1✔
237
    }
238

239
    public Dispatcher subscribe(String subject, String queueName) {
240
        validateSubject(subject, true);
1✔
241
        validateQueueName(queueName, true);
1✔
242
        this.subscribeImplCore(subject, queueName, null);
1✔
243
        return this;
1✔
244
    }
245

246
    public Subscription subscribe(String subject, String queueName,  MessageHandler handler) {
247
        validateSubject(subject, true);
1✔
248
        validateQueueName(queueName, true);
1✔
249
        if (handler == null) {
1✔
250
            throw new IllegalArgumentException("MessageHandler is required in subscribe");
1✔
251
        }
252
        return this.subscribeImplCore(subject, queueName, handler);
1✔
253
    }
254

255
    // Assumes the subj/queuename checks are done, does check for closed status
256
    NatsSubscription subscribeImplCore(String subject, String queueName, MessageHandler handler) {
257
        checkBeforeSubImpl();
1✔
258

259
        // If the handler is null, then we use the default handler, which will not allow
260
        // duplicate subscriptions to exist.
261
        if (handler == null) {
1✔
262
            NatsSubscription sub = this.subWithDefaultHandlerBySubject.get(subject);
1✔
263

264
            if (sub == null) {
1✔
265
                sub = connection.createSubscription(subject, queueName, this, null);
1✔
266
                NatsSubscription wonTheRace = this.subWithDefaultHandlerBySubject.putIfAbsent(subject, sub);
1✔
267
                if (wonTheRace != null) {
1✔
268
                    this.connection.unsubscribe(sub, -1); // Could happen on very bad timing
×
269
                }
270
            }
271

272
            return sub;
1✔
273
        }
274

275
        return _subscribeImplHandlerProvided(subject, queueName, handler, null);
1✔
276
    }
277

278
    NatsSubscription subscribeImplJetStream(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
279
        checkBeforeSubImpl();
1✔
280
        return _subscribeImplHandlerProvided(subject, queueName, handler, nsf);
1✔
281
    }
282

283
    private NatsSubscription _subscribeImplHandlerProvided(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
284
        NatsSubscription sub = connection.createSubscription(subject, queueName, this, nsf);
1✔
285
        trackSubWithUserHandler(sub.getSID(), sub, handler);
1✔
286
        return sub;
1✔
287
    }
288

289
    String reSubscribe(NatsSubscription sub, String subject, String queueName, MessageHandler handler) {
290
        String sid = connection.reSubscribe(sub, subject, queueName);
1✔
291
        trackSubWithUserHandler(sid, sub, handler);
1✔
292
        return sid;
1✔
293
    }
294

295
    private void trackSubWithUserHandler(String sid, NatsSubscription sub, MessageHandler handler) {
296
        subWithNonDefaultHandlerBySid.put(sid, sub);
1✔
297
        Map<String, NatsSubscription> subsBySid =
1✔
298
            subsBySidNonDefaultHandlersBySubject.computeIfAbsent(sub.getSubject(), k -> new ConcurrentHashMap<>());
1✔
299
        subsBySid.put(sid, sub);
1✔
300
        nonDefaultHandlerBySid.put(sid, handler);
1✔
301
    }
1✔
302

303
    private void checkBeforeSubImpl() {
304
        if (!running.get()) {
1✔
305
            throw new IllegalStateException("Dispatcher is closed");
1✔
306
        }
307

308
        if (isDraining()) {
1✔
309
            throw new IllegalStateException("Dispatcher is draining");
×
310
        }
311
    }
1✔
312

313
    public Dispatcher unsubscribe(String subject) {
314
        return this.unsubscribe(subject, -1);
1✔
315
    }
316

317
    public Dispatcher unsubscribe(Subscription subscription) {
318
        return this.unsubscribe(subscription, -1);
1✔
319
    }
320

321
    public Dispatcher unsubscribe(String subject, int after) {
322
        if (!this.running.get()) {
1✔
323
            throw new IllegalStateException("Dispatcher is closed");
1✔
324
        }
325

326
        if (isDraining()) { // No op while draining
1✔
327
            return this;
1✔
328
        }
329

330
        if (subject == null || subject.length() == 0) {
1✔
331
            throw new IllegalArgumentException("Subject is required in unsubscribe");
1✔
332
        }
333

334
        // Connection unsubscribe ends up calling invalidate on the sub which calls dispatcher.remove
335
        // meaning all we do is call this unsubscribe method and the workflow takes care of the rest
336
        NatsSubscription defaultHandlerSub = subWithDefaultHandlerBySubject.get(subject);
1✔
337
        if (defaultHandlerSub != null) {
1✔
338
            connection.unsubscribe(defaultHandlerSub, after);
1✔
339
        }
340

341
        subWithNonDefaultHandlerBySid.forEach((sid, sub) -> connection.unsubscribe(sub, after));
1✔
342

343
        return this;
1✔
344
    }
345

346
    public Dispatcher unsubscribe(Subscription subscription, int after) {
347
        if (!this.running.get()) {
1✔
348
            throw new IllegalStateException("Dispatcher is closed");
1✔
349
        }
350

351
        if (isDraining()) { // No op while draining
1✔
352
            return this;
×
353
        }
354

355
        if (subscription.getDispatcher() != this) {
1✔
356
            throw new IllegalStateException("Subscription is not managed by this Dispatcher");
1✔
357
        }
358

359
        // We can probably optimize this path by adding getSID() to the Subscription interface.
360
        if (!(subscription instanceof NatsSubscription)) {
1✔
361
            throw new IllegalArgumentException("This Subscription implementation is not known by Dispatcher");
×
362
        }
363
        
364
        NatsSubscription ns = ((NatsSubscription) subscription);
1✔
365
        // Grab the NatsSubscription to verify we weren't given a different manager's subscription.
366
        NatsSubscription sub = subWithNonDefaultHandlerBySid.get(ns.getSID());
1✔
367

368
        if (sub != null) {
1✔
369
            connection.unsubscribe(sub, after); // Connection will tell us when to remove from the map
1✔
370
        }
371

372
        return this;
1✔
373
    }
374

375
    void sendUnsubForDrain() {
376
        subWithDefaultHandlerBySubject.forEach((id, sub) -> connection.sendUnsub(sub, -1));
1✔
377
        subWithNonDefaultHandlerBySid.forEach((sid, sub) -> connection.sendUnsub(sub, -1));
1✔
378
    }
1✔
379

380
    void cleanUpAfterDrain() {
381
        this.connection.cleanupDispatcher(this);
1✔
382
    }
1✔
383

384
    public boolean isDrained() {
385
        return !isActive() && super.isDrained();
1✔
386
    }
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc