• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.java / #1870

17 Feb 2025 01:09PM UTC coverage: 95.711% (+0.009%) from 95.702%
#1870

push

github

web-flow
feature: update_service_to_support_adding_endpoint_after_construction (#1274)

* update_service_to_support_adding_dynamic_endpoint_after_service_construction

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* revert format change & restrict empty list to InfoResponse

* adding comments for 2 discovery contexts

* sync with infoResponsse, adding empty list

38 of 44 new or added lines in 5 files covered. (86.36%)

1 existing line in 1 file now uncovered.

11447 of 11960 relevant lines covered (95.71%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.92
/src/main/java/io/nats/service/Service.java
1
// Copyright 2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at:
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package io.nats.service;
15

16
import io.nats.client.Connection;
17
import io.nats.client.Dispatcher;
18
import io.nats.client.support.DateTimeUtils;
19
import io.nats.client.support.JsonUtils;
20

21
import java.time.Duration;
22
import java.time.ZonedDateTime;
23
import java.util.ArrayList;
24
import java.util.List;
25
import java.util.concurrent.CompletableFuture;
26
import java.util.concurrent.ConcurrentHashMap;
27
import java.util.concurrent.ConcurrentMap;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.locks.ReentrantLock;
30

31
import static io.nats.client.support.ApiConstants.*;
32
import static io.nats.client.support.JsonUtils.endJson;
33
import static io.nats.client.support.Validator.nullOrEmpty;
34

35
/**
36
 * The Services Framework introduces a higher-level API for implementing services with NATS.
37
 * Services automatically contain Ping, Info and Stats responders.
38
 * Services have one or more service endpoints. {@link ServiceEndpoint}
39
 * When multiple instances of a service endpoints are active they work in a queue, meaning only one listener responds to any given request.
40
 */
41
public class Service {
42
    public static final String SRV_PING = "PING";
43
    public static final String SRV_INFO = "INFO";
44
    public static final String SRV_STATS = "STATS";
45
    public static final String DEFAULT_SERVICE_PREFIX = "$SRV.";
46

47
    private final Connection conn;
48
    private final Duration drainTimeout;
49
    private final ConcurrentMap<String, EndpointContext> serviceContexts;
50
    private final List<EndpointContext> discoveryContexts;
51
    private final List<Dispatcher> dInternals;
52
    private final PingResponse pingResponse;
53
    private final InfoResponse infoResponse;
54

55
    private final ReentrantLock startStopLock;
56
    private CompletableFuture<Boolean> runningIndicator;
57
    private ZonedDateTime started;
58

59
    Service(ServiceBuilder b) {
1✔
60
        String id = new io.nats.client.NUID().next();
1✔
61
        conn = b.conn;
1✔
62
        drainTimeout = b.drainTimeout;
1✔
63
        dInternals = new ArrayList<>();
1✔
64
        startStopLock = new ReentrantLock();
1✔
65

66
        // set up the service contexts
67
        // ? do we need an internal dispatcher for any user endpoints
68
        Dispatcher dTemp = null;
1✔
69
        serviceContexts = new ConcurrentHashMap<>();
1✔
70
        for (ServiceEndpoint se : b.serviceEndpoints.values()) {
1✔
71
            addServiceEndpoint(se);
1✔
72
        }
1✔
73
        if (dTemp != null) {
1✔
UNCOV
74
            dInternals.add(dTemp);
×
75
        }
76

77
        // build static responses
78
        pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
1✔
79
        infoResponse = new InfoResponse(id, b.name, b.version, b.metadata, b.description, b.serviceEndpoints.values());
1✔
80

81
        if (b.pingDispatcher == null || b.infoDispatcher == null || b.statsDispatcher == null) {
1✔
82
            dTemp = conn.createDispatcher();
1✔
83
            dInternals.add(dTemp);
1✔
84
        }
85
        else {
86
            dTemp = null;
1✔
87
        }
88

89
        discoveryContexts = new ArrayList<>();
1✔
90
        addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp);
1✔
91
        addDynamicDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
1✔
92
        addStatsContexts(b.statsDispatcher, dTemp);
1✔
93
    }
1✔
94

95
    /**
96
     * Adds a service endpoint to the list of service contexts and starts it if the service is running.
97
     * @param se the service endpoint to be added
98
     */
99
    public void addServiceEndpoint(ServiceEndpoint se) {
100
        Dispatcher dTemp = null == dInternals || dInternals.isEmpty() ? null : dInternals.get(0);
1✔
101
        EndpointContext ctx = null;
1✔
102
        if (se.getDispatcher() == null) {
1✔
103
            if (dTemp == null) {
1✔
104
                dTemp = conn.createDispatcher();
1✔
105
                dInternals.add(dTemp);
1✔
106
            }
107
            ctx = new EndpointContext(conn, dTemp, false, se);
1✔
108
        } else {
109
            ctx = new EndpointContext(conn, null, false, se);
1✔
110
        }
111
        serviceContexts.put(se.getName(), ctx);
1✔
112
        startStopLock.lock();
1✔
113
        try {
114
            if (runningIndicator != null) {
1✔
NEW
115
                ctx.start();
×
116
            }
117
        } finally {
118
            startStopLock.unlock();
1✔
119
        }
120

121
        if (null != infoResponse) {
1✔
122
            infoResponse.addServiceEndpoint(se);
1✔
123
        }
124

125
    }
1✔
126

127
    private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) {
128
        Endpoint[] endpoints = new Endpoint[] {
1✔
129
            internalEndpoint(discoveryName, null, null),
1✔
130
            internalEndpoint(discoveryName, pingResponse.getName(), null),
1✔
131
            internalEndpoint(discoveryName, pingResponse.getName(), pingResponse.getId())
1✔
132
        };
133

134
        for (Endpoint endpoint : endpoints) {
1✔
135
            discoveryContexts.add(
1✔
136
                new EndpointContext(conn, dInternal, true,
137
                    new ServiceEndpoint(endpoint, handler, dUser)));
138
        }
139
    }
1✔
140

141
    /**
142
     * Adds dynamic discovery contexts for the service, dynamically generating the bytes content per call.
143
     * This is different from `addDiscoveryContexts` which reuses the same static bytes at registration.
144
     * @param discoveryName the name of the discovery
145
     * @param dUser the user dispatcher
146
     * @param dInternal the internal dispatcher
147
     * @param handler the service message handler
148
     */
149
    private void addDynamicDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
150
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
1✔
151
        addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
1✔
152
    }
1✔
153

154
    /**
155
     * Adds discovery contexts for the service, reusing the same static bytes at registration.
156
     * @param discoveryName the name of the discovery
157
     * @param sr the service response
158
     * @param dUser the user dispatcher
159
     * @param dInternal the internal dispatcher
160
     */
161
    private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
162
        final byte[] responseBytes = sr.serialize();
1✔
163
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, responseBytes);
1✔
164
        addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
1✔
165
    }
1✔
166

167
    private void addStatsContexts(Dispatcher dUser, Dispatcher dInternal) {
168
        ServiceMessageHandler handler = smsg -> smsg.respond(conn, getStatsResponse().serialize());
1✔
169
        addDiscoveryContexts(SRV_STATS, dUser, dInternal, handler);
1✔
170
    }
1✔
171

172
    private Endpoint internalEndpoint(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
173
        String subject = toDiscoverySubject(discoveryName, optionalServiceNameSegment, optionalServiceIdSegment);
1✔
174
        return new Endpoint(subject, subject, null, null, false);
1✔
175
    }
176

177
    static String toDiscoverySubject(String discoveryName, String optionalServiceNameSegment, String optionalServiceIdSegment) {
178
        if (nullOrEmpty(optionalServiceIdSegment)) {
1✔
179
            if (nullOrEmpty(optionalServiceNameSegment)) {
1✔
180
                return DEFAULT_SERVICE_PREFIX + discoveryName;
1✔
181
            }
182
            return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment;
1✔
183
        }
184
        return DEFAULT_SERVICE_PREFIX + discoveryName + "." + optionalServiceNameSegment + "." + optionalServiceIdSegment;
1✔
185
    }
186

187
    /**
188
     * Start the service
189
     * @return a future that can be held to see if another thread called stop
190
     */
191
    public CompletableFuture<Boolean> startService() {
192
        startStopLock.lock();
1✔
193
        try {
194
            if (runningIndicator == null) {
1✔
195
                runningIndicator = new CompletableFuture<>();
1✔
196
                for (EndpointContext ctx : serviceContexts.values()) {
1✔
197
                    ctx.start();
1✔
198
                }
1✔
199
                for (EndpointContext ctx : discoveryContexts) {
1✔
200
                    ctx.start();
1✔
201
                }
1✔
202
                started = DateTimeUtils.gmtNow();
1✔
203
            }
204
            return runningIndicator;
1✔
205
        }
206
        finally {
207
            startStopLock.unlock();
1✔
208
        }
209
    }
210

211
    /**
212
     * Get an instance of a ServiceBuilder.
213
     * @return the instance
214
     */
215
    public static ServiceBuilder builder() {
216
        return new ServiceBuilder();
1✔
217
    }
218

219
    /**
220
     * Stop the service by draining.
221
     */
222
    public void stop() {
223
        stop(true, null);
1✔
224
    }
1✔
225

226
    /**
227
     * Stop the service by draining. Mark the future that was received from the start method that the service completed exceptionally.
228
     * @param t the error cause
229
     */
230
    public void stop(Throwable t) {
231
        stop(true, t);
1✔
232
    }
1✔
233

234
    /**
235
     * Stop the service, optionally draining.
236
     * @param drain the flag indicating to drain or not
237
     */
238
    public void stop(boolean drain) {
239
        stop(drain, null);
1✔
240
    }
1✔
241

242
    /**
243
     * Stop the service, optionally draining and optionally with an error cause
244
     * @param drain the flag indicating to drain or not
245
     * @param t the optional error cause. If supplied, mark the future that was received from the start method that the service completed exceptionally
246
     */
247
    public void stop(boolean drain, Throwable t) {
248
        startStopLock.lock();
1✔
249
        try {
250
            if (runningIndicator != null) {
1✔
251
                if (drain) {
1✔
252
                    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
1✔
253

254
                    for (Dispatcher d : dInternals) {
1✔
255
                        try {
256
                            futures.add(d.drain(drainTimeout));
1✔
257
                        }
258
                        catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
259
                    }
1✔
260

261
                    for (EndpointContext c : serviceContexts.values()) {
1✔
262
                        if (c.isNotInternalDispatcher()) {
1✔
263
                            try {
264
                                futures.add(c.getSub().drain(drainTimeout));
1✔
265
                            }
266
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
267
                        }
268
                    }
1✔
269

270
                    for (EndpointContext c : discoveryContexts) {
1✔
271
                        if (c.isNotInternalDispatcher()) {
1✔
272
                            try {
273
                                futures.add(c.getSub().drain(drainTimeout));
1✔
274
                            }
275
                            catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
1✔
276
                        }
277
                    }
1✔
278

279
                    // make sure drain is done before closing dispatcher
280
                    long drainTimeoutMillis = drainTimeout.toMillis();
1✔
281
                    for (CompletableFuture<Boolean> f : futures) {
1✔
282
                        try {
283
                            f.get(drainTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
284
                        }
285
                        catch (Exception ignore) {
×
286
                            // don't care if it completes successfully or not, just that it's done.
287
                        }
1✔
288
                    }
1✔
289
                }
290

291
                // close internal dispatchers
292
                for (Dispatcher d : dInternals) {
1✔
293
                    conn.closeDispatcher(d);
1✔
294
                }
1✔
295

296
                // ok we are done
297
                if (t == null) {
1✔
298
                    runningIndicator.complete(true);
1✔
299
                }
300
                else {
301
                    runningIndicator.completeExceptionally(t);
1✔
302
                }
303
                runningIndicator = null; // we don't need a copy anymore
1✔
304
            }
305
        }
306
        finally {
307
            startStopLock.unlock();
1✔
308
        }
309
    }
1✔
310

311
    /**
312
     * Reset the statistics for the endpoints
313
     */
314
    public void reset() {
315
        started = DateTimeUtils.gmtNow();
1✔
316
        for (EndpointContext c : discoveryContexts) {
1✔
317
            c.reset();
1✔
318
        }
1✔
319
        for (EndpointContext c : serviceContexts.values()) {
1✔
320
            c.reset();
1✔
321
        }
1✔
322
    }
1✔
323

324
    /**
325
     * Get the id of the service
326
     * @return the id
327
     */
328
    public String getId() {
329
        return infoResponse.getId();
1✔
330
    }
331

332
    /**
333
     * Get the name of the service
334
     * @return the name
335
     */
336
    public String getName() {
337
        return infoResponse.getName();
1✔
338
    }
339

340
    /**
341
     * Get the version of the service
342
     * @return the version
343
     */
344
    public String getVersion() {
345
        return infoResponse.getVersion();
1✔
346
    }
347

348
    /**
349
     * Get the description of the service
350
     * @return the description
351
     */
352
    public String getDescription() {
353
        return infoResponse.getDescription();
1✔
354
    }
355

356
    /**
357
     * Get the drain timeout setting
358
     * @return the drain timeout setting
359
     */
360
    public Duration getDrainTimeout() {
361
        return drainTimeout;
1✔
362
    }
363

364
    /**
365
     * Get the pre-constructed ping response.
366
     * @return the ping response
367
     */
368
    public PingResponse getPingResponse() {
369
        return pingResponse;
1✔
370
    }
371

372
    /**
373
     * Get the pre-constructed info response.
374
     * @return the info response
375
     */
376
    public InfoResponse getInfoResponse() {
377
        return infoResponse;
1✔
378
    }
379

380
    /**
381
     * Get the up-to-date stats response which contains a list of all {@link EndpointStats}
382
     * @return the stats response
383
     */
384
    public StatsResponse getStatsResponse() {
385
        List<EndpointStats> endpointStats = new ArrayList<>();
1✔
386
        for (EndpointContext c : serviceContexts.values()) {
1✔
387
            endpointStats.add(c.getEndpointStats());
1✔
388
        }
1✔
389
        return new StatsResponse(pingResponse, started, endpointStats);
1✔
390
    }
391

392
    /**
393
     * Get the up-to-date {@link EndpointStats} for a specific endpoint
394
     * @param endpointName the endpoint name
395
     * @return the EndpointStats or null if the name is not found.
396
     */
397
    public EndpointStats getEndpointStats(String endpointName) {
398
        EndpointContext c = serviceContexts.get(endpointName);
1✔
399
        return c == null ? null : c.getEndpointStats();
1✔
400
    }
401

402
    @Override
403
    public String toString() {
404
        StringBuilder sb = JsonUtils.beginJsonPrefixed("\"Service\":");
1✔
405
        JsonUtils.addField(sb, ID, infoResponse.getId());
1✔
406
        JsonUtils.addField(sb, NAME, infoResponse.getName());
1✔
407
        JsonUtils.addField(sb, VERSION, infoResponse.getVersion());
1✔
408
        JsonUtils.addField(sb, DESCRIPTION, infoResponse.getDescription());
1✔
409
        return endJson(sb).toString();
1✔
410
    }
411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc