• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24121799241

08 Apr 2026 06:42AM UTC coverage: 32.765% (-0.6%) from 33.374%
24121799241

push

github

tkuhn
Merge branch 'feat/agent-quota-enforcement'

263 of 888 branches covered (29.62%)

Branch coverage included in aggregate %.

793 of 2335 relevant lines covered (33.96%)

5.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.15
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.concurrent.ExecutorService;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Semaphore;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.atomic.AtomicReference;
42
import java.util.function.Consumer;
43
import java.util.stream.Stream;
44

45
import static com.knowledgepixels.registry.RegistryDB.has;
46
import static com.knowledgepixels.registry.RegistryDB.insert;
47

48
public class NanopubLoader {
49

50
    /**
     * Not instantiable: this class only exposes static members.
     */
    private NanopubLoader() {}
52

53
    public final static String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
9✔
54
    public final static String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
9✔
55
    public final static String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
9✔
56
    public final static String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);
9✔
57
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
58

59
    // TODO Distinguish and support these cases:
60
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
61
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
62
    //      3. Full load: load to all lists (initialize if needed)
63

64
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
65
        simpleLoad(mongoSession, nanopubId, true);
×
66
    }
×
67

68
    public static void simpleLoad(ClientSession mongoSession, String nanopubId, boolean persistOnRetrieve) {
69
        if (persistOnRetrieve) {
×
70
            simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
71
        } else {
72
            Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
73
            if (np == null) {
×
74
                np = getNanopub(nanopubId);
×
75
            }
76
            if (np != null) {
×
77
                simpleLoad(mongoSession, np);
×
78
            }
79
        }
80
    }
×
81

82
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
83
        String pubkey = RegistryDB.getPubkey(np);
×
84
        if (pubkey == null) {
×
85
            log.info("Ignore (not signed): {}", np.getUri());
×
86
            return;
×
87
        }
88
        simpleLoad(mongoSession, np, pubkey);
×
89
    }
×
90

91
    /**
92
     * Loads a nanopub to the appropriate lists, using a pre-verified public key
93
     * to skip redundant signature verification.
94
     */
95
    public static void simpleLoad(ClientSession mongoSession, Nanopub np, String verifiedPubkey) {
96
        String pubkeyHash = Utils.getHash(verifiedPubkey);
9✔
97
        // TODO Do we need to load anything else here, into the other DB collections?
98
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
45!
99
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, "$");
×
100
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
45!
101
            RegistryDB.loadNanopubVerified(mongoSession, np, verifiedPubkey, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
102
        } else if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
36!
103
            // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
104
            try {
105
                insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
30✔
106
                        .append("type", INTRO_TYPE_HASH)
9✔
107
                        .append("status", EntryStatus.encountered.getValue()));
6✔
108
            } catch (MongoWriteException e) {
×
109
                if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
110
            }
3✔
111
        }
112
    }
3✔
113

114
    private static final int LOAD_PARALLELISM = Integer.parseInt(
12✔
115
            Utils.getEnv("REGISTRY_LOAD_PARALLELISM", String.valueOf(Runtime.getRuntime().availableProcessors())));
12✔
116

117
    /**
118
     * Processes a stream of nanopubs in parallel using a thread pool.
119
     * Each worker thread uses its own MongoDB ClientSession.
120
     * Backpressure is applied via a semaphore to avoid unbounded memory growth.
121
     *
122
     * @param stream    the nanopub stream to process
123
     * @param processor consumer that processes each nanopub (called with its own ClientSession)
124
     */
125
    public static void loadStreamInParallel(Stream<MaybeNanopub> stream, Consumer<Nanopub> processor) {
126
        if (LOAD_PARALLELISM <= 1) {
×
127
            // Fall back to sequential processing
128
            stream.forEach(m -> {
×
129
                if (!m.isSuccess()) throw new AbortingTaskException("Failed to download nanopub; aborting task...");
×
130
                processor.accept(m.getNanopub());
×
131
            });
×
132
            return;
×
133
        }
134

135
        ExecutorService executor = Executors.newFixedThreadPool(LOAD_PARALLELISM);
×
136
        Semaphore semaphore = new Semaphore(LOAD_PARALLELISM * 2);
×
137
        AtomicReference<Exception> error = new AtomicReference<>();
×
138

139
        try {
140
            stream.forEach(m -> {
×
141
                if (error.get() != null) return;
×
142
                if (!m.isSuccess()) {
×
143
                    error.compareAndSet(null, new AbortingTaskException("Failed to download nanopub; aborting task..."));
×
144
                    return;
×
145
                }
146
                Nanopub np = m.getNanopub();
×
147
                try {
148
                    semaphore.acquire();
×
149
                } catch (InterruptedException e) {
×
150
                    Thread.currentThread().interrupt();
×
151
                    error.compareAndSet(null, e);
×
152
                    return;
×
153
                }
×
154
                executor.submit(() -> {
×
155
                    try {
156
                        processor.accept(np);
×
157
                    } catch (Exception e) {
×
158
                        error.compareAndSet(null, e);
×
159
                    } finally {
160
                        semaphore.release();
×
161
                    }
162
                });
×
163
            });
×
164
        } finally {
165
            executor.shutdown();
×
166
            try {
167
                executor.awaitTermination(1, TimeUnit.HOURS);
×
168
            } catch (InterruptedException e) {
×
169
                Thread.currentThread().interrupt();
×
170
            }
×
171
        }
172

173
        if (error.get() != null) {
×
174
            if (error.get() instanceof RuntimeException re) throw re;
×
175
            throw new RuntimeException("Parallel loading failed", error.get());
×
176
        }
177
    }
×
178

179
    /**
180
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
181
     *
182
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
183
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
184
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
185
     */
186
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
187
        return retrieveNanopubsFromPeers(typeHash, pubkeyHash, null);
×
188
    }
189

190
    /**
191
     * Retrieve Nanopubs from the peers, optionally skipping ahead using checksums.
192
     *
193
     * @param typeHash        The hash of the type of the Nanopub to retrieve.
194
     * @param pubkeyHash      The hash of the pubkey of the Nanopub to retrieve.
195
     * @param afterChecksums  Comma-separated checksums for skip-ahead (geometric fallback), or null for full fetch.
196
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
197
     */
198
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash, String afterChecksums) {
199
        // TODO Move the code of this method to nanopub-java library.
200

201
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
202
        Collections.shuffle(peerUrlsToTry);
×
203
        while (!peerUrlsToTry.isEmpty()) {
×
204
            String peerUrl = peerUrlsToTry.removeFirst();
×
205

206
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
207
            if (afterChecksums != null) {
×
208
                requestUrl += "?afterChecksums=" + afterChecksums;
×
209
            }
210
            log.info("Request: {}", requestUrl);
×
211
            try {
212
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
213
                int httpStatus = resp.getStatusLine().getStatusCode();
×
214
                if (httpStatus < 200 || httpStatus >= 300) {
×
215
                    log.info("Request failed: {} {}", peerUrl, httpStatus);
×
216
                    EntityUtils.consumeQuietly(resp.getEntity());
×
217
                    continue;
×
218
                }
219
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
220
                if (nrStatus == null) {
×
221
                    log.info("Nanopub-Registry-Status header not found at: {}", peerUrl);
×
222
                    EntityUtils.consumeQuietly(resp.getEntity());
×
223
                    continue;
×
224
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
225
                    log.info("Peer in non-ready state: {} {}", peerUrl, nrStatus.getValue());
×
226
                    EntityUtils.consumeQuietly(resp.getEntity());
×
227
                    continue;
×
228
                }
229
                InputStream is = resp.getEntity().getContent();
×
230
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
231
                    try {
232
                        resp.close();
×
233
                    } catch (IOException e) {
×
234
                        log.debug("Error closing HTTP response", e);
×
235
                    }
×
236
                });
×
237
            } catch (UnsupportedOperationException | IOException ex) {
×
238
                log.info("Request failed: ", ex);
×
239
            }
240
        }
×
241
        return Stream.empty();
×
242
    }
243

244
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
245
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
246
        int tryCount = 0;
×
247
        while (np == null) {
×
248
            if (tryCount > 10) {
×
249
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
250
            } else if (tryCount > 0) {
×
251
                try {
252
                    Thread.sleep(100);
×
253
                } catch (InterruptedException ex) {
×
254
                    log.info("Thread was interrupted", ex);
×
255
                }
×
256
            }
257
            log.info("Loading {}", nanopubId);
×
258

259
            // TODO Reach out to other Nanopub Registries here:
260
            np = getNanopub(nanopubId);
×
261
            if (np != null) {
×
262
                RegistryDB.loadNanopub(mongoSession, np);
×
263
            } else {
264
                tryCount = tryCount + 1;
×
265
            }
266
        }
267
        return np;
×
268
    }
269

270
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
271
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
272
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
273
        if (!cursor.hasNext()) return null;
×
274
        try {
275
            // Parse from Jelly, not TriG (it's faster)
276
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
277
        } catch (RDF4JException | MalformedNanopubException ex) {
×
278
            log.info("Exception reading Jelly", ex);
×
279
            return null;
×
280
        }
281
    }
282

283
    // TODO Provide this method in nanopub-java (GetNanopub)
284
    private static Nanopub getNanopub(String uriOrArtifactCode) {
285
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
286
        Collections.shuffle(peerUrls);
×
287
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
288
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
289
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
290
        }
291
        while (!peerUrls.isEmpty()) {
×
292
            String peerUrl = peerUrls.removeFirst();
×
293
            try {
294
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
295
                if (np != null) {
×
296
                    return np;
×
297
                }
298
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
299
                // ignore
300
            }
×
301
        }
×
302
        return null;
×
303
    }
304

305
    // TODO Provide this method in nanopub-java (GetNanopub)
306
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
307
            throws IOException, RDF4JException, MalformedNanopubException {
308
        HttpGet get = null;
×
309
        // TODO Get in Jelly format:
310
        String getUrl = registryUrl + "np/" + artifactCode;
×
311
        try {
312
            get = new HttpGet(getUrl);
×
313
        } catch (IllegalArgumentException ex) {
×
314
            throw new IOException("invalid URL: " + getUrl);
×
315
        }
×
316
        get.setHeader("Accept", "application/trig");
×
317
        InputStream in = null;
×
318
        try {
319
            HttpResponse resp = httpClient.execute(get);
×
320
            if (!wasSuccessful(resp)) {
×
321
                EntityUtils.consumeQuietly(resp.getEntity());
×
322
                throw new IOException(resp.getStatusLine().toString());
×
323
            }
324
            in = resp.getEntity().getContent();
×
325
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
326
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
327
                throw new MalformedNanopubException("Nanopub is not trusty");
×
328
            }
329
            return nanopub;
×
330
        } finally {
331
            if (in != null) in.close();
×
332
        }
333
    }
334

335
    private static boolean wasSuccessful(HttpResponse resp) {
336
        int c = resp.getStatusLine().getStatusCode();
×
337
        return c >= 200 && c < 300;
×
338
    }
339

340
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc