• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nanopublication / nanopub-java / 17038267114

18 Aug 2025 10:41AM UTC coverage: 47.35% (-1.7%) from 49.072%
17038267114

push

github

Ziroli Plutschow
Now all unit tests run successfully without a network connection.

- Moved the GetIndex test to an Integration Test.

881 of 2880 branches covered (30.59%)

Branch coverage included in aggregate %.

4729 of 8968 relevant lines covered (52.73%)

2.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
src/main/java/org/nanopub/extra/server/FetchIndex.java
1
package org.nanopub.extra.server;
2

3
import net.trustyuri.TrustyUriUtils;
4
import org.apache.http.conn.ConnectionPoolTimeoutException;
5
import org.eclipse.rdf4j.model.IRI;
6
import org.eclipse.rdf4j.rio.RDFFormat;
7
import org.eclipse.rdf4j.rio.RDFHandlerException;
8
import org.nanopub.Nanopub;
9
import org.nanopub.NanopubUtils;
10
import org.nanopub.extra.index.IndexUtils;
11
import org.nanopub.extra.index.NanopubIndex;
12
import org.nanopub.extra.server.RegistryInfo.RegistryInfoException;
13

14
import java.io.OutputStream;
15
import java.util.*;
16

17
/**
18
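 * Fetches a nanopub index from a set of registries and writes the index nanopubs and/or the nanopubs they reference to an output stream.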
19
 */
20
public class FetchIndex {
21

22
    /**
23
     * The maximum number of parallel requests that can be made to a single server.
24
     */
25
    public static final int maxParallelRequestsPerServer = 5;
26

27
    private OutputStream out;
28
    private RDFFormat format;
29
    private boolean writeIndex, writeContent;
30
    private boolean running = false;
×
31
    private List<FetchNanopubTask> fetchTasks;
32
    private List<RegistryInfo> registries;
33
    private RegistryInfo localRegistryInfo;
34
    private Map<RegistryInfo, Set<FetchNanopubTask>> serverLoad;
35
    private Map<RegistryInfo, Integer> serverUsage;
36
    private int nanopubCount;
37
    private Listener listener;
38

39
    /**
40
     * Default constructor for FetchIndex.
41
     */
42
    protected FetchIndex() {
×
43
    }
×
44

45
    /**
     * Creates a new FetchIndex instance.
     * <p>
     * The given index is scheduled as the first fetch task. The registries to fetch from are
     * collected from a {@link ServerIterator}; the local registry, if given, is added to them.
     * If the local registry cannot be loaded, the stack trace is printed and the instance
     * works with the other registries only.
     *
     * @param indexUri         the URI of the index to fetch
     * @param out              the output stream to write the fetched nanopubs to
     * @param format           the RDF format to use for writing nanopubs
     * @param writeIndex       true if the index nanopub should be written, false otherwise
     * @param writeContent     true if the content nanopubs should be written, false otherwise
     * @param localRegistryUrl the URL of a local registry to use, or null if not needed
     */
    public FetchIndex(String indexUri, OutputStream out, RDFFormat format, boolean writeIndex, boolean writeContent, String localRegistryUrl) {
        this.out = out;
        this.format = format;
        this.writeIndex = writeIndex;
        this.writeContent = writeContent;
        nanopubCount = 0;
        fetchTasks = new ArrayList<>();
        fetchTasks.add(new FetchNanopubTask(indexUri, true));
        registries = new ArrayList<>();
        serverLoad = new HashMap<>();
        serverUsage = new HashMap<>();
        ServerIterator serverIterator = new ServerIterator();
        while (serverIterator.hasNext()) {
            registerRegistry(serverIterator.next());
        }
        try {
            ServerIterator.writeCachedServers(registries);
        } catch (Exception ignored) {
            // Deliberately ignored: a failure to write the server cache must not prevent fetching.
        }
        if (localRegistryUrl != null) {
            try {
                localRegistryInfo = RegistryInfo.load(localRegistryUrl);
                registerRegistry(localRegistryInfo);
            } catch (RegistryInfoException ex) {
                // Continue without the local registry; localRegistryInfo stays null.
                ex.printStackTrace();
            }
        }
    }

    /**
     * Adds a registry to the list of usable registries and initializes its load and usage
     * bookkeeping. A registry that is already registered is not added a second time, so that
     * the registry list never contains more entries than a task can mark as tried.
     *
     * @param registryInfo the registry to register
     */
    private void registerRegistry(RegistryInfo registryInfo) {
        if (serverLoad.containsKey(registryInfo)) return;
        registries.add(registryInfo);
        serverLoad.put(registryInfo, new HashSet<>());
        serverUsage.put(registryInfo, 0);
    }
×
89

90
    /**
91
     * Starts the fetching process.
92
     */
93
    public void run() {
94
        synchronized (this) {
×
95
            if (running) return;
×
96
            running = true;
×
97
        }
×
98
        while (!fetchTasks.isEmpty()) {
×
99
            checkTasks();
×
100
            try {
101
                Thread.sleep(5);
×
102
            } catch (InterruptedException ex) {
×
103
            }
×
104
        }
105
    }
×
106

107
    private void checkTasks() {
108
        for (FetchNanopubTask task : new ArrayList<>(fetchTasks)) {
×
109
            if (task.isRunning()) continue;
×
110
            if (task.isCancelled()) {
×
111
                fetchTasks.remove(task);
×
112
                serverLoad.get(task.getLastRegistry()).remove(task);
×
113
                continue;
×
114
            }
115
            if (task.getLastRegistry() != null) {
×
116
                serverLoad.get(task.getLastRegistry()).remove(task);
×
117
            }
118
            if (task.getNanopub() == null) {
×
119
                if (task.getTriedServersCount() == registries.size()) {
×
120
                    System.err.println("Failed to get " + task.getNanopubUri());
×
121
                    fetchTasks.remove(task);
×
122
                    continue;
×
123
                }
124
                if (localRegistryInfo != null && !task.hasServerBeenTried(localRegistryInfo)) {
×
125
                    assignTask(task, localRegistryInfo);
×
126
                    break;
×
127
                }
128
                List<RegistryInfo> shuffledServers = new ArrayList<>(registries);
×
129
                Collections.shuffle(shuffledServers);
×
130
                for (RegistryInfo registryInfo : shuffledServers) {
×
131
                    if (task.hasServerBeenTried(registryInfo)) continue;
×
132
                    int load = serverLoad.get(registryInfo).size();
×
133
                    if (load >= maxParallelRequestsPerServer) {
×
134
                        continue;
×
135
                    }
136
                    assignTask(task, registryInfo);
×
137
                    break;
×
138
                }
139
            } else if (task.isIndex()) {
×
140
                if (fetchTasks.size() < 3000) {
×
141
                    try {
142
                        Nanopub np = task.getNanopub();
×
143
                        if (!IndexUtils.isIndex(np)) {
×
144
                            throw new RuntimeException("NOT AN INDEX: " + np.getUri());
×
145
                        }
146
                        NanopubIndex npi = IndexUtils.castToIndex(np);
×
147
                        if (writeIndex) {
×
148
                            writeNanopub(npi);
×
149
                        }
150
                        if (writeContent) {
×
151
                            for (IRI elementUri : npi.getElements()) {
×
152
                                fetchTasks.add(new FetchNanopubTask(elementUri.toString(), false));
×
153
                            }
×
154
                        }
155
                        for (IRI subIndexUri : npi.getSubIndexes()) {
×
156
                            // Failing to get subindexes can block the entire process, therefore
157
                            // we launch three sibling tasks at the same time:
158
                            FetchNanopubTask t1 = new FetchNanopubTask(subIndexUri.toString(), true);
×
159
                            fetchTasks.addFirst(t1);
×
160
                            FetchNanopubTask t2 = new FetchNanopubTask(subIndexUri.toString(), true, t1);
×
161
                            fetchTasks.addFirst(t2);
×
162
                            FetchNanopubTask t3 = new FetchNanopubTask(subIndexUri.toString(), true, t1, t2);
×
163
                            fetchTasks.addFirst(t3);
×
164
                        }
×
165
                        if (npi.getAppendedIndex() != null) {
×
166
                            // Failing to get appended indexes can block the entire process, therefore
167
                            // we launch three sibling tasks at the same time:
168
                            FetchNanopubTask t1 = new FetchNanopubTask(npi.getAppendedIndex().toString(), true);
×
169
                            fetchTasks.addFirst(t1);
×
170
                            FetchNanopubTask t2 = new FetchNanopubTask(npi.getAppendedIndex().toString(), true, t1);
×
171
                            fetchTasks.addFirst(t2);
×
172
                            FetchNanopubTask t3 = new FetchNanopubTask(npi.getAppendedIndex().toString(), true, t1, t2);
×
173
                            fetchTasks.addFirst(t3);
×
174
                        }
175
                    } catch (Exception ex) {
×
176
                        throw new RuntimeException(ex);
×
177
                    }
×
178
                    fetchTasks.remove(task);
×
179
                }
180
            } else {
181
                try {
182
                    writeNanopub(task.getNanopub());
×
183
                } catch (Exception ex) {
×
184
                    throw new RuntimeException(ex);
×
185
                }
×
186
                fetchTasks.remove(task);
×
187
            }
188
        }
×
189
    }
×
190

191
    private void writeNanopub(Nanopub np) throws RDFHandlerException {
192
        nanopubCount++;
×
193
        if (listener != null && nanopubCount % 100 == 0) {
×
194
            listener.progress(nanopubCount);
×
195
        }
196
        NanopubUtils.writeToStream(np, out, format);
×
197
    }
×
198

199
    /**
200
     * Returns the number of nanopubs fetched so far.
201
     *
202
     * @return the number of nanopubs fetched so far
203
     */
204
    public int getNanopubCount() {
205
        return nanopubCount;
×
206
    }
207

208
    /**
209
     * Returns the list of registries that are being used.
210
     *
211
     * @return the list of registries
212
     */
213
    public List<RegistryInfo> getRegistries() {
214
        return new ArrayList<>(registries);
×
215
    }
216

217
    /**
218
     * Returns the number of times a server has been used to fetch a nanopub.
219
     *
220
     * @param r the registry info of the server
221
     * @return the number of times the server has been used
222
     */
223
    public int getServerUsage(RegistryInfo r) {
224
        return serverUsage.get(r);
×
225
    }
226

227
    /**
228
     * Sets a listener that will be notified about progress and exceptions.
229
     *
230
     * @param l the listener to set
231
     */
232
    public void setProgressListener(Listener l) {
233
        listener = l;
×
234
    }
×
235

236
    private void assignTask(final FetchNanopubTask task, final RegistryInfo r) {
237
        task.prepareForTryingServer(r);
×
238
        serverLoad.get(r).add(task);
×
239
        Runnable runFetchTask = () -> task.tryServer(r);
×
240
        Thread thread = new Thread(runFetchTask);
×
241
        thread.start();
×
242
    }
×
243

244
    private class FetchNanopubTask {
245

246
        private String npUri;
247
        private boolean isIndex;
248
        private Nanopub nanopub;
249
        private Set<RegistryInfo> registries = new HashSet<>();
×
250
        private boolean running = false;
×
251
        private boolean cancelled = false;
×
252
        private RegistryInfo lastRegistry;
253
        private Set<FetchNanopubTask> siblings;
254

255
        /**
256
         * Creates a new task to fetch a nanopub.
257
         *
258
         * @param npUri    the URI of the nanopub to fetch
259
         * @param isIndex  true if the nanopub is an index, false otherwise
260
         * @param siblings other tasks that are siblings of this task (for sub-indexes)
261
         */
262
        public FetchNanopubTask(String npUri, boolean isIndex, FetchNanopubTask... siblings) {
×
263
            this.npUri = npUri;
×
264
            this.isIndex = isIndex;
×
265
            this.siblings = new HashSet<>(Arrays.asList(siblings));
×
266
            for (FetchNanopubTask s : siblings) {
×
267
                s.siblings.add(this);
×
268
            }
269
        }
×
270

271
        /**
272
         * Returns whether the nanopub is an index or not.
273
         *
274
         * @return true if the nanopub is an index, false otherwise
275
         */
276
        public boolean isIndex() {
277
            return isIndex;
×
278
        }
279

280
        /**
281
         * Returns the nanopub fetched by this task.
282
         *
283
         * @return the fetched nanopub, or null if not yet fetched
284
         */
285
        public Nanopub getNanopub() {
286
            return nanopub;
×
287
        }
288

289
        /**
290
         * Returns the URI of the nanopub to be fetched.
291
         *
292
         * @return the URI of the nanopub
293
         */
294
        public String getNanopubUri() {
295
            return npUri;
×
296
        }
297

298
        /**
299
         * Returns whether the task is currently running.
300
         *
301
         * @return true if the task is running, false otherwise
302
         */
303
        public boolean isRunning() {
304
            return running;
×
305
        }
306

307
        /**
308
         * Returns if the task has been cancelled.
309
         */
310
        public boolean isCancelled() {
311
            return cancelled;
×
312
        }
313

314
        /**
315
         * Returns whether the server has already been tried for this task.
316
         *
317
         * @param r the registry info of the server
318
         * @return true if the server has been tried, false otherwise
319
         */
320
        public boolean hasServerBeenTried(RegistryInfo r) {
321
            return registries.contains(r);
×
322
        }
323

324
        /**
325
         * Returns the number of servers that have been tried so far for this task.
326
         *
327
         * @return the number of tried servers
328
         */
329
        public int getTriedServersCount() {
330
            return registries.size();
×
331
        }
332

333
        /**
334
         * Returns the last registry that was used for this task.
335
         *
336
         * @return the last registry info, or null if no server has been tried yet
337
         */
338
        public RegistryInfo getLastRegistry() {
339
            return lastRegistry;
×
340
        }
341

342
        /**
343
         * Prepares the task for trying a server.
344
         *
345
         * @param r the registry info of the server to try
346
         */
347
        public void prepareForTryingServer(RegistryInfo r) {
348
            registries.add(r);
×
349
            lastRegistry = r;
×
350
            running = true;
×
351
        }
×
352

353
        /**
354
         * Attempts to fetch the nanopub from the specified server.
355
         *
356
         * @param r the registry info of the server to try
357
         */
358
        public void tryServer(RegistryInfo r) {
359
            boolean serverTried = false;
×
360
            try {
361
                serverTried = true;
×
362
                nanopub = GetNanopub.get(TrustyUriUtils.getArtifactCode(npUri), r);
×
363
            } catch (ConnectionPoolTimeoutException ex) {
×
364
                serverTried = false;
×
365
                // too many connection attempts; try again later
366
            } catch (Exception ex) {
×
367
                if (listener != null) listener.exceptionHappened(ex, r, TrustyUriUtils.getArtifactCode(npUri));
×
368
            } finally {
369
                running = false;
×
370
                if (serverTried) {
×
371
                    synchronized (FetchIndex.class) {
×
372
                        if (cancelled) {
×
373
                            // Sibling already did the work...
374
                        } else {
375
                            for (FetchNanopubTask s : siblings) {
×
376
                                s.cancelled = true;
×
377
                            }
×
378
                            serverUsage.put(r, serverUsage.get(r) + 1);
×
379
                        }
380
                    }
×
381
                }
382
            }
383
        }
×
384

385
    }
386

387

388
    /**
389
     * Listener interface for progress and exception handling.
390
     */
391
    public static interface Listener {
392

393
        /**
394
         * Called to report progress.
395
         *
396
         * @param count the number of nanopubs processed so far
397
         */
398
        public void progress(int count);
399

400
        /**
401
         * Called when an exception occurs during fetching.
402
         *
403
         * @param ex           the exception that occurred
404
         * @param r            the registry info of the server where the exception occurred
405
         * @param artifactCode the artifact code of the nanopub that caused the exception
406
         */
407
        public void exceptionHappened(Exception ex, RegistryInfo r, String artifactCode);
408

409
    }
410

411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc