• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2051

08 Jul 2025 02:56PM UTC coverage: 95.659% (+0.03%) from 95.626%
#2051

push

github

web-flow
Merge pull request #1352 from nats-io/ordered-examples

Ordered Consumer Examples

1 of 1 new or added line in 1 file covered. (100.0%)

3 existing lines in 3 files now uncovered.

11810 of 12346 relevant lines covered (95.66%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.41
/src/main/java/io/nats/client/impl/NatsDispatcher.java
1
// Copyright 2015-2018 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.client.impl;
15

16
import io.nats.client.Dispatcher;
17
import io.nats.client.MessageHandler;
18
import io.nats.client.Subscription;
19

20
import java.time.Duration;
21
import java.util.Map;
22
import java.util.concurrent.ConcurrentHashMap;
23
import java.util.concurrent.Future;
24
import java.util.concurrent.atomic.AtomicBoolean;
25

26
import static io.nats.client.support.Validator.*;
27

28
class NatsDispatcher extends NatsConsumer implements Dispatcher, Runnable {
29

30
    protected final MessageQueue incoming;
31
    protected final MessageHandler defaultHandler;
32

33
    protected Future<Boolean> thread;
34
    protected final AtomicBoolean running;
35
    protected final AtomicBoolean started;
36

37
    protected String id;
38

39
    // We will use the subject as the key for subscriptions that use the
40
    // default handler.
41
    protected final Map<String, NatsSubscription> subscriptionsUsingDefaultHandler;
42

43
    // We will use the SID as the key. Since  these subscriptions provide
44
    // their own handlers, we allow duplicates. There is a subtle but very
45
    // important difference here.
46
    protected final Map<String, NatsSubscription> subscriptionsWithHandlers;
47

48
    // We use the SID as the key here.
49
    protected final Map<String, MessageHandler> subscriptionHandlers;
50

51
    protected final Duration waitForMessage;
52

53
    NatsDispatcher(NatsConnection conn, MessageHandler handler) {
54
        super(conn);
1✔
55
        this.defaultHandler = handler;
1✔
56
        this.incoming = new MessageQueue(true, conn.getOptions().getRequestCleanupInterval());
1✔
57
        this.subscriptionsUsingDefaultHandler = new ConcurrentHashMap<>();
1✔
58
        this.subscriptionsWithHandlers = new ConcurrentHashMap<>();
1✔
59
        this.subscriptionHandlers = new ConcurrentHashMap<>();
1✔
60
        this.running = new AtomicBoolean(false);
1✔
61
        this.started = new AtomicBoolean(false);
1✔
62
        this.waitForMessage = Duration.ofMinutes(5); // This can be long since we aren't doing anything
1✔
63
    }
1✔
64

65
    @Override
66
    public void start(String id) {
67
        internalStart(id, true);
1✔
68
    }
1✔
69

70
    protected void internalStart(String id, boolean threaded) {
71
        if (!started.get()) {
1✔
72
            this.id = id;
1✔
73
            this.running.set(true);
1✔
74
            this.started.set(true);
1✔
75
            if (threaded) {
1✔
76
                thread = connection.getExecutor().submit(this, Boolean.TRUE);
1✔
77
            }
78
        }
79
    }
1✔
80

81
    boolean breakRunLoop() {
82
        return this.incoming.isDrained();
1✔
83
    }
84

85
    public void run() {
86
        try {
87
            while (running.get() && !Thread.interrupted()) {
1✔
88
                NatsMessage msg = this.incoming.pop(this.waitForMessage);
1✔
89
                if (msg != null) {
1✔
90
                    NatsSubscription sub = msg.getNatsSubscription();
1✔
91
                    if (sub != null && sub.isActive()) {
1✔
92
                        MessageHandler handler = subscriptionHandlers.get(sub.getSID());
1✔
93
                        if (handler == null) {
1✔
94
                            handler = defaultHandler;
1✔
95
                        }
96
                        // A dispatcher can have a null defaultHandler. You can't subscribe without a handler,
97
                        // but messages might come in while the dispatcher is being closed or after unsubscribe
98
                        // and the [non-default] handler has already been removed from subscriptionHandlers
99
                        if (handler != null) {
1✔
100
                            sub.incrementDeliveredCount();
1✔
101
                            this.incrementDeliveredCount();
1✔
102

103
                            try {
104
                                handler.onMessage(msg);
1✔
105
                            } catch (Exception exp) {
1✔
106
                                connection.processException(exp);
1✔
107
                            } catch (Error err) {
1✔
108
                                connection.processException(new Exception(err));
1✔
109
                            }
1✔
110

111
                            if (sub.reachedUnsubLimit()) {
1✔
112
                                connection.invalidate(sub);
1✔
113
                            }
114
                        }
115
                    }
116
                }
117

118
                if (breakRunLoop()) {
1✔
119
                    return;
1✔
120
                }
121
            }
1✔
122
        }
123
        catch (InterruptedException exp) {
×
124
            if (this.running.get()){
×
125
                this.connection.processException(exp);
×
126
            } //otherwise we did it
127
            Thread.currentThread().interrupt();
×
128
        }
129
        finally {
130
            this.running.set(false);
1✔
131
            this.thread = null;
1✔
132
        }
133
    }
1✔
134

135
    void stop(boolean unsubscribeAll) {
136
        this.running.set(false);
1✔
137
        this.incoming.pause();
1✔
138

139
        if (this.thread != null) {
1✔
140
            try {
141
                if (!this.thread.isCancelled()) {
1✔
142
                    this.thread.cancel(true);
1✔
143
                }
UNCOV
144
            } catch (Exception exp) {
×
145
                // let it go
146
            }
1✔
147
        }
148

149
        if (unsubscribeAll) {
1✔
150
            this.subscriptionsUsingDefaultHandler.forEach((subj, sub) -> {
1✔
151
                this.connection.unsubscribe(sub, -1);
1✔
152
            });
1✔
153
            this.subscriptionsWithHandlers.forEach((sid, sub) -> {
1✔
154
                this.connection.unsubscribe(sub, -1);
1✔
155
            });
1✔
156
        }
157

158
        this.subscriptionsUsingDefaultHandler.clear();
1✔
159
        this.subscriptionsWithHandlers.clear();
1✔
160
        this.subscriptionHandlers.clear();
1✔
161
    }
1✔
162

163
    public boolean isActive() {
164
        return this.running.get();
1✔
165
    }
166

167
    String getId() {
168
        return id;
1✔
169
    }
170

171
    MessageQueue getMessageQueue() {
172
        return incoming;
1✔
173
    }
174

175
    Map<String, MessageHandler> getSubscriptionHandlers() {
176
        return subscriptionHandlers;
1✔
177
    }
178

179
    void resendSubscriptions() {
180
        this.subscriptionsUsingDefaultHandler.forEach((id, sub)->{
1✔
181
            this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
182
        });
1✔
183
        this.subscriptionsWithHandlers.forEach((sid, sub)->{
1✔
184
            this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true);
1✔
185
        });
1✔
186
    }
1✔
187

188
    // Called by the connection when a subscription is removed.
189
    // We will first attempt to remove from subscriptionsWithHandlers
190
    // using the sub's SID, and if we don't find it there, we'll check
191
    // the subscriptionsUsingDefaultHandler Map and verify the SID
192
    // matches before removing. By verifying the SID in all cases we can
193
    // be certain we're removing the correct Subscription.
194
    void remove(NatsSubscription sub) {
195
        if (this.subscriptionsWithHandlers.remove(sub.getSID()) != null) {
1✔
196
            this.subscriptionHandlers.remove(sub.getSID());
1✔
197
        } else {
198
            NatsSubscription s = this.subscriptionsUsingDefaultHandler.get(sub.getSubject());
1✔
199
            if (s.getSID().equals(sub.getSID())) {
1✔
200
                this.subscriptionsUsingDefaultHandler.remove(sub.getSubject());
1✔
201
            }
202
        }
203
    }
1✔
204

205
    public Dispatcher subscribe(String subject) {
206
        validateSubject(subject, true);
1✔
207
        this.subscribeImplCore(subject, null, null);
1✔
208
        return this;
1✔
209
    }
210

211
    NatsSubscription subscribeReturningSubscription(String subject) {
212
        validateSubject(subject, true);
1✔
213
        return this.subscribeImplCore(subject, null, null);
1✔
214
    }
215

216
    public Subscription subscribe(String subject, MessageHandler handler) {
217
        validateSubject(subject, true);
1✔
218
        required(handler, "Handler");
1✔
219
        return this.subscribeImplCore(subject, null, handler);
1✔
220
    }
221

222
    public Dispatcher subscribe(String subject, String queueName) {
223
        validateSubject(subject, true);
1✔
224
        validateQueueName(queueName, true);
1✔
225
        this.subscribeImplCore(subject, queueName, null);
1✔
226
        return this;
1✔
227
    }
228

229
    public Subscription subscribe(String subject, String queueName,  MessageHandler handler) {
230
        validateSubject(subject, true);
1✔
231
        validateQueueName(queueName, true);
1✔
232
        if (handler == null) {
1✔
233
            throw new IllegalArgumentException("MessageHandler is required in subscribe");
1✔
234
        }
235
        return this.subscribeImplCore(subject, queueName, handler);
1✔
236
    }
237

238
    // Assumes the subj/queuename checks are done, does check for closed status
239
    NatsSubscription subscribeImplCore(String subject, String queueName, MessageHandler handler) {
240
        checkBeforeSubImpl();
1✔
241

242
        // If the handler is null, then we use the default handler, which will not allow
243
        // duplicate subscriptions to exist.
244
        if (handler == null) {
1✔
245
            NatsSubscription sub = this.subscriptionsUsingDefaultHandler.get(subject);
1✔
246

247
            if (sub == null) {
1✔
248
                sub = connection.createSubscription(subject, queueName, this, null);
1✔
249
                NatsSubscription wonTheRace = this.subscriptionsUsingDefaultHandler.putIfAbsent(subject, sub);
1✔
250
                if (wonTheRace != null) {
1✔
251
                    this.connection.unsubscribe(sub, -1); // Could happen on very bad timing
×
252
                }
253
            }
254

255
            return sub;
1✔
256
        }
257

258
        return _subscribeImplHandlerProvided(subject, queueName, handler, null);
1✔
259
    }
260

261
    NatsSubscription subscribeImplJetStream(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
262
        checkBeforeSubImpl();
1✔
263
        return _subscribeImplHandlerProvided(subject, queueName, handler, nsf);
1✔
264
    }
265

266
    private NatsSubscription _subscribeImplHandlerProvided(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
267
        NatsSubscription sub = connection.createSubscription(subject, queueName, this, nsf);
1✔
268
        this.subscriptionsWithHandlers.put(sub.getSID(), sub);
1✔
269
        this.subscriptionHandlers.put(sub.getSID(), handler);
1✔
270
        return sub;
1✔
271
    }
272

273
    String reSubscribe(NatsSubscription sub, String subject, String queueName, MessageHandler handler) {
274
        String sid = connection.reSubscribe(sub, subject, queueName);
1✔
275
        this.subscriptionsWithHandlers.put(sid, sub);
1✔
276
        this.subscriptionHandlers.put(sid, handler);
1✔
277
        return sid;
1✔
278
    }
279

280
    private void checkBeforeSubImpl() {
281
        if (!running.get()) {
1✔
282
            throw new IllegalStateException("Dispatcher is closed");
1✔
283
        }
284

285
        if (isDraining()) {
1✔
286
            throw new IllegalStateException("Dispatcher is draining");
×
287
        }
288
    }
1✔
289

290
    public Dispatcher unsubscribe(String subject) {
291
        return this.unsubscribe(subject, -1);
1✔
292
    }
293

294
    public Dispatcher unsubscribe(Subscription subscription) {
295
        return this.unsubscribe(subscription, -1);
1✔
296
    }
297

298
    public Dispatcher unsubscribe(String subject, int after) {
299
        if (!this.running.get()) {
1✔
300
            throw new IllegalStateException("Dispatcher is closed");
1✔
301
        }
302

303
        if (isDraining()) { // No op while draining
1✔
304
            return this;
1✔
305
        }
306

307
        if (subject == null || subject.length() == 0) {
1✔
308
            throw new IllegalArgumentException("Subject is required in unsubscribe");
1✔
309
        }
310

311
        NatsSubscription sub = this.subscriptionsUsingDefaultHandler.get(subject);
1✔
312

313
        if (sub != null) {
1✔
314
            this.connection.unsubscribe(sub, after); // Connection will tell us when to remove from the map
1✔
315
        }
316

317
        return this;
1✔
318
    }
319

320
    public Dispatcher unsubscribe(Subscription subscription, int after) {
321
        if (!this.running.get()) {
1✔
322
            throw new IllegalStateException("Dispatcher is closed");
1✔
323
        }
324

325
        if (isDraining()) { // No op while draining
1✔
326
            return this;
×
327
        }
328

329
        if (subscription.getDispatcher() != this) {
1✔
330
            throw new IllegalStateException("Subscription is not managed by this Dispatcher");
1✔
331
        }
332

333
        // We can probably optimize this path by adding getSID() to the Subscription interface.
334
        if (!(subscription instanceof NatsSubscription)) {
1✔
335
            throw new IllegalArgumentException("This Subscription implementation is not known by Dispatcher");
×
336
        }
337
        
338
        NatsSubscription ns = ((NatsSubscription) subscription);
1✔
339
        // Grab the NatsSubscription to verify we weren't given a different manager's subscription.
340
        NatsSubscription sub = this.subscriptionsWithHandlers.get(ns.getSID());
1✔
341

342
        if (sub != null) {
1✔
343
            this.connection.unsubscribe(sub, after); // Connection will tell us when to remove from the map
1✔
344
        }
345

346
        return this;
1✔
347
    }
348

349
    void sendUnsubForDrain() {
350
        this.subscriptionsUsingDefaultHandler.forEach((id, sub)->{
1✔
351
            this.connection.sendUnsub(sub, -1);
1✔
352
        });
1✔
353
        this.subscriptionsWithHandlers.forEach((sid, sub)->{
1✔
354
            this.connection.sendUnsub(sub, -1);
1✔
355
        });
1✔
356
    }
1✔
357

358
    void cleanUpAfterDrain() {
359
        this.connection.cleanupDispatcher(this);
1✔
360
    }
1✔
361

362
    public boolean isDrained() {
363
        return !isActive() && super.isDrained();
1✔
364
    }
365
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc