• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1875

18 Feb 2025 07:43PM UTC coverage: 95.693% (-0.02%) from 95.709%
#1875

push

github

web-flow
Service user endpoints can be run without queue groups. (#1277)

Bug fix: internal endpoints should not be run in a queue group.

16 of 16 new or added lines in 3 files covered. (100.0%)

2 existing lines in 1 file now uncovered.

11442 of 11957 relevant lines covered (95.69%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.79
/src/main/java/io/nats/service/Service.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.service;
15

16
import io.nats.client.Connection;
17
import io.nats.client.Dispatcher;
18
import io.nats.client.support.DateTimeUtils;
19
import io.nats.client.support.JsonUtils;
20

21
import java.time.Duration;
22
import java.time.ZonedDateTime;
23
import java.util.ArrayList;
24
import java.util.List;
25
import java.util.concurrent.CompletableFuture;
26
import java.util.concurrent.ConcurrentHashMap;
27
import java.util.concurrent.ConcurrentMap;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.locks.ReentrantLock;
30

31
import static io.nats.client.support.ApiConstants.*;
32
import static io.nats.client.support.JsonUtils.endJson;
33
import static io.nats.client.support.Validator.nullOrEmpty;
34

35
/**
36
 * The Services Framework introduces a higher-level API for implementing services with NATS.
37
 * Services automatically contain Ping, Info and Stats responders.
38
 * Services have one or more service endpoints. {@link ServiceEndpoint}
39
 * When multiple instances of a service endpoints are active they work in a queue, meaning only one listener responds to any given request.
40
 */
41
public class Service {
42
    public static final String SRV_PING = "PING";
43
    public static final String SRV_INFO = "INFO";
44
    public static final String SRV_STATS = "STATS";
45
    public static final String DEFAULT_SERVICE_PREFIX = "$SRV.";
46

47
    private final Connection conn;
48
    private final Duration drainTimeout;
49
    private final ConcurrentMap<String, EndpointContext> serviceContexts;
50
    private final List<EndpointContext> discoveryContexts;
51
    private final List<Dispatcher> dInternals;
52
    private final PingResponse pingResponse;
53
    private final InfoResponse infoResponse;
54

55
    private final ReentrantLock startStopLock;
56
    private CompletableFuture<Boolean> runningIndicator;
57
    private ZonedDateTime started;
58

59
    Service(ServiceBuilder b) {
1✔
60
        String id = new io.nats.client.NUID().next();
1✔
61
        conn = b.conn;
1✔
62
        drainTimeout = b.drainTimeout;
1✔
63
        dInternals = new ArrayList<>();
1✔
64
        startStopLock = new ReentrantLock();
1✔
65

66
        // build responses first. info needs to be available when adding service endpoints.
67
        pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
1✔
68
        infoResponse = new InfoResponse(id, b.name, b.version, b.metadata, b.description);
1✔
69

70
        // set up the service contexts
71
        // ? do we need an internal dispatcher for any user endpoints !! addServiceEndpoint deals with it
72
        serviceContexts = new ConcurrentHashMap<>();
1✔
73
        for (ServiceEndpoint se : b.serviceEndpoints.values()) {
1✔
74
            addServiceEndpoint(se);
1✔
75
        }
1✔
76

77
        Dispatcher dTemp = null;
1✔
78
        if (b.pingDispatcher == null || b.infoDispatcher == null || b.statsDispatcher == null) {
1✔
79
            dTemp = conn.createDispatcher();
1✔
80
            dInternals.add(dTemp);
1✔
81
        }
82

83
        discoveryContexts = new ArrayList<>();
1✔
84
        addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp);
1✔
85
        addDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
1✔
86
        addStatsContexts(b.statsDispatcher, dTemp);
1✔
87
    }
1✔
88

89
    /**
90
     * Adds a service endpoint to the list of service contexts and starts it if the service is running.
91
     * @param se the service endpoint to be added
92
     */
93
    public void addServiceEndpoint(ServiceEndpoint se) {
94
        // do this first so it's available on start
95
        infoResponse.addServiceEndpoint(se);
1✔
96

97
        EndpointContext ctx;
98
        if (se.getDispatcher() == null) {
1✔
99
            Dispatcher dTemp = dInternals.isEmpty() ? null : dInternals.get(0);
1✔
100
            if (dTemp == null) {
1✔
101
                dTemp = conn.createDispatcher();
1✔
102
                dInternals.add(dTemp);
1✔
103
            }
104
            ctx = new EndpointContext(conn, dTemp, false, se);
1✔
105
        } else {
1✔
106
            ctx = new EndpointContext(conn, null, false, se);
1✔
107
        }
108
        serviceContexts.put(se.getName(), ctx);
1✔
109

110
        // if the service is already started, start the newly added context
111
        startStopLock.lock();
1✔
112
        try {
113
            if (runningIndicator != null) {
1✔
114
                ctx.start();
1✔
115
            }
116
        }
117
        finally {
118
            startStopLock.unlock();
1✔
119
        }
120
    }
1✔
121

122
    private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) {
123
        Endpoint[] endpoints = new Endpoint[] {
1✔
124
            internalEndpoint(discoveryName, null, null),
1✔
125
            internalEndpoint(discoveryName, pingResponse.getName(), null),
1✔
126
            internalEndpoint(discoveryName, pingResponse.getName(), pingResponse.getId())
1✔
127
        };
128

129
        for (Endpoint endpoint : endpoints) {
1✔
130
            discoveryContexts.add(
1✔
131
                new EndpointContext(conn, dInternal, true,
132
                    new ServiceEndpoint(endpoint, handler, dUser)));
133
        }
134
    }
1✔
135

136
    /**
137
     * Adds discovery contexts for the service, reusing the same static bytes at registration.
138
     * @param discoveryName the name of the discovery
139
     * @param sr the service response
140
     * @param dUser the user dispatcher
141
     * @param dInternal the internal dispatcher
142
     */
143
    private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
144
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
1✔
145
        addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
1✔
146
    }
1✔
147

148
    private void addStatsContexts(Dispatcher dUser, Dispatcher dInternal) {
149
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, getStatsResponse().serialize());
1✔
150
        addDiscoveryContexts(SRV_STATS, dUser, dInternal, handler);
1✔
151
    }
1✔
152

153
    private Endpoint internalEndpoint(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
154
        String subject = toDiscoverySubject(discoveryName, optionalServiceNameSegment, optionalServiceIdSegment);
1✔
155
        return new Endpoint(subject, subject, null, null, false);
1✔
156
    }
157

158
    static String toDiscoverySubject(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
159
        if (nullOrEmpty(optionalServiceIdSegment)) {
1✔
160
            if (nullOrEmpty(optionalServiceNameSegment)) {
1✔
161
                return DEFAULT_SERVICE_PREFIX + discoveryName;
1✔
162
            }
163
            return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment;
1✔
164
        }
165
        return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment + "." + optionalServiceIdSegment;
1✔
166
    }
167

168
    /**
169
     * Start the service
170
     * @return a future that can be held to see if another thread called stop
171
     */
172
    public CompletableFuture<Boolean> startService() {
173
        startStopLock.lock();
1✔
174
        try {
175
            if (runningIndicator == null) {
1✔
176
                runningIndicator = new CompletableFuture<>();
1✔
177
                for (EndpointContext ctx : serviceContexts.values()) {
1✔
178
                    ctx.start();
1✔
179
                }
1✔
180
                for (EndpointContext ctx : discoveryContexts) {
1✔
181
                    ctx.start();
1✔
182
                }
1✔
183
                started = DateTimeUtils.gmtNow();
1✔
184
            }
185
            return runningIndicator;
1✔
186
        }
187
        finally {
188
            startStopLock.unlock();
1✔
189
        }
190
    }
191

192
    /**
193
     * Get an instance of a ServiceBuilder.
194
     * @return the instance
195
     */
196
    public static ServiceBuilder builder() {
197
        return new ServiceBuilder();
1✔
198
    }
199

200
    /**
201
     * Stop the service by draining.
202
     */
203
    public void stop() {
204
        stop(true, null);
1✔
205
    }
1✔
206

207
    /**
208
     * Stop the service by draining. Mark the future that was received from the start method that the service completed exceptionally.
209
     * @param t the error cause
210
     */
211
    public void stop(Throwable t) {
212
        stop(true, t);
1✔
213
    }
1✔
214

215
    /**
216
     * Stop the service, optionally draining.
217
     * @param drain the flag indicating to drain or not
218
     */
219
    public void stop(boolean drain) {
220
        stop(drain, null);
1✔
221
    }
1✔
222

223
    /**
224
     * Stop the service, optionally draining and optionally with an error cause
225
     * @param drain the flag indicating to drain or not
226
     * @param t the optional error cause. If supplied, mark the future that was received from the start method that the service completed exceptionally
227
     */
228
    public void stop(boolean drain, Throwable t) {
229
        startStopLock.lock();
1✔
230
        try {
231
            if (runningIndicator != null) {
1✔
232
                if (drain) {
1✔
233
                    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
1✔
234

235
                    for (Dispatcher d : dInternals) {
1✔
236
                        try {
237
                            futures.add(d.drain(drainTimeout));
1✔
238
                        }
239
                        catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
240
                    }
1✔
241

242
                    for (EndpointContext c : serviceContexts.values()) {
1✔
243
                        if (c.isNotInternalDispatcher()) {
1✔
244
                            try {
UNCOV
245
                                futures.add(c.getSub().drain(drainTimeout));
×
246
                            }
247
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
248
                        }
249
                    }
1✔
250

251
                    for (EndpointContext c : discoveryContexts) {
1✔
252
                        if (c.isNotInternalDispatcher()) {
1✔
253
                            try {
UNCOV
254
                                futures.add(c.getSub().drain(drainTimeout));
×
255
                            }
256
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
257
                        }
258
                    }
1✔
259

260
                    // make sure drain is done before closing dispatcher
261
                    long drainTimeoutMillis = drainTimeout.toMillis();
1✔
262
                    for (CompletableFuture<Boolean> f : futures) {
1✔
263
                        try {
264
                            f.get(drainTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
265
                        }
266
                        catch (Exception ignore) {
×
267
                            // don't care if it completes successfully or not, just that it's done.
268
                        }
1✔
269
                    }
1✔
270
                }
271

272
                // close internal dispatchers
273
                for (Dispatcher d : dInternals) {
1✔
274
                    conn.closeDispatcher(d);
1✔
275
                }
1✔
276

277
                // ok we are done
278
                if (t == null) {
1✔
279
                    runningIndicator.complete(true);
1✔
280
                }
281
                else {
282
                    runningIndicator.completeExceptionally(t);
1✔
283
                }
284
                runningIndicator = null; // we don't need a copy anymore
1✔
285
            }
286
        }
287
        finally {
288
            startStopLock.unlock();
1✔
289
        }
290
    }
1✔
291

292
    /**
293
     * Reset the statistics for the endpoints
294
     */
295
    public void reset() {
296
        started = DateTimeUtils.gmtNow();
1✔
297
        for (EndpointContext c : discoveryContexts) {
1✔
298
            c.reset();
1✔
299
        }
1✔
300
        for (EndpointContext c : serviceContexts.values()) {
1✔
301
            c.reset();
1✔
302
        }
1✔
303
    }
1✔
304

305
    /**
306
     * Get the id of the service
307
     * @return the id
308
     */
309
    public String getId() {
310
        return infoResponse.getId();
1✔
311
    }
312

313
    /**
314
     * Get the name of the service
315
     * @return the name
316
     */
317
    public String getName() {
318
        return infoResponse.getName();
1✔
319
    }
320

321
    /**
322
     * Get the version of the service
323
     * @return the version
324
     */
325
    public String getVersion() {
326
        return infoResponse.getVersion();
1✔
327
    }
328

329
    /**
330
     * Get the description of the service
331
     * @return the description
332
     */
333
    public String getDescription() {
334
        return infoResponse.getDescription();
1✔
335
    }
336

337
    /**
338
     * Get the drain timeout setting
339
     * @return the drain timeout setting
340
     */
341
    public Duration getDrainTimeout() {
342
        return drainTimeout;
1✔
343
    }
344

345
    /**
346
     * Get the pre-constructed ping response.
347
     * @return the ping response
348
     */
349
    public PingResponse getPingResponse() {
350
        return pingResponse;
1✔
351
    }
352

353
    /**
354
     * Get the pre-constructed info response.
355
     * @return the info response
356
     */
357
    public InfoResponse getInfoResponse() {
358
        return infoResponse;
1✔
359
    }
360

361
    /**
362
     * Get the up-to-date stats response which contains a list of all {@link EndpointStats}
363
     * @return the stats response
364
     */
365
    public StatsResponse getStatsResponse() {
366
        List<EndpointStats> endpointStats = new ArrayList<>();
1✔
367
        for (EndpointContext c : serviceContexts.values()) {
1✔
368
            endpointStats.add(c.getEndpointStats());
1✔
369
        }
1✔
370
        return new StatsResponse(pingResponse, started, endpointStats);
1✔
371
    }
372

373
    /**
374
     * Get the up-to-date {@link EndpointStats} for a specific endpoint
375
     * @param endpointName the endpoint name
376
     * @return the EndpointStats or null if the name is not found.
377
     */
378
    public EndpointStats getEndpointStats(String endpointName) {
379
        EndpointContext c = serviceContexts.get(endpointName);
1✔
380
        return c == null ? null : c.getEndpointStats();
1✔
381
    }
382

383
    @Override
384
    public String toString() {
385
        StringBuilder sb = JsonUtils.beginJsonPrefixed("\"Service\":");
1✔
386
        JsonUtils.addField(sb, ID, infoResponse.getId());
1✔
387
        JsonUtils.addField(sb, NAME, infoResponse.getName());
1✔
388
        JsonUtils.addField(sb, VERSION, infoResponse.getVersion());
1✔
389
        JsonUtils.addField(sb, DESCRIPTION, infoResponse.getDescription());
1✔
390
        return endJson(sb).toString();
1✔
391
    }
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc