• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24664456478

20 Apr 2026 11:39AM UTC coverage: 59.905% (-0.3%) from 60.236%
24664456478

push

github

web-flow
Merge pull request #75 from knowledgepixels/fix/timeouts-backoff-breaker

fix/perf/feat: changes 1+7+8 — HTTP timeouts, exponential backoff, circuit breaker

293 of 544 branches covered (53.86%)

Branch coverage included in aggregate %.

841 of 1349 relevant lines covered (62.34%)

6.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.6
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    /** Base URL of the Registry; the static initializer appends a trailing "/" if it is missing. */
    static final String registryUrl;
22
    /** Counter of the last nanopub known to be loaded; initialized to -1 and advanced by loadBatch. */
    private static long lastCommittedCounter = -1;
4✔
23
    /** Setup ID last seen from the Registry, or null while unknown; loadUpdates compares it to detect a reset. */
    private static Long lastKnownSetupId = null;
4✔
24
    // Latest registry metadata fields, updated on each metadata fetch and forwarded to clients
25
    static volatile String lastCoverageTypes = null;
4✔
26
    static volatile String lastCoverageAgents = null;
4✔
27
    static volatile String lastTestInstance = null;
4✔
28
    static volatile String lastNanopubCount = null;
4✔
29
    /** HTTP client used for the HEAD requests that fetch registry metadata. */
    private static final CloseableHttpClient metadataClient;
30
    /** HTTP client used for the GET requests that fetch the Jelly stream. */
    private static final CloseableHttpClient jellyStreamClient;
31

32
    /** Maximum number of attempts made by fetchRegistryMetadata before it gives up. */
    private static final int MAX_RETRIES_METADATA = 10;
33
    /** Delay in milliseconds between two metadata fetch attempts. */
    private static final int RETRY_DELAY_METADATA = 3000;
34
    /** Delay in milliseconds before loadInitial retries a failed batch. */
    private static final int RETRY_DELAY_JELLY = 5000;
35

36
    /**
37
     * Circuit-breaker state. Counts consecutive {@code loadUpdates} invocations in
38
     * which the catch block fired; resets to zero on the next successful batch.
39
     * {@code loadUpdates} is scheduled on the single-threaded executor in {@link MainVerticle},
40
     * so the plain {@code ++} needs no atomics; {@code volatile} only keeps reads from other threads current.
41
     *
42
     * <p>When the counter reaches {@link #BREAKER_THRESHOLD}, the next invocation
43
     * sleeps {@link #BREAKER_PAUSE_MS} before proceeding. Lets the saturated RDF4J
44
     * drain instead of being hammered every {@link #UPDATES_POLL_INTERVAL} ms.
45
     *
46
     * <p>Depends on {@link com.knowledgepixels.query.TripleStore}'s socket timeouts
47
     * (change 1 of the fix plan) to turn parked commits into propagating exceptions;
48
     * without them, {@code loadBatch} can park forever, the catch never fires, and
49
     * this counter stays at zero.
50
     */
51
    static volatile int consecutiveBatchFailures = 0;
4✔
52

53
    /** Number of consecutive failed loadUpdates invocations at which the pause is applied. */
    static final int BREAKER_THRESHOLD = 3;
54
    /** Length in milliseconds of the pause applied while the failure count is at or above the threshold. */
    static final long BREAKER_PAUSE_MS = 30_000L;
55

56
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
6✔
57

58
    /**
     * Header values read from a single Registry HEAD response.
     *
     * @param loadCounter    parsed value of the {@code Nanopub-Registry-Load-Counter} header
     * @param setupId        parsed {@code Nanopub-Registry-Setup-Id} header, or null if absent or unparseable
     * @param coverageTypes  raw {@code Nanopub-Registry-Coverage-Types} header, or null if absent
     * @param coverageAgents raw {@code Nanopub-Registry-Coverage-Agents} header, or null if absent
     * @param testInstance   raw {@code Nanopub-Registry-Test-Instance} header, or null if absent
     * @param nanopubCount   raw {@code Nanopub-Registry-Nanopub-Count} header, or null if absent
     * @param trustStateHash raw {@code Nanopub-Registry-Trust-State-Hash} header, or null if absent
     */
    record RegistryMetadata(
            long loadCounter,
            Long setupId,
            String coverageTypes,
            String coverageAgents,
            String testInstance,
            String nanopubCount,
            String trustStateHash
    ) {
    }
64

65
    /**
66
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
67
     */
68
    public static final int UPDATES_POLL_INTERVAL = 2000;
69

70
    /** Tells the loader which status update to persist while a batch is being processed. */
    enum LoadingType {
        /** Batch requested by {@code loadInitial}. */
        INITIAL,
        /** Batch requested by {@code loadUpdates}. */
        UPDATE
    }
74

75
    static {
        // Resolve the Registry base URL from the environment, falling back to the public
        // registry, and make sure it ends with a slash so paths can simply be appended.
        String configuredUrl = Utils.getEnvString(
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
        );
        if (configuredUrl.endsWith("/")) {
            registryUrl = configuredUrl;
        } else {
            registryUrl = configuredUrl + "/";
        }

        // Metadata requests use the request config supplied by Utils; the Jelly stream
        // uses the client provided by NanopubUtils.
        metadataClient = HttpClientBuilder.create()
                .setDefaultRequestConfig(Utils.getHttpRequestConfig())
                .build();
        jellyStreamClient = NanopubUtils.getHttpClient();
    }
2✔
86

87
    /**
88
     * Start or continue (after restart) the initial loading procedure. This simply loads all
89
     * nanopubs from the attached Registry.
90
     *
91
     * @param afterCounter which counter to start from (-1 for the beginning)
92
     */
93
    public static void loadInitial(long afterCounter) {
94
        RegistryMetadata metadata = fetchRegistryMetadata();
4✔
95
        updateForwardingMetadata(metadata);
4✔
96
        TrustStateLoader.maybeUpdate(metadata.trustStateHash());
6✔
97
        long targetCounter = metadata.loadCounter();
6✔
98
        log.info("Fetched Registry load counter: {}", targetCounter);
10✔
99
        // Store setupId on initial load
100
        if (metadata.setupId() != null && lastKnownSetupId == null) {
6!
101
            lastKnownSetupId = metadata.setupId();
×
102
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
103
        }
104
        lastCommittedCounter = afterCounter;
4✔
105
        while (lastCommittedCounter < targetCounter) {
8!
106
            try {
107
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
108
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
109
            } catch (Exception e) {
×
110
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
111
                log.info("Failure reason: ", e);
×
112
                try {
113
                    Thread.sleep(RETRY_DELAY_JELLY);
×
114
                } catch (InterruptedException e2) {
×
115
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
116
                }
×
117
            }
×
118
        }
119
        log.info("Initial load complete.");
6✔
120
    }
2✔
121

122
    /**
123
     * Check if the Registry has any new nanopubs. If it does, load them.
124
     * This method should be called periodically, and you should wait for it to finish before
125
     * calling it again.
126
     */
127
    public static void loadUpdates() {
128
        // Circuit breaker: after BREAKER_THRESHOLD consecutive failed batches, pause
129
        // before the next attempt so a saturated RDF4J can drain. Check happens before
130
        // any RDF4J-touching work so the sleep isn't itself under the broken regime.
131
        if (consecutiveBatchFailures >= BREAKER_THRESHOLD) {
×
132
            log.warn("Circuit breaker active after {} consecutive batch failures; pausing {} ms before next attempt",
×
133
                    consecutiveBatchFailures, BREAKER_PAUSE_MS);
×
134
            try {
135
                Thread.sleep(BREAKER_PAUSE_MS);
×
136
            } catch (InterruptedException e) {
×
137
                // Preserve interruption semantics so a graceful shutdown (e.g. via
138
                // MainVerticle's shutdown hook) isn't blocked by the pause.
139
                Thread.currentThread().interrupt();
×
140
                return;
×
141
            }
×
142
        }
143
        try {
144
            final var status = StatusController.get().getState();
×
145
            lastCommittedCounter = status.loadCounter;
×
146
            RegistryMetadata metadata = fetchRegistryMetadata();
×
147
            updateForwardingMetadata(metadata);
×
148
            TrustStateLoader.maybeUpdate(metadata.trustStateHash());
×
149
            long targetCounter = metadata.loadCounter();
×
150
            Long currentSetupId = metadata.setupId();
×
151

152
            // Detect reset via setupId change
153
            if (lastKnownSetupId != null && currentSetupId != null
×
154
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
155
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
156
                performResync(currentSetupId);
×
157
                return;
×
158
            }
159
            // Detect reset via counter decrease (also covers first run after upgrade
160
            // where no setupId was persisted yet but the registry has already been reset)
161
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
162
                    && targetCounter < lastCommittedCounter) {
163
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
164
                        lastCommittedCounter, targetCounter);
×
165
                performResync(currentSetupId);
×
166
                return;
×
167
            }
168

169
            // Update lastKnownSetupId on first successful poll
170
            if (currentSetupId != null && lastKnownSetupId == null) {
×
171
                if (lastCommittedCounter > 0) {
×
172
                    // Upgrade from a version without setupId tracking. The DB has data but
173
                    // we can't verify it matches the current registry. Force a resync.
174
                    log.warn("No stored setupId but DB has data (counter: {}). "
×
175
                            + "Forcing resync to ensure data consistency.", lastCommittedCounter);
×
176
                    performResync(currentSetupId);
×
177
                    return;
×
178
                }
179
                lastKnownSetupId = currentSetupId;
×
180
                StatusController.get().setRegistrySetupId(currentSetupId);
×
181
            }
182

183
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
184
            if (lastCommittedCounter >= targetCounter) {
×
185
                StatusController.get().setReady();
×
186
                return;
×
187
            }
188
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
189
            // Batch completed without an exception — reset the breaker counter.
190
            consecutiveBatchFailures = 0;
×
191
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
192
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
193
            if (lastCommittedCounter < targetCounter) {
×
194
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
195
                        targetCounter + " based on the counter reported in Registry's headers, " +
196
                        "but loaded only up to {}.", lastCommittedCounter);
×
197
            }
198
        } catch (Exception e) {
×
199
            consecutiveBatchFailures++;
×
200
            log.warn("Failed to load updates. Current counter: {} (consecutive failures: {})",
×
201
                    lastCommittedCounter, consecutiveBatchFailures, e);
×
202
        } finally {
203
            try {
204
                StatusController.get().setReady();
×
205
            } catch (Exception e) {
×
206
                log.info("Update loader: failed to set status to READY.");
×
207
                log.info("Failure Reason: ", e);
×
208
            }
×
209
        }
210
    }
×
211

212
    /**
213
     * Re-stream all nanopubs from the registry after a reset is detected.
214
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
215
     *
216
     * @param newSetupId the new setup ID from the registry, or null if unknown
217
     */
218
    private static void performResync(Long newSetupId) {
219
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
220
        StatusController.get().setResetting();
×
221
        lastKnownSetupId = newSetupId;
×
222
        if (newSetupId != null) {
×
223
            StatusController.get().setRegistrySetupId(newSetupId);
×
224
        }
225
        StatusController.get().setLoadingInitial(-1);
×
226
        loadInitial(-1);
×
227
        StatusController.get().setReady();
×
228
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
229
    }
×
230

231
    /**
232
     * Load a batch of nanopubs from the Jelly stream.
233
     * <p>
234
     * The method requests the list of all nanopubs from the Registry and reads it for as long
235
     * as it can. If the stream is interrupted, the method will throw an exception, and you
236
     * can resume loading from the last known counter.
237
     *
238
     * @param afterCounter the last known nanopub counter to have been committed in the DB
239
     * @param type         the type of loading operation (initial or update)
240
     */
241
    static void loadBatch(long afterCounter, LoadingType type) {
242
        CloseableHttpResponse response;
243
        try {
244
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
245
            response = jellyStreamClient.execute(request);
×
246
        } catch (IOException e) {
×
247
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
248
        }
×
249

250
        int httpStatus = response.getStatusLine().getStatusCode();
×
251
        if (httpStatus < 200 || httpStatus >= 300) {
×
252
            EntityUtils.consumeQuietly(response.getEntity());
×
253
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
254
        }
255

256
        try (
257
                var is = response.getEntity().getContent();
×
258
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
259
        ) {
260
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
261
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
262
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
263
            AtomicLong loaded = new AtomicLong(0L);
×
264

265
            npStream.forEach(m -> {
×
266
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
267
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
268
                        m.getException()
×
269
                );
270
                if (m.getCounter() < lastCommittedCounter) {
×
271
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
272
                            "the last known counter. Last known counter: " + lastCommittedCounter +
273
                            ", received counter: " + m.getCounter());
×
274
                }
275
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
276
                if (m.getCounter() % 10 == 0) {
×
277
                    // Save the committed counter only every 10 nanopubs to reduce DB load
278
                    saveCommittedCounter(type);
×
279
                    lastSavedCounter.set(m.getCounter());
×
280
                }
281
                lastCommittedCounter = m.getCounter();
×
282
                loaded.getAndIncrement();
×
283

284
                if (loaded.get() % 50 == 0) {
×
285
                    long currTime = System.currentTimeMillis();
×
286
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
287
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
288
                            " np/s. Counter: " + lastCommittedCounter);
289
                    checkpointTime.set(currTime);
×
290
                    checkpointCounter.set(lastCommittedCounter);
×
291
                }
292
            });
×
293
            // Make sure to save the last committed counter at the end of the batch
294
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
295
                saveCommittedCounter(type);
×
296
            }
297
        } catch (IOException e) {
×
298
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
299
        } finally {
300
            try {
301
                response.close();
×
302
            } catch (IOException e) {
×
303
                log.info("Failed to close the Jelly stream response.");
×
304
            }
×
305
        }
306
    }
×
307

308
    /**
309
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
310
     * Remember to call this method at the end of the batch as well.
311
     *
312
     * @param type the type of loading operation (initial or update)
313
     */
314
    private static void saveCommittedCounter(LoadingType type) {
315
        try {
316
            if (type == LoadingType.INITIAL) {
×
317
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
318
            } else {
319
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
320
            }
321
        } catch (Exception e) {
×
322
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
323
        }
×
324
    }
×
325

326
    /**
327
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
328
     *
329
     * @param setupId the setup ID to set, or null if not known
330
     */
331
    static void setLastKnownSetupId(Long setupId) {
332
        lastKnownSetupId = setupId;
×
333
    }
×
334

335
    /**
336
     * Update the cached metadata fields used for forwarding to clients.
337
     */
338
    private static void updateForwardingMetadata(RegistryMetadata metadata) {
339
        lastCoverageTypes = metadata.coverageTypes();
6✔
340
        lastCoverageAgents = metadata.coverageAgents();
6✔
341
        lastTestInstance = metadata.testInstance();
6✔
342
        lastNanopubCount = metadata.nanopubCount();
6✔
343
    }
2✔
344

345
    /**
346
     * Run a HEAD request to the Registry to fetch its current metadata (load counter and setup ID).
347
     *
348
     * @return the registry metadata
349
     */
350
    static RegistryMetadata fetchRegistryMetadata() {
351
        int tries = 0;
4✔
352
        RegistryMetadata metadata = null;
4✔
353
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
10!
354
            try {
355
                metadata = fetchRegistryMetadataInner();
4✔
356
            } catch (Exception e) {
×
357
                tries++;
×
358
                log.info("Failed to fetch registry metadata, try " + tries +
×
359
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
360
                log.info("Failure Reason: ", e);
×
361
                try {
362
                    Thread.sleep(RETRY_DELAY_METADATA);
×
363
                } catch (InterruptedException e2) {
×
364
                    throw new RuntimeException(
×
365
                            "Interrupted while waiting to retry fetching registry metadata.");
366
                }
×
367
            }
2✔
368
        }
369
        if (metadata == null) {
4!
370
            throw new RuntimeException("Failed to fetch registry metadata after " +
×
371
                    MAX_RETRIES_METADATA + " retries.");
372
        }
373
        return metadata;
4✔
374
    }
375

376
    /**
377
     * Inner logic for fetching the registry metadata via HEAD request.
378
     *
379
     * @return the registry metadata (load counter and setup ID)
380
     * @throws IOException if the HTTP request fails
381
     */
382
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
383
        var request = new HttpHead(registryUrl);
10✔
384
        try (var response = metadataClient.execute(request)) {
8✔
385
            int status = response.getStatusLine().getStatusCode();
8✔
386
            EntityUtils.consumeQuietly(response.getEntity());
6✔
387
            if (status < 200 || status >= 300) {
12!
388
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
389
                        status + ".");
390
            }
391

392
            // Check if the registry is ready
393
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
8✔
394
            if (hStatus.length == 0) {
6!
395
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
396
            }
397
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
14!
398
                throw new RuntimeException("Registry is not in ready state.");
×
399
            }
400

401
            // Get the load counter
402
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
8✔
403
            if (hCounter.length == 0) {
6!
404
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
405
            }
406
            long loadCounter = Long.parseLong(hCounter[0].getValue());
12✔
407

408
            // Get the setup ID (optional — older registries may not have it)
409
            Long setupId = null;
4✔
410
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
8✔
411
            if (hSetupId.length > 0) {
6!
412
                try {
413
                    setupId = Long.parseLong(hSetupId[0].getValue());
14✔
414
                } catch (NumberFormatException e) {
×
415
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
416
                }
2✔
417
            }
418

419
            // Read metadata headers for forwarding to clients
420
            String coverageTypes = getHeaderValue(response, "Nanopub-Registry-Coverage-Types");
8✔
421
            String coverageAgents = getHeaderValue(response, "Nanopub-Registry-Coverage-Agents");
8✔
422
            String testInstance = getHeaderValue(response, "Nanopub-Registry-Test-Instance");
8✔
423
            String nanopubCount = getHeaderValue(response, "Nanopub-Registry-Nanopub-Count");
8✔
424
            // Optional — older registries (without trust calculation) won't set this header.
425
            String trustStateHash = getHeaderValue(response, "Nanopub-Registry-Trust-State-Hash");
8✔
426

427
            return new RegistryMetadata(loadCounter, setupId, coverageTypes, coverageAgents,
26✔
428
                    testInstance, nanopubCount, trustStateHash);
429
        }
430
    }
431

432
    private static String getHeaderValue(CloseableHttpResponse response, String name) {
433
        var headers = response.getHeaders(name);
8✔
434
        return headers.length > 0 ? headers[0].getValue() : null;
18!
435
    }
436

437
    /**
438
     * Construct the URL for fetching the Jelly stream.
439
     *
440
     * @param afterCounter the last known counter to have been committed in the DB
441
     * @return the URL for fetching the Jelly stream
442
     */
443
    private static String makeStreamFetchUrl(long afterCounter) {
444
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
445
    }
446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc