• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13680697433

05 Mar 2025 04:27PM UTC coverage: 38.381% (-0.02%) from 38.399%
13680697433

push

github

rowleya
Avoid asking for the size of the machine after configuring for no drop

0 of 20 new or added lines in 4 files covered. (0.0%)

9 existing lines in 2 files now uncovered.

9185 of 23931 relevant lines covered (38.38%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/protocols/FastDataIn.java
1
/*
2
 * Copyright (c) 2025 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.protocols;
17

18
import static java.lang.Integer.toUnsignedLong;
19
import static java.lang.Math.min;
20
import static java.lang.String.format;
21
import static java.nio.ByteBuffer.allocate;
22
import static java.nio.ByteOrder.LITTLE_ENDIAN;
23
import static java.util.stream.Collectors.toList;
24
import static java.util.stream.IntStream.range;
25
import static org.slf4j.LoggerFactory.getLogger;
26
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_DATA_TO_LOCATION;
27
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_SEQ_DATA;
28
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_TELL_DATA_IN;
29
import static uk.ac.manchester.spinnaker.messages.Constants.SDP_PAYLOAD_WORDS;
30
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;
31
import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_NOT_EXPECTED;
32
import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.GATHERER_DATA_SPEED_UP;
33
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.slice;
34
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
35

36
import java.io.IOException;
37
import java.net.SocketTimeoutException;
38
import java.nio.ByteBuffer;
39
import java.nio.IntBuffer;
40
import java.util.BitSet;
41

42
import org.slf4j.Logger;
43

44
import com.google.errorprone.annotations.CheckReturnValue;
45
import com.google.errorprone.annotations.MustBeClosed;
46

47
import uk.ac.manchester.spinnaker.connections.ThrottledConnection;
48
import uk.ac.manchester.spinnaker.machine.ChipLocation;
49
import uk.ac.manchester.spinnaker.machine.CoreLocation;
50
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
51
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
52
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
53
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
54
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
55
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
56
import uk.ac.manchester.spinnaker.messages.sdp.SDPMessage;
57
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
58
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
59

60
public class FastDataIn implements AutoCloseable {
61

62
        private static final Logger log = getLogger(FastDataIn.class);
×
63

64
        /** Items of data a SDP packet can hold when SCP header removed. */
65
        private static final int BYTES_PER_FULL_PACKET =
66
                        SDP_PAYLOAD_WORDS * WORD_SIZE;
67

68
        // 272 bytes as removed SCP header
69

70
        /**
71
         * size of the location data packet (command, transaction id, start sdram
72
         * address, x and y, and max packet number.
73
         */
74
        private static final int BYTES_FOR_LOCATION_PACKET = 5 * WORD_SIZE;
75

76
        /**
77
         * Offset where data in starts on first command (command, transaction id,
78
         * seq_number), in bytes.
79
         */
80
        private static final int OFFSET_AFTER_COMMAND_AND_KEY = 3 * WORD_SIZE;
81

82
        /** Size for data to store when packet with command and key. */
83
        private static final int DATA_IN_FULL_PACKET_WITH_KEY =
84
                        BYTES_PER_FULL_PACKET - OFFSET_AFTER_COMMAND_AND_KEY;
85

86
        /**
87
         * size for data to store when sending tell packet (command id, transaction
88
         * id).
89
         */
90
        private static final int BYTES_FOR_TELL_PACKET = 2 * WORD_SIZE;
91

92
        private static final int TIMEOUT_RETRY_LIMIT = 100;
93

94
        /** flag for saying missing all SEQ numbers. */
95
        private static final int FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE;
96

97
        /** Sequence number that marks the end of a sequence number stream. */
98
        private static final int MISSING_SEQS_END = -1;
99

100
        private final HasCoreLocation gathererCore;
101

102
        private final ThrottledConnection connection;
103

104
        /** The current transaction id for the board. */
105
        private int transactionId = 0;
×
106

107
        /**
108
         * Create an instance of the protocol for talking to a particular extra
109
         * monitor core on a particular board.
110
         *
111
         * @param maxChipX The maximum X coordinate of the machine to speak to.
112
         * @param maxChipY The maximum Y coordinate of the machine to speak to.
113
         * @param gathererCore
114
         *            The gatherer core on the board that messages will be routed
115
         *            via.
116
         * @param transceiver A transceiver to use for communications.
117
         * @param ethernetChip The Ethernet chip to talk to the chip via.
118
         * @param ethernetAddress The Ethernet address of the chip to talk via.
119
         * @param iptag The IPTag to use for communications.
120
         * @throws IOException
121
         *             If IO fails.
122
         * @throws ProcessException
123
         *             If SpiNNaker rejects the reprogramming of the tag.
124
         * @throws InterruptedException
125
         *             If communications are interrupted.
126
         */
127
        @MustBeClosed
128
        @SuppressWarnings("MustBeClosed")
129
        public FastDataIn(CoreLocation gathererCore,
130
                        TransceiverInterface transceiver, ChipLocation ethernetChip,
131
                        String ethernetAddress, IPTag iptag)
132
                                        throws ProcessException, IOException, InterruptedException {
×
133
                this.gathererCore = gathererCore;
×
134
                this.connection = new ThrottledConnection(transceiver, ethernetChip,
×
135
                                ethernetAddress, iptag);
136
        }
×
137

138
        @Override
139
        public void close() throws IOException {
140
                connection.close();
×
141
        }
×
142

143
        /**
144
         * Write data to a given memory location.
145
         *
146
         * @param destination
147
         *            The target chip of the write.
148
         * @param baseAddress
149
         *            Whether the data will be written.
150
         * @param data
151
         *            The data to be written.
152
         * @throws IOException
153
         *             If IO fails.
154
         * @throws InterruptedException
155
         *            If communications are interrupted.
156
         */
157
        public void fastWrite(HasChipLocation boardLocalDestination,
158
                        MemoryLocation baseAddress, ByteBuffer data)
159
                                        throws IOException, InterruptedException {
160
                int timeoutCount = 0;
×
161
                int numPackets = computeNumPackets(data);
×
162
                int transactionId = ++this.transactionId;
×
163

164
                outerLoop: while (true) {
165
                        // Do the initial blast of data
NEW
166
                        sendInitialPackets(boardLocalDestination,
×
167
                                        baseAddress, data, transactionId, numPackets);
168
                        /*
169
                         * Don't create a missing buffer until at least one packet has
170
                         * come back.
171
                         */
172
                        BitSet missing = null;
×
173

174
                        // Wait for confirmation and do required retransmits
175
                        innerLoop: while (true) {
176
                                try {
177
                                        var buf = connection.receive();
×
178
                                        var received = buf.order(LITTLE_ENDIAN).asIntBuffer();
×
179
                                        timeoutCount = 0; // Reset the timeout counter
×
180
                                        int command = received.get();
×
181
                                        try {
182
                                                // read transaction id
183
                                                var commandCode = FastDataInCommandID.forValue(command);
×
184
                                                int thisTransactionId = received.get();
×
185

186
                                                // if wrong transaction id, ignore packet
187
                                                if (thisTransactionId != transactionId) {
×
188
                                                        continue innerLoop;
×
189
                                                }
190

191
                                                // Decide what to do with the packet
192
                                                switch (commandCode) {
×
193
                                                case RECEIVE_FINISHED_DATA_IN:
194
                                                        // We're done!
195
                                                        break outerLoop;
×
196

197
                                                case RECEIVE_MISSING_SEQ_DATA_IN:
198
                                                        if (!received.hasRemaining()) {
×
199
                                                                throw new BadDataInMessageException(
×
200
                                                                                received.get(0), received);
×
201
                                                        }
202
                                                        log.debug(
×
203
                                                                        "another packet (#{}) of missing "
204
                                                                                        + "sequence numbers;",
205
                                                                        received.get(1));
×
206
                                                        break;
×
207
                                                default:
208
                                                        throw new BadDataInMessageException(
×
209
                                                                        received.get(0), received);
×
210
                                                }
211

212
                                                /*
213
                                                 * The currently received packet has missing
214
                                                 * sequence numbers. Accumulate and dispatch
215
                                                 * transactionId when we've got them all.
216
                                                 */
217
                                                if (missing == null) {
×
218
                                                        missing = new BitSet(numPackets);
×
219
                                                }
220
                                                var flags = addMissedSeqNums(
×
221
                                                                received, missing, numPackets);
222

223
                                                /*
224
                                                 * Check that you've seen something that implies
225
                                                 * ready to retransmit.
226
                                                 */
227
                                                if (flags.seenAll || flags.seenEnd) {
×
228
                                                        retransmitMissingPackets(data, missing,
×
229
                                                                        transactionId);
230
                                                        missing.clear();
×
231
                                                }
232
                                        } catch (IllegalArgumentException e) {
×
233
                                                log.error("Unexpected command code " + command
×
234
                                                                + " received from "
235
                                                                + connection.getLocation());
×
236
                                        }
×
237
                                } catch (SocketTimeoutException e) {
×
238
                                        if (timeoutCount++ > TIMEOUT_RETRY_LIMIT) {
×
239
                                                log.error(
×
240
                                                                "ran out of attempts on transaction {}"
241
                                                                                + " due to timeouts.",
242
                                                                transactionId);
×
243
                                                throw e;
×
244
                                        }
245
                                        /*
246
                                         * If we never received a packet, we will never have
247
                                         * created the buffer, so send everything again
248
                                         */
249
                                        if (missing == null) {
×
250
                                                log.debug("full timeout; resending initial "
×
251
                                                                + "packets for stream with transaction "
252
                                                                + "id {}", transactionId);
×
253
                                                continue outerLoop;
×
254
                                        }
255
                                        log.warn(
×
256
                                                        "timeout {} on transaction {} writing to {}"
257
                                                                        + " via {}",
258
                                                        timeoutCount, transactionId, baseAddress,
×
259
                                                        gathererCore);
260
                                        retransmitMissingPackets(data, missing,
×
261
                                                        transactionId);
262
                                        missing.clear();
×
263
                                }
×
264
                        }
265
                }
266
        }
×
267

268
        @CheckReturnValue
269
        private SeenFlags addMissedSeqNums(IntBuffer received, BitSet seqNums,
270
                        int expectedMax) {
271
                var flags = new SeenFlags();
×
272
                var addedEnd = "";
×
273
                var addedAll = "";
×
274
                int actuallyAdded = 0;
×
275
                while (received.hasRemaining()) {
×
276
                        int num = received.get();
×
277

278
                        if (num == MISSING_SEQS_END) {
×
279
                                addedEnd = "and saw END marker";
×
280
                                flags.seenEnd = true;
×
281
                                break;
×
282
                        }
283
                        if (num == FLAG_FOR_MISSING_ALL_SEQUENCES) {
×
284
                                addedAll = "by finding ALL missing marker";
×
285
                                flags.seenAll = true;
×
286
                                for (int seqNum = 0; seqNum < expectedMax; seqNum++) {
×
287
                                        seqNums.set(seqNum);
×
288
                                        actuallyAdded++;
×
289
                                }
290
                                break;
×
291
                        }
292

293
                        seqNums.set(num);
×
294
                        actuallyAdded++;
×
295
                        if (num < 0 || num > expectedMax) {
×
296
                                throw new CrazySequenceNumberException(num, received);
×
297
                        }
298
                }
×
299
                log.debug("added {} missed packets, {}{}", actuallyAdded, addedEnd,
×
300
                                addedAll);
301
                return flags;
×
302
        }
303

304
        private int sendInitialPackets(HasChipLocation boardLocalDestination,
305
                        MemoryLocation baseAddress, ByteBuffer data, int transactionId,
306
                        int numPackets) throws IOException {
307
                log.debug("streaming {} bytes in {} packets using transaction {}",
×
308
                                data.remaining(), numPackets, transactionId);
×
309
                log.debug("sending packet #{}", 0);
×
310
                connection.send(dataToLocation(boardLocalDestination, baseAddress,
×
311
                                numPackets, transactionId));
312
                for (int seqNum = 0; seqNum < numPackets; seqNum++) {
×
313
                        log.debug("sending packet #{}", seqNum);
×
314
                        connection.send(seqData(data, seqNum, transactionId));
×
315
                }
316
                log.debug("sending terminating packet");
×
317
                connection.send(tellDataIn(transactionId));
×
318
                return numPackets;
×
319
        }
320

321
        private void retransmitMissingPackets(ByteBuffer dataToSend,
322
                        BitSet missingSeqNums, int transactionId)
323
                        throws IOException {
324
                log.debug("retransmitting {} packets", missingSeqNums.cardinality());
×
325

326
                missingSeqNums.stream().forEach(seqNum -> {
×
327
                        log.debug("resending packet #{}", seqNum);
×
328
                        try {
329
                                connection.send(seqData(dataToSend, seqNum, transactionId));
×
330
                        } catch (IOException e) {
×
331
                                log.error(
×
332
                                                "missing sequence packet with id {}-{} "
333
                                                                + "failed to transmit",
334
                                                seqNum, transactionId, e);
×
335
                        }
×
336
                });
×
337
                log.debug("sending terminating packet");
×
338
                connection.send(tellDataIn(transactionId));
×
339
        }
×
340

341
        /**
342
         * Contains flags for seen missing sequence numbers.
343
         *
344
         * @author Alan Stokes
345
         */
346
        private static final class SeenFlags {
347
                boolean seenEnd;
348

349
                boolean seenAll;
350
        }
351

352
        private SDPHeader header() {
353
                return new SDPHeader(REPLY_NOT_EXPECTED, new SDPLocation(gathererCore),
×
354
                                GATHERER_DATA_SPEED_UP.value);
355
        }
356

357
        /**
358
         * @param boardLocalDestination
359
         *            The destination for the data on the board being used.
360
         * @param baseAddress
361
         *            Where the data is to be written.
362
         * @param numPackets
363
         *            How many SDP packets will be sent.
364
         * @param transactionId
365
         *            The transaction id of this stream.
366
         * @return The message indicating the start of the data.
367
         */
368
        private SDPMessage dataToLocation(HasChipLocation boardLocalDestination,
369
                        MemoryLocation baseAddress, int numPackets, int transactionId) {
370
                var payload = allocate(BYTES_FOR_LOCATION_PACKET).order(LITTLE_ENDIAN);
×
371
                payload.putInt(SEND_DATA_TO_LOCATION.value);
×
372
                payload.putInt(transactionId);
×
373
                payload.putInt(baseAddress.address);
×
374
                payload.putShort((short) boardLocalDestination.getY());
×
375
                payload.putShort((short) boardLocalDestination.getX());
×
376
                payload.putInt(numPackets - 1);
×
377
                payload.flip();
×
378
                return new SDPMessage(header(), payload);
×
379
        }
380

381
        /**
382
         * @param data
383
         *            The overall data to be transmitted.
384
         * @param seqNum
385
         *            The sequence number of this chunk.
386
         *
387
         * @param transactionId
388
         *            The transaction id for this stream.
389
         * @return The message containing a chunk of the data.
390
         * @throws RuntimeException
391
         *             If the sequence number is nonsense.
392
         */
393
        SDPMessage seqData(ByteBuffer data, int seqNum, int transactionId) {
394
                var payload = allocate(BYTES_PER_FULL_PACKET).order(LITTLE_ENDIAN);
×
395
                int position = calculatePositionFromSequenceNumber(seqNum);
×
396
                if (position >= data.limit()) {
×
397
                        throw new RuntimeException(format(
×
398
                                        "attempt to write off end of buffer due to "
399
                                                        + "over-large sequence number (%d) given "
400
                                                        + "that only %d bytes are to be sent",
401
                                        seqNum, toUnsignedLong(data.limit())));
×
402
                }
403
                payload.putInt(SEND_SEQ_DATA.value);
×
404
                payload.putInt(transactionId);
×
405
                payload.putInt(seqNum);
×
406
                putBuffer(data, position, payload);
×
407
                return new SDPMessage(header(), payload);
×
408
        }
409

410
        private int putBuffer(ByteBuffer data, int position, ByteBuffer payload) {
411
                var slice = slice(data, position,
×
412
                                min(data.remaining() - position, payload.remaining()));
×
413
                payload.put(slice).flip();
×
414
                return slice.position();
×
415
        }
416

417
        private int calculatePositionFromSequenceNumber(int seqNum) {
418
                return DATA_IN_FULL_PACKET_WITH_KEY * seqNum;
×
419
        }
420

421
        /**
422
         * generates the tell message.
423
         *
424
         * @param transactionId
425
         *            The transaction id for this stream.
426
         * @return The message indicating the end of the data.
427
         */
428
        SDPMessage tellDataIn(int transactionId) {
429
                var payload = allocate(BYTES_FOR_TELL_PACKET).order(LITTLE_ENDIAN);
×
430
                payload.putInt(SEND_TELL_DATA_IN.value);
×
431
                payload.putInt(transactionId);
×
432
                payload.flip();
×
433
                return new SDPMessage(header(), payload);
×
434
        }
435

436
        /**
437
         * Computes the number of packets required to send the given data.
438
         *
439
         * @param data
440
         *            The data being sent. (This operation only reads.)
441
         * @return The number of packets (i.e. 1 more than the max sequence number).
442
         */
443
        static int computeNumPackets(ByteBuffer data) {
444
                return ceildiv(data.remaining(), DATA_IN_FULL_PACKET_WITH_KEY);
×
445
        }
446

447
        /**
448
         * Exception thrown when something mad comes back off SpiNNaker.
449
         *
450
         * @author Donal Fellows
451
         */
452
        static class BadDataInMessageException extends RuntimeException {
453
                private static final long serialVersionUID = 1L;
454

455
                BadDataInMessageException(int code, IntBuffer message) {
456
                        super("unexpected response code: " + toUnsignedLong(code));
×
457
                        log.warn("bad message payload: {}", range(0, message.limit())
×
458
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
459
                }
×
460
        }
461

462
        /**
463
         * Exception thrown when something mad comes back off SpiNNaker.
464
         *
465
         * @author Donal Fellows
466
         * @author Alan Stokes
467
         */
468
        static class CrazySequenceNumberException extends RuntimeException {
469
                private static final long serialVersionUID = 1L;
470

471
                CrazySequenceNumberException(int remaining, IntBuffer message) {
472
                        super("crazy number of missing packets: "
×
473
                                        + toUnsignedLong(remaining));
×
474
                        log.warn("bad message payload: {}", range(0, message.limit())
×
475
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
476
                }
×
477
        }
478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc