• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13989614444

20 Mar 2025 11:43AM UTC coverage: 38.216% (-0.4%) from 38.579%
13989614444

Pull #1222

github

rowleya
Fix issues from review
Pull Request #1222: More spalloc rest calls

70 of 815 new or added lines in 33 files covered. (8.59%)

29 existing lines in 13 files now uncovered.

9181 of 24024 relevant lines covered (38.22%)

1.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/protocols/FastDataIn.java
1
/*
2
 * Copyright (c) 2025 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.protocols;
17

18
import static java.lang.Integer.toUnsignedLong;
19
import static java.lang.Math.min;
20
import static java.lang.String.format;
21
import static java.nio.ByteBuffer.allocate;
22
import static java.nio.ByteOrder.LITTLE_ENDIAN;
23
import static java.util.stream.Collectors.toList;
24
import static java.util.stream.IntStream.range;
25
import static org.slf4j.LoggerFactory.getLogger;
26
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_DATA_TO_LOCATION;
27
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_SEQ_DATA;
28
import static uk.ac.manchester.spinnaker.protocols.FastDataInCommandID.SEND_TELL_DATA_IN;
29
import static uk.ac.manchester.spinnaker.messages.Constants.SDP_PAYLOAD_WORDS;
30
import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE;
31
import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_NOT_EXPECTED;
32
import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.GATHERER_DATA_SPEED_UP;
33
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.slice;
34
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
35

36
import java.io.IOException;
37
import java.net.SocketTimeoutException;
38
import java.nio.ByteBuffer;
39
import java.nio.IntBuffer;
40
import java.util.BitSet;
41

42
import org.slf4j.Logger;
43

44
import com.google.errorprone.annotations.CheckReturnValue;
45
import com.google.errorprone.annotations.MustBeClosed;
46

47
import uk.ac.manchester.spinnaker.connections.ThrottledConnection;
48
import uk.ac.manchester.spinnaker.machine.CoreLocation;
49
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
50
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
51
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
52
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
53
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
54
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
55
import uk.ac.manchester.spinnaker.messages.sdp.SDPMessage;
56
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
57

58
public class FastDataIn implements AutoCloseable {
59

NEW
60
        private static final Logger log = getLogger(FastDataIn.class);
×
61

62
        /** Items of data a SDP packet can hold when SCP header removed. */
63
        private static final int BYTES_PER_FULL_PACKET =
64
                        SDP_PAYLOAD_WORDS * WORD_SIZE;
65

66
        // 272 bytes as removed SCP header
67

68
        /**
69
         * size of the location data packet (command, transaction id, start sdram
70
         * address, x and y, and max packet number.
71
         */
72
        private static final int BYTES_FOR_LOCATION_PACKET = 5 * WORD_SIZE;
73

74
        /**
75
         * Offset where data in starts on first command (command, transaction id,
76
         * seq_number), in bytes.
77
         */
78
        private static final int OFFSET_AFTER_COMMAND_AND_KEY = 3 * WORD_SIZE;
79

80
        /** Size for data to store when packet with command and key. */
81
        private static final int DATA_IN_FULL_PACKET_WITH_KEY =
82
                        BYTES_PER_FULL_PACKET - OFFSET_AFTER_COMMAND_AND_KEY;
83

84
        /**
85
         * size for data to store when sending tell packet (command id, transaction
86
         * id).
87
         */
88
        private static final int BYTES_FOR_TELL_PACKET = 2 * WORD_SIZE;
89

90
        private static final int TIMEOUT_RETRY_LIMIT = 100;
91

92
        /** flag for saying missing all SEQ numbers. */
93
        private static final int FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE;
94

95
        /** Sequence number that marks the end of a sequence number stream. */
96
        private static final int MISSING_SEQS_END = -1;
97

98
        private final HasCoreLocation gathererCore;
99

100
        private final ThrottledConnection connection;
101

102
        /** The current transaction id for the board. */
NEW
103
        private int transactionId = 0;
×
104

105
        /**
106
         * Create an instance of the protocol for talking to a particular extra
107
         * monitor core on a particular board.
108
         *
109
         * @param gathererCore
110
         *            The gatherer core on the board that messages will be routed
111
         *            via.
112
         * @param iptag The IPTag to use for communications.
113
         * @throws IOException
114
         *             If IO fails.
115
         * @throws ProcessException
116
         *             If SpiNNaker rejects the reprogramming of the tag.
117
         * @throws InterruptedException
118
         *             If communications are interrupted.
119
         */
120
        @MustBeClosed
121
        @SuppressWarnings("MustBeClosed")
122
        public FastDataIn(CoreLocation gathererCore, IPTag iptag)
NEW
123
                                        throws ProcessException, IOException, InterruptedException {
×
NEW
124
                log.debug("Creating fast data in for " + gathererCore + " with tag "
×
125
                                        + iptag);
NEW
126
                this.gathererCore = gathererCore;
×
NEW
127
                this.connection = new ThrottledConnection(iptag);
×
NEW
128
        }
×
129

130
        @Override
131
        public void close() throws IOException {
NEW
132
                connection.close();
×
NEW
133
        }
×
134

135
        /**
136
         * Write data to a given memory location.
137
         *
138
         * @param boardLocalDestination
139
         *            The target chip of the write.
140
         * @param baseAddress
141
         *            Whether the data will be written.
142
         * @param data
143
         *            The data to be written.
144
         * @throws IOException
145
         *             If IO fails.
146
         * @throws InterruptedException
147
         *            If communications are interrupted.
148
         */
149
        public void fastWrite(HasChipLocation boardLocalDestination,
150
                        MemoryLocation baseAddress, ByteBuffer data)
151
                                        throws IOException, InterruptedException {
NEW
152
                log.debug("Fast writing via " + gathererCore + ": " + connection
×
153
                                        + " to board relative location "
154
                                        + boardLocalDestination + " address " + baseAddress
NEW
155
                                        + " " + data.remaining() + " bytes");
×
NEW
156
                int timeoutCount = 0;
×
NEW
157
                int numPackets = computeNumPackets(data);
×
NEW
158
                int transactionId = ++this.transactionId;
×
159

160
                outerLoop: while (true) {
161
                        // Do the initial blast of data
NEW
162
                        sendInitialPackets(boardLocalDestination,
×
163
                                        baseAddress, data, transactionId, numPackets);
164
                        /*
165
                         * Don't create a missing buffer until at least one packet has
166
                         * come back.
167
                         */
NEW
168
                        BitSet missing = null;
×
169

170
                        // Wait for confirmation and do required retransmits
171
                        innerLoop: while (true) {
172
                                try {
NEW
173
                                        var buf = connection.receive();
×
NEW
174
                                        var received = buf.order(LITTLE_ENDIAN).asIntBuffer();
×
NEW
175
                                        timeoutCount = 0; // Reset the timeout counter
×
NEW
176
                                        int command = received.get();
×
177
                                        try {
178
                                                // read transaction id
NEW
179
                                                var commandCode = FastDataInCommandID.forValue(command);
×
NEW
180
                                                int thisTransactionId = received.get();
×
181

182
                                                // if wrong transaction id, ignore packet
NEW
183
                                                if (thisTransactionId != transactionId) {
×
NEW
184
                                                        continue innerLoop;
×
185
                                                }
186

187
                                                // Decide what to do with the packet
NEW
188
                                                switch (commandCode) {
×
189
                                                case RECEIVE_FINISHED_DATA_IN:
190
                                                        // We're done!
NEW
191
                                                        break outerLoop;
×
192

193
                                                case RECEIVE_MISSING_SEQ_DATA_IN:
NEW
194
                                                        if (!received.hasRemaining()) {
×
NEW
195
                                                                throw new BadDataInMessageException(
×
NEW
196
                                                                                received.get(0), received);
×
197
                                                        }
NEW
198
                                                        log.debug(
×
199
                                                                        "another packet (#{}) of missing "
200
                                                                                        + "sequence numbers;",
NEW
201
                                                                        received.get(1));
×
NEW
202
                                                        break;
×
203
                                                default:
NEW
204
                                                        throw new BadDataInMessageException(
×
NEW
205
                                                                        received.get(0), received);
×
206
                                                }
207

208
                                                /*
209
                                                 * The currently received packet has missing
210
                                                 * sequence numbers. Accumulate and dispatch
211
                                                 * transactionId when we've got them all.
212
                                                 */
NEW
213
                                                if (missing == null) {
×
NEW
214
                                                        missing = new BitSet(numPackets);
×
215
                                                }
NEW
216
                                                var flags = addMissedSeqNums(
×
217
                                                                received, missing, numPackets);
218

219
                                                /*
220
                                                 * Check that you've seen something that implies
221
                                                 * ready to retransmit.
222
                                                 */
NEW
223
                                                if (flags.seenAll || flags.seenEnd) {
×
NEW
224
                                                        retransmitMissingPackets(data, missing,
×
225
                                                                        transactionId);
NEW
226
                                                        missing.clear();
×
227
                                                }
NEW
228
                                        } catch (IllegalArgumentException e) {
×
NEW
229
                                                log.error("Unexpected command code " + command
×
230
                                                                + " received from "
NEW
231
                                                                + connection.getLocation());
×
NEW
232
                                        }
×
NEW
233
                                } catch (SocketTimeoutException e) {
×
NEW
234
                                        if (timeoutCount++ > TIMEOUT_RETRY_LIMIT) {
×
NEW
235
                                                log.error(
×
236
                                                                "ran out of attempts on transaction {}"
237
                                                                                + " due to timeouts.",
NEW
238
                                                                transactionId);
×
NEW
239
                                                throw e;
×
240
                                        }
241
                                        /*
242
                                         * If we never received a packet, we will never have
243
                                         * created the buffer, so send everything again
244
                                         */
NEW
245
                                        if (missing == null) {
×
NEW
246
                                                log.debug("full timeout; resending initial "
×
247
                                                                + "packets for stream with transaction "
NEW
248
                                                                + "id {}", transactionId);
×
NEW
249
                                                continue outerLoop;
×
250
                                        }
NEW
251
                                        log.warn(
×
252
                                                        "timeout {} on transaction {} writing to {}"
253
                                                                        + " via {}",
NEW
254
                                                        timeoutCount, transactionId, baseAddress,
×
255
                                                        gathererCore);
NEW
256
                                        retransmitMissingPackets(data, missing,
×
257
                                                        transactionId);
NEW
258
                                        missing.clear();
×
NEW
259
                                }
×
260
                        }
261
                }
NEW
262
        }
×
263

264
        @CheckReturnValue
265
        private SeenFlags addMissedSeqNums(IntBuffer received, BitSet seqNums,
266
                        int expectedMax) {
NEW
267
                var flags = new SeenFlags();
×
NEW
268
                var addedEnd = "";
×
NEW
269
                var addedAll = "";
×
NEW
270
                int actuallyAdded = 0;
×
NEW
271
                while (received.hasRemaining()) {
×
NEW
272
                        int num = received.get();
×
273

NEW
274
                        if (num == MISSING_SEQS_END) {
×
NEW
275
                                addedEnd = "and saw END marker";
×
NEW
276
                                flags.seenEnd = true;
×
NEW
277
                                break;
×
278
                        }
NEW
279
                        if (num == FLAG_FOR_MISSING_ALL_SEQUENCES) {
×
NEW
280
                                addedAll = "by finding ALL missing marker";
×
NEW
281
                                flags.seenAll = true;
×
NEW
282
                                for (int seqNum = 0; seqNum < expectedMax; seqNum++) {
×
NEW
283
                                        seqNums.set(seqNum);
×
NEW
284
                                        actuallyAdded++;
×
285
                                }
NEW
286
                                break;
×
287
                        }
288

NEW
289
                        seqNums.set(num);
×
NEW
290
                        actuallyAdded++;
×
NEW
291
                        if (num < 0 || num > expectedMax) {
×
NEW
292
                                throw new CrazySequenceNumberException(num, received);
×
293
                        }
NEW
294
                }
×
NEW
295
                log.debug("added {} missed packets, {}{}", actuallyAdded, addedEnd,
×
296
                                addedAll);
NEW
297
                return flags;
×
298
        }
299

300
        private int sendInitialPackets(HasChipLocation boardLocalDestination,
301
                        MemoryLocation baseAddress, ByteBuffer data, int transactionId,
302
                        int numPackets) throws IOException {
NEW
303
                log.debug("streaming {} bytes in {} packets using transaction {}",
×
NEW
304
                                data.remaining(), numPackets, transactionId);
×
NEW
305
                log.debug("sending packet #{}", 0);
×
NEW
306
                connection.send(dataToLocation(boardLocalDestination, baseAddress,
×
307
                                numPackets, transactionId));
NEW
308
                for (int seqNum = 0; seqNum < numPackets; seqNum++) {
×
NEW
309
                        log.debug("sending packet #{}", seqNum);
×
NEW
310
                        connection.send(seqData(data, seqNum, transactionId));
×
311
                }
NEW
312
                log.debug("sending terminating packet");
×
NEW
313
                connection.send(tellDataIn(transactionId));
×
NEW
314
                return numPackets;
×
315
        }
316

317
        private void retransmitMissingPackets(ByteBuffer dataToSend,
318
                        BitSet missingSeqNums, int transactionId)
319
                        throws IOException {
NEW
320
                log.debug("retransmitting {} packets", missingSeqNums.cardinality());
×
321

NEW
322
                missingSeqNums.stream().forEach(seqNum -> {
×
NEW
323
                        log.debug("resending packet #{}", seqNum);
×
324
                        try {
NEW
325
                                connection.send(seqData(dataToSend, seqNum, transactionId));
×
NEW
326
                        } catch (IOException e) {
×
NEW
327
                                log.error(
×
328
                                                "missing sequence packet with id {}-{} "
329
                                                                + "failed to transmit",
NEW
330
                                                seqNum, transactionId, e);
×
NEW
331
                        }
×
NEW
332
                });
×
NEW
333
                log.debug("sending terminating packet");
×
NEW
334
                connection.send(tellDataIn(transactionId));
×
NEW
335
        }
×
336

337
        /**
338
         * Contains flags for seen missing sequence numbers.
339
         *
340
         * @author Alan Stokes
341
         */
342
        private static final class SeenFlags {
343
                boolean seenEnd;
344

345
                boolean seenAll;
346
        }
347

348
        private SDPHeader header() {
NEW
349
                return new SDPHeader(REPLY_NOT_EXPECTED, new SDPLocation(gathererCore),
×
350
                                GATHERER_DATA_SPEED_UP.value);
351
        }
352

353
        /**
354
         * @param boardLocalDestination
355
         *            The destination for the data on the board being used.
356
         * @param baseAddress
357
         *            Where the data is to be written.
358
         * @param numPackets
359
         *            How many SDP packets will be sent.
360
         * @param transactionId
361
         *            The transaction id of this stream.
362
         * @return The message indicating the start of the data.
363
         */
364
        private SDPMessage dataToLocation(HasChipLocation boardLocalDestination,
365
                        MemoryLocation baseAddress, int numPackets, int transactionId) {
NEW
366
                var payload = allocate(BYTES_FOR_LOCATION_PACKET).order(LITTLE_ENDIAN);
×
NEW
367
                payload.putInt(SEND_DATA_TO_LOCATION.value);
×
NEW
368
                payload.putInt(transactionId);
×
NEW
369
                payload.putInt(baseAddress.address);
×
NEW
370
                payload.putShort((short) boardLocalDestination.getY());
×
NEW
371
                payload.putShort((short) boardLocalDestination.getX());
×
NEW
372
                payload.putInt(numPackets - 1);
×
NEW
373
                payload.flip();
×
NEW
374
                return new SDPMessage(header(), payload);
×
375
        }
376

377
        /**
378
         * @param data
379
         *            The overall data to be transmitted.
380
         * @param seqNum
381
         *            The sequence number of this chunk.
382
         *
383
         * @param transactionId
384
         *            The transaction id for this stream.
385
         * @return The message containing a chunk of the data.
386
         * @throws RuntimeException
387
         *             If the sequence number is nonsense.
388
         */
389
        SDPMessage seqData(ByteBuffer data, int seqNum, int transactionId) {
NEW
390
                var payload = allocate(BYTES_PER_FULL_PACKET).order(LITTLE_ENDIAN);
×
NEW
391
                int position = calculatePositionFromSequenceNumber(seqNum);
×
NEW
392
                if (position >= data.limit()) {
×
NEW
393
                        throw new RuntimeException(format(
×
394
                                        "attempt to write off end of buffer due to "
395
                                                        + "over-large sequence number (%d) given "
396
                                                        + "that only %d bytes are to be sent",
NEW
397
                                        seqNum, toUnsignedLong(data.limit())));
×
398
                }
NEW
399
                payload.putInt(SEND_SEQ_DATA.value);
×
NEW
400
                payload.putInt(transactionId);
×
NEW
401
                payload.putInt(seqNum);
×
NEW
402
                putBuffer(data, position, payload);
×
NEW
403
                return new SDPMessage(header(), payload);
×
404
        }
405

406
        private int putBuffer(ByteBuffer data, int position, ByteBuffer payload) {
NEW
407
                var slice = slice(data, position,
×
NEW
408
                                min(data.remaining() - position, payload.remaining()));
×
NEW
409
                payload.put(slice).flip();
×
NEW
410
                return slice.position();
×
411
        }
412

413
        private int calculatePositionFromSequenceNumber(int seqNum) {
NEW
414
                return DATA_IN_FULL_PACKET_WITH_KEY * seqNum;
×
415
        }
416

417
        /**
418
         * generates the tell message.
419
         *
420
         * @param transactionId
421
         *            The transaction id for this stream.
422
         * @return The message indicating the end of the data.
423
         */
424
        SDPMessage tellDataIn(int transactionId) {
NEW
425
                var payload = allocate(BYTES_FOR_TELL_PACKET).order(LITTLE_ENDIAN);
×
NEW
426
                payload.putInt(SEND_TELL_DATA_IN.value);
×
NEW
427
                payload.putInt(transactionId);
×
NEW
428
                payload.flip();
×
NEW
429
                return new SDPMessage(header(), payload);
×
430
        }
431

432
        /**
433
         * Computes the number of packets required to send the given data.
434
         *
435
         * @param data
436
         *            The data being sent. (This operation only reads.)
437
         * @return The number of packets (i.e. 1 more than the max sequence number).
438
         */
439
        static int computeNumPackets(ByteBuffer data) {
NEW
440
                return ceildiv(data.remaining(), DATA_IN_FULL_PACKET_WITH_KEY);
×
441
        }
442

443
        /**
444
         * Exception thrown when something mad comes back off SpiNNaker.
445
         *
446
         * @author Donal Fellows
447
         */
448
        static class BadDataInMessageException extends RuntimeException {
449
                private static final long serialVersionUID = 1L;
450

451
                BadDataInMessageException(int code, IntBuffer message) {
NEW
452
                        super("unexpected response code: " + toUnsignedLong(code));
×
NEW
453
                        log.warn("bad message payload: {}", range(0, message.limit())
×
NEW
454
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
NEW
455
                }
×
456
        }
457

458
        /**
459
         * Exception thrown when something mad comes back off SpiNNaker.
460
         *
461
         * @author Donal Fellows
462
         * @author Alan Stokes
463
         */
464
        static class CrazySequenceNumberException extends RuntimeException {
465
                private static final long serialVersionUID = 1L;
466

467
                CrazySequenceNumberException(int remaining, IntBuffer message) {
NEW
468
                        super("crazy number of missing packets: "
×
NEW
469
                                        + toUnsignedLong(remaining));
×
NEW
470
                        log.warn("bad message payload: {}", range(0, message.limit())
×
NEW
471
                                        .map(i -> message.get(i)).boxed().collect(toList()));
×
NEW
472
                }
×
473
        }
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc