• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nanopublication / nanopub-java / 25661208539

11 May 2026 09:13AM UTC coverage: 53.316% (+0.05%) from 53.268%
25661208539

Pull #80

github

web-flow
Merge 655e088c8 into 5e0e65089
Pull Request #80: feat(services): make query parallel-call count configurable; relax instance threshold

1224 of 3220 branches covered (38.01%)

Branch coverage included in aggregate %.

5593 of 9566 relevant lines covered (58.47%)

8.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
src/main/java/org/nanopub/extra/services/QueryCall.java
1
package org.nanopub.extra.services;
2

3
import org.apache.http.HttpResponse;
4
import org.apache.http.client.methods.HttpGet;
5
import org.apache.http.util.EntityUtils;
6
import org.nanopub.NanopubUtils;
7
import org.nanopub.vocabulary.NPS;
8
import org.slf4j.Logger;
9
import org.slf4j.LoggerFactory;
10

11
import java.io.IOException;
12
import java.util.ArrayList;
13
import java.util.LinkedList;
14
import java.util.List;
15

16
/**
17
 * Second-generation query API call.
18
 */
19
public class QueryCall {
20

21
    private static final int DEFAULT_PARALLEL_CALL_COUNT = 2;
22

23
    /**
24
     * System property setting how many query API instances to call in parallel.
25
     * Must be {@code >= 1}; defaults to {@value #DEFAULT_PARALLEL_CALL_COUNT}.
26
     * Env var {@code NANOPUB_QUERY_PARALLEL_CALL_COUNT} also accepted.
27
     */
28
    public static final String PARALLEL_CALL_COUNT_PROPERTY = "nanopub.query.parallel-call-count";
29

30
    /**
31
     * Environment variable equivalent of {@link #PARALLEL_CALL_COUNT_PROPERTY}.
32
     */
33
    public static final String PARALLEL_CALL_COUNT_ENV = "NANOPUB_QUERY_PARALLEL_CALL_COUNT";
34

35
    private static int maxRetryCount = 3;
6✔
36
    private static final Logger logger = LoggerFactory.getLogger(QueryCall.class);
12✔
37

38
    /**
39
     * Returns the number of query API instances to call in parallel, resolved
40
     * (in order) from {@link #PARALLEL_CALL_COUNT_PROPERTY},
41
     * {@link #PARALLEL_CALL_COUNT_ENV}, or the default of
42
     * {@value #DEFAULT_PARALLEL_CALL_COUNT}. Invalid values are ignored.
43
     *
44
     * @return the parallel call count (always {@code >= 1})
45
     */
46
    public static int getParallelCallCount() {
47
        String value = System.getProperty(PARALLEL_CALL_COUNT_PROPERTY);
9✔
48
        if (value == null || value.isEmpty()) value = System.getenv(PARALLEL_CALL_COUNT_ENV);
24!
49
        if (value != null && !value.trim().isEmpty()) {
18!
50
            try {
51
                int n = Integer.parseInt(value.trim());
12✔
52
                if (n >= 1) return n;
15✔
53
                logger.warn("Ignoring {}={}: must be >= 1", PARALLEL_CALL_COUNT_PROPERTY, value);
15✔
54
            } catch (NumberFormatException ex) {
3✔
55
                logger.warn("Ignoring {}={}: not an integer", PARALLEL_CALL_COUNT_PROPERTY, value);
15✔
56
            }
3✔
57
        }
58
        return DEFAULT_PARALLEL_CALL_COUNT;
6✔
59
    }
60

61
    /**
62
     * Run a query call with the given query ID and parameters.
63
     *
64
     * @param queryRef the reference to the query to run
65
     * @return the HTTP response from the query API
66
     * @throws APINotReachableException       if the API is not reachable after retries
67
     * @throws NotEnoughAPIInstancesException if there are not enough API instances available
68
     */
69
    public static HttpResponse run(QueryRef queryRef) throws APINotReachableException, NotEnoughAPIInstancesException {
70
        int retryCount = 0;
6✔
71
        while (retryCount < maxRetryCount) {
9!
72
            QueryCall apiCall = new QueryCall(queryRef);
15✔
73
            apiCall.run();
6✔
74
            while (!apiCall.calls.isEmpty() && apiCall.resp == null) {
21!
75
                try {
76
                    Thread.sleep(200);
6✔
77
                } catch (InterruptedException ex) {
×
78
                    Thread.currentThread().interrupt();
×
79
                }
3✔
80
            }
81
            if (apiCall.resp != null) {
9!
82
                return apiCall.resp;
9✔
83
            }
84
            retryCount = retryCount + 1;
×
85
        }
×
86
        throw new APINotReachableException("Giving up contacting API: " + queryRef.getQueryId());
×
87
    }
88

89
    /**
90
     * System property naming a whitespace-separated list of query API instance URLs.
91
     * When set, this overrides discovery via the nanopub setting (env var
92
     * {@code NANOPUB_QUERY_INSTANCES} also accepted).
93
     */
94
    public static final String QUERY_INSTANCES_PROPERTY = "nanopub.query.instances";
95

96
    /**
97
     * Environment variable equivalent of {@link #QUERY_INSTANCES_PROPERTY}.
98
     */
99
    public static final String QUERY_INSTANCES_ENV = "NANOPUB_QUERY_INSTANCES";
100

101
    private static List<String> checkedApiInstances;
102

103
    /**
104
     * Returns the list of available query API instances that are currently accessible.
105
     * <p>
106
     * Sources, in order of priority:
107
     * <ol>
108
     *   <li>{@code nanopub.query.instances} system property / {@code NANOPUB_QUERY_INSTANCES} env var
109
     *       (whitespace-separated URLs).</li>
110
     *   <li>The active {@link org.nanopub.extra.setting.NanopubSetting}'s service intro collection,
111
     *       filtered to services of type {@link NPS#NANOPUB_QUERY_1_1}.</li>
112
     * </ol>
113
     * Each candidate is liveness-checked via an HTTP GET to its root URL.
114
     *
115
     * @return a list of accessible query API instances
116
     */
117
    public static List<String> getApiInstances() throws NotEnoughAPIInstancesException {
118
        if (checkedApiInstances != null) return checkedApiInstances;
12✔
119
        List<String> candidates = resolveCandidateInstances();
6✔
120
        if (candidates.isEmpty()) {
9!
121
            throw new NotEnoughAPIInstancesException("No query API instances configured or discoverable");
×
122
        }
123
        checkedApiInstances = new ArrayList<>();
12✔
124
        for (String a : candidates) {
30✔
125
            try {
126
                logger.info("Checking API instance: {}", a);
12✔
127
                HttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(a));
21✔
128
                EntityUtils.consumeQuietly(resp.getEntity());
9✔
129
                if (wasSuccessful(resp)) {
9✔
130
                    logger.info("SUCCESS: Nanopub Query instance is accessible: {}", a);
12✔
131
                    checkedApiInstances.add(a);
15✔
132
                } else {
133
                    logger.error("FAILURE: Nanopub Query instance isn't accessible: {}", a);
12✔
134
                }
135
            } catch (IOException ex) {
×
136
                logger.error("FAILURE: Nanopub Query instance isn't accessible: {}", a);
×
137
            }
3✔
138
        }
3✔
139
        logger.info("{} accessible Nanopub Query instances", checkedApiInstances.size());
18✔
140
        if (checkedApiInstances.isEmpty()) {
9✔
141
            checkedApiInstances = null;
6✔
142
            throw new NotEnoughAPIInstancesException("No healthy Nanopub Query instances available");
15✔
143
        }
144
        if (checkedApiInstances.size() == 1) {
12✔
145
            logger.warn("Only one healthy Nanopub Query instance available; no failover.");
9✔
146
        }
147
        return checkedApiInstances;
6✔
148
    }
149

150
    private static List<String> resolveCandidateInstances() {
151
        String override = System.getProperty(QUERY_INSTANCES_PROPERTY);
9✔
152
        if (override == null || override.isEmpty()) override = System.getenv(QUERY_INSTANCES_ENV);
24!
153
        if (override != null && !override.trim().isEmpty()) {
18!
154
            List<String> list = new ArrayList<>();
12✔
155
            for (String url : override.trim().split("\\s+")) list.add(url);
69✔
156
            logger.info("Using {} query API instance(s) from override", list.size());
18✔
157
            return list;
6✔
158
        }
159
        List<String> fromSetting = ServiceLookup.getServices(NPS.NANOPUB_QUERY_1_1);
9✔
160
        logger.info("Discovered {} query API instance(s) from setting", fromSetting.size());
18✔
161
        return new ArrayList<>(fromSetting);
15✔
162
    }
163

164
    private QueryRef queryRef;
165
    private List<String> apisToCall = new ArrayList<>();
15✔
166
    private List<Call> calls = new ArrayList<>();
15✔
167

168
    private HttpResponse resp;
169

170
    private QueryCall(QueryRef queryRef) {
6✔
171
        this.queryRef = queryRef;
9✔
172
        logger.info("Invoking API operation {}", queryRef);
12✔
173
    }
3✔
174

175
    private void run() throws NotEnoughAPIInstancesException {
176
        List<String> apiInstancesToTry = new LinkedList<>(getApiInstances());
15✔
177
        int parallelCallCount = getParallelCallCount();
6✔
178
        while (!apiInstancesToTry.isEmpty() && apisToCall.size() < parallelCallCount) {
24!
179
            int randomIndex = (int) ((Math.random() * apiInstancesToTry.size()));
21✔
180
            String apiUrl = apiInstancesToTry.get(randomIndex);
15✔
181
            apisToCall.add(apiUrl);
15✔
182
            logger.info("Trying API ({}) {}", apisToCall.size(), apiUrl);
24✔
183
            apiInstancesToTry.remove(randomIndex);
12✔
184
        }
3✔
185
        for (String api : apisToCall) {
33✔
186
            Call call = new Call(api);
18✔
187
            calls.add(call);
15✔
188
            new Thread(call).start();
15✔
189
        }
3✔
190
    }
3✔
191

192
    private synchronized void finished(Call call, HttpResponse resp, String apiUrl) {
193
        if (this.resp != null) { // result already in
9!
194
            EntityUtils.consumeQuietly(resp.getEntity());
×
195
            return;
×
196
        }
197
        logger.info("Result in from {}:", apiUrl);
12✔
198
        logger.info("- Request: {}", queryRef);
15✔
199
        logger.info("- Response size: {}", resp.getEntity().getContentLength());
21✔
200
        this.resp = resp;
9✔
201

202
        for (Call c : calls) {
33✔
203
            if (c != call) c.abort();
15✔
204
        }
3✔
205
    }
3✔
206

207
    private static boolean wasSuccessful(HttpResponse resp) {
208
        if (resp == null || resp.getEntity() == null) return false;
15!
209
        int c = resp.getStatusLine().getStatusCode();
12✔
210
        if (c < 200 || c >= 300) return false;
24!
211
        return true;
6✔
212
    }
213

214
    private static boolean wasSuccessfulNonempty(HttpResponse resp) {
215
        if (!wasSuccessful(resp)) return false;
9!
216
        // TODO Make sure we always return proper error codes, and then this shouldn't be necessary:
217
        if (resp.getHeaders("Content-Length").length > 0 && resp.getEntity().getContentLength() < 0) return false;
33!
218
        return true;
6✔
219
    }
220

221

222
    private class Call implements Runnable {
223

224
        private String apiUrl;
225
        private HttpGet get;
226

227
        public Call(String apiUrl) {
15✔
228
            this.apiUrl = apiUrl;
9✔
229
        }
3✔
230

231
        public void run() {
232
            get = new HttpGet(apiUrl + "api/" + queryRef.getAsUrlString());
36✔
233
            get.setHeader("Accept", "text/csv, text/turtle;q=0.9, application/ld+json;q=0.8");
15✔
234
            HttpResponse resp = null;
6✔
235
            try {
236
                resp = NanopubUtils.getHttpClient().execute(get);
15✔
237
                if (!wasSuccessfulNonempty(resp)) {
9!
238
                    throw new IOException(resp.getStatusLine().toString());
×
239
                }
240
                finished(this, resp, apiUrl);
21✔
241
            } catch (Exception ex) {
3✔
242
                if (resp != null) EntityUtils.consumeQuietly(resp.getEntity());
6!
243
                logger.error("Request to {} was not successful: {}", apiUrl, ex.getMessage());
21✔
244
            }
3✔
245
            calls.remove(this);
18✔
246
        }
3✔
247

248
        private void abort() {
249
            if (get == null) return;
9!
250
            if (get.isAborted()) return;
12!
251
            get.abort();
9✔
252
        }
3✔
253

254
    }
255

256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc