• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24727603152

21 Apr 2026 02:18PM UTC coverage: 31.808% (+0.3%) from 31.558%
24727603152

push

github

ashleycaselli
chore(logging): enhance info error handling in Nanopub processing

280 of 986 branches covered (28.4%)

Branch coverage included in aggregate %.

839 of 2532 relevant lines covered (33.14%)

5.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.3
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Semaphore;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import java.util.stream.Stream;
44

45
import static com.knowledgepixels.registry.RegistryDB.has;
46
import static com.knowledgepixels.registry.RegistryDB.insert;
47

48
public class NanopubLoader {
49

50
    /** Type IRI (as a string) of introduction nanopubs, taken from {@code NPX.DECLARED_BY}. */
    public static final String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
    /** Hash of {@link #INTRO_TYPE} as computed by {@code Utils.getHash}; used as the list "type" key. */
    public static final String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
    /** Type IRI (as a string) of endorsement nanopubs, taken from {@code Utils.APPROVES_OF}. */
    public static final String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
    /** Hash of {@link #ENDORSE_TYPE} as computed by {@code Utils.getHash}. */
    public static final String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);

    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);

    /** Holder of static loading helpers only; not meant to be instantiated. */
    private NanopubLoader() {
    }

    // TODO Distinguish and support these cases:
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
    //      3. Full load: load to all lists (initialize if needed)
63

64
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
65
        simpleLoad(mongoSession, nanopubId, true);
×
66
    }
×
67

68
    public static void simpleLoad(ClientSession mongoSession, String nanopubId, boolean persistOnRetrieve) {
69
        if (persistOnRetrieve) {
×
70
            simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
71
        } else {
72
            Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
73
            if (np == null) {
×
74
                np = getNanopub(nanopubId);
×
75
            }
76
            if (np != null) {
×
77
                simpleLoad(mongoSession, np);
×
78
            }
79
        }
80
    }
×
81

82
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
83
        String pubkey = RegistryDB.getPubkey(np);
×
84
        if (pubkey == null) {
×
85
            log.warn("Skipping nanopub {}: no valid signature found", np.getUri());
×
86
            return;
×
87
        }
88
        simpleLoad(mongoSession, np, pubkey);
×
89
    }
×
90

91
    /**
92
     * Loads a nanopub to the appropriate lists, using a pre-verified public key
93
     * to skip redundant signature verification.
94
     */
95
    public static void simpleLoad(ClientSession mongoSession, Nanopub np, String verifiedPubkey) {
96
        String pubkeyHash = Utils.getHash(verifiedPubkey);
9✔
97
        // TODO Do we need to load anything else here, into the other DB collections?
98
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
45!
99
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, "$");
×
100
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
45!
101
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
102
        } else {
103
            // Pubkey not yet loaded (unknown or in transitional "encountered" state): store the
104
            // nanopub in the nanopubs collection so it is not lost. RUN_OPTIONAL_LOAD will add it
105
            // to the appropriate lists once the pubkey's intro/endorse have been fetched.
106
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, null);
24✔
107
            if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
36!
108
                // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
109
                try {
110
                    insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
30✔
111
                            .append("type", INTRO_TYPE_HASH)
9✔
112
                            .append("status", EntryStatus.encountered.getValue()));
6✔
113
                } catch (MongoWriteException e) {
×
114
                    if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) {
×
115
                        throw e;
×
116
                    }
117
                }
3✔
118
            }
119
        }
120
    }
3✔
121

122
    private static final int LOAD_PARALLELISM = Integer.parseInt(
12✔
123
            Utils.getEnv("REGISTRY_LOAD_PARALLELISM", String.valueOf(Runtime.getRuntime().availableProcessors())));
12✔
124

125
    /**
126
     * Processes a stream of nanopubs in parallel using a thread pool.
127
     * Each worker thread uses its own MongoDB ClientSession.
128
     * Backpressure is applied via a semaphore to avoid unbounded memory growth.
129
     *
130
     * @param stream    the nanopub stream to process
131
     * @param processor consumer that processes each nanopub (called with its own ClientSession)
132
     */
133
    public static void loadStreamInParallel(Stream<MaybeNanopub> stream, Consumer<Nanopub> processor) {
134
        if (LOAD_PARALLELISM <= 1) {
×
135
            // Fall back to sequential processing
136
            stream.forEach(m -> {
×
137
                if (!m.isSuccess()) {
×
138
                    throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
139
                }
140
                processor.accept(m.getNanopub());
×
141
            });
×
142
            return;
×
143
        }
144

145
        ExecutorService executor = Executors.newFixedThreadPool(LOAD_PARALLELISM);
×
146
        Semaphore semaphore = new Semaphore(LOAD_PARALLELISM * 2);
×
147
        AtomicReference<Exception> error = new AtomicReference<>();
×
148

149
        try {
150
            stream.forEach(m -> {
×
151
                if (error.get() != null) {
×
152
                    return;
×
153
                }
154
                if (!m.isSuccess()) {
×
155
                    error.compareAndSet(null, new AbortingTaskException("Failed to download nanopub; aborting task..."));
×
156
                    return;
×
157
                }
158
                Nanopub np = m.getNanopub();
×
159
                try {
160
                    semaphore.acquire();
×
161
                } catch (InterruptedException e) {
×
162
                    Thread.currentThread().interrupt();
×
163
                    error.compareAndSet(null, e);
×
164
                    return;
×
165
                }
×
166
                executor.submit(() -> {
×
167
                    try {
168
                        processor.accept(np);
×
169
                    } catch (Exception e) {
×
170
                        error.compareAndSet(null, e);
×
171
                    } finally {
172
                        semaphore.release();
×
173
                    }
174
                });
×
175
            });
×
176
        } finally {
177
            executor.shutdown();
×
178
            try {
179
                executor.awaitTermination(1, TimeUnit.HOURS);
×
180
            } catch (InterruptedException e) {
×
181
                Thread.currentThread().interrupt();
×
182
            }
×
183
        }
184

185
        if (error.get() != null) {
×
186
            if (error.get() instanceof RuntimeException re) {
×
187
                throw re;
×
188
            }
189
            throw new RuntimeException("Parallel loading failed", error.get());
×
190
        }
191
    }
×
192

193
    /**
194
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
195
     *
196
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
197
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
198
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
199
     */
200
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
201
        return retrieveNanopubsFromPeers(typeHash, pubkeyHash, null);
×
202
    }
203

204
    /**
205
     * Retrieve Nanopubs from the peers, optionally skipping ahead using checksums.
206
     *
207
     * @param typeHash       The hash of the type of the Nanopub to retrieve.
208
     * @param pubkeyHash     The hash of the pubkey of the Nanopub to retrieve.
209
     * @param afterChecksums Comma-separated checksums for skip-ahead (geometric fallback), or null for full fetch.
210
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
211
     */
212
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash, String afterChecksums) {
213
        // TODO Move the code of this method to nanopub-java library.
214

215
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
216
        Collections.shuffle(peerUrlsToTry);
×
217
        while (!peerUrlsToTry.isEmpty()) {
×
218
            String peerUrl = peerUrlsToTry.removeFirst();
×
219

220
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
221
            if (afterChecksums != null) {
×
222
                requestUrl += "?afterChecksums=" + afterChecksums;
×
223
            }
224
            log.debug("Fetching nanopub list from peer: {}", requestUrl);
×
225
            try {
226
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
227
                int httpStatus = resp.getStatusLine().getStatusCode();
×
228
                if (httpStatus < 200 || httpStatus >= 300) {
×
229
                    log.warn("Fetching nanopub list from {} failed (HTTP {}); trying next peer", requestUrl, httpStatus);
×
230
                    EntityUtils.consumeQuietly(resp.getEntity());
×
231
                    continue;
×
232
                }
233
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
234
                if (nrStatus == null) {
×
235
                    log.warn("Peer {} did not return Nanopub-Registry-Status header; trying next peer", peerUrl);
×
236
                    EntityUtils.consumeQuietly(resp.getEntity());
×
237
                    continue;
×
238
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
239
                    log.warn("Skipping peer {}: registry status is '{}' (expected 'ready' or 'updating'); trying next peer", peerUrl, nrStatus.getValue());
×
240
                    EntityUtils.consumeQuietly(resp.getEntity());
×
241
                    continue;
×
242
                }
243
                InputStream is = resp.getEntity().getContent();
×
244
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
245
                    try {
246
                        resp.close();
×
247
                    } catch (IOException e) {
×
248
                        log.debug("Error closing HTTP response", e);
×
249
                    }
×
250
                });
×
251
            } catch (UnsupportedOperationException | IOException ex) {
×
252
                log.warn("Failed to fetch nanopub list from {}: {}", requestUrl, ex.getMessage(), ex);
×
253
            }
254
        }
×
255
        return Stream.empty();
×
256
    }
257

258
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
259
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
260
        int tryCount = 0;
×
261
        while (np == null) {
×
262
            if (tryCount > 10) {
×
263
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
264
            } else if (tryCount > 0) {
×
265
                try {
266
                    Thread.sleep(100);
×
267
                } catch (InterruptedException ex) {
×
268
                    log.warn("Thread interrupted while waiting to retry nanopub retrieval for {}", nanopubId, ex);
×
269
                }
×
270
            }
271
            log.info("Nanopub {} not found locally; fetching from peers (attempt {})", nanopubId, tryCount + 1);
×
272

273
            // TODO Reach out to other Nanopub Registries here:
274
            np = getNanopub(nanopubId);
×
275
            if (np != null) {
×
276
                RegistryDB.loadNanopub(mongoSession, np);
×
277
            } else {
278
                tryCount = tryCount + 1;
×
279
            }
280
        }
281
        return np;
×
282
    }
283

284
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
285
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
286
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
287
        if (!cursor.hasNext()) {
×
288
            return null;
×
289
        }
290
        try {
291
            // Parse from Jelly, not TriG (it's faster)
292
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
293
        } catch (RDF4JException | MalformedNanopubException ex) {
×
294
            log.error("Failed to parse Jelly content for nanopub '{}'; returning null", nanopubId, ex);
×
295
            return null;
×
296
        }
297
    }
298

299
    // TODO Provide this method in nanopub-java (GetNanopub)
300
    private static Nanopub getNanopub(String uriOrArtifactCode) {
301
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
302
        Collections.shuffle(peerUrls);
×
303
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
304
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
305
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
306
        }
307
        while (!peerUrls.isEmpty()) {
×
308
            String peerUrl = peerUrls.removeFirst();
×
309
            try {
310
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
311
                if (np != null) {
×
312
                    return np;
×
313
                }
314
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
315
                log.debug("Failed to fetch nanopub {} from peer {}: {}", ac, peerUrl, ex.getMessage(), ex);
×
316
            }
×
317
        }
×
318
        return null;
×
319
    }
320

321
    // TODO Provide this method in nanopub-java (GetNanopub)
322
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
323
            throws IOException, RDF4JException, MalformedNanopubException {
324
        HttpGet get = null;
×
325
        // TODO Get in Jelly format:
326
        String getUrl = registryUrl + "np/" + artifactCode;
×
327
        try {
328
            get = new HttpGet(getUrl);
×
329
        } catch (IllegalArgumentException ex) {
×
330
            throw new IOException("invalid URL: " + getUrl);
×
331
        }
×
332
        get.setHeader("Accept", "application/trig");
×
333
        InputStream in = null;
×
334
        try {
335
            HttpResponse resp = httpClient.execute(get);
×
336
            if (!wasSuccessful(resp)) {
×
337
                EntityUtils.consumeQuietly(resp.getEntity());
×
338
                throw new IOException(resp.getStatusLine().toString());
×
339
            }
340
            in = resp.getEntity().getContent();
×
341
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
342
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
343
                throw new MalformedNanopubException("Nanopub is not trusty");
×
344
            }
345
            return nanopub;
×
346
        } finally {
347
            if (in != null) {
×
348
                in.close();
×
349
            }
350
        }
351
    }
352

353
    private static boolean wasSuccessful(HttpResponse resp) {
354
        int c = resp.getStatusLine().getStatusCode();
×
355
        return c >= 200 && c < 300;
×
356
    }
357

358
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc