• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanopub-registry / 22891728305

10 Mar 2026 07:26AM UTC coverage: 27.342% (-0.05%) from 27.394%
22891728305

push

github

tkuhn
fix: close HTTP response in retrieveNanopubsFromPeers to prevent connection pool exhaustion

The returned Stream<MaybeNanopub> wraps an open CloseableHttpResponse but
never registered a close handler. When callers closed the stream, the HTTP
connection was never returned to the pool, eventually causing
ConnectionPoolTimeoutException and blocking all peer sync.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

170 of 680 branches covered (25.0%)

Branch coverage included in aggregate %.

551 of 1957 relevant lines covered (28.16%)

4.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.11
src/main/java/com/knowledgepixels/registry/NanopubLoader.java
1
package com.knowledgepixels.registry;
2

3
import com.mongodb.ErrorCategory;
4
import com.mongodb.MongoWriteException;
5
import com.mongodb.client.ClientSession;
6
import com.mongodb.client.MongoCursor;
7
import net.trustyuri.TrustyUriUtils;
8
import net.trustyuri.rdf.RdfModule;
9
import org.apache.http.Header;
10
import org.apache.http.HttpResponse;
11
import org.apache.http.client.HttpClient;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.util.EntityUtils;
15
import org.bson.Document;
16
import org.bson.types.Binary;
17
import org.eclipse.rdf4j.common.exception.RDF4JException;
18
import org.eclipse.rdf4j.rio.RDFFormat;
19
import org.nanopub.MalformedNanopubException;
20
import org.nanopub.Nanopub;
21
import org.nanopub.NanopubImpl;
22
import org.nanopub.NanopubUtils;
23
import org.nanopub.extra.server.GetNanopub;
24
import org.nanopub.jelly.JellyUtils;
25
import org.nanopub.jelly.MaybeNanopub;
26
import org.nanopub.jelly.NanopubStream;
27
import org.nanopub.trusty.TrustyNanopubUtils;
28
import org.nanopub.vocabulary.NPX;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.util.ArrayList;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.stream.Stream;
38

39
import static com.knowledgepixels.registry.RegistryDB.has;
40
import static com.knowledgepixels.registry.RegistryDB.insert;
41

42
public class NanopubLoader {
43

44
    /**
     * Not instantiable: this class only exposes static members.
     */
    private NanopubLoader() {}
46

47
    public final static String INTRO_TYPE = NPX.DECLARED_BY.stringValue();
9✔
48
    public final static String INTRO_TYPE_HASH = Utils.getHash(INTRO_TYPE);
9✔
49
    public final static String ENDORSE_TYPE = Utils.APPROVES_OF.stringValue();
9✔
50
    public final static String ENDORSE_TYPE_HASH = Utils.getHash(ENDORSE_TYPE);
9✔
51
    private static final Logger log = LoggerFactory.getLogger(NanopubLoader.class);
12✔
52

53
    // TODO Distinguish and support these cases:
54
    //      1. Simple load: load to all core lists if pubkey is "core-loaded", or load to all lists if pubkey is "full-loaded"
55
    //      2. Core load: load to all core lists (initialize if needed), or load to all lists if pubkey is "full-loaded"
56
    //      3. Full load: load to all lists (initialize if needed)
57

58
    public static void simpleLoad(ClientSession mongoSession, String nanopubId) {
59
        simpleLoad(mongoSession, retrieveNanopub(mongoSession, nanopubId));
×
60
    }
×
61

62
    public static void simpleLoad(ClientSession mongoSession, Nanopub np) {
63
        String pubkey = RegistryDB.getPubkey(np);
×
64
        if (pubkey == null) {
×
65
            log.info("Ignore (not signed): {}", np.getUri());
×
66
            return;
×
67
        }
68
        String pubkeyHash = Utils.getHash(pubkey);
×
69
        // TODO Do we need to load anything else here, into the other DB collections?
70
        if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", "$").append("status", "loaded"))) {
×
71
            RegistryDB.loadNanopub(mongoSession, np, pubkeyHash, "$");
×
72
        } else if (has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH).append("status", "loaded"))) {
×
73
            RegistryDB.loadNanopub(mongoSession, np, pubkeyHash, INTRO_TYPE, ENDORSE_TYPE);
×
74
        } else if (!has(mongoSession, "lists", new Document("pubkey", pubkeyHash).append("type", INTRO_TYPE_HASH))) {
×
75
            // Unknown pubkey: create encountered intro list so RUN_OPTIONAL_LOAD picks it up
76
            try {
77
                insert(mongoSession, "lists", new Document("pubkey", pubkeyHash)
×
78
                        .append("type", INTRO_TYPE_HASH)
×
79
                        .append("status", EntryStatus.encountered.getValue()));
×
80
            } catch (MongoWriteException e) {
×
81
                if (e.getError().getCategory() != ErrorCategory.DUPLICATE_KEY) throw e;
×
82
            }
×
83
        }
84
    }
×
85

86
    /**
87
     * Retrieve Nanopubs from the peers of this Nanopub Registry.
88
     *
89
     * @param typeHash   The hash of the type of the Nanopub to retrieve.
90
     * @param pubkeyHash The hash of the pubkey of the Nanopub to retrieve.
91
     * @return A stream of MaybeNanopub objects, or an empty stream if no peer is available.
92
     */
93
    public static Stream<MaybeNanopub> retrieveNanopubsFromPeers(String typeHash, String pubkeyHash) {
94
        // TODO Move the code of this method to nanopub-java library.
95

96
        List<String> peerUrlsToTry = new ArrayList<>(Utils.getPeerUrls());
×
97
        Collections.shuffle(peerUrlsToTry);
×
98
        while (!peerUrlsToTry.isEmpty()) {
×
99
            String peerUrl = peerUrlsToTry.removeFirst();
×
100

101
            String requestUrl = peerUrl + "list/" + pubkeyHash + "/" + typeHash + ".jelly";
×
102
            log.info("Request: {}", requestUrl);
×
103
            try {
104
                CloseableHttpResponse resp = NanopubUtils.getHttpClient().execute(new HttpGet(requestUrl));
×
105
                int httpStatus = resp.getStatusLine().getStatusCode();
×
106
                if (httpStatus < 200 || httpStatus >= 300) {
×
107
                    log.info("Request failed: {} {}", peerUrl, httpStatus);
×
108
                    EntityUtils.consumeQuietly(resp.getEntity());
×
109
                    continue;
×
110
                }
111
                Header nrStatus = resp.getFirstHeader("Nanopub-Registry-Status");
×
112
                if (nrStatus == null) {
×
113
                    log.info("Nanopub-Registry-Status header not found at: {}", peerUrl);
×
114
                    EntityUtils.consumeQuietly(resp.getEntity());
×
115
                    continue;
×
116
                } else if (!nrStatus.getValue().equals("ready") && !nrStatus.getValue().equals("updating")) {
×
117
                    log.info("Peer in non-ready state: {} {}", peerUrl, nrStatus.getValue());
×
118
                    EntityUtils.consumeQuietly(resp.getEntity());
×
119
                    continue;
×
120
                }
121
                InputStream is = resp.getEntity().getContent();
×
122
                return NanopubStream.fromByteStream(is).getAsNanopubs().onClose(() -> {
×
123
                    try {
124
                        resp.close();
×
125
                    } catch (IOException e) {
×
126
                        log.debug("Error closing HTTP response", e);
×
127
                    }
×
128
                });
×
129
            } catch (UnsupportedOperationException | IOException ex) {
×
130
                log.info("Request failed: ", ex);
×
131
            }
132
        }
×
133
        return Stream.empty();
×
134
    }
135

136
    public static Nanopub retrieveNanopub(ClientSession mongoSession, String nanopubId) {
137
        Nanopub np = retrieveLocalNanopub(mongoSession, nanopubId);
×
138
        int tryCount = 0;
×
139
        while (np == null) {
×
140
            if (tryCount > 10) {
×
141
                throw new RuntimeException("Could not load nanopub: " + nanopubId);
×
142
            } else if (tryCount > 0) {
×
143
                try {
144
                    Thread.sleep(100);
×
145
                } catch (InterruptedException ex) {
×
146
                    log.info("Thread was interrupted", ex);
×
147
                }
×
148
            }
149
            log.info("Loading {}", nanopubId);
×
150

151
            // TODO Reach out to other Nanopub Registries here:
152
            np = getNanopub(nanopubId);
×
153
            if (np != null) {
×
154
                RegistryDB.loadNanopub(mongoSession, np);
×
155
            } else {
156
                tryCount = tryCount + 1;
×
157
            }
158
        }
159
        return np;
×
160
    }
161

162
    public static Nanopub retrieveLocalNanopub(ClientSession mongoSession, String nanopubId) {
163
        String ac = TrustyUriUtils.getArtifactCode(nanopubId);
×
164
        MongoCursor<Document> cursor = RegistryDB.get(mongoSession, Collection.NANOPUBS.toString(), new Document("_id", ac));
×
165
        if (!cursor.hasNext()) return null;
×
166
        try {
167
            // Parse from Jelly, not TriG (it's faster)
168
            return JellyUtils.readFromDB(((Binary) cursor.next().get("jelly")).getData());
×
169
        } catch (RDF4JException | MalformedNanopubException ex) {
×
170
            log.info("Exception reading Jelly", ex);
×
171
            return null;
×
172
        }
173
    }
174

175
    // TODO Provide this method in nanopub-java (GetNanopub)
176
    private static Nanopub getNanopub(String uriOrArtifactCode) {
177
        List<String> peerUrls = new ArrayList<>(Utils.getPeerUrls());
×
178
        Collections.shuffle(peerUrls);
×
179
        String ac = GetNanopub.getArtifactCode(uriOrArtifactCode);
×
180
        if (!ac.startsWith(RdfModule.MODULE_ID)) {
×
181
            throw new IllegalArgumentException("Not a trusty URI of type RA");
×
182
        }
183
        while (!peerUrls.isEmpty()) {
×
184
            String peerUrl = peerUrls.removeFirst();
×
185
            try {
186
                Nanopub np = get(ac, peerUrl, NanopubUtils.getHttpClient());
×
187
                if (np != null) {
×
188
                    return np;
×
189
                }
190
            } catch (IOException | RDF4JException | MalformedNanopubException ex) {
×
191
                // ignore
192
            }
×
193
        }
×
194
        return null;
×
195
    }
196

197
    // TODO Provide this method in nanopub-java (GetNanopub)
198
    private static Nanopub get(String artifactCode, String registryUrl, HttpClient httpClient)
199
            throws IOException, RDF4JException, MalformedNanopubException {
200
        HttpGet get = null;
×
201
        // TODO Get in Jelly format:
202
        String getUrl = registryUrl + "np/" + artifactCode;
×
203
        try {
204
            get = new HttpGet(getUrl);
×
205
        } catch (IllegalArgumentException ex) {
×
206
            throw new IOException("invalid URL: " + getUrl);
×
207
        }
×
208
        get.setHeader("Accept", "application/trig");
×
209
        InputStream in = null;
×
210
        try {
211
            HttpResponse resp = httpClient.execute(get);
×
212
            if (!wasSuccessful(resp)) {
×
213
                EntityUtils.consumeQuietly(resp.getEntity());
×
214
                throw new IOException(resp.getStatusLine().toString());
×
215
            }
216
            in = resp.getEntity().getContent();
×
217
            Nanopub nanopub = new NanopubImpl(in, RDFFormat.TRIG);
×
218
            if (!TrustyNanopubUtils.isValidTrustyNanopub(nanopub)) {
×
219
                throw new MalformedNanopubException("Nanopub is not trusty");
×
220
            }
221
            return nanopub;
×
222
        } finally {
223
            if (in != null) in.close();
×
224
        }
225
    }
226

227
    private static boolean wasSuccessful(HttpResponse resp) {
228
        int c = resp.getStatusLine().getStatusCode();
×
229
        return c >= 200 && c < 300;
×
230
    }
231

232
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc