• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/proxy/ProxyCore.java
1
/*
2
 * Copyright (c) 2022 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.proxy;
17

18
import static java.net.InetAddress.getByName;
19
import static java.nio.charset.StandardCharsets.UTF_8;
20
import static org.slf4j.LoggerFactory.getLogger;
21
import static org.springframework.web.socket.CloseStatus.BAD_DATA;
22
import static org.springframework.web.socket.CloseStatus.SERVER_ERROR;
23
import static uk.ac.manchester.spinnaker.alloc.proxy.ProxyOp.CLOSE;
24
import static uk.ac.manchester.spinnaker.alloc.proxy.ProxyOp.OPEN_UNCONNECTED;
25
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;
26
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.alloc;
27
import static uk.ac.manchester.spinnaker.utils.UnitConstants.KILOBYTE;
28
import static uk.ac.manchester.spinnaker.utils.UnitConstants.MSEC_PER_SEC;
29

30
import java.io.IOException;
31
import java.net.InetAddress;
32
import java.net.UnknownHostException;
33
import java.nio.BufferUnderflowException;
34
import java.nio.ByteBuffer;
35
import java.util.HashMap;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.Set;
39
import java.util.concurrent.Executor;
40
import java.util.function.IntSupplier;
41

42
import org.slf4j.Logger;
43
import org.springframework.web.socket.BinaryMessage;
44
import org.springframework.web.socket.WebSocketSession;
45
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
46

47
import com.google.errorprone.annotations.concurrent.GuardedBy;
48

49
import uk.ac.manchester.spinnaker.alloc.model.ConnectionInfo;
50
import uk.ac.manchester.spinnaker.machine.ChipLocation;
51

52
/**
53
 * The main proxy class for a particular web socket session. It's bound to a
54
 * job, which should be running at the time that the web socket is opened.
55
 *
56
 * @author Donal Fellows
57
 */
58
public class ProxyCore implements AutoCloseable {
59
        private static final Logger log = getLogger(ProxyCore.class);
×
60

61
        private static final int MAX_PORT = 65535;
62

63
        private static final int RESPONSE_WORDS = 2;
64

65
        private static final int ID_WORDS = 1;
66

67
        private static final int IP_ADDR_AND_PORT_WORDS = 2;
68

69
        /**
70
         * Max amount of time to wait to send a message on the websocket, in
71
         * milliseconds.
72
         */
73
        private static final int SEND_TIME_LIMIT_MS = 10 * MSEC_PER_SEC;
74

75
        /**
76
         * Max amount of space (in bytes) that may be used to buffer messages
77
         * waiting to be sent on the websocket.
78
         */
79
        private static final int SEND_BUFFER_SIZE = 512 * KILOBYTE;
80

81
        private final WebSocketSession session;
82

83
        private final Map<ChipLocation, InetAddress> hosts = new HashMap<>();
×
84

85
        @GuardedBy("itself")
×
86
        private final Map<Integer, ProxyUDPConnection> conns = new HashMap<>();
87

88
        private final Set<InetAddress> recvFrom;
89

90
        private final IntSupplier idIssuer;
91

92
        private final Executor executor;
93

94
        private final boolean writeCounts;
95

96
        private final InetAddress localHost;
97

98
        /**
99
         * @param s
100
         *            The websocket session.
101
         * @param connections
102
         *            What boards may this session talk to.
103
         * @param executor
104
         *            What runs the worker tasks.
105
         * @param idIssuer
106
         *            Provides connection IDs. These will never be zero.
107
         * @param writeCounts
108
         *            Whether to write the number of messages sent and received on
109
         *            the proxied connections to the log.
110
         * @param localHost
111
         *            The local address for sockets talking to the machines. If
112
         *            {@code null}, opening a general receiver socket will not
113
         *            work.
114
         */
115
        ProxyCore(WebSocketSession s, List<ConnectionInfo> connections,
116
                        Executor executor, IntSupplier idIssuer, boolean writeCounts,
117
                        InetAddress localHost) {
×
118
                session = new ConcurrentWebSocketSessionDecorator(s, SEND_TIME_LIMIT_MS,
×
119
                                SEND_BUFFER_SIZE);
120
                this.executor = executor;
×
121
                this.idIssuer = idIssuer;
×
122
                this.writeCounts = writeCounts;
×
123
                this.localHost = localHost;
×
124
                for (var ci : connections) {
×
125
                        try {
126
                                hosts.put(ci.chip(), getByName(ci.hostname()));
×
127
                        } catch (UnknownHostException e) {
×
128
                                log.warn("unexpectedly unknown board address: {}",
×
129
                                                ci.hostname(), e);
×
130
                        }
×
131
                }
×
132
                recvFrom = Set.copyOf(hosts.values());
×
133
        }
×
134

135
        @FunctionalInterface
136
        private interface Impl {
137
                ByteBuffer call(ByteBuffer message) throws IOException;
138
        }
139

140
        private Impl decode(int opcode) {
141
                return switch (ProxyOp.values()[opcode]) {
×
142
                case OPEN -> this::openConnectedChannel;
×
143
                case CLOSE -> this::closeChannel;
×
144
                case MESSAGE -> this::sendMessage;
×
145
                case OPEN_UNCONNECTED -> this::openUnconnectedChannel;
×
146
                case MESSAGE_TO -> this::sendMessageTo;
×
147
                default -> this::unexpectedMessage;
×
148
                };
149
        }
150

151
        private ByteBuffer unexpectedMessage(ByteBuffer message) {
152
                int opcode = message.getInt(0);
×
153
                log.error("unexpected message: {}", opcode);
×
154
                return composeError(-1, new Exception("unexpected message: " + opcode));
×
155
        }
156

157
        /**
158
         * Handle a message sent to this service on the web socket associated with
159
         * this object.
160
         *
161
         * @param message
162
         *            The content of the message.
163
         * @throws IOException
164
         *             Not expected; implementations don't actually throw
165
         */
166
        public final void handleClientMessage(ByteBuffer message)
167
                        throws IOException {
168
                try {
169
                        var impl = decode(message.getInt());
×
170
                        var reply = impl != null ? impl.call(message) : null;
×
171
                        if (reply != null) {
×
172
                                session.sendMessage(new BinaryMessage(reply.flip()));
×
173
                        }
174
                } catch (IOException e) {
×
175
                        log.error("Closing data on server error", e);
×
176
                        session.close(SERVER_ERROR);
×
177
                } catch (IllegalArgumentException | BufferUnderflowException e) {
×
178
                        log.error("Closing session on bad data", e);
×
179
                        session.close(BAD_DATA);
×
180
                }
×
181
        }
×
182

183
        private static ByteBuffer response(ProxyOp op, int correlationId,
184
                        int additionalWords) {
185
                int nWords = RESPONSE_WORDS + additionalWords;
×
186
                var msg = alloc(nWords * WORD_SIZE);
×
187
                msg.putInt(op.ordinal());
×
188
                msg.putInt(correlationId);
×
189
                return msg;
×
190
        }
191

192
        private static final int MAX_EXN_LENGTH = 1000;
193

194
        private static void assertEndOfMessage(ByteBuffer message) {
195
                if (message.hasRemaining()) {
×
196
                        throw new IllegalArgumentException("extra content ("
×
197
                                        + message.remaining() + " bytes) supplied in message");
×
198
                }
199
        }
×
200

201
        private static ByteBuffer composeError(int corId, Exception ex) {
202
                var msg = ex.getMessage();
×
203
                while (true) {
204
                        if (msg == null) {
×
205
                                msg = "";
×
206
                        }
207
                        var msgBytes = msg.getBytes(UTF_8);
×
208
                        if (msgBytes.length > MAX_EXN_LENGTH) {
×
209
                                msg = null;
×
210
                                continue;
×
211
                        }
212
                        var data = alloc(WORD_SIZE + WORD_SIZE + msgBytes.length);
×
213
                        data.putInt(ProxyOp.ERROR.ordinal());
×
214
                        data.putInt(corId);
×
215
                        data.put(msgBytes);
×
216
                        return data;
×
217
                }
218
        }
219

220
        /**
221
         * Open a connected channel in response to a {@link ProxyOp#OPEN} message.
222
         * Note that no control over the local (to the service) port number or
223
         * address is provided, nor is a mechanism given to easily make available
224
         * what address is used (though it can be obtained from the IPTag).
225
         *
226
         * @param message
227
         *            The message received. The initial 4-byte type code will have
228
         *            been already read out of the buffer.
229
         * @return The response message to send, in the bytes leading up to the
230
         *         position. The caller will {@linkplain ByteBuffer#flip() flip} the
231
         *         message. Underlying failures that are not due to outright
232
         *         protocol abuse will be reported to users as a
233
         *         {@link ProxyOp#ERROR} message.
234
         * @throws IllegalArgumentException
235
         *             If insufficient or too many arguments are supplied.
236
         */
237
        protected ByteBuffer openConnectedChannel(ByteBuffer message) {
238
                // This method handles message parsing/assembly and validation
239
                int corId = message.getInt();
×
240
                int x = message.getInt();
×
241
                int y = message.getInt();
×
242
                int port = message.getInt();
×
243
                assertEndOfMessage(message);
×
244

245
                try {
246
                        var who = getTargetHost(x, y);
×
247
                        validatePort(port);
×
248

249
                        int id = openConnected(who, port);
×
250

251
                        var msg = response(ProxyOp.OPEN, corId, ID_WORDS);
×
252
                        msg.putInt(id);
×
253
                        return msg;
×
254
                } catch (Exception e) {
×
255
                        log.error("failed to open connected channel", e);
×
256
                        return composeError(corId, e);
×
257
                }
258
        }
259

260
        /**
261
         * Open a connection.
262
         *
263
         * @param who
264
         *            What board IP address are we connecting to?
265
         * @param port
266
         *            What port are we connecting to?
267
         * @return The connection ID.
268
         * @throws IOException
269
         *             If the proxy connection can't be opened.
270
         */
271
        private int openConnected(InetAddress who, int port) throws IOException {
272
                // This method actually makes a connection and listener thread
273
                int id = idIssuer.getAsInt();
×
274
                var conn = new ProxyUDPConnection(session, who, port, id,
×
275
                                () -> removeConnection(id), null);
×
276
                setConnection(id, conn);
×
277

278
                // Start sending messages received from the board
279
                executor.execute(conn::connectedReceiverTask);
×
280

281
                log.info("opened proxy connected channel {}:{} to {}:{}", session, id,
×
282
                                who, port);
×
283
                return id;
×
284
        }
285

286
        private InetAddress getTargetHost(int x, int y) {
287
                var who = hosts.get(new ChipLocation(x, y));
×
288
                if (who == null) {
×
289
                        throw new IllegalArgumentException("unrecognised ethernet chip");
×
290
                }
291
                return who;
×
292
        }
293

294
        /**
295
         * Open an unconnected channel in response to a
296
         * {@link ProxyOp#OPEN_UNCONNECTED} message. Note that no control over the
297
         * local (to the service) port number or address is provided, but the IP
298
         * address and port opened are in the return message.
299
         *
300
         * @param message
301
         *            The message received. The initial 4-byte type code will have
302
         *            been already read out of the buffer.
303
         * @return The response message to send, in the bytes leading up to the
304
         *         position. The caller will {@linkplain ByteBuffer#flip() flip} the
305
         *         message. Underlying failures that are not due to outright
306
         *         protocol abuse will be reported to users as a
307
         *         {@link ProxyOp#ERROR} message.
308
         * @throws IllegalArgumentException
309
         *             If insufficient or too many arguments are supplied.
310
         */
311
        protected ByteBuffer openUnconnectedChannel(ByteBuffer message) {
312
                // This method handles message parsing/assembly and validation
313
                int corId = message.getInt();
×
314
                assertEndOfMessage(message);
×
315

316
                try {
317
                        var uc = openUnconnected();
×
318
                        var addr = uc.localAddr().getAddress();
×
319
                        log.debug("Unconnected channel local address: {}", addr);
×
320

321
                        var msg = response(OPEN_UNCONNECTED, corId,
×
322
                                        ID_WORDS + IP_ADDR_AND_PORT_WORDS);
323
                        msg.putInt(uc.id());
×
324
                        msg.put(addr);
×
325
                        msg.putInt(uc.localPort());
×
326
                        return msg;
×
327
                } catch (Exception e) {
×
328
                        log.error("failed to open unconnected channel", e);
×
329
                        return composeError(corId, e);
×
330
                }
331
        }
332

333
        private record Unconnected(int id, InetAddress localAddr, int localPort) {
×
334
        }
335

336
        private Unconnected openUnconnected() throws IOException {
337
                int id = idIssuer.getAsInt();
×
338
                var conn = new ProxyUDPConnection(session, null, 0, id,
×
339
                                () -> removeConnection(id), localHost);
×
340
                setConnection(id, conn);
×
341
                var who = conn.getLocalIPAddress();
×
342
                int port = conn.getLocalPort();
×
343

344
                // Start sending messages received from the board
345
                executor.execute(() -> conn.eieioReceiverTask(recvFrom));
×
346

347
                log.info("opened proxy unconnected channel {}:{} from {}:{}", session,
×
348
                                id, who, port);
×
349
                return new Unconnected(id, who, port);
×
350
        }
351

352
        /**
353
         * Close a channel in response to a {@link ProxyOp#CLOSE} message. It's not
354
         * an error to close a channel twice
355
         *
356
         * @param message
357
         *            The message received. The initial 4-byte type code will have
358
         *            been already read out of the buffer.
359
         * @return The response message to send, in the bytes leading up to the
360
         *         position. The caller will {@linkplain ByteBuffer#flip() flip} the
361
         *         message. Underlying failures that are not due to outright
362
         *         protocol abuse will be reported to users as a
363
         *         {@link ProxyOp#ERROR} message.
364
         * @throws IllegalArgumentException
365
         *             If insufficient or too many arguments are supplied.
366
         */
367
        protected ByteBuffer closeChannel(ByteBuffer message) {
368
                int corId = message.getInt();
×
369
                int id = message.getInt();
×
370
                assertEndOfMessage(message);
×
371
                try {
372
                        var msg = response(CLOSE, corId, ID_WORDS);
×
373
                        msg.putInt(closeChannel(id));
×
374
                        return msg;
×
375
                } catch (Exception e) {
×
376
                        log.error("failed to close channel", e);
×
377
                        return composeError(corId, e);
×
378
                }
379
        }
380

381
        private int closeChannel(int id) throws IOException {
382
                @SuppressWarnings("resource")
383
                var conn = removeConnection(id);
×
384
                if (!isValid(conn)) {
×
385
                        return 0;
×
386
                }
387
                conn.close();
×
388
                // Thread will shut down now that the proxy is closed
389
                log.debug("closed proxy channel {}:{}", session, id);
×
390
                if (writeCounts) {
×
391
                        conn.writeCountsToLog();
×
392
                }
393
                return id;
×
394
        }
395

396
        /**
397
         * Send a message on a channel in response to a {@link ProxyOp#MESSAGE}
398
         * message. It's not an error to send on a non-existant or closed channel.
399
         *
400
         * @param message
401
         *            The message received. The initial 4-byte type code will have
402
         *            been already read out of the buffer.
403
         * @return The response message to send, in the bytes leading up to the
404
         *         position. The caller will {@linkplain ByteBuffer#flip() flip} the
405
         *         message. If {@code null}, no response will be sent (expected case
406
         *         for this operation!)
407
         * @throws IOException
408
         *             If the proxy connection can't be used.
409
         */
410
        protected ByteBuffer sendMessage(ByteBuffer message) throws IOException {
411
                int id = message.getInt();
×
412
                log.trace("got message for channel {}", id);
×
413
                var conn = getConnection(id);
×
414
                if (isValid(conn) && conn.getRemoteIPAddress() != null) {
×
415
                        var payload = message.slice();
×
416
                        log.trace("sending message to {} of length {}", conn,
×
417
                                        payload.remaining());
×
418
                        conn.sendMessage(payload);
×
419
                }
420
                return null;
×
421
        }
422

423
        /**
424
         * Send a message to a particular destination on a channel in response to a
425
         * {@link ProxyOp#MESSAGE_TO} message. It's not an error to send on a
426
         * non-existent or closed channel. It is an error to use this operation on a
427
         * channel that has a bound remote host address.
428
         *
429
         * @param message
430
         *            The message received. The initial 4-byte type code will have
431
         *            been already read out of the buffer.
432
         * @return The response message to send, in the bytes leading up to the
433
         *         position. The caller will {@linkplain ByteBuffer#flip() flip} the
434
         *         message. If {@code null}, no response will be sent (expected case
435
         *         for this operation!)
436
         * @throws IOException
437
         *             If the proxy connection can't be used.
438
         * @throws IllegalArgumentException
439
         *             If the target doesn't exist in the job, the port number is
440
         *             out of range, or the channel has a bound address.
441
         */
442
        protected ByteBuffer sendMessageTo(ByteBuffer message) throws IOException {
443
                int id = message.getInt();
×
444
                int x = message.getInt();
×
445
                int y = message.getInt();
×
446
                var who = getTargetHost(x, y);
×
447

448
                int port = message.getInt();
×
449
                validatePort(port);
×
450

451
                log.trace("got message for channel {} for {}:{}", id, who, port);
×
452
                var conn = getConnection(id);
×
453
                if (isValid(conn)) {
×
454
                        if (conn.getRemoteIPAddress() != null) {
×
455
                                throw new IllegalArgumentException("channel is connected");
×
456
                        }
457
                        var payload = message.slice();
×
458
                        log.trace("sending message to {} of length {}", conn,
×
459
                                        payload.remaining());
×
460
                        conn.sendMessage(payload, who, port);
×
461
                }
462
                return null;
×
463
        }
464

465
        private static boolean isValid(ProxyUDPConnection conn) {
466
                return conn != null && !conn.isClosed();
×
467
        }
468

469
        private static void validatePort(int port) {
470
                if (port < 1 || port > MAX_PORT) {
×
471
                        throw new IllegalArgumentException("bad port number");
×
472
                }
473
        }
×
474

475
        private void setConnection(int id, ProxyUDPConnection conn) {
476
                synchronized (conns) {
×
477
                        conns.put(id, conn);
×
478
                }
×
479
        }
×
480

481
        private ProxyUDPConnection getConnection(int id) {
482
                synchronized (conns) {
×
483
                        return conns.get(id);
×
484
                }
485
        }
486

487
        private ProxyUDPConnection removeConnection(int id) {
488
                synchronized (conns) {
×
489
                        return conns.remove(id);
×
490
                }
491
        }
492

493
        private List<ProxyUDPConnection> listConnections() {
494
                synchronized (conns) {
×
495
                        return List.copyOf(conns.values());
×
496
                }
497
        }
498

499
        @Override
500
        public void close() {
501
                // Take a copy immediately
502
                var copy = listConnections();
×
503
                for (var conn : copy) {
×
504
                        if (conn != null && !conn.isClosed()) {
×
505
                                try {
506
                                        conn.close();
×
507
                                } catch (IOException e) {
×
508
                                        // Don't stop what we're doing; everything must go!
509
                                }
×
510
                        }
511
                }
×
512
        }
×
513
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc