• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24963156575

26 Apr 2026 05:52PM UTC coverage: 58.926% (-0.5%) from 59.461%
24963156575

push

github

web-flow
Merge pull request #81 from knowledgepixels/feature/62-spaces-materializer-2b

feat: per-tier SPARQL UPDATE loops in full build (#62, PR 2b/3)

385 of 740 branches covered (52.03%)

Branch coverage included in aggregate %.

1064 of 1719 relevant lines covered (61.9%)

9.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.92
src/main/java/com/knowledgepixels/query/AuthorityResolver.java
1
package com.knowledgepixels.query;
2

3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Optional;
6
import java.util.Set;
7

8
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
9
import org.eclipse.rdf4j.model.IRI;
10
import org.eclipse.rdf4j.model.Statement;
11
import org.eclipse.rdf4j.model.Value;
12
import org.eclipse.rdf4j.model.ValueFactory;
13
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
14
import org.eclipse.rdf4j.model.vocabulary.RDF;
15
import org.eclipse.rdf4j.query.BindingSet;
16
import org.eclipse.rdf4j.query.QueryLanguage;
17
import org.eclipse.rdf4j.query.TupleQueryResult;
18
import org.eclipse.rdf4j.repository.RepositoryConnection;
19
import org.eclipse.rdf4j.repository.RepositoryResult;
20
import org.nanopub.vocabulary.NPA;
21
import org.slf4j.Logger;
22
import org.slf4j.LoggerFactory;
23

24
import com.knowledgepixels.query.vocabulary.GEN;
25
import com.knowledgepixels.query.vocabulary.NPAT;
26
import com.knowledgepixels.query.vocabulary.SpacesVocab;
27

28
/**
29
 * Drives the space-state materialization pipeline: detects trust-state flips,
30
 * mirrors the approved {@code (agent, pubkey)} rows from the {@code trust} repo
31
 * into a fresh {@code npass:<T>_<M>} graph in the {@code spaces} repo, runs the
32
 * per-tier validation loops, flips the
33
 * {@code npa:hasCurrentSpaceState} pointer, and drops the previous graph. Also
34
 * cleans up orphan {@code npass:*} graphs on startup.
35
 *
36
 * <p>See {@code doc/plan-space-repositories.md} — this implements the "Full
37
 * build" procedure plus pointer management and the mirror step. The per-tier
38
 * SPARQL UPDATE loops are implemented here as well; the incremental cycle and
39
 * the periodic-rebuild flag follow in a later PR.
40
 */
41
public final class AuthorityResolver {
42

43
    private static final Logger log = LoggerFactory.getLogger(AuthorityResolver.class);
9✔
44

45
    private static final ValueFactory vf = SimpleValueFactory.getInstance();
6✔
46

47
    private static final String SPACES_REPO = "spaces";
48
    private static final String TRUST_REPO = "trust";
49

50
    /** NPA constants pulled in locally (trust-side). */
51
    private static final IRI NPA_HAS_CURRENT_TRUST_STATE =
9✔
52
            vf.createIRI(NPA.NAMESPACE, "hasCurrentTrustState");
6✔
53
    private static final IRI NPA_ACCOUNT_STATE = vf.createIRI(NPA.NAMESPACE, "AccountState");
15✔
54
    private static final IRI NPA_AGENT = vf.createIRI(NPA.NAMESPACE, "agent");
15✔
55
    private static final IRI NPA_PUBKEY = vf.createIRI(NPA.NAMESPACE, "pubkey");
15✔
56
    private static final IRI NPA_TRUST_STATUS = vf.createIRI(NPA.NAMESPACE, "trustStatus");
15✔
57
    private static final IRI NPA_LOADED = vf.createIRI(NPA.NAMESPACE, "loaded");
15✔
58
    private static final IRI NPA_TO_LOAD = vf.createIRI(NPA.NAMESPACE, "toLoad");
15✔
59

60
    /**
61
     * Trust-approved set: rows with one of these {@code npa:trustStatus} values
62
     * are mirrored into the space-state graph. Per
63
     * {@code doc/design-trust-state-repos.md}, these are the two "authority-
64
     * approving" statuses; {@code npa:contested}, {@code npa:skipped}, and the
65
     * transient statuses are distinct values of the same predicate and are
66
     * excluded automatically by this positive-list filter.
67
     */
68
    private static final Set<IRI> APPROVED_SET = Set.of(NPA_LOADED, NPA_TO_LOAD);
15✔
69

70
    private static AuthorityResolver instance;
71

72
    /**
     * Returns the shared {@code AuthorityResolver}, creating it on the first
     * call. The method is {@code synchronized}, so concurrent first calls
     * cannot create two instances.
     *
     * @return the singleton instance, never {@code null}
     */
    public static synchronized AuthorityResolver get() {
        if (instance != null) {
            return instance;
        }
        instance = new AuthorityResolver();
        return instance;
    }

    /** Private: instances are only handed out through {@link #get()}. */
    private AuthorityResolver() {
        // Nothing to initialise.
    }
82

83
    // ---------------- Public entry points ----------------
84

85
    /**
86
     * Poll entry point: checks the current trust-state hash against the active
87
     * space-state graph's hash component; if they differ (or no space-state graph
88
     * exists yet), runs a full build. Safe to call repeatedly on a schedule —
89
     * when the hashes match, it's a no-op. Gated by
90
     * {@link FeatureFlags#spacesEnabled()}.
91
     */
92
    public void tick() {
93
        if (!FeatureFlags.spacesEnabled()) return;
6!
94
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
18✔
95
        if (trustStateHash == null) {
6!
96
            log.debug("AuthorityResolver.tick: no current trust state yet — skipping");
9✔
97
            return;
3✔
98
        }
99
        String currentGraphName = getCurrentSpaceStateGraphLocalName();
×
100
        if (currentGraphName != null && currentGraphName.startsWith(trustStateHash + "_")) {
×
101
            log.debug("AuthorityResolver.tick: already on trust state {}", abbrev(trustStateHash));
×
102
            return;
×
103
        }
104
        log.info("AuthorityResolver.tick: trust-state flip detected (now {}); running full build",
×
105
                abbrev(trustStateHash));
×
106
        runFullBuild(trustStateHash);
×
107
    }
×
108

109
    /**
110
     * Startup cleanup: drop any {@code npass:*} graph that the
111
     * {@code npa:hasCurrentSpaceState} pointer isn't pointing at. Orphans come
112
     * from crashes mid-build. Safe to call at any time; idempotent.
113
     */
114
    public void cleanOrphans() {
115
        if (!FeatureFlags.spacesEnabled()) return;
×
116
        IRI current = getCurrentSpaceStateGraph();
×
117
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
118
            int dropped = 0;
×
119
            try (RepositoryResult<org.eclipse.rdf4j.model.Resource> ctxs = conn.getContextIDs()) {
×
120
                List<IRI> toDrop = new ArrayList<>();
×
121
                while (ctxs.hasNext()) {
×
122
                    org.eclipse.rdf4j.model.Resource ctx = ctxs.next();
×
123
                    if (!(ctx instanceof IRI iri)) continue;
×
124
                    if (!iri.stringValue().startsWith(SpacesVocab.NPASS_NAMESPACE)) continue;
×
125
                    if (iri.equals(current)) continue;
×
126
                    toDrop.add(iri);
×
127
                }
×
128
                for (IRI iri : toDrop) {
×
129
                    conn.begin(IsolationLevels.SERIALIZABLE);
×
130
                    conn.clear(iri);
×
131
                    conn.commit();
×
132
                    dropped++;
×
133
                    log.info("AuthorityResolver.cleanOrphans: dropped orphan graph {}", iri);
×
134
                }
×
135
            }
136
            if (dropped == 0) {
×
137
                log.debug("AuthorityResolver.cleanOrphans: no orphan space-state graphs");
×
138
            }
139
        } catch (Exception ex) {
×
140
            log.info("AuthorityResolver.cleanOrphans: failed: {}", ex.toString());
×
141
        }
×
142
    }
×
143

144
    // ---------------- Full build ----------------
145

146
    /**
147
     * Mutex-protected full build of the space-state graph for the given trust
148
     * state. Captures {@code M = currentLoadCounter}, mirrors trust-approved
149
     * rows, (PR 2b: runs per-tier UPDATE loops from scratch), stamps
150
     * {@code processedUpTo = M}, flips the pointer, drops the previous graph.
151
     */
152
    synchronized void runFullBuild(String trustStateHash) {
153
        long loadCounter = getCurrentLoadCounter();
×
154
        IRI newGraph = SpacesVocab.forSpaceState(trustStateHash, loadCounter);
×
155
        IRI oldGraph = getCurrentSpaceStateGraph();
×
156
        if (newGraph.equals(oldGraph)) {
×
157
            log.debug("AuthorityResolver.runFullBuild: already current at {}", newGraph);
×
158
            return;
×
159
        }
160

161
        // 1. Mirror trust-approved rows into the new graph.
162
        int mirrored = mirrorTrustState(trustStateHash, newGraph);
×
163

164
        // 2. Per-tier UPDATE loops (from scratch: lastProcessed = -1 so the
165
        //    delta filter FILTER(?ln > ?lastProcessed) includes everything).
166
        TierCounts counts = runAllTierLoops(newGraph, -1);
×
167

168
        // 3. Stamp processedUpTo inside the new graph.
169
        writeProcessedUpTo(newGraph, loadCounter);
×
170

171
        // 4. Flip the current-space-state pointer.
172
        flipPointer(newGraph);
×
173

174
        // 5. Drop the old graph if one existed.
175
        if (oldGraph != null) {
×
176
            dropGraph(oldGraph);
×
177
        }
178

179
        log.info("AuthorityResolver: full build complete — graph={} mirrored={} rows loadCounter={} "
×
180
                        + "tiers: admin={} attachment={} maintainer={} member={} observer={}",
181
                newGraph, mirrored, loadCounter,
×
182
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer);
×
183
    }
×
184

185
    // ---------------- Tier UPDATE loops ----------------
186

187
    /** Per-tier INSERT counts (for logging/metrics). */
188
    static final class TierCounts {
×
189
        int admin;
190
        int attachment;
191
        int maintainer;
192
        int member;
193
        int observer;
194
    }
195

196
    /**
197
     * Runs the five tier loops in order: admin → {@code gen:hasRole} attachment
198
     * validation → maintainer → member → observer. Each loop iterates a SPARQL
199
     * INSERT to fixed point (no new triples added). Returns per-tier counts.
200
     *
201
     * @param graph         target space-state graph
202
     * @param lastProcessed load-number horizon; use {@code -1} for full build
203
     */
204
    TierCounts runAllTierLoops(IRI graph, long lastProcessed) {
205
        TierCounts c = new TierCounts();
×
206
        c.admin = runTierLabeled("admin", graph, adminTierUpdate(graph, lastProcessed));
×
207
        c.attachment = runTierLabeled("attachment", graph,
×
208
                attachmentValidationUpdate(graph, lastProcessed));
×
209
        c.maintainer = runTierLabeled("maintainer", graph, nonAdminTierUpdate(graph, lastProcessed,
×
210
                GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
211
        // Member tier: admin OR maintainer publisher — split into two simpler updates
212
        // so the query planner doesn't struggle with the UNION.
213
        c.member = runTierLabeled("member(admin-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
214
                GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
215
        c.member += runTierLabeled("member(maint-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
216
                GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
217
        // Observer tier: self-evidence only per the plan's policy table
218
        // (gen:ObserverRole = self). Authority-publisher sub-tiers were overreach;
219
        // the three of them have been removed, so an observer instantiation is
220
        // validated iff the assignee's own pubkey signed it.
221
        c.observer = runTierLabeled("observer(self)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
222
                GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
223
        return c;
×
224
    }
225

226
    /**
227
     * Builds a publisher constraint requiring the publisher to be a validated holder
228
     * of the given tier's role (maintainer or member) in the target space.
229
     * Owns its own AccountState resolution so ?publisher is bound through the
230
     * targeted (pkh → agent) lookup rather than enumerated.
231
     */
232
    private static String publisherIsTieredRole(IRI tierClass) {
233
        return """
×
234
                ?acct a npa:AccountState ;
235
                      npa:pubkey ?pkh ;
236
                      npa:agent  ?publisher .
237
                ?tierRI a gen:RoleInstantiation ;
238
                        npa:forSpace ?space ;
239
                        npa:forAgent ?publisher .
240
                ?rdT a npa:RoleDeclaration ;
241
                     npa:hasRoleType <%1$s> .
242
                { ?tierRI npa:regularProperty ?predT . ?rdT gen:hasRegularProperty ?predT . }
243
                UNION
244
                { ?tierRI npa:inverseProperty ?predT . ?rdT gen:hasInverseProperty ?predT . }
245
                """.formatted(tierClass);
×
246
    }
247

248
    /** Wraps {@link #runTierLoop} with tier-name context for logs/exceptions. */
249
    private int runTierLabeled(String tier, IRI graph, String sparqlUpdate) {
250
        try {
251
            return runTierLoop(graph, sparqlUpdate);
×
252
        } catch (RuntimeException ex) {
×
253
            log.error("AuthorityResolver: tier={} failed with SPARQL UPDATE:\n{}\n", tier, sparqlUpdate, ex);
×
254
            throw ex;
×
255
        }
256
    }
257

258
    /**
259
     * Runs a single tier's INSERT to fixed point. Counts rows by probing
260
     * graph size before/after each INSERT; stops when the size doesn't change.
261
     *
262
     * @return total number of triples inserted by this tier across all iterations
263
     */
264
    int runTierLoop(IRI graph, String sparqlUpdate) {
265
        int total = 0;
×
266
        long before = graphSize(graph);
×
267
        while (true) {
268
            // Note: no explicit transaction wrapping here. In tests we observed that
269
            // HTTPRepository's RDF4J-transaction protocol silently no-op'd cross-graph
270
            // SPARQL UPDATEs with UNION sub-patterns inside conn.begin()/commit(),
271
            // while the same UPDATE POSTed directly to /statements applied correctly.
272
            // A bare prepareUpdate().execute() takes the direct /statements path and
273
            // runs the UPDATE atomically per SPARQL 1.1 semantics — which is all we
274
            // need; there's nothing else to commit atomically alongside the UPDATE.
275
            try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
276
                conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
277
            }
278
            long after = graphSize(graph);
×
279
            long added = after - before;
×
280
            if (added <= 0) break;
×
281
            total += added;
×
282
            before = after;
×
283
        }
×
284
        return total;
×
285
    }
286

287
    private long graphSize(IRI graph) {
288
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
289
            return conn.size(graph);
×
290
        }
291
    }
292

293
    // ---------------- SPARQL templates ----------------
294

295
    /**
296
     * Reusable invalidation filter on a bound nanopub-IRI variable. Pass the bare
297
     * variable name (no leading {@code ?}); e.g. {@code invalidationFilter("np")}
298
     * produces an outer-scoped {@code FILTER NOT EXISTS { GRAPH npa:spacesGraph
299
     * { ?_inv_np a npa:Invalidation ; npa:invalidates ?np . } }}.
300
     *
301
     * <p>Important: this filter must be placed OUTSIDE the surrounding
302
     * {@code GRAPH npa:spacesGraph { ... }} block, not nested inside it. When
303
     * nested, RDF4J's planner couples the FILTER NOT EXISTS evaluation into the
304
     * join order (per-row scan of {@code ?_inv a npa:Invalidation} multiplied by
305
     * the candidate set), which we measured turning a 39ms query into a 60s+
306
     * timeout on the live observer-tier data. Outside the GRAPH block, the
307
     * planner defers the filter until {@code ?np}/{@code ?rdNp} are bound and
308
     * does a targeted index lookup.
309
     *
310
     * <p>Variable names must match {@code [A-Za-z0-9_]+} per SPARQL grammar —
311
     * embedding a {@code ?} inside {@code ?_inv_?np} would yield a parse error.
312
     */
313
    private static String invalidationFilter(String bareVarName) {
314
        return "FILTER NOT EXISTS { GRAPH <" + SpacesVocab.SPACES_GRAPH + "> {"
30✔
315
                + " ?_inv_" + bareVarName
316
                + " a <" + SpacesVocab.INVALIDATION + "> ; "
317
                + "<" + SpacesVocab.INVALIDATES + "> ?" + bareVarName + " . } }";
318
    }
319

320
    /**
321
     * Admin tier: seed from {@code npadef:...hasRootAdmin} (trusted by construction)
322
     * plus closed-over admin grants; insert any {@code gen:RoleInstantiation} with
323
     * {@code npa:regularProperty gen:hasAdmin} whose publisher (resolved via mirrored
324
     * trust-approved AccountState) is already in the admin set.
325
     */
326
    static String adminTierUpdate(IRI graph, long lastProcessed) {
327
        // Order tuned for RDF4J's evaluator:
328
        //   1. Anchor on the small (seed UNION closed-over) set to bind ?publisher
329
        //      and ?space cheaply.
330
        //   2. Resolve ?pkh from the mirrored AccountState row (?publisher bound).
331
        //   3. Probe instantiations using the now-bound (?space, ?pkh) — targeted
332
        //      lookup, not a full RoleInstantiation scan.
333
        //   4. Load-number filter on bound ?np.
334
        //   5. Dedup at the end.
335
        return """
69✔
336
                PREFIX npa:  <%1$s>
337
                PREFIX gen:  <%2$s>
338
                INSERT { GRAPH <%3$s> {
339
                  ?ri a gen:RoleInstantiation ;
340
                      npa:forSpace ?space ;
341
                      npa:regularProperty gen:hasAdmin ;
342
                      npa:forAgent ?agent ;
343
                      npa:viaNanopub ?np .
344
                } }
345
                WHERE {
346
                  # 1. Anchor: who is already an admin of which space?
347
                  {
348
                    # Seed branch: root-admin in a non-invalidated SpaceDefinition.
349
                    GRAPH <%4$s> {
350
                      ?def a npa:SpaceDefinition ;
351
                           npa:forSpaceRef  ?spaceRef ;
352
                           npa:hasRootAdmin ?publisher ;
353
                           npa:viaNanopub   ?defNp .
354
                      ?spaceRef npa:spaceIri ?space .
355
                    }
356
                    %7$s
357
                  }
358
                  UNION
359
                  {
360
                    # Closed-over branch: an existing admin in this space-state graph.
361
                    GRAPH <%3$s> {
362
                      ?prev a gen:RoleInstantiation ;
363
                            npa:forSpace        ?space ;
364
                            npa:regularProperty gen:hasAdmin ;
365
                            npa:forAgent        ?publisher .
366
                    }
367
                  }
368
                  # 2. Mirror: resolve ?publisher → ?pkh via the trust-approved row.
369
                  GRAPH <%3$s> {
370
                    ?acct a npa:AccountState ;
371
                          npa:agent  ?publisher ;
372
                          npa:pubkey ?pkh .
373
                  }
374
                  # 3. Targeted instantiation lookup by space + pubkey.
375
                  GRAPH <%4$s> {
376
                    ?ri a gen:RoleInstantiation ;
377
                        npa:forSpace        ?space ;
378
                        npa:regularProperty gen:hasAdmin ;
379
                        npa:forAgent        ?agent ;
380
                        npa:pubkeyHash      ?pkh ;
381
                        npa:viaNanopub      ?np .
382
                  }
383
                  %6$s
384
                  # 4. Load-number filter on bound ?np.
385
                  GRAPH <%8$s> {
386
                    ?np npa:hasLoadNumber ?ln .
387
                    FILTER (?ln > %5$d)
388
                  }
389
                  # 5. Dedup last.
390
                  FILTER NOT EXISTS { GRAPH <%3$s> {
391
                    ?existing a gen:RoleInstantiation ;
392
                              npa:forSpace ?space ;
393
                              npa:forAgent ?agent ;
394
                              npa:regularProperty gen:hasAdmin .
395
                  } }
396
                }
397
                """.formatted(
3✔
398
                NPA.NAMESPACE,
399
                GEN.NAMESPACE,
400
                graph,
401
                SpacesVocab.SPACES_GRAPH,
402
                lastProcessed,
15✔
403
                invalidationFilter("np"),
15✔
404
                invalidationFilter("defNp"),
18✔
405
                NPA.GRAPH);
406
    }
407

408
    /**
409
     * {@code gen:hasRole} attachment validation: an attachment is validated iff its
410
     * publisher is already a validated admin of the target space. Adds
411
     * {@code gen:RoleAssignment} rows to the space-state graph.
412
     */
413
    static String attachmentValidationUpdate(IRI graph, long lastProcessed) {
414
        return """
69✔
415
                PREFIX npa:  <%1$s>
416
                PREFIX gen:  <%2$s>
417
                INSERT { GRAPH <%3$s> {
418
                  ?ra a gen:RoleAssignment ;
419
                      npa:forSpace ?space ;
420
                      gen:hasRole  ?role ;
421
                      npa:viaNanopub ?np .
422
                } }
423
                WHERE {
424
                  GRAPH <%4$s> {
425
                    ?ra a gen:RoleAssignment ;
426
                        npa:forSpace ?space ;
427
                        gen:hasRole  ?role ;
428
                        npa:pubkeyHash ?pkh ;
429
                        npa:viaNanopub ?np .
430
                  }
431
                  GRAPH <%7$s> {
432
                    ?np npa:hasLoadNumber ?ln .
433
                    FILTER (?ln > %5$d)
434
                  }
435
                  GRAPH <%3$s> {
436
                    ?acct a npa:AccountState ;
437
                          npa:agent  ?publisher ;
438
                          npa:pubkey ?pkh .
439
                    ?adminRI a gen:RoleInstantiation ;
440
                             npa:forSpace ?space ;
441
                             npa:regularProperty gen:hasAdmin ;
442
                             npa:forAgent ?publisher .
443
                  }
444
                  %6$s
445
                  FILTER NOT EXISTS { GRAPH <%3$s> {
446
                    ?existing a gen:RoleAssignment ;
447
                              npa:forSpace ?space ;
448
                              gen:hasRole  ?role .
449
                  } }
450
                }
451
                """.formatted(
3✔
452
                NPA.NAMESPACE,
453
                GEN.NAMESPACE,
454
                graph,
455
                SpacesVocab.SPACES_GRAPH,
456
                lastProcessed,
15✔
457
                invalidationFilter("np"),
18✔
458
                NPA.GRAPH);
459
    }
460

461
    /**
462
     * Non-admin tier publisher constraints (inserted as a SPARQL sub-pattern).
463
     * Each constraint owns the AccountState (pkh → agent) lookup so the join
464
     * variable is bound through a targeted pattern. The observer-self variant
465
     * binds {@code npa:agent ?agent} directly — no separate {@code ?publisher}
466
     * variable, no post-join equality filter — which lets the planner anchor
467
     * the AccountState lookup on the already-bound {@code ?agent} instead of
468
     * enumerating all approved publishers and filtering at the end.
469
     */
470
    static final String PUBLISHER_IS_ADMIN = """
471
            ?acct a npa:AccountState ;
472
                  npa:pubkey ?pkh ;
473
                  npa:agent  ?publisher .
474
            ?adminRI a gen:RoleInstantiation ;
475
                     npa:forSpace ?space ;
476
                     npa:regularProperty gen:hasAdmin ;
477
                     npa:forAgent ?publisher .
478
            """;
479

480
    /** Observer self-evidence: the assignee's own pubkey signed the instantiation. */
481
    static final String PUBLISHER_IS_SELF = """
482
            ?acct a npa:AccountState ;
483
                  npa:pubkey ?pkh ;
484
                  npa:agent  ?agent .
485
            """;
486

487
    /**
488
     * Maintainer / Member / Observer tier INSERT. Same shape: find an instantiation
489
     * whose predicate matches a RoleDeclaration of the given tier attached to the
490
     * target space, and whose publisher passes the tier-specific constraint.
491
     */
492
    static String nonAdminTierUpdate(IRI graph, long lastProcessed,
493
                                     IRI tierClass, String publisherConstraint) {
494
        // Order tuned for RDF4J's evaluator (which executes BGPs roughly in order).
495
        // The crucial choice is the *anchor*: instantiation-first plans send the
496
        // planner exploring the full ~thousands of candidate RIs and only filter
497
        // by tier at the very end. Attachment-first anchors on the small set of
498
        // gen:RoleAssignment rows already validated in this space-state graph
499
        // (~hundreds, often zero) and walks outward by bound (?role, ?space).
500
        //
501
        //   1. Anchor on RoleAssignments in this space-state graph (small).
502
        //   2. Match the tier-pinned RoleDeclaration by ?role.
503
        //   3. Pair role-decl direction to instantiation direction in one UNION
504
        //      so only (reg, reg)/(inv, inv) combos are explored.
505
        //   4. Targeted instantiation lookup — (?space, ?pred) are bound.
506
        //   5. Publisher constraint (incl. AccountState resolution).
507
        //   6. Load-number filter on bound ?np.
508
        //   7. Dedup at the end.
509
        return """
69✔
510
                PREFIX npa:  <%1$s>
511
                PREFIX gen:  <%2$s>
512
                INSERT { GRAPH <%3$s> {
513
                  ?ri a gen:RoleInstantiation ;
514
                      npa:forSpace ?space ;
515
                      npa:forAgent ?agent ;
516
                      npa:viaNanopub ?np .
517
                } }
518
                WHERE {
519
                  # 1. Anchor: validated attachments in this space-state graph.
520
                  GRAPH <%3$s> {
521
                    ?ra a gen:RoleAssignment ;
522
                        gen:hasRole  ?role ;
523
                        npa:forSpace ?space .
524
                  }
525
                  # 2. Tier-pinned RoleDeclaration (?role bound from the attachment).
526
                  GRAPH <%4$s> {
527
                    ?rd a npa:RoleDeclaration ;
528
                        npa:hasRoleType <%7$s> ;
529
                        npa:role        ?role ;
530
                        npa:viaNanopub  ?rdNp .
531
                    # 3. Pair direction so only matching combos are explored.
532
                    {
533
                      ?rd gen:hasRegularProperty ?pred .
534
                      ?ri npa:regularProperty    ?pred .
535
                    }
536
                    UNION
537
                    {
538
                      ?rd gen:hasInverseProperty ?pred .
539
                      ?ri npa:inverseProperty    ?pred .
540
                    }
541
                    # 4. Targeted instantiation lookup — (?space, ?pred) bound.
542
                    ?ri a gen:RoleInstantiation ;
543
                        npa:forSpace   ?space ;
544
                        npa:forAgent   ?agent ;
545
                        npa:pubkeyHash ?pkh ;
546
                        npa:viaNanopub ?np .
547
                  }
548
                  # 5. Publisher constraint (incl. AccountState resolution).
549
                  GRAPH <%3$s> {
550
                    %9$s
551
                  }
552
                  # 6. Load-number filter on bound ?np.
553
                  GRAPH <%10$s> {
554
                    ?np npa:hasLoadNumber ?ln .
555
                    FILTER (?ln > %5$d)
556
                  }
557
                  # 7. Invalidation filters — outside the GRAPH block so the
558
                  #    planner defers them until ?rdNp/?np are bound.
559
                  %8$s
560
                  %6$s
561
                  # 8. Dedup last.
562
                  FILTER NOT EXISTS { GRAPH <%3$s> {
563
                    ?existing a gen:RoleInstantiation ;
564
                              npa:forSpace ?space ;
565
                              npa:forAgent ?agent ;
566
                              npa:viaNanopub ?np .
567
                  } }
568
                }
569
                """.formatted(
3✔
570
                NPA.NAMESPACE,
571
                GEN.NAMESPACE,
572
                graph,
573
                SpacesVocab.SPACES_GRAPH,
574
                lastProcessed,
15✔
575
                invalidationFilter("np"),
27✔
576
                tierClass,
577
                invalidationFilter("rdNp"),
30✔
578
                publisherConstraint,
579
                NPA.GRAPH);
580
    }
581

582
    /**
583
     * Copies trust-approved {@code npa:AccountState} rows from {@code npat:<T>}
584
     * in the {@code trust} repo into {@code newGraph} in the {@code spaces} repo,
585
     * inside one spaces-side serializable transaction.
586
     *
587
     * @return number of rows mirrored (useful for metrics / logging)
588
     */
589
    int mirrorTrustState(String trustStateHash, IRI newGraph) {
590
        IRI trustStateIri = NPAT.forHash(trustStateHash);
×
591
        int count = 0;
×
592
        try (RepositoryConnection trustConn = TripleStore.get().getRepoConnection(TRUST_REPO);
×
593
             RepositoryConnection spacesConn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
594
            trustConn.begin(IsolationLevels.READ_COMMITTED);
×
595
            spacesConn.begin(IsolationLevels.SERIALIZABLE);
×
596
            // Walk rdf:type triples in the trust state's graph; for each AccountState,
597
            // check status and copy the approved ones verbatim (minus status-specific
598
            // detail triples, which we don't need for validation).
599
            try (RepositoryResult<Statement> typeRows = trustConn.getStatements(
×
600
                    null, RDF.TYPE, NPA_ACCOUNT_STATE, trustStateIri)) {
601
                while (typeRows.hasNext()) {
×
602
                    Statement st = typeRows.next();
×
603
                    if (!(st.getSubject() instanceof IRI accountStateIri)) continue;
×
604
                    Value status = trustConn.getStatements(accountStateIri, NPA_TRUST_STATUS, null, trustStateIri)
×
605
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
606
                    if (!(status instanceof IRI statusIri) || !APPROVED_SET.contains(statusIri)) continue;
×
607
                    Value agent = trustConn.getStatements(accountStateIri, NPA_AGENT, null, trustStateIri)
×
608
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
609
                    Value pubkey = trustConn.getStatements(accountStateIri, NPA_PUBKEY, null, trustStateIri)
×
610
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
611
                    if (agent == null || pubkey == null) {
×
612
                        log.warn("AuthorityResolver.mirror: account {} missing agent or pubkey; skipping",
×
613
                                accountStateIri);
614
                        continue;
×
615
                    }
616
                    spacesConn.add(accountStateIri, RDF.TYPE, NPA_ACCOUNT_STATE, newGraph);
×
617
                    spacesConn.add(accountStateIri, NPA_AGENT, agent, newGraph);
×
618
                    spacesConn.add(accountStateIri, NPA_PUBKEY, pubkey, newGraph);
×
619
                    spacesConn.add(accountStateIri, NPA_TRUST_STATUS, statusIri, newGraph);
×
620
                    count++;
×
621
                }
×
622
            }
623
            spacesConn.commit();
×
624
            trustConn.commit();
×
625
        }
626
        return count;
×
627
    }
628

629
    // ---------------- Pointer + counter helpers ----------------
630

631
    /**
632
     * Reads the current {@code npa:hasCurrentSpaceState} pointer from the
633
     * {@code npa:graph} admin graph of the {@code spaces} repo. Returns
634
     * {@code null} if no pointer exists yet.
635
     */
636
    IRI getCurrentSpaceStateGraph() {
637
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
638
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
639
                    SpacesVocab.HAS_CURRENT_SPACE_STATE);
640
            return (v instanceof IRI iri) ? iri : null;
×
641
        } catch (Exception ex) {
×
642
            log.warn("AuthorityResolver: failed to read hasCurrentSpaceState pointer: {}", ex.toString());
×
643
            return null;
×
644
        }
645
    }
646

647
    /** Convenience: local-name of the current space-state graph IRI. */
648
    private String getCurrentSpaceStateGraphLocalName() {
649
        IRI iri = getCurrentSpaceStateGraph();
×
650
        if (iri == null) return null;
×
651
        String s = iri.stringValue();
×
652
        if (!s.startsWith(SpacesVocab.NPASS_NAMESPACE)) return null;
×
653
        return s.substring(SpacesVocab.NPASS_NAMESPACE.length());
×
654
    }
655

656
    long getCurrentLoadCounter() {
657
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
658
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
659
                    SpacesVocab.CURRENT_LOAD_COUNTER);
660
            if (v == null) return 0;
×
661
            try {
662
                return Long.parseLong(v.stringValue());
×
663
            } catch (NumberFormatException ex) {
×
664
                log.warn("AuthorityResolver: non-numeric currentLoadCounter: {}", v);
×
665
                return 0;
×
666
            }
667
        } catch (Exception ex) {
×
668
            log.warn("AuthorityResolver: failed to read currentLoadCounter: {}", ex.toString());
×
669
            return 0;
×
670
        }
671
    }
672

673
    /**
674
     * Atomic pointer flip: a single SPARQL {@code DELETE … INSERT … WHERE}
675
     * replaces the old pointer with the new one in one statement, so readers
676
     * never see a zero-pointer window.
677
     */
678
    void flipPointer(IRI newGraph) {
679
        String update = String.format("""
×
680
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
681
                INSERT { GRAPH <%s> { <%s> <%s> <%s> } }
682
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
683
                """,
684
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE,
685
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE, newGraph,
686
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE);
687
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
688
            conn.begin(IsolationLevels.SERIALIZABLE);
×
689
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
690
            conn.commit();
×
691
        }
692
    }
×
693

694
    void writeProcessedUpTo(IRI graph, long loadCounter) {
695
        String update = String.format("""
×
696
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
697
                INSERT { GRAPH <%s> { <%s> <%s> "%d"^^<http://www.w3.org/2001/XMLSchema#long> } }
698
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
699
                """,
700
                graph, graph, SpacesVocab.PROCESSED_UP_TO,
701
                graph, graph, SpacesVocab.PROCESSED_UP_TO, loadCounter,
×
702
                graph, graph, SpacesVocab.PROCESSED_UP_TO);
703
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
704
            conn.begin(IsolationLevels.SERIALIZABLE);
×
705
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
706
            conn.commit();
×
707
        }
708
    }
×
709

710
    /**
711
     * Reads {@code processedUpTo} from the given space-state graph.
712
     * Returns {@code -1} if absent (graph not fully built yet).
713
     */
714
    long readProcessedUpTo(IRI graph) {
715
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
716
            String query = String.format(
×
717
                    "SELECT ?n WHERE { GRAPH <%s> { <%s> <%s> ?n } }",
718
                    graph, graph, SpacesVocab.PROCESSED_UP_TO);
719
            try (TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
720
                if (!r.hasNext()) return -1;
×
721
                BindingSet b = r.next();
×
722
                return Long.parseLong(b.getBinding("n").getValue().stringValue());
×
723
            }
×
724
        } catch (Exception ex) {
×
725
            log.warn("AuthorityResolver: failed to read processedUpTo for {}: {}", graph, ex.toString());
×
726
            return -1;
×
727
        }
728
    }
729

730
    void dropGraph(IRI graph) {
731
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
732
            conn.begin(IsolationLevels.SERIALIZABLE);
×
733
            conn.clear(graph);
×
734
            conn.commit();
×
735
            log.info("AuthorityResolver: dropped old space-state graph {}", graph);
×
736
        }
737
    }
×
738

739
    // ---------------- Trust-repo pointer lookup (used by TrustStateRegistry's bootstrap) ----------------
740

741
    /**
742
     * Queries the {@code trust} repo directly for the current trust-state hash.
743
     * Prefer {@link TrustStateRegistry#getCurrentHash()} in normal operation —
744
     * this helper exists for tests and diagnostics.
745
     *
746
     * @return the current trust-state hash, or empty if none is set
747
     */
748
    Optional<String> readTrustRepoCurrentHash() {
749
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(TRUST_REPO)) {
×
750
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
751
                    NPA_HAS_CURRENT_TRUST_STATE);
752
            if (!(v instanceof IRI iri)) return Optional.empty();
×
753
            String s = iri.stringValue();
×
754
            if (!s.startsWith(NPAT.NAMESPACE)) return Optional.empty();
×
755
            return Optional.of(s.substring(NPAT.NAMESPACE.length()));
×
756
        } catch (Exception ex) {
×
757
            log.warn("AuthorityResolver: failed to read trust-repo current pointer: {}", ex.toString());
×
758
            return Optional.empty();
×
759
        }
760
    }
761

762
    private static String abbrev(String hash) {
763
        return hash.length() > 12 ? hash.substring(0, 12) + "…" : hash;
×
764
    }
765

766
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc