• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 26084788212

19 May 2026 08:10AM UTC coverage: 58.251% (-0.5%) from 58.799%
26084788212

push

github

web-flow
Merge pull request #101 from knowledgepixels/fix/load-resilience-audit

Surface and contain silent load-time failures

498 of 932 branches covered (53.43%)

Branch coverage included in aggregate %.

1320 of 2189 relevant lines covered (60.3%)

9.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.0
src/main/java/com/knowledgepixels/query/NanopubLoader.java
1
package com.knowledgepixels.query;
2

3
import net.trustyuri.TrustyUriUtils;
4
import org.apache.http.client.HttpClient;
5
import org.apache.http.impl.client.HttpClientBuilder;
6
import org.eclipse.rdf4j.common.exception.RDF4JException;
7
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
8
import org.eclipse.rdf4j.model.*;
9
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
10
import org.eclipse.rdf4j.model.vocabulary.DCTERMS;
11
import org.eclipse.rdf4j.model.vocabulary.RDFS;
12
import org.eclipse.rdf4j.query.BindingSet;
13
import org.eclipse.rdf4j.query.QueryLanguage;
14
import org.eclipse.rdf4j.query.TupleQuery;
15
import org.eclipse.rdf4j.query.TupleQueryResult;
16
import org.eclipse.rdf4j.repository.RepositoryConnection;
17
import org.nanopub.Nanopub;
18
import org.nanopub.NanopubUtils;
19
import org.nanopub.SimpleCreatorPattern;
20
import org.nanopub.SimpleTimestampPattern;
21
import org.nanopub.extra.security.KeyDeclaration;
22
import org.nanopub.extra.security.MalformedCryptoElementException;
23
import org.nanopub.extra.security.NanopubSignatureElement;
24
import org.nanopub.extra.security.SignatureUtils;
25
import org.nanopub.extra.server.GetNanopub;
26
import org.nanopub.extra.setting.IntroNanopub;
27
import org.nanopub.vocabulary.NP;
28
import org.nanopub.vocabulary.NPA;
29
import org.nanopub.vocabulary.NPX;
30
import org.nanopub.vocabulary.PAV;
31
import org.slf4j.Logger;
32
import org.slf4j.LoggerFactory;
33

34

35
import java.security.GeneralSecurityException;
36
import java.util.*;
37
import java.util.concurrent.ExecutionException;
38
import java.util.concurrent.Executors;
39
import java.util.concurrent.Future;
40
import java.util.concurrent.ThreadLocalRandom;
41
import java.util.concurrent.ThreadPoolExecutor;
42
import java.util.function.Consumer;
43

44
/**
45
 * Utility class for loading nanopublications into the database.
46
 */
47
public class NanopubLoader {
48

49
    private static HttpClient httpClient;
50
    private static final ThreadPoolExecutor loadingPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
12✔
51

52
    /**
53
     * Cached count of nanopubs ever loaded into the {@code full} repo. Maintained
54
     * for {@link MainVerticle}'s {@code Nanopub-Query-Loaded-Nanopub-Count}
55
     * response header. Mirrors the persisted {@code npa:hasNanopubCount} triple
56
     * that {@link #loadNanopubToRepo} maintains; invalidations don't decrement
57
     * (they're recorded as separate {@code npx:invalidates} markers), so this
58
     * is a cumulative count including superseded/retracted nanopubs — matching
59
     * the registry-side {@code Nanopub-Registry-Nanopub-Count} semantics.
60
     * Populated lazily on first read; bumped post-commit on each fresh load.
61
     */
62
    static volatile Long loadedNanopubCount = null;
6✔
63

64
    /**
65
     * Retry budget for the five (with #71 merged: six) structurally identical
66
     * retry loops in this file. Previously the shape was flat {@code 10 s × 30} —
67
     * five minutes of constant hammering at RDF4J that did not help a slow server.
68
     * The new shape is bounded exponential backoff with ±50 % jitter:
69
     * {@code base = 1, 2, 4, 8, 16, 32, 60, 60 s} for attempts 1…8, each perturbed
70
     * by up to half its base value. Jitter prevents the 4-thread loadingPool from
71
     * retrying in lock-step after a shared RDF4J failure (GC pause / overload spike).
72
     * Worst-case wall time per failing task drops from ~35 min (post-change-1
73
     * timeouts × 30 flat retries) to ~11 min (8 retries × 60 s timeout + backoff
74
     * sleeps). This is the figure that sets the circuit-breaker trip time in
75
     * {@link JellyNanopubLoader}.
76
     */
77
    private static final int MAX_RETRIES = 8;
78
    private static final long[] BACKOFF_BASE_MS =
105✔
79
            {1_000L, 2_000L, 4_000L, 8_000L, 16_000L, 32_000L, 60_000L, 60_000L};
80

81
    /**
82
     * Returns the sleep delay in ms for the given 1-indexed retry attempt. Delay
83
     * is {@link #BACKOFF_BASE_MS}{@code [attempt-1]} perturbed by ±50 % uniform
84
     * jitter, clamped to be non-negative.
85
     *
86
     * @param attempt 1-indexed retry attempt number
87
     * @return the computed sleep delay in ms
88
     */
89
    static long computeBackoffMillis(int attempt) {
90
        long base = BACKOFF_BASE_MS[Math.min(attempt - 1, BACKOFF_BASE_MS.length - 1)];
×
91
        long jitter = ThreadLocalRandom.current().nextLong(base + 1) - base / 2;
×
92
        return Math.max(0L, base + jitter);
×
93
    }
94
    private Nanopub np;
95
    private NanopubSignatureElement el = null;
9✔
96
    private List<Statement> metaStatements = new ArrayList<>();
15✔
97
    private List<Statement> nanopubStatements = new ArrayList<>();
15✔
98
    private List<Statement> literalStatements = new ArrayList<>();
15✔
99
    private List<Statement> invalidateStatements = new ArrayList<>();
15✔
100
    private List<Statement> textStatements, allStatements;
101
    private List<Statement> spaceExtractionStatements = new ArrayList<>();
15✔
102
    private Calendar timestamp = null;
9✔
103
    private Statement pubkeyStatement, pubkeyStatementX;
104
    private List<String> notes = new ArrayList<>();
15✔
105
    private boolean aborted = false;
9✔
106
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
9✔
107

108

109
    NanopubLoader(Nanopub np, long counter) {
6✔
110
        this.np = np;
9✔
111
        if (counter >= 0) {
12✔
112
            log.info("Loading {}: {}", counter, np.getUri());
24✔
113
        } else {
114
            log.info("Loading: {}", np.getUri());
15✔
115
        }
116

117
        // TODO Ensure proper synchronization and DB rollbacks
118

119
        // TODO Check for null characters ("\0"), which can cause problems in Virtuoso.
120

121
        String ac = TrustyUriUtils.getArtifactCode(np.getUri().toString());
15✔
122
        if (!np.getHeadUri().toString().contains(ac) || !np.getAssertionUri().toString().contains(ac) || !np.getProvenanceUri().toString().contains(ac) || !np.getPubinfoUri().toString().contains(ac)) {
72!
123
            notes.add("could not load nanopub as not all graphs contained the artifact code");
×
124
            aborted = true;
×
125
            return;
×
126
        }
127

128
        try {
129
            el = SignatureUtils.getSignatureElement(np);
12✔
130
        } catch (MalformedCryptoElementException ex) {
×
131
            notes.add("Signature error");
×
132
        }
3✔
133
        if (!hasValidSignature(el)) {
12✔
134
            // Audit trail for the silent-false path: without this, an aborted nanopub
135
            // is invisible in the admin repo and only detectable as a gap between the
136
            // stream counter and the loaded count. See issue around RDF4J-instability
137
            // load gaps (full vs registry, meta vs full) where signature validation
138
            // returned false but no GeneralSecurityException was thrown.
139
            if (notes.isEmpty()) {
12!
140
                notes.add("Invalid signature");
15✔
141
            }
142
            aborted = true;
9✔
143
            return;
3✔
144
        }
145

146
        pubkeyStatement = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, vf.createLiteral(el.getPublicKeyString()), NPA.GRAPH);
39✔
147
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKey, FULL_PUBKEY, npa:graph, meta, full pubkey if signature is valid
148
        metaStatements.add(pubkeyStatement);
18✔
149
        pubkeyStatementX = vf.createStatement(np.getUri(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY_HASH, vf.createLiteral(Utils.createHash(el.getPublicKeyString())), NPA.GRAPH);
42✔
150
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasValidSignatureForPublicKeyHash, PUBKEY_HASH, npa:graph, meta, hex-encoded SHA256 hash if signature is valid
151
        metaStatements.add(pubkeyStatementX);
18✔
152

153
        if (el.getSigners().size() == 1) {  // > 1 is deprecated
18!
154
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SIGNED_BY, el.getSigners().iterator().next(), NPA.GRAPH));
48✔
155
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:signedBy, SIGNER, npa:graph, meta, ID of signer
156
        }
157

158
        Set<IRI> subIris = new HashSet<>();
12✔
159
        Set<IRI> otherNps = new HashSet<>();
12✔
160
        Set<IRI> invalidated = new HashSet<>();
12✔
161
        Set<IRI> retracted = new HashSet<>();
12✔
162
        Set<IRI> superseded = new HashSet<>();
12✔
163
        String combinedLiterals = "";
6✔
164
        for (Statement st : NanopubUtils.getStatements(np)) {
33✔
165
            nanopubStatements.add(st);
15✔
166

167
            if (st.getPredicate().toString().contains(ac)) {
18!
168
                subIris.add(st.getPredicate());
×
169
            } else {
170
                IRI b = getBaseTrustyUri(st.getPredicate());
12✔
171
                if (b != null) otherNps.add(b);
6!
172
            }
173
            if (st.getPredicate().equals(NPX.RETRACTS) && st.getObject() instanceof IRI) {
15!
174
                retracted.add((IRI) st.getObject());
×
175
            }
176
            if (st.getPredicate().equals(NPX.INVALIDATES) && st.getObject() instanceof IRI) {
15!
177
                invalidated.add((IRI) st.getObject());
×
178
            }
179
            if (st.getSubject().equals(np.getUri()) && st.getObject() instanceof IRI) {
30✔
180
                if (st.getPredicate().equals(NPX.SUPERSEDES)) {
15✔
181
                    superseded.add((IRI) st.getObject());
18✔
182
                }
183
                if (st.getObject().toString().matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}")) {
18✔
184
                    metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.NETWORK_GRAPH));
39✔
185
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB1, RELATION, NANOPUB2, npa:networkGraph, meta, any inter-nanopub relation found in NANOPUB1
186
                }
187
                if (st.getContext().equals(np.getPubinfoUri())) {
18✔
188
                    if (st.getPredicate().equals(NPX.INTRODUCES) || st.getPredicate().equals(NPX.DESCRIBES) || st.getPredicate().equals(NPX.EMBEDS)) {
45!
189
                        metaStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), NPA.GRAPH));
39✔
190
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:introduces, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
191
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:describes, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
192
                        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:embeds, THING, npa:graph, meta, when such a triple is present in pubinfo of NANOPUB
193
                    }
194
                }
195
            }
196
            if (st.getSubject().toString().contains(ac)) {
18✔
197
                subIris.add((IRI) st.getSubject());
21✔
198
            } else {
199
                IRI b = getBaseTrustyUri(st.getSubject());
12✔
200
                if (b != null) otherNps.add(b);
6!
201
            }
202
            if (st.getObject() instanceof IRI) {
12✔
203
                if (st.getObject().toString().contains(ac)) {
18✔
204
                    subIris.add((IRI) st.getObject());
21✔
205
                } else {
206
                    IRI b = getBaseTrustyUri(st.getObject());
12✔
207
                    if (b != null) otherNps.add(b);
18✔
208
                }
3✔
209
            } else {
210
                combinedLiterals += st.getObject().stringValue().replaceAll("\\s+", " ") + "\n";
27✔
211
//                                if (st.getSubject().equals(np.getUri()) && !st.getSubject().equals(HAS_FILTER_LITERAL)) {
212
//                                        literalStatements.add(vf.createStatement(np.getUri(), st.getPredicate(), st.getObject(), LITERAL_GRAPH));
213
//                                } else {
214
//                                        literalStatements.add(vf.createStatement(np.getUri(), HAS_LITERAL, st.getObject(), LITERAL_GRAPH));
215
//                                }
216
            }
217
        }
3✔
218
        subIris.remove(np.getUri());
15✔
219
        subIris.remove(np.getAssertionUri());
15✔
220
        subIris.remove(np.getProvenanceUri());
15✔
221
        subIris.remove(np.getPubinfoUri());
15✔
222
        for (IRI i : subIris) {
30✔
223
            metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_SUB_IRI, i, NPA.GRAPH));
33✔
224
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasSubIri, SUB_IRI, npa:graph, meta, for any IRI minted in the namespace of the NANOPUB
225
        }
3✔
226
        for (IRI i : otherNps) {
30✔
227
            metaStatements.add(vf.createStatement(np.getUri(), NPA.REFERS_TO_NANOPUB, i, NPA.NETWORK_GRAPH));
33✔
228
            // @ADMIN-TRIPLE-TABLE@ NANOPUB1, npa:refersToNanopub, NANOPUB2, npa:networkGraph, meta, generic inter-nanopub relation
229
        }
3✔
230
        for (IRI i : invalidated) {
18!
231
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
232
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:invalidates, INVALIDATED_NANOPUB, npa:graph, meta, if the NANOPUB retracts or supersedes another nanopub
233
        }
×
234
        for (IRI i : retracted) {
18!
235
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
×
236
            metaStatements.add(vf.createStatement(np.getUri(), NPX.RETRACTS, i, NPA.GRAPH));
×
237
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:retracts, RETRACTED_NANOPUB, npa:graph, meta, if the NANOPUB retracts another nanopub
238
        }
×
239
        for (IRI i : superseded) {
30✔
240
            invalidateStatements.add(vf.createStatement(np.getUri(), NPX.INVALIDATES, i, NPA.GRAPH));
33✔
241
            metaStatements.add(vf.createStatement(np.getUri(), NPX.SUPERSEDES, i, NPA.GRAPH));
33✔
242
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:supersedes, SUPERSEDED_NANOPUB, npa:graph, meta, if the NANOPUB supersedes another nanopub
243
        }
3✔
244

245
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_HEAD_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
246
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasHeadGraph, HEAD_GRAPH, npa:graph, meta, direct link to the head graph of the NANOPUB
247
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getHeadUri(), NPA.GRAPH));
36✔
248
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasGraph, GRAPH, npa:graph, meta, generic link to all four graphs of the given NANOPUB
249
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_ASSERTION, np.getAssertionUri(), NPA.GRAPH));
36✔
250
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasAssertion, ASSERTION_GRAPH, npa:graph, meta, direct link to the assertion graph of the NANOPUB
251
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getAssertionUri(), NPA.GRAPH));
36✔
252
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PROVENANCE, np.getProvenanceUri(), NPA.GRAPH));
36✔
253
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasProvenance, PROVENANCE_GRAPH, npa:graph, meta, direct link to the provenance graph of the NANOPUB
254
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getProvenanceUri(), NPA.GRAPH));
36✔
255
        metaStatements.add(vf.createStatement(np.getUri(), NP.HAS_PUBINFO, np.getPubinfoUri(), NPA.GRAPH));
36✔
256
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, np:hasPublicationInfo, PUBINFO_GRAPH, npa:graph, meta, direct link to the pubinfo graph of the NANOPUB
257
        metaStatements.add(vf.createStatement(np.getUri(), NPA.HAS_GRAPH, np.getPubinfoUri(), NPA.GRAPH));
36✔
258

259
        String artifactCode = TrustyUriUtils.getArtifactCode(np.getUri().stringValue());
15✔
260
        metaStatements.add(vf.createStatement(np.getUri(), NPA.ARTIFACT_CODE, vf.createLiteral(artifactCode), NPA.GRAPH));
39✔
261
        // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:artifactCode, ARTIFACT_CODE, npa:graph, meta, artifact code starting with 'RA...'
262

263
        if (isIntroNanopub(np)) {
9✔
264
            IntroNanopub introNp = new IntroNanopub(np);
15✔
265
            metaStatements.add(vf.createStatement(np.getUri(), NPA.IS_INTRODUCTION_OF, introNp.getUser(), NPA.GRAPH));
36✔
266
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:isIntroductionOf, AGENT, npa:graph, meta, linking intro nanopub to the agent it is introducing
267
            for (KeyDeclaration kc : introNp.getKeyDeclarations()) {
33✔
268
                metaStatements.add(vf.createStatement(np.getUri(), NPA.DECLARES_PUBKEY, vf.createLiteral(kc.getPublicKeyString()), NPA.GRAPH));
42✔
269
                // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:declaresPubkey, FULL_PUBKEY, npa:graph, meta, full pubkey declared by the given intro NANOPUB
270
            }
3✔
271
        }
272

273
        try {
274
            timestamp = SimpleTimestampPattern.getCreationTime(np);
12✔
275
        } catch (IllegalArgumentException ex) {
×
276
            notes.add("Illegal date/time");
×
277
        }
3✔
278
        if (timestamp != null) {
9!
279
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATED, vf.createLiteral(timestamp.getTime()), NPA.GRAPH));
45✔
280
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:created, CREATION_DATE, npa:graph, meta, normalized creation timestamp
281
        }
282

283
        String literalFilter = "_pubkey_" + Utils.createHash(el.getPublicKeyString());
18✔
284
        for (IRI typeIri : NanopubUtils.getTypes(np)) {
33✔
285
            metaStatements.add(vf.createStatement(np.getUri(), NPX.HAS_NANOPUB_TYPE, typeIri, NPA.GRAPH));
33✔
286
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npx:hasNanopubType, NANOPUB_TYPE, npa:graph, meta, type of NANOPUB
287
            literalFilter += " _type_" + Utils.createHash(typeIri);
15✔
288
        }
3✔
289
        String label = NanopubUtils.getLabel(np);
9✔
290
        if (label != null) {
6!
291
            metaStatements.add(vf.createStatement(np.getUri(), RDFS.LABEL, vf.createLiteral(label), NPA.GRAPH));
39✔
292
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, rdfs:label, LABEL, npa:graph, meta, label of NANOPUB
293
        }
294
        String description = NanopubUtils.getDescription(np);
9✔
295
        if (description != null) {
6✔
296
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.DESCRIPTION, vf.createLiteral(description), NPA.GRAPH));
39✔
297
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:description, LABEL, npa:graph, meta, description of NANOPUB
298
        }
299
        for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
33✔
300
            metaStatements.add(vf.createStatement(np.getUri(), DCTERMS.CREATOR, creatorIri, NPA.GRAPH));
33✔
301
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, dct:creator, CREATOR, npa:graph, meta, creator of NANOPUB (can be several)
302
        }
3✔
303
        for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
21!
304
            metaStatements.add(vf.createStatement(np.getUri(), PAV.AUTHORED_BY, authorIri, NPA.GRAPH));
×
305
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, pav:authoredBy, AUTHOR, npa:graph, meta, author of NANOPUB (can be several)
306
        }
×
307

308
        if (!combinedLiterals.isEmpty()) {
9!
309
            literalStatements.add(vf.createStatement(np.getUri(), NPA.HAS_FILTER_LITERAL, vf.createLiteral(literalFilter + "\n" + combinedLiterals), NPA.GRAPH));
45✔
310
            // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasFilterLiteral, FILTER_LITERAL, npa:graph, literal, auxiliary literal for filtering by type and pubkey in text repo
311
        }
312

313
        // Any statements that express that the currently processed nanopub is already invalidated:
314
        List<Statement> invalidatingStatements = getInvalidatingStatements(np.getUri());
12✔
315

316
        metaStatements.addAll(invalidateStatements);
18✔
317

318
        allStatements = new ArrayList<>(nanopubStatements);
21✔
319
        allStatements.addAll(metaStatements);
18✔
320
        allStatements.addAll(invalidatingStatements);
15✔
321

322
        textStatements = new ArrayList<>(literalStatements);
21✔
323
        textStatements.addAll(metaStatements);
18✔
324
        textStatements.addAll(invalidatingStatements);
15✔
325

326
        if (FeatureFlags.spacesEnabled()) {
6!
327
            IRI signedBy = (el.getSigners().size() == 1) ? el.getSigners().iterator().next() : null;
42!
328
            String pubkeyHash = Utils.createHash(el.getPublicKeyString());
15✔
329
            Date createdAt = (timestamp != null) ? timestamp.getTime() : null;
24!
330
            SpacesExtractor.Context ctx = new SpacesExtractor.Context(ac, signedBy, pubkeyHash, createdAt);
24✔
331
            spaceExtractionStatements = SpacesExtractor.extract(np, ctx);
15✔
332
        }
333
    }
3✔
334

335
    /**
336
     * Get the HTTP client used for fetching nanopublications.
337
     *
338
     * @return the HTTP client
339
     */
340
    static HttpClient getHttpClient() {
341
        if (httpClient == null) {
6✔
342
            httpClient = HttpClientBuilder.create().setDefaultRequestConfig(Utils.getHttpRequestConfig()).build();
15✔
343
        }
344
        return httpClient;
6✔
345
    }
346

347
    /**
348
     * Load the given nanopublication into the database.
349
     *
350
     * @param nanopubUri Nanopublication identifier (URI)
351
     */
352
    public static void load(String nanopubUri) {
353
        if (isNanopubLoaded(nanopubUri)) {
9!
354
            log.info("Already loaded: {}", nanopubUri);
×
355
        } else {
356
            Nanopub np = GetNanopub.get(nanopubUri, getHttpClient());
12✔
357
            load(np, -1);
9✔
358
        }
359
    }
3✔
360

361
    /**
362
     * Load a nanopub into the database.
363
     *
364
     * @param np      the nanopub to load
365
     * @param counter the load counter, only used for logging (or -1 if not known)
366
     * @throws RDF4JException if the loading fails
367
     */
368
    public static void load(Nanopub np, long counter) throws RDF4JException {
369
        NanopubLoader loader = new NanopubLoader(np, counter);
18✔
370
        loader.executeLoading();
6✔
371
    }
3✔
372

373
    @GeneratedFlagForDependentElements
374
    private void executeLoading() {
375
        var runningTasks = new ArrayList<Future<?>>();
376
        Consumer<Runnable> runTask = t -> runningTasks.add(loadingPool.submit(t));
×
377

378
        for (String note : notes) {
379
            loadNoteToRepo(np.getUri(), note);
380
        }
381

382
        if (!aborted) {
383
            // Submit all tasks except the "meta" task
384
            if (timestamp != null) {
385
                if (new Date().getTime() - timestamp.getTimeInMillis() < THIRTY_DAYS) {
386
                    if (FeatureFlags.last30dRepoEnabled()) {
387
                        runTask.accept(() -> loadNanopubToLatest(np.getUri(), allStatements));
×
388
                    }
389
                }
390
            }
391

392
            if (FeatureFlags.textRepoEnabled()) {
393
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), textStatements, "text"));
×
394
            }
395
            if (FeatureFlags.fullRepoEnabled()) {
396
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "full"));
×
397
            }
398
            // Note: "meta" task is deferred until all other tasks complete successfully
399

400
            runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "pubkey_" + Utils.createHash(el.getPublicKeyString())));
×
401
            //                loadNanopubToRepo(np.getUri(), textStatements, "text-pubkey_" + Utils.createHash(el.getPublicKeyString()));
402
            for (IRI typeIri : NanopubUtils.getTypes(np)) {
403
                // Exclude locally minted IRIs:
404
                if (typeIri.stringValue().startsWith(np.getUri().stringValue())) continue;
405
                if (!typeIri.stringValue().matches("https?://.*")) continue;
406
                runTask.accept(() -> loadNanopubToRepo(np.getUri(), allStatements, "type_" + Utils.createHash(typeIri)));
×
407
                //                        loadNanopubToRepo(np.getUri(), textStatements, "text-type_" + Utils.createHash(typeIri));
408
            }
409
            //                for (IRI creatorIri : SimpleCreatorPattern.getCreators(np)) {
410
            //                        // Exclude locally minted IRIs:
411
            //                        if (creatorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
412
            //                        if (!creatorIri.stringValue().matches("https?://.*")) continue;
413
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(creatorIri));
414
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(creatorIri));
415
            //                }
416
            //                for (IRI authorIri : SimpleCreatorPattern.getAuthors(np)) {
417
            //                        // Exclude locally minted IRIs:
418
            //                        if (authorIri.stringValue().startsWith(np.getUri().stringValue())) continue;
419
            //                        if (!authorIri.stringValue().matches("https?://.*")) continue;
420
            //                        loadNanopubToRepo(np.getUri(), allStatements, "user_" + Utils.createHash(authorIri));
421
            //                        loadNanopubToRepo(np.getUri(), textStatements, "text-user_" + Utils.createHash(authorIri));
422
            //                }
423

424
            if (FeatureFlags.spacesEnabled() && !spaceExtractionStatements.isEmpty()) {
425
                runTask.accept(() -> loadToSpacesRepo(np.getUri(), allStatements, spaceExtractionStatements));
×
426
            }
427

428
            for (Statement st : invalidateStatements) {
429
                runTask.accept(() -> loadInvalidateStatements(np, el.getPublicKeyString(), st, pubkeyStatement, pubkeyStatementX));
×
430
            }
431

432
            // Wait for all non-meta tasks to complete successfully before submitting the meta task.
433
            // On failure, cancel the remaining futures so orphaned tasks don't keep running in the
434
            // shared loadingPool and race with the next batch retry (which re-submits the same
435
            // nanopub against the same repos).
436
            for (var task : runningTasks) {
437
                try {
438
                    task.get();
439
                } catch (ExecutionException | InterruptedException ex) {
440
                    for (var t : runningTasks) {
441
                        if (!t.isDone()) t.cancel(true);
442
                    }
443
                    throw new RuntimeException("Error in nanopub loading thread", ex.getCause());
444
                }
445
            }
446

447
            // Now submit and wait for the "meta" task after all other tasks have completed successfully
448
            Future<?> metaTask = loadingPool.submit(() -> loadNanopubToRepo(np.getUri(), metaStatements, "meta"));
×
449
            try {
450
                metaTask.get();
451
            } catch (ExecutionException | InterruptedException ex) {
452
                throw new RuntimeException("Error in nanopub loading thread (meta task)", ex.getCause());
453
            }
454
        }
455
    }
456

457
    private static Long lastUpdateOfLatestRepo = null;
6✔
458
    private static long THIRTY_DAYS = 1000L * 60 * 60 * 24 * 30;
6✔
459
    private static long ONE_HOUR = 1000L * 60 * 60;
6✔
460

461
    @GeneratedFlagForDependentElements
462
    private static void loadNanopubToLatest(IRI npId, List<Statement> statements) {
463
        boolean success = false;
464
        int retries = 0;
465
        while (!success) {
466
            RepositoryConnection conn = TripleStore.get().getRepoConnection("last30d");
467
            try (conn) {
468
                // Read committed, because deleting old nanopubs is idempotent. Inserts do not collide
469
                // with deletes, because we are not inserting old nanopubs.
470
                conn.begin(IsolationLevels.READ_COMMITTED);
471
                conn.add(statements);
472
                if (lastUpdateOfLatestRepo == null || new Date().getTime() - lastUpdateOfLatestRepo > ONE_HOUR) {
473
                    log.trace("Remove old nanopubs...");
474
                    Literal thirtyDaysAgo = vf.createLiteral(new Date(new Date().getTime() - THIRTY_DAYS));
475
                    TupleQuery q = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + DCTERMS.CREATED + "> ?date . " + "filter ( ?date < ?thirtydaysago ) " + "} }");
476
                    q.setBinding("thirtydaysago", thirtyDaysAgo);
477
                    try (TupleQueryResult r = q.evaluate()) {
478
                        while (r.hasNext()) {
479
                            BindingSet b = r.next();
480
                            IRI oldNpId = (IRI) b.getBinding("np").getValue();
481
                            log.trace("Remove old nanopub: {}", oldNpId);
482
                            for (Value v : Utils.getObjectsForPattern(conn, NPA.GRAPH, oldNpId, NPA.HAS_GRAPH)) {
483
                                // Remove all four nanopub graphs:
484
                                conn.remove((Resource) null, (IRI) null, (Value) null, (IRI) v);
485
                            }
486
                            // Remove nanopubs in admin graphs:
487
                            conn.remove(oldNpId, null, null, NPA.GRAPH);
488
                            conn.remove(oldNpId, null, null, NPA.NETWORK_GRAPH);
489
                        }
490
                    }
491
                    lastUpdateOfLatestRepo = new Date().getTime();
492
                }
493
                conn.commit();
494
                success = true;
495
            } catch (Exception ex) {
496
                log.warn("Could not load nanopub {} to last30d repo.", npId, ex);
497
                if (conn.isActive()) conn.rollback();
498
            }
499
            if (!success) {
500
                retries++;
501
                if (retries >= MAX_RETRIES) {
502
                    throw new RuntimeException("Failed to load nanopub " + npId + " to last30d repo after " + MAX_RETRIES + " retries");
503
                }
504
                long delay = computeBackoffMillis(retries);
505
                log.info("Retrying in {} ms for nanopub {} in last30d (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
506
                try {
507
                    Thread.sleep(delay);
508
                } catch (InterruptedException x) {
509
                    Thread.currentThread().interrupt();
510
                }
511
            }
512
        }
513
    }
514

515
    @GeneratedFlagForDependentElements
516
    private static void loadNanopubToRepo(IRI npId, List<Statement> statements, String repoName) {
517
        boolean success = false;
518
        int retries = 0;
519
        while (!success) {
520
            RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
521
            long newCountForCache = -1;
522
            try (conn) {
523
                // Serializable, because write skew would cause the chain of hashes to be broken.
524
                // The inserts must be done serially.
525
                conn.begin(IsolationLevels.SERIALIZABLE);
526
                var repoStatus = fetchRepoStatus(conn, npId);
527
                if (repoStatus.isLoaded) {
528
                    log.info("Already loaded: {}", npId);
529
                } else {
530
                    String newChecksum = NanopubUtils.updateXorChecksum(npId, repoStatus.checksum);
531
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, null, NPA.GRAPH);
532
                    conn.remove(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, null, NPA.GRAPH);
533
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, vf.createLiteral(repoStatus.count + 1), NPA.GRAPH);
534
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubCount, NANOPUB_COUNT, npa:graph, admin, number of nanopubs loaded
535
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
536
                    // @ADMIN-TRIPLE-TABLE@ REPO, npa:hasNanopubChecksum, NANOPUB_CHECKSUM, npa:graph, admin, checksum of all loaded nanopubs (order-independent XOR checksum on trusty URIs in Base64 notation)
537
                    conn.add(npId, NPA.HAS_LOAD_NUMBER, vf.createLiteral(repoStatus.count), NPA.GRAPH);
538
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadNumber, LOAD_NUMBER, npa:graph, admin, the sequential number at which this NANOPUB was loaded
539
                    conn.add(npId, NPA.HAS_LOAD_CHECKSUM, vf.createLiteral(newChecksum), NPA.GRAPH);
540
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadChecksum, LOAD_CHECKSUM, npa:graph, admin, the checksum of all loaded nanopubs after loading the given NANOPUB
541
                    conn.add(npId, NPA.HAS_LOAD_TIMESTAMP, vf.createLiteral(new Date()), NPA.GRAPH);
542
                    // @ADMIN-TRIPLE-TABLE@ NANOPUB, npa:hasLoadTimestamp, LOAD_TIMESTAMP, npa:graph, admin, the time point at which this NANOPUB was loaded
543
                    conn.add(statements);
544
                    if ("full".equals(repoName)) {
545
                        newCountForCache = repoStatus.count + 1;
546
                    }
547
                }
548
                conn.commit();
549
                if (newCountForCache >= 0) {
550
                    loadedNanopubCount = newCountForCache;
551
                }
552
                success = true;
553
            } catch (Exception ex) {
554
                log.warn("Could not load nanopub {} to repo {}.", npId, repoName, ex);
555
                if (conn.isActive()) conn.rollback();
556
            }
557
            if (!success) {
558
                retries++;
559
                if (retries >= MAX_RETRIES) {
560
                    throw new RuntimeException("Failed to load nanopub " + npId + " to repo " + repoName + " after " + MAX_RETRIES + " retries");
561
                }
562
                long delay = computeBackoffMillis(retries);
563
                log.info("Retrying in {} ms for nanopub {} in repo {} (attempt {}/{})...", delay, npId, repoName, retries, MAX_RETRIES);
564
                try {
565
                    Thread.sleep(delay);
566
                } catch (InterruptedException x) {
567
                    Thread.currentThread().interrupt();
568
                }
569
            }
570
        }
571
    }
572

573
    /**
574
     * Writes the raw nanopub statements (all four graphs) into the {@code spaces}
575
     * repo alongside the pre-computed extraction statements (which target
576
     * {@code npa:spacesGraph}). Stamps the load-number on the nanopub IRI and bumps
577
     * {@code npa:thisRepo npa:currentLoadCounter} in {@code npa:graph}, all within
578
     * one serializable transaction.
579
     *
580
     * <p>Idempotent: if the nanopub already has a {@code npa:hasLoadNumber} stamp in
581
     * {@code npa:graph} of the {@code spaces} repo, this is a no-op.
582
     *
583
     * @param npId            nanopub IRI
584
     * @param nanopubTriples  raw nanopub statements (all four graphs + meta)
585
     * @param spaceExtraction summary triples destined for {@code npa:spacesGraph}
586
     */
587
    @GeneratedFlagForDependentElements
588
    private static void loadToSpacesRepo(IRI npId, List<Statement> nanopubTriples,
589
                                         List<Statement> spaceExtraction) {
590
        boolean success = false;
591
        int retries = 0;
592
        while (!success) {
593
            RepositoryConnection conn = TripleStore.get().getRepoConnection("spaces");
594
            try (conn) {
595
                conn.begin(IsolationLevels.SERIALIZABLE);
596
                // Idempotency: skip if this nanopub is already stamped in this repo.
597
                if (Utils.getObjectForPattern(conn, NPA.GRAPH, npId, NPA.HAS_LOAD_NUMBER) != null) {
598
                    conn.commit();
599
                    success = true;
600
                    continue;
601
                }
602
                long newCounter = fetchSpacesLoadCounter(conn) + 1;
603
                conn.remove(NPA.THIS_REPO,
604
                        com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER,
605
                        null, NPA.GRAPH);
606
                conn.add(NPA.THIS_REPO,
607
                        com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER,
608
                        vf.createLiteral(newCounter), NPA.GRAPH);
609
                conn.add(npId, NPA.HAS_LOAD_NUMBER, vf.createLiteral(newCounter), NPA.GRAPH);
610
                conn.add(nanopubTriples);
611
                conn.add(spaceExtraction);
612
                conn.commit();
613
                success = true;
614
            } catch (Exception ex) {
615
                log.warn("Could not load nanopub {} to spaces repo.", npId, ex);
616
                if (conn.isActive()) conn.rollback();
617
            }
618
            if (!success) {
619
                retries++;
620
                if (retries >= MAX_RETRIES) {
621
                    throw new RuntimeException("Failed to load nanopub " + npId + " to spaces repo after " + MAX_RETRIES + " retries");
622
                }
623
                long delay = computeBackoffMillis(retries);
624
                log.info("Retrying in {} ms for nanopub {} in spaces repo (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
625
                try {
626
                    Thread.sleep(delay);
627
                } catch (InterruptedException x) {
628
                    Thread.currentThread().interrupt();
629
                }
630
            }
631
        }
632
    }
633

634
    /**
635
     * Returns the cumulative count of nanopubs ever loaded into the {@code full}
636
     * repo, or {@code null} if the value cannot be determined (e.g. the store
637
     * hasn't been initialised yet). Reads the persisted {@code npa:hasNanopubCount}
638
     * triple on first call and caches it in {@link #loadedNanopubCount};
639
     * subsequent fresh loads update the cache in-place.
640
     */
641
    public static Long getLoadedNanopubCount() {
642
        Long v = loadedNanopubCount;
6✔
643
        if (v != null) return v;
12✔
644
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection("full")) {
12✔
645
            Value val = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT);
×
646
            if (val != null) {
×
647
                v = Long.parseLong(val.stringValue());
×
648
                loadedNanopubCount = v;
×
649
                return v;
×
650
            }
651
        } catch (NumberFormatException ex) {
×
652
            log.warn("Invalid npa:hasNanopubCount literal in full repo", ex);
×
653
        } catch (Exception ex) {
3✔
654
            log.warn("Could not read npa:hasNanopubCount from full repo", ex);
12✔
655
        }
×
656
        return null;
6✔
657
    }
658

659
    private static long fetchSpacesLoadCounter(RepositoryConnection conn) {
660
        Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
661
                com.knowledgepixels.query.vocabulary.SpacesVocab.CURRENT_LOAD_COUNTER);
662
        if (v == null) return 0;
×
663
        try {
664
            return Long.parseLong(v.stringValue());
×
665
        } catch (NumberFormatException ex) {
×
666
            log.warn("Invalid npa:currentLoadCounter literal in spaces repo: {}", v);
×
667
            return 0;
×
668
        }
669
    }
670

671
    private record RepoStatus(boolean isLoaded, long count, String checksum) {
×
672
    }
673

674
    /**
675
     * To execute before loading a nanopub: check if the nanopub is already loaded and what is the
676
     * current load counter and checksum. This effectively batches three queries into one.
677
     * This method must be called from within a transaction.
678
     *
679
     * @param conn repo connection
680
     * @param npId nanopub ID
681
     * @return the current status
682
     */
683
    @GeneratedFlagForDependentElements
684
    private static RepoStatus fetchRepoStatus(RepositoryConnection conn, IRI npId) {
685
        var result = conn.prepareTupleQuery(QueryLanguage.SPARQL, REPO_STATUS_QUERY_TEMPLATE.formatted(npId)).evaluate();
686
        try (result) {
687
            if (!result.hasNext()) {
688
                // This may happen if the repo was created, but is completely empty.
689
                return new RepoStatus(false, 0, NanopubUtils.INIT_CHECKSUM);
690
            }
691
            var row = result.next();
692
            return new RepoStatus(row.hasBinding("loadNumber"), Long.parseLong(row.getBinding("count").getValue().stringValue()), row.getBinding("checksum").getValue().stringValue());
693
        }
694
    }
695

696
    @GeneratedFlagForDependentElements
697
    private static void loadInvalidateStatements(Nanopub thisNp, String thisPubkey, Statement invalidateStatement, Statement pubkeyStatement, Statement pubkeyStatementX) {
698
        boolean success = false;
699
        int retries = 0;
700
        while (!success) {
701
            List<RepositoryConnection> connections = new ArrayList<>();
702
            RepositoryConnection metaConn = TripleStore.get().getRepoConnection("meta");
703
            try {
704
                IRI invalidatedNpId = (IRI) invalidateStatement.getObject();
705
                // Basic isolation because here we only read append-only data.
706
                metaConn.begin(IsolationLevels.READ_COMMITTED);
707

708
                Value pubkeyValue = Utils.getObjectForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY);
709
                if (pubkeyValue != null) {
710
                    String pubkey = pubkeyValue.stringValue();
711

712
                    if (!pubkey.equals(thisPubkey)) {
713
                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for pubkey " + pubkey);
714
                        connections.add(loadStatements("pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement, pubkeyStatementX));
715
//                                                connections.add(loadStatements("text-pubkey_" + Utils.createHash(pubkey), invalidateStatement, pubkeyStatement));
716
                    }
717

718
                    // Collect target types so we can both propagate per-type AND decide
719
                    // whether the target is space-relevant in a single pass over meta.
720
                    Set<IRI> targetTypes = new HashSet<>();
721
                    for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, NPX.HAS_NANOPUB_TYPE)) {
722
                        if (v instanceof IRI typeIri) targetTypes.add(typeIri);
723
                    }
724
                    Set<IRI> thisNpTypes = NanopubUtils.getTypes(thisNp);
725
                    for (IRI typeIri : targetTypes) {
726
                        if (!thisNpTypes.contains(typeIri)) {
727
                            //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for type " + typeIri);
728
                            connections.add(loadStatements("type_" + Utils.createHash(typeIri), invalidateStatement, pubkeyStatement, pubkeyStatementX));
729
//                                                        connections.add(loadStatements("text-type_" + Utils.createHash(typeIri), invalidateStatement, pubkeyStatement));
730
                        }
731
                    }
732

733
                    // Emit an npa:Invalidation entry into the spaces repo if the target
734
                    // had any space-relevant type. Piggybacks on the type lookup above
735
                    // so we don't re-read meta.
736
                    if (FeatureFlags.spacesEnabled() && SpacesExtractor.isSpaceRelevant(targetTypes)) {
737
                        List<Statement> invEntry = buildInvalidationEntry(thisNp, invalidatedNpId, targetTypes, thisPubkey);
738
                        if (!invEntry.isEmpty()) {
739
                            connections.add(loadStatements("spaces", invEntry.toArray(new Statement[0])));
740
                        }
741
                    }
742

743
//                                        for (Value v : Utils.getObjectsForPattern(metaConn, NPA.GRAPH, invalidatedNpId, DCTERMS.CREATOR)) {
744
//                                                IRI creatorIri = (IRI) v;
745
//                                                if (!SimpleCreatorPattern.getCreators(thisNp).contains(creatorIri)) {
746
//                                                        //log.info("Adding invalidation expressed in " + thisNp.getUri() + " also to repo for user " + creatorIri);
747
//                                                        connections.add(loadStatements("user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
748
//                                                        connections.add(loadStatements("text-user_" + Utils.createHash(creatorIri), invalidateStatement, pubkeyStatement));
749
//                                                }
750
//                                        }
751
                }
752

753
                metaConn.commit();
754
                // TODO handle case that some commits succeed and some fail
755
                for (RepositoryConnection c : connections) c.commit();
756
                success = true;
757
            } catch (Exception ex) {
758
                log.warn("Could not load invalidate statements for {}.", thisNp.getUri(), ex);
759
                if (metaConn.isActive()) metaConn.rollback();
760
                for (RepositoryConnection c : connections) {
761
                    if (c.isActive()) c.rollback();
762
                }
763
            } finally {
764
                metaConn.close();
765
                for (RepositoryConnection c : connections) c.close();
766
            }
767
            if (!success) {
768
                retries++;
769
                if (retries >= MAX_RETRIES) {
770
                    throw new RuntimeException("Failed to load invalidate statements for " + thisNp.getUri() + " after " + MAX_RETRIES + " retries");
771
                }
772
                long delay = computeBackoffMillis(retries);
773
                log.info("Retrying in {} ms for invalidate statements of {} (attempt {}/{})...", delay, thisNp.getUri(), retries, MAX_RETRIES);
774
                try {
775
                    Thread.sleep(delay);
776
                } catch (InterruptedException x) {
777
                    Thread.currentThread().interrupt();
778
                }
779
            }
780
        }
781
    }
782

783
    /**
784
     * Builds the statements for an {@code npa:Invalidation} entry describing a
785
     * space-relevant retraction/supersession. Caller writes these into the
786
     * {@code spaces} repo.
787
     *
788
     * @param thisNp        the invalidator nanopub
789
     * @param invalidatedNp IRI of the invalidated target
790
     * @param targetTypes   the target's types (already read from the meta repo)
791
     * @param thisPubkey    the invalidator's signing pubkey
792
     * @return the invalidation-entry statements, or an empty list if no signer is known
793
     */
794
    private static List<Statement> buildInvalidationEntry(Nanopub thisNp, IRI invalidatedNp,
795
                                                          Set<IRI> targetTypes, String thisPubkey) {
796
        String artifactCode = TrustyUriUtils.getArtifactCode(thisNp.getUri().toString());
×
797
        if (artifactCode == null || artifactCode.isEmpty()) return Collections.emptyList();
×
798
        IRI signer = null;
×
799
        for (Statement st : thisNp.getPubinfo()) {
×
800
            if (!st.getSubject().equals(thisNp.getUri())) continue;
×
801
            if (!st.getPredicate().equals(NPX.SIGNED_BY)) continue;
×
802
            if (st.getObject() instanceof IRI signerIri) {
×
803
                signer = signerIri;
×
804
                break;
×
805
            }
806
        }
×
807
        Date createdAt = null;
×
808
        try {
809
            Calendar ts = SimpleTimestampPattern.getCreationTime(thisNp);
×
810
            if (ts != null) createdAt = ts.getTime();
×
811
        } catch (IllegalArgumentException ignored) {
×
812
            // pubinfo timestamp missing or malformed; extraction entry simply omits dct:created.
813
        }
×
814
        SpacesExtractor.Context ctx = new SpacesExtractor.Context(
×
815
                artifactCode, signer, Utils.createHash(thisPubkey), createdAt);
×
816
        return SpacesExtractor.extractInvalidation(thisNp, invalidatedNp, targetTypes, ctx);
×
817
    }
818

819
    @GeneratedFlagForDependentElements
820
    private static RepositoryConnection loadStatements(String repoName, Statement... statements) {
821
        RepositoryConnection conn = TripleStore.get().getRepoConnection(repoName);
822
        // Basic isolation: we only append new statements
823
        conn.begin(IsolationLevels.READ_COMMITTED);
824
        for (Statement st : statements) {
825
            conn.add(st);
826
        }
827
        return conn;
828
    }
829

830
    @GeneratedFlagForDependentElements
831
    static List<Statement> getInvalidatingStatements(IRI npId) {
832
        List<Statement> invalidatingStatements = new ArrayList<>();
833
        boolean success = false;
834
        int retries = 0;
835
        while (!success) {
836
            RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
837
            try (conn) {
838
                // Basic isolation because here we only read append-only data.
839
                conn.begin(IsolationLevels.READ_COMMITTED);
840

841
                TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, "SELECT * { graph <" + NPA.GRAPH + "> { " + "?np <" + NPX.INVALIDATES + "> <" + npId + "> ; <" + NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY + "> ?pubkey . " + "} }").evaluate();
842
                try (r) {
843
                    while (r.hasNext()) {
844
                        BindingSet b = r.next();
845
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPX.INVALIDATES, npId, NPA.GRAPH));
846
                        invalidatingStatements.add(vf.createStatement((IRI) b.getBinding("np").getValue(), NPA.HAS_VALID_SIGNATURE_FOR_PUBLIC_KEY, b.getBinding("pubkey").getValue(), NPA.GRAPH));
847
                    }
848
                }
849
                conn.commit();
850
                success = true;
851
            } catch (Exception ex) {
852
                log.warn("Could not load invalidating statements for {}.", npId, ex);
853
                if (conn.isActive()) conn.rollback();
854
            }
855
            if (!success) {
856
                retries++;
857
                if (retries >= MAX_RETRIES) {
858
                    throw new RuntimeException("Failed to get invalidating statements for " + npId + " after " + MAX_RETRIES + " retries");
859
                }
860
                long delay = computeBackoffMillis(retries);
861
                log.info("Retrying in {} ms for invalidating statements of {} (attempt {}/{})...", delay, npId, retries, MAX_RETRIES);
862
                try {
863
                    Thread.sleep(delay);
864
                } catch (InterruptedException x) {
865
                    Thread.currentThread().interrupt();
866
                }
867
            }
868
        }
869
        return invalidatingStatements;
870
    }
871

872
    @GeneratedFlagForDependentElements
873
    private static void loadNoteToRepo(Resource subj, String note) {
874
        boolean success = false;
875
        int retries = 0;
876
        while (!success) {
877
            RepositoryConnection conn = TripleStore.get().getAdminRepoConnection();
878
            try (conn) {
879
                List<Statement> statements = new ArrayList<>();
880
                statements.add(vf.createStatement(subj, NPA.NOTE, vf.createLiteral(note), NPA.GRAPH));
881
                conn.add(statements);
882
                success = true;
883
            } catch (Exception ex) {
884
                log.warn("Could not load note to repo for {}.", subj, ex);
885
            }
886
            if (!success) {
887
                retries++;
888
                if (retries >= MAX_RETRIES) {
889
                    throw new RuntimeException("Failed to load note to repo for " + subj + " after " + MAX_RETRIES + " retries");
890
                }
891
                long delay = computeBackoffMillis(retries);
892
                log.info("Retrying in {} ms for note on {} (attempt {}/{})...", delay, subj, retries, MAX_RETRIES);
893
                try {
894
                    Thread.sleep(delay);
895
                } catch (InterruptedException x) {
896
                    Thread.currentThread().interrupt();
897
                }
898
            }
899
        }
900
    }
901

902
    static boolean hasValidSignature(NanopubSignatureElement el) {
903
        if (el == null) {
6!
904
            log.warn("Signature validation: signature element is null");
×
905
            return false;
×
906
        }
907
        try {
908
            if (SignatureUtils.hasValidSignature(el) && el.getPublicKeyString() != null) {
18!
909
                return true;
6✔
910
            }
911
            log.warn("Signature validation returned false for {} (pubkey present: {})",
12✔
912
                    el.getUri(), el.getPublicKeyString() != null);
21!
913
        } catch (GeneralSecurityException ex) {
3✔
914
            log.warn("Signature validation failed for signature element {}", el.getUri(), ex);
18✔
915
        }
3✔
916
        return false;
6✔
917
    }
918

919
    private static IRI getBaseTrustyUri(Value v) {
920
        if (!(v instanceof IRI)) return null;
9!
921
        String s = v.stringValue();
9✔
922
        if (!s.matches(".*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43}([^A-Za-z0-9\\\\-_].{0,43})?")) {
12✔
923
            return null;
6✔
924
        }
925
        return vf.createIRI(s.replaceFirst("^(.*[^A-Za-z0-9\\-_]RA[A-Za-z0-9\\-_]{43})([^A-Za-z0-9\\\\-_].{0,43})?$", "$1"));
21✔
926
    }
927

928
    // TODO: Move this to nanopub library:
929
    private static boolean isIntroNanopub(Nanopub np) {
930
        for (Statement st : np.getAssertion()) {
33✔
931
            if (st.getPredicate().equals(NPX.DECLARED_BY)) return true;
21✔
932
        }
3✔
933
        return false;
6✔
934
    }
935

936
    /**
937
     * Check if a nanopub is already loaded in the admin graph.
938
     *
939
     * @param npId the nanopub ID
940
     * @return true if the nanopub is loaded, false otherwise
941
     */
942
    @GeneratedFlagForDependentElements
943
    static boolean isNanopubLoaded(String npId) {
944
        boolean loaded = false;
945
        RepositoryConnection conn = TripleStore.get().getRepoConnection("meta");
946
        try (conn) {
947
            if (Utils.getObjectForPattern(conn, NPA.GRAPH, vf.createIRI(npId), NPA.HAS_LOAD_NUMBER) != null) {
948
                loaded = true;
949
            }
950
        } catch (Exception ex) {
951
            log.warn("Could not check whether nanopub is loaded.", ex);
952
        }
953
        return loaded;
954
    }
955

956
    // Shared value factory used throughout this class to create IRIs, literals and statements.
    private static ValueFactory vf = SimpleValueFactory.getInstance();
6✔
957

958
    // TODO remove the constants and use the ones from the nanopub library instead
959

960
    /**
961
     * Template for the query that fetches the status of a repository.
962
     */
963
    // Template for .fetchRepoStatus
964
    private static final String REPO_STATUS_QUERY_TEMPLATE = """
84✔
965
            SELECT * { graph <%s> {
966
              OPTIONAL { <%s> <%s> ?loadNumber . }
967
              <%s> <%s> ?count ;
968
                   <%s> ?checksum .
969
            } }
970
            """.formatted(NPA.GRAPH, "%s", NPA.HAS_LOAD_NUMBER, NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, NPA.HAS_NANOPUB_CHECKSUM);
6✔
971
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc