• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 26156381787

20 May 2026 10:21AM UTC coverage: 58.089% (-0.3%) from 58.355%
26156381787

push

github

web-flow
Merge pull request #103 from knowledgepixels/feat/full-retractor-content-in-type-repos

feat(NanopubLoader): index retractions under invalidated nanopub's types with full content

480 of 906 branches covered (52.98%)

Branch coverage included in aggregate %.

1301 of 2160 relevant lines covered (60.23%)

9.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.52
src/main/java/com/knowledgepixels/query/NanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import net.trustyuri.TrustyUriUtils;
4
import org.apache.http.client.HttpClient;
5
import org.apache.http.impl.client.HttpClientBuilder;
6
import org.eclipse.rdf4j.common.exception.RDF4JException;
7
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
8
import org.eclipse.rdf4j.model.*;
9
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
10
import org.eclipse.rdf4j.model.vocabulary.DCTERMS;
11
import org.eclipse.rdf4j.model.vocabulary.RDFS;
12
import org.eclipse.rdf4j.query.BindingSet;
13
import org.eclipse.rdf4j.query.QueryLanguage;
14
import org.eclipse.rdf4j.query.TupleQuery;
15
import org.eclipse.rdf4j.query.TupleQueryResult;
16
import org.eclipse.rdf4j.repository.RepositoryConnection;
17
import org.eclipse.rdf4j.repository.RepositoryResult;
18
import org.nanopub.Nanopub;
19
import org.nanopub.NanopubUtils;
20
import org.nanopub.SimpleCreatorPattern;
21
import org.nanopub.SimpleTimestampPattern;
22
import org.nanopub.extra.security.KeyDeclaration;
23
import org.nanopub.extra.security.MalformedCryptoElementException;
24
import org.nanopub.extra.security.NanopubSignatureElement;
25
import org.nanopub.extra.security.SignatureUtils;
26
import org.nanopub.extra.server.GetNanopub;
27
import org.nanopub.extra.setting.IntroNanopub;
28
import org.nanopub.vocabulary.NP;
29
import org.nanopub.vocabulary.NPA;
30
import org.nanopub.vocabulary.NPX;
31
import org.nanopub.vocabulary.PAV;
32
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
34

35

36
import java.security.GeneralSecurityException;
37
import java.util.*;
38
import java.util.concurrent.ExecutionException;
39
import java.util.concurrent.Executors;
40
import java.util.concurrent.Future;
41
import java.util.concurrent.ThreadLocalRandom;
42
import java.util.concurrent.ThreadPoolExecutor;
43
import java.util.function.Consumer;
44

45
/**
46
 * Utility class for loading nanopublications into the database.
47
 */
48
public class NanopubLoader {
49

50
    private static HttpClient httpClient;
51
    private static final ThreadPoolExecutor loadingPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
12✔
52

53
    /**
54
     * Cached count of nanopubs ever loaded into the {@code full} repo. Maintained
55
     * for {@link MainVerticle}'s {@code Nanopub-Query-Loaded-Nanopub-Count}
56
     * response header. Mirrors the persisted {@code npa:hasNanopubCount} triple
57
     * that {@link #loadNanopubToRepo} maintains; invalidations don't decrement
58
     * (they're recorded as separate {@code npx:invalidates} markers), so this
59
     * is a cumulative count including superseded/retracted nanopubs — matching
60
     * the registry-side {@code Nanopub-Registry-Nanopub-Count} semantics.
61
     * Populated lazily on first read; bumped post-commit on each fresh load.
62
     */
63
    static volatile Long loadedNanopubCount = null;
6✔
64

65
    /**
66
     * Retry budget for the five (with #71 merged: six) structurally identical
67
     * retry loops in this file. Previously the shape was flat {@code 10 s × 30} —
68
     * five minutes of constant hammering at RDF4J that did not help a slow server.
69
     * The new shape is bounded exponential backoff with ±50 % jitter:
70
     * {@code base = 1, 2, 4, 8, 16, 32, 60, 60 s} for attempts 1…8, each perturbed
71
     * by up to half its base value. Jitter prevents the 4-thread loadingPool from
72
     * retrying in lock-step after a shared RDF4J failure (GC pause / overload spike).
73
     * Worst-case wall time per failing task drops from ~35 min (post-change-1
74
     * timeouts × 30 flat retries) to ~11 min (8 retries × 60 s timeout + backoff
75
     * sleeps). This is the figure that sets the circuit-breaker trip time in
76
     * {@link JellyNanopubLoader}.
77
     */
78
    private static final int MAX_RETRIES = 8;
79
    private static final long[] BACKOFF_BASE_MS =
105✔
80
            {1_000L, 2_000L, 4_000L, 8_000L, 16_000L, 32_000L, 60_000L, 60_000L};
81

82
    /**
83
     * Returns the sleep delay in ms for the given 1-indexed retry attempt. Delay
84
     * is {@link #BACKOFF_BASE_MS}{@code [attempt-1]} perturbed by ±50 % uniform
85
     * jitter, clamped to be non-negative.
86
     *
87
     * @param attempt 1-indexed retry attempt number
88
     * @return the computed sleep delay in ms
89
     */
90
    static long computeBackoffMillis(int attempt) {
91
        long base = BACKOFF_BASE_MS[Math.min(attempt - 1, BACKOFF_BASE_MS.length - 1)];
×
92
        long jitter = ThreadLocalRandom.current().nextLong(base + 1) - base / 2;
×
93
        return Math.max(0L, base + jitter);
×
94
    }
95
    private Nanopub np;
96
    private NanopubSignatureElement el = null;
9✔
97
    private List<Statement> metaStatements = new ArrayList<>();
15✔
98
    private List<Statement> nanopubStatements = new ArrayList<>();
15✔
99
    private List<Statement> literalStatements = new ArrayList<>();
15✔
100
    private List<Statement> invalidateStatements = new ArrayList<>();
15✔
101
    private List<Statement> textStatements, allStatements, invalidatingStatements;
102
    private List<Statement> spaceExtractionStatements = new ArrayList<>();
15✔
103
    private Calendar timestamp = null;
9✔
104
    private Statement pubkeyStatement, pubkeyStatementX;
105
    private List<String> notes = new ArrayList<>();
15✔
106
    private boolean aborted = false;
9✔
107
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
108

109

110
    NanopubLoader(Nanopub np, long counter) {
6✔
111
        this.np = np;
9✔
112
        if (counter >= 0) {
12✔
113
            log.info("Loading {}: {}", counter, np.getUri());
24✔
114
        } else {
115
            log.info("Loading: {}", np.getUri());
15✔
116
        }
117

118
        // TODO Ensure proper synchronization and DB rollbacks
119

120
        // TODO Check for null characters ("\0"), which can cause problems in Virtuoso.
121

122
        String ac = TrustyUriUtils.getArtifactCode(np.getUri().toString());
15✔
123
        if (!np.getHeadUri().toString().contains(ac) || !np.getAssertionUri().toString().contains(ac) || !np.getProvenanceUri().toString().contains(ac) || !np.getPubinfoUri().toString().contains(ac)) {
72!
124
            notes.add("could not load nanopub as not all graphs contained the artifact code");
×
125
            aborted = true;
×
126
            return;
×
127
        }
128

129
        try {
130
            el = SignatureUtils.getSignatureElement(np);
12✔
131
        } catch (MalformedCryptoElementException ex) {
×
132
            notes.add("Signature error");
×
133
        }
3✔
134
        if (!hasValidSignature(el)) {
12✔
135
            // Audit trail for the silent-false path: without this, an aborted nanopub
136
            // is invisible in the admin repo and only detectable as a gap between the
137
            // stream counter and the loaded count. See issue around RDF4J-instability
138
            // load gaps (full vs registry, meta vs full) where signature validation
139
            // returned false but no GeneralSecurityException was thrown.
140
            if (notes.isEmpty()) {
12!
141
                notes.add("Invalid signature");
15✔
142
            }
143
            aborted = true;
9✔
144
            return;
3✔
145
        }
146

147
        pubkeyStatement = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, vf.createLiteral(el.getPublicKeyString()), NPA.GRAPH);
39✔
148
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKey, FULL_PUBKEY, npa:graph, meta, full pubkey if signature is valid
149
        metaStatements.add(pubkeyStatement);
18✔
150
        pubkeyStatementX = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY_HASH, vf.createLiteral(Utils.createHash(el.getPublicKeyString())), NPA.GRAPH);
42✔
151
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKeyHash, PUBKEY_HASH, npa:graph, meta, hex-encoded SHA256 hash if signature is valid
152
        metaStatements.add(pubkeyStatementX);
18✔
153

154
        if (el.getSigners().size() == 1) {  // > 1 is deprecated
18!
155
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SIGNED_BY, el.getSigners().iterator().next(), NPA.GRAPH));
48✔
156
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:signedBy, SIGNER, npa:graph, meta, ID of signer
157
        }
158

159
        Set<IRI> subIris = new HashSet<>();
12✔
160
        Set<IRI> otherNps = new HashSet<>();
12✔
161
        Set<IRI> invalidated = new HashSet<>();
12✔
162
        Set<IRI> retracted = new HashSet<>();
12✔
163
        Set<IRI> superseded = new HashSet<>();
12✔
164
        String combinedLiterals = "";
6✔
165
        for (Statement st : NanopubUtils.getStatements(np)) {
33✔
166
            nanopubStatements.add(st);
15✔
167

168
            if (st.getPredicate().toString().contains(ac)) {
18!
169
                subIris.add(st.getPredicate());
×
170
            } else {
171
                IRI b = getBaseTrustyUri(st.getPredicate());
12✔
172
                if (b != null) otherNps.add(b);
6!
173
            }
174
            if (st.getPredicate().equals(NPX.RETRACTS) && st.getObject() instanceof IRI) {
15!
175
                retracted.add((IRI) st.getObject());
×
176
            }
177
            if (st.getPredicate().equals(NPX.INVALIDATES) && st.getObject() instanceof IRI) {
15!
178
                invalidated.add((IRI) st.getObject());
×
179
            }
180
            if (st.getSubject().equals(np.getUri()) && st.getObject() instanceof IRI) {
30✔
181
                if (st.getPredicate().equals(NPX.SUPERSEDES)) {
15✔
182
                    superseded.add((IRI) st.getObject());
18✔
183
                }
184
                if (st.getObject().toString().matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}")) {
18✔
185
                    metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.NETWORK_GRAPH));
39✔
186
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB1, RELATION, NANOPUB2, npa:networkGraph, meta, any inter-nanopub relation found in NANOPUB1
187
                }
188
                if (st.getContext().equals(np.getPubinfoUri())) {
18✔
189
                    if (st.getPredicate().equals(NPX.INTRODUCES) || st.getPredicate().equals(NPX.DESCRIBES) || st.getPredicate().equals(NPX.EMBEDS)) {
45!
190
                        metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.GRAPH));
39✔
191
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:introduces, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
192
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:describes, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
193
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:embeds, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
194
                    }
195
                }
196
            }
197
            if (st.getSubject().toString().contains(ac)) {
18✔
198
                subIris.add((IRI) st.getSubject());
21✔
199
            } else {
200
                IRI b = getBaseTrustyUri(st.getSubject());
12✔
201
                if (b != null) otherNps.add(b);
6!
202
            }
203
            if (st.getObject() instanceof IRI) {
12✔
204
                if (st.getObject().toString().contains(ac)) {
18✔
205
                    subIris.add((IRI) st.getObject());
21✔
206
                } else {
207
                    IRI b = getBaseTrustyUri(st.getObject());
12✔
208
                    if (b != null) otherNps.add(b);
18✔
209
                }
3✔
210
            } else {
211
                combinedLiterals += st.getObject().stringValue().replaceAll("\\s+", " ") + "\n";
27✔
212
//                                if (st.getSubject().equals(np.getUri()) && !st.getSubject().equals(HAS_FILTER_LITERAL)) {
213
//                                        literalStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), LITERAL_GRAPH));
214
//                                } else {
215
//                                        literalStatements.add(vf.createStatement(np.getUri(), HAS_LITERAL, st.getObject(), LITERAL_GRAPH));
216
//                                }
217
            }
218
        }
3✔
219
        subIris.remove(np.getUri());
15✔
220
        subIris.remove(np.getAssertionUri());
15✔
221
        subIris.remove(np.getProvenanceUri());
15✔
222
        subIris.remove(np.getPubinfoUri());
15✔
223
        for (IRI i : subIris) {
30✔
224
            metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_SUB_IRI, i, NPA.GRAPH));
33✔
225
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasSubIri, SUB_IRI, npa:graph, meta, for any IRI minted in the namespace of the NANOPUB
226
        }
3✔
227
        for (IRI i : otherNps) {
30✔
228
            metaStatements.add(vf.createStatement(np.getUri(), NPA.REFERS_TO_NANOPUB, i, NPA.NETWORK_GRAPH));
33✔
229
            // @ADMIN-TRIPLE-TABLE@ NANOPUB1, npa:refersToNanopub, NANOPUB2, npa:networkGraph, meta, generic inter-nanopub relation
230
        }
3✔
231
        for (IRI i : invalidated) {
18!
232
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
233
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:invalidates, INVALIDATED_NANOPUB, npa:graph, meta, if the NANOPUB retracts or supersedes another nanopub
234
        }
×
235
        for (IRI i : retracted) {
18!
236
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
237
            metaStatements.add(vf.createStatement(np.getUri(), NPX.RETRACTS, i, NPA.GRAPH));
×
238
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:retracts, RETRACTED_NANOPUB, npa:graph, meta, if the NANOPUB retracts another nanopub
239
        }
×
240
        for (IRI i : superseded) {
30✔
241
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
33✔
242
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SUPERSEDES, i, NPA.GRAPH));
33✔
243
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:supersedes, SUPERSEDED_NANOPUB, npa:graph, meta, if the NANOPUB supersedes another nanopub
244
        }
3✔
245

246
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_HEAD_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
247
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasHeadGraph, HEAD_GRAPH, npa:graph, meta, direct link to the head graph of the NANOPUB
248
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
249
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasGraph, GRAPH, npa:graph, meta, generic link to all four graphs of the given NANOPUB
250
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_ASSERTION, np.getAssertionUri(), NPA.GRAPH));
36✔
251
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasAssertion, ASSERTION_GRAPH, npa:graph, meta, direct link to the assertion graph of the NANOPUB
252
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getAssertionUri(), NPA.GRAPH));
36✔
253
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PROVENANCE, np.getProvenanceUri(), NPA.GRAPH));
36✔
254
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasProvenance, PROVENANCE_GRAPH, npa:graph, meta, direct link to the provenance graph of the NANOPUB
255
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getProvenanceUri(), NPA.GRAPH));
36✔
256
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PUBINFO, np.getPubinfoUri(), NPA.GRAPH));
36✔
257
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasPublicationInfo, PUBINFO_GRAPH, npa:graph, meta, direct link to the pubinfo graph of the NANOPUB
258
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getPubinfoUri(), NPA.GRAPH));
36✔
259

260
        String artifactCode = TrustyUriUtils.getArtifactCode(np.getUri().stringValue());
15✔
261
        metaStatements.add(vf.createStatement(np.getUri(), NPA.ARTIFACT_CODE, vf.createLiteral(artifactCode), NPA.GRAPH));
39✔
262
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:artifactCode, ARTIFACT_CODE, npa:graph, meta, artifact code starting with 'RA...'
263

264
        if (isIntroNanopub(np)) {
9✔
265
            IntroNanopub introNp = new IntroNanopub(np);
15✔
266
            metaStatements.add(vf.createStatement(np.getUri(), NPA.IS_INTRODUCTION_OF, introNp.getUser(), NPA.GRAPH));
36✔
267
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:isIntroductionOf, AGENT, npa:graph, meta, linking intro nanopub to the agent it is introducing
268
            for (KeyDeclaration kc : introNp.getKeyDeclarations()) {
33✔
269
                metaStatements.add(vf.createStatement(np.getUri(), NPA.DECLARES_PUBKEY, vf.createLiteral(kc.getPublicKeyString()), NPA.GRAPH));
42✔
270
                // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:declaresPubkey, FULL_PUBKEY, npa:graph, meta, full pubkey declared by the given intro NANOPUB
271
            }
3✔
272
        }
273

274
        try {
275
            timestamp = SimpleTimestampPattern.getCreationTime(np);
12✔
276
        } catch (IllegalArgumentException ex) {
×
277
            notes.add("Illegal date/time");
×
278
        }
3✔
279
        if (timestamp != null) {
9!
280
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATED, vf.createLiteral(timestamp.getTime()), NPA.GRAPH));
45✔
281
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:created, CREATION_DATE, npa:graph, meta, normalized creation timestamp
282
        }
283

284
        String literalFilter = "_pubkey_" + Utils.createHash(el.getPublicKeyString());
18✔
285
        for (IRI typeIri : NanopubUtils.getTypes(np)) {
33✔
286
            metaStatements.add(vf.createStatement(np.getUri(), NPX.HAS_NANOPUB_TYPE, typeIri, NPA.GRAPH));
33✔
287
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:hasNanopubType, NANOPUB_TYPE, npa:graph, meta, type of NANOPUB
288
            literalFilter += " _type_" + Utils.createHash(typeIri);
15✔
289
        }
3✔
290
        String label = NanopubUtils.getLabel(np);
9✔
291
        if (label != null) {
6!
292
            metaStatements.add(vf.createStatement(np.getUri(), RDFS.LABEL, vf.createLiteral(label), NPA.GRAPH));
39✔
293
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, rdfs:label, LABEL, npa:graph, meta, label of NANOPUB
294
        }
295
        String description = NanopubUtils.getDescription(np);
9✔
296
        if (description != null) {
6✔
297
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.DESCRIPTION, vf.createLiteral(description), NPA.GRAPH));
39✔
298
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:description, LABEL, npa:graph, meta, description of NANOPUB
299
        }
300
        for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
33✔
301
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATOR, creatorIri, NPA.GRAPH));
33✔
302
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:creator, CREATOR, npa:graph, meta, creator of NANOPUB (can be several)
303
        }
3✔
304
        for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
21!
305
            metaStatements.add(vf.createStatement(np.getUri(), PAV.AUTHORED_BY, authorIri, NPA.GRAPH));
×
306
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, pav:authoredBy, AUTHOR, npa:graph, meta, author of NANOPUB (can be several)
307
        }
×
308

309
        if (!combinedLiterals.isEmpty()) {
9!
310
            literalStatements.add(vf.createStatement(np.getUri(), NPA.HAS_FILTER_LITERAL, vf.createLiteral(literalFilter + "\n" + combinedLiterals), NPA.GRAPH));
45✔
311
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasFilterLiteral, FILTER_LITERAL, npa:graph, literal, auxiliary literal for filtering by type and pubkey in text repo
312
        }
313

314
        // Any statements that express that the currently processed nanopub is already invalidated:
315
        invalidatingStatements = getInvalidatingStatements(np.getUri());
15✔
316

317
        metaStatements.addAll(invalidateStatements);
18✔
318

319
        allStatements = new ArrayList<>(nanopubStatements);
21✔
320
        allStatements.addAll(metaStatements);
18✔
321
        allStatements.addAll(invalidatingStatements);
18✔
322

323
        textStatements = new ArrayList<>(literalStatements);
21✔
324
        textStatements.addAll(metaStatements);
18✔
325
        textStatements.addAll(invalidatingStatements);
18✔
326

327
        if (FeatureFlags.spacesEnabled()) {
6!
328
            IRI signedBy = (el.getSigners().size() == 1) ? el.getSigners().iterator().next() : null;
42!
329
            String pubkeyHash = Utils.createHash(el.getPublicKeyString());
15✔
330
            Date createdAt = (timestamp != null) ? timestamp.getTime() : null;
24!
331
            SpacesExtractor.Context ctx = new SpacesExtractor.Context(ac, signedBy, pubkeyHash, createdAt);
24✔
332
            spaceExtractionStatements = SpacesExtractor.extract(np, ctx);
15✔
333
        }
334
    }
3✔
335

336
    /**
337
     * Get the HTTP client used for fetching nanopublications.
338
     *
339
     * @return the HTTP client
340
     */
341
    static HttpClient getHttpClient() {
342
        if (httpClient == null) {
6✔
343
            httpClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
344
        }
345
        return httpClient;
6✔
346
    }
347

348
    /**
349
     * Load the given nanopublication into the database.
350
     *
351
     * @param nanopubUri Nanopublication identifier (URI)
352
     */
353
    public static void load(String nanopubUri) {
354
        if (isNanopubLoaded(nanopubUri)) {
9!
355
            log.info("Already loaded: {}", nanopubUri);
×
356
        } else {
357
            Nanopub np = GetNanopub.get(nanopubUri, getHttpClient());
12✔
358
            load(np, -1);
9✔
359
        }
360
    }
3✔
361

362
    /**
363
     * Load a nanopub into the database.
364
     *
365
     * @param np      the nanopub to load
366
     * @param counter the load counter, only used for logging (or -1 if not known)
367
     * @throws RDF4JException if the loading fails
368
     */
369
    public static void load(Nanopub np, long counter) throws RDF4JException {
370
        NanopubLoader loader = new NanopubLoader(np, counter);
18✔
371
        loader.executeLoading();
6✔
372
    }
3✔
373

374
    @GeneratedFlagForDependentElements
375
    private void executeLoading() {
376
        var runningTasks = new ArrayList<Future<?>>();
377
        Consumer<Runnable> runTask = t -> runningTasks.add(loadingPool.submit(t));
×
378

379
        for (String note : notes) {
380
            loadNoteToRepo(np.getUri(), note);
381
        }
382

383
        if (!aborted) {
384
            // Submit all tasks except the "meta" task
385
            if (timestamp != null) {
386
                if (new Date().getTime() - timestamp.getTimeInMillis() < THIRTY_DAYS) {
387
                    if (FeatureFlags.last30dRepoEnabled()) {
388
                        runTask.accept(() -> loadNanopubToLatest(np.getUri(), allStatements));
×
389
                    }
390
                }
391
            }
392

393
            if (FeatureFlags.textRepoEnabled()) {
394
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), textStatements, "text"));
×
395
            }
396
            if (FeatureFlags.fullRepoEnabled()) {
397
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "full"));
×
398
            }
399
            // Note: "meta" task is deferred until all other tasks complete successfully
400

401
            runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "pubkey_" + Utils.createHash(el.getPublicKeyString())));
×
402
            //                loadNanopubToRepo(np.getUri(), textStatements, "text-pubkey_" + Utils.createHash(el.getPublicKeyString()));
403
            for (IRI typeIri : NanopubUtils.getTypes(np)) {
404
                // Exclude locally minted IRIs:
405
                if (typeIri.stringValue().startsWith(np.getUri().stringValue())) continue;
406
                if (!typeIri.stringValue().matches("https?://.*")) continue;
407
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "type_" + Utils.createHash(typeIri)));
×
408
                //                        loadNanopubToRepo(np.getUri(), textStatements, "text-type_" + Utils.createHash(typeIri));
409
            }
410
            //                for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
411
            //                        // Exclude locally minted IRIs:
412
            //                        if (creatorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
413
            //                        if (!creatorIri.stringValue().matches("https?://.*")) continue;
414
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(creatorIri));
415
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(creatorIri));
416
            //                }
417
            //                for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
418
            //                        // Exclude locally minted IRIs:
419
            //                        if (authorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
420
            //                        if (!authorIri.stringValue().matches("https?://.*")) continue;
421
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(authorIri));
422
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(authorIri));
423
            //                }
424

425
            // Write to the spaces repo whenever the nanopub has its own space-relevant
426
            // extractions OR carries an npx:invalidates / npx:retracts / npx:supersedes
427
            // triple. The latter case keeps pure-retraction nanopubs (no own space-typed
428
            // content) visible to the materialiser's invalidation joins: they need
429
            // ?invNp <npx:invalidates> ?np AND ?invNp <npa:hasLoadNumber> ?ln in
430
            // npa:graph of the spaces repo, both of which loadToSpacesRepo writes.
431
            // Without this, an A-before-B load order with a pure-retraction B would
432
            // leave A's row in the materialised state graph forever.
433
            if (FeatureFlags.spacesEnabled()
434
                    && (!spaceExtractionStatements.isEmpty() || !invalidateStatements.isEmpty())) {
435
                runTask.accept(() -> loadToSpacesRepo(np.getUri(), allStatements, spaceExtractionStatements));
×
436
            }
437

438
            for (Statement st : invalidateStatements) {
439
                runTask.accept(() -> loadInvalidateStatements(np, el.getPublicKeyString(), st, pubkeyStatement, pubkeyStatementX, allStatements));
×
440
            }
441

442
            // Reverse-order symmetry: when retractors were loaded before this nanopub,
443
            // getInvalidatingStatements (in the constructor) captured their
444
            // `npx:invalidates` markers into invalidatingStatements. Mirror what
445
            // loadInvalidateStatements does in the forward case — load each retractor's
446
            // full content into this nanopub's per-type repos (those types the
447
            // retractor doesn't itself carry), sourced from the retractor's per-pubkey
448
            // repo (the one shard guaranteed to be populated for every successfully
449
            // loaded nanopub).
450
            Map<IRI, String> invalidatorPubkeys = collectInvalidatorPubkeys(invalidatingStatements);
451
            if (!invalidatorPubkeys.isEmpty()) {
452
                Set<IRI> thisNpTypes = NanopubUtils.getTypes(np);
453
                for (Map.Entry<IRI, String> e : invalidatorPubkeys.entrySet()) {
454
                    IRI invIri = e.getKey();
455
                    String invPubkey = e.getValue();
456
                    runTask.accept(() -> loadInvalidatorIntoTypeRepos(invIri, invPubkey, np.getUri(), thisNpTypes));
×
457
                }
458
            }
459

460
            // Wait for all non-meta tasks to complete successfully before submitting the meta task.
461
            // On failure, cancel the remaining futures so orphaned tasks don't keep running in the
462
            // shared loadingPool and race with the next batch retry (which re-submits the same
463
            // nanopub against the same repos).
464
            for (var task : runningTasks) {
465
                try {
466
                    task.get();
467
                } catch (ExecutionException | InterruptedException ex) {
468
                    for (var t : runningTasks) {
469
                        if (!t.isDone()) t.cancel(true);
470
                    }
471
                    throw new RuntimeException("Error in nanopub loading thread", ex.getCause());
472
                }
473
            }
474

475
            // Now submit and wait for the "meta" task after all other tasks have completed successfully
476
            Future<?> metaTask = loadingPool.submit(() -> loadNanopubToRepo(np.getUri(), metaStatements, "meta"));
×
477
            try {
478
                metaTask.get();
479
            } catch (ExecutionException | InterruptedException ex) {
480
                throw new RuntimeException("Error in nanopub loading thread (meta task)", ex.getCause());
481
            }
482
        }
483
    }
484

485
    private static Long lastUpdateOfLatestRepo = null;
6✔
486
    private static long THIRTY_DAYS = 1000L * 60 * 60 * 24 * 30;
6✔
487
    private static long ONE_HOUR = 1000L * 60 * 60;
6✔
488

489
    @GeneratedFlagForDependentElements
490
    private static void loadNanopubToLatest(IRI npId, List<Statement> statements) {
491
        boolean success = false;
492
        int retries = 0;
493
        while (!success) {
494
            RepositoryConnection conn = TripleStore.get().getRepoConnection("last30d");
495
            try (conn) {
496
                // Read committed, because deleting old nanopubs is idempotent. Inserts do not collide
497
                // with deletes, because we are not inserting old nanopubs.
498
                conn.begin(IsolationLevels.READ_COMMITTED);
499
                conn.add(statements);
500
                if (lastUpdateOfLatestRepo == null || new Date().getTime() - lastUpdateOfLatestRepo > ONE_HOUR) {
501
                    log.trace("Remove old nanopubs...");
502
                    Literal thirtyDaysAgo = vf.createLiteral(new Date(new Date().getTime() - THIRTY_DAYS));
503
                    TupleQuery q = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + DCTERMS.CREATED + "> ?date . " + "filter ( ?date < ?thirtydaysago ) " + "} }");
504
                    q.setBinding("thirtydaysago", thirtyDaysAgo);
505
                    try (TupleQueryResult r = q.evaluate()) {
506
                        while (r.hasNext()) {
507
                            BindingSet b = r.next();
508
                            IRI oldNpId = (IRI) b.getBinding("np").getValue();
509
                            log.trace("Remove old nanopub: {}", oldNpId);
510
                            for (Value v : Utils.getObjectsForPattern(conn, NPA.GRAPH, oldNpId, NPA.HAS_GRAPH)) {
511
                                // Remove all four nanopub graphs:
512
                                conn.remove((Resource) null, (IRI) null, (Value) null, (IRI) v);
513
                            }
514
                            // Remove nanopubs in admin graphs:
515
                            conn.remove(oldNpId, null, null, NPA.GRAPH);
516
                            conn.remove(oldNpId, null, null, NPA.NETWORK_GRAPH);
517
                        }
518
                    }
519
                    lastUpdateOfLatestRepo = new Date().getTime();
520
                }
521
                conn.commit();
522
                success = true;
523
            } catch (Exception ex) {
524
                log.warn("Could not load nanopub {} to last30d repo.", npId, ex);
525
                if (conn.isActive()) conn.rollback();
526
            }
527
            if (!success) {
528
                retries++;
529
                if (retries >= MAX_RETRIES) {
530
                    throw new RuntimeException("Failed to load nanopub " + npId + " to last30d repo after " + MAX_RETRIES + " retries");
531
                }
532
                long delay = computeBackoffMillis(retries);
533
                log.info("Retrying in {} ms for nanopub {} in last30d (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
534
                try {
535
                    Thread.sleep(delay);
536
                } catch (InterruptedException x) {
537
                    Thread.currentThread().interrupt();
538
                }
539
            }
540
        }
541
    }
542

543
    @GeneratedFlagForDependentElements
544
    private static void loadNanopubToRepo(IRI npId, List<Statement> statements, String repoName) {
545
        boolean success = false;
546
        int retries = 0;
547
        while (!success) {
548
            RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
549
            long newCountForCache = -1;
550
            try (conn) {
551
                // Serializable, because write skew would cause the chain of hashes to be broken.
552
                // The inserts must be done serially.
553
                conn.begin(IsolationLevels.SERIALIZABLE);
554
                var repoStatus = fetchRepoStatus(conn, npId);
555
                if (repoStatus.isLoaded) {
556
                    log.info("Already loaded: {}", npId);
557
                } else {
558
                    String newChecksum = NanopubUtils.updateXorChecksum(npId, repoStatus.checksum);
559
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, null, NPA.GRAPH);
560
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, null, NPA.GRAPH);
561
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, vf.createLiteral(repoStatus.count + 1), NPA.GRAPH);
562
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubCount, NANOPUB_COUNT, npa:graph, admin, number of nanopubs loaded
563
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
564
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubChecksum, NANOPUB_CHECKSUM, npa:graph, admin, checksum of all loaded nanopubs (order-independent XOR checksum on trusty URIs in Base64 notation)
565
                    conn.add(npId, NPA.HAS_LOAD_NUMBER, vf.createLiteral(repoStatus.count), NPA.GRAPH);
566
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadNumber, LOAD_NUMBER, npa:graph, admin, the sequential number at which this NANOPUB was loaded
567
                    conn.add(npId, NPA.HAS_LOAD_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
568
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadChecksum, LOAD_CHECKSUM, npa:graph, admin, the checksum of all loaded nanopubs after loading the given NANOPUB
569
                    conn.add(npId, NPA.HAS_LOAD_TIMESTAMP, vf.createLiteral(new Date()), NPA.GRAPH);
570
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadTimestamp, LOAD_TIMESTAMP, npa:graph, admin, the time point at which this NANOPUB was loaded
571
                    conn.add(statements);
572
                    if ("full".equals(repoName)) {
573
                        newCountForCache = repoStatus.count + 1;
574
                    }
575
                }
576
                conn.commit();
577
                if (newCountForCache >= 0) {
578
                    loadedNanopubCount = newCountForCache;
579
                }
580
                success = true;
581
            } catch (Exception ex) {
582
                log.warn("Could not load nanopub {} to repo {}.", npId, repoName, ex);
583
                if (conn.isActive()) conn.rollback();
584
            }
585
            if (!success) {
586
                retries++;
587
                if (retries >= MAX_RETRIES) {
588
                    throw new RuntimeException("Failed to load nanopub " + npId + " to repo " + repoName + " after " + MAX_RETRIES + " retries");
589
                }
590
                long delay = computeBackoffMillis(retries);
591
                log.info("Retrying in {} ms for nanopub {} in repo {} (attempt {}/{})...", delay, npId, repoName, retries, MAX_RETRIES);
592
                try {
593
                    Thread.sleep(delay);
594
                } catch (InterruptedException x) {
595
                    Thread.currentThread().interrupt();
596
                }
597
            }
598
        }
599
    }
600

601
    /**
602
     * Writes the raw nanopub statements (all four graphs) into the {@code spaces}
603
     * repo alongside the pre-computed extraction statements (which target
604
     * {@code npa:spacesGraph}). Stamps the load-number on the nanopub IRI and bumps
605
     * {@code npa:thisRepo npa:currentLoadCounter} in {@code npa:graph}, all within
606
     * one serializable transaction.
607
     *
608
     * <p>Idempotent: if the nanopub already has a {@code npa:hasLoadNumber} stamp in
609
     * {@code npa:graph} of the {@code spaces} repo, this is a no-op.
610
     *
611
     * @param npId            nanopub IRI
612
     * @param nanopubTriples  raw nanopub statements (all four graphs + meta)
613
     * @param spaceExtraction summary triples destined for {@code npa:spacesGraph}
614
     */
615
    @GeneratedFlagForDependentElements
616
    private static void loadToSpacesRepo(IRI npId, List<Statement> nanopubTriples,
617
                                         List<Statement> spaceExtraction) {
618
        boolean success = false;
619
        int retries = 0;
620
        while (!success) {
621
            RepositoryConnection conn = TripleStore.get().getRepoConnection("spaces");
622
            try (conn) {
623
                conn.begin(IsolationLevels.SERIALIZABLE);
624
                // Idempotency: skip if this nanopub is already stamped in this repo.
625
                if (Utils.getObjectForPattern(conn, NPA.GRAPH, npId, NPA.HAS_LOAD_NUMBER) != null) {
626
                    conn.commit();
627
                    success = true;
628
                    continue;
629
                }
630
                long newCounter = fetchSpacesLoadCounter(conn) + 1;
631
                conn.remove(NPA.THIS_REPO,
632
                        com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER,
633
                        null, NPA.GRAPH);
634
                conn.add(NPA.THIS_REPO,
635
                        com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER,
636
                        vf.createLiteral(newCounter), NPA.GRAPH);
637
                conn.add(npId, NPA.HAS_LOAD_NUMBER, vf.createLiteral(newCounter), NPA.GRAPH);
638
                conn.add(nanopubTriples);
639
                conn.add(spaceExtraction);
640
                conn.commit();
641
                success = true;
642
            } catch (Exception ex) {
643
                log.warn("Could not load nanopub {} to spaces repo.", npId, ex);
644
                if (conn.isActive()) conn.rollback();
645
            }
646
            if (!success) {
647
                retries++;
648
                if (retries >= MAX_RETRIES) {
649
                    throw new RuntimeException("Failed to load nanopub " + npId + " to spaces repo after " + MAX_RETRIES + " retries");
650
                }
651
                long delay = computeBackoffMillis(retries);
652
                log.info("Retrying in {} ms for nanopub {} in spaces repo (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
653
                try {
654
                    Thread.sleep(delay);
655
                } catch (InterruptedException x) {
656
                    Thread.currentThread().interrupt();
657
                }
658
            }
659
        }
660
    }
661

662
    /**
663
     * Returns the cumulative count of nanopubs ever loaded into the {@code full}
664
     * repo, or {@code null} if the value cannot be determined (e.g. the store
665
     * hasn't been initialised yet). Reads the persisted {@code npa:hasNanopubCount}
666
     * triple on first call and caches it in {@link #loadedNanopubCount};
667
     * subsequent fresh loads update the cache in-place.
668
     */
669
    public static Long getLoadedNanopubCount() {
670
        Long v = loadedNanopubCount;
6✔
671
        if (v != null) return v;
12✔
672
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection("full")) {
12✔
673
            Value val = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT);
×
674
            if (val != null) {
×
675
                v = Long.parseLong(val.stringValue());
×
676
                loadedNanopubCount = v;
×
677
                return v;
×
678
            }
679
        } catch (NumberFormatException ex) {
×
680
            log.warn("Invalid npa:hasNanopubCount literal in full repo", ex);
×
681
        } catch (Exception ex) {
3✔
682
            log.warn("Could not read npa:hasNanopubCount from full repo", ex);
12✔
683
        }
×
684
        return null;
6✔
685
    }
686

687
    private static long fetchSpacesLoadCounter(RepositoryConnection conn) {
688
        Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
689
                com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER);
690
        if (v == null) return 0;
×
691
        try {
692
            return Long.parseLong(v.stringValue());
×
693
        } catch (NumberFormatException ex) {
×
694
            log.warn("Invalid npa:currentLoadCounter literal in spaces repo: {}", v);
×
695
            return 0;
×
696
        }
697
    }
698

699
    private record RepoStatus(boolean isLoaded, long count, String checksum) {
×
700
    }
701

702
    /**
703
     * To execute before loading a nanopub: check if the nanopub is already loaded and what is the
704
     * current load counter and checksum. This effectively batches three queries into one.
705
     * This method must be called from within a transaction.
706
     *
707
     * @param conn repo connection
708
     * @param npId nanopub ID
709
     * @return the current status
710
     */
711
    @GeneratedFlagForDependentElements
712
    private static RepoStatus fetchRepoStatus(RepositoryConnection conn, IRI npId) {
713
        var result = conn.prepareTupleQuery(QueryLanguage.SPARQL, REPO_STATUS_QUERY_TEMPLATE.formatted(npId)).evaluate();
714
        try (result) {
715
            if (!result.hasNext()) {
716
                // This may happen if the repo was created, but is completely empty.
717
                return new RepoStatus(false, 0, NanopubUtils.INIT_CHECKSUM);
718
            }
719
            var row = result.next();
720
            return new RepoStatus(row.hasBinding("loadNumber"), Long.parseLong(row.getBinding("count").getValue().stringValue()), row.getBinding("checksum").getValue().stringValue());
721
        }
722
    }
723

724
    @GeneratedFlagForDependentElements
725
    private static void loadInvalidateStatements(Nanopub thisNp, String thisPubkey, Statement invalidateStatement, Statement pubkeyStatement, Statement pubkeyStatementX, List<Statement> thisAllStatements) {
726
        boolean success = false;
727
        int retries = 0;
728
        List<IRI> typesToLoadFullInto = new ArrayList<>();
729
        while (!success) {
730
            typesToLoadFullInto.clear();
731
            List<RepositoryConnection> connections = new ArrayList<>();
732
            RepositoryConnection metaConn = TripleStore.get().getRepoConnection("meta");
733
            try {
734
                IRI invalidatedNpId = (IRI) invalidateStatement.getObject();
735
                // Basic isolation because here we only read append-only data.
736
                metaConn.begin(IsolationLevels.READ_COMMITTED);
737

738
                Value pubkeyValue = Utils.getObjectForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY);
739
                if (pubkeyValue != null) {
740
                    String pubkey = pubkeyValue.stringValue();
741

742
                    if (!pubkey.equals(thisPubkey)) {
743
                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for pubkey " + pubkey);
744
                        connections.add(loadStatements("pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement, pubkeyStatementX));
745
//                                                connections.add(loadStatements("text-pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement));
746
                    }
747

748
                    Set<IRI> thisNpTypes = NanopubUtils.getTypes(thisNp);
749
                    for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPX.HAS_NANOPUB_TYPE)) {
750
                        if (v instanceof IRI typeIri && !thisNpTypes.contains(typeIri)) {
751
                            // Defer until after the meta-read commits — full load goes
752
                            // through loadNanopubToRepo, which has its own transaction
753
                            // and retry loop (see post-loop block below).
754
                            typesToLoadFullInto.add(typeIri);
755
                        }
756
                    }
757

758
//                                        for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, DCTERMS.CREATOR)) {
759
//                                                IRI creatorIri = (IRI) v;
760
//                                                if (!SimpleCreatorPattern.getCreators(thisNp).contains(creatorIri)) {
761
//                                                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for user " + creatorIri);
762
//                                                        connections.add(loadStatements("user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
763
//                                                        connections.add(loadStatements("text-user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
764
//                                                }
765
//                                        }
766
                }
767

768
                metaConn.commit();
769
                // TODO handle case that some commits succeed and some fail
770
                for (RepositoryConnection c : connections) c.commit();
771
                success = true;
772
            } catch (Exception ex) {
773
                log.warn("Could not load invalidate statements for {}.", thisNp.getUri(), ex);
774
                if (metaConn.isActive()) metaConn.rollback();
775
                for (RepositoryConnection c : connections) {
776
                    if (c.isActive()) c.rollback();
777
                }
778
            } finally {
779
                metaConn.close();
780
                for (RepositoryConnection c : connections) c.close();
781
            }
782
            if (!success) {
783
                retries++;
784
                if (retries >= MAX_RETRIES) {
785
                    throw new RuntimeException("Failed to load invalidate statements for " + thisNp.getUri() + " after " + MAX_RETRIES + " retries");
786
                }
787
                long delay = computeBackoffMillis(retries);
788
                log.info("Retrying in {} ms for invalidate statements of {} (attempt {}/{})...", delay, thisNp.getUri(), retries, MAX_RETRIES);
789
                try {
790
                    Thread.sleep(delay);
791
                } catch (InterruptedException x) {
792
                    Thread.currentThread().interrupt();
793
                }
794
            }
795
        }
796
        // Mirror the Registries' behaviour: index a retraction under the types of the
797
        // nanopub it invalidates, even when the retractor itself doesn't carry those
798
        // types. Load the full retracting nanopub (not just the npx:invalidates marker)
799
        // so a query against a type repo can fetch the retractor's own assertion /
800
        // provenance / pubinfo, not only the join handle.
801
        // loadNanopubToRepo is idempotent (early-exit on npa:hasLoadNumber) and runs
802
        // its own SERIALIZABLE transaction + retry loop, so it's safe to call here.
803
        for (IRI typeIri : typesToLoadFullInto) {
804
            loadNanopubToRepo(thisNp.getUri(), thisAllStatements, "type_" + Utils.createHash(typeIri));
805
        }
806
    }
807

808
    /**
809
     * Extracts a map from invalidator IRI to that invalidator's pubkey literal
810
     * out of an {@code invalidatingStatements} list as produced by
811
     * {@link #getInvalidatingStatements}. The list interleaves
812
     * {@code (?inv, npx:invalidates, ?np)} and
813
     * {@code (?inv, npa:hasValidSignatureForPublicKey, ?pubkey)} triples per
814
     * invalidator; this helper picks out only the pubkey-binding triples.
815
     */
816
    private static Map<IRI, String> collectInvalidatorPubkeys(List<Statement> invalidatingStatements) {
817
        Map<IRI, String> result = new LinkedHashMap<>();
×
818
        for (Statement st : invalidatingStatements) {
×
819
            if (st.getPredicate().equals(NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY)
×
820
                    && st.getSubject() instanceof IRI invIri) {
×
821
                result.put(invIri, st.getObject().stringValue());
×
822
            }
823
        }
×
824
        return result;
×
825
    }
826

827
    /**
828
     * Reverse-order counterpart of the forward-order full-content propagation in
829
     * {@link #loadInvalidateStatements}: when this nanopub had already-loaded
830
     * retractors at the time of its own load (captured by
831
     * {@link #getInvalidatingStatements}), load each retractor's full content
832
     * into this nanopub's per-type repos — restricted to those types the
833
     * retractor doesn't itself carry (the retractor's own load already populated
834
     * the type repos it covers).
835
     *
836
     * <p>Source is the retractor's per-pubkey repo, which is the one shard
837
     * unconditionally populated for every successfully-loaded nanopub. The
838
     * retractor's types are read from the meta repo so we can skip type repos
839
     * the retractor's own regular load already populated.
840
     */
841
    @GeneratedFlagForDependentElements
842
    private static void loadInvalidatorIntoTypeRepos(IRI invIri, String invPubkey, IRI thisNpId, Set<IRI> thisNpTypes) {
843
        Set<IRI> invTypes = readInvalidatorTypesFromMeta(invIri, thisNpId);
844

845
        List<IRI> typesToLoadInto = new ArrayList<>();
846
        for (IRI typeIri : thisNpTypes) {
847
            // Match the regular per-type load loop's exclusion of locally-minted IRIs.
848
            if (typeIri.stringValue().startsWith(thisNpId.stringValue())) continue;
849
            if (!typeIri.stringValue().matches("https?://.*")) continue;
850
            if (!invTypes.contains(typeIri)) typesToLoadInto.add(typeIri);
851
        }
852
        if (typesToLoadInto.isEmpty()) return;
853

854
        List<Statement> invContent = fetchNanopubAllStatementsFromPubkeyRepo(invIri, invPubkey);
855
        for (IRI typeIri : typesToLoadInto) {
856
            loadNanopubToRepo(invIri, invContent, "type_" + Utils.createHash(typeIri));
857
        }
858
    }
859

860
    @GeneratedFlagForDependentElements
861
    private static Set<IRI> readInvalidatorTypesFromMeta(IRI invIri, IRI thisNpId) {
862
        Set<IRI> invTypes = new HashSet<>();
863
        boolean success = false;
864
        int retries = 0;
865
        while (!success) {
866
            invTypes.clear();
867
            RepositoryConnection metaConn = TripleStore.get().getRepoConnection("meta");
868
            try (metaConn) {
869
                metaConn.begin(IsolationLevels.READ_COMMITTED);
870
                for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invIri, NPX.HAS_NANOPUB_TYPE)) {
871
                    if (v instanceof IRI ti) invTypes.add(ti);
872
                }
873
                metaConn.commit();
874
                success = true;
875
            } catch (Exception ex) {
876
                log.warn("Could not read invalidator types for {} (target nanopub {}).", invIri, thisNpId, ex);
877
                if (metaConn.isActive()) metaConn.rollback();
878
            }
879
            if (!success) {
880
                retries++;
881
                if (retries >= MAX_RETRIES) {
882
                    throw new RuntimeException("Failed to read invalidator types for " + invIri + " after " + MAX_RETRIES + " retries");
883
                }
884
                long delay = computeBackoffMillis(retries);
885
                log.info("Retrying in {} ms for invalidator types of {} (attempt {}/{})...", delay, invIri, retries, MAX_RETRIES);
886
                try {
887
                    Thread.sleep(delay);
888
                } catch (InterruptedException x) {
889
                    Thread.currentThread().interrupt();
890
                }
891
            }
892
        }
893
        return invTypes;
894
    }
895

896
    /**
897
     * Reads a nanopub's content back from its per-pubkey repo as a list of
898
     * statements that mirrors what its original load produced into
899
     * {@code allStatements}, minus the per-repo bookkeeping triples
900
     * ({@code npa:hasLoadNumber}, {@code npa:hasLoadChecksum},
901
     * {@code npa:hasLoadTimestamp}). {@link #loadNanopubToRepo} stamps those
902
     * fresh on every destination repo, so they must be filtered out of the
903
     * source set.
904
     *
905
     * <p>Fetched content:
906
     * <ul>
907
     *   <li>All triples in the nanopub's four named graphs, discovered via
908
     *       {@code <npId> npa:hasGraph ?g} in {@code npa:graph}.</li>
909
     *   <li>{@code (<npId>, ?p, ?o)} in {@code npa:graph}, excluding the per-repo
910
     *       bookkeeping predicates above.</li>
911
     *   <li>{@code (?inv, npx:invalidates, <npId>)} in {@code npa:graph}, plus
912
     *       the matching {@code npa:hasValidSignatureForPublicKey[Hash]} triples
913
     *       of each {@code ?inv}, so propagation carries the nanopub's full
914
     *       invalidator history (which doesn't affect query results — see the
915
     *       one-hop filter in {@code Utils#defaultQuery} — but keeps repos
916
     *       consistent).</li>
917
     *   <li>{@code (<npId>, ?p, ?o)} in {@code npa:networkGraph}.</li>
918
     * </ul>
919
     */
920
    @GeneratedFlagForDependentElements
921
    private static List<Statement> fetchNanopubAllStatementsFromPubkeyRepo(IRI npId, String pubkey) {
922
        String repoName = "pubkey_" + Utils.createHash(pubkey);
923
        boolean success = false;
924
        int retries = 0;
925
        List<Statement> result = new ArrayList<>();
926
        while (!success) {
927
            result.clear();
928
            RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
929
            try (conn) {
930
                // Append-only data + idempotent re-load downstream: READ_COMMITTED suffices.
931
                conn.begin(IsolationLevels.READ_COMMITTED);
932

933
                List<IRI> npGraphs = new ArrayList<>();
934
                try (RepositoryResult<Statement> r = conn.getStatements(npId, NPA.HAS_GRAPH, null, NPA.GRAPH)) {
935
                    while (r.hasNext()) {
936
                        Value o = r.next().getObject();
937
                        if (o instanceof IRI iri) npGraphs.add(iri);
938
                    }
939
                }
940

941
                for (IRI g : npGraphs) {
942
                    try (RepositoryResult<Statement> r = conn.getStatements(null, null, null, g)) {
943
                        while (r.hasNext()) result.add(r.next());
944
                    }
945
                }
946

947
                try (RepositoryResult<Statement> r = conn.getStatements(npId, null, null, NPA.GRAPH)) {
948
                    while (r.hasNext()) {
949
                        Statement st = r.next();
950
                        IRI p = st.getPredicate();
951
                        if (p.equals(NPA.HAS_LOAD_NUMBER)
952
                                || p.equals(NPA.HAS_LOAD_CHECKSUM)
953
                                || p.equals(NPA.HAS_LOAD_TIMESTAMP)) {
954
                            continue;
955
                        }
956
                        result.add(st);
957
                    }
958
                }
959

960
                Set<IRI> invalidators = new HashSet<>();
961
                try (RepositoryResult<Statement> r = conn.getStatements(null, NPX.INVALIDATES, npId, NPA.GRAPH)) {
962
                    while (r.hasNext()) {
963
                        Statement st = r.next();
964
                        result.add(st);
965
                        if (st.getSubject() instanceof IRI invIri) invalidators.add(invIri);
966
                    }
967
                }
968
                for (IRI invIri : invalidators) {
969
                    try (RepositoryResult<Statement> r = conn.getStatements(invIri, NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, null, NPA.GRAPH)) {
970
                        while (r.hasNext()) result.add(r.next());
971
                    }
972
                    try (RepositoryResult<Statement> r = conn.getStatements(invIri, NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY_HASH, null, NPA.GRAPH)) {
973
                        while (r.hasNext()) result.add(r.next());
974
                    }
975
                }
976

977
                try (RepositoryResult<Statement> r = conn.getStatements(npId, null, null, NPA.NETWORK_GRAPH)) {
978
                    while (r.hasNext()) result.add(r.next());
979
                }
980

981
                conn.commit();
982
                success = true;
983
            } catch (Exception ex) {
984
                log.warn("Could not fetch nanopub content from {} for {}.", repoName, npId, ex);
985
                if (conn.isActive()) conn.rollback();
986
            }
987
            if (!success) {
988
                retries++;
989
                if (retries >= MAX_RETRIES) {
990
                    throw new RuntimeException("Failed to fetch nanopub content from " + repoName + " for " + npId + " after " + MAX_RETRIES + " retries");
991
                }
992
                long delay = computeBackoffMillis(retries);
993
                log.info("Retrying in {} ms for fetching nanopub content of {} from {} (attempt {}/{})...", delay, npId, repoName, retries, MAX_RETRIES);
994
                try {
995
                    Thread.sleep(delay);
996
                } catch (InterruptedException x) {
997
                    Thread.currentThread().interrupt();
998
                }
999
            }
1000
        }
1001
        return result;
1002
    }
1003

1004
    @GeneratedFlagForDependentElements
1005
    private static RepositoryConnection loadStatements(String repoName, Statement... statements) {
1006
        RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
1007
        // Basic isolation: we only append new statements
1008
        conn.begin(IsolationLevels.READ_COMMITTED);
1009
        for (Statement st : statements) {
1010
            conn.add(st);
1011
        }
1012
        return conn;
1013
    }
1014

1015
    @GeneratedFlagForDependentElements
1016
    static List<Statement> getInvalidatingStatements(IRI npId) {
1017
        List<Statement> invalidatingStatements = new ArrayList<>();
1018
        boolean success = false;
1019
        int retries = 0;
1020
        while (!success) {
1021
            RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
1022
            try (conn) {
1023
                // Basic isolation because here we only read append-only data.
1024
                conn.begin(IsolationLevels.READ_COMMITTED);
1025

1026
                TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + NPX.INVALIDATES + "> <" + npId + "> ; <" + NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY + "> ?pubkey . " + "} }").evaluate();
1027
                try (r) {
1028
                    while (r.hasNext()) {
1029
                        BindingSet b = r.next();
1030
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPX.INVALIDATES, npId, NPA.GRAPH));
1031
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, b.getBinding("pubkey").getValue(), NPA.GRAPH));
1032
                    }
1033
                }
1034
                conn.commit();
1035
                success = true;
1036
            } catch (Exception ex) {
1037
                log.warn("Could not load invalidating statements for {}.", npId, ex);
1038
                if (conn.isActive()) conn.rollback();
1039
            }
1040
            if (!success) {
1041
                retries++;
1042
                if (retries >= MAX_RETRIES) {
1043
                    throw new RuntimeException("Failed to get invalidating statements for " + npId + " after " + MAX_RETRIES + " retries");
1044
                }
1045
                long delay = computeBackoffMillis(retries);
1046
                log.info("Retrying in {} ms for invalidating statements of {} (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
1047
                try {
1048
                    Thread.sleep(delay);
1049
                } catch (InterruptedException x) {
1050
                    Thread.currentThread().interrupt();
1051
                }
1052
            }
1053
        }
1054
        return invalidatingStatements;
1055
    }
1056

1057
    @GeneratedFlagForDependentElements
1058
    private static void loadNoteToRepo(Resource subj, String note) {
1059
        boolean success = false;
1060
        int retries = 0;
1061
        while (!success) {
1062
            RepositoryConnection conn = TripleStore.get().getAdminRepoConnection();
1063
            try (conn) {
1064
                List<Statement> statements = new ArrayList<>();
1065
                statements.add(vf.createStatement(subj, NPA.NOTE, vf.createLiteral(note), NPA.GRAPH));
1066
                conn.add(statements);
1067
                success = true;
1068
            } catch (Exception ex) {
1069
                log.warn("Could not load note to repo for {}.", subj, ex);
1070
            }
1071
            if (!success) {
1072
                retries++;
1073
                if (retries >= MAX_RETRIES) {
1074
                    throw new RuntimeException("Failed to load note to repo for " + subj + " after " + MAX_RETRIES + " retries");
1075
                }
1076
                long delay = computeBackoffMillis(retries);
1077
                log.info("Retrying in {} ms for note on {} (attempt {}/{})...", delay, subj, retries, MAX_RETRIES);
1078
                try {
1079
                    Thread.sleep(delay);
1080
                } catch (InterruptedException x) {
1081
                    Thread.currentThread().interrupt();
1082
                }
1083
            }
1084
        }
1085
    }
1086

1087
    static boolean hasValidSignature(NanopubSignatureElement el) {
1088
        if (el == null) {
6!
1089
            log.warn("Signature validation: signature element is null");
×
1090
            return false;
×
1091
        }
1092
        try {
1093
            if (SignatureUtils.hasValidSignature(el) && el.getPublicKeyString() != null) {
18!
1094
                return true;
6✔
1095
            }
1096
            log.warn("Signature validation returned false for {} (pubkey present: {})",
12✔
1097
                    el.getUri(), el.getPublicKeyString() != null);
21!
1098
        } catch (GeneralSecurityException ex) {
3✔
1099
            log.warn("Signature validation failed for signature element {}", el.getUri(), ex);
18✔
1100
        }
3✔
1101
        return false;
6✔
1102
    }
1103

1104
    private static IRI getBaseTrustyUri(Value v) {
1105
        if (!(v instanceof IRI)) return null;
9!
1106
        String s = v.stringValue();
9✔
1107
        if (!s.matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}([^A-Za-z0-9\\\\-_].{0,43})?")) {
12✔
1108
            return null;
6✔
1109
        }
1110
        return vf.createIRI(s.replaceFirst("^(.*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43})([^A-Za-z0-9\\\\-_].{0,43})?$", "$1"));
21✔
1111
    }
1112

1113
    // TODO: Move this to nanopub library:
1114
    private static boolean isIntroNanopub(Nanopub np) {
1115
        for (Statement st : np.getAssertion()) {
33✔
1116
            if (st.getPredicate().equals(NPX.DECLARED_BY)) return true;
21✔
1117
        }
3✔
1118
        return false;
6✔
1119
    }
1120

1121
    /**
1122
     * Check if a nanopub is already loaded in the admin graph.
1123
     *
1124
     * @param npId the nanopub ID
1125
     * @return true if the nanopub is loaded, false otherwise
1126
     */
1127
    @GeneratedFlagForDependentElements
1128
    static boolean isNanopubLoaded(String npId) {
1129
        boolean loaded = false;
1130
        RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
1131
        try (conn) {
1132
            if (Utils.getObjectForPattern(conn, NPA.GRAPH, vf.createIRI(npId), NPA.HAS_LOAD_NUMBER) != null) {
1133
                loaded = true;
1134
            }
1135
        } catch (Exception ex) {
1136
            log.warn("Could not check whether nanopub is loaded.", ex);
1137
        }
1138
        return loaded;
1139
    }
1140

1141
    // Value factory used by the methods of this class to create RDF values
    // (statements, literals and IRIs).
    private static ValueFactory vf = SimpleValueFactory.getInstance();
6✔
1142

1143
    // TODO remove the constants and use the ones from the nanopub library instead
1144

1145
    /**
1146
     * Template for the query that fetches the status of a repository.
1147
     */
1148
    // Template for .fetchRepoStatus
    // The query selects ?loadNumber (optional), ?count and ?checksum from the NPA.GRAPH graph.
    // formatted(...) fills in the graph and property IRIs right away, but passes the literal
    // "%s" for the second placeholder, so the resulting constant is itself a format string
    // with one open slot: the subject of the OPTIONAL load-number pattern.
1149
    private static final String REPO_STATUS_QUERY_TEMPLATE = """
84✔
1150
            SELECT * { graph <%s> {
1151
              OPTIONAL { <%s> <%s> ?loadNumber . }
1152
              <%s> <%s> ?count ;
1153
                   <%s> ?checksum .
1154
            } }
1155
            """.formatted(NPA.GRAPH, "%s", NPA.HAS_LOAD_NUMBER, NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, NPA.HAS_NANOPUB_CHECKSUM);
6✔
1156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc