• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/proxy/ProxyUDPConnection.java
1
/*
2
 * Copyright (c) 2022 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.proxy;
17

18
import static java.lang.Thread.currentThread;
19
import static org.slf4j.LoggerFactory.getLogger;
20
import static uk.ac.manchester.spinnaker.connections.UDPConnection.TrafficClass.IPTOS_THROUGHPUT;
21
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.alloc;
22

23
import java.io.IOException;
24
import java.net.InetAddress;
25
import java.net.SocketTimeoutException;
26
import java.nio.ByteBuffer;
27
import java.util.Optional;
28
import java.util.Set;
29

30
import org.slf4j.Logger;
31
import org.springframework.web.socket.BinaryMessage;
32
import org.springframework.web.socket.WebSocketSession;
33

34
import uk.ac.manchester.spinnaker.connections.UDPConnection;
35
import uk.ac.manchester.spinnaker.connections.UDPPacket;
36

37
/**
38
 * The low-level handler for proxy connections.
39
 *
40
 * @author Donal Fellows
41
 */
42
public class ProxyUDPConnection extends UDPConnection<Optional<ByteBuffer>> {
43
        private static final Logger log = getLogger(ProxyUDPConnection.class);
×
44

45
        private static final int TIMEOUT = 1000;
46

47
        // Plenty of room; SpiNNaker messages are quite a bit smaller than this
48
        // Even boot messages are smaller; this is bigger than an Ethernet packet
49
        private static final int WORKING_BUFFER_SIZE = 2048;
50

51
        private final WebSocketSession session;
52

53
        private final Runnable emergencyRemove;
54

55
        private final ByteBuffer workingBuffer;
56

57
        private final String name;
58

59
        /** The number of <em>successfully</em> sent packets. */
60
        private long sendCount;
61

62
        /** The number of <em>successfully</em> received packets. */
63
        private long receiveCount;
64

65
        ProxyUDPConnection(WebSocketSession session, InetAddress remoteHost,
66
                        int remotePort, int id, Runnable emergencyRemove,
67
                        InetAddress localHost)
68
                        throws IOException {
69
                super(localHost, null, remoteHost, remotePort, IPTOS_THROUGHPUT);
×
70
                this.session = session;
×
71
                this.emergencyRemove = emergencyRemove;
×
72
                workingBuffer = alloc(WORKING_BUFFER_SIZE);
×
73
                // Fixed header for this particular connection
74
                workingBuffer.putInt(ProxyOp.MESSAGE.ordinal());
×
75
                workingBuffer.putInt(id);
×
76
                // Make the name now so it remains useful after close()
77
                if (remoteHost == null) {
×
78
                        name = session.getUri() + "#" + id + " ANY/ANY";
×
79
                } else {
80
                        name = session.getUri() + "#" + id + " " + remoteHost + "/"
×
81
                                        + remotePort;
82
                }
83
        }
×
84

85
        /**
86
         * {@inheritDoc}
87
         *
88
         * @return The wrapped message bytes, or {@link Optional#empty()} if the
89
         *         socket timed out (not an error!)
90
         */
91
        @Override
92
        public Optional<ByteBuffer> receiveMessage(int timeout)
93
                        throws IOException, InterruptedException {
94
                try {
95
                        // Raw buffer, including header bytes
96
                        var msg = receive(timeout);
×
97
                        receiveCount++;
×
98
                        return Optional.of(msg);
×
99
                } catch (SocketTimeoutException e) {
×
100
                        return Optional.empty();
×
101
                }
102
        }
103

104
        /**
105
         * Send a message on this connection.
106
         *
107
         * @param msg
108
         *            The message to send.
109
         * @throws IOException
110
         *             If sending fails.
111
         */
112
        public void sendMessage(ByteBuffer msg) throws IOException {
113
                send(msg);
×
114
                sendCount++;
×
115
        }
×
116

117
        /**
118
         * Send a message on this connection.
119
         *
120
         * @param msg
121
         *            The message to send.
122
         * @param addr
123
         *            Where to send to.
124
         * @param port
125
         *            What port to send to.
126
         * @throws IOException
127
         *             If sending fails.
128
         */
129
        public void sendMessage(ByteBuffer msg, InetAddress addr, int port)
130
                        throws IOException {
131
                sendTo(msg, addr, port);
×
132
                sendCount++;
×
133
        }
×
134

135
        /**
136
         * Writes the count of messages sent and received on this connection to the
137
         * log.
138
         */
139
        protected void writeCountsToLog() {
140
                if (sendCount > 0 || receiveCount > 0) {
×
141
                        log.info("{} message counts: sent {} received {}", name, sendCount,
×
142
                                        receiveCount);
×
143
                }
144
        }
×
145

146
        @Override
147
        public String toString() {
148
                return name;
×
149
        }
150

151
        /**
152
         * Core SpiNNaker message receive and dispatch-to-websocket loop.
153
         */
154
        protected void connectedReceiverTask() {
155
                var me = currentThread();
×
156
                var oldThreadName = me.getName();
×
157
                me.setName("ws/udp " + name);
×
158
                log.debug("launched listener {}", name);
×
159
                try {
160
                        mainLoop();
×
161
                } catch (IOException e) {
×
162
                        if (!isClosed()) {
×
163
                                emergencyClose(e);
×
164
                                log.warn("problem in SpiNNaker-to-client part of {}", name, e);
×
165
                        }
166
                } catch (InterruptedException e) {
×
167
                        // We've been interrupted, so we're done.
168
                        return;
×
169
                } finally {
170
                        log.debug("shutting down listener {}", name);
×
171
                        me.setName(oldThreadName);
×
172
                }
173
        }
×
174

175
        private void emergencyClose(IOException exn) {
176
                try {
177
                        close();
×
178
                        emergencyRemove.run();
×
179
                } catch (IOException e) {
×
180
                        exn.addSuppressed(e);
×
181
                }
×
182
        }
×
183

184
        private void mainLoop() throws IOException, InterruptedException {
185
                while (session.isOpen() && !isClosed()) {
×
186
                        var msg = receiveMessage(TIMEOUT);
×
187
                        if (msg.isPresent()) {
×
188
                                handleReceivedMessage(msg.orElseThrow());
×
189
                        } // Otherwise was a timeout; go round the loop again anyway.
190
                }
×
191
        }
×
192

193
        /**
194
         * Process a received message, forwarding it to the client.
195
         *
196
         * @param msg
197
         *            The received message, positioned at the point of the payload.
198
         * @throws IOException
199
         *             If the message can't be sent.
200
         */
201
        private void handleReceivedMessage(ByteBuffer msg) throws IOException {
202
                log.trace("{} received message {}", name, msg);
×
203
                var outgoing = workingBuffer.duplicate();
×
204
                outgoing.put(msg);
×
205
                outgoing.flip();
×
206
                synchronized (session) {
×
207
                        session.sendMessage(new BinaryMessage(outgoing));
×
208
                }
×
209
        }
×
210

211
        /**
212
         * Core SpiNNaker message receive and dispatch-to-websocket loop for the
213
         * type of connections required for EIEIO, especially for the live packet
214
         * gatherer, which does a complex muxing and programs sockets/tags in a
215
         * different way.
216
         *
217
         * @param recvFrom
218
         *            What hosts we are allowed to receive messages from.
219
         *            Messages from elsewhere will be discarded.
220
         */
221
        protected void eieioReceiverTask(Set<InetAddress> recvFrom) {
222
                var me = currentThread();
×
223
                var oldThreadName = me.getName();
×
224
                me.setName("ws/udp(eieio) " + name);
×
225
                log.debug("launched eieio listener {}", name);
×
226
                try {
227
                        mainLoop(recvFrom);
×
228
                } catch (IOException e) {
×
229
                        if (!isClosed()) {
×
230
                                emergencyClose(e);
×
231
                                log.warn("problem in SpiNNaker-to-client part of {}", name, e);
×
232
                        }
233
                } finally {
234
                        log.debug("shutting down eieio listener {}", name);
×
235
                        me.setName(oldThreadName);
×
236
                }
237
        }
×
238

239
        /**
240
         * Loop at core of {@link #eieioReceiverTask(Set)}.
241
         *
242
         * @param recvFrom
243
         *            What hosts we are allowed to receive messages from. Messages
244
         *            from elsewhere will be discarded.
245
         * @throws IOException
246
         *             If there is some sort of network problem.
247
         */
248
        private void mainLoop(Set<InetAddress> recvFrom) throws IOException {
249
                while (!isClosed()) {
×
250
                        UDPPacket packet;
251
                        try {
252
                                packet = receiveWithAddress(TIMEOUT);
×
253
                        } catch (SocketTimeoutException e) {
×
254
                                // Timeout; go round the loop again.
255
                                if (!session.isOpen() || isClosed()) {
×
256
                                        return;
×
257
                                }
258
                                continue;
×
259
                        }
×
260
                        // SECURITY: drop any packet not from an allocated board
261
                        if (!recvFrom.contains(packet.address().getAddress())) {
×
262
                                log.debug("dropped packet from {}", packet.address());
×
263
                                continue;
×
264
                        }
265
                        handleReceivedMessage(packet.byteBuffer());
×
266
                }
×
267
        }
×
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc