• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kermitt2 / grobid / 388

pending completion
388

push

circleci

update logs

3 of 3 new or added lines in 2 files covered. (100.0%)

14846 of 37503 relevant lines covered (39.59%)

0.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.34
/grobid-core/src/main/java/org/grobid/core/utilities/crossref/CrossrefClient.java
1
package org.grobid.core.utilities.crossref;
2

3
import java.io.Closeable;
4
import java.io.IOException;
5
import java.net.URISyntaxException;
6
import java.util.Map;
7
import java.util.HashMap;
8
import java.util.List;
9
import java.util.ArrayList;
10
import java.util.concurrent.*;
11

12
import org.apache.commons.lang3.concurrent.TimedSemaphore;
13
import org.apache.http.client.ClientProtocolException;
14
import org.grobid.core.utilities.crossref.CrossrefRequestListener.Response;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
/**
19
 * Request pool to get data from api.crossref.org without exceeding limits
20
 * supporting multi-thread.
21
 *
22
 * Note: the provided interval for the query rate returned by CrossRef appeared not to be reliable,
23
 * so we have to use the rate limit (X-Rate-Limit-Interval) as a global parallel query limit, without 
24
 * interval consideration.  
25
 * See https://github.com/kermitt2/grobid/pull/725
26
 * 
27
 */
28
public class CrossrefClient implements Closeable {
29
        public static final Logger logger = LoggerFactory.getLogger(CrossrefClient.class);
1✔
30
        
31
        protected static volatile CrossrefClient instance;
32

33
        protected volatile ExecutorService executorService;
34

35
        protected int max_pool_size = 1;
1✔
36
        protected static boolean limitAuto = true;
1✔
37

38
        // this list is used to maintain a list of Futures that were submitted,
39
        // that we can use to check if the requests are completed
40
        //private List<Future<?>> futures = new ArrayList<Future<?>>();
41
        protected volatile Map<Long, List<Future<?>>> futures = new HashMap<>();
1✔
42

43
        public static CrossrefClient getInstance() {
44
        if (instance == null) {
1✔
45
                        getNewInstance();
1✔
46
                }
47
        return instance;
1✔
48
    }
49

50
    /**
51
     * Creates a new instance.
52
     */
53
        private static synchronized void getNewInstance() {
54
                logger.debug("Get new instance of CrossrefClient");
1✔
55
                instance = new CrossrefClient();
1✔
56
        }
1✔
57

58
    /**
59
     * Hidden constructor
60
     */
61
    protected CrossrefClient() {
1✔
62
            // note: by default timeout with newCachedThreadPool is set to 60s, which might be too much for crossref usage,
63
            // hanging grobid significantly, so we might want to use rather a custom instance of ThreadPoolExecutor and set 
64
            // the timeout sifferently
65
                this.executorService = Executors.newCachedThreadPool(r -> {
1✔
66
            Thread t = Executors.defaultThreadFactory().newThread(r);
×
67
            t.setDaemon(true);
×
68
            return t;
×
69
        });
70
                this.futures = new HashMap<>();
1✔
71
                setLimits(1, 1000);
1✔
72
        }
1✔
73

74
        public static void printLog(CrossrefRequest<?> request, String message) {
75
                logger.debug((request != null ? request+": " : "")+message);
×
76
                //System.out.println((request != null ? request+": " : "")+message);
77
        }
×
78
        
79
        public void setLimits(int iterations, int interval) {
80
                this.setMax_pool_size(iterations);
1✔
81
                // interval is not usable anymore, we need to wait termination of threads independently from any time interval
82
        }
1✔
83
        
84
        public void updateLimits(int iterations, int interval) {
85
                if (this.limitAuto) {
×
86
                        //printLog(null, "Updating limits... " + iterations + " / " + interval);
87
                        this.setLimits(iterations, interval);
×
88
                        // note: interval not used anymore
89
                }
90
        }
×
91
        
92
        /**
93
         * Push a request in pool to be executed as soon as possible, then wait a response through the listener.
94
         * API Documentation : https://github.com/CrossRef/rest-api-doc/blob/master/rest_api.md
95
         */
96
        public <T extends Object> void pushRequest(CrossrefRequest<T> request, CrossrefRequestListener<T> listener, 
97
                long threadId) throws URISyntaxException, ClientProtocolException, IOException {
98
                if (listener != null)
×
99
                        request.addListener(listener);
×
100
                synchronized(this) {
×
101
                        // we limit the number of active threads to the crossref api dynamic limit returned in the response header
102
                        while(((ThreadPoolExecutor)executorService).getActiveCount() >= this.getMax_pool_size()) {
×
103
                                try {
104
                                        TimeUnit.MICROSECONDS.sleep(10);
×
105
                                } catch (InterruptedException e) {
×
106
                                        e.printStackTrace();
×
107
                                }
×
108
                        }
109
                        Future<?> f = executorService.submit(new CrossrefRequestTask<T>(this, request));
×
110
                        List<Future<?>> localFutures = this.futures.get(new Long(threadId));
×
111
                        if (localFutures == null)
×
112
                                localFutures = new ArrayList<Future<?>>();
×
113
                        localFutures.add(f);
×
114
                        this.futures.put(threadId, localFutures);
×
115
                        logger.debug("add request to thread " + threadId +
×
116
                                        "active threads count is now " + ((ThreadPoolExecutor) executorService).getActiveCount()
×
117
                        );
118
//System.out.println("add request to thread " + threadId + " / current total for the thread: " +  localFutures.size());                        
119
                }
×
120
        }
×
121
        
122
        /**
123
         * Push a request in pool to be executed soon as possible, then wait a response through the listener.
124
         * @see <a href="https://github.com/CrossRef/rest-api-doc/blob/master/rest_api.md">Crossref API Documentation</a>
125
         * 
126
         * @param params                query parameters, can be null, ex: ?query.title=[title]&query.author=[author]
127
         * @param deserializer        json response deserializer, ex: WorkDeserializer to convert Work to BiblioItem
128
         * @param threadId                the java identifier of the thread providing the request (e.g. via Thread.currentThread().getId())
129
         * @param listener                catch response from request
130
         */
131
        public <T extends Object> void pushRequest(String model, Map<String, String> params, CrossrefDeserializer<T> deserializer, 
132
                        long threadId, CrossrefRequestListener<T> listener) throws URISyntaxException, ClientProtocolException, IOException {
133
                CrossrefRequest<T> request = new CrossrefRequest<T>(model, params, deserializer);
×
134
                synchronized(this) {
×
135
                        this.<T>pushRequest(request, listener, threadId);
×
136
                }
×
137
        }
×
138

139
        /**
140
         * Wait for all request from a specific thread to be completed
141
         */
142
        public void finish(long threadId) {
143
                synchronized(this.futures) {
×
144
                        try {
145
                                List<Future<?>> threadFutures = this.futures.get(new Long(threadId));
×
146
                                if (threadFutures != null) {
×
147
//System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< thread: " + threadId + " / waiting for " + threadFutures.size() + " requests to finish...");
148
                                        for(Future<?> future : threadFutures) {
×
149
                                                future.get();
×
150
                                                // get will block until the future is done
151
                                        }
×
152
                                        this.futures.remove(threadId);
×
153
                                }
154
                        } catch (InterruptedException ie) {
×
155
                                 // Preserve interrupt status
156
                                 Thread.currentThread().interrupt();
×
157
                        } catch (ExecutionException ee) {
×
158
                                logger.error("CrossRef request execution fails");
×
159
                        }
×
160
                }
×
161
        }
×
162

163
        public int getMax_pool_size() {
164
                return max_pool_size;
×
165
        }
166

167
        public void setMax_pool_size(int max_pool_size) {
168
                this.max_pool_size = max_pool_size;
1✔
169
        }
1✔
170

171
        @Override
172
        public void close() throws IOException {
173
                executorService.shutdown();
1✔
174
        }
1✔
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc