• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24130021648

08 Apr 2026 10:11AM UTC coverage: 66.468% (-0.5%) from 66.922%
24130021648

push

github

tkuhn
feat: align with registry seqNum API and forward registry metadata headers

Migrate from afterCounter to afterSeqNum query parameter (with fallback
for older registries). Prefer Nanopub-Registry-SeqNum header over
Load-Counter for sync cursor. Forward coverage types (defaulting to
"all"), test instance, and nanopub count from registry to clients.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

233 of 390 branches covered (59.74%)

Branch coverage included in aggregate %.

659 of 952 relevant lines covered (69.22%)

10.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.02
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
6✔
23
    private static Long lastKnownSetupId = null;
6✔
24
    // Latest registry metadata fields, updated on each metadata fetch and forwarded to clients
25
    static volatile String lastCoverageTypes = null;
6✔
26
    static volatile String lastTestInstance = null;
6✔
27
    static volatile String lastNanopubCount = null;
6✔
28
    private static final CloseableHttpClient metadataClient;
29
    private static final CloseableHttpClient jellyStreamClient;
30

31
    private static final int MAX_RETRIES_METADATA = 10;
32
    private static final int RETRY_DELAY_METADATA = 3000;
33
    private static final int RETRY_DELAY_JELLY = 5000;
34

35
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
9✔
36

37
    /**
     * Registry metadata returned by a HEAD request.
     *
     * @param loadCounter   the registry's current sequence number (or legacy load counter)
     * @param setupId       the registry's setup ID, or {@code null} if it was absent or unparsable
     * @param coverageTypes raw value of the coverage-types header, or {@code null} if absent
     * @param testInstance  raw value of the test-instance header, or {@code null} if absent
     * @param nanopubCount  raw value of the nanopub-count header, or {@code null} if absent
     */
    record RegistryMetadata(
            long loadCounter,
            Long setupId,
            String coverageTypes,
            String testInstance,
            String nanopubCount
    ) {}
42

43
    /**
44
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
45
     */
46
    public static final int UPDATES_POLL_INTERVAL = 2000;
47

48
    /**
     * The kind of loading operation a batch belongs to; it decides which status
     * setter is used when the committed counter is saved.
     */
    enum LoadingType {
        /** Batch loaded as part of the initial (full) load. */
        INITIAL,
        /** Batch loaded while polling for updates. */
        UPDATE
    }
52

53
    static {
54
        // Initialize registryUrl
55
        var url = Utils.getEnvString(
12✔
56
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
57
        );
58
        if (!url.endsWith("/")) url += "/";
12!
59
        registryUrl = url;
6✔
60

61
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
62
        jellyStreamClient = NanopubUtils.getHttpClient();
6✔
63
    }
3✔
64

65
    /**
66
     * Start or continue (after restart) the initial loading procedure. This simply loads all
67
     * nanopubs from the attached Registry.
68
     *
69
     * @param afterCounter which counter to start from (-1 for the beginning)
70
     */
71
    public static void loadInitial(long afterCounter) {
72
        RegistryMetadata metadata = fetchRegistryMetadata();
6✔
73
        updateForwardingMetadata(metadata);
6✔
74
        long targetCounter = metadata.loadCounter();
9✔
75
        log.info("Fetched Registry seqNum: {}", targetCounter);
15✔
76
        // Store setupId on initial load
77
        if (metadata.setupId() != null && lastKnownSetupId == null) {
9!
78
            lastKnownSetupId = metadata.setupId();
×
79
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
80
        }
81
        lastCommittedCounter = afterCounter;
6✔
82
        while (lastCommittedCounter < targetCounter) {
12!
83
            try {
84
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
85
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
86
            } catch (Exception e) {
×
87
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
88
                log.info("Failure reason: ", e);
×
89
                try {
90
                    Thread.sleep(RETRY_DELAY_JELLY);
×
91
                } catch (InterruptedException e2) {
×
92
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
93
                }
×
94
            }
×
95
        }
96
        log.info("Initial load complete.");
9✔
97
    }
3✔
98

99
    /**
100
     * Check if the Registry has any new nanopubs. If it does, load them.
101
     * This method should be called periodically, and you should wait for it to finish before
102
     * calling it again.
103
     */
104
    public static void loadUpdates() {
105
        try {
106
            final var status = StatusController.get().getState();
×
107
            lastCommittedCounter = status.loadCounter;
×
108
            RegistryMetadata metadata = fetchRegistryMetadata();
×
109
            updateForwardingMetadata(metadata);
×
110
            long targetCounter = metadata.loadCounter();
×
111
            Long currentSetupId = metadata.setupId();
×
112

113
            // Detect reset via setupId change
114
            if (lastKnownSetupId != null && currentSetupId != null
×
115
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
116
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
117
                performResync(currentSetupId);
×
118
                return;
×
119
            }
120
            // Detect reset via counter decrease (also covers first run after upgrade
121
            // where no setupId was persisted yet but the registry has already been reset)
122
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
123
                    && targetCounter < lastCommittedCounter) {
124
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
125
                        lastCommittedCounter, targetCounter);
×
126
                performResync(currentSetupId);
×
127
                return;
×
128
            }
129

130
            // Update lastKnownSetupId on first successful poll
131
            if (currentSetupId != null && lastKnownSetupId == null) {
×
132
                if (lastCommittedCounter > 0) {
×
133
                    // Upgrade from a version without setupId tracking. The DB has data but
134
                    // we can't verify it matches the current registry. Force a resync.
135
                    log.warn("No stored setupId but DB has data (counter: {}). "
×
136
                            + "Forcing resync to ensure data consistency.", lastCommittedCounter);
×
137
                    performResync(currentSetupId);
×
138
                    return;
×
139
                }
140
                lastKnownSetupId = currentSetupId;
×
141
                StatusController.get().setRegistrySetupId(currentSetupId);
×
142
            }
143

144
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
145
            if (lastCommittedCounter >= targetCounter) {
×
146
                StatusController.get().setReady();
×
147
                return;
×
148
            }
149
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
150
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
151
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
152
            if (lastCommittedCounter < targetCounter) {
×
153
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
154
                        targetCounter + " based on the counter reported in Registry's headers, " +
155
                        "but loaded only up to {}.", lastCommittedCounter);
×
156
            }
157
        } catch (Exception e) {
×
158
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
159
            log.info("Failure Reason: ", e);
×
160
        } finally {
161
            try {
162
                StatusController.get().setReady();
×
163
            } catch (Exception e) {
×
164
                log.info("Update loader: failed to set status to READY.");
×
165
                log.info("Failure Reason: ", e);
×
166
            }
×
167
        }
168
    }
×
169

170
    /**
171
     * Re-stream all nanopubs from the registry after a reset is detected.
172
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
173
     *
174
     * @param newSetupId the new setup ID from the registry, or null if unknown
175
     */
176
    private static void performResync(Long newSetupId) {
177
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
178
        StatusController.get().setResetting();
×
179
        lastKnownSetupId = newSetupId;
×
180
        if (newSetupId != null) {
×
181
            StatusController.get().setRegistrySetupId(newSetupId);
×
182
        }
183
        StatusController.get().setLoadingInitial(-1);
×
184
        loadInitial(-1);
×
185
        StatusController.get().setReady();
×
186
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
187
    }
×
188

189
    /**
190
     * Load a batch of nanopubs from the Jelly stream.
191
     * <p>
192
     * The method requests the list of all nanopubs from the Registry and reads it for as long
193
     * as it can. If the stream is interrupted, the method will throw an exception, and you
194
     * can resume loading from the last known counter.
195
     *
196
     * @param afterCounter the last known nanopub counter to have been committed in the DB
197
     * @param type         the type of loading operation (initial or update)
198
     */
199
    static void loadBatch(long afterCounter, LoadingType type) {
200
        CloseableHttpResponse response;
201
        try {
202
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
203
            response = jellyStreamClient.execute(request);
×
204
        } catch (IOException e) {
×
205
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
206
        }
×
207

208
        int httpStatus = response.getStatusLine().getStatusCode();
×
209
        if (httpStatus < 200 || httpStatus >= 300) {
×
210
            EntityUtils.consumeQuietly(response.getEntity());
×
211
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
212
        }
213

214
        try (
215
                var is = response.getEntity().getContent();
×
216
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
217
        ) {
218
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
219
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
220
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
221
            AtomicLong loaded = new AtomicLong(0L);
×
222

223
            npStream.forEach(m -> {
×
224
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
225
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
226
                        m.getException()
×
227
                );
228
                if (m.getCounter() < lastCommittedCounter) {
×
229
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
230
                            "the last known counter. Last known counter: " + lastCommittedCounter +
231
                            ", received counter: " + m.getCounter());
×
232
                }
233
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
234
                if (m.getCounter() % 10 == 0) {
×
235
                    // Save the committed counter only every 10 nanopubs to reduce DB load
236
                    saveCommittedCounter(type);
×
237
                    lastSavedCounter.set(m.getCounter());
×
238
                }
239
                lastCommittedCounter = m.getCounter();
×
240
                loaded.getAndIncrement();
×
241

242
                if (loaded.get() % 50 == 0) {
×
243
                    long currTime = System.currentTimeMillis();
×
244
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
245
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
246
                            " np/s. Counter: " + lastCommittedCounter);
247
                    checkpointTime.set(currTime);
×
248
                    checkpointCounter.set(lastCommittedCounter);
×
249
                }
250
            });
×
251
            // Make sure to save the last committed counter at the end of the batch
252
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
253
                saveCommittedCounter(type);
×
254
            }
255
        } catch (IOException e) {
×
256
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
257
        } finally {
258
            try {
259
                response.close();
×
260
            } catch (IOException e) {
×
261
                log.info("Failed to close the Jelly stream response.");
×
262
            }
×
263
        }
264
    }
×
265

266
    /**
267
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
268
     * Remember to call this method at the end of the batch as well.
269
     *
270
     * @param type the type of loading operation (initial or update)
271
     */
272
    private static void saveCommittedCounter(LoadingType type) {
273
        try {
274
            if (type == LoadingType.INITIAL) {
×
275
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
276
            } else {
277
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
278
            }
279
        } catch (Exception e) {
×
280
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
281
        }
×
282
    }
×
283

284
    /**
285
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
286
     *
287
     * @param setupId the setup ID to set, or null if not known
288
     */
289
    static void setLastKnownSetupId(Long setupId) {
290
        lastKnownSetupId = setupId;
×
291
    }
×
292

293
    /**
294
     * Update the cached metadata fields used for forwarding to clients.
295
     */
296
    private static void updateForwardingMetadata(RegistryMetadata metadata) {
297
        lastCoverageTypes = metadata.coverageTypes();
9✔
298
        lastTestInstance = metadata.testInstance();
9✔
299
        lastNanopubCount = metadata.nanopubCount();
9✔
300
    }
3✔
301

302
    /**
303
     * Run a HEAD request to the Registry to fetch its current metadata (seqNum and setup ID).
304
     *
305
     * @return the registry metadata
306
     */
307
    static RegistryMetadata fetchRegistryMetadata() {
308
        int tries = 0;
6✔
309
        RegistryMetadata metadata = null;
6✔
310
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
15!
311
            try {
312
                metadata = fetchRegistryMetadataInner();
6✔
313
            } catch (Exception e) {
×
314
                tries++;
×
315
                log.info("Failed to fetch registry metadata, try " + tries +
×
316
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
317
                log.info("Failure Reason: ", e);
×
318
                try {
319
                    Thread.sleep(RETRY_DELAY_METADATA);
×
320
                } catch (InterruptedException e2) {
×
321
                    throw new RuntimeException(
×
322
                            "Interrupted while waiting to retry fetching registry metadata.");
323
                }
×
324
            }
3✔
325
        }
326
        if (metadata == null) {
6!
327
            throw new RuntimeException("Failed to fetch registry metadata after " +
×
328
                    MAX_RETRIES_METADATA + " retries.");
329
        }
330
        return metadata;
6✔
331
    }
332

333
    /**
334
     * Inner logic for fetching the registry metadata via HEAD request.
335
     *
336
     * @return the registry metadata (load counter and setup ID)
337
     * @throws IOException if the HTTP request fails
338
     */
339
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
340
        var request = new HttpHead(registryUrl);
15✔
341
        try (var response = metadataClient.execute(request)) {
12✔
342
            int status = response.getStatusLine().getStatusCode();
12✔
343
            EntityUtils.consumeQuietly(response.getEntity());
9✔
344
            if (status < 200 || status >= 300) {
18!
345
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
346
                        status + ".");
347
            }
348

349
            // Check if the registry is ready
350
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
12✔
351
            if (hStatus.length == 0) {
9!
352
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
353
            }
354
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
42!
355
                throw new RuntimeException("Registry is not in ready state.");
×
356
            }
357

358
            // Get the seqNum (preferred) or load counter (fallback for older registries)
359
            // TODO(transition): Remove Load-Counter fallback after all registries upgraded
360
            Long seqNum = null;
6✔
361
            var hSeqNum = response.getHeaders("Nanopub-Registry-SeqNum");
12✔
362
            if (hSeqNum.length > 0) {
9!
363
                seqNum = Long.parseLong(hSeqNum[0].getValue());
21✔
364
            }
365
            if (seqNum == null) {
6!
366
                var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
×
367
                if (hCounter.length > 0) {
×
368
                    seqNum = Long.parseLong(hCounter[0].getValue());
×
369
                }
370
            }
371
            if (seqNum == null) {
6!
372
                throw new RuntimeException("Registry did not return a Nanopub-Registry-SeqNum or Load-Counter header.");
×
373
            }
374

375
            // Get the setup ID (optional — older registries may not have it)
376
            Long setupId = null;
6✔
377
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
12✔
378
            if (hSetupId.length > 0) {
9!
379
                try {
380
                    setupId = Long.parseLong(hSetupId[0].getValue());
21✔
381
                } catch (NumberFormatException e) {
×
382
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
383
                }
3✔
384
            }
385

386
            // Read optional metadata headers for forwarding to clients
387
            String coverageTypes = null;
6✔
388
            var hCovTypes = response.getHeaders("Nanopub-Registry-Coverage-Types");
12✔
389
            if (hCovTypes.length > 0) {
9!
390
                coverageTypes = hCovTypes[0].getValue();
×
391
            }
392

393
            String testInstance = null;
6✔
394
            var hTestInstance = response.getHeaders("Nanopub-Registry-Test-Instance");
12✔
395
            if (hTestInstance.length > 0) {
9!
396
                testInstance = hTestInstance[0].getValue();
15✔
397
            }
398

399
            String nanopubCount = null;
6✔
400
            var hNpCount = response.getHeaders("Nanopub-Registry-Nanopub-Count");
12✔
401
            if (hNpCount.length > 0) {
9!
402
                nanopubCount = hNpCount[0].getValue();
15✔
403
            }
404

405
            return new RegistryMetadata(seqNum, setupId, coverageTypes, testInstance, nanopubCount);
36✔
406
        }
407
    }
408

409
    /**
410
     * Construct the URL for fetching the Jelly stream.
411
     *
412
     * @param afterSeqNum the last known seqNum to have been committed in the DB
413
     * @return the URL for fetching the Jelly stream
414
     */
415
    private static String makeStreamFetchUrl(long afterSeqNum) {
416
        // TODO(transition): Remove afterCounter param after all registries upgraded
417
        return registryUrl + "nanopubs.jelly?afterSeqNum=" + afterSeqNum + "&afterCounter=" + afterSeqNum;
×
418
    }
419
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc