• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 24138607049

08 Apr 2026 01:42PM UTC coverage: 32.47% (-0.4%) from 32.824%
24138607049

Pull #99

github

web-flow
Merge 336546501 into 689a63b39
Pull Request #99: Fix peer sync race with committed counter watermark

268 of 926 branches covered (28.94%)

Branch coverage included in aggregate %.

797 of 2354 relevant lines covered (33.86%)

5.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.99
src/main/java/com/knowledgepixels/registry/RegistryDB.java
1
package com.knowledgepixels.registry;
2

3
import com.knowledgepixels.registry.db.IndexInitializer;
4
import com.mongodb.MongoClient;
5
import com.mongodb.MongoNamespace;
6
import com.mongodb.ErrorCategory;
7
import com.mongodb.MongoWriteException;
8
import com.mongodb.ServerAddress;
9
import com.mongodb.client.ClientSession;
10
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.MongoCursor;
12
import com.mongodb.client.MongoDatabase;
13
import com.mongodb.client.model.CountOptions;
14
import com.mongodb.client.model.FindOneAndUpdateOptions;
15
import com.mongodb.client.model.ReturnDocument;
16
import com.mongodb.client.model.UpdateOptions;
17
import net.trustyuri.TrustyUriUtils;
18
import org.bson.Document;
19
import org.bson.conversions.Bson;
20
import org.bson.types.Binary;
21
import org.eclipse.rdf4j.common.exception.RDF4JException;
22
import org.eclipse.rdf4j.model.IRI;
23
import org.eclipse.rdf4j.rio.RDFFormat;
24
import org.nanopub.MalformedNanopubException;
25
import org.nanopub.Nanopub;
26

27
import java.util.Calendar;
28
import org.nanopub.NanopubUtils;
29
import org.nanopub.extra.security.MalformedCryptoElementException;
30
import org.nanopub.extra.security.NanopubSignatureElement;
31
import org.nanopub.extra.security.SignatureUtils;
32
import org.nanopub.jelly.JellyUtils;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
import java.io.IOException;
37
import java.security.GeneralSecurityException;
38
import java.util.ArrayList;
39

40
import static com.mongodb.client.model.Indexes.ascending;
41

42
public class RegistryDB {
43

44
    /**
     * Not instantiable: this class only exposes static members.
     */
    private RegistryDB() {
        // intentionally empty
    }
46

47
    private static final String REGISTRY_DB_NAME = Utils.getEnv("REGISTRY_DB_NAME", "nanopubRegistry");
12✔
48

49
    private static final Logger logger = LoggerFactory.getLogger(RegistryDB.class);
9✔
50

51
    private static MongoClient mongoClient;
52
    private static MongoDatabase mongoDB;
53

54
    /**
55
     * Returns the MongoDB database instance.
56
     *
57
     * @return the MongoDatabase instance
58
     */
59
    public static MongoDatabase getDB() {
60
        return mongoDB;
6✔
61
    }
62

63
    /**
64
     * Returns the MongoDB client instance.
65
     *
66
     * @return the MongoClient instance
67
     */
68
    public static MongoClient getClient() {
69
        return mongoClient;
6✔
70
    }
71

72
    /**
73
     * Returns the specified collection from the MongoDB database.
74
     *
75
     * @param name the name of the collection
76
     * @return the MongoCollection instance
77
     */
78
    public static MongoCollection<Document> collection(String name) {
79
        return mongoDB.getCollection(name);
12✔
80
    }
81

82
    /**
83
     * Initializes the MongoDB connection and sets up collections and indexes if not already initialized.
84
     */
85
    public static void init() {
86
        if (mongoClient != null) {
6✔
87
            logger.info("RegistryDB already initialized");
9✔
88
            return;
3✔
89
        }
90
        final String REGISTRY_DB_HOST = Utils.getEnv("REGISTRY_DB_HOST", "mongodb");
12✔
91
        final int REGISTRY_DB_PORT = Integer.parseInt(Utils.getEnv("REGISTRY_DB_PORT", String.valueOf(ServerAddress.defaultPort())));
18✔
92
        logger.info("Initializing RegistryDB connection to database '{}' at {}:{}", REGISTRY_DB_NAME, REGISTRY_DB_HOST, REGISTRY_DB_PORT);
54✔
93
        mongoClient = new MongoClient(REGISTRY_DB_HOST, REGISTRY_DB_PORT);
18✔
94
        mongoDB = mongoClient.getDatabase(REGISTRY_DB_NAME);
12✔
95

96
        try (ClientSession mongoSession = mongoClient.startSession()) {
9✔
97
            if (!isInitialized(mongoSession)) {
9!
98
                IndexInitializer.initCollections(mongoSession);
6✔
99
            }
100
            initCounter(mongoSession);
6✔
101
        }
102
    }
3✔
103

104
    /**
105
     * Checks if the database has been initialized.
106
     *
107
     * @param mongoSession the MongoDB client session
108
     * @return true if initialized, false otherwise
109
     */
110
    public static boolean isInitialized(ClientSession mongoSession) {
111
        return getValue(mongoSession, Collection.SERVER_INFO.toString(), "setupId") != null;
24!
112
    }
113

114
    /**
115
     * Renames a collection in the database. If the new collection name already exists, it will be dropped first.
116
     *
117
     * @param oldCollectionName the current name of the collection
118
     * @param newCollectionName the new name for the collection
119
     */
120
    public static void rename(String oldCollectionName, String newCollectionName) {
121
        // Designed as idempotent operation: calling multiple times has same effect as calling once
122
        if (hasCollection(oldCollectionName)) {
9✔
123
            if (hasCollection(newCollectionName)) {
9✔
124
                collection(newCollectionName).drop();
9✔
125
            }
126
            collection(oldCollectionName).renameCollection(new MongoNamespace(REGISTRY_DB_NAME, newCollectionName));
24✔
127
        }
128
    }
3✔
129

130
    /**
131
     * Checks if a collection with the given name exists in the database.
132
     *
133
     * @param collectionName the name of the collection to check
134
     * @return true if the collection exists, false otherwise
135
     */
136
    public static boolean hasCollection(String collectionName) {
137
        return mongoDB.listCollectionNames().into(new ArrayList<>()).contains(collectionName);
30✔
138
    }
139

140
    /**
141
     * Increases the trust state counter in the server info collection.
142
     *
143
     * @param mongoSession the MongoDB client session
144
     */
145
    public static void increaseStateCounter(ClientSession mongoSession) {
146
        MongoCursor<Document> cursor = collection(Collection.SERVER_INFO.toString()).find(mongoSession, new Document("_id", "trustStateCounter")).cursor();
36✔
147
        if (cursor.hasNext()) {
9✔
148
            long counter = cursor.next().getLong("value");
21✔
149
            collection(Collection.SERVER_INFO.toString()).updateOne(mongoSession, new Document("_id", "trustStateCounter"), new Document("$set", new Document("value", counter + 1)));
69✔
150
        } else {
3✔
151
            collection(Collection.SERVER_INFO.toString()).insertOne(mongoSession, new Document("_id", "trustStateCounter").append("value", 0L));
42✔
152
        }
153
    }
3✔
154

155
    /**
156
     * Checks if an element with the given name exists in the specified collection.
157
     *
158
     * @param mongoSession the MongoDB client session
159
     * @param collection   the name of the collection
160
     * @param elementName  the name of the element used as the _id field
161
     * @return true if the element exists, false otherwise
162
     */
163
    public static boolean has(ClientSession mongoSession, String collection, String elementName) {
164
        return has(mongoSession, collection, new Document("_id", elementName));
27✔
165
    }
166

167
    private static final CountOptions hasCountOptions = new CountOptions().limit(1);
18✔
168

169
    /**
170
     * Checks if any document matching the given filter exists in the specified collection.
171
     *
172
     * @param mongoSession the MongoDB client session
173
     * @param collection   the name of the collection
174
     * @param find         the filter to match documents
175
     * @return true if at least one matching document exists, false otherwise
176
     */
177
    public static boolean has(ClientSession mongoSession, String collection, Bson find) {
178
        return collection(collection).countDocuments(mongoSession, find, hasCountOptions) > 0;
39✔
179
    }
180

181
    /**
182
     * Retrieves a cursor for documents matching the given filter in the specified collection.
183
     *
184
     * @param mongoSession the MongoDB client session
185
     * @param collection   the name of the collection
186
     * @param find         the filter to match documents
187
     * @return a MongoCursor for the matching documents
188
     */
189
    public static MongoCursor<Document> get(ClientSession mongoSession, String collection, Bson find) {
190
        return collection(collection).find(mongoSession, find).cursor();
21✔
191
    }
192

193
    /**
194
     * Retrieves the value of an element with the given name from the specified collection.
195
     *
196
     * @param mongoSession the MongoDB client session
197
     * @param collection   the name of the collection
198
     * @param elementName  the name of the element used as the _id field
199
     * @return the value of the element, or null if not found
200
     */
201
    public static Object getValue(ClientSession mongoSession, String collection, String elementName) {
202
        logger.info("Getting value of element '{}' from collection '{}'", elementName, collection);
15✔
203
        Document d = collection(collection).find(mongoSession, new Document("_id", elementName)).first();
36✔
204
        if (d == null) {
6✔
205
            return null;
6✔
206
        }
207
        return d.get("value");
12✔
208
    }
209

210
    /**
211
     * Retrieves the boolean value of an element with the given name from the specified collection.
212
     *
213
     * @param mongoSession the MongoDB client session
214
     * @param collection   the name of the collection
215
     * @param elementName  the name of the element used as the _id field
216
     * @return the value of the element, or null if not found
217
     */
218
    public static boolean isSet(ClientSession mongoSession, String collection, String elementName) {
219
        Document d = collection(collection).find(mongoSession, new Document("_id", elementName)).first();
36✔
220
        if (d == null) {
6✔
221
            return false;
6✔
222
        }
223
        return d.getBoolean("value");
15✔
224
    }
225

226
    /**
227
     * Retrieves a single document matching the given filter from the specified collection.
228
     *
229
     * @param mongoSession the MongoDB client session
230
     * @param collection   the name of the collection
231
     * @param find         the filter to match the document
232
     * @return the matching document, or null if not found
233
     */
234
    public static Document getOne(ClientSession mongoSession, String collection, Bson find) {
235
        return collection(collection).find(mongoSession, find).first();
24✔
236
    }
237

238
    /**
239
     * Retrieves the maximum value of a specified field from the documents in the given collection.
240
     *
241
     * @param mongoSession the MongoDB client session
242
     * @param collection   the name of the collection
243
     * @param fieldName    the field for which to find the maximum value
244
     * @return the maximum value of the specified field, or null if no documents exist
245
     */
246
    public static Object getMaxValue(ClientSession mongoSession, String collection, String fieldName) {
247
        Document doc = collection(collection).find(mongoSession).projection(new Document(fieldName, 1)).sort(new Document(fieldName, -1)).first();
63✔
248
        if (doc == null) {
6✔
249
            return null;
6✔
250
        }
251
        return doc.get(fieldName);
12✔
252
    }
253

254
    /**
255
     * Retrieves the document with the maximum value of a specified field from the documents matching the given filter in the specified collection.
256
     *
257
     * @param mongoSession the MongoDB client session
258
     * @param collection   the name of the collection
259
     * @param find         the filter to match documents
260
     * @param fieldName    the field for which to find the maximum value
261
     * @return the document with the maximum value of the specified field, or null if no matching documents exist
262
     */
263
    public static Document getMaxValueDocument(ClientSession mongoSession, String collection, Bson find, String fieldName) {
264
        return collection(collection).find(mongoSession, find).sort(new Document(fieldName, -1)).first();
45✔
265
    }
266

267
    /**
268
     * Sets or updates a document in the specified collection.
269
     *
270
     * @param mongoSession the MongoDB client session
271
     * @param collection   the name of the collection
272
     * @param update       the document to set or update (must contain an _id field)
273
     */
274
    public static void set(ClientSession mongoSession, String collection, Document update) {
275
        Bson find = new Document("_id", update.get("_id"));
24✔
276
        MongoCursor<Document> cursor = collection(collection).find(mongoSession, find).cursor();
21✔
277
        if (cursor.hasNext()) {
9✔
278
            collection(collection).updateOne(mongoSession, find, new Document("$set", update));
33✔
279
        }
280
    }
3✔
281

282
    /**
283
     * Inserts a document into the specified collection.
284
     *
285
     * @param mongoSession the MongoDB client session
286
     * @param collection   the name of the collection
287
     * @param doc          the document to insert
288
     */
289
    public static void insert(ClientSession mongoSession, String collection, Document doc) {
290
        collection(collection).insertOne(mongoSession, doc);
15✔
291
    }
3✔
292

293
    /**
294
     * Sets the value of an element with the given name in the specified collection.
295
     * If the element does not exist, it will be created.
296
     *
297
     * @param mongoSession the MongoDB client session
298
     * @param collection   the name of the collection
299
     * @param elementId    the name of the element used as the _id field
300
     * @param value        the value to set
301
     */
302
    public static void setValue(ClientSession mongoSession, String collection, String elementId, Object value) {
303
        collection(collection).updateOne(mongoSession, new Document("_id", elementId), new Document("$set", new Document("value", value)), new UpdateOptions().upsert(true));
72✔
304
    }
3✔
305

306
    /**
307
     * Records the hash of a given value in the "hashes" collection.
308
     * Uses upsert to avoid expensive exception-based duplicate handling.
309
     *
310
     * @param mongoSession the MongoDB client session
311
     * @param value        the value to hash and record
312
     */
313
    public static void recordHash(ClientSession mongoSession, String value) {
314
        String hash = Utils.getHash(value);
9✔
315
        try {
316
            collection("hashes").updateOne(mongoSession,
60✔
317
                    new Document("value", value),
318
                    new Document("$setOnInsert", new Document("value", value).append("hash", hash)),
18✔
319
                    new UpdateOptions().upsert(true));
3✔
320
        } catch (MongoWriteException e) {
×
321
            // Concurrent upsert race: another thread inserted the same hash — safe to ignore
322
            if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
323
        }
3✔
324
    }
3✔
325

326
    /**
327
     * Retrieves the original value corresponding to a given hash from the "hashes" collection.
328
     *
329
     * @param hash the hash to look up
330
     * @return the original value, or null if not found
331
     */
332
    public static String unhash(String hash) {
333
        try (var c = collection("hashes").find(new Document("hash", hash)).cursor()) {
30✔
334
            if (c.hasNext()) {
9✔
335
                return c.next().getString("value");
24✔
336
            }
337
            return null;
12✔
338
        }
12!
339
    }
340

341
    private static final int COUNTER_BATCH_SIZE = Integer.parseInt(
12✔
342
            Utils.getEnv("REGISTRY_COUNTER_BATCH_SIZE", "20"));
3✔
343

344
    /**
345
     * Thread-local batch range for counter allocation: [nextToUse, endExclusive).
346
     * Each thread claims a batch of COUNTER_BATCH_SIZE counter values from MongoDB,
347
     * then hands them out locally with zero contention until the batch is exhausted.
348
     */
349
    private static final ThreadLocal<long[]> counterRange = new ThreadLocal<>();
15✔
350

351
    /**
352
     * Initializes the counter documents to the current maximum counter value
353
     * in the nanopubs collection.
354
     * Uses $max to ensure the counters are never decreased. Safe to call on every startup.
355
     */
356
    private static void initCounter(ClientSession mongoSession) {
357
        Long maxCounter = (Long) getMaxValue(mongoSession, Collection.NANOPUBS.toString(), "counter");
21✔
358
        long effective = maxCounter != null ? maxCounter : 0L;
21✔
359
        collection("counters").updateOne(mongoSession,
51✔
360
                new Document("_id", "nanopubs"),
361
                new Document("$max", new Document("value", effective)),
21✔
362
                new UpdateOptions().upsert(true));
3✔
363
        collection("counters").updateOne(mongoSession,
51✔
364
                new Document("_id", "nanopubs-committed"),
365
                new Document("$max", new Document("value", effective)),
21✔
366
                new UpdateOptions().upsert(true));
3✔
367
        logger.info("Counter initialized to {}", effective);
15✔
368
    }
3✔
369

370
    /**
371
     * Returns the next counter value for a nanopub, using thread-local batch
372
     * allocation to reduce MongoDB contention.
373
     */
374
    private static long getNextCounter(ClientSession mongoSession) {
375
        long[] range = counterRange.get();
12✔
376
        if (range != null && range[0] < range[1]) {
30!
377
            return range[0]++;
27✔
378
        }
379
        Document result = collection("counters").findOneAndUpdate(
57✔
380
                mongoSession,
381
                new Document("_id", "nanopubs"),
382
                new Document("$inc", new Document("value", (long) COUNTER_BATCH_SIZE)),
21✔
383
                new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
9✔
384
        long batchEnd = result.getLong("value");
15✔
385
        long batchStart = batchEnd - COUNTER_BATCH_SIZE + 1;
21✔
386
        counterRange.set(new long[]{batchStart + 1, batchEnd + 1});
48✔
387
        return batchStart;
6✔
388
    }
389

390
    /**
391
     * Updates the committed counter to the current max(counter) from nanopubs.
392
     * Call this after parallel loading completes to make new nanopubs visible to peers.
393
     */
394
    public static void updateCommittedCounter(ClientSession mongoSession) {
395
        Long maxCounter = (Long) getMaxValue(mongoSession, Collection.NANOPUBS.toString(), "counter");
×
396
        if (maxCounter != null) {
×
397
            collection("counters").updateOne(mongoSession,
×
398
                    new Document("_id", "nanopubs-committed"),
399
                    new Document("$max", new Document("value", maxCounter)),
400
                    new UpdateOptions().upsert(true));
×
401
        }
402
    }
×
403

404
    /**
405
     * Returns the committed counter value — the highest counter below which all
406
     * nanopubs are guaranteed to be inserted. Safe for peers to sync up to.
407
     */
408
    public static long getCommittedCounter(ClientSession mongoSession) {
409
        Document doc = collection("counters").find(mongoSession, new Document("_id", "nanopubs-committed")).first();
×
410
        return doc != null && doc.get("value") != null ? doc.getLong("value") : 0L;
×
411
    }
412

413
    /**
414
     * Loads a nanopublication into the database.
415
     *
416
     * @param mongoSession the MongoDB client session
417
     * @param nanopub      the nanopublication to load
418
     */
419
    public static boolean loadNanopub(ClientSession mongoSession, Nanopub nanopub) {
420
        return loadNanopub(mongoSession, nanopub, null);
21✔
421
    }
422

423
    /**
424
     * Loads a nanopublication into the database, optionally filtering by public key hash and types.
425
     *
426
     * @param mongoSession the MongoDB client session
427
     * @param nanopub      the nanopublication to load
428
     * @param pubkeyHash   the public key hash to filter by (can be null)
429
     * @param types        the types to filter by (can be empty)
430
     * @return true if the nanopublication was loaded, false otherwise
431
     */
432
    public static boolean loadNanopub(ClientSession mongoSession, Nanopub nanopub, String pubkeyHash, String... types) {
433
        String pubkey = getPubkey(nanopub);
9✔
434
        if (pubkey == null) {
6✔
435
            logger.info("Ignoring invalid nanopub: {}", nanopub.getUri());
15✔
436
            return false;
6✔
437
        }
438
        return loadNanopubVerified(mongoSession, nanopub, pubkey, pubkeyHash, types);
21✔
439
    }
440

441
    /**
442
     * Loads a nanopublication with a pre-verified public key, skipping signature verification.
443
     * Use this when the caller has already verified the signature via getPubkey().
444
     */
445
    static boolean loadNanopubVerified(ClientSession mongoSession, Nanopub nanopub, String verifiedPubkey, String pubkeyHash, String... types) {
446
        if (nanopub.getTripleCount() > 1200) {
12!
447
            logger.info("Nanopub has too many triples ({}): {}", nanopub.getTripleCount(), nanopub.getUri());
×
448
            return false;
×
449
        }
450
        if (nanopub.getByteCount() > 1000000) {
15!
451
            logger.info("Nanopub is too large ({}): {}", nanopub.getByteCount(), nanopub.getUri());
×
452
            return false;
×
453
        }
454
        Calendar creationTime;
455
        try {
456
            creationTime = nanopub.getCreationTime();
9✔
457
        } catch (Exception ex) {
×
458
            logger.info("Nanopub has malformed timestamp, treating as no timestamp: {}", nanopub.getUri());
×
459
            creationTime = null;
×
460
        }
3✔
461
        if (creationTime != null && creationTime.getTimeInMillis() > System.currentTimeMillis() + 60000) {
27!
462
            logger.info("Nanopub has a future timestamp: {}", nanopub.getUri());
×
463
            return false;
×
464
        }
465
        String nanopubUriStr = nanopub.getUri().stringValue();
12✔
466
        for (IRI graphUri : nanopub.getGraphUris()) {
33✔
467
            if (!graphUri.stringValue().startsWith(nanopubUriStr)) {
15!
468
                logger.info("Nanopub has graph URI not matching base URI: {}", nanopub.getUri());
×
469
                return false;
×
470
            }
471
        }
3✔
472
        String ph = Utils.getHash(verifiedPubkey);
9✔
473
        if (pubkeyHash != null && !pubkeyHash.equals(ph)) {
18!
474
            logger.info("Ignoring nanopub with non-matching pubkey: {}", nanopub.getUri());
×
475
            return false;
×
476
        }
477
        recordHash(mongoSession, verifiedPubkey);
9✔
478

479
        String ac = TrustyUriUtils.getArtifactCode(nanopub.getUri().stringValue());
15✔
480
        if (ac == null) {
6!
481
            // I don't think this ever happens, but checking here to be sure
482
            logger.info("ERROR. Unexpected Trusty URI: {}", nanopub.getUri());
×
483
            return false;
×
484
        }
485
        if (has(mongoSession, Collection.NANOPUBS.toString(), ac)) {
18✔
486
            logger.debug("Already loaded: {}", nanopub.getUri());
18✔
487
        } else {
488
            String nanopubString;
489
            byte[] jellyContent;
490
            try {
491
                nanopubString = NanopubUtils.writeToString(nanopub, RDFFormat.TRIG);
12✔
492
                // Save the same thing in the Jelly format for faster loading
493
                jellyContent = JellyUtils.writeNanopubForDB(nanopub);
9✔
494
            } catch (IOException ex) {
×
495
                throw new RuntimeException(ex);
×
496
            }
3✔
497
            long counter = getNextCounter(mongoSession);
9✔
498
            boolean inserted = false;
6✔
499
            try {
500
                collection(Collection.NANOPUBS.toString()).insertOne(mongoSession, new Document("_id", ac).append("fullId", nanopub.getUri().stringValue()).append("counter", counter).append("pubkey", ph).append("content", nanopubString).append("jelly", new Binary(jellyContent)));
93✔
501
                inserted = true;
6✔
502
            } catch (MongoWriteException e) {
×
503
                if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
504
                // Another thread inserted this nanopub concurrently — safe to skip
505
                logger.debug("Already loaded (concurrent): {}", nanopub.getUri());
×
506
            }
3✔
507

508
            if (inserted) {
6!
509
                for (IRI invalidatedId : Utils.getInvalidatedNanopubIds(nanopub)) {
33✔
510
                    String invalidatedAc = TrustyUriUtils.getArtifactCode(invalidatedId.stringValue());
12✔
511
                    if (invalidatedAc == null) continue;  // This should never happen; checking here just to be sure
6!
512

513
                    // Add this nanopub also to all lists of invalidated nanopubs:
514
                    collection("invalidations").insertOne(mongoSession, new Document("invalidatingNp", ac).append("invalidatingPubkey", ph).append("invalidatedNp", invalidatedAc));
45✔
515
                    MongoCursor<Document> invalidatedEntries = collection("listEntries").find(mongoSession, new Document("np", invalidatedAc).append("pubkey", ph)).cursor();
42✔
516
                    while (invalidatedEntries.hasNext()) {
9!
517
                        Document invalidatedEntry = invalidatedEntries.next();
×
518
                        addToList(mongoSession, nanopub, ph, invalidatedEntry.getString("type"));
×
519
                    }
×
520

521
                    collection("listEntries").updateMany(mongoSession, new Document("np", invalidatedAc).append("pubkey", ph), new Document("$set", new Document("invalidated", true)));
69✔
522
                    collection("trustEdges").updateMany(mongoSession, new Document("source", invalidatedAc), new Document("$set", new Document("invalidated", true)));
60✔
523
                }
3✔
524
            }
525
        }
526

527
        if (pubkeyHash != null) {
6✔
528
            for (String type : types) {
48✔
529
                // TODO Check if nanopub really has the type?
530
                addToList(mongoSession, nanopub, pubkeyHash, Utils.getTypeHash(mongoSession, type));
21✔
531
                if (type.equals("$")) {
12!
532
                    for (IRI t : NanopubUtils.getTypes(nanopub)) {
33✔
533
                        String th = Utils.getTypeHash(mongoSession, t);
12✔
534
                        if (CoverageFilter.isCoveredType(th)) {
9!
535
                            addToList(mongoSession, nanopub, pubkeyHash, th);
15✔
536
                        }
537
                    }
3✔
538
                }
539
            }
540
        }
541

542
        // Add the invalidating nanopubs also to the lists of this nanopub:
543
        try (MongoCursor<Document> invalidations = collection("invalidations").find(mongoSession, new Document("invalidatedNp", ac).append("invalidatingPubkey", ph)).cursor()) {
42✔
544
            if (invalidations.hasNext()) {
9!
545
                collection("listEntries").updateMany(mongoSession, new Document("np", ac).append("pubkey", ph), new Document("$set", new Document("invalidated", true)));
×
546
                collection("trustEdges").updateMany(mongoSession, new Document("source", ac), new Document("$set", new Document("invalidated", true)));
×
547
            }
548
            while (invalidations.hasNext()) {
9!
549
                String iac = invalidations.next().getString("invalidatingNp");
×
550
                try {
551
                    Document npDoc = collection(Collection.NANOPUBS.toString()).find(mongoSession, new Document("_id", iac)).projection(new Document("jelly", 1)).first();
×
552
                    Nanopub inp = JellyUtils.readFromDB(npDoc.get("jelly", Binary.class).getData());
×
553
                    for (IRI type : NanopubUtils.getTypes(inp)) {
×
554
                        addToList(mongoSession, inp, ph, Utils.getTypeHash(mongoSession, type));
×
555
                    }
×
556
                } catch (RDF4JException | MalformedNanopubException ex) {
×
557
                    ex.printStackTrace();
×
558
                }
×
559
            }
×
560

561
        }
562

563
        return true;
6✔
564
    }
565

566
    private static void addToList(ClientSession mongoSession, Nanopub nanopub, String pubkeyHash, String typeHash) {
567
        String ac = TrustyUriUtils.getArtifactCode(nanopub.getUri().stringValue());
15✔
568
        try {
569
            insert(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", typeHash)
39✔
570
                    .append("maxPosition", -1L));
6✔
571
        } catch (MongoWriteException e) {
×
572
            // Duplicate key error -- ignore it
573
            if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
574
        }
3✔
575

576
        if (has(mongoSession, "listEntries", new Document("pubkey", pubkeyHash).append("type", typeHash).append("np", ac))) {
45!
577
            logger.debug("Already listed: {}", nanopub.getUri());
×
578
        } else {
579
            initListPositionIfNeeded(mongoSession, pubkeyHash, typeHash);
12✔
580

581
            for (int attempt = 0; ; attempt++) {
6✔
582
                // Atomically claim next position
583
                Document updated = collection("lists").findOneAndUpdate(mongoSession,
39✔
584
                        new Document("pubkey", pubkeyHash).append("type", typeHash),
24✔
585
                        new Document("$inc", new Document("maxPosition", 1L)),
21✔
586
                        new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
3✔
587
                long position = updated.getLong("maxPosition");
15✔
588

589
                // Get checksum from previous entry by exact position lookup (O(1) index hit)
590
                String checksum;
591
                if (position == 0) {
12!
592
                    checksum = NanopubUtils.updateXorChecksum(nanopub.getUri(), NanopubUtils.INIT_CHECKSUM);
18✔
593
                } else {
594
                    Document prevEntry = collection("listEntries").find(mongoSession,
×
595
                            new Document("pubkey", pubkeyHash).append("type", typeHash)
×
596
                                    .append("position", position - 1)).first();
×
597
                    String prevChecksum = (prevEntry != null) ? prevEntry.getString("checksum") : null;
×
598
                    if (prevChecksum == null) {
×
599
                        // Rare: previous entry not yet inserted by concurrent thread; fall back to sorted query
600
                        Document maxDoc = getMaxValueDocument(mongoSession, "listEntries",
×
601
                                new Document("pubkey", pubkeyHash).append("type", typeHash), "position");
×
602
                        prevChecksum = (maxDoc != null) ? maxDoc.getString("checksum") : NanopubUtils.INIT_CHECKSUM;
×
603
                    }
604
                    checksum = NanopubUtils.updateXorChecksum(nanopub.getUri(), prevChecksum);
×
605
                }
606

607
                try {
608
                    collection("listEntries").insertOne(mongoSession, new Document("pubkey", pubkeyHash)
33✔
609
                            .append("type", typeHash).append("position", position).append("np", ac)
30✔
610
                            .append("checksum", checksum).append("invalidated", false));
15✔
611
                    break;
3✔
612
                } catch (MongoWriteException e) {
×
613
                    if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
614
                    if (has(mongoSession, "listEntries", new Document("pubkey", pubkeyHash)
×
615
                            .append("type", typeHash).append("np", ac))) {
×
616
                        break; // Already listed by concurrent thread
×
617
                    }
618
                    if (attempt >= 100) {
×
619
                        throw new RuntimeException("Failed to insert list entry after " + (attempt + 1) + " attempts");
×
620
                    }
621
                }
622
            }
623
        }
624
    }
3✔
625

626
    /**
627
     * Lazily initializes the maxPosition field on a lists document for lists
628
     * created before this field existed. Uses a one-time sorted query, then
629
     * all subsequent calls use the atomic counter.
630
     */
631
    private static void initListPositionIfNeeded(ClientSession mongoSession, String pubkeyHash, String typeHash) {
632
        Document listDoc = collection("lists").find(mongoSession,
33✔
633
                new Document("pubkey", pubkeyHash).append("type", typeHash)).first();
12✔
634
        if (listDoc == null || listDoc.get("maxPosition") != null) return;
21!
635

636
        Document maxDoc = getMaxValueDocument(mongoSession, "listEntries",
×
637
                new Document("pubkey", pubkeyHash).append("type", typeHash), "position");
×
638
        long maxPos = (maxDoc != null) ? maxDoc.getLong("position") : -1L;
×
639

640
        // Conditional update: only set if maxPosition still doesn't exist (race-safe)
641
        collection("lists").updateOne(mongoSession,
×
642
                new Document("pubkey", pubkeyHash).append("type", typeHash)
×
643
                        .append("maxPosition", new Document("$exists", false)),
×
644
                new Document("$set", new Document("maxPosition", maxPos)));
×
645
    }
×
646

647
    /**
648
     * Builds a comma-separated list of checksums at geometric positions for a given list,
649
     * for use with the afterChecksums parameter during peer sync.
650
     * Returns checksums at positions: max, max-10, max-100, max-1000, max-10000, ...
651
     * Returns null if the list has no entries.
652
     */
653
    public static String buildChecksumFallbacks(ClientSession mongoSession, String pubkeyHash, String typeHash) {
654
        Document maxDoc = getMaxValueDocument(mongoSession, "listEntries",
33✔
655
                new Document("pubkey", pubkeyHash).append("type", typeHash), "position");
6✔
656
        if (maxDoc == null) return null;
12✔
657

658
        long maxPosition = maxDoc.getLong("position");
15✔
659
        StringBuilder sb = new StringBuilder();
12✔
660
        sb.append(maxDoc.getString("checksum"));
18✔
661

662
        for (long offset = 10; offset <= maxPosition; offset *= 10) {
33✔
663
            long targetPos = maxPosition - offset;
12✔
664
            Document entry = collection("listEntries").find(mongoSession,
33✔
665
                    new Document("pubkey", pubkeyHash).append("type", typeHash)
9✔
666
                            .append("position", targetPos)).first();
15✔
667
            if (entry != null) {
6!
668
                sb.append(",").append(entry.getString("checksum"));
24✔
669
            }
670
        }
671
        return sb.toString();
9✔
672
    }
673

674
    /**
675
     * Returns the public key string of the Nanopub's signature, or null if not available or invalid.
676
     *
677
     * @param nanopub the nanopub to extract the public key from
678
     * @return The public key string, or null if not available or invalid.
679
     */
680
    public static String getPubkey(Nanopub nanopub) {
681
        // TODO shouldn't this be moved to a utility class in nanopub-java? there is a similar method in NanopubElement class of nanodash
682
        NanopubSignatureElement el;
683
        try {
684
            el = SignatureUtils.getSignatureElement(nanopub);
9✔
685
            if (el != null && SignatureUtils.hasValidSignature(el) && el.getPublicKeyString() != null) {
24!
686
                return el.getPublicKeyString();
9✔
687
            }
688
        } catch (MalformedCryptoElementException | GeneralSecurityException ex) {
×
689
            logger.error("Error in checking the signature of the nanopub {}", nanopub.getUri());
×
690
        }
3✔
691
        return null;
6✔
692
    }
693

694
    /**
695
     * Calculates a hash representing the current state of the trust paths in the loading collection.
696
     *
697
     * @param mongoSession the MongoDB client session
698
     * @return the calculated trust state hash
699
     */
700
    public static String calculateTrustStateHash(ClientSession mongoSession) {
701
        MongoCursor<Document> tp = collection("trustPaths_loading").find(mongoSession).sort(ascending("_id")).cursor();
×
702
        // TODO Improve this so we don't create the full string just for calculating its hash:
703
        String s = "";
×
704
        while (tp.hasNext()) {
×
705
            Document d = tp.next();
×
706
            s += d.getString("_id") + " (" + d.getString("type") + ")\n";
×
707
        }
×
708
        return Utils.getHash(s);
×
709
    }
710

711
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc