• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13520271443

25 Feb 2025 11:34AM UTC coverage: 38.48% (-0.03%) from 38.51%
13520271443

push

github

rowleya
Fix tabs

675 of 2950 new or added lines in 17 files covered. (22.88%)

9 existing lines in 2 files now uncovered.

9182 of 23862 relevant lines covered (38.48%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/protocols/FastDataIn.java
1
/*
2
 * Copyright (c) 2025 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.protocols;
17

18
import static java.lang.Integer.toUnsignedLong;
19
import static java.lang.Math.min;
20
import static java.lang.String.format;
21
import static java.nio.ByteBuffer.allocate;
22
import static java.nio.ByteOrder.LITTLE_ENDIAN;
23
import static java.util.stream.Collectors.toList;
24
import static java.util.stream.IntStream.range;
25
import static org.slf4j.LoggerFactory.getLogger;
26
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_DATA_TO_LOCATION;
27
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_SEQ_DATA;
28
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_TELL_DATA_IN;
29
import static uk.ac.manchester.spinnaker.messages.Constants.SDP_PAYLOAD_WORDS;
30
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;
31
import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_NOT_EXPECTED;
32
import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.GATHERER_DATA_SPEED_UP;
33
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.slice;
34
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
35

36
import java.io.IOException;
37
import java.net.SocketTimeoutException;
38
import java.nio.ByteBuffer;
39
import java.nio.IntBuffer;
40
import java.util.BitSet;
41

42
import org.slf4j.Logger;
43

44
import com.google.errorprone.annotations.CheckReturnValue;
45
import com.google.errorprone.annotations.MustBeClosed;
46

47
import uk.ac.manchester.spinnaker.connections.ThrottledConnection;
48
import uk.ac.manchester.spinnaker.machine.ChipLocation;
49
import uk.ac.manchester.spinnaker.machine.CoreLocation;
50
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
51
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
52
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
53
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
54
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
55
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
56
import uk.ac.manchester.spinnaker.messages.sdp.SDPMessage;
57
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
58
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
59

60
public class FastDataIn implements AutoCloseable {
61

NEW
62
        private static final Logger log = getLogger(FastDataIn.class);
×
63

64
        /** Items of data a SDP packet can hold when SCP header removed. */
65
        private static final int BYTES_PER_FULL_PACKET = SDP_PAYLOAD_WORDS * WORD_SIZE;
66

67
        // 272 bytes as removed SCP header
68

69
        /**
70
         * size of the location data packet (command, transaction id, start sdram
71
         * address, x and y, and max packet number.
72
         */
73
        private static final int BYTES_FOR_LOCATION_PACKET = 5 * WORD_SIZE;
74

75
        /**
76
         * Offset where data in starts on first command (command, transaction id,
77
         * seq_number), in bytes.
78
         */
79
        private static final int OFFSET_AFTER_COMMAND_AND_KEY = 3 * WORD_SIZE;
80

81
        /** Size for data to store when packet with command and key. */
82
        private static final int DATA_IN_FULL_PACKET_WITH_KEY =
83
                        BYTES_PER_FULL_PACKET - OFFSET_AFTER_COMMAND_AND_KEY;
84

85
        /**
86
         * size for data to store when sending tell packet (command id, transaction
87
         * id).
88
         */
89
        private static final int BYTES_FOR_TELL_PACKET = 2 * WORD_SIZE;
90

91
        private static final int TIMEOUT_RETRY_LIMIT = 100;
92

93
        /** flag for saying missing all SEQ numbers. */
94
        private static final int FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE;
95

96
        /** Sequence number that marks the end of a sequence number stream. */
97
        private static final int MISSING_SEQS_END = -1;
98

99
        private final int maxChipX;
100

101
        private final int maxChipY;
102

103
        private final HasCoreLocation gathererCore;
104

105
        private final ThrottledConnection connection;
106

107
        /** The current transaction id for the board. */
NEW
108
        private int transactionId = 0;
×
109

110
        /**
111
         * Create an instance of the protocol for talking to a particular extra
112
         * monitor core on a particular board.
113
         *
114
         * @param maxChipX The maximum X coordinate of the machine to speak to.
115
         * @param maxChipY The maximum Y coordinate of the machine to speak to.
116
         * @param gathererCore
117
         *            The gatherer core on the board that messages will be routed
118
         *            via.
119
         * @param transceiver A transceiver to use for communications.
120
         * @param ethernetChip The Ethernet chip to talk to the chip via.
121
         * @param ethernetAddress The Ethernet address of the chip to talk via.
122
         * @param iptag The IPTag to use for communications.
123
         * @throws IOException
124
         *             If IO fails.
125
         * @throws ProcessException
126
         *             If SpiNNaker rejects the reprogramming of the tag.
127
         * @throws InterruptedException
128
         *             If communications are interrupted.
129
         */
130
        @MustBeClosed
131
        @SuppressWarnings("MustBeClosed")
132
        public FastDataIn(int maxChipX, int maxChipY, CoreLocation gathererCore,
133
                        TransceiverInterface transceiver, ChipLocation ethernetChip,
134
                        String ethernetAddress, IPTag iptag)
NEW
135
                                        throws ProcessException, IOException, InterruptedException {
×
NEW
136
                this.gathererCore = gathererCore;
×
NEW
137
                this.maxChipX = maxChipX;
×
NEW
138
                this.maxChipY = maxChipY;
×
NEW
139
                this.connection = new ThrottledConnection(transceiver, ethernetChip,
×
140
                                ethernetAddress, iptag);
NEW
141
        }
×
142

143
        @Override
144
        public void close() throws IOException {
NEW
145
                connection.close();
×
NEW
146
        }
×
147

148
        private HasChipLocation getBoardLocalDestination(
149
                        HasChipLocation monitorChip) {
NEW
150
                int boardLocalX = monitorChip.getX() - gathererCore.getX();
×
NEW
151
                if (boardLocalX < 0) {
×
NEW
152
                        boardLocalX += maxChipX + 1;
×
153
                }
NEW
154
                int boardLocalY = monitorChip.getY() - gathererCore.getY();
×
NEW
155
                if (boardLocalY < 0) {
×
NEW
156
                        boardLocalY += maxChipY + 1;
×
157
                }
NEW
158
                return new ChipLocation(boardLocalX, boardLocalY);
×
159
        }
160

161
        /**
162
         * Write data to a given memory location.
163
         *
164
         * @param baseAddress
165
         *            Whether the data will be written.
166
         * @param data
167
         *            The data to be written.
168
         * @throws IOException
169
         *             If IO fails.
170
         * @throws InterruptedException
171
         *            If communications are interrupted.
172
         */
173
        public void fastWrite(HasChipLocation destination,
174
                        MemoryLocation baseAddress, ByteBuffer data)
175
                                        throws IOException, InterruptedException {
NEW
176
                int timeoutCount = 0;
×
NEW
177
                int numPackets = computeNumPackets(data);
×
NEW
178
                int transactionId = ++this.transactionId;
×
179

180
                outerLoop: while (true) {
181
                        // Do the initial blast of data
NEW
182
                        sendInitialPackets(getBoardLocalDestination(destination),
×
183
                                        baseAddress, data, transactionId, numPackets);
184
                        /*
185
                         * Don't create a missing buffer until at least one packet has
186
                         * come back.
187
                         */
NEW
188
                        BitSet missing = null;
×
189

190
                        // Wait for confirmation and do required retransmits
191
                        innerLoop: while (true) {
192
                                try {
NEW
193
                                        var buf = connection.receive();
×
NEW
194
                                        var received = buf.order(LITTLE_ENDIAN).asIntBuffer();
×
NEW
195
                                        timeoutCount = 0; // Reset the timeout counter
×
NEW
196
                                        int command = received.get();
×
197
                                        try {
198
                                                // read transaction id
NEW
199
                                                var commandCode = FastDataInCommandID.forValue(command);
×
NEW
200
                                                int thisTransactionId = received.get();
×
201

202
                                                // if wrong transaction id, ignore packet
NEW
203
                                                if (thisTransactionId != transactionId) {
×
NEW
204
                                                        continue innerLoop;
×
205
                                                }
206

207
                                                // Decide what to do with the packet
NEW
208
                                                switch (commandCode) {
×
209
                                                case RECEIVE_FINISHED_DATA_IN:
210
                                                        // We're done!
NEW
211
                                                        break outerLoop;
×
212

213
                                                case RECEIVE_MISSING_SEQ_DATA_IN:
NEW
214
                                                        if (!received.hasRemaining()) {
×
NEW
215
                                                                throw new BadDataInMessageException(
×
NEW
216
                                                                                received.get(0), received);
×
217
                                                        }
NEW
218
                                                        log.debug(
×
219
                                                                        "another packet (#{}) of missing "
220
                                                                                        + "sequence numbers;",
NEW
221
                                                                        received.get(1));
×
NEW
222
                                                        break;
×
223
                                                default:
NEW
224
                                                        throw new BadDataInMessageException(
×
NEW
225
                                                                        received.get(0), received);
×
226
                                                }
227

228
                                                /*
229
                                                 * The currently received packet has missing
230
                                                 * sequence numbers. Accumulate and dispatch
231
                                                 * transactionId when we've got them all.
232
                                                 */
NEW
233
                                                if (missing == null) {
×
NEW
234
                                                        missing = new BitSet(numPackets);
×
235
                                                }
NEW
236
                                                var flags = addMissedSeqNums(
×
237
                                                                received, missing, numPackets);
238

239
                                                /*
240
                                                 * Check that you've seen something that implies
241
                                                 * ready to retransmit.
242
                                                 */
NEW
243
                                                if (flags.seenAll || flags.seenEnd) {
×
NEW
244
                                                        retransmitMissingPackets(data, missing,
×
245
                                                                        transactionId);
NEW
246
                                                        missing.clear();
×
247
                                                }
NEW
248
                                        } catch (IllegalArgumentException e) {
×
NEW
249
                                                log.error("Unexpected command code " + command
×
250
                                                                + " received from "
NEW
251
                                                                + connection.getLocation());
×
NEW
252
                                        }
×
NEW
253
                                } catch (SocketTimeoutException e) {
×
NEW
254
                                        if (timeoutCount++ > TIMEOUT_RETRY_LIMIT) {
×
NEW
255
                                                log.error(
×
256
                                                                "ran out of attempts on transaction {}"
257
                                                                                + " due to timeouts.",
NEW
258
                                                                transactionId);
×
NEW
259
                                                throw e;
×
260
                                        }
261
                                        /*
262
                                         * If we never received a packet, we will never have
263
                                         * created the buffer, so send everything again
264
                                         */
NEW
265
                                        if (missing == null) {
×
NEW
266
                                                log.debug("full timeout; resending initial "
×
267
                                                                + "packets for stream with transaction "
NEW
268
                                                                + "id {}", transactionId);
×
NEW
269
                                                continue outerLoop;
×
270
                                        }
NEW
271
                                        log.info(
×
272
                                                        "timeout {} on transaction {} writing to {}"
273
                                                                        + " via {}",
NEW
274
                                                        timeoutCount, transactionId, baseAddress,
×
275
                                                        gathererCore);
NEW
276
                                        retransmitMissingPackets(data, missing,
×
277
                                                        transactionId);
NEW
278
                                        missing.clear();
×
NEW
279
                                }
×
280
                        }
281
                }
NEW
282
        }
×
283

284
        @CheckReturnValue
285
        private SeenFlags addMissedSeqNums(IntBuffer received, BitSet seqNums,
286
                        int expectedMax) {
NEW
287
                var flags = new SeenFlags();
×
NEW
288
                var addedEnd = "";
×
NEW
289
                var addedAll = "";
×
NEW
290
                int actuallyAdded = 0;
×
NEW
291
                while (received.hasRemaining()) {
×
NEW
292
                        int num = received.get();
×
293

NEW
294
                        if (num == MISSING_SEQS_END) {
×
NEW
295
                                addedEnd = "and saw END marker";
×
NEW
296
                                flags.seenEnd = true;
×
NEW
297
                                break;
×
298
                        }
NEW
299
                        if (num == FLAG_FOR_MISSING_ALL_SEQUENCES) {
×
NEW
300
                                addedAll = "by finding ALL missing marker";
×
NEW
301
                                flags.seenAll = true;
×
NEW
302
                                for (int seqNum = 0; seqNum < expectedMax; seqNum++) {
×
NEW
303
                                        seqNums.set(seqNum);
×
NEW
304
                                        actuallyAdded++;
×
305
                                }
NEW
306
                                break;
×
307
                        }
308

NEW
309
                        seqNums.set(num);
×
NEW
310
                        actuallyAdded++;
×
NEW
311
                        if (num < 0 || num > expectedMax) {
×
NEW
312
                                throw new CrazySequenceNumberException(num, received);
×
313
                        }
NEW
314
                }
×
NEW
315
                log.debug("added {} missed packets, {}{}", actuallyAdded, addedEnd,
×
316
                                addedAll);
NEW
317
                return flags;
×
318
        }
319

320
        private int sendInitialPackets(HasChipLocation boardLocalDestination,
321
                        MemoryLocation baseAddress, ByteBuffer data, int transactionId,
322
                        int numPackets) throws IOException {
NEW
323
                log.info("streaming {} bytes in {} packets using transaction {}",
×
NEW
324
                                data.remaining(), numPackets, transactionId);
×
NEW
325
                log.debug("sending packet #{}", 0);
×
NEW
326
                connection.send(dataToLocation(boardLocalDestination, baseAddress,
×
327
                                numPackets, transactionId));
NEW
328
                for (int seqNum = 0; seqNum < numPackets; seqNum++) {
×
NEW
329
                        log.debug("sending packet #{}", seqNum);
×
NEW
330
                        connection.send(seqData(data, seqNum, transactionId));
×
331
                }
NEW
332
                log.debug("sending terminating packet");
×
NEW
333
                connection.send(tellDataIn(transactionId));
×
NEW
334
                return numPackets;
×
335
        }
336

337
        private void retransmitMissingPackets(ByteBuffer dataToSend,
338
                        BitSet missingSeqNums, int transactionId)
339
                        throws IOException {
NEW
340
                log.info("retransmitting {} packets", missingSeqNums.cardinality());
×
341

NEW
342
                missingSeqNums.stream().forEach(seqNum -> {
×
NEW
343
                        log.debug("resending packet #{}", seqNum);
×
344
                        try {
NEW
345
                                connection.send(seqData(dataToSend, seqNum, transactionId));
×
NEW
346
                        } catch (IOException e) {
×
NEW
347
                                log.error(
×
348
                                                "missing sequence packet with id {}-{} "
349
                                                                + "failed to transmit",
NEW
350
                                                seqNum, transactionId, e);
×
NEW
351
                        }
×
NEW
352
                });
×
NEW
353
                log.debug("sending terminating packet");
×
NEW
354
                connection.send(tellDataIn(transactionId));
×
NEW
355
        }
×
356

357
        /**
358
         * Contains flags for seen missing sequence numbers.
359
         *
360
         * @author Alan Stokes
361
         */
362
        private static final class SeenFlags {
363
                boolean seenEnd;
364

365
                boolean seenAll;
366
        }
367

368
        private SDPHeader header() {
NEW
369
                return new SDPHeader(REPLY_NOT_EXPECTED, new SDPLocation(gathererCore),
×
370
                                GATHERER_DATA_SPEED_UP.value);
371
        }
372

373
        /**
374
         * @param boardLocalDestination
375
         *            The destination for the data on the board being used.
376
         * @param baseAddress
377
         *            Where the data is to be written.
378
         * @param numPackets
379
         *            How many SDP packets will be sent.
380
         * @param transactionId
381
         *            The transaction id of this stream.
382
         * @return The message indicating the start of the data.
383
         */
384
        private SDPMessage dataToLocation(HasChipLocation boardLocalDestination,
385
                        MemoryLocation baseAddress, int numPackets, int transactionId) {
NEW
386
                var payload = allocate(BYTES_FOR_LOCATION_PACKET).order(LITTLE_ENDIAN);
×
NEW
387
                payload.putInt(SEND_DATA_TO_LOCATION.value);
×
NEW
388
                payload.putInt(transactionId);
×
NEW
389
                payload.putInt(baseAddress.address);
×
NEW
390
                payload.putShort((short) boardLocalDestination.getY());
×
NEW
391
                payload.putShort((short) boardLocalDestination.getX());
×
NEW
392
                payload.putInt(numPackets - 1);
×
NEW
393
                payload.flip();
×
NEW
394
                return new SDPMessage(header(), payload);
×
395
        }
396

397
        /**
398
         * @param data
399
         *            The overall data to be transmitted.
400
         * @param seqNum
401
         *            The sequence number of this chunk.
402
         *
403
         * @param transactionId
404
         *            The transaction id for this stream.
405
         * @return The message containing a chunk of the data.
406
         * @throws RuntimeException
407
         *             If the sequence number is nonsense.
408
         */
409
        SDPMessage seqData(ByteBuffer data, int seqNum, int transactionId) {
NEW
410
                var payload = allocate(BYTES_PER_FULL_PACKET).order(LITTLE_ENDIAN);
×
NEW
411
                int position = calculatePositionFromSequenceNumber(seqNum);
×
NEW
412
                if (position >= data.limit()) {
×
NEW
413
                        throw new RuntimeException(format(
×
414
                                        "attempt to write off end of buffer due to "
415
                                                        + "over-large sequence number (%d) given "
416
                                                        + "that only %d bytes are to be sent",
NEW
417
                                        seqNum, toUnsignedLong(data.limit())));
×
418
                }
NEW
419
                payload.putInt(SEND_SEQ_DATA.value);
×
NEW
420
                payload.putInt(transactionId);
×
NEW
421
                payload.putInt(seqNum);
×
NEW
422
                putBuffer(data, position, payload);
×
NEW
423
                return new SDPMessage(header(), payload);
×
424
        }
425

426
        private int putBuffer(ByteBuffer data, int position, ByteBuffer payload) {
NEW
427
                var slice = slice(data, position,
×
NEW
428
                                min(data.remaining() - position, payload.remaining()));
×
NEW
429
                payload.put(slice).flip();
×
NEW
430
                return slice.position();
×
431
        }
432

433
        private int calculatePositionFromSequenceNumber(int seqNum) {
NEW
434
                return DATA_IN_FULL_PACKET_WITH_KEY * seqNum;
×
435
        }
436

437
        /**
438
         * generates the tell message.
439
         *
440
         * @param transactionId
441
         *            The transaction id for this stream.
442
         * @return The message indicating the end of the data.
443
         */
444
        SDPMessage tellDataIn(int transactionId) {
NEW
445
                var payload = allocate(BYTES_FOR_TELL_PACKET).order(LITTLE_ENDIAN);
×
NEW
446
                payload.putInt(SEND_TELL_DATA_IN.value);
×
NEW
447
                payload.putInt(transactionId);
×
NEW
448
                payload.flip();
×
NEW
449
                return new SDPMessage(header(), payload);
×
450
        }
451

452
        /**
453
         * Computes the number of packets required to send the given data.
454
         *
455
         * @param data
456
         *            The data being sent. (This operation only reads.)
457
         * @return The number of packets (i.e. 1 more than the max sequence number).
458
         */
459
        static int computeNumPackets(ByteBuffer data) {
NEW
460
                return ceildiv(data.remaining(), DATA_IN_FULL_PACKET_WITH_KEY);
×
461
        }
462

463
        /**
464
         * Exception thrown when something mad comes back off SpiNNaker.
465
         *
466
         * @author Donal Fellows
467
         */
468
        static class BadDataInMessageException extends RuntimeException {
469
                private static final long serialVersionUID = 1L;
470

471
                BadDataInMessageException(int code, IntBuffer message) {
NEW
472
                        super("unexpected response code: " + toUnsignedLong(code));
×
NEW
473
                        log.warn("bad message payload: {}", range(0, message.limit())
×
NEW
474
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
NEW
475
                }
×
476
        }
477

478
        /**
479
         * Exception thrown when something mad comes back off SpiNNaker.
480
         *
481
         * @author Donal Fellows
482
         * @author Alan Stokes
483
         */
484
        static class CrazySequenceNumberException extends RuntimeException {
485
                private static final long serialVersionUID = 1L;
486

487
                CrazySequenceNumberException(int remaining, IntBuffer message) {
NEW
488
                        super("crazy number of missing packets: "
×
NEW
489
                                        + toUnsignedLong(remaining));
×
NEW
490
                        log.warn("bad message payload: {}", range(0, message.limit())
×
NEW
491
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
NEW
492
                }
×
493
        }
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc