• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nanopublication / nanopub-java / 25662474597

11 May 2026 09:40AM UTC coverage: 53.432% (+0.1%) from 53.324%
25662474597

push

github

web-flow
Merge pull request #81 from Nanopublication/feature/query-status-aware-routing

feat(services): gate query instances on Nanopub-Query-Status header

1247 of 3254 branches covered (38.32%)

Branch coverage included in aggregate %.

5626 of 9609 relevant lines covered (58.55%)

8.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.74
src/main/java/org/nanopub/extra/services/QueryCall.java
1
package org.nanopub.extra.services;
2

3
import org.apache.http.Header;
4
import org.apache.http.HttpResponse;
5
import org.apache.http.client.methods.HttpGet;
6
import org.apache.http.util.EntityUtils;
7
import org.nanopub.NanopubUtils;
8
import org.nanopub.vocabulary.NPS;
9
import org.slf4j.Logger;
10
import org.slf4j.LoggerFactory;
11

12
import java.io.IOException;
13
import java.util.ArrayList;
14
import java.util.Date;
15
import java.util.LinkedList;
16
import java.util.List;
17
import java.util.Locale;
18
import java.util.concurrent.ConcurrentHashMap;
19
import java.util.concurrent.ConcurrentMap;
20

21
/**
22
 * Second-generation query API call.
23
 */
24
public class QueryCall {
25

26
    private static final int DEFAULT_PARALLEL_CALL_COUNT = 2;
27

28
    /**
29
     * System property setting how many query API instances to call in parallel.
30
     * Must be {@code >= 1}; defaults to {@value #DEFAULT_PARALLEL_CALL_COUNT}.
31
     * Env var {@code NANOPUB_QUERY_PARALLEL_CALL_COUNT} also accepted.
32
     */
33
    public static final String PARALLEL_CALL_COUNT_PROPERTY = "nanopub.query.parallel-call-count";
34

35
    /**
36
     * Environment variable equivalent of {@link #PARALLEL_CALL_COUNT_PROPERTY}.
37
     */
38
    public static final String PARALLEL_CALL_COUNT_ENV = "NANOPUB_QUERY_PARALLEL_CALL_COUNT";
39

40
    private static int maxRetryCount = 3;
6✔
41
    private static final Logger logger = LoggerFactory.getLogger(QueryCall.class);
9✔
42

43
    /**
44
     * Returns the number of query API instances to call in parallel, resolved
45
     * (in order) from {@link #PARALLEL_CALL_COUNT_PROPERTY},
46
     * {@link #PARALLEL_CALL_COUNT_ENV}, or the default of
47
     * {@value #DEFAULT_PARALLEL_CALL_COUNT}. Invalid values are ignored.
48
     *
49
     * @return the parallel call count (always {@code >= 1})
50
     */
51
    public static int getParallelCallCount() {
52
        String value = System.getProperty(PARALLEL_CALL_COUNT_PROPERTY);
9✔
53
        if (value == null || value.isEmpty()) value = System.getenv(PARALLEL_CALL_COUNT_ENV);
24!
54
        if (value != null && !value.trim().isEmpty()) {
18!
55
            try {
56
                int n = Integer.parseInt(value.trim());
12✔
57
                if (n >= 1) return n;
15✔
58
                logger.warn("Ignoring {}={}: must be >= 1", PARALLEL_CALL_COUNT_PROPERTY, value);
15✔
59
            } catch (NumberFormatException ex) {
3✔
60
                logger.warn("Ignoring {}={}: not an integer", PARALLEL_CALL_COUNT_PROPERTY, value);
15✔
61
            }
3✔
62
        }
63
        return DEFAULT_PARALLEL_CALL_COUNT;
6✔
64
    }
65

66
    /**
67
     * HTTP response header carrying the query instance's sync state.
68
     * See nanopub-query's {@code StatusController}.
69
     */
70
    public static final String QUERY_STATUS_HEADER = "Nanopub-Query-Status";
71

72
    /**
73
     * System property setting the cool-down (in seconds) before a query instance
74
     * evicted for non-ready status is re-considered. Default
75
     * {@value #DEFAULT_EVICTION_COOLDOWN_SECONDS}. Env var
76
     * {@code NANOPUB_QUERY_EVICTION_COOLDOWN_SECONDS} also accepted.
77
     */
78
    public static final String EVICTION_COOLDOWN_PROPERTY = "nanopub.query.eviction-cooldown-seconds";
79

80
    /**
81
     * Environment variable equivalent of {@link #EVICTION_COOLDOWN_PROPERTY}.
82
     */
83
    public static final String EVICTION_COOLDOWN_ENV = "NANOPUB_QUERY_EVICTION_COOLDOWN_SECONDS";
84

85
    private static final int DEFAULT_EVICTION_COOLDOWN_SECONDS = 300;
86

87
    private static final ConcurrentMap<String, Long> evictedUntil = new ConcurrentHashMap<>();
15✔
88

89
    /**
90
     * Returns the eviction cool-down in milliseconds, resolved from
91
     * {@link #EVICTION_COOLDOWN_PROPERTY}, {@link #EVICTION_COOLDOWN_ENV},
92
     * or the default of {@value #DEFAULT_EVICTION_COOLDOWN_SECONDS} seconds.
93
     */
94
    public static long getEvictionCooldownMillis() {
95
        String value = System.getProperty(EVICTION_COOLDOWN_PROPERTY);
9✔
96
        if (value == null || value.isEmpty()) value = System.getenv(EVICTION_COOLDOWN_ENV);
24!
97
        if (value != null && !value.trim().isEmpty()) {
18!
98
            try {
99
                long n = Long.parseLong(value.trim());
12✔
100
                if (n >= 0) return n * 1000L;
24✔
101
                logger.warn("Ignoring {}={}: must be >= 0", EVICTION_COOLDOWN_PROPERTY, value);
15✔
102
            } catch (NumberFormatException ex) {
3✔
103
                logger.warn("Ignoring {}={}: not a number", EVICTION_COOLDOWN_PROPERTY, value);
15✔
104
            }
3✔
105
        }
106
        return DEFAULT_EVICTION_COOLDOWN_SECONDS * 1000L;
6✔
107
    }
108

109
    /**
110
     * Returns true if the response's {@link #QUERY_STATUS_HEADER} signals a
111
     * fully-synced state ({@code READY} or {@code LOADING_UPDATES}). Missing
112
     * header is treated as ready for backwards compatibility with older
113
     * query instances.
114
     */
115
    static boolean isReadyStatus(HttpResponse resp) {
116
        Header h = resp.getFirstHeader(QUERY_STATUS_HEADER);
12✔
117
        if (h == null) return true;
12✔
118
        String v = h.getValue();
9✔
119
        if (v == null || v.isEmpty()) return true;
15!
120
        String upper = v.toUpperCase(Locale.ROOT);
12✔
121
        return upper.equals("READY") || upper.equals("LOADING_UPDATES");
36✔
122
    }
123

124
    private static void evict(String apiUrl, String reason) {
125
        long until = System.currentTimeMillis() + getEvictionCooldownMillis();
×
126
        evictedUntil.put(apiUrl, until);
×
127
        logger.warn("Evicting Nanopub Query instance {} until {} ({})", apiUrl, new Date(until), reason);
×
128
    }
×
129

130
    private static List<String> filterEvicted(List<String> instances) {
131
        long now = System.currentTimeMillis();
6✔
132
        List<String> result = new ArrayList<>(instances.size());
18✔
133
        for (String url : instances) {
30✔
134
            Long until = evictedUntil.get(url);
15✔
135
            if (until == null || until <= now) result.add(url);
18!
136
        }
3✔
137
        return result;
6✔
138
    }
139

140
    /**
141
     * Run a query call with the given query ID and parameters.
142
     *
143
     * @param queryRef the reference to the query to run
144
     * @return the HTTP response from the query API
145
     * @throws APINotReachableException       if the API is not reachable after retries
146
     * @throws NotEnoughAPIInstancesException if there are not enough API instances available
147
     */
148
    public static HttpResponse run(QueryRef queryRef) throws APINotReachableException, NotEnoughAPIInstancesException {
149
        int retryCount = 0;
6✔
150
        while (retryCount < maxRetryCount) {
9!
151
            QueryCall apiCall = new QueryCall(queryRef);
15✔
152
            apiCall.run();
6✔
153
            while (!apiCall.calls.isEmpty() && apiCall.resp == null) {
21!
154
                try {
155
                    Thread.sleep(200);
6✔
156
                } catch (InterruptedException ex) {
×
157
                    Thread.currentThread().interrupt();
×
158
                }
3✔
159
            }
160
            if (apiCall.resp != null) {
9!
161
                return apiCall.resp;
9✔
162
            }
163
            retryCount = retryCount + 1;
×
164
        }
×
165
        throw new APINotReachableException("Giving up contacting API: " + queryRef.getQueryId());
×
166
    }
167

168
    /**
169
     * System property naming a whitespace-separated list of query API instance URLs.
170
     * When set, this overrides discovery via the nanopub setting (env var
171
     * {@code NANOPUB_QUERY_INSTANCES} also accepted).
172
     */
173
    public static final String QUERY_INSTANCES_PROPERTY = "nanopub.query.instances";
174

175
    /**
176
     * Environment variable equivalent of {@link #QUERY_INSTANCES_PROPERTY}.
177
     */
178
    public static final String QUERY_INSTANCES_ENV = "NANOPUB_QUERY_INSTANCES";
179

180
    private static List<String> checkedApiInstances;
181

182
    /**
183
     * Returns the list of available query API instances that are currently accessible.
184
     * <p>
185
     * Sources, in order of priority:
186
     * <ol>
187
     *   <li>{@code nanopub.query.instances} system property / {@code NANOPUB_QUERY_INSTANCES} env var
188
     *       (whitespace-separated URLs).</li>
189
     *   <li>The active {@link org.nanopub.extra.setting.NanopubSetting}'s service intro collection,
190
     *       filtered to services of type {@link NPS#NANOPUB_QUERY_1_1}.</li>
191
     * </ol>
192
     * Each candidate is liveness-checked via an HTTP GET to its root URL.
193
     *
194
     * @return a list of accessible query API instances
195
     */
196
    public static List<String> getApiInstances() throws NotEnoughAPIInstancesException {
197
        if (checkedApiInstances != null) return checkedApiInstances;
12✔
198
        List<String> candidates = resolveCandidateInstances();
6✔
199
        if (candidates.isEmpty()) {
9!
200
            throw new NotEnoughAPIInstancesException("No query API instances configured or discoverable");
×
201
        }
202
        checkedApiInstances = new ArrayList<>();
12✔
203
        for (String a : candidates) {
30✔
204
            try {
205
                logger.info("Checking API instance: {}", a);
12✔
206
                HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(a));
21✔
207
                if (!wasSuccessful(resp)) {
9✔
208
                    EntityUtils.consumeQuietly(resp.getEntity());
9✔
209
                    logger.error("FAILURE: Nanopub Query instance isn't accessible: {}", a);
15✔
210
                } else if (!isReadyStatus(resp)) {
9✔
211
                    Header h = resp.getFirstHeader(QUERY_STATUS_HEADER);
12✔
212
                    EntityUtils.consumeQuietly(resp.getEntity());
9✔
213
                    logger.error("FAILURE: Nanopub Query instance not ready (status={}): {}", h.getValue(), a);
18✔
214
                } else {
3✔
215
                    EntityUtils.consumeQuietly(resp.getEntity());
9✔
216
                    logger.info("SUCCESS: Nanopub Query instance is accessible: {}", a);
12✔
217
                    checkedApiInstances.add(a);
12✔
218
                }
219
            } catch (IOException ex) {
×
220
                logger.error("FAILURE: Nanopub Query instance isn't accessible: {}", a);
×
221
            }
3✔
222
        }
3✔
223
        logger.info("{} accessible Nanopub Query instances", checkedApiInstances.size());
18✔
224
        if (checkedApiInstances.isEmpty()) {
9✔
225
            checkedApiInstances = null;
6✔
226
            throw new NotEnoughAPIInstancesException("No healthy Nanopub Query instances available");
15✔
227
        }
228
        if (checkedApiInstances.size() == 1) {
12✔
229
            logger.warn("Only one healthy Nanopub Query instance available; no failover.");
9✔
230
        }
231
        return checkedApiInstances;
6✔
232
    }
233

234
    private static List<String> resolveCandidateInstances() {
235
        String override = System.getProperty(QUERY_INSTANCES_PROPERTY);
9✔
236
        if (override == null || override.isEmpty()) override = System.getenv(QUERY_INSTANCES_ENV);
24!
237
        if (override != null && !override.trim().isEmpty()) {
18!
238
            List<String> list = new ArrayList<>();
12✔
239
            for (String url : override.trim().split("\\s+")) list.add(url);
69✔
240
            logger.info("Using {} query API instance(s) from override", list.size());
18✔
241
            return list;
6✔
242
        }
243
        List<String> fromSetting = ServiceLookup.getServices(NPS.NANOPUB_QUERY_1_1);
9✔
244
        logger.info("Discovered {} query API instance(s) from setting", fromSetting.size());
18✔
245
        return new ArrayList<>(fromSetting);
15✔
246
    }
247

248
    // The query this call executes.
    private final QueryRef queryRef;
    // URLs of the instances selected for this attempt.
    private final List<String> apisToCall = new ArrayList<>();
    // In-flight calls. Worker threads remove themselves from this list while the polling thread
    // reads it and finished() iterates it, so it must tolerate concurrent access.
    private final List<Call> calls = new java.util.concurrent.CopyOnWriteArrayList<>();

    // First successful response; set by a worker thread inside finished() and read without a lock
    // by the polling thread, hence volatile.
    private volatile HttpResponse resp;

    /**
     * Creates a call for the given query; nothing is sent until {@code run()} is invoked.
     *
     * @param queryRef the reference to the query to run
     */
    private QueryCall(QueryRef queryRef) {
        this.queryRef = queryRef;
        logger.info("Invoking API operation {}", queryRef);
    }
3✔
258

259
    private void run() throws NotEnoughAPIInstancesException {
260
        List<String> candidates = filterEvicted(getApiInstances());
9✔
261
        if (candidates.isEmpty()) {
9!
262
            throw new NotEnoughAPIInstancesException(
×
263
                    "All Nanopub Query instances are currently evicted (loading/resetting); try again later");
264
        }
265
        List<String> apiInstancesToTry = new LinkedList<>(candidates);
15✔
266
        int parallelCallCount = getParallelCallCount();
6✔
267
        while (!apiInstancesToTry.isEmpty() && apisToCall.size() < parallelCallCount) {
24!
268
            int randomIndex = (int) ((Math.random() * apiInstancesToTry.size()));
21✔
269
            String apiUrl = apiInstancesToTry.get(randomIndex);
15✔
270
            apisToCall.add(apiUrl);
15✔
271
            logger.info("Trying API ({}) {}", apisToCall.size(), apiUrl);
24✔
272
            apiInstancesToTry.remove(randomIndex);
12✔
273
        }
3✔
274
        for (String api : apisToCall) {
33✔
275
            Call call = new Call(api);
18✔
276
            calls.add(call);
15✔
277
            new Thread(call).start();
15✔
278
        }
3✔
279
    }
3✔
280

281
    private synchronized void finished(Call call, HttpResponse resp, String apiUrl) {
282
        if (this.resp != null) { // result already in
9!
283
            EntityUtils.consumeQuietly(resp.getEntity());
×
284
            return;
×
285
        }
286
        logger.info("Result in from {}:", apiUrl);
12✔
287
        logger.info("- Request: {}", queryRef);
15✔
288
        logger.info("- Response size: {}", resp.getEntity().getContentLength());
21✔
289
        this.resp = resp;
9✔
290

291
        for (Call c : calls) {
33✔
292
            if (c != call) c.abort();
15✔
293
        }
3✔
294
    }
3✔
295

296
    private static boolean wasSuccessful(HttpResponse resp) {
297
        if (resp == null || resp.getEntity() == null) return false;
15!
298
        int c = resp.getStatusLine().getStatusCode();
12✔
299
        if (c < 200 || c >= 300) return false;
24!
300
        return true;
6✔
301
    }
302

303
    private static boolean wasSuccessfulNonempty(HttpResponse resp) {
304
        if (!wasSuccessful(resp)) return false;
9!
305
        // TODO Make sure we always return proper error codes, and then this shouldn't be necessary:
306
        if (resp.getHeaders("Content-Length").length > 0 && resp.getEntity().getContentLength() < 0) return false;
33!
307
        return true;
6✔
308
    }
309

310

311
    private class Call implements Runnable {
312

313
        private String apiUrl;
314
        private HttpGet get;
315

316
        public Call(String apiUrl) {
15✔
317
            this.apiUrl = apiUrl;
9✔
318
        }
3✔
319

320
        public void run() {
321
            get = new HttpGet(apiUrl + "api/" + queryRef.getAsUrlString());
36✔
322
            get.setHeader("Accept", "text/csv, text/turtle;q=0.9, application/ld+json;q=0.8");
15✔
323
            HttpResponse resp = null;
6✔
324
            try {
325
                resp = NanopubUtils.getHttpClient().execute(get);
15✔
326
                if (!wasSuccessfulNonempty(resp)) {
9!
327
                    throw new IOException(resp.getStatusLine().toString());
×
328
                }
329
                if (!isReadyStatus(resp)) {
9!
330
                    Header h = resp.getFirstHeader(QUERY_STATUS_HEADER);
×
331
                    String status = h == null ? "missing" : h.getValue();
×
332
                    evict(apiUrl, "status " + status);
×
333
                    EntityUtils.consumeQuietly(resp.getEntity());
×
334
                } else {
×
335
                    finished(this, resp, apiUrl);
21✔
336
                }
337
            } catch (Exception ex) {
3✔
338
                if (resp != null) EntityUtils.consumeQuietly(resp.getEntity());
6!
339
                logger.error("Request to {} was not successful: {}", apiUrl, ex.getMessage());
21✔
340
            }
3✔
341
            calls.remove(this);
18✔
342
        }
3✔
343

344
        private void abort() {
345
            if (get == null) return;
9!
346
            if (get.isAborted()) return;
12!
347
            get.abort();
9✔
348
        }
3✔
349

350
    }
351

352
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc