• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 23622439422

26 Mar 2026 11:05PM UTC coverage: 67.082% (-2.4%) from 69.473%
23622439422

push

github

tkuhn
feat: detect and recover from Nanopub Registry resets

When the registry is reset and repopulated, its load counter restarts
from 0 while nanopub-query's stored counter remains high, causing it
to silently serve stale data. This adds detection via the registry's
setupId (Nanopub-Registry-Setup-Id header) and counter decrease, then
re-streams all nanopubs while skipping duplicates via existing per-repo
dedup in NanopubLoader.

- Read setupId from HEAD response alongside existing counter/status
- Persist setupId in admin repo, restore on startup
- Add RESETTING state to StatusController for counter reset
- Trigger resync on setupId change or counter decrease
- Re-stream with dedup preserves existing data, only loads new nanopubs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

227 of 372 branches covered (61.02%)

Branch coverage included in aggregate %.

635 of 913 relevant lines covered (69.55%)

10.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.92
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    private static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
6✔
23
    private static Long lastKnownSetupId = null;
6✔
24
    private static final CloseableHttpClient metadataClient;
25
    private static final CloseableHttpClient jellyStreamClient;
26

27
    private static final int MAX_RETRIES_METADATA = 10;
28
    private static final int RETRY_DELAY_METADATA = 3000;
29
    private static final int RETRY_DELAY_JELLY = 5000;
30

31
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
9✔
32

33
    /**
34
     * Registry metadata returned by a HEAD request.
35
     */
36
    record RegistryMetadata(long loadCounter, Long setupId) {}
27✔
37

38
    /**
39
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
40
     */
41
    public static final int UPDATES_POLL_INTERVAL = 2000;
42

43
    enum LoadingType {
×
44
        INITIAL,
×
45
        UPDATE,
×
46
    }
47

48
    static {
49
        // Initialize registryUrl
50
        var url = Utils.getEnvString(
12✔
51
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
52
        );
53
        if (!url.endsWith("/")) url += "/";
12!
54
        registryUrl = url;
6✔
55

56
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
57
        jellyStreamClient = NanopubUtils.getHttpClient();
6✔
58
    }
3✔
59

60
    /**
61
     * Start or continue (after restart) the initial loading procedure. This simply loads all
62
     * nanopubs from the attached Registry.
63
     *
64
     * @param afterCounter which counter to start from (-1 for the beginning)
65
     */
66
    public static void loadInitial(long afterCounter) {
67
        RegistryMetadata metadata = fetchRegistryMetadata();
6✔
68
        long targetCounter = metadata.loadCounter();
9✔
69
        log.info("Fetched Registry load counter: {}", targetCounter);
15✔
70
        // Store setupId on initial load
71
        if (metadata.setupId() != null && lastKnownSetupId == null) {
9!
72
            lastKnownSetupId = metadata.setupId();
×
73
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
74
        }
75
        lastCommittedCounter = afterCounter;
6✔
76
        while (lastCommittedCounter < targetCounter) {
12!
77
            try {
78
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
79
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
80
            } catch (Exception e) {
×
81
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
82
                log.info("Failure reason: ", e);
×
83
                try {
84
                    Thread.sleep(RETRY_DELAY_JELLY);
×
85
                } catch (InterruptedException e2) {
×
86
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
87
                }
×
88
            }
×
89
        }
90
        log.info("Initial load complete.");
9✔
91
    }
3✔
92

93
    /**
94
     * Check if the Registry has any new nanopubs. If it does, load them.
95
     * This method should be called periodically, and you should wait for it to finish before
96
     * calling it again.
97
     */
98
    public static void loadUpdates() {
99
        try {
100
            final var status = StatusController.get().getState();
×
101
            lastCommittedCounter = status.loadCounter;
×
102
            RegistryMetadata metadata = fetchRegistryMetadata();
×
103
            long targetCounter = metadata.loadCounter();
×
104
            Long currentSetupId = metadata.setupId();
×
105

106
            // Detect reset via setupId change
107
            if (lastKnownSetupId != null && currentSetupId != null
×
108
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
109
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
110
                performResync(currentSetupId);
×
111
                return;
×
112
            }
113
            // Detect reset via counter decrease (also covers first run after upgrade
114
            // where no setupId was persisted yet but the registry has already been reset)
115
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
116
                    && targetCounter < lastCommittedCounter) {
117
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
118
                        lastCommittedCounter, targetCounter);
×
119
                performResync(currentSetupId);
×
120
                return;
×
121
            }
122

123
            // Update lastKnownSetupId on first successful poll
124
            if (currentSetupId != null && lastKnownSetupId == null) {
×
125
                lastKnownSetupId = currentSetupId;
×
126
                StatusController.get().setRegistrySetupId(currentSetupId);
×
127
            }
128

129
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
130
            if (lastCommittedCounter >= targetCounter) {
×
131
                StatusController.get().setReady();
×
132
                return;
×
133
            }
134
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
135
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
136
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
137
            if (lastCommittedCounter < targetCounter) {
×
138
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
139
                        targetCounter + " based on the counter reported in Registry's headers, " +
140
                        "but loaded only up to {}.", lastCommittedCounter);
×
141
            }
142
        } catch (Exception e) {
×
143
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
144
            log.info("Failure Reason: ", e);
×
145
        } finally {
146
            try {
147
                StatusController.get().setReady();
×
148
            } catch (Exception e) {
×
149
                log.info("Update loader: failed to set status to READY.");
×
150
                log.info("Failure Reason: ", e);
×
151
            }
×
152
        }
153
    }
×
154

155
    /**
156
     * Re-stream all nanopubs from the registry after a reset is detected.
157
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
158
     *
159
     * @param newSetupId the new setup ID from the registry, or null if unknown
160
     */
161
    private static void performResync(Long newSetupId) {
162
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
163
        StatusController.get().setResetting();
×
164
        lastKnownSetupId = newSetupId;
×
165
        if (newSetupId != null) {
×
166
            StatusController.get().setRegistrySetupId(newSetupId);
×
167
        }
168
        StatusController.get().setLoadingInitial(-1);
×
169
        loadInitial(-1);
×
170
        StatusController.get().setReady();
×
171
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
172
    }
×
173

174
    /**
175
     * Load a batch of nanopubs from the Jelly stream.
176
     * <p>
177
     * The method requests the list of all nanopubs from the Registry and reads it for as long
178
     * as it can. If the stream is interrupted, the method will throw an exception, and you
179
     * can resume loading from the last known counter.
180
     *
181
     * @param afterCounter the last known nanopub counter to have been committed in the DB
182
     * @param type         the type of loading operation (initial or update)
183
     */
184
    static void loadBatch(long afterCounter, LoadingType type) {
185
        CloseableHttpResponse response;
186
        try {
187
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
188
            response = jellyStreamClient.execute(request);
×
189
        } catch (IOException e) {
×
190
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
191
        }
×
192

193
        int httpStatus = response.getStatusLine().getStatusCode();
×
194
        if (httpStatus < 200 || httpStatus >= 300) {
×
195
            EntityUtils.consumeQuietly(response.getEntity());
×
196
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
197
        }
198

199
        try (
200
                var is = response.getEntity().getContent();
×
201
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
202
        ) {
203
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
204
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
205
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
206
            AtomicLong loaded = new AtomicLong(0L);
×
207

208
            npStream.forEach(m -> {
×
209
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
210
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
211
                        m.getException()
×
212
                );
213
                if (m.getCounter() < lastCommittedCounter) {
×
214
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
215
                            "the last known counter. Last known counter: " + lastCommittedCounter +
216
                            ", received counter: " + m.getCounter());
×
217
                }
218
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
219
                if (m.getCounter() % 10 == 0) {
×
220
                    // Save the committed counter only every 10 nanopubs to reduce DB load
221
                    saveCommittedCounter(type);
×
222
                    lastSavedCounter.set(m.getCounter());
×
223
                }
224
                lastCommittedCounter = m.getCounter();
×
225
                loaded.getAndIncrement();
×
226

227
                if (loaded.get() % 50 == 0) {
×
228
                    long currTime = System.currentTimeMillis();
×
229
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
230
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
231
                            " np/s. Counter: " + lastCommittedCounter);
232
                    checkpointTime.set(currTime);
×
233
                    checkpointCounter.set(lastCommittedCounter);
×
234
                }
235
            });
×
236
            // Make sure to save the last committed counter at the end of the batch
237
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
238
                saveCommittedCounter(type);
×
239
            }
240
        } catch (IOException e) {
×
241
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
242
        } finally {
243
            try {
244
                response.close();
×
245
            } catch (IOException e) {
×
246
                log.info("Failed to close the Jelly stream response.");
×
247
            }
×
248
        }
249
    }
×
250

251
    /**
252
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
253
     * Remember to call this method at the end of the batch as well.
254
     *
255
     * @param type the type of loading operation (initial or update)
256
     */
257
    private static void saveCommittedCounter(LoadingType type) {
258
        try {
259
            if (type == LoadingType.INITIAL) {
×
260
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
261
            } else {
262
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
263
            }
264
        } catch (Exception e) {
×
265
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
266
        }
×
267
    }
×
268

269
    /**
270
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
271
     *
272
     * @param setupId the setup ID to set, or null if not known
273
     */
274
    static void setLastKnownSetupId(Long setupId) {
275
        lastKnownSetupId = setupId;
×
276
    }
×
277

278
    /**
279
     * Run a HEAD request to the Registry to fetch its current metadata (load counter and setup ID).
280
     *
281
     * @return the registry metadata
282
     */
283
    static RegistryMetadata fetchRegistryMetadata() {
284
        int tries = 0;
6✔
285
        RegistryMetadata metadata = null;
6✔
286
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
15!
287
            try {
288
                metadata = fetchRegistryMetadataInner();
6✔
289
            } catch (Exception e) {
×
290
                tries++;
×
291
                log.info("Failed to fetch registry metadata, try " + tries +
×
292
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
293
                log.info("Failure Reason: ", e);
×
294
                try {
295
                    Thread.sleep(RETRY_DELAY_METADATA);
×
296
                } catch (InterruptedException e2) {
×
297
                    throw new RuntimeException(
×
298
                            "Interrupted while waiting to retry fetching registry metadata.");
299
                }
×
300
            }
3✔
301
        }
302
        if (metadata == null) {
6!
303
            throw new RuntimeException("Failed to fetch registry metadata after " +
×
304
                    MAX_RETRIES_METADATA + " retries.");
305
        }
306
        return metadata;
6✔
307
    }
308

309
    /**
310
     * Inner logic for fetching the registry metadata via HEAD request.
311
     *
312
     * @return the registry metadata (load counter and setup ID)
313
     * @throws IOException if the HTTP request fails
314
     */
315
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
316
        var request = new HttpHead(registryUrl);
15✔
317
        try (var response = metadataClient.execute(request)) {
12✔
318
            int status = response.getStatusLine().getStatusCode();
12✔
319
            EntityUtils.consumeQuietly(response.getEntity());
9✔
320
            if (status < 200 || status >= 300) {
18!
321
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
322
                        status + ".");
323
            }
324

325
            // Check if the registry is ready
326
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
12✔
327
            if (hStatus.length == 0) {
9!
328
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
329
            }
330
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
21!
331
                throw new RuntimeException("Registry is not in ready state.");
×
332
            }
333

334
            // Get the load counter
335
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
12✔
336
            if (hCounter.length == 0) {
9!
337
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
338
            }
339
            long loadCounter = Long.parseLong(hCounter[0].getValue());
18✔
340

341
            // Get the setup ID (optional — older registries may not have it)
342
            Long setupId = null;
6✔
343
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
12✔
344
            if (hSetupId.length > 0) {
9!
345
                try {
346
                    setupId = Long.parseLong(hSetupId[0].getValue());
21✔
347
                } catch (NumberFormatException e) {
×
348
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
349
                }
3✔
350
            }
351

352
            return new RegistryMetadata(loadCounter, setupId);
24✔
353
        }
354
    }
355

356
    /**
357
     * Construct the URL for fetching the Jelly stream.
358
     *
359
     * @param afterCounter the last known nanopub counter to have been committed in the DB
360
     * @return the URL for fetching the Jelly stream
361
     */
362
    private static String makeStreamFetchUrl(long afterCounter) {
363
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
364
    }
365
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc