• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

knowledgepixels / nanodash / 23133579782

16 Mar 2026 08:00AM UTC coverage: 15.975% (+0.2%) from 15.811%
23133579782

Pull #402

github

web-flow
Merge 2ffdda823 into 39c6ac11c
Pull Request #402: Fix unbounded memory growth and resource exhaustion

717 of 5521 branches covered (12.99%)

Branch coverage included in aggregate %.

1811 of 10304 relevant lines covered (17.58%)

2.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.36
src/main/java/com/knowledgepixels/nanodash/ApiCache.java
1
package com.knowledgepixels.nanodash;
2

3
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.eclipse.rdf4j.model.Model;
import org.nanopub.extra.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
17

18
/**
19
 * A utility class for caching API responses and maps to reduce redundant API calls.
20
 * This class is thread-safe and ensures that cached data is refreshed periodically.
21
 */
22
public class ApiCache {
23

24
    private ApiCache() {
25
    } // no instances allowed
26

27
    private static final int MAX_CACHE_ENTRIES = 10_000;
28

29
    private static final Cache<String, ApiResponse> cachedResponses = CacheBuilder.newBuilder()
6✔
30
        .maximumSize(MAX_CACHE_ENTRIES)
9✔
31
        .expireAfterAccess(24, TimeUnit.HOURS)
6✔
32
        .removalListener(n -> cleanupMetadata(n.getKey().toString()))
18✔
33
        .build();
6✔
34
    private static final Cache<String, Model> cachedRdfModels = CacheBuilder.newBuilder()
6✔
35
        .maximumSize(MAX_CACHE_ENTRIES)
9✔
36
        .expireAfterAccess(24, TimeUnit.HOURS)
6✔
37
        .removalListener(n -> cleanupMetadata(n.getKey().toString()))
3✔
38
        .build();
6✔
39
    private transient static ConcurrentMap<String, Integer> failed = new ConcurrentHashMap<>();
12✔
40
    private static final Cache<String, Map<String, String>> cachedMaps = CacheBuilder.newBuilder()
6✔
41
        .maximumSize(MAX_CACHE_ENTRIES)
9✔
42
        .expireAfterAccess(24, TimeUnit.HOURS)
6✔
43
        .removalListener(n -> cleanupMetadata(n.getKey().toString()))
3✔
44
        .build();
6✔
45
    private transient static ConcurrentMap<String, Long> lastRefresh = new ConcurrentHashMap<>();
12✔
46
    private transient static ConcurrentMap<String, Long> refreshStart = new ConcurrentHashMap<>();
12✔
47
    private transient static ConcurrentMap<String, Long> runAfter = new ConcurrentHashMap<>();
12✔
48
    private static final Logger logger = LoggerFactory.getLogger(ApiCache.class);
12✔
49

50
    private static void cleanupMetadata(String cacheId) {
51
        lastRefresh.remove(cacheId);
12✔
52
        failed.remove(cacheId);
12✔
53
        runAfter.remove(cacheId);
12✔
54
    }
3✔
55

56
    /**
57
     * Checks if a cache refresh is currently running for the given cache ID.
58
     *
59
     * @param cacheId The unique identifier for the cache.
60
     * @return True if a refresh is running, false otherwise.
61
     */
62
    private static boolean isRunning(String cacheId) {
63
        if (!refreshStart.containsKey(cacheId)) return false;
18✔
64
        return System.currentTimeMillis() - refreshStart.get(cacheId) < 60 * 1000;
42✔
65
    }
66

67
    /**
68
     * Checks if a cache refresh is currently running for the given QueryRef.
69
     *
70
     * @param queryRef The query reference
71
     * @return True if a refresh is running, false otherwise.
72
     */
73
    public static boolean isRunning(QueryRef queryRef) {
74
        return isRunning(queryRef.getAsUrlString());
12✔
75
    }
76

77
    /**
78
     * Updates the cached API response for a specific query reference.
79
     *
80
     * @param queryRef The query reference
81
     * @throws FailedApiCallException If the API call fails.
82
     */
83
    private static void updateResponse(QueryRef queryRef, boolean forced) throws FailedApiCallException, APINotReachableException, NotEnoughAPIInstancesException {
84
        ApiResponse response;
85
        if (forced) {
6✔
86
            response = QueryApiAccess.forcedGet(queryRef);
12✔
87
        } else {
88
            response = QueryApiAccess.get(queryRef);
9✔
89
        }
90
        String cacheId = queryRef.getAsUrlString();
9✔
91
        logger.info("Updating cached API response for {}", cacheId);
12✔
92
        cachedResponses.put(cacheId, response);
12✔
93
        lastRefresh.put(cacheId, System.currentTimeMillis());
18✔
94
    }
3✔
95

96
    public static ApiResponse retrieveResponseSync(QueryRef queryRef, boolean forced) {
97
        long timeNow = System.currentTimeMillis();
6✔
98
        String cacheId = queryRef.getAsUrlString();
9✔
99
        logger.info("Retrieving cached API response synchronously for {}", cacheId);
12✔
100
        boolean needsRefresh = true;
6✔
101
        if (cachedResponses.getIfPresent(cacheId) != null) {
12✔
102
            long cacheAge = timeNow - lastRefresh.get(cacheId);
24✔
103
            needsRefresh = cacheAge > 60 * 1000;
24✔
104
        }
105
        if (failed.get(cacheId) != null && failed.get(cacheId) > 2) {
33!
106
            failed.remove(cacheId);
12✔
107
            throw new RuntimeException("Query failed: " + cacheId);
18✔
108
        }
109
        if ((needsRefresh || forced) && !isRunning(cacheId)) {
21!
110
            logger.info("Refreshing cache for {}", cacheId);
12✔
111
            refreshStart.put(cacheId, timeNow);
18✔
112
            try {
113
                if (runAfter.containsKey(cacheId)) {
12!
114
                    while (System.currentTimeMillis() < runAfter.get(cacheId)) {
×
115
                        Thread.sleep(100);
×
116
                    }
117
                    runAfter.remove(cacheId);
×
118
                }
119
                if (failed.get(cacheId) != null) {
12!
120
                    // 1 second pause between failed attempts;
121
                    Thread.sleep(1000);
×
122
                }
123
                Thread.sleep(100 + new Random().nextLong(400));
24✔
124
            } catch (InterruptedException ex) {
×
125
                logger.error("Interrupted while waiting to refresh cache: {}", ex.getMessage());
×
126
            }
3✔
127
            try {
128
                ApiCache.updateResponse(queryRef, forced);
9✔
129
            } catch (Exception ex) {
3✔
130
                logger.error("Failed to update cache for {}: {}", cacheId, ex.getMessage());
18✔
131
                cachedResponses.invalidate(cacheId);
9✔
132
                failed.merge(cacheId, 1, Integer::sum);
21✔
133
                lastRefresh.put(cacheId, System.currentTimeMillis());
18✔
134
            } finally {
135
                refreshStart.remove(cacheId);
12✔
136
            }
137
        }
138
        return cachedResponses.getIfPresent(cacheId);
15✔
139
    }
140

141
    /**
142
     * Retrieves a cached API response for a specific QueryRef.
143
     *
144
     * @param queryRef The QueryRef object containing the query name and parameters.
145
     * @return The cached API response, or null if not cached.
146
     */
147
    public static ApiResponse retrieveResponseAsync(QueryRef queryRef) {
148
        long timeNow = System.currentTimeMillis();
6✔
149
        String cacheId = queryRef.getAsUrlString();
9✔
150
        logger.info("Retrieving cached API response asynchronously for {}", cacheId);
12✔
151
        boolean isCached = false;
6✔
152
        boolean needsRefresh = true;
6✔
153
        if (cachedResponses.getIfPresent(cacheId) != null) {
12✔
154
            long cacheAge = timeNow - lastRefresh.get(cacheId);
24✔
155
            isCached = cacheAge < 24 * 60 * 60 * 1000;
21!
156
            needsRefresh = cacheAge > 60 * 1000;
18!
157
        }
158
        if (failed.get(cacheId) != null && failed.get(cacheId) > 2) {
12!
159
            failed.remove(cacheId);
×
160
            throw new RuntimeException("Query failed: " + cacheId);
×
161
        }
162
        if (needsRefresh && !isRunning(cacheId)) {
15!
163
            refreshStart.put(cacheId, timeNow);
18✔
164
            NanodashThreadPool.submit(() -> {
15✔
165
                try {
166
                    if (runAfter.containsKey(cacheId)) {
12!
167
                        while (System.currentTimeMillis() < runAfter.get(cacheId)) {
×
168
                            Thread.sleep(100);
×
169
                        }
170
                        runAfter.remove(cacheId);
×
171
                    }
172
                    if (failed.get(cacheId) != null) {
12!
173
                        // 1 second pause between failed attempts;
174
                        Thread.sleep(1000);
×
175
                    }
176
                    Thread.sleep(100 + new Random().nextLong(400));
24✔
177
                } catch (InterruptedException ex) {
×
178
                    logger.error("Interrupted while waiting to refresh cache: {}", ex.getMessage());
×
179
                }
3✔
180
                try {
181
                    ApiCache.updateResponse(queryRef, false);
9✔
182
                } catch (Exception ex) {
×
183
                    logger.error("Failed to update cache for {}: {}", cacheId, ex.getMessage());
×
184
                    cachedResponses.invalidate(cacheId);
×
185
                    failed.merge(cacheId, 1, Integer::sum);
×
186
                    lastRefresh.put(cacheId, System.currentTimeMillis());
×
187
                } finally {
188
                    refreshStart.remove(cacheId);
12✔
189
                }
190
            });
3✔
191
        }
192
        if (isCached) {
6✔
193
            return cachedResponses.getIfPresent(cacheId);
15✔
194
        } else {
195
            return null;
6✔
196
        }
197
    }
198

199
    /**
200
     * Updates the cached map for a specific query reference.
201
     *
202
     * @param queryRef The query reference
203
     * @throws FailedApiCallException If the API call fails.
204
     */
205
    private static void updateMap(QueryRef queryRef) throws FailedApiCallException, APINotReachableException, NotEnoughAPIInstancesException {
206
        Map<String, String> map = new HashMap<>();
×
207
        List<ApiResponseEntry> respList = QueryApiAccess.get(queryRef).getData();
×
208
        while (respList != null && !respList.isEmpty()) {
×
209
            ApiResponseEntry resultEntry = respList.removeFirst();
×
210
            map.put(resultEntry.get("key"), resultEntry.get("value"));
×
211
        }
×
212
        String cacheId = queryRef.getAsUrlString();
×
213
        cachedMaps.put(cacheId, map);
×
214
        lastRefresh.put(cacheId, System.currentTimeMillis());
×
215
    }
×
216

217
    /**
218
     * Retrieves a cached map for a specific query reference.
219
     * If the cache is stale, it triggers a background refresh.
220
     *
221
     * @param queryRef The query reference
222
     * @return The cached map, or null if not cached.
223
     */
224
    public static synchronized Map<String, String> retrieveMap(QueryRef queryRef) {
225
        long timeNow = System.currentTimeMillis();
×
226
        String cacheId = queryRef.getAsUrlString();
×
227
        boolean isCached = false;
×
228
        boolean needsRefresh = true;
×
229
        if (cachedMaps.getIfPresent(cacheId) != null) {
×
230
            long cacheAge = timeNow - lastRefresh.get(cacheId);
×
231
            isCached = cacheAge < 24 * 60 * 60 * 1000;
×
232
            needsRefresh = cacheAge > 60 * 1000;
×
233
        }
234
        if (needsRefresh && !isRunning(cacheId)) {
×
235
            refreshStart.put(cacheId, timeNow);
×
236
            NanodashThreadPool.submit(() -> {
×
237
                try {
238
                    if (runAfter.containsKey(cacheId)) {
×
239
                        while (System.currentTimeMillis() < runAfter.get(cacheId)) {
×
240
                            Thread.sleep(100);
×
241
                        }
242
                        runAfter.remove(cacheId);
×
243
                    }
244
                    Thread.sleep(100 + new Random().nextLong(400));
×
245
                } catch (InterruptedException ex) {
×
246
                    logger.error("Interrupted while waiting to refresh cache: {}", ex.getMessage());
×
247
                }
×
248
                try {
249
                    ApiCache.updateMap(queryRef);
×
250
                } catch (Exception ex) {
×
251
                    logger.error("Failed to update cache for {}: {}", cacheId, ex.getMessage());
×
252
                    cachedResponses.invalidate(cacheId);
×
253
                    lastRefresh.put(cacheId, System.currentTimeMillis());
×
254
                }  finally {
255
                    refreshStart.remove(cacheId);
×
256
                }
257
            });
×
258
        }
259
        if (isCached) {
×
260
            if (cachedResponses.getIfPresent(cacheId) == null) {
×
261
                cachedResponses.invalidate(cacheId);
×
262
                throw new RuntimeException("Query failed: " + cacheId);
×
263
            }
264
            return cachedMaps.getIfPresent(cacheId);
×
265
        } else {
266
            return null;
×
267
        }
268
    }
269

270
    private static void updateRdfModel(QueryRef queryRef) throws FailedApiCallException, APINotReachableException, NotEnoughAPIInstancesException {
271
        final Model[] modelRef = new Model[1];
×
272
        QueryAccess qa = new QueryAccess() {
×
273
            @Override
274
            protected void processHeader(String[] line) {}
×
275
            @Override
276
            protected void processLine(String[] line) {}
×
277
            @Override
278
            protected void processRdfContent(Model model) {
279
                modelRef[0] = model;
×
280
            }
×
281
        };
282
        qa.call(queryRef);
×
283
        if (modelRef[0] == null) {
×
284
            throw new FailedApiCallException(new Exception("No RDF content in response for query: " + queryRef.getQueryId()));
×
285
        }
286
        String cacheId = queryRef.getAsUrlString();
×
287
        logger.info("Updating cached RDF model for {}", cacheId);
×
288
        cachedRdfModels.put(cacheId, modelRef[0]);
×
289
        lastRefresh.put(cacheId, System.currentTimeMillis());
×
290
    }
×
291

292
    /**
293
     * Retrieves a cached RDF model for a CONSTRUCT query, triggering a background fetch if needed.
294
     *
295
     * @param queryRef The QueryRef for the CONSTRUCT query.
296
     * @return The cached RDF Model, or null if not yet available.
297
     */
298
    public static Model retrieveRdfModelAsync(QueryRef queryRef) {
299
        long timeNow = System.currentTimeMillis();
×
300
        String cacheId = queryRef.getAsUrlString();
×
301
        logger.info("Retrieving cached RDF model asynchronously for {}", cacheId);
×
302
        boolean isCached = false;
×
303
        boolean needsRefresh = true;
×
304
        if (cachedRdfModels.getIfPresent(cacheId) != null) {
×
305
            long cacheAge = timeNow - lastRefresh.get(cacheId);
×
306
            isCached = cacheAge < 24 * 60 * 60 * 1000;
×
307
            needsRefresh = cacheAge > 60 * 1000;
×
308
        }
309
        if (failed.get(cacheId) != null && failed.get(cacheId) > 2) {
×
310
            failed.remove(cacheId);
×
311
            throw new RuntimeException("Query failed: " + cacheId);
×
312
        }
313
        if (needsRefresh && !isRunning(cacheId)) {
×
314
            refreshStart.put(cacheId, timeNow);
×
315
            NanodashThreadPool.submit(() -> {
×
316
                try {
317
                    if (runAfter.containsKey(cacheId)) {
×
318
                        while (System.currentTimeMillis() < runAfter.get(cacheId)) {
×
319
                            Thread.sleep(100);
×
320
                        }
321
                        runAfter.remove(cacheId);
×
322
                    }
323
                    if (failed.get(cacheId) != null) {
×
324
                        Thread.sleep(1000);
×
325
                    }
326
                    Thread.sleep(100 + new Random().nextLong(400));
×
327
                } catch (InterruptedException ex) {
×
328
                    logger.error("Interrupted while waiting to refresh RDF cache: {}", ex.getMessage());
×
329
                }
×
330
                try {
331
                    updateRdfModel(queryRef);
×
332
                } catch (Exception ex) {
×
333
                    logger.error("Failed to update RDF cache for {}: {}", cacheId, ex.getMessage());
×
334
                    cachedRdfModels.invalidate(cacheId);
×
335
                    failed.merge(cacheId, 1, Integer::sum);
×
336
                    lastRefresh.put(cacheId, System.currentTimeMillis());
×
337
                } finally {
338
                    refreshStart.remove(cacheId);
×
339
                }
340
            });
×
341
        }
342
        if (isCached) {
×
343
            return cachedRdfModels.getIfPresent(cacheId);
×
344
        } else {
345
            return null;
×
346
        }
347
    }
348

349
    /**
350
     * Clears the cached response for a specific query reference and sets a delay before the next refresh can occur.
351
     *
352
     * @param queryRef   The query reference for which to clear the cache.
353
     * @param waitMillis The amount of time in milliseconds to wait before allowing the cache to be refreshed again.
354
     */
355
    public static void clearCache(QueryRef queryRef, long waitMillis) {
356
        if (waitMillis < 0) {
12✔
357
            throw new IllegalArgumentException("waitMillis must be non-negative");
15✔
358
        }
359
        cachedResponses.invalidate(queryRef.getAsUrlString());
12✔
360
        runAfter.put(queryRef.getAsUrlString(), System.currentTimeMillis() + waitMillis);
27✔
361
    }
3✔
362

363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc