• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24334244342

13 Apr 2026 08:46AM UTC coverage: 32.151% (-0.2%) from 32.357%
24334244342

push

github

web-flow
Merge pull request #101 from knowledgepixels/fix/100-sync-nanopubs-from-encountered-pubkey

fix: store and backfill nanopubs from encountered pubkeys

265 of 924 branches covered (28.68%)

Branch coverage included in aggregate %.

786 of 2345 relevant lines covered (33.52%)

5.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.55
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Semaphore;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import java.util.stream.Stream;
44

45
import static com.knowledgepixels.registry.RegistryDB.has;
46
import static com.knowledgepixels.registry.RegistryDB.insert;
47

48
public class NanopubLoader {
49

50
    private NanopubLoader() {
51
    }
52

53
    public final static String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
9✔
54
    public final static String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
9✔
55
    public final static String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
9✔
56
    public final static String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);
9✔
57
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
58

59
    // TODO Distinguish and support these cases:
60
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
61
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
62
    //      3. Full load: load to all lists (initialize if needed)
63

64
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
65
        simpleLoad(mongoSession, nanopubId, true);
×
66
    }
×
67

68
    public static void simpleLoad(ClientSession mongoSession, String nanopubId, boolean persistOnRetrieve) {
69
        if (persistOnRetrieve) {
×
70
            simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
71
        } else {
72
            Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
73
            if (np == null) {
×
74
                np = getNanopub(nanopubId);
×
75
            }
76
            if (np != null) {
×
77
                simpleLoad(mongoSession, np);
×
78
            }
79
        }
80
    }
×
81

82
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
83
        String pubkey = RegistryDB.getPubkey(np);
×
84
        if (pubkey == null) {
×
85
            log.info("Ignore (not signed): {}", np.getUri());
×
86
            return;
×
87
        }
88
        simpleLoad(mongoSession, np, pubkey);
×
89
    }
×
90

91
    /**
92
     * Loads a nanopub to the appropriate lists, using a pre-verified public key
93
     * to skip redundant signature verification.
94
     */
95
    public static void simpleLoad(ClientSession mongoSession, Nanopub np, String verifiedPubkey) {
96
        String pubkeyHash = Utils.getHash(verifiedPubkey);
9✔
97
        // TODO Do we need to load anything else here, into the other DB collections?
98
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
45!
99
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, "$");
×
100
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
45!
101
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
102
        } else {
103
            // Pubkey not yet loaded (unknown or in transitional "encountered" state): store the
104
            // nanopub in the nanopubs collection so it is not lost. RUN_OPTIONAL_LOAD will add it
105
            // to the appropriate lists once the pubkey's intro/endorse have been fetched.
106
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, null);
24✔
107
            if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
36!
108
                // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
109
                try {
110
                    insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
30✔
111
                            .append("type", INTRO_TYPE_HASH)
9✔
112
                            .append("status", EntryStatus.encountered.getValue()));
6✔
113
                } catch (MongoWriteException e) {
×
114
                    if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
115
                }
3✔
116
            }
117
        }
118
    }
3✔
119

120
    private static final int LOAD_PARALLELISM = Integer.parseInt(
12✔
121
            Utils.getEnv("REGISTRY_LOAD_PARALLELISM", String.valueOf(Runtime.getRuntime().availableProcessors())));
12✔
122

123
    /**
124
     * Processes a stream of nanopubs in parallel using a thread pool.
125
     * Each worker thread uses its own MongoDB ClientSession.
126
     * Backpressure is applied via a semaphore to avoid unbounded memory growth.
127
     *
128
     * @param stream    the nanopub stream to process
129
     * @param processor consumer that processes each nanopub (called with its own ClientSession)
130
     */
131
    public static void loadStreamInParallel(Stream<MaybeNanopub> stream, Consumer<Nanopub> processor) {
132
        if (LOAD_PARALLELISM <= 1) {
×
133
            // Fall back to sequential processing
134
            stream.forEach(m -> {
×
135
                if (!m.isSuccess()) throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
136
                processor.accept(m.getNanopub());
×
137
            });
×
138
            return;
×
139
        }
140

141
        ExecutorService executor = Executors.newFixedThreadPool(LOAD_PARALLELISM);
×
142
        Semaphore semaphore = new Semaphore(LOAD_PARALLELISM * 2);
×
143
        AtomicReference<Exception> error = new AtomicReference<>();
×
144

145
        try {
146
            stream.forEach(m -> {
×
147
                if (error.get() != null) return;
×
148
                if (!m.isSuccess()) {
×
149
                    error.compareAndSet(null, new AbortingTaskException("Failed to download nanopub; aborting task..."));
×
150
                    return;
×
151
                }
152
                Nanopub np = m.getNanopub();
×
153
                try {
154
                    semaphore.acquire();
×
155
                } catch (InterruptedException e) {
×
156
                    Thread.currentThread().interrupt();
×
157
                    error.compareAndSet(null, e);
×
158
                    return;
×
159
                }
×
160
                executor.submit(() -> {
×
161
                    try {
162
                        processor.accept(np);
×
163
                    } catch (Exception e) {
×
164
                        error.compareAndSet(null, e);
×
165
                    } finally {
166
                        semaphore.release();
×
167
                    }
168
                });
×
169
            });
×
170
        } finally {
171
            executor.shutdown();
×
172
            try {
173
                executor.awaitTermination(1, TimeUnit.HOURS);
×
174
            } catch (InterruptedException e) {
×
175
                Thread.currentThread().interrupt();
×
176
            }
×
177
        }
178

179
        if (error.get() != null) {
×
180
            if (error.get() instanceof RuntimeException re) throw re;
×
181
            throw new RuntimeException("Parallel loading failed", error.get());
×
182
        }
183
    }
×
184

185
    /**
186
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
187
     *
188
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
189
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
190
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
191
     */
192
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
193
        return retrieveNanopubsFromPeers(typeHash, pubkeyHash, null);
×
194
    }
195

196
    /**
197
     * Retrieve Nanopubs from the peers, optionally skipping ahead using checksums.
198
     *
199
     * @param typeHash        The hash of the type of the Nanopub to retrieve.
200
     * @param pubkeyHash      The hash of the pubkey of the Nanopub to retrieve.
201
     * @param afterChecksums  Comma-separated checksums for skip-ahead (geometric fallback), or null for full fetch.
202
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
203
     */
204
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash, String afterChecksums) {
205
        // TODO Move the code of this method to nanopub-java library.
206

207
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
208
        Collections.shuffle(peerUrlsToTry);
×
209
        while (!peerUrlsToTry.isEmpty()) {
×
210
            String peerUrl = peerUrlsToTry.removeFirst();
×
211

212
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
213
            if (afterChecksums != null) {
×
214
                requestUrl += "?afterChecksums=" + afterChecksums;
×
215
            }
216
            log.info("Request: {}", requestUrl);
×
217
            try {
218
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
219
                int httpStatus = resp.getStatusLine().getStatusCode();
×
220
                if (httpStatus < 200 || httpStatus >= 300) {
×
221
                    log.info("Request failed: {} {}", peerUrl, httpStatus);
×
222
                    EntityUtils.consumeQuietly(resp.getEntity());
×
223
                    continue;
×
224
                }
225
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
226
                if (nrStatus == null) {
×
227
                    log.info("Nanopub-Registry-Status header not found at: {}", peerUrl);
×
228
                    EntityUtils.consumeQuietly(resp.getEntity());
×
229
                    continue;
×
230
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
231
                    log.info("Peer in non-ready state: {} {}", peerUrl, nrStatus.getValue());
×
232
                    EntityUtils.consumeQuietly(resp.getEntity());
×
233
                    continue;
×
234
                }
235
                InputStream is = resp.getEntity().getContent();
×
236
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
237
                    try {
238
                        resp.close();
×
239
                    } catch (IOException e) {
×
240
                        log.debug("Error closing HTTP response", e);
×
241
                    }
×
242
                });
×
243
            } catch (UnsupportedOperationException | IOException ex) {
×
244
                log.info("Request failed: ", ex);
×
245
            }
246
        }
×
247
        return Stream.empty();
×
248
    }
249

250
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
251
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
252
        int tryCount = 0;
×
253
        while (np == null) {
×
254
            if (tryCount > 10) {
×
255
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
256
            } else if (tryCount > 0) {
×
257
                try {
258
                    Thread.sleep(100);
×
259
                } catch (InterruptedException ex) {
×
260
                    log.info("Thread was interrupted", ex);
×
261
                }
×
262
            }
263
            log.info("Loading {}", nanopubId);
×
264

265
            // TODO Reach out to other Nanopub Registries here:
266
            np = getNanopub(nanopubId);
×
267
            if (np != null) {
×
268
                RegistryDB.loadNanopub(mongoSession, np);
×
269
            } else {
270
                tryCount = tryCount + 1;
×
271
            }
272
        }
273
        return np;
×
274
    }
275

276
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
277
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
278
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
279
        if (!cursor.hasNext()) return null;
×
280
        try {
281
            // Parse from Jelly, not TriG (it's faster)
282
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
283
        } catch (RDF4JException | MalformedNanopubException ex) {
×
284
            log.info("Exception reading Jelly", ex);
×
285
            return null;
×
286
        }
287
    }
288

289
    // TODO Provide this method in nanopub-java (GetNanopub)
290
    private static Nanopub getNanopub(String uriOrArtifactCode) {
291
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
292
        Collections.shuffle(peerUrls);
×
293
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
294
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
295
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
296
        }
297
        while (!peerUrls.isEmpty()) {
×
298
            String peerUrl = peerUrls.removeFirst();
×
299
            try {
300
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
301
                if (np != null) {
×
302
                    return np;
×
303
                }
304
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
305
                // ignore
306
            }
×
307
        }
×
308
        return null;
×
309
    }
310

311
    // TODO Provide this method in nanopub-java (GetNanopub)
312
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
313
            throws IOException, RDF4JException, MalformedNanopubException {
314
        HttpGet get = null;
×
315
        // TODO Get in Jelly format:
316
        String getUrl = registryUrl + "np/" + artifactCode;
×
317
        try {
318
            get = new HttpGet(getUrl);
×
319
        } catch (IllegalArgumentException ex) {
×
320
            throw new IOException("invalid URL: " + getUrl);
×
321
        }
×
322
        get.setHeader("Accept", "application/trig");
×
323
        InputStream in = null;
×
324
        try {
325
            HttpResponse resp = httpClient.execute(get);
×
326
            if (!wasSuccessful(resp)) {
×
327
                EntityUtils.consumeQuietly(resp.getEntity());
×
328
                throw new IOException(resp.getStatusLine().toString());
×
329
            }
330
            in = resp.getEntity().getContent();
×
331
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
332
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
333
                throw new MalformedNanopubException("Nanopub is not trusty");
×
334
            }
335
            return nanopub;
×
336
        } finally {
337
            if (in != null) in.close();
×
338
        }
339
    }
340

341
    private static boolean wasSuccessful(HttpResponse resp) {
342
        int c = resp.getStatusLine().getStatusCode();
×
343
        return c >= 200 && c < 300;
×
344
    }
345

346
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc