• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24993003870

27 Apr 2026 11:43AM UTC coverage: 56.311% (-0.3%) from 56.577%
24993003870

Pull #89

github

web-flow
Merge 2895d896c into 8f22e9f64
Pull Request #89: feat: validate downward observer-tier grants from admin/maintainer/member (#62)

404 of 804 branches covered (50.25%)

Branch coverage included in aggregate %.

1153 of 1961 relevant lines covered (58.8%)

8.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.88
src/main/java/com/knowledgepixels/query/AuthorityResolver.java
1
package com.knowledgepixels.query;
2

3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Optional;
6
import java.util.Set;
7

8
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
9
import org.eclipse.rdf4j.model.IRI;
10
import org.eclipse.rdf4j.model.Statement;
11
import org.eclipse.rdf4j.model.Value;
12
import org.eclipse.rdf4j.model.ValueFactory;
13
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
14
import org.eclipse.rdf4j.model.vocabulary.RDF;
15
import org.eclipse.rdf4j.query.BindingSet;
16
import org.eclipse.rdf4j.query.QueryLanguage;
17
import org.eclipse.rdf4j.query.TupleQueryResult;
18
import org.eclipse.rdf4j.repository.RepositoryConnection;
19
import org.eclipse.rdf4j.repository.RepositoryResult;
20
import org.nanopub.vocabulary.NPA;
21
import org.slf4j.Logger;
22
import org.slf4j.LoggerFactory;
23

24
import com.knowledgepixels.query.vocabulary.GEN;
25
import com.knowledgepixels.query.vocabulary.NPAT;
26
import com.knowledgepixels.query.vocabulary.SpacesVocab;
27

28
/**
29
 * Drives the space-state materialization pipeline. Three entry points scheduled
30
 * by {@code MainVerticle}:
31
 * <ul>
32
 *   <li>{@link #tick()} — detects trust-state flips (full build) and otherwise
33
 *       advances the current space-state graph by an {@link #runIncrementalCycle
34
 *       incremental cycle} bounded by {@code (processedUpTo, currentLoadCounter]}.</li>
35
 *   <li>{@link #periodicRebuildTick()} — checks the {@code npa:needsFullRebuild}
36
 *       flag set by structural invalidations and re-runs the full build into a
37
 *       fresh graph, atomically flips the pointer, drops the old graph.</li>
38
 *   <li>{@link #cleanOrphans()} — startup cleanup of {@code npass:*} graphs the
39
 *       pointer isn't referencing.</li>
40
 * </ul>
41
 *
42
 * <p>Incremental cycle order: invalidation DELETEs (admin RI / RoleAssignment /
43
 * non-admin RI) → mirror-step delta is implicit (rebuilt only on full build) →
44
 * per-tier INSERTs (admin → attachment → maintainer → member → observer) →
45
 * late-arrival sweep (re-run downstream tiers without the load-number filter
46
 * iff this cycle added any structural rows). Sets {@code npa:needsFullRebuild}
47
 * when an admin RI / RoleAssignment / RoleDeclaration was invalidated; periodic
48
 * worker turns the flag into a from-scratch rebuild.
49
 *
50
 * <p>See {@code doc/design-space-repositories.md} — this implements the "Full
51
 * build", "Incremental cycle", and "Periodic full rebuild" procedures.
52
 */
53
public final class AuthorityResolver {
54

55
    private static final Logger log = LoggerFactory.getLogger(AuthorityResolver.class);
9✔
56

57
    private static final ValueFactory vf = SimpleValueFactory.getInstance();
6✔
58

59
    private static final String SPACES_REPO = "spaces";
60
    private static final String TRUST_REPO = "trust";
61

62
    /** NPA constants pulled in locally (trust-side). */
63
    private static final IRI NPA_HAS_CURRENT_TRUST_STATE =
9✔
64
            vf.createIRI(NPA.NAMESPACE, "hasCurrentTrustState");
6✔
65
    private static final IRI NPA_ACCOUNT_STATE = vf.createIRI(NPA.NAMESPACE, "AccountState");
15✔
66
    private static final IRI NPA_AGENT = vf.createIRI(NPA.NAMESPACE, "agent");
15✔
67
    private static final IRI NPA_PUBKEY = vf.createIRI(NPA.NAMESPACE, "pubkey");
15✔
68
    private static final IRI NPA_TRUST_STATUS = vf.createIRI(NPA.NAMESPACE, "trustStatus");
15✔
69
    private static final IRI NPA_LOADED = vf.createIRI(NPA.NAMESPACE, "loaded");
15✔
70
    private static final IRI NPA_TO_LOAD = vf.createIRI(NPA.NAMESPACE, "toLoad");
15✔
71

72
    /**
73
     * Trust-approved set: rows with one of these {@code npa:trustStatus} values
74
     * are mirrored into the space-state graph. Per
75
     * {@code doc/design-trust-state-repos.md}, these are the two "authority-
76
     * approving" statuses; {@code npa:contested}, {@code npa:skipped}, and the
77
     * transient statuses are distinct values of the same predicate and are
78
     * excluded automatically by this positive-list filter.
79
     */
80
    private static final Set<IRI> APPROVED_SET = Set.of(NPA_LOADED, NPA_TO_LOAD);
15✔
81

82
    private static AuthorityResolver instance;
83

84
    /** Returns the singleton. */
85
    public static synchronized AuthorityResolver get() {
86
        if (instance == null) {
6✔
87
            instance = new AuthorityResolver();
12✔
88
        }
89
        return instance;
6✔
90
    }
91

92
    private AuthorityResolver() {
6✔
93
    }
3✔
94

95
    // ---------------- Operational metrics snapshot ----------------
96
    //
97
    // Updated at the end of each runFullBuild / runIncrementalCycle, read by
98
    // MetricsCollector via the get*() accessors below. volatile is enough —
99
    // writers serialise via the synchronized methods, and readers (Prometheus
100
    // scrapes) only need most-recent visibility, not transactional consistency
101
    // across the snapshot. Defaults to zero values so a scrape that races a
102
    // boot before the first cycle returns 0, not NaN.
103

104
    private volatile TierSubjectTotals lastSubjectTotals = new TierSubjectTotals(0L, 0L, 0L);
24✔
105
    private volatile long lastInsertedTriplesTotal;
106
    private volatile long lastFullBuildDurationMs;
107
    private volatile long lastIncrementalCycleDurationMs;
108
    private volatile long lastProcessedUpToLag;
109

110
    public TierSubjectTotals getLastSubjectTotals() { return lastSubjectTotals; }
9✔
111
    public long getLastInsertedTriplesTotal() { return lastInsertedTriplesTotal; }
9✔
112
    public long getLastFullBuildDurationMs() { return lastFullBuildDurationMs; }
9✔
113
    public long getLastIncrementalCycleDurationMs() { return lastIncrementalCycleDurationMs; }
9✔
114
    public long getLastProcessedUpToLag() { return lastProcessedUpToLag; }
9✔
115

116
    // ---------------- Public entry points ----------------
117

118
    /**
119
     * Poll entry point. Behaviour:
120
     * <ul>
121
     *   <li>If no current space-state graph or the trust state has flipped → full build.</li>
122
     *   <li>Otherwise → {@link #runIncrementalCycle incremental cycle} on the load-number
123
     *       delta {@code (processedUpTo, currentLoadCounter]}. No-op if {@code
124
     *       processedUpTo == currentLoadCounter}.</li>
125
     * </ul>
126
     * Safe to call repeatedly on a schedule. Gated by {@link FeatureFlags#spacesEnabled()}.
127
     */
128
    public void tick() {
129
        if (!FeatureFlags.spacesEnabled()) return;
6!
130
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
18✔
131
        if (trustStateHash == null) {
6!
132
            log.debug("AuthorityResolver.tick: no current trust state yet — skipping");
9✔
133
            return;
3✔
134
        }
135
        IRI currentGraph = getCurrentSpaceStateGraph();
×
136
        String currentGraphName = (currentGraph == null) ? null
×
137
                : currentGraph.stringValue().substring(SpacesVocab.NPASS_NAMESPACE.length());
×
138
        if (currentGraphName == null || !currentGraphName.startsWith(trustStateHash + "_")) {
×
139
            log.info("AuthorityResolver.tick: trust-state flip detected (now {}); running full build",
×
140
                    abbrev(trustStateHash));
×
141
            runFullBuild(trustStateHash);
×
142
            return;
×
143
        }
144
        runIncrementalCycle(currentGraph);
×
145
    }
×
146

147
    /**
148
     * Periodic worker. If {@code npa:needsFullRebuild} was raised by an
149
     * incremental cycle's structural DELETE, runs a from-scratch rebuild into
150
     * a fresh space-state graph (using the current trust-state hash and load
151
     * counter) and clears the flag. No-op when the flag is not set. Safe to
152
     * call concurrently with {@link #tick()} when both are scheduled on the
153
     * same single-threaded executor.
154
     */
155
    public void periodicRebuildTick() {
156
        if (!FeatureFlags.spacesEnabled()) return;
×
157
        if (!readNeedsFullRebuild()) return;
×
158
        String trustStateHash = TrustStateRegistry.get().getCurrentHash().orElse(null);
×
159
        if (trustStateHash == null) {
×
160
            log.debug("AuthorityResolver.periodicRebuildTick: no current trust state — deferring");
×
161
            return;
×
162
        }
163
        log.info("AuthorityResolver.periodicRebuildTick: needsFullRebuild flag set; rebuilding");
×
164
        runFullBuild(trustStateHash);
×
165
        clearNeedsFullRebuild();
×
166
    }
×
167

168
    /**
169
     * Startup cleanup: drop any {@code npass:*} graph that the
170
     * {@code npa:hasCurrentSpaceState} pointer isn't pointing at. Orphans come
171
     * from crashes mid-build. Safe to call at any time; idempotent.
172
     */
173
    public void cleanOrphans() {
174
        if (!FeatureFlags.spacesEnabled()) return;
×
175
        IRI current = getCurrentSpaceStateGraph();
×
176
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
177
            int dropped = 0;
×
178
            try (RepositoryResult<org.eclipse.rdf4j.model.Resource> ctxs = conn.getContextIDs()) {
×
179
                List<IRI> toDrop = new ArrayList<>();
×
180
                while (ctxs.hasNext()) {
×
181
                    org.eclipse.rdf4j.model.Resource ctx = ctxs.next();
×
182
                    if (!(ctx instanceof IRI iri)) continue;
×
183
                    if (!iri.stringValue().startsWith(SpacesVocab.NPASS_NAMESPACE)) continue;
×
184
                    if (iri.equals(current)) continue;
×
185
                    toDrop.add(iri);
×
186
                }
×
187
                for (IRI iri : toDrop) {
×
188
                    conn.begin(IsolationLevels.SERIALIZABLE);
×
189
                    conn.clear(iri);
×
190
                    conn.commit();
×
191
                    dropped++;
×
192
                    log.info("AuthorityResolver.cleanOrphans: dropped orphan graph {}", iri);
×
193
                }
×
194
            }
195
            if (dropped == 0) {
×
196
                log.debug("AuthorityResolver.cleanOrphans: no orphan space-state graphs");
×
197
            }
198
        } catch (Exception ex) {
×
199
            log.info("AuthorityResolver.cleanOrphans: failed: {}", ex.toString());
×
200
        }
×
201
    }
×
202

203
    // ---------------- Full build ----------------
204

205
    /**
206
     * Mutex-protected full build of the space-state graph for the given trust
207
     * state. Captures {@code M = currentLoadCounter}, mirrors trust-approved
208
     * rows, (PR 2b: runs per-tier UPDATE loops from scratch), stamps
209
     * {@code processedUpTo = M}, flips the pointer, drops the previous graph.
210
     */
211
    synchronized void runFullBuild(String trustStateHash) {
212
        long startNanos = System.nanoTime();
×
213
        long loadCounter = getCurrentLoadCounter();
×
214
        IRI newGraph = SpacesVocab.forSpaceState(trustStateHash, loadCounter);
×
215
        IRI oldGraph = getCurrentSpaceStateGraph();
×
216
        if (newGraph.equals(oldGraph)) {
×
217
            log.debug("AuthorityResolver.runFullBuild: already current at {}", newGraph);
×
218
            return;
×
219
        }
220

221
        // 1. Mirror trust-approved rows into the new graph.
222
        int mirrored = mirrorTrustState(trustStateHash, newGraph);
×
223

224
        // 2. Per-tier UPDATE loops (from scratch: lastProcessed = -1 so the
225
        //    delta filter FILTER(?ln > ?lastProcessed) includes everything).
226
        TierInsertedTriples counts = runAllTierLoops(newGraph, -1);
×
227

228
        // 3. Stamp processedUpTo inside the new graph.
229
        writeProcessedUpTo(newGraph, loadCounter);
×
230

231
        // 4. Flip the current-space-state pointer.
232
        flipPointer(newGraph);
×
233

234
        // 5. Drop the old graph if one existed.
235
        if (oldGraph != null) {
×
236
            dropGraph(oldGraph);
×
237
        }
238

239
        TierSubjectTotals totals = computeTierSubjectTotals(newGraph);
×
240
        long durationMs = (System.nanoTime() - startNanos) / 1_000_000L;
×
241
        lastSubjectTotals = totals;
×
242
        lastInsertedTriplesTotal = (long) counts.admin + counts.attachment
×
243
                + counts.maintainer + counts.member + counts.observer;
244
        lastFullBuildDurationMs = durationMs;
×
245
        lastProcessedUpToLag = 0L;
×
246
        log.info("AuthorityResolver: full build complete — graph={} mirrored={} rows loadCounter={} "
×
247
                        + "subjects: adminRIs={} attachmentRAs={} nonAdminRIs={} "
248
                        + "(inserted-triples: admin={} attachment={} maintainer={} member={} observer={}) "
249
                        + "durationMs={}",
250
                newGraph, mirrored, loadCounter,
×
251
                totals.adminRIs(), totals.attachmentRAs(), totals.nonAdminRIs(),
×
252
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer,
×
253
                durationMs);
×
254
    }
×
255

256
    // ---------------- Incremental cycle ----------------
257

258
    /**
259
     * Single delta cycle on the current space-state graph. Bounded by
260
     * {@code (processedUpTo, currentLoadCounter]}; no-op if the range is empty.
261
     *
262
     * <p>Order:
263
     * <ol>
264
     *   <li>Apply invalidation DELETEs (admin RI, RoleAssignment, non-admin RI)
265
     *       and the RoleDeclaration ASK. Any DELETE on a structural kind sets
266
     *       {@code npa:needsFullRebuild} to bound the staleness from sticky
267
     *       downstream entries; the periodic worker turns that into a from-scratch
268
     *       rebuild on its next pass.</li>
269
     *   <li>Run per-tier INSERTs in the same order as the full build.</li>
270
     *   <li>Late-arrival sweep: if any structural row was added, re-run downstream
271
     *       tier INSERTs with {@code lastProcessed = -1} to catch candidates whose
272
     *       enabling event landed in this same cycle. Dedup filters protect
273
     *       against double-insert.</li>
274
     *   <li>Bump {@code processedUpTo} to {@code currentLoadCounter}.</li>
275
     * </ol>
276
     */
277
    synchronized void runIncrementalCycle(IRI graph) {
278
        long startNanos = System.nanoTime();
×
279
        long currentLoadCounter = getCurrentLoadCounter();
×
280
        long lastProcessed = readProcessedUpTo(graph);
×
281
        if (lastProcessed < 0) {
×
282
            log.warn("AuthorityResolver.runIncrementalCycle: missing processedUpTo on {}; skipping",
×
283
                    graph);
284
            return;
×
285
        }
286
        lastProcessedUpToLag = currentLoadCounter - lastProcessed;
×
287
        if (currentLoadCounter <= lastProcessed) {
×
288
            log.debug("AuthorityResolver.runIncrementalCycle: caught up at load {} on {}",
×
289
                    currentLoadCounter, graph);
×
290
            return;
×
291
        }
292

293
        boolean structuralInvalidation = applyInvalidations(graph, lastProcessed);
×
294
        TierInsertedTriples counts = runAllTierLoops(graph, lastProcessed);
×
295
        boolean structuralAdds = (counts.admin > 0)
×
296
                || (counts.attachment > 0)
297
                || newRoleDeclarationsArrived(lastProcessed);
×
298
        if (structuralAdds) {
×
299
            // Late-arrival sweep: only the leaf tiers (attachment/maintainer/member/observer)
300
            // can promote candidates whose enabling event arrived in this same cycle. Skip
301
            // the admin tier — its only enabling event is the admin grant itself, already
302
            // handled by the regular pass.
303
            TierInsertedTriples lateCounts = runDownstreamWithoutLoadFilter(graph);
×
304
            counts.attachment += lateCounts.attachment;
×
305
            counts.maintainer += lateCounts.maintainer;
×
306
            counts.member     += lateCounts.member;
×
307
            counts.observer   += lateCounts.observer;
×
308
        }
309

310
        writeProcessedUpTo(graph, currentLoadCounter);
×
311

312
        TierSubjectTotals totals = computeTierSubjectTotals(graph);
×
313
        long durationMs = (System.nanoTime() - startNanos) / 1_000_000L;
×
314
        lastSubjectTotals = totals;
×
315
        lastInsertedTriplesTotal = (long) counts.admin + counts.attachment
×
316
                + counts.maintainer + counts.member + counts.observer;
317
        lastIncrementalCycleDurationMs = durationMs;
×
318
        log.info("AuthorityResolver: incremental cycle complete — graph={} delta=({}, {}] "
×
319
                        + "subjects: adminRIs={} attachmentRAs={} nonAdminRIs={} "
320
                        + "(inserted-triples: admin={} attachment={} maintainer={} member={} observer={}) "
321
                        + "structuralInvalidation={} structuralAdds={} durationMs={}",
322
                graph, lastProcessed, currentLoadCounter,
×
323
                totals.adminRIs(), totals.attachmentRAs(), totals.nonAdminRIs(),
×
324
                counts.admin, counts.attachment, counts.maintainer, counts.member, counts.observer,
×
325
                structuralInvalidation, structuralAdds, durationMs);
×
326
    }
×
327

328
    /**
329
     * Runs the four invalidation-DELETE / ASK steps. Sets {@code npa:needsFullRebuild}
330
     * when admin-RI, RoleAssignment, or RoleDeclaration invalidations matched (the
331
     * three structural kinds). Leaf-tier RI deletes don't set the flag.
332
     *
333
     * @return true iff at least one structural kind was invalidated
334
     */
335
    boolean applyInvalidations(IRI graph, long lastProcessed) {
336
        boolean structural = false;
×
337
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ true,
×
338
                            adminInvalidationCheckWhere(graph, lastProcessed))) {
×
339
            executeUpdate(adminInvalidationDelete(graph, lastProcessed));
×
340
            structural = true;
×
341
        }
342
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
343
                            roleAssignmentInvalidationCheckWhere(graph, lastProcessed))) {
×
344
            executeUpdate(roleAssignmentInvalidationDelete(graph, lastProcessed));
×
345
            structural = true;
×
346
        }
347
        // RoleDeclaration ASK only — RDs aren't materialized into the space-state
348
        // graph, so there's nothing to DELETE here. The flag still flips because
349
        // sticky downstream RIs derived from the now-invalidated RD need a
350
        // from-scratch recompute.
351
        if (wouldInvalidate(graph, lastProcessed, /*adminPinned=*/ false,
×
352
                            roleDeclarationInvalidationCheckWhere(lastProcessed))) {
×
353
            structural = true;
×
354
        }
355
        // Leaf-tier RI deletes — no flag.
356
        executeUpdate(leafTierInvalidationDelete(graph, lastProcessed));
×
357
        if (structural) setNeedsFullRebuild();
×
358
        return structural;
×
359
    }
360

361
    /**
362
     * Runs the four leaf tiers (attachment/maintainer/member/observer) with
363
     * {@code lastProcessed = -1} so the load-number filter on the candidate
364
     * side admits everything. Dedup filters in the tier templates prevent
365
     * double-insert. Used by the late-arrival sweep.
366
     */
367
    TierInsertedTriples runDownstreamWithoutLoadFilter(IRI graph) {
368
        TierInsertedTriples c = new TierInsertedTriples();
×
369
        c.attachment = runTierLabeled("attachment(late)", graph,
×
370
                attachmentValidationUpdate(graph, -1));
×
371
        c.maintainer = runTierLabeled("maintainer(late)", graph,
×
372
                nonAdminTierUpdate(graph, -1, GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
×
373
        c.member = runTierLabeled("member(admin-pub,late)", graph,
×
374
                nonAdminTierUpdate(graph, -1, GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
×
375
        c.member += runTierLabeled("member(maint-pub,late)", graph,
×
376
                nonAdminTierUpdate(graph, -1,
×
377
                        GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
378
        c.observer = runTierLabeled("observer(admin-pub,late)", graph,
×
379
                nonAdminTierUpdate(graph, -1, GEN.OBSERVER_ROLE, PUBLISHER_IS_ADMIN));
×
380
        c.observer += runTierLabeled("observer(maint-pub,late)", graph,
×
381
                nonAdminTierUpdate(graph, -1,
×
382
                        GEN.OBSERVER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
383
        c.observer += runTierLabeled("observer(member-pub,late)", graph,
×
384
                nonAdminTierUpdate(graph, -1,
×
385
                        GEN.OBSERVER_ROLE, publisherIsTieredRole(GEN.MEMBER_ROLE)));
×
386
        c.observer += runTierLabeled("observer(self,late)", graph,
×
387
                nonAdminTierUpdate(graph, -1, GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
×
388
        return c;
×
389
    }
390

391
    /**
392
     * Cheap ASK: did any new {@code npa:RoleDeclaration} extraction land in the
393
     * load-number delta {@code (lastProcessed, ∞)}? Used by the late-arrival
394
     * trigger so an RD that arrives in the same cycle as a matching candidate
395
     * still gets validated.
396
     */
397
    boolean newRoleDeclarationsArrived(long lastProcessed) {
398
        String ask = String.format("""
×
399
                PREFIX npa: <%1$s>
400
                ASK {
401
                  GRAPH <%2$s> {
402
                    ?rd a npa:RoleDeclaration ;
403
                        npa:viaNanopub ?np .
404
                  }
405
                  GRAPH <%3$s> {
406
                    ?np npa:hasLoadNumber ?ln .
407
                    FILTER (?ln > %4$d)
408
                  }
409
                }
410
                """, NPA.NAMESPACE, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
×
411
        return runAsk(ask);
×
412
    }
413

414
    // ---------------- Tier UPDATE loops ----------------
415

416
    /**
417
     * Per-tier inserted-triple tallies for one build or cycle. Counts the sum
418
     * of {@code (graphSize_after - graphSize_before)} across all iterations of
419
     * each tier's fixed-point INSERT loop — i.e. inserted *triples*, not
420
     * distinct subjects (a single RoleInstantiation insert writes 4–5 triples).
421
     *
422
     * <p>Used internally by the {@link #runIncrementalCycle structuralAdds}
423
     * boolean check (we only care whether any tier inserted at all).
424
     * Not what the log lines report: see {@link TierSubjectTotals} +
425
     * {@link #computeTierSubjectTotals} for the distinct-subject totals
426
     * surfaced to operators.
427
     */
428
    static final class TierInsertedTriples {
×
429
        int admin;
430
        int attachment;
431
        int maintainer;
432
        int member;
433
        int observer;
434
    }
435

436
    /**
437
     * Snapshot of distinct-subject totals in a space-state graph at a moment
438
     * in time. Independent of which tier-loop added each subject.
439
     */
440
    record TierSubjectTotals(long adminRIs, long attachmentRAs, long nonAdminRIs) {}
36✔
441

442
    /**
443
     * Runs the five tier loops in order: admin → {@code gen:hasRole} attachment
444
     * validation → maintainer → member → observer. Each loop iterates a SPARQL
445
     * INSERT to fixed point (no new triples added). Returns per-tier counts.
446
     *
447
     * @param graph         target space-state graph
448
     * @param lastProcessed load-number horizon; use {@code -1} for full build
449
     */
450
    TierInsertedTriples runAllTierLoops(IRI graph, long lastProcessed) {
451
        TierInsertedTriples c = new TierInsertedTriples();
×
452
        c.admin = runTierLabeled("admin", graph, adminTierUpdate(graph, lastProcessed));
×
453
        c.attachment = runTierLabeled("attachment", graph,
×
454
                attachmentValidationUpdate(graph, lastProcessed));
×
455
        c.maintainer = runTierLabeled("maintainer", graph, nonAdminTierUpdate(graph, lastProcessed,
×
456
                GEN.MAINTAINER_ROLE, PUBLISHER_IS_ADMIN));
457
        // Member tier: admin OR maintainer publisher — split into two simpler updates
458
        // so the query planner doesn't struggle with the UNION.
459
        c.member = runTierLabeled("member(admin-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
460
                GEN.MEMBER_ROLE, PUBLISHER_IS_ADMIN));
461
        c.member += runTierLabeled("member(maint-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
462
                GEN.MEMBER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
463
        // Observer tier: self-evidence OR a downward grant from any higher tier.
464
        // ObserverRole is the default tier when a role definition omits an
465
        // explicit subclass (see "Role types" in design-space-repositories.md), so
466
        // most "X assigned Y this role" nanopubs land here. Restricting the tier
467
        // to PUBLISHER_IS_SELF would silently drop those grants. The four
468
        // sub-loops mirror the trust-state's downward-only chain: admin grants
469
        // anything; maintainers and members grant observer; everyone may
470
        // self-attest.
471
        c.observer = runTierLabeled("observer(admin-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
472
                GEN.OBSERVER_ROLE, PUBLISHER_IS_ADMIN));
473
        c.observer += runTierLabeled("observer(maint-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
474
                GEN.OBSERVER_ROLE, publisherIsTieredRole(GEN.MAINTAINER_ROLE)));
×
475
        c.observer += runTierLabeled("observer(member-pub)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
476
                GEN.OBSERVER_ROLE, publisherIsTieredRole(GEN.MEMBER_ROLE)));
×
477
        c.observer += runTierLabeled("observer(self)", graph, nonAdminTierUpdate(graph, lastProcessed,
×
478
                GEN.OBSERVER_ROLE, PUBLISHER_IS_SELF));
479
        return c;
×
480
    }
481

482
    /**
483
     * Builds a publisher constraint requiring the publisher to be a validated holder
484
     * of the given tier's role (maintainer or member) in the target space.
485
     * Owns its own AccountState resolution so ?publisher is bound through the
486
     * targeted (pkh → agent) lookup rather than enumerated.
487
     */
488
    private static String publisherIsTieredRole(IRI tierClass) {
489
        return """
×
490
                ?acct a npa:AccountState ;
491
                      npa:pubkey ?pkh ;
492
                      npa:agent  ?publisher .
493
                ?tierRI a gen:RoleInstantiation ;
494
                        npa:forSpace ?space ;
495
                        npa:forAgent ?publisher .
496
                ?rdT a npa:RoleDeclaration ;
497
                     npa:hasRoleType <%1$s> .
498
                { ?tierRI npa:regularProperty ?predT . ?rdT gen:hasRegularProperty ?predT . }
499
                UNION
500
                { ?tierRI npa:inverseProperty ?predT . ?rdT gen:hasInverseProperty ?predT . }
501
                """.formatted(tierClass);
×
502
    }
503

504
    /** Wraps {@link #runTierLoop} with tier-name context for logs/exceptions. */
505
    private int runTierLabeled(String tier, IRI graph, String sparqlUpdate) {
506
        try {
507
            return runTierLoop(graph, sparqlUpdate);
×
508
        } catch (RuntimeException ex) {
×
509
            log.error("AuthorityResolver: tier={} failed with SPARQL UPDATE:\n{}\n", tier, sparqlUpdate, ex);
×
510
            throw ex;
×
511
        }
512
    }
513

514
    /**
515
     * Runs a single tier's INSERT to fixed point. Counts rows by probing
516
     * graph size before/after each INSERT; stops when the size doesn't change.
517
     *
518
     * @return total number of triples inserted by this tier across all iterations
519
     */
520
    int runTierLoop(IRI graph, String sparqlUpdate) {
521
        int total = 0;
×
522
        long before = graphSize(graph);
×
523
        while (true) {
524
            // Note: no explicit transaction wrapping here. In tests we observed that
525
            // HTTPRepository's RDF4J-transaction protocol silently no-op'd cross-graph
526
            // SPARQL UPDATEs with UNION sub-patterns inside conn.begin()/commit(),
527
            // while the same UPDATE POSTed directly to /statements applied correctly.
528
            // A bare prepareUpdate().execute() takes the direct /statements path and
529
            // runs the UPDATE atomically per SPARQL 1.1 semantics — which is all we
530
            // need; there's nothing else to commit atomically alongside the UPDATE.
531
            try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
532
                conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
533
            }
534
            long after = graphSize(graph);
×
535
            long added = after - before;
×
536
            if (added <= 0) break;
×
537
            total += added;
×
538
            before = after;
×
539
        }
×
540
        return total;
×
541
    }
542

543
    private long graphSize(IRI graph) {
544
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
545
            return conn.size(graph);
×
546
        }
547
    }
548

549
    /**
550
     * Distinct-subject totals in the given space-state graph, broken down by
551
     * RoleInstantiation kind (admin-pinned vs not) and RoleAssignment.
552
     * Three SELECT-COUNT queries — cheap, called once per build/cycle for
553
     * the user-facing log line. Returns zeros on failure (logged) so a flaky
554
     * count read can't wedge the cycle.
555
     */
556
    TierSubjectTotals computeTierSubjectTotals(IRI graph) {
557
        long adminRIs       = countDistinctSubjects(graph, """
×
558
                ?ri a gen:RoleInstantiation ; npa:inverseProperty gen:hasAdmin .
559
                """, "ri");
560
        long attachmentRAs  = countDistinctSubjects(graph, """
×
561
                ?ra a gen:RoleAssignment .
562
                """, "ra");
563
        long nonAdminRIs    = countDistinctSubjects(graph, """
×
564
                ?ri a gen:RoleInstantiation .
565
                FILTER NOT EXISTS { ?ri npa:inverseProperty gen:hasAdmin }
566
                """, "ri");
567
        return new TierSubjectTotals(adminRIs, attachmentRAs, nonAdminRIs);
×
568
    }
569

570
    private long countDistinctSubjects(IRI graph, String wherePattern, String varName) {
571
        String query = String.format("""
×
572
                PREFIX npa: <%1$s>
573
                PREFIX gen: <%2$s>
574
                SELECT (COUNT(DISTINCT ?%3$s) AS ?n) WHERE {
575
                  GRAPH <%4$s> {
576
                    %5$s
577
                  }
578
                }
579
                """, NPA.NAMESPACE, GEN.NAMESPACE, varName, graph, wherePattern);
580
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO);
×
581
             TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
582
            if (!r.hasNext()) return 0;
×
583
            return Long.parseLong(r.next().getBinding("n").getValue().stringValue());
×
584
        } catch (Exception ex) {
×
585
            log.warn("AuthorityResolver: countDistinctSubjects on {} failed: {}",
×
586
                    graph, ex.toString());
×
587
            return 0;
×
588
        }
589
    }
590

591
    // ---------------- SPARQL templates ----------------
592

593
    /**
594
     * Reusable invalidation filter on a bound nanopub-IRI variable. Pass the bare
595
     * variable name (no leading {@code ?}); e.g. {@code invalidationFilter("np")}
596
     * produces an outer-scoped {@code FILTER NOT EXISTS { GRAPH npa:spacesGraph
597
     * { ?_inv_np a npa:Invalidation ; npa:invalidates ?np . } }}.
598
     *
599
     * <p>Important: this filter must be placed OUTSIDE the surrounding
600
     * {@code GRAPH npa:spacesGraph { ... }} block, not nested inside it. When
601
     * nested, RDF4J's planner couples the FILTER NOT EXISTS evaluation into the
602
     * join order (per-row scan of {@code ?_inv a npa:Invalidation} multiplied by
603
     * the candidate set), which we measured turning a 39ms query into a 60s+
604
     * timeout on the live observer-tier data. Outside the GRAPH block, the
605
     * planner defers the filter until {@code ?np}/{@code ?rdNp} are bound and
606
     * does a targeted index lookup.
607
     *
608
     * <p>Variable names must match {@code [A-Za-z0-9_]+} per SPARQL grammar —
609
     * embedding a {@code ?} inside {@code ?_inv_?np} would yield a parse error.
610
     */
611
    private static String invalidationFilter(String bareVarName) {
612
        return "FILTER NOT EXISTS { GRAPH <" + SpacesVocab.SPACES_GRAPH + "> {"
30✔
613
                + " ?_inv_" + bareVarName
614
                + " a <" + SpacesVocab.INVALIDATION + "> ; "
615
                + "<" + SpacesVocab.INVALIDATES + "> ?" + bareVarName + " . } }";
616
    }
617

618
    /**
619
     * Admin tier: seed from {@code npadef:...hasRootAdmin} (trusted by construction)
620
     * plus closed-over admin grants; insert any {@code gen:RoleInstantiation} with
621
     * {@code npa:inverseProperty gen:hasAdmin} whose publisher (resolved via mirrored
622
     * trust-approved AccountState) is already in the admin set.
623
     */
624
    static String adminTierUpdate(IRI graph, long lastProcessed) {
625
        // Order tuned for RDF4J's evaluator:
626
        //   1. Anchor on the small (seed UNION closed-over) set to bind ?publisher
627
        //      and ?space cheaply.
628
        //   2. Resolve ?pkh from the mirrored AccountState row (?publisher bound).
629
        //   3. Probe instantiations using the now-bound (?space, ?pkh) — targeted
630
        //      lookup, not a full RoleInstantiation scan.
631
        //   4. Load-number filter on bound ?np.
632
        //   5. Dedup at the end.
633
        return """
69✔
634
                PREFIX npa:  <%1$s>
635
                PREFIX gen:  <%2$s>
636
                INSERT { GRAPH <%3$s> {
637
                  ?ri a gen:RoleInstantiation ;
638
                      npa:forSpace ?space ;
639
                      npa:inverseProperty gen:hasAdmin ;
640
                      npa:forAgent ?agent ;
641
                      npa:viaNanopub ?np .
642
                } }
643
                WHERE {
644
                  # 1. Anchor: who is already an admin of which space?
645
                  {
646
                    # Seed branch: root-admin in a non-invalidated SpaceDefinition.
647
                    GRAPH <%4$s> {
648
                      ?def a npa:SpaceDefinition ;
649
                           npa:forSpaceRef  ?spaceRef ;
650
                           npa:hasRootAdmin ?publisher ;
651
                           npa:viaNanopub   ?defNp .
652
                      ?spaceRef npa:spaceIri ?space .
653
                    }
654
                    %7$s
655
                  }
656
                  UNION
657
                  {
658
                    # Closed-over branch: an existing admin in this space-state graph.
659
                    GRAPH <%3$s> {
660
                      ?prev a gen:RoleInstantiation ;
661
                            npa:forSpace        ?space ;
662
                            npa:inverseProperty gen:hasAdmin ;
663
                            npa:forAgent        ?publisher .
664
                    }
665
                  }
666
                  # 2. Mirror: resolve ?publisher → ?pkh via the trust-approved row.
667
                  GRAPH <%3$s> {
668
                    ?acct a npa:AccountState ;
669
                          npa:agent  ?publisher ;
670
                          npa:pubkey ?pkh .
671
                  }
672
                  # 3. Targeted instantiation lookup by space + pubkey.
673
                  GRAPH <%4$s> {
674
                    ?ri a gen:RoleInstantiation ;
675
                        npa:forSpace        ?space ;
676
                        npa:inverseProperty gen:hasAdmin ;
677
                        npa:forAgent        ?agent ;
678
                        npa:pubkeyHash      ?pkh ;
679
                        npa:viaNanopub      ?np .
680
                  }
681
                  %6$s
682
                  # 4. Load-number filter on bound ?np.
683
                  GRAPH <%8$s> {
684
                    ?np npa:hasLoadNumber ?ln .
685
                    FILTER (?ln > %5$d)
686
                  }
687
                  # 5. Dedup last.
688
                  FILTER NOT EXISTS { GRAPH <%3$s> {
689
                    ?existing a gen:RoleInstantiation ;
690
                              npa:forSpace ?space ;
691
                              npa:forAgent ?agent ;
692
                              npa:inverseProperty gen:hasAdmin .
693
                  } }
694
                }
695
                """.formatted(
3✔
696
                NPA.NAMESPACE,
697
                GEN.NAMESPACE,
698
                graph,
699
                SpacesVocab.SPACES_GRAPH,
700
                lastProcessed,
15✔
701
                invalidationFilter("np"),
15✔
702
                invalidationFilter("defNp"),
18✔
703
                NPA.GRAPH);
704
    }
705

706
    /**
707
     * {@code gen:hasRole} attachment validation: an attachment is validated iff its
708
     * publisher is already a validated admin of the target space. Adds
709
     * {@code gen:RoleAssignment} rows to the space-state graph.
710
     */
711
    static String attachmentValidationUpdate(IRI graph, long lastProcessed) {
712
        return """
69✔
713
                PREFIX npa:  <%1$s>
714
                PREFIX gen:  <%2$s>
715
                INSERT { GRAPH <%3$s> {
716
                  ?ra a gen:RoleAssignment ;
717
                      npa:forSpace ?space ;
718
                      gen:hasRole  ?role ;
719
                      npa:viaNanopub ?np .
720
                } }
721
                WHERE {
722
                  GRAPH <%4$s> {
723
                    ?ra a gen:RoleAssignment ;
724
                        npa:forSpace ?space ;
725
                        gen:hasRole  ?role ;
726
                        npa:pubkeyHash ?pkh ;
727
                        npa:viaNanopub ?np .
728
                  }
729
                  GRAPH <%7$s> {
730
                    ?np npa:hasLoadNumber ?ln .
731
                    FILTER (?ln > %5$d)
732
                  }
733
                  GRAPH <%3$s> {
734
                    ?acct a npa:AccountState ;
735
                          npa:agent  ?publisher ;
736
                          npa:pubkey ?pkh .
737
                    ?adminRI a gen:RoleInstantiation ;
738
                             npa:forSpace ?space ;
739
                             npa:inverseProperty gen:hasAdmin ;
740
                             npa:forAgent ?publisher .
741
                  }
742
                  %6$s
743
                  FILTER NOT EXISTS { GRAPH <%3$s> {
744
                    ?existing a gen:RoleAssignment ;
745
                              npa:forSpace ?space ;
746
                              gen:hasRole  ?role .
747
                  } }
748
                }
749
                """.formatted(
3✔
750
                NPA.NAMESPACE,
751
                GEN.NAMESPACE,
752
                graph,
753
                SpacesVocab.SPACES_GRAPH,
754
                lastProcessed,
15✔
755
                invalidationFilter("np"),
18✔
756
                NPA.GRAPH);
757
    }
758

759
    /**
760
     * Non-admin tier publisher constraints (inserted as a SPARQL sub-pattern).
761
     * Each constraint owns the AccountState (pkh → agent) lookup so the join
762
     * variable is bound through a targeted pattern. The observer-self variant
763
     * binds {@code npa:agent ?agent} directly — no separate {@code ?publisher}
764
     * variable, no post-join equality filter — which lets the planner anchor
765
     * the AccountState lookup on the already-bound {@code ?agent} instead of
766
     * enumerating all approved publishers and filtering at the end.
767
     */
768
    static final String PUBLISHER_IS_ADMIN = """
769
            ?acct a npa:AccountState ;
770
                  npa:pubkey ?pkh ;
771
                  npa:agent  ?publisher .
772
            ?adminRI a gen:RoleInstantiation ;
773
                     npa:forSpace ?space ;
774
                     npa:inverseProperty gen:hasAdmin ;
775
                     npa:forAgent ?publisher .
776
            """;
777

778
    /** Observer self-evidence: the assignee's own pubkey signed the instantiation. */
779
    static final String PUBLISHER_IS_SELF = """
780
            ?acct a npa:AccountState ;
781
                  npa:pubkey ?pkh ;
782
                  npa:agent  ?agent .
783
            """;
784

785
    /**
786
     * Maintainer / Member / Observer tier INSERT. Same shape: find an instantiation
787
     * whose predicate matches a RoleDeclaration of the given tier attached to the
788
     * target space, and whose publisher passes the tier-specific constraint.
789
     */
790
    static String nonAdminTierUpdate(IRI graph, long lastProcessed,
791
                                     IRI tierClass, String publisherConstraint) {
792
        // Order tuned for RDF4J's evaluator (which executes BGPs roughly in order).
793
        // The crucial choice is the *anchor*: instantiation-first plans send the
794
        // planner exploring the full ~thousands of candidate RIs and only filter
795
        // by tier at the very end. Attachment-first anchors on the small set of
796
        // gen:RoleAssignment rows already validated in this space-state graph
797
        // (~hundreds, often zero) and walks outward by bound (?role, ?space).
798
        //
799
        //   1. Anchor on RoleAssignments in this space-state graph (small).
800
        //   2. Match the tier-pinned RoleDeclaration by ?role.
801
        //   3. Pair role-decl direction to instantiation direction in one UNION
802
        //      so only (reg, reg)/(inv, inv) combos are explored.
803
        //   4. Targeted instantiation lookup — (?space, ?pred) are bound.
804
        //   5. Publisher constraint (incl. AccountState resolution).
805
        //   6. Load-number filter on bound ?np.
806
        //   7. Dedup at the end.
807
        return """
69✔
808
                PREFIX npa:  <%1$s>
809
                PREFIX gen:  <%2$s>
810
                INSERT { GRAPH <%3$s> {
811
                  ?ri a gen:RoleInstantiation ;
812
                      npa:forSpace ?space ;
813
                      npa:forAgent ?agent ;
814
                      npa:viaNanopub ?np .
815
                } }
816
                WHERE {
817
                  # 1. Anchor: validated attachments in this space-state graph.
818
                  GRAPH <%3$s> {
819
                    ?ra a gen:RoleAssignment ;
820
                        gen:hasRole  ?role ;
821
                        npa:forSpace ?space .
822
                  }
823
                  # 2. Tier-pinned RoleDeclaration (?role bound from the attachment).
824
                  GRAPH <%4$s> {
825
                    ?rd a npa:RoleDeclaration ;
826
                        npa:hasRoleType <%7$s> ;
827
                        npa:role        ?role ;
828
                        npa:viaNanopub  ?rdNp .
829
                    # 3. Pair direction so only matching combos are explored.
830
                    {
831
                      ?rd gen:hasRegularProperty ?pred .
832
                      ?ri npa:regularProperty    ?pred .
833
                    }
834
                    UNION
835
                    {
836
                      ?rd gen:hasInverseProperty ?pred .
837
                      ?ri npa:inverseProperty    ?pred .
838
                    }
839
                    # 4. Targeted instantiation lookup — (?space, ?pred) bound.
840
                    ?ri a gen:RoleInstantiation ;
841
                        npa:forSpace   ?space ;
842
                        npa:forAgent   ?agent ;
843
                        npa:pubkeyHash ?pkh ;
844
                        npa:viaNanopub ?np .
845
                  }
846
                  # 5. Publisher constraint (incl. AccountState resolution).
847
                  GRAPH <%3$s> {
848
                    %9$s
849
                  }
850
                  # 6. Load-number filter on bound ?np.
851
                  GRAPH <%10$s> {
852
                    ?np npa:hasLoadNumber ?ln .
853
                    FILTER (?ln > %5$d)
854
                  }
855
                  # 7. Invalidation filters — outside the GRAPH block so the
856
                  #    planner defers them until ?rdNp/?np are bound.
857
                  %8$s
858
                  %6$s
859
                  # 8. Dedup last.
860
                  FILTER NOT EXISTS { GRAPH <%3$s> {
861
                    ?existing a gen:RoleInstantiation ;
862
                              npa:forSpace ?space ;
863
                              npa:forAgent ?agent ;
864
                              npa:viaNanopub ?np .
865
                  } }
866
                }
867
                """.formatted(
3✔
868
                NPA.NAMESPACE,
869
                GEN.NAMESPACE,
870
                graph,
871
                SpacesVocab.SPACES_GRAPH,
872
                lastProcessed,
15✔
873
                invalidationFilter("np"),
27✔
874
                tierClass,
875
                invalidationFilter("rdNp"),
30✔
876
                publisherConstraint,
877
                NPA.GRAPH);
878
    }
879

880
    // ---------------- Invalidation templates (incremental cycle) ----------------
881

882
    /**
883
     * WHERE clause shared by the admin-RI invalidation ASK precheck and the
884
     * matching DELETE. Identifies admin-tier {@code gen:RoleInstantiation} rows
885
     * in the space-state graph whose {@code npa:viaNanopub} equals the target
886
     * of an {@code npa:Invalidation} that landed in {@code (lastProcessed, ∞)}.
887
     */
888
    static String adminInvalidationCheckWhere(IRI graph, long lastProcessed) {
889
        return String.format("""
60✔
890
                  GRAPH <%1$s> {
891
                    ?ri a gen:RoleInstantiation ;
892
                        npa:inverseProperty gen:hasAdmin ;
893
                        npa:viaNanopub ?np .
894
                  }
895
                  GRAPH <%2$s> {
896
                    ?inv a npa:Invalidation ;
897
                         npa:invalidates ?np ;
898
                         npa:viaNanopub  ?invNp .
899
                  }
900
                  GRAPH <%3$s> {
901
                    ?invNp npa:hasLoadNumber ?ln .
902
                    FILTER (?ln > %4$d)
903
                  }
904
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
905
    }
906

907
    /** DELETE template for admin-tier RoleInstantiations whose source nanopub was invalidated. */
908
    static String adminInvalidationDelete(IRI graph, long lastProcessed) {
909
        return String.format("""
63✔
910
                PREFIX npa: <%1$s>
911
                PREFIX gen: <%2$s>
912
                DELETE { GRAPH <%3$s> {
913
                  ?ri ?p ?o .
914
                } }
915
                WHERE {
916
                  GRAPH <%3$s> { ?ri ?p ?o . }
917
                %4$s
918
                }
919
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
920
                adminInvalidationCheckWhere(graph, lastProcessed));
6✔
921
    }
922

923
    /** WHERE clause for RoleAssignment invalidation. */
924
    static String roleAssignmentInvalidationCheckWhere(IRI graph, long lastProcessed) {
925
        return String.format("""
60✔
926
                  GRAPH <%1$s> {
927
                    ?ra a gen:RoleAssignment ;
928
                        npa:viaNanopub ?np .
929
                  }
930
                  GRAPH <%2$s> {
931
                    ?inv a npa:Invalidation ;
932
                         npa:invalidates ?np ;
933
                         npa:viaNanopub  ?invNp .
934
                  }
935
                  GRAPH <%3$s> {
936
                    ?invNp npa:hasLoadNumber ?ln .
937
                    FILTER (?ln > %4$d)
938
                  }
939
                """, graph, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
940
    }
941

942
    /** DELETE template for RoleAssignments whose source nanopub was invalidated. */
943
    static String roleAssignmentInvalidationDelete(IRI graph, long lastProcessed) {
944
        return String.format("""
63✔
945
                PREFIX npa: <%1$s>
946
                PREFIX gen: <%2$s>
947
                DELETE { GRAPH <%3$s> {
948
                  ?ra ?p ?o .
949
                } }
950
                WHERE {
951
                  GRAPH <%3$s> { ?ra ?p ?o . }
952
                %4$s
953
                }
954
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
955
                roleAssignmentInvalidationCheckWhere(graph, lastProcessed));
6✔
956
    }
957

958
    /**
959
     * WHERE clause for RoleDeclaration invalidation. ASK-only (no DELETE):
960
     * RoleDeclarations live in {@code npa:spacesGraph} and aren't materialized
961
     * into the space-state graph, so there's nothing to remove from the
962
     * space-state. The ASK still flips {@code npa:needsFullRebuild} because
963
     * sticky downstream RIs that were derived under the now-invalidated RD
964
     * need a from-scratch recompute.
965
     */
966
    static String roleDeclarationInvalidationCheckWhere(long lastProcessed) {
967
        return String.format("""
48✔
968
                  GRAPH <%1$s> {
969
                    ?rd a npa:RoleDeclaration ;
970
                        npa:viaNanopub ?np .
971
                    ?inv a npa:Invalidation ;
972
                         npa:invalidates ?np ;
973
                         npa:viaNanopub  ?invNp .
974
                  }
975
                  GRAPH <%2$s> {
976
                    ?invNp npa:hasLoadNumber ?ln .
977
                    FILTER (?ln > %3$d)
978
                  }
979
                """, SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
980
    }
981

982
    /**
983
     * DELETE template for non-admin (leaf-tier) RoleInstantiations whose source
984
     * nanopub was invalidated. Identified as {@code gen:RoleInstantiation} rows
985
     * lacking the admin-pinning {@code npa:inverseProperty gen:hasAdmin} triple.
986
     * No flag is set; leaf-tier removals are recoverable on the next cycle.
987
     */
988
    static String leafTierInvalidationDelete(IRI graph, long lastProcessed) {
989
        return String.format("""
84✔
990
                PREFIX npa: <%1$s>
991
                PREFIX gen: <%2$s>
992
                DELETE { GRAPH <%3$s> {
993
                  ?ri ?p ?o .
994
                } }
995
                WHERE {
996
                  GRAPH <%3$s> {
997
                    ?ri a gen:RoleInstantiation ;
998
                        npa:viaNanopub ?np .
999
                    FILTER NOT EXISTS { ?ri npa:inverseProperty gen:hasAdmin }
1000
                    ?ri ?p ?o .
1001
                  }
1002
                  GRAPH <%4$s> {
1003
                    ?inv a npa:Invalidation ;
1004
                         npa:invalidates ?np ;
1005
                         npa:viaNanopub  ?invNp .
1006
                  }
1007
                  GRAPH <%5$s> {
1008
                    ?invNp npa:hasLoadNumber ?ln .
1009
                    FILTER (?ln > %6$d)
1010
                  }
1011
                }
1012
                """, NPA.NAMESPACE, GEN.NAMESPACE, graph,
1013
                SpacesVocab.SPACES_GRAPH, NPA.GRAPH, lastProcessed);
6✔
1014
    }
1015

1016
    /** Wraps an ASK by joining the shared prefixes. */
1017
    private boolean wouldInvalidate(IRI graph, long lastProcessed,
1018
                                    boolean adminPinned, String whereClause) {
1019
        // adminPinned is informational only — kept to make call sites read clearly;
1020
        // the WHERE clause already encodes the kind via its own type predicates.
1021
        String ask = String.format("""
×
1022
                PREFIX npa: <%1$s>
1023
                PREFIX gen: <%2$s>
1024
                ASK { %3$s }
1025
                """, NPA.NAMESPACE, GEN.NAMESPACE, whereClause);
1026
        return runAsk(ask);
×
1027
    }
1028

1029
    private boolean runAsk(String sparql) {
1030
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1031
            return conn.prepareBooleanQuery(QueryLanguage.SPARQL, sparql).evaluate();
×
1032
        }
1033
    }
1034

1035
    private void executeUpdate(String sparqlUpdate) {
1036
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1037
            conn.prepareUpdate(QueryLanguage.SPARQL, sparqlUpdate).execute();
×
1038
        }
1039
    }
×
1040

1041
    // ---------------- Mirror step ----------------
1042

1043
    /**
1044
     * Copies trust-approved {@code npa:AccountState} rows from {@code npat:<T>}
1045
     * in the {@code trust} repo into {@code newGraph} in the {@code spaces} repo,
1046
     * inside one spaces-side serializable transaction.
1047
     *
1048
     * @return number of rows mirrored (useful for metrics / logging)
1049
     */
1050
    int mirrorTrustState(String trustStateHash, IRI newGraph) {
1051
        IRI trustStateIri = NPAT.forHash(trustStateHash);
×
1052
        int count = 0;
×
1053
        try (RepositoryConnection trustConn = TripleStore.get().getRepoConnection(TRUST_REPO);
×
1054
             RepositoryConnection spacesConn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1055
            trustConn.begin(IsolationLevels.READ_COMMITTED);
×
1056
            spacesConn.begin(IsolationLevels.SERIALIZABLE);
×
1057
            // Walk rdf:type triples in the trust state's graph; for each AccountState,
1058
            // check status and copy the approved ones verbatim (minus status-specific
1059
            // detail triples, which we don't need for validation).
1060
            try (RepositoryResult<Statement> typeRows = trustConn.getStatements(
×
1061
                    null, RDF.TYPE, NPA_ACCOUNT_STATE, trustStateIri)) {
1062
                while (typeRows.hasNext()) {
×
1063
                    Statement st = typeRows.next();
×
1064
                    if (!(st.getSubject() instanceof IRI accountStateIri)) continue;
×
1065
                    Value status = trustConn.getStatements(accountStateIri, NPA_TRUST_STATUS, null, trustStateIri)
×
1066
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1067
                    if (!(status instanceof IRI statusIri) || !APPROVED_SET.contains(statusIri)) continue;
×
1068
                    Value agent = trustConn.getStatements(accountStateIri, NPA_AGENT, null, trustStateIri)
×
1069
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1070
                    Value pubkey = trustConn.getStatements(accountStateIri, NPA_PUBKEY, null, trustStateIri)
×
1071
                            .stream().findFirst().map(Statement::getObject).orElse(null);
×
1072
                    if (agent == null || pubkey == null) {
×
1073
                        log.warn("AuthorityResolver.mirror: account {} missing agent or pubkey; skipping",
×
1074
                                accountStateIri);
1075
                        continue;
×
1076
                    }
1077
                    spacesConn.add(accountStateIri, RDF.TYPE, NPA_ACCOUNT_STATE, newGraph);
×
1078
                    spacesConn.add(accountStateIri, NPA_AGENT, agent, newGraph);
×
1079
                    spacesConn.add(accountStateIri, NPA_PUBKEY, pubkey, newGraph);
×
1080
                    spacesConn.add(accountStateIri, NPA_TRUST_STATUS, statusIri, newGraph);
×
1081
                    count++;
×
1082
                }
×
1083
            }
1084
            spacesConn.commit();
×
1085
            trustConn.commit();
×
1086
        }
1087
        return count;
×
1088
    }
1089

1090
    // ---------------- Pointer + counter helpers ----------------
1091

1092
    /**
1093
     * Reads the current {@code npa:hasCurrentSpaceState} pointer from the
1094
     * {@code npa:graph} admin graph of the {@code spaces} repo. Returns
1095
     * {@code null} if no pointer exists yet.
1096
     */
1097
    IRI getCurrentSpaceStateGraph() {
1098
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1099
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1100
                    SpacesVocab.HAS_CURRENT_SPACE_STATE);
1101
            return (v instanceof IRI iri) ? iri : null;
×
1102
        } catch (Exception ex) {
×
1103
            log.warn("AuthorityResolver: failed to read hasCurrentSpaceState pointer: {}", ex.toString());
×
1104
            return null;
×
1105
        }
1106
    }
1107

1108
    long getCurrentLoadCounter() {
1109
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1110
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1111
                    SpacesVocab.CURRENT_LOAD_COUNTER);
1112
            if (v == null) return 0;
×
1113
            try {
1114
                return Long.parseLong(v.stringValue());
×
1115
            } catch (NumberFormatException ex) {
×
1116
                log.warn("AuthorityResolver: non-numeric currentLoadCounter: {}", v);
×
1117
                return 0;
×
1118
            }
1119
        } catch (Exception ex) {
×
1120
            log.warn("AuthorityResolver: failed to read currentLoadCounter: {}", ex.toString());
×
1121
            return 0;
×
1122
        }
1123
    }
1124

1125
    /**
1126
     * Atomic pointer flip: a single SPARQL {@code DELETE … INSERT … WHERE}
1127
     * replaces the old pointer with the new one in one statement, so readers
1128
     * never see a zero-pointer window.
1129
     */
1130
    void flipPointer(IRI newGraph) {
1131
        String update = String.format("""
×
1132
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1133
                INSERT { GRAPH <%s> { <%s> <%s> <%s> } }
1134
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1135
                """,
1136
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE,
1137
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE, newGraph,
1138
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.HAS_CURRENT_SPACE_STATE);
1139
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1140
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1141
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1142
            conn.commit();
×
1143
        }
1144
    }
×
1145

1146
    void writeProcessedUpTo(IRI graph, long loadCounter) {
1147
        String update = String.format("""
×
1148
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1149
                INSERT { GRAPH <%s> { <%s> <%s> "%d"^^<http://www.w3.org/2001/XMLSchema#long> } }
1150
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1151
                """,
1152
                graph, graph, SpacesVocab.PROCESSED_UP_TO,
1153
                graph, graph, SpacesVocab.PROCESSED_UP_TO, loadCounter,
×
1154
                graph, graph, SpacesVocab.PROCESSED_UP_TO);
1155
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1156
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1157
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1158
            conn.commit();
×
1159
        }
1160
    }
×
1161

1162
    /**
1163
     * Reads {@code processedUpTo} from the given space-state graph.
1164
     * Returns {@code -1} if absent (graph not fully built yet).
1165
     */
1166
    long readProcessedUpTo(IRI graph) {
1167
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1168
            String query = String.format(
×
1169
                    "SELECT ?n WHERE { GRAPH <%s> { <%s> <%s> ?n } }",
1170
                    graph, graph, SpacesVocab.PROCESSED_UP_TO);
1171
            try (TupleQueryResult r = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate()) {
×
1172
                if (!r.hasNext()) return -1;
×
1173
                BindingSet b = r.next();
×
1174
                return Long.parseLong(b.getBinding("n").getValue().stringValue());
×
1175
            }
×
1176
        } catch (Exception ex) {
×
1177
            log.warn("AuthorityResolver: failed to read processedUpTo for {}: {}", graph, ex.toString());
×
1178
            return -1;
×
1179
        }
1180
    }
1181

1182
    /**
1183
     * Reads the {@code npa:needsFullRebuild} flag (boolean literal) from
1184
     * {@code npa:graph} in the {@code spaces} repo. Defaults to {@code false}
1185
     * when the triple is absent.
1186
     */
1187
    boolean readNeedsFullRebuild() {
1188
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1189
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1190
                    SpacesVocab.NEEDS_FULL_REBUILD);
1191
            return v != null && Boolean.parseBoolean(v.stringValue());
×
1192
        } catch (Exception ex) {
×
1193
            log.warn("AuthorityResolver: failed to read needsFullRebuild: {}", ex.toString());
×
1194
            return false;
×
1195
        }
1196
    }
1197

1198
    void setNeedsFullRebuild() {
1199
        writeNeedsFullRebuild(true);
×
1200
    }
×
1201

1202
    void clearNeedsFullRebuild() {
1203
        writeNeedsFullRebuild(false);
×
1204
    }
×
1205

1206
    private void writeNeedsFullRebuild(boolean value) {
1207
        String update = String.format("""
×
1208
                DELETE { GRAPH <%s> { <%s> <%s> ?old } }
1209
                INSERT { GRAPH <%s> { <%s> <%s> "%s"^^<http://www.w3.org/2001/XMLSchema#boolean> } }
1210
                WHERE  { OPTIONAL { GRAPH <%s> { <%s> <%s> ?old } } }
1211
                """,
1212
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD,
1213
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD, value,
×
1214
                NPA.GRAPH, NPA.THIS_REPO, SpacesVocab.NEEDS_FULL_REBUILD);
1215
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1216
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1217
            conn.prepareUpdate(QueryLanguage.SPARQL, update).execute();
×
1218
            conn.commit();
×
1219
        }
1220
    }
×
1221

1222
    void dropGraph(IRI graph) {
1223
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(SPACES_REPO)) {
×
1224
            conn.begin(IsolationLevels.SERIALIZABLE);
×
1225
            conn.clear(graph);
×
1226
            conn.commit();
×
1227
            log.info("AuthorityResolver: dropped old space-state graph {}", graph);
×
1228
        }
1229
    }
×
1230

1231
    // ---------------- Trust-repo pointer lookup (used by TrustStateRegistry's bootstrap) ----------------
1232

1233
    /**
1234
     * Queries the {@code trust} repo directly for the current trust-state hash.
1235
     * Prefer {@link TrustStateRegistry#getCurrentHash()} in normal operation —
1236
     * this helper exists for tests and diagnostics.
1237
     *
1238
     * @return the current trust-state hash, or empty if none is set
1239
     */
1240
    Optional<String> readTrustRepoCurrentHash() {
1241
        try (RepositoryConnection conn = TripleStore.get().getRepoConnection(TRUST_REPO)) {
×
1242
            Value v = Utils.getObjectForPattern(conn, NPA.GRAPH, NPA.THIS_REPO,
×
1243
                    NPA_HAS_CURRENT_TRUST_STATE);
1244
            if (!(v instanceof IRI iri)) return Optional.empty();
×
1245
            String s = iri.stringValue();
×
1246
            if (!s.startsWith(NPAT.NAMESPACE)) return Optional.empty();
×
1247
            return Optional.of(s.substring(NPAT.NAMESPACE.length()));
×
1248
        } catch (Exception ex) {
×
1249
            log.warn("AuthorityResolver: failed to read trust-repo current pointer: {}", ex.toString());
×
1250
            return Optional.empty();
×
1251
        }
1252
    }
1253

1254
    private static String abbrev(String hash) {
1255
        return hash.length() > 12 ? hash.substring(0, 12) + "…" : hash;
×
1256
    }
1257

1258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc