• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24495424169

16 Apr 2026 06:21AM UTC coverage: 66.641% (-0.6%) from 67.247%
24495424169

push

github

tkuhn
doc: plan for mirroring trust state into the trust repo (#65)

Drafts the design for mirroring the registry's trust state into a
single 'trust' RDF4J repository in nanopub-query, with each state
stored in its own named graph keyed by trustStateHash. Detection
piggybacks on the existing 2-second loadUpdates poll. Materialization,
pointer swap, and retention pruning all happen in one serializable
transaction. Includes the canonical SPARQL pattern for querying the
current state via the npa:hasCurrentTrustState pointer.

Targets registry endpoint /trust-state/<hash>.json (knowledgepixels/nanopub-registry#107).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

228 of 380 branches covered (60.0%)

Branch coverage included in aggregate %.

653 of 942 relevant lines covered (69.32%)

10.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.07
src/main/java/com/knowledgepixels/query/JellyNanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import java.io.IOException;
4
import java.util.concurrent.atomic.AtomicLong;
5

6
import org.apache.http.client.methods.CloseableHttpResponse;
7
import org.apache.http.client.methods.HttpGet;
8
import org.apache.http.client.methods.HttpHead;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClientBuilder;
11
import org.apache.http.util.EntityUtils;
12
import org.nanopub.NanopubUtils;
13
import org.nanopub.jelly.NanopubStream;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
/**
18
 * Loads nanopubs from the attached Nanopub Registry via a restartable Jelly stream.
19
 */
20
public class JellyNanopubLoader {
×
21
    static final String registryUrl;
22
    private static long lastCommittedCounter = -1;
6✔
23
    private static Long lastKnownSetupId = null;
6✔
24
    // Latest registry metadata fields, updated on each metadata fetch and forwarded to clients
25
    static volatile String lastCoverageTypes = null;
6✔
26
    static volatile String lastCoverageAgents = null;
6✔
27
    static volatile String lastTestInstance = null;
6✔
28
    static volatile String lastNanopubCount = null;
6✔
29
    private static final CloseableHttpClient metadataClient;
30
    private static final CloseableHttpClient jellyStreamClient;
31

32
    private static final int MAX_RETRIES_METADATA = 10;
33
    private static final int RETRY_DELAY_METADATA = 3000;
34
    private static final int RETRY_DELAY_JELLY = 5000;
35

36
    private static final Logger log = LoggerFactory.getLogger(JellyNanopubLoader.class);
9✔
37

38
    /**
39
     * Registry metadata returned by a HEAD request.
40
     */
41
    record RegistryMetadata(long loadCounter, Long setupId, String coverageTypes,
63✔
42
                            String coverageAgents, String testInstance, String nanopubCount) {}
43

44
    /**
45
     * The interval in milliseconds at which the updates loader should poll for new nanopubs.
46
     */
47
    public static final int UPDATES_POLL_INTERVAL = 2000;
48

49
    enum LoadingType {
×
50
        INITIAL,
×
51
        UPDATE,
×
52
    }
53

54
    static {
55
        // Initialize registryUrl
56
        var url = Utils.getEnvString(
12✔
57
                "REGISTRY_FIXED_URL", "https://registry.knowledgepixels.com/"
58
        );
59
        if (!url.endsWith("/")) url += "/";
12!
60
        registryUrl = url;
6✔
61

62
        metadataClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
63
        jellyStreamClient = NanopubUtils.getHttpClient();
6✔
64
    }
3✔
65

66
    /**
67
     * Start or continue (after restart) the initial loading procedure. This simply loads all
68
     * nanopubs from the attached Registry.
69
     *
70
     * @param afterCounter which counter to start from (-1 for the beginning)
71
     */
72
    public static void loadInitial(long afterCounter) {
73
        RegistryMetadata metadata = fetchRegistryMetadata();
6✔
74
        updateForwardingMetadata(metadata);
6✔
75
        long targetCounter = metadata.loadCounter();
9✔
76
        log.info("Fetched Registry load counter: {}", targetCounter);
15✔
77
        // Store setupId on initial load
78
        if (metadata.setupId() != null && lastKnownSetupId == null) {
9!
79
            lastKnownSetupId = metadata.setupId();
×
80
            StatusController.get().setRegistrySetupId(metadata.setupId());
×
81
        }
82
        lastCommittedCounter = afterCounter;
6✔
83
        while (lastCommittedCounter < targetCounter) {
12!
84
            try {
85
                loadBatch(lastCommittedCounter, LoadingType.INITIAL);
×
86
                log.info("Initial load: loaded batch up to counter {}", lastCommittedCounter);
×
87
            } catch (Exception e) {
×
88
                log.info("Failed to load batch starting from counter {}", lastCommittedCounter);
×
89
                log.info("Failure reason: ", e);
×
90
                try {
91
                    Thread.sleep(RETRY_DELAY_JELLY);
×
92
                } catch (InterruptedException e2) {
×
93
                    throw new RuntimeException("Interrupted while waiting to retry loading batch.");
×
94
                }
×
95
            }
×
96
        }
97
        log.info("Initial load complete.");
9✔
98
    }
3✔
99

100
    /**
101
     * Check if the Registry has any new nanopubs. If it does, load them.
102
     * This method should be called periodically, and you should wait for it to finish before
103
     * calling it again.
104
     */
105
    public static void loadUpdates() {
106
        try {
107
            final var status = StatusController.get().getState();
×
108
            lastCommittedCounter = status.loadCounter;
×
109
            RegistryMetadata metadata = fetchRegistryMetadata();
×
110
            updateForwardingMetadata(metadata);
×
111
            long targetCounter = metadata.loadCounter();
×
112
            Long currentSetupId = metadata.setupId();
×
113

114
            // Detect reset via setupId change
115
            if (lastKnownSetupId != null && currentSetupId != null
×
116
                    && !lastKnownSetupId.equals(currentSetupId)) {
×
117
                log.warn("Registry reset detected: setupId {} -> {}", lastKnownSetupId, currentSetupId);
×
118
                performResync(currentSetupId);
×
119
                return;
×
120
            }
121
            // Detect reset via counter decrease (also covers first run after upgrade
122
            // where no setupId was persisted yet but the registry has already been reset)
123
            if (lastCommittedCounter > 0 && targetCounter >= 0
×
124
                    && targetCounter < lastCommittedCounter) {
125
                log.warn("Registry counter decreased {} -> {}, triggering resync",
×
126
                        lastCommittedCounter, targetCounter);
×
127
                performResync(currentSetupId);
×
128
                return;
×
129
            }
130

131
            // Update lastKnownSetupId on first successful poll
132
            if (currentSetupId != null && lastKnownSetupId == null) {
×
133
                if (lastCommittedCounter > 0) {
×
134
                    // Upgrade from a version without setupId tracking. The DB has data but
135
                    // we can't verify it matches the current registry. Force a resync.
136
                    log.warn("No stored setupId but DB has data (counter: {}). "
×
137
                            + "Forcing resync to ensure data consistency.", lastCommittedCounter);
×
138
                    performResync(currentSetupId);
×
139
                    return;
×
140
                }
141
                lastKnownSetupId = currentSetupId;
×
142
                StatusController.get().setRegistrySetupId(currentSetupId);
×
143
            }
144

145
            StatusController.get().setLoadingUpdates(status.loadCounter);
×
146
            if (lastCommittedCounter >= targetCounter) {
×
147
                StatusController.get().setReady();
×
148
                return;
×
149
            }
150
            loadBatch(lastCommittedCounter, LoadingType.UPDATE);
×
151
            log.info("Loaded {} update(s). Counter: {}, target was: {}",
×
152
                    lastCommittedCounter - status.loadCounter, lastCommittedCounter, targetCounter);
×
153
            if (lastCommittedCounter < targetCounter) {
×
154
                log.info("Warning: expected to load nanopubs up to (inclusive) counter " +
×
155
                        targetCounter + " based on the counter reported in Registry's headers, " +
156
                        "but loaded only up to {}.", lastCommittedCounter);
×
157
            }
158
        } catch (Exception e) {
×
159
            log.info("Failed to load updates. Current counter: {}", lastCommittedCounter);
×
160
            log.info("Failure Reason: ", e);
×
161
        } finally {
162
            try {
163
                StatusController.get().setReady();
×
164
            } catch (Exception e) {
×
165
                log.info("Update loader: failed to set status to READY.");
×
166
                log.info("Failure Reason: ", e);
×
167
            }
×
168
        }
169
    }
×
170

171
    /**
172
     * Re-stream all nanopubs from the registry after a reset is detected.
173
     * Existing nanopubs are skipped by NanopubLoader's per-repo dedup.
174
     *
175
     * @param newSetupId the new setup ID from the registry, or null if unknown
176
     */
177
    private static void performResync(Long newSetupId) {
178
        log.warn("Starting resync with registry. New setupId: {}", newSetupId);
×
179
        StatusController.get().setResetting();
×
180
        lastKnownSetupId = newSetupId;
×
181
        if (newSetupId != null) {
×
182
            StatusController.get().setRegistrySetupId(newSetupId);
×
183
        }
184
        StatusController.get().setLoadingInitial(-1);
×
185
        loadInitial(-1);
×
186
        StatusController.get().setReady();
×
187
        log.warn("Resync complete. Counter: {}", lastCommittedCounter);
×
188
    }
×
189

190
    /**
191
     * Load a batch of nanopubs from the Jelly stream.
192
     * <p>
193
     * The method requests the list of all nanopubs from the Registry and reads it for as long
194
     * as it can. If the stream is interrupted, the method will throw an exception, and you
195
     * can resume loading from the last known counter.
196
     *
197
     * @param afterCounter the last known nanopub counter to have been committed in the DB
198
     * @param type         the type of loading operation (initial or update)
199
     */
200
    static void loadBatch(long afterCounter, LoadingType type) {
201
        CloseableHttpResponse response;
202
        try {
203
            var request = new HttpGet(makeStreamFetchUrl(afterCounter));
×
204
            response = jellyStreamClient.execute(request);
×
205
        } catch (IOException e) {
×
206
            throw new RuntimeException("Failed to fetch Jelly stream from the Registry (I/O error).", e);
×
207
        }
×
208

209
        int httpStatus = response.getStatusLine().getStatusCode();
×
210
        if (httpStatus < 200 || httpStatus >= 300) {
×
211
            EntityUtils.consumeQuietly(response.getEntity());
×
212
            throw new RuntimeException("Jelly stream HTTP status is not 2xx: " + httpStatus + ".");
×
213
        }
214

215
        try (
216
                var is = response.getEntity().getContent();
×
217
                var npStream = NanopubStream.fromByteStream(is).getAsNanopubs()
×
218
        ) {
219
            AtomicLong checkpointTime = new AtomicLong(System.currentTimeMillis());
×
220
            AtomicLong checkpointCounter = new AtomicLong(lastCommittedCounter);
×
221
            AtomicLong lastSavedCounter = new AtomicLong(lastCommittedCounter);
×
222
            AtomicLong loaded = new AtomicLong(0L);
×
223

224
            npStream.forEach(m -> {
×
225
                if (!m.isSuccess()) throw new RuntimeException("Failed to load " +
×
226
                        "nanopub from Jelly stream. Last known counter: " + lastCommittedCounter,
227
                        m.getException()
×
228
                );
229
                if (m.getCounter() < lastCommittedCounter) {
×
230
                    throw new RuntimeException("Received a nanopub with a counter lower than " +
×
231
                            "the last known counter. Last known counter: " + lastCommittedCounter +
232
                            ", received counter: " + m.getCounter());
×
233
                }
234
                NanopubLoader.load(m.getNanopub(), m.getCounter());
×
235
                if (m.getCounter() % 10 == 0) {
×
236
                    // Save the committed counter only every 10 nanopubs to reduce DB load
237
                    saveCommittedCounter(type);
×
238
                    lastSavedCounter.set(m.getCounter());
×
239
                }
240
                lastCommittedCounter = m.getCounter();
×
241
                loaded.getAndIncrement();
×
242

243
                if (loaded.get() % 50 == 0) {
×
244
                    long currTime = System.currentTimeMillis();
×
245
                    double speed = 50 / ((currTime - checkpointTime.get()) / 1000.0);
×
246
                    log.info("Loading speed: " + String.format("%.2f", speed) +
×
247
                            " np/s. Counter: " + lastCommittedCounter);
248
                    checkpointTime.set(currTime);
×
249
                    checkpointCounter.set(lastCommittedCounter);
×
250
                }
251
            });
×
252
            // Make sure to save the last committed counter at the end of the batch
253
            if (lastCommittedCounter >= lastSavedCounter.get()) {
×
254
                saveCommittedCounter(type);
×
255
            }
256
        } catch (IOException e) {
×
257
            throw new RuntimeException("I/O error while reading the response Jelly stream.", e);
×
258
        } finally {
259
            try {
260
                response.close();
×
261
            } catch (IOException e) {
×
262
                log.info("Failed to close the Jelly stream response.");
×
263
            }
×
264
        }
265
    }
×
266

267
    /**
268
     * Save the last committed counter to the DB. Do this every N nanopubs to reduce DB load.
269
     * Remember to call this method at the end of the batch as well.
270
     *
271
     * @param type the type of loading operation (initial or update)
272
     */
273
    private static void saveCommittedCounter(LoadingType type) {
274
        try {
275
            if (type == LoadingType.INITIAL) {
×
276
                StatusController.get().setLoadingInitial(lastCommittedCounter);
×
277
            } else {
278
                StatusController.get().setLoadingUpdates(lastCommittedCounter);
×
279
            }
280
        } catch (Exception e) {
×
281
            throw new RuntimeException("Could not update the nanopub counter in DB", e);
×
282
        }
×
283
    }
×
284

285
    /**
286
     * Set the last known setup ID. Called from MainVerticle on startup to restore persisted state.
287
     *
288
     * @param setupId the setup ID to set, or null if not known
289
     */
290
    static void setLastKnownSetupId(Long setupId) {
291
        lastKnownSetupId = setupId;
×
292
    }
×
293

294
    /**
295
     * Update the cached metadata fields used for forwarding to clients.
296
     */
297
    private static void updateForwardingMetadata(RegistryMetadata metadata) {
298
        lastCoverageTypes = metadata.coverageTypes();
9✔
299
        lastCoverageAgents = metadata.coverageAgents();
9✔
300
        lastTestInstance = metadata.testInstance();
9✔
301
        lastNanopubCount = metadata.nanopubCount();
9✔
302
    }
3✔
303

304
    /**
305
     * Run a HEAD request to the Registry to fetch its current metadata (load counter and setup ID).
306
     *
307
     * @return the registry metadata
308
     */
309
    static RegistryMetadata fetchRegistryMetadata() {
310
        int tries = 0;
6✔
311
        RegistryMetadata metadata = null;
6✔
312
        while (metadata == null && tries < MAX_RETRIES_METADATA) {
15!
313
            try {
314
                metadata = fetchRegistryMetadataInner();
6✔
315
            } catch (Exception e) {
×
316
                tries++;
×
317
                log.info("Failed to fetch registry metadata, try " + tries +
×
318
                        ". Retrying in {}ms...", RETRY_DELAY_METADATA);
×
319
                log.info("Failure Reason: ", e);
×
320
                try {
321
                    Thread.sleep(RETRY_DELAY_METADATA);
×
322
                } catch (InterruptedException e2) {
×
323
                    throw new RuntimeException(
×
324
                            "Interrupted while waiting to retry fetching registry metadata.");
325
                }
×
326
            }
3✔
327
        }
328
        if (metadata == null) {
6!
329
            throw new RuntimeException("Failed to fetch registry metadata after " +
×
330
                    MAX_RETRIES_METADATA + " retries.");
331
        }
332
        return metadata;
6✔
333
    }
334

335
    /**
336
     * Inner logic for fetching the registry metadata via HEAD request.
337
     *
338
     * @return the registry metadata (load counter and setup ID)
339
     * @throws IOException if the HTTP request fails
340
     */
341
    private static RegistryMetadata fetchRegistryMetadataInner() throws IOException {
342
        var request = new HttpHead(registryUrl);
15✔
343
        try (var response = metadataClient.execute(request)) {
12✔
344
            int status = response.getStatusLine().getStatusCode();
12✔
345
            EntityUtils.consumeQuietly(response.getEntity());
9✔
346
            if (status < 200 || status >= 300) {
18!
347
                throw new RuntimeException("Registry metadata HTTP status is not 2xx: " +
×
348
                        status + ".");
349
            }
350

351
            // Check if the registry is ready
352
            var hStatus = response.getHeaders("Nanopub-Registry-Status");
12✔
353
            if (hStatus.length == 0) {
9!
354
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Status header.");
×
355
            }
356
            if (!"ready".equals(hStatus[0].getValue()) && !"updating".equals(hStatus[0].getValue())) {
21!
357
                throw new RuntimeException("Registry is not in ready state.");
×
358
            }
359

360
            // Get the load counter
361
            var hCounter = response.getHeaders("Nanopub-Registry-Load-Counter");
12✔
362
            if (hCounter.length == 0) {
9!
363
                throw new RuntimeException("Registry did not return a Nanopub-Registry-Load-Counter header.");
×
364
            }
365
            long loadCounter = Long.parseLong(hCounter[0].getValue());
18✔
366

367
            // Get the setup ID (optional — older registries may not have it)
368
            Long setupId = null;
6✔
369
            var hSetupId = response.getHeaders("Nanopub-Registry-Setup-Id");
12✔
370
            if (hSetupId.length > 0) {
9!
371
                try {
372
                    setupId = Long.parseLong(hSetupId[0].getValue());
21✔
373
                } catch (NumberFormatException e) {
×
374
                    log.info("Could not parse Nanopub-Registry-Setup-Id header: {}", hSetupId[0].getValue());
×
375
                }
3✔
376
            }
377

378
            // Read metadata headers for forwarding to clients
379
            String coverageTypes = getHeaderValue(response, "Nanopub-Registry-Coverage-Types");
12✔
380
            String coverageAgents = getHeaderValue(response, "Nanopub-Registry-Coverage-Agents");
12✔
381
            String testInstance = getHeaderValue(response, "Nanopub-Registry-Test-Instance");
12✔
382
            String nanopubCount = getHeaderValue(response, "Nanopub-Registry-Nanopub-Count");
12✔
383

384
            return new RegistryMetadata(loadCounter, setupId, coverageTypes, coverageAgents, testInstance, nanopubCount);
36✔
385
        }
386
    }
387

388
    private static String getHeaderValue(CloseableHttpResponse response, String name) {
389
        var headers = response.getHeaders(name);
12✔
390
        return headers.length > 0 ? headers[0].getValue() : null;
27!
391
    }
392

393
    /**
394
     * Construct the URL for fetching the Jelly stream.
395
     *
396
     * @param afterCounter the last known counter to have been committed in the DB
397
     * @return the URL for fetching the Jelly stream
398
     */
399
    private static String makeStreamFetchUrl(long afterCounter) {
400
        return registryUrl + "nanopubs.jelly?afterCounter=" + afterCounter;
×
401
    }
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc