• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #2112

15 Aug 2025 05:52PM UTC coverage: 95.352% (-0.05%) from 95.404%
#2112

push

github

web-flow
Merge pull request #1396 from nats-io/service-flapper

Fix test flappers

10 of 15 new or added lines in 1 file covered. (66.67%)

2 existing lines in 1 file now uncovered.

11920 of 12501 relevant lines covered (95.35%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/main/java/io/nats/service/Service.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.service;
15

16
import io.nats.client.Connection;
17
import io.nats.client.Dispatcher;
18
import io.nats.client.support.DateTimeUtils;
19
import io.nats.client.support.JsonUtils;
20

21
import java.time.Duration;
22
import java.time.ZonedDateTime;
23
import java.util.ArrayList;
24
import java.util.Arrays;
25
import java.util.Collection;
26
import java.util.List;
27
import java.util.concurrent.*;
28
import java.util.concurrent.atomic.AtomicReference;
29
import java.util.concurrent.locks.ReentrantLock;
30

31
import static io.nats.client.support.ApiConstants.*;
32
import static io.nats.client.support.JsonUtils.endJson;
33
import static io.nats.client.support.Validator.nullOrEmpty;
34

35
/**
36
 * The Services Framework introduces a higher-level API for implementing services with NATS.
37
 * Services automatically contain Ping, Info and Stats responders.
38
 * Services have one or more service endpoints. {@link ServiceEndpoint}
39
 * When multiple instances of a service endpoint are active, they work in a queue, meaning only one listener responds to any given request.
40
 */
41
public class Service {
42
    public static final String SRV_PING = "PING";
43
    public static final String SRV_INFO = "INFO";
44
    public static final String SRV_STATS = "STATS";
45
    public static final String DEFAULT_SERVICE_PREFIX = "$SRV.";
46

47
    private final Connection conn;
48
    private final Duration drainTimeout;
49
    private final ConcurrentHashMap<String, EndpointContext> serviceContexts;
50
    private final List<EndpointContext> discoveryContexts;
51
    private final List<Dispatcher> dInternals;
52
    private final AtomicReference<ZonedDateTime> startTimeRef;
53
    private final CompletableFuture<Boolean> startedFuture;
54
    private final PingResponse pingResponse;
55
    private final InfoResponse infoResponse;
56

57
    private final ReentrantLock startStopLock;
58
    private CompletableFuture<Boolean> runningIndicator;
59

60
    Service(ServiceBuilder b) {
1✔
61
        String id = new io.nats.client.NUID().next();
1✔
62
        conn = b.conn;
1✔
63
        drainTimeout = b.drainTimeout;
1✔
64
        dInternals = new ArrayList<>();
1✔
65
        startStopLock = new ReentrantLock();
1✔
66
        startTimeRef = new AtomicReference<>(DateTimeUtils.DEFAULT_TIME);
1✔
67
        startedFuture = new CompletableFuture<>();
1✔
68

69
        // build responses first. info needs to be available when adding service endpoints.
70
        pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
1✔
71
        infoResponse = new InfoResponse(id, b.name, b.version, b.metadata, b.description);
1✔
72

73
        // set up the service contexts
74
        // ? do we need an internal dispatcher for any user endpoints !! addServiceEndpoint deals with it
75
        serviceContexts = new ConcurrentHashMap<>();
1✔
76
        addServiceEndpoints(b.serviceEndpoints.values());
1✔
77

78
        Dispatcher dTemp = null;
1✔
79
        if (b.pingDispatcher == null || b.infoDispatcher == null || b.statsDispatcher == null) {
1✔
80
            dTemp = conn.createDispatcher();
1✔
81
            dInternals.add(dTemp);
1✔
82
        }
83

84
        discoveryContexts = new ArrayList<>();
1✔
85
        addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp);
1✔
86
        addDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
1✔
87
        addStatsContexts(b.statsDispatcher, dTemp);
1✔
88
    }
1✔
89

90
    /**
91
     * Adds one or more service endpoint to the list of service contexts and starts it if the service is running.
92
     * @param serviceEndpoints one or more service endpoints to be added
93
     */
94
    public void addServiceEndpoints(ServiceEndpoint... serviceEndpoints) {
95
        addServiceEndpoints(Arrays.asList(serviceEndpoints));
1✔
96
    }
1✔
97

98
    /**
99
     * Adds all service endpoints to the list of service contexts and starts it if the service is running.
100
     * @param serviceEndpoints service endpoints to be added
101
     */
102
    public void addServiceEndpoints(Collection<ServiceEndpoint> serviceEndpoints) {
103
        startStopLock.lock();
1✔
104
        try {
105
            // do this first so it's available on start
106
            infoResponse.addServiceEndpoints(serviceEndpoints);
1✔
107
            for (ServiceEndpoint se : serviceEndpoints) {
1✔
108
                EndpointContext ctx;
109
                if (se.getDispatcher() == null) {
1✔
110
                    Dispatcher dTemp = dInternals.isEmpty() ? null : dInternals.get(0);
1✔
111
                    if (dTemp == null) {
1✔
112
                        dTemp = conn.createDispatcher();
1✔
113
                        dInternals.add(dTemp);
1✔
114
                    }
115
                    ctx = new EndpointContext(conn, dTemp, false, se);
1✔
116
                }
1✔
117
                else {
118
                    ctx = new EndpointContext(conn, null, false, se);
1✔
119
                }
120
                serviceContexts.put(se.getName(), ctx);
1✔
121

122
                // if the service is already started, start the newly added context
123
                if (runningIndicator != null) {
1✔
124
                    ctx.start();
1✔
125
                }
126
            }
1✔
127
        }
128
        finally {
129
            startStopLock.unlock();
1✔
130
        }
131
    }
1✔
132

133
    private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) {
134
        Endpoint[] endpoints = new Endpoint[] {
1✔
135
            internalEndpoint(discoveryName, null, null),
1✔
136
            internalEndpoint(discoveryName, pingResponse.getName(), null),
1✔
137
            internalEndpoint(discoveryName, pingResponse.getName(), pingResponse.getId())
1✔
138
        };
139

140
        for (Endpoint endpoint : endpoints) {
1✔
141
            discoveryContexts.add(
1✔
142
                new EndpointContext(conn, dInternal, true,
143
                    new ServiceEndpoint(endpoint, handler, dUser)));
144
        }
145
    }
1✔
146

147
    /**
148
     * Adds discovery contexts for the service, reusing the same static bytes at registration.
149
     * @param discoveryName the name of the discovery
150
     * @param sr the service response
151
     * @param dUser the user dispatcher
152
     * @param dInternal the internal dispatcher
153
     */
154
    private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
155
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
1✔
156
        addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
1✔
157
    }
1✔
158

159
    private void addStatsContexts(Dispatcher dUser, Dispatcher dInternal) {
160
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, getStatsResponse().serialize());
1✔
161
        addDiscoveryContexts(SRV_STATS, dUser, dInternal, handler);
1✔
162
    }
1✔
163

164
    private Endpoint internalEndpoint(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
165
        String subject = toDiscoverySubject(discoveryName, optionalServiceNameSegment, optionalServiceIdSegment);
1✔
166
        return new Endpoint(subject, subject, null, null, false);
1✔
167
    }
168

169
    static String toDiscoverySubject(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
170
        if (nullOrEmpty(optionalServiceIdSegment)) {
1✔
171
            if (nullOrEmpty(optionalServiceNameSegment)) {
1✔
172
                return DEFAULT_SERVICE_PREFIX + discoveryName;
1✔
173
            }
174
            return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment;
1✔
175
        }
176
        return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment + "." + optionalServiceIdSegment;
1✔
177
    }
178

179
    /**
180
     * Start the service
181
     * @return a future that can be held to see if another thread called stop
182
     */
183
    public CompletableFuture<Boolean> startService() {
184
        startStopLock.lock();
1✔
185
        try {
186
            if (runningIndicator == null) {
1✔
187
                runningIndicator = new CompletableFuture<>();
1✔
188
                for (EndpointContext ctx : serviceContexts.values()) {
1✔
189
                    ctx.start();
1✔
190
                }
1✔
191
                for (EndpointContext ctx : discoveryContexts) {
1✔
192
                    ctx.start();
1✔
193
                }
1✔
194
                startTimeRef.set(DateTimeUtils.gmtNow());
1✔
195
                startedFuture.complete(true);
1✔
196
            }
197
            return runningIndicator;
1✔
198
        }
199
        finally {
200
            startStopLock.unlock();
1✔
201
        }
202
    }
203

204
    /**
205
     * Get an instance of a ServiceBuilder.
206
     * @return the instance
207
     */
208
    public static ServiceBuilder builder() {
209
        return new ServiceBuilder();
1✔
210
    }
211

212
    /**
213
     * Stop the service by draining.
214
     */
215
    public void stop() {
216
        stop(true, null);
1✔
217
    }
1✔
218

219
    /**
220
     * Stop the service by draining. Mark the future that was received from the start method that the service completed exceptionally.
221
     * @param t the error cause
222
     */
223
    public void stop(Throwable t) {
224
        stop(true, t);
1✔
225
    }
1✔
226

227
    /**
228
     * Stop the service, optionally draining.
229
     * @param drain the flag indicating to drain or not
230
     */
231
    public void stop(boolean drain) {
232
        stop(drain, null);
1✔
233
    }
1✔
234

235
    /**
236
     * Stop the service, optionally draining and optionally with an error cause
237
     * @param drain the flag indicating to drain or not
238
     * @param t the optional error cause. If supplied, mark the future that was received from the start method that the service completed exceptionally
239
     */
240
    public void stop(boolean drain, Throwable t) {
241
        startStopLock.lock();
1✔
242
        try {
243
            if (runningIndicator != null) {
1✔
244
                if (drain) {
1✔
245
                    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
1✔
246

247
                    for (Dispatcher d : dInternals) {
1✔
248
                        try {
249
                            futures.add(d.drain(drainTimeout));
1✔
250
                        }
251
                        catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
252
                    }
1✔
253

254
                    for (EndpointContext c : serviceContexts.values()) {
1✔
255
                        if (c.isNotInternalDispatcher()) {
1✔
256
                            try {
257
                                futures.add(c.drain(drainTimeout));
1✔
258
                            }
259
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
260
                        }
261
                    }
1✔
262

263
                    for (EndpointContext c : discoveryContexts) {
1✔
264
                        if (c.isNotInternalDispatcher()) {
1✔
265
                            try {
266
                                futures.add(c.drain(drainTimeout));
1✔
267
                            }
268
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
269
                        }
270
                    }
1✔
271

272
                    // make sure drain is done before closing dispatcher
273
                    long drainTimeoutMillis = drainTimeout.toMillis();
1✔
274
                    for (CompletableFuture<Boolean> f : futures) {
1✔
275
                        try {
276
                            f.get(drainTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
277
                        }
278
                        catch (Exception ignore) {
×
279
                            // don't care if it completes successfully or not, just that it's done.
280
                        }
1✔
281
                    }
1✔
282
                }
283

284
                // close internal dispatchers
285
                for (Dispatcher d : dInternals) {
1✔
286
                    conn.closeDispatcher(d);
1✔
287
                }
1✔
288

289
                // ok we are done
290
                if (t == null) {
1✔
291
                    runningIndicator.complete(true);
1✔
292
                }
293
                else {
294
                    runningIndicator.completeExceptionally(t);
1✔
295
                }
296
                runningIndicator = null; // we don't need a copy anymore
1✔
297
            }
298
        }
299
        finally {
300
            startStopLock.unlock();
1✔
301
        }
302
    }
1✔
303

304
    /**
305
     * Reset the statistics for the endpoints
306
     */
307
    public void reset() {
308
        if (isStarted()) {
1✔
309
            // has actually been started if the ref has been set
310
            startTimeRef.set(DateTimeUtils.gmtNow());
1✔
311
        }
312
        for (EndpointContext c : discoveryContexts) {
1✔
313
            c.reset();
1✔
314
        }
1✔
315
        for (EndpointContext c : serviceContexts.values()) {
1✔
316
            c.reset();
1✔
317
        }
1✔
318
    }
1✔
319

320
    /**
321
     * Get the id of the service
322
     * @return the id
323
     */
324
    public String getId() {
325
        return infoResponse.getId();
1✔
326
    }
327

328
    /**
329
     * Get the name of the service
330
     * @return the name
331
     */
332
    public String getName() {
333
        return infoResponse.getName();
1✔
334
    }
335

336
    /**
337
     * Get the version of the service
338
     * @return the version
339
     */
340
    public String getVersion() {
341
        return infoResponse.getVersion();
1✔
342
    }
343

344
    /**
345
     * Get the description of the service
346
     * @return the description
347
     */
348
    public String getDescription() {
349
        return infoResponse.getDescription();
1✔
350
    }
351

352
    /**
353
     * Get whether the service has full started
354
     * @return true if started
355
     */
356
    public boolean isStarted() {
357
        return startedFuture.isDone();
1✔
358
    }
359

360
    /**
361
     * Get
362
     * @param timeout the maximum time to wait
363
     * @param unit the time unit of the timeout argument
364
     * @return true if started by the timeout
365
     */
366
    public boolean isStarted(long timeout, TimeUnit unit) {
367
        try {
368
            return startedFuture.get(timeout, unit);
1✔
369
        }
NEW
370
        catch (InterruptedException e) {
×
NEW
371
            Thread.currentThread().interrupt();
×
NEW
372
            return false;
×
373
        }
NEW
374
        catch (ExecutionException | TimeoutException e) {
×
NEW
375
            return false;
×
376
        }
377
    }
378

379
    /**
380
     * Get the drain timeout setting
381
     * @return the drain timeout setting
382
     */
383
    public Duration getDrainTimeout() {
384
        return drainTimeout;
1✔
385
    }
386

387
    /**
388
     * Get the pre-constructed ping response.
389
     * @return the ping response
390
     */
391
    public PingResponse getPingResponse() {
392
        return pingResponse;
1✔
393
    }
394

395
    /**
396
     * Get the pre-constructed info response.
397
     * @return the info response
398
     */
399
    public InfoResponse getInfoResponse() {
400
        return infoResponse;
1✔
401
    }
402

403
    /**
404
     * Get the up-to-date stats response which contains a list of all {@link EndpointStats}
405
     * @return the stats response
406
     */
407
    public StatsResponse getStatsResponse() {
408
        List<EndpointStats> endpointStats = new ArrayList<>();
1✔
409
        for (EndpointContext c : serviceContexts.values()) {
1✔
410
            endpointStats.add(c.getEndpointStats());
1✔
411
        }
1✔
412
        // StatsResponse handles a start time of DateTimeUtils.DEFAULT_TIME
413
        return new StatsResponse(pingResponse, startTimeRef.get(), endpointStats);
1✔
414
    }
415

416
    /**
417
     * Get the up-to-date {@link EndpointStats} for a specific endpoint
418
     * @param endpointName the endpoint name
419
     * @return the EndpointStats or null if the name is not found.
420
     */
421
    public EndpointStats getEndpointStats(String endpointName) {
422
        EndpointContext c = serviceContexts.get(endpointName);
1✔
423
        return c == null ? null : c.getEndpointStats();
1✔
424
    }
425

426
    @Override
427
    public String toString() {
428
        StringBuilder sb = JsonUtils.beginJsonPrefixed("\"Service\":");
1✔
429
        JsonUtils.addField(sb, ID, infoResponse.getId());
1✔
430
        JsonUtils.addField(sb, NAME, infoResponse.getName());
1✔
431
        JsonUtils.addField(sb, VERSION, infoResponse.getVersion());
1✔
432
        JsonUtils.addField(sb, DESCRIPTION, infoResponse.getDescription());
1✔
433
        JsonUtils.addField(sb, STARTED, startTimeRef.get());
1✔
434
        return endJson(sb).toString();
1✔
435
    }
436
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc