• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24403974009

14 Apr 2026 02:13PM UTC coverage: 66.641% (-0.6%) from 67.247%
24403974009

push

github

tkuhn
doc: rename space ID to space ref and origin to root nanopub

"Space ID" vs "Space IRI" was confusing (similar terms, different
meanings). Renamed the compound identifier to "space ref" (opaque
reference used for indexing) and kept "Space IRI" for the RDF IRI
declared in assertions. Also renamed "origin nanopub" to "root
nanopub" per the existing "root nanopub" terminology, and SPACEIDHASH
to SPACEIRIHASH (more accurate: it's a hash of the Space IRI).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

228 of 380 branches covered (60.0%)

Branch coverage included in aggregate %.

653 of 942 relevant lines covered (69.32%)

3.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.07
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
2✔
23
    private static Long lastKnownSetupId = null;
2✔
24
    // Latest registry metadata fields, updated on each metadata fetch and forwarded to clients
25
    static volatile String lastCoverageTypes = null;
2✔
26
    static volatile String lastCoverageAgents = null;
2✔
27
    static volatile String lastTestInstance = null;
2✔
28
    static volatile String lastNanopubCount = null;
2✔
29
    private static final CloseableHttpClient metadataClient;
30
    private static final CloseableHttpClient jellyStreamClient;
31

32
    private static final int MAX_RETRIES_METADATA = 10;
33
    private static final int RETRY_DELAY_METADATA = 3000;
34
    private static final int RETRY_DELAY_JELLY = 5000;
35

36
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
3✔
37

38
    /**
     * Immutable snapshot of what the Registry reports in response to a HEAD request.
     *
     * @param loadCounter    the Registry's current load counter
     * @param setupId        the Registry's setup ID, or {@code null} if it was not reported
     * @param coverageTypes  coverage-types value to forward to clients, may be {@code null}
     * @param coverageAgents coverage-agents value to forward to clients, may be {@code null}
     * @param testInstance   test-instance value to forward to clients, may be {@code null}
     * @param nanopubCount   nanopub-count value to forward to clients, may be {@code null}
     */
    record RegistryMetadata(
            long loadCounter,
            Long setupId,
            String coverageTypes,
            String coverageAgents,
            String testInstance,
            String nanopubCount
    ) {
    }
43

44
    /**
45
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
46
     */
47
    public static final int UPDATES_POLL_INTERVAL = 2000;
48

49
    /**
     * The kind of loading operation a batch belongs to. It decides which status setter is used
     * when the committed counter is persisted.
     */
    enum LoadingType {
        /** Batch is part of the full (initial) load. */
        INITIAL,
        /** Batch is part of a periodic update poll. */
        UPDATE
    }
53

54
    static {
55
        // Initialize registryUrl
56
        var url = Utils.getEnvString(
4✔
57
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
58
        );
59
        if (!url.endsWith("/")) url += "/";
4!
60
        registryUrl = url;
2✔
61

62
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
5✔
63
        jellyStreamClient = NanopubUtils.getHttpClient();
2✔
64
    }
1✔
65

66
    /**
67
     * Start or continue (after restart) the initial loading procedure. This simply loads all
68
     * nanopubs from the attached Registry.
69
     *
70
     * @param afterCounter which counter to start from (-1 for the beginning)
71
     */
72
    public static void loadInitial(long afterCounter) {
73
        RegistryMetadata metadata = fetchRegistryMetadata();
2✔
74
        updateForwardingMetadata(metadata);
2✔
75
        long targetCounter = metadata.loadCounter();
3✔
76
        log.info("Fetched Registry load counter: {}", targetCounter);
5✔
77
        // Store setupId on initial load
78
        if (metadata.setupId() != null && lastKnownSetupId == null) {
3!
79
            lastKnownSetupId = metadata.setupId();
×
80
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
81
        }
82
        lastCommittedCounter = afterCounter;
2✔
83
        while (lastCommittedCounter < targetCounter) {
4!
84
            try {
85
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
86
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
87
            } catch (Exception e) {
×
88
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
89
                log.info("Failure reason: ", e);
×
90
                try {
91
                    Thread.sleep(RETRY_DELAY_JELLY);
×
92
                } catch (InterruptedException e2) {
×
93
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
94
                }
×
95
            }
×
96
        }
97
        log.info("Initial load complete.");
3✔
98
    }
1✔
99

100
    /**
101
     * Check if the Registry has any new nanopubs. If it does, load them.
102
     * This method should be called periodically, and you should wait for it to finish before
103
     * calling it again.
104
     */
105
    public static void loadUpdates() {
106
        try {
107
            final var status = StatusController.get().getState();
×
108
            lastCommittedCounter = status.loadCounter;
×
109
            RegistryMetadata metadata = fetchRegistryMetadata();
×
110
            updateForwardingMetadata(metadata);
×
111
            long targetCounter = metadata.loadCounter();
×
112
            Long currentSetupId = metadata.setupId();
×
113

114
            // Detect reset via setupId change
115
            if (lastKnownSetupId != null && currentSetupId != null
×
116
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
117
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
118
                performResync(currentSetupId);
×
119
                return;
×
120
            }
121
            // Detect reset via counter decrease (also covers first run after upgrade
122
            // where no setupId was persisted yet but the registry has already been reset)
123
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
124
                    && targetCounter < lastCommittedCounter) {
125
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
126
                        lastCommittedCounter, targetCounter);
×
127
                performResync(currentSetupId);
×
128
                return;
×
129
            }
130

131
            // Update lastKnownSetupId on first successful poll
132
            if (currentSetupId != null && lastKnownSetupId == null) {
×
133
                if (lastCommittedCounter > 0) {
×
134
                    // Upgrade from a version without setupId tracking. The DB has data but
135
                    // we can't verify it matches the current registry. Force a resync.
136
                    log.warn("No stored setupId but DB has data (counter: {}). "
×
137
                            + "Forcing resync to ensure data consistency.", lastCommittedCounter);
×
138
                    performResync(currentSetupId);
×
139
                    return;
×
140
                }
141
                lastKnownSetupId = currentSetupId;
×
142
                StatusController.get().setRegistrySetupId(currentSetupId);
×
143
            }
144

145
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
146
            if (lastCommittedCounter >= targetCounter) {
×
147
                StatusController.get().setReady();
×
148
                return;
×
149
            }
150
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
151
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
152
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
153
            if (lastCommittedCounter < targetCounter) {
×
154
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
155
                        targetCounter + " based on the counter reported in Registry's headers, " +
156
                        "but loaded only up to {}.", lastCommittedCounter);
×
157
            }
158
        } catch (Exception e) {
×
159
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
160
            log.info("Failure Reason: ", e);
×
161
        } finally {
162
            try {
163
                StatusController.get().setReady();
×
164
            } catch (Exception e) {
×
165
                log.info("Update loader: failed to set status to READY.");
×
166
                log.info("Failure Reason: ", e);
×
167
            }
×
168
        }
169
    }
×
170

171
    /**
172
     * Re-stream all nanopubs from the registry after a reset is detected.
173
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
174
     *
175
     * @param newSetupId the new setup ID from the registry, or null if unknown
176
     */
177
    private static void performResync(Long newSetupId) {
178
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
179
        StatusController.get().setResetting();
×
180
        lastKnownSetupId = newSetupId;
×
181
        if (newSetupId != null) {
×
182
            StatusController.get().setRegistrySetupId(newSetupId);
×
183
        }
184
        StatusController.get().setLoadingInitial(-1);
×
185
        loadInitial(-1);
×
186
        StatusController.get().setReady();
×
187
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
188
    }
×
189

190
    /**
191
     * Load a batch of nanopubs from the Jelly stream.
192
     * <p>
193
     * The method requests the list of all nanopubs from the Registry and reads it for as long
194
     * as it can. If the stream is interrupted, the method will throw an exception, and you
195
     * can resume loading from the last known counter.
196
     *
197
     * @param afterCounter the last known nanopub counter to have been committed in the DB
198
     * @param type         the type of loading operation (initial or update)
199
     */
200
    static void loadBatch(long afterCounter, LoadingType type) {
201
        CloseableHttpResponse response;
202
        try {
203
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
204
            response = jellyStreamClient.execute(request);
×
205
        } catch (IOException e) {
×
206
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
207
        }
×
208

209
        int httpStatus = response.getStatusLine().getStatusCode();
×
210
        if (httpStatus < 200 || httpStatus >= 300) {
×
211
            EntityUtils.consumeQuietly(response.getEntity());
×
212
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
213
        }
214

215
        try (
216
                var is = response.getEntity().getContent();
×
217
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
218
        ) {
219
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
220
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
221
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
222
            AtomicLong loaded = new AtomicLong(0L);
×
223

224
            npStream.forEach(m -> {
×
225
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
226
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
227
                        m.getException()
×
228
                );
229
                if (m.getCounter() < lastCommittedCounter) {
×
230
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
231
                            "the last known counter. Last known counter: " + lastCommittedCounter +
232
                            ", received counter: " + m.getCounter());
×
233
                }
234
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
235
                if (m.getCounter() % 10 == 0) {
×
236
                    // Save the committed counter only every 10 nanopubs to reduce DB load
237
                    saveCommittedCounter(type);
×
238
                    lastSavedCounter.set(m.getCounter());
×
239
                }
240
                lastCommittedCounter = m.getCounter();
×
241
                loaded.getAndIncrement();
×
242

243
                if (loaded.get() % 50 == 0) {
×
244
                    long currTime = System.currentTimeMillis();
×
245
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
246
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
247
                            " np/s. Counter: " + lastCommittedCounter);
248
                    checkpointTime.set(currTime);
×
249
                    checkpointCounter.set(lastCommittedCounter);
×
250
                }
251
            });
×
252
            // Make sure to save the last committed counter at the end of the batch
253
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
254
                saveCommittedCounter(type);
×
255
            }
256
        } catch (IOException e) {
×
257
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
258
        } finally {
259
            try {
260
                response.close();
×
261
            } catch (IOException e) {
×
262
                log.info("Failed to close the Jelly stream response.");
×
263
            }
×
264
        }
265
    }
×
266

267
    /**
268
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
269
     * Remember to call this method at the end of the batch as well.
270
     *
271
     * @param type the type of loading operation (initial or update)
272
     */
273
    private static void saveCommittedCounter(LoadingType type) {
274
        try {
275
            if (type == LoadingType.INITIAL) {
×
276
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
277
            } else {
278
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
279
            }
280
        } catch (Exception e) {
×
281
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
282
        }
×
283
    }
×
284

285
    /**
286
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
287
     *
288
     * @param setupId the setup ID to set, or null if not known
289
     */
290
    static void setLastKnownSetupId(Long setupId) {
291
        lastKnownSetupId = setupId;
×
292
    }
×
293

294
    /**
295
     * Update the cached metadata fields used for forwarding to clients.
296
     */
297
    private static void updateForwardingMetadata(RegistryMetadata metadata) {
298
        lastCoverageTypes = metadata.coverageTypes();
3✔
299
        lastCoverageAgents = metadata.coverageAgents();
3✔
300
        lastTestInstance = metadata.testInstance();
3✔
301
        lastNanopubCount = metadata.nanopubCount();
3✔
302
    }
1✔
303

304
    /**
305
     * Run a HEAD request to the Registry to fetch its current metadata (load counter and setup ID).
306
     *
307
     * @return the registry metadata
308
     */
309
    static RegistryMetadata fetchRegistryMetadata() {
310
        int tries = 0;
2✔
311
        RegistryMetadata metadata = null;
2✔
312
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
5!
313
            try {
314
                metadata = fetchRegistryMetadataInner();
2✔
315
            } catch (Exception e) {
×
316
                tries++;
×
317
                log.info("Failed to fetch registry metadata, try " + tries +
×
318
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
319
                log.info("Failure Reason: ", e);
×
320
                try {
321
                    Thread.sleep(RETRY_DELAY_METADATA);
×
322
                } catch (InterruptedException e2) {
×
323
                    throw new RuntimeException(
×
324
                            "Interrupted while waiting to retry fetching registry metadata.");
325
                }
×
326
            }
1✔
327
        }
328
        if (metadata == null) {
2!
329
            throw new RuntimeException("Failed to fetch registry metadata after " +
×
330
                    MAX_RETRIES_METADATA + " retries.");
331
        }
332
        return metadata;
2✔
333
    }
334

335
    /**
336
     * Inner logic for fetching the registry metadata via HEAD request.
337
     *
338
     * @return the registry metadata (load counter and setup ID)
339
     * @throws IOException if the HTTP request fails
340
     */
341
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
342
        var request = new HttpHead(registryUrl);
5✔
343
        try (var response = metadataClient.execute(request)) {
4✔
344
            int status = response.getStatusLine().getStatusCode();
4✔
345
            EntityUtils.consumeQuietly(response.getEntity());
3✔
346
            if (status < 200 || status >= 300) {
6!
347
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
348
                        status + ".");
349
            }
350

351
            // Check if the registry is ready
352
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
4✔
353
            if (hStatus.length == 0) {
3!
354
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
355
            }
356
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
7!
357
                throw new RuntimeException("Registry is not in ready state.");
×
358
            }
359

360
            // Get the load counter
361
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
4✔
362
            if (hCounter.length == 0) {
3!
363
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
364
            }
365
            long loadCounter = Long.parseLong(hCounter[0].getValue());
6✔
366

367
            // Get the setup ID (optional — older registries may not have it)
368
            Long setupId = null;
2✔
369
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
4✔
370
            if (hSetupId.length > 0) {
3!
371
                try {
372
                    setupId = Long.parseLong(hSetupId[0].getValue());
7✔
373
                } catch (NumberFormatException e) {
×
374
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
375
                }
1✔
376
            }
377

378
            // Read metadata headers for forwarding to clients
379
            String coverageTypes = getHeaderValue(response, "Nanopub-Registry-Coverage-Types");
4✔
380
            String coverageAgents = getHeaderValue(response, "Nanopub-Registry-Coverage-Agents");
4✔
381
            String testInstance = getHeaderValue(response, "Nanopub-Registry-Test-Instance");
4✔
382
            String nanopubCount = getHeaderValue(response, "Nanopub-Registry-Nanopub-Count");
4✔
383

384
            return new RegistryMetadata(loadCounter, setupId, coverageTypes, coverageAgents, testInstance, nanopubCount);
12✔
385
        }
386
    }
387

388
    private static String getHeaderValue(CloseableHttpResponse response, String name) {
389
        var headers = response.getHeaders(name);
4✔
390
        return headers.length > 0 ? headers[0].getValue() : null;
9!
391
    }
392

393
    /**
394
     * Construct the URL for fetching the Jelly stream.
395
     *
396
     * @param afterCounter the last known counter to have been committed in the DB
397
     * @return the URL for fetching the Jelly stream
398
     */
399
    private static String makeStreamFetchUrl(long afterCounter) {
400
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
401
    }
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc