• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24965204834

26 Apr 2026 07:33PM UTC coverage: 56.939% (-2.0%) from 58.926%
24965204834

push

github

web-flow
Merge pull request #83 from knowledgepixels/feature/62-spaces-materializer-2c

feat: incremental delta cycles + invalidation propagation + periodic rebuild (#62, PR 2c/3)

386 of 768 branches covered (50.26%)

Branch coverage included in aggregate %.

1087 of 1819 relevant lines covered (59.76%)

9.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.5
src/main/java/com/knowledgepixels/query/AuthorityResolver.java
1
package com.knowledgepixels.query;
2

3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Optional;
6
import java.util.Set;
7

8
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
9
import org.eclipse.rdf4j.model.IRI;
10
import org.eclipse.rdf4j.model.Statement;
11
import org.eclipse.rdf4j.model.Value;
12
import org.eclipse.rdf4j.model.ValueFactory;
13
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
14
import org.eclipse.rdf4j.model.vocabulary.RDF;
15
import org.eclipse.rdf4j.query.BindingSet;
16
import org.eclipse.rdf4j.query.QueryLanguage;
17
import org.eclipse.rdf4j.query.TupleQueryResult;
18
import org.eclipse.rdf4j.repository.RepositoryConnection;
19
import org.eclipse.rdf4j.repository.RepositoryResult;
20
import org.nanopub.vocabulary.NPA;
21
import org.slf4j.Logger;
22
import org.slf4j.LoggerFactory;
23

24
import com.knowledgepixels.query.vocabulary.GEN;
25
import com.knowledgepixels.query.vocabulary.NPAT;
26
import com.knowledgepixels.query.vocabulary.SpacesVocab;
27

28
/**
29
 * Drives the space-state materialization pipeline. Three entry points scheduled
30
 * by {@code MainVerticle}:
31
 * <ul>
32
 *   <li>{@link #tick()} — detects trust-state flips (full build) and otherwise
33
 *       advances the current space-state graph by an {@link #runIncrementalCycle
34
 *       incremental cycle} bounded by {@code (processedUpTo, currentLoadCounter]}.</li>
35
 *   <li>{@link #periodicRebuildTick()} — checks the {@code npa:needsFullRebuild}
36
 *       flag set by structural invalidations and re-runs the full build into a
37
 *       fresh graph, atomically flips the pointer, drops the old graph.</li>
38
 *   <li>{@link #cleanOrphans()} — startup cleanup of {@code npass:*} graphs the
39
 *       pointer isn't referencing.</li>
40
 * </ul>
41
 *
42
 * <p>Incremental cycle order: invalidation DELETEs (admin RI / RoleAssignment /
43
 * non-admin RI) → mirror-step delta is implicit (rebuilt only on full build) →
44
 * per-tier INSERTs (admin → attachment → maintainer → member → observer) →
45
 * late-arrival sweep (re-run downstream tiers without the load-number filter
46
 * iff this cycle added any structural rows). Sets {@code npa:needsFullRebuild}
47
 * when an admin RI / RoleAssignment / RoleDeclaration was invalidated; periodic
48
 * worker turns the flag into a from-scratch rebuild.
49
 *
50
 * <p>See {@code doc/plan-space-repositories.md} — this implements the "Full
51
 * build", "Incremental cycle", and "Periodic full rebuild" procedures.
52
 */
53
public final class AuthorityResolver {
54

55
    private static final Logger log = LoggerFactory.getLogger(AuthorityResolver.class);
9✔
56

57
    private static final ValueFactory vf = SimpleValueFactory.getInstance();
6✔
58

59
    private static final String SPACES_REPO = "spaces";
60
    private static final String TRUST_REPO = "trust";
61

62
    /** NPA constants pulled in locally (trust-side). */
63
    private static final IRI NPA_HAS_CURRENT_TRUST_STATE =
9✔
64
            vf.createIRI(NPA.NAMESPACE, "hasCurrentTrustState");
6✔
65
    private static final IRI NPA_ACCOUNT_STATE = vf.createIRI(NPA.NAMESPACE, "AccountState");
15✔
66
    private static final IRI NPA_AGENT = vf.createIRI(NPA.NAMESPACE, "agent");
15✔
67
    private static final IRI NPA_PUBKEY = vf.createIRI(NPA.NAMESPACE, "pubkey");
15✔
68
    private static final IRI NPA_TRUST_STATUS = vf.createIRI(NPA.NAMESPACE, "trustStatus");
15✔
69
    private static final IRI NPA_LOADED = vf.createIRI(NPA.NAMESPACE, "loaded");
15✔
70
    private static final IRI NPA_TO_LOAD = vf.createIRI(NPA.NAMESPACE, "toLoad");
15✔
71

72
    /**
73
     * Trust-approved set: rows with one of these {@code npa:trustStatus} values
74
     * are mirrored into the space-state graph. Per
75
     * {@code doc/design-trust-state-repos.md}, these are the two "authority-
76
     * approving" statuses; {@code npa:contested}, {@code npa:skipped}, and the
77
     * transient statuses are distinct values of the same predicate and are
78
     * excluded automatically by this positive-list filter.
79
     */
80
    private static final Set<IRI> APPROVED_SET = Set.of(NPA_LOADED, NPA_TO_LOAD);
15✔
81

82
    private static AuthorityResolver instance;
83

84
    /** Returns the singleton. */
85
    public static synchronized AuthorityResolver get() {
86
        if (instance == null) {
6✔
87
            instance = new AuthorityResolver();
12✔
88
        }
89
        return instance;
6✔
90
    }
91

92
    private AuthorityResolver() {
93
    }
94

95
    // ---------------- Public entry points ----------------
96

97
    /**
98
     * Poll entry point. Behaviour:
99
     * <ul>
100
     *   <li>If no current space-state graph or the trust state has flipped → full build.</li>
101
     *   <li>Otherwise → {@link #runIncrementalCycle incremental cycle} on the load-number
102
     *       delta {@code (processedUpTo, currentLoadCounter]}. No-op if {@code
103
     *       processedUpTo == currentLoadCounter}.</li>
104
     * </ul>
105
     * Safe to call repeatedly on a schedule. Gated by {@link FeatureFlags#spacesEnabled()}.
106
     */
107
    public void tick() {
108
        if (!FeatureFlags.spacesEnabled()) return;
6!
109
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
18✔
110
        if (trustStateHash == null) {
6!
111
            log.debug("AuthorityResolver.tick: no current trust state yet — skipping");
9✔
112
            return;
3✔
113
        }
114
        IRI currentGraph = getCurrentSpaceStateGraph();
×
115
        String currentGraphName = (currentGraph == null) ? null
×
116
                : currentGraph.stringValue().substring(SpacesVocab.NPASS_NAMESPACE.length());
×
117
        if (currentGraphName == null || !currentGraphName.startsWith(trustStateHash + "_")) {
×
118
            log.info("AuthorityResolver.tick: trust-state flip detected (now {}); running full build",
×
119
                    abbrev(trustStateHash));
×
120
            runFullBuild(trustStateHash);
×
121
            return;
×
122
        }
123
        runIncrementalCycle(currentGraph);
×
124
    }
×
125

126
    /**
127
     * Periodic worker. If {@code npa:needsFullRebuild} was raised by an
128
     * incremental cycle's structural DELETE, runs a from-scratch rebuild into
129
     * a fresh space-state graph (using the current trust-state hash and load
130
     * counter) and clears the flag. No-op when the flag is not set. Safe to
131
     * call concurrently with {@link #tick()} when both are scheduled on the
132
     * same single-threaded executor.
133
     */
134
    public void periodicRebuildTick() {
135
        if (!FeatureFlags.spacesEnabled()) return;
×
136
        if (!readNeedsFullRebuild()) return;
×
137
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
×
138
        if (trustStateHash == null) {
×
139
            log.debug("AuthorityResolver.periodicRebuildTick: no current trust state — deferring");
×
140
            return;
×
141
        }
142
        log.info("AuthorityResolver.periodicRebuildTick: needsFullRebuild flag set; rebuilding");
×
143
        runFullBuild(trustStateHash);
×
144
        clearNeedsFullRebuild();
×
145
    }
×
146

147
    /**
148
     * Startup cleanup: drop any {@code npass:*} graph that the
149
     * {@code npa:hasCurrentSpaceState} pointer isn't pointing at. Orphans come
150
     * from crashes mid-build. Safe to call at any time; idempotent.
151
     */
152
    public void cleanOrphans() {
153
        if (!FeatureFlags.spacesEnabled()) return;
×
154
        IRI current = getCurrentSpaceStateGraph();
×
155
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
156
            int dropped = 0;
×
157
            try (RepositoryResult<org.eclipse.rdf4j.model.Resource> ctxs = conn.getContextIDs()) {
×
158
                List<IRI> toDrop = new ArrayList<>();
×
159
                while (ctxs.hasNext()) {
×
160
                    org.eclipse.rdf4j.model.Resource ctx = ctxs.next();
×
161
                    if (!(ctx instanceof IRI iri)) continue;
×
162
                    if (!iri.stringValue().startsWith(SpacesVocab.NPASS_NAMESPACE)) continue;
×
163
                    if (iri.equals(current)) continue;
×
164
                    toDrop.add(iri);
×
165
                }
×
166
                for (IRI iri : toDrop) {
×
167
                    conn.begin(IsolationLevels.SERIALIZABLE);
×
168
                    conn.clear(iri);
×
169
                    conn.commit();
×
170
                    dropped++;
×
171
                    log.info("AuthorityResolver.cleanOrphans: dropped orphan graph {}", iri);
×
172
                }
×
173
            }
174
            if (dropped == 0) {
×
175
                log.debug("AuthorityResolver.cleanOrphans: no orphan space-state graphs");
×
176
            }
177
        } catch (Exception ex) {
×
178
            log.info("AuthorityResolver.cleanOrphans: failed: {}", ex.toString());
×
179
        }
×
180
    }
×
181

182
    // ---------------- Full build ----------------
183

184
    /**
185
     * Mutex-protected full build of the space-state graph for the given trust
186
     * state. Captures {@code M = currentLoadCounter}, mirrors trust-approved
187
     * rows, (PR 2b: runs per-tier UPDATE loops from scratch), stamps
188
     * {@code processedUpTo = M}, flips the pointer, drops the previous graph.
189
     */
190
    synchronized void runFullBuild(String trustStateHash) {
191
        long loadCounter = getCurrentLoadCounter();
×
192
        IRI newGraph = SpacesVocab.forSpaceState(trustStateHash, loadCounter);
×
193
        IRI oldGraph = getCurrentSpaceStateGraph();
×
194
        if (newGraph.equals(oldGraph)) {
×
195
            log.debug("AuthorityResolver.runFullBuild: already current at {}", newGraph);
×
196
            return;
×
197
        }
198

199
        // 1. Mirror trust-approved rows into the new graph.
200
        int mirrored = mirrorTrustState(trustStateHash, newGraph);
×
201

202
        // 2. Per-tier UPDATE loops (from scratch: lastProcessed = -1 so the
203
        //    delta filter FILTER(?ln > ?lastProcessed) includes everything).
204
        TierCounts counts = runAllTierLoops(newGraph, -1);
×
205

206
        // 3. Stamp processedUpTo inside the new graph.
207
        writeProcessedUpTo(newGraph, loadCounter);
×
208

209
        // 4. Flip the current-space-state pointer.
210
        flipPointer(newGraph);
×
211

212
        // 5. Drop the old graph if one existed.
213
        if (oldGraph != null) {
×
214
            dropGraph(oldGraph);
×
215
        }
216

217
        log.info("AuthorityResolver: full build complete — graph={} mirrored={} rows loadCounter={} "
×
218
                        + "tiers: admin={} attachment={} maintainer={} member={} observer={}",
219
                newGraph, mirrored, loadCounter,
×
220
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer);
×
221
    }
×
222

223
    // ---------------- Incremental cycle ----------------
224

225
    /**
226
     * Single delta cycle on the current space-state graph. Bounded by
227
     * {@code (processedUpTo, currentLoadCounter]}; no-op if the range is empty.
228
     *
229
     * <p>Order:
230
     * <ol>
231
     *   <li>Apply invalidation DELETEs (admin RI, RoleAssignment, non-admin RI)
232
     *       and the RoleDeclaration ASK. Any DELETE on a structural kind sets
233
     *       {@code npa:needsFullRebuild} to bound the staleness from sticky
234
     *       downstream entries; the periodic worker turns that into a from-scratch
235
     *       rebuild on its next pass.</li>
236
     *   <li>Run per-tier INSERTs in the same order as the full build.</li>
237
     *   <li>Late-arrival sweep: if any structural row was added, re-run downstream
238
     *       tier INSERTs with {@code lastProcessed = -1} to catch candidates whose
239
     *       enabling event landed in this same cycle. Dedup filters protect
240
     *       against double-insert.</li>
241
     *   <li>Bump {@code processedUpTo} to {@code currentLoadCounter}.</li>
242
     * </ol>
243
     */
244
    synchronized void runIncrementalCycle(IRI graph) {
245
        long currentLoadCounter = getCurrentLoadCounter();
×
246
        long lastProcessed = readProcessedUpTo(graph);
×
247
        if (lastProcessed < 0) {
×
248
            log.warn("AuthorityResolver.runIncrementalCycle: missing processedUpTo on {}; skipping",
×
249
                    graph);
250
            return;
×
251
        }
252
        if (currentLoadCounter <= lastProcessed) {
×
253
            log.debug("AuthorityResolver.runIncrementalCycle: caught up at load {} on {}",
×
254
                    currentLoadCounter, graph);
×
255
            return;
×
256
        }
257

258
        boolean structuralInvalidation = applyInvalidations(graph, lastProcessed);
×
259
        TierCounts counts = runAllTierLoops(graph, lastProcessed);
×
260
        boolean structuralAdds = (counts.admin > 0)
×
261
                || (counts.attachment > 0)
262
                || newRoleDeclarationsArrived(lastProcessed);
×
263
        if (structuralAdds) {
×
264
            // Late-arrival sweep: only the leaf tiers (attachment/maintainer/member/observer)
265
            // can promote candidates whose enabling event arrived in this same cycle. Skip
266
            // the admin tier — its only enabling event is the admin grant itself, already
267
            // handled by the regular pass.
268
            TierCounts lateCounts = runDownstreamWithoutLoadFilter(graph);
×
269
            counts.attachment += lateCounts.attachment;
×
270
            counts.maintainer += lateCounts.maintainer;
×
271
            counts.member     += lateCounts.member;
×
272
            counts.observer   += lateCounts.observer;
×
273
        }
274

275
        writeProcessedUpTo(graph, currentLoadCounter);
×
276

277
        log.info("AuthorityResolver: incremental cycle complete — graph={} delta=({}, {}] "
×
278
                        + "tiers: admin={} attachment={} maintainer={} member={} observer={} "
279
                        + "structuralInvalidation={} structuralAdds={}",
280
                graph, lastProcessed, currentLoadCounter,
×
281
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer,
×
282
                structuralInvalidation, structuralAdds);
×
283
    }
×
284

285
    /**
286
     * Runs the four invalidation-DELETE / ASK steps. Sets {@code npa:needsFullRebuild}
287
     * when admin-RI, RoleAssignment, or RoleDeclaration invalidations matched (the
288
     * three structural kinds). Leaf-tier RI deletes don't set the flag.
289
     *
290
     * @return true iff at least one structural kind was invalidated
291
     */
292
    boolean applyInvalidations(IRI graph, long lastProcessed) {
293
        boolean structural = false;
×
294
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ true,
×
295
                            adminInvalidationCheckWhere(graph, lastProcessed))) {
×
296
            executeUpdate(adminInvalidationDelete(graph, lastProcessed));
×
297
            structural = true;
×
298
        }
299
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
300
                            roleAssignmentInvalidationCheckWhere(graph, lastProcessed))) {
×
301
            executeUpdate(roleAssignmentInvalidationDelete(graph, lastProcessed));
×
302
            structural = true;
×
303
        }
304
        // RoleDeclaration ASK only — RDs aren't materialized into the space-state
305
        // graph, so there's nothing to DELETE here. The flag still flips because
306
        // sticky downstream RIs derived from the now-invalidated RD need a
307
        // from-scratch recompute.
308
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
309
                            roleDeclarationInvalidationCheckWhere(lastProcessed))) {
×
310
            structural = true;
×
311
        }
312
        // Leaf-tier RI deletes — no flag.
313
        executeUpdate(leafTierInvalidationDelete(graph, lastProcessed));
×
314
        if (structural) setNeedsFullRebuild();
×
315
        return structural;
×
316
    }
317

318
    /**
319
     * Runs the four leaf tiers (attachment/maintainer/member/observer) with
320
     * {@code lastProcessed = -1} so the load-number filter on the candidate
321
     * side admits everything. Dedup filters in the tier templates prevent
322
     * double-insert. Used by the late-arrival sweep.
323
     */
324
    TierCounts runDownstreamWithoutLoadFilter(IRI graph) {
325
        TierCounts c = new TierCounts();
×
326
        c.attachment = runTierLabeled("attachment(late)", graph,
×
327
                attachmentValidationUpdate(graph, -1));
×
328
        c.maintainer = runTierLabeled("maintainer(late)", graph,
×
329
                nonAdminTierUpdate(graph, -1, GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
×
330
        c.member = runTierLabeled("member(admin-pub,late)", graph,
×
331
                nonAdminTierUpdate(graph, -1, GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
×
332
        c.member += runTierLabeled("member(maint-pub,late)", graph,
×
333
                nonAdminTierUpdate(graph, -1,
×
334
                        GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
335
        c.observer = runTierLabeled("observer(self,late)", graph,
×
336
                nonAdminTierUpdate(graph, -1, GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
×
337
        return c;
×
338
    }
339

340
    /**
341
     * Cheap ASK: did any new {@code npa:RoleDeclaration} extraction land in the
342
     * load-number delta {@code (lastProcessed, ∞)}? Used by the late-arrival
343
     * trigger so an RD that arrives in the same cycle as a matching candidate
344
     * still gets validated.
345
     */
346
    boolean newRoleDeclarationsArrived(long lastProcessed) {
347
        String ask = String.format("""
×
348
                PREFIX npa: <%1$s>
349
                ASK {
350
                  GRAPH <%2$s> {
351
                    ?rd a npa:RoleDeclaration ;
352
                        npa:viaNanopub ?np .
353
                  }
354
                  GRAPH <%3$s> {
355
                    ?np npa:hasLoadNumber ?ln .
356
                    FILTER (?ln > %4$d)
357
                  }
358
                }
359
                """, NPA.NAMESPACE, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
×
360
        return runAsk(ask);
×
361
    }
362

363
    // ---------------- Tier UPDATE loops ----------------
364

365
    /** Per-tier INSERT counts (for logging/metrics). */
366
    static final class TierCounts {
×
367
        int admin;
368
        int attachment;
369
        int maintainer;
370
        int member;
371
        int observer;
372
    }
373

374
    /**
375
     * Runs the five tier loops in order: admin → {@code gen:hasRole} attachment
376
     * validation → maintainer → member → observer. Each loop iterates a SPARQL
377
     * INSERT to fixed point (no new triples added). Returns per-tier counts.
378
     *
379
     * @param graph         target space-state graph
380
     * @param lastProcessed load-number horizon; use {@code -1} for full build
381
     */
382
    TierCounts runAllTierLoops(IRI graph, long lastProcessed) {
383
        TierCounts c = new TierCounts();
×
384
        c.admin = runTierLabeled("admin", graph, adminTierUpdate(graph, lastProcessed));
×
385
        c.attachment = runTierLabeled("attachment", graph,
×
386
                attachmentValidationUpdate(graph, lastProcessed));
×
387
        c.maintainer = runTierLabeled("maintainer", graph, nonAdminTierUpdate(graph, lastProcessed,
×
388
                GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
389
        // Member tier: admin OR maintainer publisher — split into two simpler updates
390
        // so the query planner doesn't struggle with the UNION.
391
        c.member = runTierLabeled("member(admin-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
392
                GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
393
        c.member += runTierLabeled("member(maint-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
394
                GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
395
        // Observer tier: self-evidence only per the plan's policy table
396
        // (gen:ObserverRole = self). Authority-publisher sub-tiers were overreach;
397
        // the three of them have been removed, so an observer instantiation is
398
        // validated iff the assignee's own pubkey signed it.
399
        c.observer = runTierLabeled("observer(self)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
400
                GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
401
        return c;
×
402
    }
403

404
    /**
405
     * Builds a publisher constraint requiring the publisher to be a validated holder
406
     * of the given tier's role (maintainer or member) in the target space.
407
     * Owns its own AccountState resolution so ?publisher is bound through the
408
     * targeted (pkh → agent) lookup rather than enumerated.
409
     */
410
    private static String publisherIsTieredRole(IRI tierClass) {
411
        return """
×
412
                ?acct a npa:AccountState ;
413
                      npa:pubkey ?pkh ;
414
                      npa:agent  ?publisher .
415
                ?tierRI a gen:RoleInstantiation ;
416
                        npa:forSpace ?space ;
417
                        npa:forAgent ?publisher .
418
                ?rdT a npa:RoleDeclaration ;
419
                     npa:hasRoleType <%1$s> .
420
                { ?tierRI npa:regularProperty ?predT . ?rdT gen:hasRegularProperty ?predT . }
421
                UNION
422
                { ?tierRI npa:inverseProperty ?predT . ?rdT gen:hasInverseProperty ?predT . }
423
                """.formatted(tierClass);
×
424
    }
425

426
    /** Wraps {@link #runTierLoop} with tier-name context for logs/exceptions. */
427
    private int runTierLabeled(String tier, IRI graph, String sparqlUpdate) {
428
        try {
429
            return runTierLoop(graph, sparqlUpdate);
×
430
        } catch (RuntimeException ex) {
×
431
            log.error("AuthorityResolver: tier={} failed with SPARQL UPDATE:\n{}\n", tier, sparqlUpdate, ex);
×
432
            throw ex;
×
433
        }
434
    }
435

436
    /**
437
     * Runs a single tier's INSERT to fixed point. Counts rows by probing
438
     * graph size before/after each INSERT; stops when the size doesn't change.
439
     *
440
     * @return total number of triples inserted by this tier across all iterations
441
     */
442
    int runTierLoop(IRI graph, String sparqlUpdate) {
443
        int total = 0;
×
444
        long before = graphSize(graph);
×
445
        while (true) {
446
            // Note: no explicit transaction wrapping here. In tests we observed that
447
            // HTTPRepository's RDF4J-transaction protocol silently no-op'd cross-graph
448
            // SPARQL UPDATEs with UNION sub-patterns inside conn.begin()/commit(),
449
            // while the same UPDATE POSTed directly to /statements applied correctly.
450
            // A bare prepareUpdate().execute() takes the direct /statements path and
451
            // runs the UPDATE atomically per SPARQL 1.1 semantics — which is all we
452
            // need; there's nothing else to commit atomically alongside the UPDATE.
453
            try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
454
                conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
455
            }
456
            long after = graphSize(graph);
×
457
            long added = after - before;
×
458
            if (added <= 0) break;
×
459
            total += added;
×
460
            before = after;
×
461
        }
×
462
        return total;
×
463
    }
464

465
    private long graphSize(IRI graph) {
466
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
467
            return conn.size(graph);
×
468
        }
469
    }
470

471
    // ---------------- SPARQL templates ----------------
472

473
    /**
474
     * Reusable invalidation filter on a bound nanopub-IRI variable. Pass the bare
475
     * variable name (no leading {@code ?}); e.g. {@code invalidationFilter("np")}
476
     * produces an outer-scoped {@code FILTER NOT EXISTS { GRAPH npa:spacesGraph
477
     * { ?_inv_np a npa:Invalidation ; npa:invalidates ?np . } }}.
478
     *
479
     * <p>Important: this filter must be placed OUTSIDE the surrounding
480
     * {@code GRAPH npa:spacesGraph { ... }} block, not nested inside it. When
481
     * nested, RDF4J's planner couples the FILTER NOT EXISTS evaluation into the
482
     * join order (per-row scan of {@code ?_inv a npa:Invalidation} multiplied by
483
     * the candidate set), which we measured turning a 39ms query into a 60s+
484
     * timeout on the live observer-tier data. Outside the GRAPH block, the
485
     * planner defers the filter until {@code ?np}/{@code ?rdNp} are bound and
486
     * does a targeted index lookup.
487
     *
488
     * <p>Variable names must match {@code [A-Za-z0-9_]+} per SPARQL grammar —
489
     * embedding a {@code ?} inside {@code ?_inv_?np} would yield a parse error.
490
     */
491
    private static String invalidationFilter(String bareVarName) {
492
        return "FILTER NOT EXISTS { GRAPH <" + SpacesVocab.SPACES_GRAPH + "> {"
30✔
493
                + " ?_inv_" + bareVarName
494
                + " a <" + SpacesVocab.INVALIDATION + "> ; "
495
                + "<" + SpacesVocab.INVALIDATES + "> ?" + bareVarName + " . } }";
496
    }
497

498
    /**
499
     * Admin tier: seed from {@code npadef:...hasRootAdmin} (trusted by construction)
500
     * plus closed-over admin grants; insert any {@code gen:RoleInstantiation} with
501
     * {@code npa:inverseProperty gen:hasAdmin} whose publisher (resolved via mirrored
502
     * trust-approved AccountState) is already in the admin set.
503
     */
504
    static String adminTierUpdate(IRI graph, long lastProcessed) {
505
        // Order tuned for RDF4J's evaluator:
506
        //   1. Anchor on the small (seed UNION closed-over) set to bind ?publisher
507
        //      and ?space cheaply.
508
        //   2. Resolve ?pkh from the mirrored AccountState row (?publisher bound).
509
        //   3. Probe instantiations using the now-bound (?space, ?pkh) — targeted
510
        //      lookup, not a full RoleInstantiation scan.
511
        //   4. Load-number filter on bound ?np.
512
        //   5. Dedup at the end.
513
        return """
69✔
514
                PREFIX npa:  <%1$s>
515
                PREFIX gen:  <%2$s>
516
                INSERT { GRAPH <%3$s> {
517
                  ?ri a gen:RoleInstantiation ;
518
                      npa:forSpace ?space ;
519
                      npa:inverseProperty gen:hasAdmin ;
520
                      npa:forAgent ?agent ;
521
                      npa:viaNanopub ?np .
522
                } }
523
                WHERE {
524
                  # 1. Anchor: who is already an admin of which space?
525
                  {
526
                    # Seed branch: root-admin in a non-invalidated SpaceDefinition.
527
                    GRAPH <%4$s> {
528
                      ?def a npa:SpaceDefinition ;
529
                           npa:forSpaceRef  ?spaceRef ;
530
                           npa:hasRootAdmin ?publisher ;
531
                           npa:viaNanopub   ?defNp .
532
                      ?spaceRef npa:spaceIri ?space .
533
                    }
534
                    %7$s
535
                  }
536
                  UNION
537
                  {
538
                    # Closed-over branch: an existing admin in this space-state graph.
539
                    GRAPH <%3$s> {
540
                      ?prev a gen:RoleInstantiation ;
541
                            npa:forSpace        ?space ;
542
                            npa:inverseProperty gen:hasAdmin ;
543
                            npa:forAgent        ?publisher .
544
                    }
545
                  }
546
                  # 2. Mirror: resolve ?publisher → ?pkh via the trust-approved row.
547
                  GRAPH <%3$s> {
548
                    ?acct a npa:AccountState ;
549
                          npa:agent  ?publisher ;
550
                          npa:pubkey ?pkh .
551
                  }
552
                  # 3. Targeted instantiation lookup by space + pubkey.
553
                  GRAPH <%4$s> {
554
                    ?ri a gen:RoleInstantiation ;
555
                        npa:forSpace        ?space ;
556
                        npa:inverseProperty gen:hasAdmin ;
557
                        npa:forAgent        ?agent ;
558
                        npa:pubkeyHash      ?pkh ;
559
                        npa:viaNanopub      ?np .
560
                  }
561
                  %6$s
562
                  # 4. Load-number filter on bound ?np.
563
                  GRAPH <%8$s> {
564
                    ?np npa:hasLoadNumber ?ln .
565
                    FILTER (?ln > %5$d)
566
                  }
567
                  # 5. Dedup last.
568
                  FILTER NOT EXISTS { GRAPH <%3$s> {
569
                    ?existing a gen:RoleInstantiation ;
570
                              npa:forSpace ?space ;
571
                              npa:forAgent ?agent ;
572
                              npa:inverseProperty gen:hasAdmin .
573
                  } }
574
                }
575
                """.formatted(
3✔
576
                NPA.NAMESPACE,
577
                GEN.NAMESPACE,
578
                graph,
579
                SpacesVocab.SPACES_GRAPH,
580
                lastProcessed,
15✔
581
                invalidationFilter("np"),
15✔
582
                invalidationFilter("defNp"),
18✔
583
                NPA.GRAPH);
584
    }
585

586
    /**
587
     * {@code gen:hasRole} attachment validation: an attachment is validated iff its
588
     * publisher is already a validated admin of the target space. Adds
589
     * {@code gen:RoleAssignment} rows to the space-state graph.
590
     */
591
    static String attachmentValidationUpdate(IRI graph, long lastProcessed) {
592
        return """
69✔
593
                PREFIX npa:  <%1$s>
594
                PREFIX gen:  <%2$s>
595
                INSERT { GRAPH <%3$s> {
596
                  ?ra a gen:RoleAssignment ;
597
                      npa:forSpace ?space ;
598
                      gen:hasRole  ?role ;
599
                      npa:viaNanopub ?np .
600
                } }
601
                WHERE {
602
                  GRAPH <%4$s> {
603
                    ?ra a gen:RoleAssignment ;
604
                        npa:forSpace ?space ;
605
                        gen:hasRole  ?role ;
606
                        npa:pubkeyHash ?pkh ;
607
                        npa:viaNanopub ?np .
608
                  }
609
                  GRAPH <%7$s> {
610
                    ?np npa:hasLoadNumber ?ln .
611
                    FILTER (?ln > %5$d)
612
                  }
613
                  GRAPH <%3$s> {
614
                    ?acct a npa:AccountState ;
615
                          npa:agent  ?publisher ;
616
                          npa:pubkey ?pkh .
617
                    ?adminRI a gen:RoleInstantiation ;
618
                             npa:forSpace ?space ;
619
                             npa:inverseProperty gen:hasAdmin ;
620
                             npa:forAgent ?publisher .
621
                  }
622
                  %6$s
623
                  FILTER NOT EXISTS { GRAPH <%3$s> {
624
                    ?existing a gen:RoleAssignment ;
625
                              npa:forSpace ?space ;
626
                              gen:hasRole  ?role .
627
                  } }
628
                }
629
                """.formatted(
3✔
630
                NPA.NAMESPACE,
631
                GEN.NAMESPACE,
632
                graph,
633
                SpacesVocab.SPACES_GRAPH,
634
                lastProcessed,
15✔
635
                invalidationFilter("np"),
18✔
636
                NPA.GRAPH);
637
    }
638

639
    /**
640
     * Non-admin tier publisher constraints (inserted as a SPARQL sub-pattern).
641
     * Each constraint owns the AccountState (pkh → agent) lookup so the join
642
     * variable is bound through a targeted pattern. The observer-self variant
643
     * binds {@code npa:agent ?agent} directly — no separate {@code ?publisher}
644
     * variable, no post-join equality filter — which lets the planner anchor
645
     * the AccountState lookup on the already-bound {@code ?agent} instead of
646
     * enumerating all approved publishers and filtering at the end.
647
     */
648
    static final String PUBLISHER_IS_ADMIN = """
649
            ?acct a npa:AccountState ;
650
                  npa:pubkey ?pkh ;
651
                  npa:agent  ?publisher .
652
            ?adminRI a gen:RoleInstantiation ;
653
                     npa:forSpace ?space ;
654
                     npa:inverseProperty gen:hasAdmin ;
655
                     npa:forAgent ?publisher .
656
            """;
657

658
    /** Observer self-evidence: the assignee's own pubkey signed the instantiation. */
659
    static final String PUBLISHER_IS_SELF = """
660
            ?acct a npa:AccountState ;
661
                  npa:pubkey ?pkh ;
662
                  npa:agent  ?agent .
663
            """;
664

665
    /**
666
     * Maintainer / Member / Observer tier INSERT. Same shape: find an instantiation
667
     * whose predicate matches a RoleDeclaration of the given tier attached to the
668
     * target space, and whose publisher passes the tier-specific constraint.
669
     */
670
    static String nonAdminTierUpdate(IRI graph, long lastProcessed,
671
                                     IRI tierClass, String publisherConstraint) {
672
        // Order tuned for RDF4J's evaluator (which executes BGPs roughly in order).
673
        // The crucial choice is the *anchor*: instantiation-first plans send the
674
        // planner exploring the full ~thousands of candidate RIs and only filter
675
        // by tier at the very end. Attachment-first anchors on the small set of
676
        // gen:RoleAssignment rows already validated in this space-state graph
677
        // (~hundreds, often zero) and walks outward by bound (?role, ?space).
678
        //
679
        //   1. Anchor on RoleAssignments in this space-state graph (small).
680
        //   2. Match the tier-pinned RoleDeclaration by ?role.
681
        //   3. Pair role-decl direction to instantiation direction in one UNION
682
        //      so only (reg, reg)/(inv, inv) combos are explored.
683
        //   4. Targeted instantiation lookup — (?space, ?pred) are bound.
684
        //   5. Publisher constraint (incl. AccountState resolution).
685
        //   6. Load-number filter on bound ?np.
686
        //   7. Dedup at the end.
687
        return """
69✔
688
                PREFIX npa:  <%1$s>
689
                PREFIX gen:  <%2$s>
690
                INSERT { GRAPH <%3$s> {
691
                  ?ri a gen:RoleInstantiation ;
692
                      npa:forSpace ?space ;
693
                      npa:forAgent ?agent ;
694
                      npa:viaNanopub ?np .
695
                } }
696
                WHERE {
697
                  # 1. Anchor: validated attachments in this space-state graph.
698
                  GRAPH <%3$s> {
699
                    ?ra a gen:RoleAssignment ;
700
                        gen:hasRole  ?role ;
701
                        npa:forSpace ?space .
702
                  }
703
                  # 2. Tier-pinned RoleDeclaration (?role bound from the attachment).
704
                  GRAPH <%4$s> {
705
                    ?rd a npa:RoleDeclaration ;
706
                        npa:hasRoleType <%7$s> ;
707
                        npa:role        ?role ;
708
                        npa:viaNanopub  ?rdNp .
709
                    # 3. Pair direction so only matching combos are explored.
710
                    {
711
                      ?rd gen:hasRegularProperty ?pred .
712
                      ?ri npa:regularProperty    ?pred .
713
                    }
714
                    UNION
715
                    {
716
                      ?rd gen:hasInverseProperty ?pred .
717
                      ?ri npa:inverseProperty    ?pred .
718
                    }
719
                    # 4. Targeted instantiation lookup — (?space, ?pred) bound.
720
                    ?ri a gen:RoleInstantiation ;
721
                        npa:forSpace   ?space ;
722
                        npa:forAgent   ?agent ;
723
                        npa:pubkeyHash ?pkh ;
724
                        npa:viaNanopub ?np .
725
                  }
726
                  # 5. Publisher constraint (incl. AccountState resolution).
727
                  GRAPH <%3$s> {
728
                    %9$s
729
                  }
730
                  # 6. Load-number filter on bound ?np.
731
                  GRAPH <%10$s> {
732
                    ?np npa:hasLoadNumber ?ln .
733
                    FILTER (?ln > %5$d)
734
                  }
735
                  # 7. Invalidation filters — outside the GRAPH block so the
736
                  #    planner defers them until ?rdNp/?np are bound.
737
                  %8$s
738
                  %6$s
739
                  # 8. Dedup last.
740
                  FILTER NOT EXISTS { GRAPH <%3$s> {
741
                    ?existing a gen:RoleInstantiation ;
742
                              npa:forSpace ?space ;
743
                              npa:forAgent ?agent ;
744
                              npa:viaNanopub ?np .
745
                  } }
746
                }
747
                """.formatted(
3✔
748
                NPA.NAMESPACE,
749
                GEN.NAMESPACE,
750
                graph,
751
                SpacesVocab.SPACES_GRAPH,
752
                lastProcessed,
15✔
753
                invalidationFilter("np"),
27✔
754
                tierClass,
755
                invalidationFilter("rdNp"),
30✔
756
                publisherConstraint,
757
                NPA.GRAPH);
758
    }
759

760
    // ---------------- Invalidation templates (incremental cycle) ----------------
761

762
    /**
763
     * WHERE clause shared by the admin-RI invalidation ASK precheck and the
764
     * matching DELETE. Identifies admin-tier {@code gen:RoleInstantiation} rows
765
     * in the space-state graph whose {@code npa:viaNanopub} equals the target
766
     * of an {@code npa:Invalidation} that landed in {@code (lastProcessed, ∞)}.
767
     */
768
    static String adminInvalidationCheckWhere(IRI graph, long lastProcessed) {
769
        return String.format("""
60✔
770
                  GRAPH <%1$s> {
771
                    ?ri a gen:RoleInstantiation ;
772
                        npa:inverseProperty gen:hasAdmin ;
773
                        npa:viaNanopub ?np .
774
                  }
775
                  GRAPH <%2$s> {
776
                    ?inv a npa:Invalidation ;
777
                         npa:invalidates ?np ;
778
                         npa:viaNanopub  ?invNp .
779
                  }
780
                  GRAPH <%3$s> {
781
                    ?invNp npa:hasLoadNumber ?ln .
782
                    FILTER (?ln > %4$d)
783
                  }
784
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
785
    }
786

787
    /** DELETE template for admin-tier RoleInstantiations whose source nanopub was invalidated. */
788
    static String adminInvalidationDelete(IRI graph, long lastProcessed) {
789
        return String.format("""
63✔
790
                PREFIX npa: <%1$s>
791
                PREFIX gen: <%2$s>
792
                DELETE { GRAPH <%3$s> {
793
                  ?ri ?p ?o .
794
                } }
795
                WHERE {
796
                  GRAPH <%3$s> { ?ri ?p ?o . }
797
                %4$s
798
                }
799
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
800
                adminInvalidationCheckWhere(graph, lastProcessed));
6✔
801
    }
802

803
    /** WHERE clause for RoleAssignment invalidation. */
804
    static String roleAssignmentInvalidationCheckWhere(IRI graph, long lastProcessed) {
805
        return String.format("""
60✔
806
                  GRAPH <%1$s> {
807
                    ?ra a gen:RoleAssignment ;
808
                        npa:viaNanopub ?np .
809
                  }
810
                  GRAPH <%2$s> {
811
                    ?inv a npa:Invalidation ;
812
                         npa:invalidates ?np ;
813
                         npa:viaNanopub  ?invNp .
814
                  }
815
                  GRAPH <%3$s> {
816
                    ?invNp npa:hasLoadNumber ?ln .
817
                    FILTER (?ln > %4$d)
818
                  }
819
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
820
    }
821

822
    /** DELETE template for RoleAssignments whose source nanopub was invalidated. */
823
    static String roleAssignmentInvalidationDelete(IRI graph, long lastProcessed) {
824
        return String.format("""
63✔
825
                PREFIX npa: <%1$s>
826
                PREFIX gen: <%2$s>
827
                DELETE { GRAPH <%3$s> {
828
                  ?ra ?p ?o .
829
                } }
830
                WHERE {
831
                  GRAPH <%3$s> { ?ra ?p ?o . }
832
                %4$s
833
                }
834
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
835
                roleAssignmentInvalidationCheckWhere(graph, lastProcessed));
6✔
836
    }
837

838
    /**
839
     * WHERE clause for RoleDeclaration invalidation. ASK-only (no DELETE):
840
     * RoleDeclarations live in {@code npa:spacesGraph} and aren't materialized
841
     * into the space-state graph, so there's nothing to remove from the
842
     * space-state. The ASK still flips {@code npa:needsFullRebuild} because
843
     * sticky downstream RIs that were derived under the now-invalidated RD
844
     * need a from-scratch recompute.
845
     */
846
    static String roleDeclarationInvalidationCheckWhere(long lastProcessed) {
847
        return String.format("""
48✔
848
                  GRAPH <%1$s> {
849
                    ?rd a npa:RoleDeclaration ;
850
                        npa:viaNanopub ?np .
851
                    ?inv a npa:Invalidation ;
852
                         npa:invalidates ?np ;
853
                         npa:viaNanopub  ?invNp .
854
                  }
855
                  GRAPH <%2$s> {
856
                    ?invNp npa:hasLoadNumber ?ln .
857
                    FILTER (?ln > %3$d)
858
                  }
859
                """, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
860
    }
861

862
    /**
863
     * DELETE template for non-admin (leaf-tier) RoleInstantiations whose source
864
     * nanopub was invalidated. Identified as {@code gen:RoleInstantiation} rows
865
     * lacking the admin-pinning {@code npa:inverseProperty gen:hasAdmin} triple.
866
     * No flag is set; leaf-tier removals are recoverable on the next cycle.
867
     */
868
    static String leafTierInvalidationDelete(IRI graph, long lastProcessed) {
869
        return String.format("""
84✔
870
                PREFIX npa: <%1$s>
871
                PREFIX gen: <%2$s>
872
                DELETE { GRAPH <%3$s> {
873
                  ?ri ?p ?o .
874
                } }
875
                WHERE {
876
                  GRAPH <%3$s> {
877
                    ?ri a gen:RoleInstantiation ;
878
                        npa:viaNanopub ?np .
879
                    FILTER NOT EXISTS { ?ri npa:inverseProperty gen:hasAdmin }
880
                    ?ri ?p ?o .
881
                  }
882
                  GRAPH <%4$s> {
883
                    ?inv a npa:Invalidation ;
884
                         npa:invalidates ?np ;
885
                         npa:viaNanopub  ?invNp .
886
                  }
887
                  GRAPH <%5$s> {
888
                    ?invNp npa:hasLoadNumber ?ln .
889
                    FILTER (?ln > %6$d)
890
                  }
891
                }
892
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
893
                SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
894
    }
895

896
    /** Wraps an ASK by joining the shared prefixes. */
897
    private boolean wouldInvalidate(IRI graph, long lastProcessed,
898
                                    boolean adminPinned, String whereClause) {
899
        // adminPinned is informational only — kept to make call sites read clearly;
900
        // the WHERE clause already encodes the kind via its own type predicates.
901
        String ask = String.format("""
×
902
                PREFIX npa: <%1$s>
903
                PREFIX gen: <%2$s>
904
                ASK { %3$s }
905
                """, NPA.NAMESPACE, GEN.NAMESPACE, whereClause);
906
        return runAsk(ask);
×
907
    }
908

909
    private boolean runAsk(String sparql) {
910
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
911
            return conn.prepareBooleanQuery(QueryLanguage.SPARQL, sparql).evaluate();
×
912
        }
913
    }
914

915
    private void executeUpdate(String sparqlUpdate) {
916
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
917
            conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
918
        }
919
    }
×
920

921
    // ---------------- Mirror step ----------------
922

923
    /**
924
     * Copies trust-approved {@code npa:AccountState} rows from {@code npat:<T>}
925
     * in the {@code trust} repo into {@code newGraph} in the {@code spaces} repo,
926
     * inside one spaces-side serializable transaction.
927
     *
928
     * @return number of rows mirrored (useful for metrics / logging)
929
     */
930
    int mirrorTrustState(String trustStateHash, IRI newGraph) {
931
        IRI trustStateIri = NPAT.forHash(trustStateHash);
×
932
        int count = 0;
×
933
        try (RepositoryConnection trustConn = TripleStore.get().getRepoConnection(TRUST_REPO);
×
934
             RepositoryConnection spacesConn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
935
            trustConn.begin(IsolationLevels.READ_COMMITTED);
×
936
            spacesConn.begin(IsolationLevels.SERIALIZABLE);
×
937
            // Walk rdf:type triples in the trust state's graph; for each AccountState,
938
            // check status and copy the approved ones verbatim (minus status-specific
939
            // detail triples, which we don't need for validation).
940
            try (RepositoryResult<Statement> typeRows = trustConn.getStatements(
×
941
                    null, RDF.TYPE, NPA_ACCOUNT_STATE, trustStateIri)) {
942
                while (typeRows.hasNext()) {
×
943
                    Statement st = typeRows.next();
×
944
                    if (!(st.getSubject() instanceof IRI accountStateIri)) continue;
×
945
                    Value status = trustConn.getStatements(accountStateIri, NPA_TRUST_STATUS, null, trustStateIri)
×
946
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
947
                    if (!(status instanceof IRI statusIri) || !APPROVED_SET.contains(statusIri)) continue;
×
948
                    Value agent = trustConn.getStatements(accountStateIri, NPA_AGENT, null, trustStateIri)
×
949
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
950
                    Value pubkey = trustConn.getStatements(accountStateIri, NPA_PUBKEY, null, trustStateIri)
×
951
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
952
                    if (agent == null || pubkey == null) {
×
953
                        log.warn("AuthorityResolver.mirror: account {} missing agent or pubkey; skipping",
×
954
                                accountStateIri);
955
                        continue;
×
956
                    }
957
                    spacesConn.add(accountStateIri, RDF.TYPE, NPA_ACCOUNT_STATE, newGraph);
×
958
                    spacesConn.add(accountStateIri, NPA_AGENT, agent, newGraph);
×
959
                    spacesConn.add(accountStateIri, NPA_PUBKEY, pubkey, newGraph);
×
960
                    spacesConn.add(accountStateIri, NPA_TRUST_STATUS, statusIri, newGraph);
×
961
                    count++;
×
962
                }
×
963
            }
964
            spacesConn.commit();
×
965
            trustConn.commit();
×
966
        }
967
        return count;
×
968
    }
969

970
    // ---------------- Pointer + counter helpers ----------------
971

972
    /**
973
     * Reads the current {@code npa:hasCurrentSpaceState} pointer from the
974
     * {@code npa:graph} admin graph of the {@code spaces} repo. Returns
975
     * {@code null} if no pointer exists yet.
976
     */
977
    IRI getCurrentSpaceStateGraph() {
978
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
979
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
980
                    SpacesVocab.HAS_CURRENT_SPACE_STATE);
981
            return (v instanceof IRI iri) ? iri : null;
×
982
        } catch (Exception ex) {
×
983
            log.warn("AuthorityResolver: failed to read hasCurrentSpaceState pointer: {}", ex.toString());
×
984
            return null;
×
985
        }
986
    }
987

988
    long getCurrentLoadCounter() {
989
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
990
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
991
                    SpacesVocab.CURRENT_LOAD_COUNTER);
992
            if (v == null) return 0;
×
993
            try {
994
                return Long.parseLong(v.stringValue());
×
995
            } catch (NumberFormatException ex) {
×
996
                log.warn("AuthorityResolver: non-numeric currentLoadCounter: {}", v);
×
997
                return 0;
×
998
            }
999
        } catch (Exception ex) {
×
1000
            log.warn("AuthorityResolver: failed to read currentLoadCounter: {}", ex.toString());
×
1001
            return 0;
×
1002
        }
1003
    }
1004

1005
    /**
1006
     * Atomic pointer flip: a single SPARQL {@code DELETE … INSERT … WHERE}
1007
     * replaces the old pointer with the new one in one statement, so readers
1008
     * never see a zero-pointer window.
1009
     */
1010
    void flipPointer(IRI newGraph) {
1011
        String update = String.format("""
×
1012
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1013
                INSERT { GRAPH <%s> { <%s> <%s> <%s> } }
1014
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1015
                """,
1016
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE,
1017
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE, newGraph,
1018
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE);
1019
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1020
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1021
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1022
            conn.commit();
×
1023
        }
1024
    }
×
1025

1026
    void writeProcessedUpTo(IRI graph, long loadCounter) {
1027
        String update = String.format("""
×
1028
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1029
                INSERT { GRAPH <%s> { <%s> <%s> "%d"^^<http://www.w3.org/2001/XMLSchema#long> } }
1030
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1031
                """,
1032
                graph, graph, SpacesVocab.PROCESSED_UP_TO,
1033
                graph, graph, SpacesVocab.PROCESSED_UP_TO, loadCounter,
×
1034
                graph, graph, SpacesVocab.PROCESSED_UP_TO);
1035
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1036
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1037
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1038
            conn.commit();
×
1039
        }
1040
    }
×
1041

1042
    /**
1043
     * Reads {@code processedUpTo} from the given space-state graph.
1044
     * Returns {@code -1} if absent (graph not fully built yet).
1045
     */
1046
    long readProcessedUpTo(IRI graph) {
1047
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1048
            String query = String.format(
×
1049
                    "SELECT ?n WHERE { GRAPH <%s> { <%s> <%s> ?n } }",
1050
                    graph, graph, SpacesVocab.PROCESSED_UP_TO);
1051
            try (TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
1052
                if (!r.hasNext()) return -1;
×
1053
                BindingSet b = r.next();
×
1054
                return Long.parseLong(b.getBinding("n").getValue().stringValue());
×
1055
            }
×
1056
        } catch (Exception ex) {
×
1057
            log.warn("AuthorityResolver: failed to read processedUpTo for {}: {}", graph, ex.toString());
×
1058
            return -1;
×
1059
        }
1060
    }
1061

1062
    /**
1063
     * Reads the {@code npa:needsFullRebuild} flag (boolean literal) from
1064
     * {@code npa:graph} in the {@code spaces} repo. Defaults to {@code false}
1065
     * when the triple is absent.
1066
     */
1067
    boolean readNeedsFullRebuild() {
1068
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1069
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1070
                    SpacesVocab.NEEDS_FULL_REBUILD);
1071
            return v != null && Boolean.parseBoolean(v.stringValue());
×
1072
        } catch (Exception ex) {
×
1073
            log.warn("AuthorityResolver: failed to read needsFullRebuild: {}", ex.toString());
×
1074
            return false;
×
1075
        }
1076
    }
1077

1078
    void setNeedsFullRebuild() {
1079
        writeNeedsFullRebuild(true);
×
1080
    }
×
1081

1082
    void clearNeedsFullRebuild() {
1083
        writeNeedsFullRebuild(false);
×
1084
    }
×
1085

1086
    private void writeNeedsFullRebuild(boolean value) {
1087
        String update = String.format("""
×
1088
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1089
                INSERT { GRAPH <%s> { <%s> <%s> "%s"^^<http://www.w3.org/2001/XMLSchema#boolean> } }
1090
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1091
                """,
1092
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD,
1093
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD, value,
×
1094
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD);
1095
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1096
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1097
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1098
            conn.commit();
×
1099
        }
1100
    }
×
1101

1102
    void dropGraph(IRI graph) {
1103
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1104
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1105
            conn.clear(graph);
×
1106
            conn.commit();
×
1107
            log.info("AuthorityResolver: dropped old space-state graph {}", graph);
×
1108
        }
1109
    }
×
1110

1111
    // ---------------- Trust-repo pointer lookup (used by TrustStateRegistry's bootstrap) ----------------
1112

1113
    /**
1114
     * Queries the {@code trust} repo directly for the current trust-state hash.
1115
     * Prefer {@link TrustStateRegistry#getCurrentHash()} in normal operation —
1116
     * this helper exists for tests and diagnostics.
1117
     *
1118
     * @return the current trust-state hash, or empty if none is set
1119
     */
1120
    Optional<String> readTrustRepoCurrentHash() {
1121
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(TRUST_REPO)) {
×
1122
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1123
                    NPA_HAS_CURRENT_TRUST_STATE);
1124
            if (!(v instanceof IRI iri)) return Optional.empty();
×
1125
            String s = iri.stringValue();
×
1126
            if (!s.startsWith(NPAT.NAMESPACE)) return Optional.empty();
×
1127
            return Optional.of(s.substring(NPAT.NAMESPACE.length()));
×
1128
        } catch (Exception ex) {
×
1129
            log.warn("AuthorityResolver: failed to read trust-repo current pointer: {}", ex.toString());
×
1130
            return Optional.empty();
×
1131
        }
1132
    }
1133

1134
    private static String abbrev(String hash) {
1135
        return hash.length() > 12 ? hash.substring(0, 12) + "…" : hash;
×
1136
    }
1137

1138
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc