• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 23634943841

27 Mar 2026 06:53AM UTC coverage: 66.999% (-0.4%) from 67.361%
23634943841

push

github

tkuhn
fix: detect registry reset on upgrade and add FORCE_RESYNC env var

When upgrading from a version without setupId tracking, the startup code
would adopt the registry's current setupId, preventing reset detection if
the registry counter had already grown past the stored counter. Now the
setupId is left unset on upgrade so that loadUpdates() triggers a resync.

Also adds FORCE_RESYNC=true env var to manually trigger a full re-load
on startup, useful for fixing existing instances with stale data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

228 of 378 branches covered (60.32%)

Branch coverage included in aggregate %.

645 of 925 relevant lines covered (69.73%)

10.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.51
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
6✔
23
    private static Long lastKnownSetupId = null;
6✔
24
    private static final CloseableHttpClient metadataClient;
25
    private static final CloseableHttpClient jellyStreamClient;
26

27
    private static final int MAX_RETRIES_METADATA = 10;
28
    private static final int RETRY_DELAY_METADATA = 3000;
29
    private static final int RETRY_DELAY_JELLY = 5000;
30

31
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
9✔
32

33
    /**
     * Snapshot of the Registry metadata obtained through a HEAD request.
     *
     * @param loadCounter the load counter reported by the Registry
     * @param setupId     the setup ID reported by the Registry, or null if it was absent or unparsable
     */
    record RegistryMetadata(long loadCounter, Long setupId) {
    }
27✔
37

38
    /**
39
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
40
     */
41
    public static final int UPDATES_POLL_INTERVAL = 2000;
42

43
    /**
     * The kind of loading operation in progress; it decides which status is written when the
     * committed counter is saved.
     */
    enum LoadingType {
        /** Loading everything from the Registry, from a given counter up to its current counter. */
        INITIAL,
        /** Loading nanopubs that appeared after the last committed counter. */
        UPDATE
    }
47

48
    static {
        // Resolve the Registry base URL from the environment and make sure it ends with a slash,
        // so that resource names can simply be appended to it.
        String configuredUrl = Utils.getEnvString(
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
        );
        registryUrl = configuredUrl.endsWith("/") ? configuredUrl : configuredUrl + "/";

        // The metadata client is built here with the project's request config; the stream
        // client is the one provided by NanopubUtils.
        metadataClient = HttpClientBuilder.create()
                .setDefaultRequestConfig(Utils.getHttpRequestConfig())
                .build();
        jellyStreamClient = NanopubUtils.getHttpClient();
    }
3✔
59

60
    /**
61
     * Start or continue (after restart) the initial loading procedure. This simply loads all
62
     * nanopubs from the attached Registry.
63
     *
64
     * @param afterCounter which counter to start from (-1 for the beginning)
65
     */
66
    public static void loadInitial(long afterCounter) {
67
        RegistryMetadata metadata = fetchRegistryMetadata();
6✔
68
        long targetCounter = metadata.loadCounter();
9✔
69
        log.info("Fetched Registry load counter: {}", targetCounter);
15✔
70
        // Store setupId on initial load
71
        if (metadata.setupId() != null && lastKnownSetupId == null) {
9!
72
            lastKnownSetupId = metadata.setupId();
×
73
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
74
        }
75
        lastCommittedCounter = afterCounter;
6✔
76
        while (lastCommittedCounter < targetCounter) {
12!
77
            try {
78
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
79
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
80
            } catch (Exception e) {
×
81
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
82
                log.info("Failure reason: ", e);
×
83
                try {
84
                    Thread.sleep(RETRY_DELAY_JELLY);
×
85
                } catch (InterruptedException e2) {
×
86
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
87
                }
×
88
            }
×
89
        }
90
        log.info("Initial load complete.");
9✔
91
    }
3✔
92

93
    /**
94
     * Check if the Registry has any new nanopubs. If it does, load them.
95
     * This method should be called periodically, and you should wait for it to finish before
96
     * calling it again.
97
     */
98
    public static void loadUpdates() {
99
        try {
100
            final var status = StatusController.get().getState();
×
101
            lastCommittedCounter = status.loadCounter;
×
102
            RegistryMetadata metadata = fetchRegistryMetadata();
×
103
            long targetCounter = metadata.loadCounter();
×
104
            Long currentSetupId = metadata.setupId();
×
105

106
            // Detect reset via setupId change
107
            if (lastKnownSetupId != null && currentSetupId != null
×
108
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
109
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
110
                performResync(currentSetupId);
×
111
                return;
×
112
            }
113
            // Detect reset via counter decrease (also covers first run after upgrade
114
            // where no setupId was persisted yet but the registry has already been reset)
115
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
116
                    && targetCounter < lastCommittedCounter) {
117
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
118
                        lastCommittedCounter, targetCounter);
×
119
                performResync(currentSetupId);
×
120
                return;
×
121
            }
122

123
            // Update lastKnownSetupId on first successful poll
124
            if (currentSetupId != null && lastKnownSetupId == null) {
×
125
                if (lastCommittedCounter > 0) {
×
126
                    // Upgrade from a version without setupId tracking. The DB has data but
127
                    // we can't verify it matches the current registry. Force a resync.
128
                    log.warn("No stored setupId but DB has data (counter: {}). "
×
129
                            + "Forcing resync to ensure data consistency.", lastCommittedCounter);
×
130
                    performResync(currentSetupId);
×
131
                    return;
×
132
                }
133
                lastKnownSetupId = currentSetupId;
×
134
                StatusController.get().setRegistrySetupId(currentSetupId);
×
135
            }
136

137
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
138
            if (lastCommittedCounter >= targetCounter) {
×
139
                StatusController.get().setReady();
×
140
                return;
×
141
            }
142
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
143
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
144
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
145
            if (lastCommittedCounter < targetCounter) {
×
146
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
147
                        targetCounter + " based on the counter reported in Registry's headers, " +
148
                        "but loaded only up to {}.", lastCommittedCounter);
×
149
            }
150
        } catch (Exception e) {
×
151
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
152
            log.info("Failure Reason: ", e);
×
153
        } finally {
154
            try {
155
                StatusController.get().setReady();
×
156
            } catch (Exception e) {
×
157
                log.info("Update loader: failed to set status to READY.");
×
158
                log.info("Failure Reason: ", e);
×
159
            }
×
160
        }
161
    }
×
162

163
    /**
164
     * Re-stream all nanopubs from the registry after a reset is detected.
165
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
166
     *
167
     * @param newSetupId the new setup ID from the registry, or null if unknown
168
     */
169
    private static void performResync(Long newSetupId) {
170
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
171
        StatusController.get().setResetting();
×
172
        lastKnownSetupId = newSetupId;
×
173
        if (newSetupId != null) {
×
174
            StatusController.get().setRegistrySetupId(newSetupId);
×
175
        }
176
        StatusController.get().setLoadingInitial(-1);
×
177
        loadInitial(-1);
×
178
        StatusController.get().setReady();
×
179
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
180
    }
×
181

182
    /**
183
     * Load a batch of nanopubs from the Jelly stream.
184
     * <p>
185
     * The method requests the list of all nanopubs from the Registry and reads it for as long
186
     * as it can. If the stream is interrupted, the method will throw an exception, and you
187
     * can resume loading from the last known counter.
188
     *
189
     * @param afterCounter the last known nanopub counter to have been committed in the DB
190
     * @param type         the type of loading operation (initial or update)
191
     */
192
    static void loadBatch(long afterCounter, LoadingType type) {
193
        CloseableHttpResponse response;
194
        try {
195
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
196
            response = jellyStreamClient.execute(request);
×
197
        } catch (IOException e) {
×
198
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
199
        }
×
200

201
        int httpStatus = response.getStatusLine().getStatusCode();
×
202
        if (httpStatus < 200 || httpStatus >= 300) {
×
203
            EntityUtils.consumeQuietly(response.getEntity());
×
204
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
205
        }
206

207
        try (
208
                var is = response.getEntity().getContent();
×
209
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
210
        ) {
211
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
212
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
213
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
214
            AtomicLong loaded = new AtomicLong(0L);
×
215

216
            npStream.forEach(m -> {
×
217
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
218
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
219
                        m.getException()
×
220
                );
221
                if (m.getCounter() < lastCommittedCounter) {
×
222
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
223
                            "the last known counter. Last known counter: " + lastCommittedCounter +
224
                            ", received counter: " + m.getCounter());
×
225
                }
226
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
227
                if (m.getCounter() % 10 == 0) {
×
228
                    // Save the committed counter only every 10 nanopubs to reduce DB load
229
                    saveCommittedCounter(type);
×
230
                    lastSavedCounter.set(m.getCounter());
×
231
                }
232
                lastCommittedCounter = m.getCounter();
×
233
                loaded.getAndIncrement();
×
234

235
                if (loaded.get() % 50 == 0) {
×
236
                    long currTime = System.currentTimeMillis();
×
237
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
238
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
239
                            " np/s. Counter: " + lastCommittedCounter);
240
                    checkpointTime.set(currTime);
×
241
                    checkpointCounter.set(lastCommittedCounter);
×
242
                }
243
            });
×
244
            // Make sure to save the last committed counter at the end of the batch
245
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
246
                saveCommittedCounter(type);
×
247
            }
248
        } catch (IOException e) {
×
249
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
250
        } finally {
251
            try {
252
                response.close();
×
253
            } catch (IOException e) {
×
254
                log.info("Failed to close the Jelly stream response.");
×
255
            }
×
256
        }
257
    }
×
258

259
    /**
260
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
261
     * Remember to call this method at the end of the batch as well.
262
     *
263
     * @param type the type of loading operation (initial or update)
264
     */
265
    private static void saveCommittedCounter(LoadingType type) {
266
        try {
267
            if (type == LoadingType.INITIAL) {
×
268
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
269
            } else {
270
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
271
            }
272
        } catch (Exception e) {
×
273
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
274
        }
×
275
    }
×
276

277
    /**
278
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
279
     *
280
     * @param setupId the setup ID to set, or null if not known
281
     */
282
    static void setLastKnownSetupId(Long setupId) {
283
        lastKnownSetupId = setupId;
×
284
    }
×
285

286
    /**
287
     * Run a HEAD request to the Registry to fetch its current metadata (load counter and setup ID).
288
     *
289
     * @return the registry metadata
290
     */
291
    static RegistryMetadata fetchRegistryMetadata() {
292
        int tries = 0;
6✔
293
        RegistryMetadata metadata = null;
6✔
294
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
15!
295
            try {
296
                metadata = fetchRegistryMetadataInner();
4✔
297
            } catch (Exception e) {
1✔
298
                tries++;
1✔
299
                log.info("Failed to fetch registry metadata, try " + tries +
5✔
300
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
1✔
301
                log.info("Failure Reason: ", e);
4✔
302
                try {
303
                    Thread.sleep(RETRY_DELAY_METADATA);
2✔
304
                } catch (InterruptedException e2) {
×
305
                    throw new RuntimeException(
×
306
                            "Interrupted while waiting to retry fetching registry metadata.");
307
                }
1✔
308
            }
3✔
309
        }
310
        if (metadata == null) {
6!
311
            throw new RuntimeException("Failed to fetch registry metadata after " +
5✔
312
                    MAX_RETRIES_METADATA + " retries.");
313
        }
314
        return metadata;
4✔
315
    }
316

317
    /**
318
     * Inner logic for fetching the registry metadata via HEAD request.
319
     *
320
     * @return the registry metadata (load counter and setup ID)
321
     * @throws IOException if the HTTP request fails
322
     */
323
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
324
        var request = new HttpHead(registryUrl);
15✔
325
        try (var response = metadataClient.execute(request)) {
8✔
326
            int status = response.getStatusLine().getStatusCode();
8✔
327
            EntityUtils.consumeQuietly(response.getEntity());
6✔
328
            if (status < 200 || status >= 300) {
12!
329
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
330
                        status + ".");
331
            }
332

333
            // Check if the registry is ready
334
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
8✔
335
            if (hStatus.length == 0) {
6!
336
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
337
            }
338
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
14!
339
                throw new RuntimeException("Registry is not in ready state.");
×
340
            }
341

342
            // Get the load counter
343
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
8✔
344
            if (hCounter.length == 0) {
6!
345
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
346
            }
347
            long loadCounter = Long.parseLong(hCounter[0].getValue());
12✔
348

349
            // Get the setup ID (optional — older registries may not have it)
350
            Long setupId = null;
4✔
351
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
8✔
352
            if (hSetupId.length > 0) {
6!
353
                try {
354
                    setupId = Long.parseLong(hSetupId[0].getValue());
14✔
355
                } catch (NumberFormatException e) {
×
356
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
357
                }
2✔
358
            }
359

360
            return new RegistryMetadata(loadCounter, setupId);
16✔
361
        }
362
    }
363

364
    /**
365
     * Construct the URL for fetching the Jelly stream.
366
     *
367
     * @param afterCounter the last known nanopub counter to have been committed in the DB
368
     * @return the URL for fetching the Jelly stream
369
     */
370
    private static String makeStreamFetchUrl(long afterCounter) {
371
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
372
    }
373
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc