• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-query / 24660087238

20 Apr 2026 09:53AM UTC coverage: 60.194% (-0.4%) from 60.573%
24660087238

push

github

web-flow
Merge pull request #73 from knowledgepixels/fix/log-hygiene-active-evict-lockfree-setupid

fix/perf: changes 2+3+4 — log hygiene, active-repo eviction skip, lock-free setupId getter

293 of 540 branches covered (54.26%)

Branch coverage included in aggregate %.

826 of 1319 relevant lines covered (62.62%)

9.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.78
src/main/java/com/knowledgepixels/query/TripleStore.java
1
package com.knowledgepixels.query;
2

3
import org.apache.commons.exec.environment.EnvironmentUtils;
4
import org.apache.http.HttpResponse;
5
import org.apache.http.client.methods.HttpUriRequest;
6
import org.apache.http.client.methods.RequestBuilder;
7
import org.apache.http.entity.StringEntity;
8
import org.apache.http.impl.client.BasicResponseHandler;
9
import org.apache.http.impl.client.CloseableHttpClient;
10
import org.apache.http.impl.client.HttpClients;
11
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
12
import org.eclipse.rdf4j.model.ValueFactory;
13
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
14
import org.eclipse.rdf4j.repository.Repository;
15
import org.eclipse.rdf4j.repository.RepositoryConnection;
16
import org.eclipse.rdf4j.repository.base.RepositoryConnectionWrapper;
17
import org.eclipse.rdf4j.repository.http.HTTPRepository;
18
import org.nanopub.NanopubUtils;
19
import org.nanopub.vocabulary.NPA;
20
import org.slf4j.Logger;
21
import org.slf4j.LoggerFactory;
22

23
import java.io.BufferedReader;
24
import java.io.IOException;
25
import java.io.InputStreamReader;
26
import java.util.*;
27
import java.util.Map.Entry;
28
import java.util.concurrent.ConcurrentHashMap;
29
import java.util.concurrent.atomic.AtomicBoolean;
30
import java.util.concurrent.atomic.AtomicInteger;
31
import java.util.concurrent.locks.ReadWriteLock;
32
import java.util.concurrent.locks.ReentrantReadWriteLock;
33

34
/**
35
 * Class to access the database in the form of triple stores.
36
 */
37
public class TripleStore {
38

39
    /**
40
     * Name of the admin graph.
41
     */
42
    public static final String ADMIN_REPO = "admin";
43

44
    private static ValueFactory vf = SimpleValueFactory.getInstance();
6✔
45

46
    private static final Logger log = LoggerFactory.getLogger(TripleStore.class);
12✔
47

48
    private final Map<String, Repository> repositories = new LinkedHashMap<>();
15✔
49

50
    /**
51
     * Per-repo open-connection counter, read by the eviction loop to skip repos that
52
     * still have live connections. Incremented in {@link #getRepoConnection(String)}
53
     * just before the connection is handed out and decremented via a
54
     * {@link RepositoryConnectionWrapper} that intercepts {@code close()} exactly once.
55
     */
56
    private final ConcurrentHashMap<String, AtomicInteger> openConnections = new ConcurrentHashMap<>();
15✔
57

58
    private String endpointBase = null;
9✔
59
    private String endpointType = null;
9✔
60

61
    private static TripleStore tripleStoreInstance;
62

63
    /**
64
     * Returns singleton triple store instance.
65
     *
66
     * @return Triple store instance
67
     */
68
    public static TripleStore get() {
69
        if (tripleStoreInstance == null) {
6!
70
            try {
71
                tripleStoreInstance = new TripleStore();
×
72
            } catch (IOException ex) {
×
73
                log.info("Could not init TripleStore. ", ex);
×
74
            }
×
75
        }
76
        return tripleStoreInstance;
×
77
    }
78

79
    private TripleStore() throws IOException {
6✔
80
        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
6✔
81
        endpointBase = env.get("ENDPOINT_BASE");
18✔
82
        log.info("Endpoint base: {}", endpointBase);
15✔
83
        endpointType = env.get("ENDPOINT_TYPE");
18✔
84

85
        getRepository("empty");  // Make sure empty repo exists
×
86
    }
×
87

88
    private final CloseableHttpClient httpclient = HttpClients.createDefault();
9✔
89

90
    @GeneratedFlagForDependentElements
91
    Repository getRepository(String name) {
92
        synchronized (this) {
93
            if (repositories.size() > 100) {
94
                evictIdleRepos();
95
            }
96
            if (repositories.containsKey(name)) {
97
                // Move to the end of the list:
98
                Repository repo = repositories.remove(name);
99
                repositories.put(name, repo);
100
            } else {
101
                Repository repository = null;
102
                if (endpointType == null || endpointType.equals("rdf4j")) {
103
                    HTTPRepository hr = new HTTPRepository(endpointBase + "repositories/" + name);
104
                    hr.setHttpClient(httpclient);
105
                    repository = hr;
106
//                        } else if (endpointType.equals("virtuoso")) {
107
//                                repository = new VirtuosoRepository(endpointBase + name, username, password);
108
                } else {
109
                    throw new RuntimeException("Unknown repository type: " + endpointType);
110
                }
111
                repositories.put(name, repository);
112
                createRepo(name);
113
                getRepoConnection(name).close();
114
            }
115
            return repositories.get(name);
116
        }
117
    }
118

119
    /**
120
     * Return the repository connection for the given repository name.
121
     *
122
     * @param name repository name
123
     * @return repository connection
124
     */
125
    @GeneratedFlagForDependentElements
126
    public RepositoryConnection getRepoConnection(String name) {
127
        // The increment has to happen under the same monitor that guards eviction,
128
        // otherwise another thread could evict this repo in the window between
129
        // getRepository() returning and the counter going above zero. getConnection()
130
        // on HTTPRepository is local (it doesn't do HTTP), so holding the lock here
131
        // is cheap.
132
        synchronized (this) {
133
            Repository repo = getRepository(name);
134
            if (repo == null) {
135
                return null;
136
            }
137
            AtomicInteger counter = openConnections.computeIfAbsent(name, k -> new AtomicInteger());
×
138
            counter.incrementAndGet();
139
            return new CountingRepositoryConnection(repo, repo.getConnection(), counter);
140
        }
141
    }
142

143
    /**
144
     * Evicts the eldest cache entries until either the size is back within the
145
     * 100-entry cap or every remaining entry has at least one open connection.
146
     * The cap is load-bearing — each cached entry keeps an LMDB environment alive
147
     * on the RDF4J server (in-memory cache, mmap pages, native memory), so
148
     * exceeding it by much risks server-side OOM. Actively-used repos are skipped
149
     * rather than shut down, because {@link org.eclipse.rdf4j.repository.http.HTTPRepository#shutDown()}
150
     * closes the session manager and kills any live transaction on that repo with
151
     * a connection-close error (the {@code MMapIndexInput – Already closed}
152
     * failure mode observed on query-3 in the April test). The cache self-converges
153
     * as active repos become idle.
154
     */
155
    @GeneratedFlagForDependentElements
156
    private void evictIdleRepos() {
157
        List<String> skipped = new ArrayList<>();
158
        Iterator<Entry<String, Repository>> iter = repositories.entrySet().iterator();
159
        while (iter.hasNext() && repositories.size() > 100) {
160
            Entry<String, Repository> e = iter.next();
161
            AtomicInteger active = openConnections.get(e.getKey());
162
            if (active != null && active.get() > 0) {
163
                skipped.add(e.getKey());
164
                continue;
165
            }
166
            iter.remove();
167
            log.info("Shutting down repo: {}", e.getKey());
168
            e.getValue().shutDown();
169
            log.info("Shutdown complete");
170
        }
171
        if (!skipped.isEmpty()) {
172
            log.warn("Skipped eviction for {} active repo(s); cache size is now {} (cap 100). Active names: {}",
173
                    skipped.size(), repositories.size(), skipped);
174
        }
175
    }
176

177
    /**
178
     * Minimal wrapper around a delegate {@link RepositoryConnection} whose only job
179
     * is to decrement the per-repo open-connection counter exactly once when the
180
     * caller closes it. Uses {@link AtomicBoolean} so that repeated/idempotent
181
     * {@code close()} calls (common with try-with-resources plus explicit close)
182
     * don't decrement more than once.
183
     */
184
    private static final class CountingRepositoryConnection extends RepositoryConnectionWrapper {
185

186
        private final AtomicInteger counter;
187
        private final AtomicBoolean closed = new AtomicBoolean();
×
188

189
        CountingRepositoryConnection(Repository repo, RepositoryConnection delegate, AtomicInteger counter) {
190
            super(repo, delegate);
×
191
            this.counter = counter;
×
192
        }
×
193

194
        @Override
195
        public void close() {
196
            try {
197
                super.close();
×
198
            } finally {
199
                if (closed.compareAndSet(false, true)) {
×
200
                    counter.decrementAndGet();
×
201
                }
202
            }
203
        }
×
204

205
    }
206

207
    @GeneratedFlagForDependentElements
208
    private void createRepo(String repoName) {
209
        if (!repoName.equals(ADMIN_REPO)) {
210
            getRepository(ADMIN_REPO);  // make sure admin repo is loaded first
211
        }
212
        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
213
            //log.info("Trying to creating repo " + name);
214

215
            // TODO new syntax somehow doesn't work for the Lucene case:
216

217
//                        String createRegularRepoQueryString = "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n"
218
//                                        + "@prefix config: <tag:rdf4j.org,2023:config/>.\n"
219
//                                        + "[] a config:Repository ;\n"
220
//                                        + "    config:rep.id \"" + name + "\" ;\n"
221
//                                        + "    rdfs:label \"" + name + " native store\" ;\n"
222
//                                        + "    config:rep.impl [\n"
223
//                                        + "        config:rep.type \"openrdf:SailRepository\" ;\n"
224
//                                        + "        config:sail.impl [\n"
225
//                                        + "            config:sail.type \"openrdf:NativeStore\" ;\n"
226
//                                        + "            config:sail.iterationCacheSyncThreshold \"10000\";\n"
227
//                                        + "            config:native.tripleIndexes \"spoc,posc,ospc,opsc,psoc,sopc,spoc,cpos,cosp,cops,cpso,csop\" ;\n"
228
//                                        + "            config:sail.defaultQueryEvaluationMode \"STANDARD\"\n"
229
//                                        + "        ]\n"
230
//                                        + "    ].";
231
//                        String createTextRepoQueryString = "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n"
232
//                                        + "@prefix config: <tag:rdf4j.org,2023:config/>.\n"
233
//                                        + "[] a config:Repository ;\n"
234
//                                        + "    config:rep.id \"" + name + "\" ;\n"
235
//                                        + "    rdfs:label \"" + name + " native store\" ;\n"
236
//                                        + "    config:rep.impl [\n"
237
//                                        + "        config:rep.type \"openrdf:SailRepository\" ;\n"
238
//                                        + "        config:sail.impl [\n"
239
//                                        + "            config:sail.type \"openrdf:LuceneSail\" ;\n"
240
//                                        + "            config:sail.lucene.indexDir \"index/\" ;\n"
241
//                                        + "            config:delegate [\n"
242
//                                        + "                config:rep.type \"openrdf:SailRepository\" ;\n"
243
//                                        + "                config:sail.impl [\n"
244
//                                        + "                    config:sail.type \"openrdf:NativeStore\" ;\n"
245
//                                        + "                    config:sail.iterationCacheSyncThreshold \"10000\";\n"
246
//                                        + "                    config:native.tripleIndexes \"spoc,posc,ospc,opsc,psoc,sopc,spoc,cpos,cosp,cops,cpso,csop\" ;\n"
247
//                                        + "                    config:sail.defaultQueryEvaluationMode \"STANDARD\"\n"
248
//                                        + "                ]\n"
249
//                                        + "            ]\n"
250
//                                        + "        ]\n"
251
//                                        + "    ].";
252

253
            String indexTypes = "spoc,posc,ospc,cspo,cpos,cosp";
254
            if (repoName.startsWith("meta") || repoName.startsWith("text")) {
255
                indexTypes = "spoc,posc,ospc";
256
            }
257

258
            String createRegularRepoQueryString =
259
                    "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" +
260
                    "@prefix rep: <http://www.openrdf.org/config/repository#>.\n" +
261
                    "@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" +
262
                    "@prefix sail: <http://www.openrdf.org/config/sail#>.\n" +
263
                    "@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" +
264
                    "@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" +
265
                    "@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" +
266
                    "\n" +
267
                    "[] a rep:Repository ;\n" +
268
                    "    rep:repositoryID \"" + repoName + "\" ;\n" +
269
                    "    rdfs:label \"" + repoName + " LMDB store\" ;\n" +
270
                    "    rep:repositoryImpl [\n" +
271
                    "        rep:repositoryType \"openrdf:SailRepository\" ;\n" +
272
                    "        sr:sailImpl [\n" +
273
                    "            sail:sailType \"rdf4j:LmdbStore\" ;\n" +
274
                    "            sail:iterationCacheSyncThreshold \"10000\";\n" +
275
                    "            lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" +
276
                    "            sb:defaultQueryEvaluationMode \"STANDARD\"\n" +
277
                    "        ]\n"
278
                    + "    ].\n";
279

280
            // TODO Index npa:hasFilterLiteral predicate too (see https://groups.google.com/g/rdf4j-users/c/epF4Af1jXGU):
281
            String createTextRepoQueryString =
282
                    "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.\n" +
283
                    "@prefix rep: <http://www.openrdf.org/config/repository#>.\n" +
284
                    "@prefix sr: <http://www.openrdf.org/config/repository/sail#>.\n" +
285
                    "@prefix sail: <http://www.openrdf.org/config/sail#>.\n" +
286
                    "@prefix sail-luc: <http://www.openrdf.org/config/sail/lucene#>.\n" +
287
                    "@prefix lmdb: <http://rdf4j.org/config/sail/lmdb#>.\n" +
288
                    "@prefix sb: <http://www.openrdf.org/config/sail/base#>.\n" +
289
                    "\n"
290
                    + "[] a rep:Repository ;\n" +
291
                    "    rep:repositoryID \"" + repoName + "\" ;\n" +
292
                    "    rdfs:label \"" + repoName + " store\" ;\n" +
293
                    "    rep:repositoryImpl [\n" +
294
                    "        rep:repositoryType \"openrdf:SailRepository\" ;\n" +
295
                    "        sr:sailImpl [\n" +
296
                    "            sail:sailType \"openrdf:LuceneSail\" ;\n" +
297
                    "            sail-luc:indexDir \"index/\" ;\n" +
298
                    "            sail-luc:transactional false ;\n" +
299
                    "            sail:delegate [\n" +
300
                    "              sail:sailType \"rdf4j:LmdbStore\" ;\n" +
301
                    "              sail:iterationCacheSyncThreshold \"10000\";\n" +
302
                    "              lmdb:tripleIndexes \"" + indexTypes + "\" ;\n" +
303
                    "              sb:defaultQueryEvaluationMode \"STANDARD\"\n" +
304
                    "            ]\n" +
305
                    "        ]\n" +
306
                    "    ].";
307

308
            String createRepoQueryString = createRegularRepoQueryString;
309
            if (repoName.startsWith("text")) {
310
                createRepoQueryString = createTextRepoQueryString;
311
            }
312

313
            HttpUriRequest createRepoRequest = RequestBuilder.put().setUri(endpointBase + "repositories/" + repoName).addHeader("Content-Type", "text/turtle").setEntity(new StringEntity(createRepoQueryString)).build();
314

315
            HttpResponse response = httpclient.execute(createRepoRequest);
316
            int statusCode = response.getStatusLine().getStatusCode();
317
            if (statusCode == 409) {
318
                //log.info("Already exists.");
319
                getRepository(repoName).init();
320
            } else if (statusCode >= 200 && statusCode < 300) {
321
                //log.info("Successfully created.");
322
                initNewRepo(repoName);
323
            } else {
324
                log.info("Status code: {}", response.getStatusLine().getStatusCode());
325
                log.info(response.getStatusLine().getReasonPhrase());
326
                String handledResponse = new BasicResponseHandler().handleResponse(response);
327
                log.info("Response: {}", handledResponse);
328
            }
329
        } catch (IOException ex) {
330
            log.info("Could not create repo.", ex);
331
        }
332
    }
333

334
    /**
335
     * Sends shutdown signal to all repositories.
336
     */
337
    @GeneratedFlagForDependentElements
338
    public void shutdownRepositories() {
339
        for (Repository repo : repositories.values()) {
340
            if (repo != null && repo.isInitialized()) {
341
                repo.shutDown();
342
            }
343
        }
344
    }
345

346
    /**
347
     * Returns admin repo connection.
348
     *
349
     * @return repository connection to admin repository
350
     */
351
    @GeneratedFlagForDependentElements
352
    public RepositoryConnection getAdminRepoConnection() {
353
        return get().getRepoConnection(ADMIN_REPO);
354
    }
355

356
    private Set<String> cachedRepositoryNames = Set.of();
9✔
357
    private boolean repoNamesCacheValid = false;
9✔
358
    private final ReadWriteLock repoNamesCacheLock = new ReentrantReadWriteLock();
15✔
359

360
    /**
361
     * Returns set of all repository names.
362
     *
363
     * @return Repository name set
364
     */
365
    public Set<String> getRepositoryNames() {
366
        // See if the repository names are cached:
367
        final var readLock = repoNamesCacheLock.readLock();
12✔
368
        try {
369
            readLock.lock();
6✔
370
            if (repoNamesCacheValid) {
9✔
371
                return cachedRepositoryNames;
15✔
372
            }
373
        } finally {
374
            readLock.unlock();
6✔
375
        }
376

377
        // Not cached, get from server:
378
        final var writeLock = repoNamesCacheLock.writeLock();
12✔
379
        try {
380
            writeLock.lock();
6✔
381
            // Check again if another thread has already updated the cache:
382
            if (repoNamesCacheValid) {
9!
383
                return cachedRepositoryNames;
×
384
            }
385
            Map<String, Boolean> repositoryNames = null;
6✔
386
            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
6✔
387
                HttpResponse resp = httpclient.execute(RequestBuilder.get()
21✔
388
                        .setUri(endpointBase + "/repositories")
9✔
389
                        .addHeader("Content-Type", "text/csv")
3✔
390
                        .build());
3✔
391
                BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getEntity().getContent()));
30✔
392
                int code = resp.getStatusLine().getStatusCode();
12✔
393
                if (code < 200 || code >= 300) return null;
30!
394
                repositoryNames = new HashMap<>();
12✔
395
                int lineCount = 0;
6✔
396
                while (true) {
397
                    String line = reader.readLine();
9✔
398
                    if (line == null) break;
9✔
399
                    if (lineCount > 0) {
6✔
400
                        String repoName = line.split(",")[1];
18✔
401
                        repositoryNames.put(repoName, true);
18✔
402
                    }
403
                    lineCount = lineCount + 1;
12✔
404
                }
3✔
405
            } catch (IOException ex) {
15!
406
                log.info("Could not get repository names.", ex);
12✔
407
                return null;
12✔
408
            }
3✔
409
            cachedRepositoryNames = repositoryNames.keySet();
12✔
410
            repoNamesCacheValid = true;
9✔
411
            return cachedRepositoryNames;
15✔
412
        } finally {
413
            writeLock.unlock();
6✔
414
        }
415
    }
416

417
    /**
418
     * Invalidates the repository names cache. Call this method when a repository is created or deleted.
419
     */
420
    private void invalidateRepositoryNamesCache() {
421
        final var writeLock = repoNamesCacheLock.writeLock();
×
422
        try {
423
            writeLock.lock();
×
424
            repoNamesCacheValid = false;
×
425
        } finally {
426
            writeLock.unlock();
×
427
        }
428
    }
×
429

430
    @GeneratedFlagForDependentElements
431
    private void initNewRepo(String repoName) {
432
        String repoInitId = new Random().nextLong() + "";
433
        getRepository(repoName).init();
434
        if (!repoName.equals("empty")) {
435
            RepositoryConnection conn = getRepoConnection(repoName);
436
            try (conn) {
437
                // Full isolation, just in case.
438
                conn.begin(IsolationLevels.SERIALIZABLE);
439
                conn.add(NPA.THIS_REPO, NPA.HAS_REPO_INIT_ID, vf.createLiteral(repoInitId), NPA.GRAPH);
440
                if (tracksNanopubCountAndChecksum(repoName)) {
441
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_COUNT, vf.createLiteral(0L), NPA.GRAPH);
442
                    conn.add(NPA.THIS_REPO, NPA.HAS_NANOPUB_CHECKSUM, vf.createLiteral(NanopubUtils.INIT_CHECKSUM), NPA.GRAPH);
443
                }
444
                if (repoName.startsWith("pubkey_") || repoName.startsWith("type_")) {
445
                    String h = repoName.replaceFirst("^[^_]+_", "");
446
                    conn.add(NPA.THIS_REPO, NPA.HAS_COVERAGE_ITEM, Utils.getObjectForHash(h), NPA.GRAPH);
447
                    conn.add(NPA.THIS_REPO, NPA.HAS_COVERAGE_HASH, vf.createLiteral(h), NPA.GRAPH);
448
                    conn.add(NPA.THIS_REPO, NPA.HAS_COVERAGE_FILTER, vf.createLiteral("_" + repoName), NPA.GRAPH);
449
                }
450
                conn.commit();
451
            }
452
            // Refresh repository names cache
453
            invalidateRepositoryNamesCache();
454
        }
455
    }
456

457
    /**
458
     * Whether the given repo participates in the cumulative nanopub-count / XOR-checksum
459
     * tracking. Repos that don't hold raw nanopubs skip the
460
     * {@code npa:hasNanopubCount} and {@code npa:hasNanopubChecksum} initial triples —
461
     * leaving them at {@code 0} and the empty-XOR placeholder forever would just be
462
     * misleading. Currently excluded:
463
     * <ul>
464
     *   <li>{@code admin} — holds metadata only.</li>
465
     *   <li>{@code last30d} — content expires on a periodic cleanup.</li>
466
     *   <li>{@code trust} and {@code spaces} — hold derived state, not raw nanopubs.</li>
467
     * </ul>
468
     */
469
    private static boolean tracksNanopubCountAndChecksum(String repoName) {
470
        return !repoName.equals(ADMIN_REPO)
×
471
                && !repoName.equals("last30d")
×
472
                && !repoName.equals("trust")
×
473
                && !repoName.equals("spaces");
×
474
    }
475

476
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc