• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13770791693

10 Mar 2025 05:18PM UTC coverage: 38.345% (-0.04%) from 38.381%
13770791693

push

github

rowleya
Style fixes

1 of 1 new or added line in 1 file covered. (100.0%)

879 existing lines in 25 files now uncovered.

9191 of 23969 relevant lines covered (38.35%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/protocols/FastDataIn.java
1
/*
2
 * Copyright (c) 2025 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.protocols;
17

18
import static java.lang.Integer.toUnsignedLong;
19
import static java.lang.Math.min;
20
import static java.lang.String.format;
21
import static java.nio.ByteBuffer.allocate;
22
import static java.nio.ByteOrder.LITTLE_ENDIAN;
23
import static java.util.stream.Collectors.toList;
24
import static java.util.stream.IntStream.range;
25
import static org.slf4j.LoggerFactory.getLogger;
26
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_DATA_TO_LOCATION;
27
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_SEQ_DATA;
28
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_TELL_DATA_IN;
29
import static uk.ac.manchester.spinnaker.messages.Constants.SDP_PAYLOAD_WORDS;
30
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;
31
import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_NOT_EXPECTED;
32
import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.GATHERER_DATA_SPEED_UP;
33
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.slice;
34
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
35

36
import java.io.IOException;
37
import java.net.SocketTimeoutException;
38
import java.nio.ByteBuffer;
39
import java.nio.IntBuffer;
40
import java.util.BitSet;
41

42
import org.slf4j.Logger;
43

44
import com.google.errorprone.annotations.CheckReturnValue;
45
import com.google.errorprone.annotations.MustBeClosed;
46

47
import uk.ac.manchester.spinnaker.connections.ThrottledConnection;
48
import uk.ac.manchester.spinnaker.machine.CoreLocation;
49
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
50
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
51
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
52
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
53
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
54
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
55
import uk.ac.manchester.spinnaker.messages.sdp.SDPMessage;
56
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
57

58
public class FastDataIn implements AutoCloseable {
59

UNCOV
60
        private static final Logger log = getLogger(FastDataIn.class);
×
61

62
        /** Items of data a SDP packet can hold when SCP header removed. */
63
        private static final int BYTES_PER_FULL_PACKET =
64
                        SDP_PAYLOAD_WORDS * WORD_SIZE;
65

66
        // 272 bytes as removed SCP header
67

68
        /**
69
         * size of the location data packet (command, transaction id, start sdram
70
         * address, x and y, and max packet number.
71
         */
72
        private static final int BYTES_FOR_LOCATION_PACKET = 5 * WORD_SIZE;
73

74
        /**
75
         * Offset where data in starts on first command (command, transaction id,
76
         * seq_number), in bytes.
77
         */
78
        private static final int OFFSET_AFTER_COMMAND_AND_KEY = 3 * WORD_SIZE;
79

80
        /** Size for data to store when packet with command and key. */
81
        private static final int DATA_IN_FULL_PACKET_WITH_KEY =
82
                        BYTES_PER_FULL_PACKET - OFFSET_AFTER_COMMAND_AND_KEY;
83

84
        /**
85
         * size for data to store when sending tell packet (command id, transaction
86
         * id).
87
         */
88
        private static final int BYTES_FOR_TELL_PACKET = 2 * WORD_SIZE;
89

90
        private static final int TIMEOUT_RETRY_LIMIT = 100;
91

92
        /** flag for saying missing all SEQ numbers. */
93
        private static final int FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE;
94

95
        /** Sequence number that marks the end of a sequence number stream. */
96
        private static final int MISSING_SEQS_END = -1;
97

98
        private final HasCoreLocation gathererCore;
99

100
        private final ThrottledConnection connection;
101

102
        /** The current transaction id for the board. */
UNCOV
103
        private int transactionId = 0;
×
104

105
        /**
106
         * Create an instance of the protocol for talking to a particular extra
107
         * monitor core on a particular board.
108
         *
109
         * @param gathererCore
110
         *            The gatherer core on the board that messages will be routed
111
         *            via.
112
         * @param iptag The IPTag to use for communications.
113
         * @throws IOException
114
         *             If IO fails.
115
         * @throws ProcessException
116
         *             If SpiNNaker rejects the reprogramming of the tag.
117
         * @throws InterruptedException
118
         *             If communications are interrupted.
119
         */
120
        @MustBeClosed
121
        @SuppressWarnings("MustBeClosed")
122
        public FastDataIn(CoreLocation gathererCore, IPTag iptag)
UNCOV
123
                                        throws ProcessException, IOException, InterruptedException {
×
UNCOV
124
                this.gathererCore = gathererCore;
×
UNCOV
125
                this.connection = new ThrottledConnection(iptag);
×
UNCOV
126
        }
×
127

128
        @Override
129
        public void close() throws IOException {
UNCOV
130
                connection.close();
×
UNCOV
131
        }
×
132

133
        /**
134
         * Write data to a given memory location.
135
         *
136
         * @param boardLocalDestination
137
         *            The target chip of the write.
138
         * @param baseAddress
139
         *            Whether the data will be written.
140
         * @param data
141
         *            The data to be written.
142
         * @throws IOException
143
         *             If IO fails.
144
         * @throws InterruptedException
145
         *            If communications are interrupted.
146
         */
147
        public void fastWrite(HasChipLocation boardLocalDestination,
148
                        MemoryLocation baseAddress, ByteBuffer data)
149
                                        throws IOException, InterruptedException {
UNCOV
150
                int timeoutCount = 0;
×
UNCOV
151
                int numPackets = computeNumPackets(data);
×
UNCOV
152
                int transactionId = ++this.transactionId;
×
153

154
                outerLoop: while (true) {
155
                        // Do the initial blast of data
UNCOV
156
                        sendInitialPackets(boardLocalDestination,
×
157
                                        baseAddress, data, transactionId, numPackets);
158
                        /*
159
                         * Don't create a missing buffer until at least one packet has
160
                         * come back.
161
                         */
162
                        BitSet missing = null;
×
163

164
                        // Wait for confirmation and do required retransmits
165
                        innerLoop: while (true) {
166
                                try {
UNCOV
167
                                        var buf = connection.receive();
×
UNCOV
168
                                        var received = buf.order(LITTLE_ENDIAN).asIntBuffer();
×
UNCOV
169
                                        timeoutCount = 0; // Reset the timeout counter
×
UNCOV
170
                                        int command = received.get();
×
171
                                        try {
172
                                                // read transaction id
UNCOV
173
                                                var commandCode = FastDataInCommandID.forValue(command);
×
UNCOV
174
                                                int thisTransactionId = received.get();
×
175

176
                                                // if wrong transaction id, ignore packet
177
                                                if (thisTransactionId != transactionId) {
×
178
                                                        continue innerLoop;
×
179
                                                }
180

181
                                                // Decide what to do with the packet
UNCOV
182
                                                switch (commandCode) {
×
183
                                                case RECEIVE_FINISHED_DATA_IN:
184
                                                        // We're done!
UNCOV
185
                                                        break outerLoop;
×
186

187
                                                case RECEIVE_MISSING_SEQ_DATA_IN:
188
                                                        if (!received.hasRemaining()) {
×
UNCOV
189
                                                                throw new BadDataInMessageException(
×
UNCOV
190
                                                                                received.get(0), received);
×
191
                                                        }
192
                                                        log.debug(
×
193
                                                                        "another packet (#{}) of missing "
194
                                                                                        + "sequence numbers;",
195
                                                                        received.get(1));
×
UNCOV
196
                                                        break;
×
197
                                                default:
198
                                                        throw new BadDataInMessageException(
×
199
                                                                        received.get(0), received);
×
200
                                                }
201

202
                                                /*
203
                                                 * The currently received packet has missing
204
                                                 * sequence numbers. Accumulate and dispatch
205
                                                 * transactionId when we've got them all.
206
                                                 */
UNCOV
207
                                                if (missing == null) {
×
208
                                                        missing = new BitSet(numPackets);
×
209
                                                }
UNCOV
210
                                                var flags = addMissedSeqNums(
×
211
                                                                received, missing, numPackets);
212

213
                                                /*
214
                                                 * Check that you've seen something that implies
215
                                                 * ready to retransmit.
216
                                                 */
217
                                                if (flags.seenAll || flags.seenEnd) {
×
218
                                                        retransmitMissingPackets(data, missing,
×
219
                                                                        transactionId);
220
                                                        missing.clear();
×
221
                                                }
UNCOV
222
                                        } catch (IllegalArgumentException e) {
×
UNCOV
223
                                                log.error("Unexpected command code " + command
×
224
                                                                + " received from "
UNCOV
225
                                                                + connection.getLocation());
×
UNCOV
226
                                        }
×
227
                                } catch (SocketTimeoutException e) {
×
228
                                        if (timeoutCount++ > TIMEOUT_RETRY_LIMIT) {
×
UNCOV
229
                                                log.error(
×
230
                                                                "ran out of attempts on transaction {}"
231
                                                                                + " due to timeouts.",
232
                                                                transactionId);
×
233
                                                throw e;
×
234
                                        }
235
                                        /*
236
                                         * If we never received a packet, we will never have
237
                                         * created the buffer, so send everything again
238
                                         */
239
                                        if (missing == null) {
×
UNCOV
240
                                                log.debug("full timeout; resending initial "
×
241
                                                                + "packets for stream with transaction "
242
                                                                + "id {}", transactionId);
×
243
                                                continue outerLoop;
×
244
                                        }
UNCOV
245
                                        log.warn(
×
246
                                                        "timeout {} on transaction {} writing to {}"
247
                                                                        + " via {}",
UNCOV
248
                                                        timeoutCount, transactionId, baseAddress,
×
249
                                                        gathererCore);
250
                                        retransmitMissingPackets(data, missing,
×
251
                                                        transactionId);
252
                                        missing.clear();
×
253
                                }
×
254
                        }
255
                }
UNCOV
256
        }
×
257

258
        @CheckReturnValue
259
        private SeenFlags addMissedSeqNums(IntBuffer received, BitSet seqNums,
260
                        int expectedMax) {
UNCOV
261
                var flags = new SeenFlags();
×
262
                var addedEnd = "";
×
263
                var addedAll = "";
×
UNCOV
264
                int actuallyAdded = 0;
×
UNCOV
265
                while (received.hasRemaining()) {
×
266
                        int num = received.get();
×
267

UNCOV
268
                        if (num == MISSING_SEQS_END) {
×
UNCOV
269
                                addedEnd = "and saw END marker";
×
UNCOV
270
                                flags.seenEnd = true;
×
271
                                break;
×
272
                        }
273
                        if (num == FLAG_FOR_MISSING_ALL_SEQUENCES) {
×
274
                                addedAll = "by finding ALL missing marker";
×
275
                                flags.seenAll = true;
×
276
                                for (int seqNum = 0; seqNum < expectedMax; seqNum++) {
×
UNCOV
277
                                        seqNums.set(seqNum);
×
278
                                        actuallyAdded++;
×
279
                                }
280
                                break;
×
281
                        }
282

283
                        seqNums.set(num);
×
284
                        actuallyAdded++;
×
285
                        if (num < 0 || num > expectedMax) {
×
286
                                throw new CrazySequenceNumberException(num, received);
×
287
                        }
288
                }
×
UNCOV
289
                log.debug("added {} missed packets, {}{}", actuallyAdded, addedEnd,
×
290
                                addedAll);
UNCOV
291
                return flags;
×
292
        }
293

294
        private int sendInitialPackets(HasChipLocation boardLocalDestination,
295
                        MemoryLocation baseAddress, ByteBuffer data, int transactionId,
296
                        int numPackets) throws IOException {
UNCOV
297
                log.debug("streaming {} bytes in {} packets using transaction {}",
×
298
                                data.remaining(), numPackets, transactionId);
×
299
                log.debug("sending packet #{}", 0);
×
UNCOV
300
                connection.send(dataToLocation(boardLocalDestination, baseAddress,
×
301
                                numPackets, transactionId));
UNCOV
302
                for (int seqNum = 0; seqNum < numPackets; seqNum++) {
×
UNCOV
303
                        log.debug("sending packet #{}", seqNum);
×
UNCOV
304
                        connection.send(seqData(data, seqNum, transactionId));
×
305
                }
UNCOV
306
                log.debug("sending terminating packet");
×
307
                connection.send(tellDataIn(transactionId));
×
308
                return numPackets;
×
309
        }
310

311
        private void retransmitMissingPackets(ByteBuffer dataToSend,
312
                        BitSet missingSeqNums, int transactionId)
313
                        throws IOException {
314
                log.debug("retransmitting {} packets", missingSeqNums.cardinality());
×
315

316
                missingSeqNums.stream().forEach(seqNum -> {
×
317
                        log.debug("resending packet #{}", seqNum);
×
318
                        try {
UNCOV
319
                                connection.send(seqData(dataToSend, seqNum, transactionId));
×
UNCOV
320
                        } catch (IOException e) {
×
UNCOV
321
                                log.error(
×
322
                                                "missing sequence packet with id {}-{} "
323
                                                                + "failed to transmit",
324
                                                seqNum, transactionId, e);
×
UNCOV
325
                        }
×
326
                });
×
327
                log.debug("sending terminating packet");
×
UNCOV
328
                connection.send(tellDataIn(transactionId));
×
329
        }
×
330

331
        /**
332
         * Contains flags for seen missing sequence numbers.
333
         *
334
         * @author Alan Stokes
335
         */
336
        private static final class SeenFlags {
337
                boolean seenEnd;
338

339
                boolean seenAll;
340
        }
341

342
        private SDPHeader header() {
UNCOV
343
                return new SDPHeader(REPLY_NOT_EXPECTED, new SDPLocation(gathererCore),
×
344
                                GATHERER_DATA_SPEED_UP.value);
345
        }
346

347
        /**
348
         * @param boardLocalDestination
349
         *            The destination for the data on the board being used.
350
         * @param baseAddress
351
         *            Where the data is to be written.
352
         * @param numPackets
353
         *            How many SDP packets will be sent.
354
         * @param transactionId
355
         *            The transaction id of this stream.
356
         * @return The message indicating the start of the data.
357
         */
358
        private SDPMessage dataToLocation(HasChipLocation boardLocalDestination,
359
                        MemoryLocation baseAddress, int numPackets, int transactionId) {
UNCOV
360
                var payload = allocate(BYTES_FOR_LOCATION_PACKET).order(LITTLE_ENDIAN);
×
UNCOV
361
                payload.putInt(SEND_DATA_TO_LOCATION.value);
×
UNCOV
362
                payload.putInt(transactionId);
×
UNCOV
363
                payload.putInt(baseAddress.address);
×
UNCOV
364
                payload.putShort((short) boardLocalDestination.getY());
×
UNCOV
365
                payload.putShort((short) boardLocalDestination.getX());
×
UNCOV
366
                payload.putInt(numPackets - 1);
×
UNCOV
367
                payload.flip();
×
UNCOV
368
                return new SDPMessage(header(), payload);
×
369
        }
370

371
        /**
372
         * @param data
373
         *            The overall data to be transmitted.
374
         * @param seqNum
375
         *            The sequence number of this chunk.
376
         *
377
         * @param transactionId
378
         *            The transaction id for this stream.
379
         * @return The message containing a chunk of the data.
380
         * @throws RuntimeException
381
         *             If the sequence number is nonsense.
382
         */
383
        SDPMessage seqData(ByteBuffer data, int seqNum, int transactionId) {
UNCOV
384
                var payload = allocate(BYTES_PER_FULL_PACKET).order(LITTLE_ENDIAN);
×
UNCOV
385
                int position = calculatePositionFromSequenceNumber(seqNum);
×
UNCOV
386
                if (position >= data.limit()) {
×
UNCOV
387
                        throw new RuntimeException(format(
×
388
                                        "attempt to write off end of buffer due to "
389
                                                        + "over-large sequence number (%d) given "
390
                                                        + "that only %d bytes are to be sent",
UNCOV
391
                                        seqNum, toUnsignedLong(data.limit())));
×
392
                }
UNCOV
393
                payload.putInt(SEND_SEQ_DATA.value);
×
394
                payload.putInt(transactionId);
×
395
                payload.putInt(seqNum);
×
396
                putBuffer(data, position, payload);
×
397
                return new SDPMessage(header(), payload);
×
398
        }
399

400
        private int putBuffer(ByteBuffer data, int position, ByteBuffer payload) {
401
                var slice = slice(data, position,
×
UNCOV
402
                                min(data.remaining() - position, payload.remaining()));
×
403
                payload.put(slice).flip();
×
404
                return slice.position();
×
405
        }
406

407
        private int calculatePositionFromSequenceNumber(int seqNum) {
UNCOV
408
                return DATA_IN_FULL_PACKET_WITH_KEY * seqNum;
×
409
        }
410

411
        /**
412
         * generates the tell message.
413
         *
414
         * @param transactionId
415
         *            The transaction id for this stream.
416
         * @return The message indicating the end of the data.
417
         */
418
        SDPMessage tellDataIn(int transactionId) {
UNCOV
419
                var payload = allocate(BYTES_FOR_TELL_PACKET).order(LITTLE_ENDIAN);
×
UNCOV
420
                payload.putInt(SEND_TELL_DATA_IN.value);
×
UNCOV
421
                payload.putInt(transactionId);
×
UNCOV
422
                payload.flip();
×
UNCOV
423
                return new SDPMessage(header(), payload);
×
424
        }
425

426
        /**
427
         * Computes the number of packets required to send the given data.
428
         *
429
         * @param data
430
         *            The data being sent. (This operation only reads.)
431
         * @return The number of packets (i.e. 1 more than the max sequence number).
432
         */
433
        static int computeNumPackets(ByteBuffer data) {
UNCOV
434
                return ceildiv(data.remaining(), DATA_IN_FULL_PACKET_WITH_KEY);
×
435
        }
436

437
        /**
438
         * Exception thrown when something mad comes back off SpiNNaker.
439
         *
440
         * @author Donal Fellows
441
         */
442
        static class BadDataInMessageException extends RuntimeException {
443
                private static final long serialVersionUID = 1L;
444

445
                BadDataInMessageException(int code, IntBuffer message) {
UNCOV
446
                        super("unexpected response code: " + toUnsignedLong(code));
×
UNCOV
447
                        log.warn("bad message payload: {}", range(0, message.limit())
×
UNCOV
448
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
UNCOV
449
                }
×
450
        }
451

452
        /**
453
         * Exception thrown when something mad comes back off SpiNNaker.
454
         *
455
         * @author Donal Fellows
456
         * @author Alan Stokes
457
         */
458
        static class CrazySequenceNumberException extends RuntimeException {
459
                private static final long serialVersionUID = 1L;
460

461
                CrazySequenceNumberException(int remaining, IntBuffer message) {
UNCOV
462
                        super("crazy number of missing packets: "
×
UNCOV
463
                                        + toUnsignedLong(remaining));
×
UNCOV
464
                        log.warn("bad message payload: {}", range(0, message.limit())
×
UNCOV
465
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
UNCOV
466
                }
×
467
        }
468
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc