• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6820

17 Jul 2025 02:19PM UTC coverage: 36.227%. Remained the same
6820

Pull #1137

github

web-flow
Merge branch 'master' into abort
Pull Request #1137: add abort on timeout

1898 of 5876 branches covered (32.3%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

12 existing lines in 1 file now uncovered.

8933 of 24022 relevant lines covered (37.19%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.57
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/spalloc/SpallocConnection.java
1
/*
2
 * Copyright (c) 2018 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.spalloc;
17

18
import static java.lang.Thread.currentThread;
19
import static java.nio.charset.StandardCharsets.UTF_8;
20
import static java.util.Objects.requireNonNull;
21
import static org.slf4j.LoggerFactory.getLogger;
22
import static uk.ac.manchester.spinnaker.spalloc.Utils.makeTimeout;
23
import static uk.ac.manchester.spinnaker.spalloc.Utils.timeLeft;
24
import static uk.ac.manchester.spinnaker.spalloc.Utils.timedOut;
25

26
import java.io.BufferedReader;
27
import java.io.Closeable;
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.io.InputStreamReader;
31
import java.io.OutputStreamWriter;
32
import java.io.PrintWriter;
33
import java.net.InetSocketAddress;
34
import java.net.Socket;
35
import java.net.SocketTimeoutException;
36
import java.util.HashMap;
37
import java.util.List;
38
import java.util.Map;
39
import java.util.Queue;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41

42
import org.slf4j.Logger;
43

44
import com.google.errorprone.annotations.MustBeClosed;
45
import com.google.errorprone.annotations.concurrent.GuardedBy;
46

47
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocProtocolException;
48
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocProtocolTimeoutException;
49
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocServerException;
50
import uk.ac.manchester.spinnaker.spalloc.messages.Command;
51
import uk.ac.manchester.spinnaker.spalloc.messages.ExceptionResponse;
52
import uk.ac.manchester.spinnaker.spalloc.messages.Notification;
53
import uk.ac.manchester.spinnaker.spalloc.messages.Response;
54
import uk.ac.manchester.spinnaker.spalloc.messages.ReturnResponse;
55

56
/**
57
 * The basic communications layer of the spalloc client. This client assumes
58
 * that the protocol is line-oriented, but does not assume anything about the
59
 * formatting of the contents of the lines.
60
 *
61
 * @author Donal Fellows
62
 */
63
public abstract class SpallocConnection implements Closeable {
64
        private static final Logger log = getLogger(SpallocConnection.class);
2✔
65

66
        /**
67
         * The hostname and port of the spalloc server.
68
         */
69
        private final InetSocketAddress addr;
70

71
        private final Integer defaultTimeout;
72

73
        /**
74
         * Whether this connection is in the dead state. Dead connections have no
75
         * underlying connections open.
76
         */
77
        private boolean dead;
78

79
        /**
80
         * Mapping from threads to sockets. Kept because we need to have way to shut
81
         * down all sockets at once.
82
         */
83
        @GuardedBy("socksLock")
2✔
84
        private final Map<Thread, TextSocket> socks = new HashMap<>();
85

86
        /** Lock for access to {@link #socks}. */
87
        private final Object socksLock = new Object();
2✔
88

89
        /**
90
         * The thread-aware socket factory. Every thread gets exactly one socket.
91
         */
92
        @SuppressWarnings("ThreadLocalUsage")
2✔
93
        private final ThreadLocal<TextSocket> threadLocalSocket =
94
                        ThreadLocal.withInitial(TextSocket::new);
2✔
95

96
        /** A queue of unprocessed notifications. */
97
        protected final Queue<Notification> notifications =
2✔
98
                        new ConcurrentLinkedQueue<>();
99

100
        /**
         * Create a connection description for a spalloc server. <b>NB:</b> The
         * server is not contacted until {@link #connect()} is called; until then
         * the connection is in the dead state.
         *
         * @param hostname
         *            The hostname of the server. Must not be {@code null}.
         * @param port
         *            The port to use.
         * @param timeout
         *            The default timeout, used by {@link #connect()}.
         */
        @MustBeClosed
        protected SpallocConnection(String hostname, int port, Integer timeout) {
                var host = requireNonNull(hostname, "hostname must not be null");
                this.addr = new InetSocketAddress(host, port);
                this.defaultTimeout = timeout;
                // No sockets exist yet, so the connection starts out dead
                this.dead = true;
        }
2✔
118

119
        /**
120
         * Context adapter. Allows this code to be used like this:
121
         *
122
         * <pre>
123
         * try (var c = client.withConnection()) {
124
         *     ...
125
         * }
126
         * </pre>
127
         *
128
         * @return the auto-closeable context.
129
         * @throws IOException
130
         *             If the connect to the spalloc server fails.
131
         */
132
        @MustBeClosed
133
        public AutoCloseable withConnection() throws IOException {
134
                connect();
2✔
135
                return this::close;
2✔
136
        }
137

138
        private TextSocket getConnection(Integer timeout) throws IOException {
139
                TextSocket sock = null;
2✔
140
                var key = currentThread();
2✔
141
                /*
142
                 * This loop will keep trying to connect until the socket exists and is
143
                 * in a connected state.
144
                 */
145
                do {
146
                        if (dead) {
2✔
147
                                throw new EOFException("not connected");
2✔
148
                        }
149
                        sock = getConnectedSocket(key, timeout);
2✔
150
                } while (sock == null);
2!
151

152
                if (timeout != null) {
2✔
153
                        sock.setSoTimeout(timeout);
2✔
154
                }
155
                return sock;
2✔
156
        }
157

158
        /**
159
         * Try to get a connected socket.
160
         *
161
         * @param key
162
         *            The thread that wants the connection (i.e., the current
163
         *            thread; cached for efficiency).
164
         * @param timeout
165
         *            The socket's timeout.
166
         * @return The socket, or {@code null} if the connection failed.
167
         * @throws IOException
168
         *             if something really bad goes wrong.
169
         */
170
        private TextSocket getConnectedSocket(Thread key, Integer timeout)
171
                        throws IOException {
172
                TextSocket sock;
173
                boolean connectNeeded = false;
2✔
174
                synchronized (socksLock) {
2✔
175
                        sock = threadLocalSocket.get();
2✔
176
                        if (!socks.containsKey(key)) {
2✔
177
                                socks.put(key, sock);
2✔
178
                                connectNeeded = true;
2✔
179
                        }
180
                }
2✔
181

182
                if (connectNeeded) {
2✔
183
                        sock.setSoTimeout(timeout != null ? timeout : 0);
2✔
184
                        if (!doConnect(sock)) {
2!
UNCOV
185
                                closeThreadConnection(key);
×
186
                                return null;
×
187
                        }
188
                }
189
                return sock;
2✔
190
        }
191

192
        private boolean doConnect(Socket sock) throws IOException {
193
                boolean success = false;
2✔
194
                try {
195
                        sock.connect(addr);
2✔
196
                        success = true;
2✔
197
                } catch (IOException e) {
2✔
198
                        if (!e.getMessage().contains("EISCONN")) {
2!
199
                                throw e;
2✔
200
                        }
201
                }
2✔
202
                return success;
2✔
203
        }
204

205
        /**
206
         * (Re)connect to the server.
207
         *
208
         * @throws IOException
209
         *             If a connection failure occurs.
210
         */
211
        public void connect() throws IOException {
212
                connect(defaultTimeout);
2✔
213
        }
2✔
214

215
        /**
216
         * (Re)connect to the server.
217
         *
218
         * @param timeout
219
         *            How long to spend (re)connecting.
220
         * @throws IOException
221
         *             If a connection failure occurs.
222
         */
223
        public void connect(Integer timeout) throws IOException {
224
                // Close any existing connection
225
                var s = threadLocalSocket.get();
2✔
226
                if (s.isClosed()) {
2!
UNCOV
227
                        closeThreadConnection(currentThread());
×
228
                } else if (!s.isConnected()) {
2!
229
                        closeThreadConnection(currentThread());
2✔
230
                }
231
                dead = false;
2✔
232
                try {
233
                        getConnection(timeout);
2✔
234
                } catch (IOException e) {
2✔
235
                        // Failure, try again...
236
                        closeThreadConnection(currentThread());
2✔
237
                        // Pass on the exception
238
                        throw e;
2✔
239
                }
2✔
240
        }
2✔
241

242
        @SuppressWarnings("resource")
243
        private void closeThreadConnection(Thread key) throws IOException {
244
                Socket sock;
245
                synchronized (socksLock) {
2✔
246
                        sock = socks.remove(key);
2✔
247
                }
2✔
248
                if (sock != null) {
2✔
249
                        // Mark the thread local so it will reinitialise
250
                        if (key == currentThread()) {
2✔
251
                                threadLocalSocket.remove();
2✔
252
                        }
253
                        // Close the socket itself
254
                        sock.close();
2✔
255
                }
256
        }
2✔
257

258
        /**
259
         * Disconnect from the server.
260
         *
261
         * @throws IOException
262
         *             if anything goes wrong
263
         */
264
        @Override
265
        public void close() throws IOException {
266
                dead = true;
2✔
267
                List<Thread> keys;
268
                synchronized (socksLock) {
2✔
269
                        // Copy so we can safely remove asynchronously
270
                        keys = List.copyOf(socks.keySet());
2✔
271
                }
2✔
272
                for (var key : keys) {
2✔
273
                        closeThreadConnection(key);
2✔
274
                }
2✔
275
                threadLocalSocket.remove();
2✔
276
        }
2✔
277

278
        private static String readLine(TextSocket sock)
279
                        throws SpallocProtocolTimeoutException, IOException,
280
                        InterruptedException {
281
                try {
282
                        var line = sock.getReader().readLine();
2✔
283
                        if (line == null) {
2✔
284
                                throw new EOFException("Connection closed");
2✔
285
                        }
286
                        return line;
2✔
287
                } catch (SocketTimeoutException e) {
2✔
288
                        if (Thread.interrupted()) {
2!
UNCOV
289
                                throw new InterruptedException("interrupted in readLine");
×
290
                        }
291
                        throw new SpallocProtocolTimeoutException("recv timed out", e);
2✔
292
                }
293
        }
294

295
        /**
296
         * Receive a line from the server with a response.
297
         *
298
         * @param timeout
299
         *            The number of milliseconds to wait before timing out or
300
         *            {@code null} if this function should try again forever.
301
         * @return The unpacked response from the line received.
302
         * @throws SpallocProtocolTimeoutException
303
         *             If a timeout occurs.
304
         * @throws SpallocProtocolException
305
         *             If the socket gets an empty response.
306
         * @throws IOException
307
         *             If the socket is unusable or becomes disconnected.
308
         * @throws InterruptedException
309
         *             If interrupted, eventually
310
         */
311
        protected Response receiveResponse(Integer timeout)
312
                        throws SpallocProtocolTimeoutException, IOException,
313
                        InterruptedException {
314
                if (timeout == null || timeout < 0) {
2!
315
                        timeout = 0;
2✔
316
                }
317
                var sock = getConnection(timeout);
2✔
318

319
                // Wait for some data to arrive
320
                var line = readLine(sock); // Not null; null case throws
2✔
321
                return parseResponse(line);
2✔
322
        }
323

324
        /**
325
         * Attempt to send a command as a line to the server.
326
         *
327
         * @param command
328
         *            The command to serialise.
329
         * @param timeout
330
         *            The number of milliseconds to wait before timing out or
331
         *            {@code null} if this function should try again forever.
332
         * @throws SpallocProtocolTimeoutException
333
         *             If a timeout occurs.
334
         * @throws IOException
335
         *             If the socket is unusable or becomes disconnected.
336
         */
337
        protected void sendCommand(Command<?> command, Integer timeout)
338
                        throws SpallocProtocolTimeoutException, IOException {
339
                if (timeout == null || timeout < 0) {
2!
340
                        timeout = 0;
2✔
341
                }
342
                if (log.isDebugEnabled()) {
2!
UNCOV
343
                        log.debug("sending a {}", command.getClass());
×
344
                }
345
                var sock = getConnection(timeout);
2✔
346

347
                // Send the line
348
                var msg = formatRequest(command);
2✔
349
                try {
350
                        var pw = sock.getWriter();
2✔
351
                        pw.println(msg);
2✔
352
                        if (pw.checkError()) {
2!
353
                                /*
354
                                 * Socket started giving errors; presume EOF since the
355
                                 * PrintWriter won't actually tell us why...
356
                                 */
UNCOV
357
                                throw new EOFException("command write failed");
×
358
                        }
UNCOV
359
                } catch (SocketTimeoutException e) {
×
360
                        throw new SpallocProtocolTimeoutException("send timed out", e);
×
361
                }
2✔
362
        }
2✔
363

364
        /**
365
         * Format a request to be ready to go to the server.
366
         *
367
         * @param command
368
         *            The request to format for sending. Not {@code null}.
369
         * @return The text to send to the server. Will have a newline added.
370
         * @throws IOException
371
         *             If formatting goes wrong.
372
         */
373
        protected abstract String formatRequest(Command<?> command)
374
                        throws IOException;
375

376
        /**
377
         * Parse a response line from the server.
378
         *
379
         * @param line
380
         *            The line to parse. Not {@code null}. Has the terminating
381
         *            newline removed.
382
         * @return The parsed response.
383
         * @throws IOException
384
         *             If parsing completely fails.
385
         * @throws SpallocProtocolException
386
         *             If an unexpected valid JSON message is returned (e.g.,
387
         *             {@code null}).
388
         */
389
        protected abstract Response parseResponse(String line) throws IOException;
390

391
        /**
392
         * Send a command to the server and return the reply.
393
         *
394
         * @param command
395
         *            The command to send.
396
         * @param timeout
397
         *            The number of milliseconds to wait before timing out or
398
         *            {@code null} if this function should wait forever.
399
         * @return The result string returned by the server.
400
         * @throws SpallocServerException
401
         *             If the server sends an error.
402
         * @throws SpallocProtocolTimeoutException
403
         *             If a timeout occurs.
404
         * @throws SpallocProtocolException
405
         *             If the connection is unavailable or is closed.
406
         * @throws InterruptedException
407
         *             If interrupted, eventually
408
         */
409
        protected String call(Command<?> command, Integer timeout)
410
                        throws SpallocServerException, SpallocProtocolTimeoutException,
411
                        SpallocProtocolException, InterruptedException {
412
                try {
413
                        var finishTime = makeTimeout(timeout);
2✔
414

415
                        // Construct and send the command message
416
                        sendCommand(command, timeout);
2✔
417

418
                        // Command sent! Attempt to receive the response...
419
                        while (!timedOut(finishTime)) {
2!
420
                                var r = receiveResponse(timeLeft(finishTime));
2✔
421
                                if (r == null) {
2!
UNCOV
422
                                        continue;
×
423
                                }
424
                                if (r instanceof ReturnResponse) {
2✔
425
                                        // Success!
426
                                        return ((ReturnResponse) r).getReturnValue();
2✔
427
                                } else if (r instanceof ExceptionResponse) {
2✔
428
                                        // Server error!
429
                                        throw new SpallocServerException((ExceptionResponse) r);
2✔
430
                                } else if (r instanceof Notification) {
2!
431
                                        // Got a notification, keep trying...
432
                                        notifications.add((Notification) r);
2✔
433
                                } else {
UNCOV
434
                                        throw new SpallocProtocolException(
×
UNCOV
435
                                                        "bad response: " + r.getClass());
×
436
                                }
437
                        }
2✔
438
                        throw new SpallocProtocolTimeoutException(
×
UNCOV
439
                                        "timed out while calling " + command.getCommand());
×
440
                } catch (SpallocProtocolTimeoutException e) {
2✔
441
                        throw e;
2✔
442
                } catch (IOException e) {
×
UNCOV
443
                        throw new SpallocProtocolException(e);
×
444
                }
445
        }
446

447
        /**
448
         * Subclass of Socket to encapsulate reading and writing text by lines. This
449
         * handles all buffering internally, locking that close to the socket
450
         * itself.
451
         */
452
        private static final class TextSocket extends Socket {
453
                private BufferedReader br;
454

455
                private PrintWriter pw;
456

457
                PrintWriter getWriter() throws IOException {
458
                        if (pw == null) {
2✔
459
                                pw = new PrintWriter(
2✔
460
                                                new OutputStreamWriter(getOutputStream(), UTF_8));
2✔
461
                        }
462
                        return pw;
2✔
463
                }
464

465
                BufferedReader getReader() throws IOException {
466
                        if (br == null) {
2✔
467
                                br = new BufferedReader(
2✔
468
                                                new InputStreamReader(getInputStream(), UTF_8));
2✔
469
                        }
470
                        return br;
2✔
471
                }
472
        }
473

474
        @Override
475
        public String toString() {
UNCOV
476
                return addr + " dead: " + dead + "  " + defaultTimeout;
×
477
        }
478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc