• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/RuntimeControlProcess.java
1
/*
2
 * Copyright (c) 2018 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.transceiver;
17

18
import static java.lang.Math.min;
19
import static java.util.Objects.requireNonNull;
20
import static uk.ac.manchester.spinnaker.messages.Constants.CPU_IOBUF_ADDRESS_OFFSET;
21
import static uk.ac.manchester.spinnaker.messages.Constants.UDP_MESSAGE_MAX_SIZE;
22
import static uk.ac.manchester.spinnaker.transceiver.Utils.getVcpuAddress;
23
import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.alloc;
24

25
import java.io.IOException;
26
import java.nio.ByteBuffer;
27
import java.util.ArrayDeque;
28
import java.util.Iterator;
29
import java.util.Map;
30
import java.util.Queue;
31
import java.util.TreeMap;
32

33
import uk.ac.manchester.spinnaker.connections.ConnectionSelector;
34
import uk.ac.manchester.spinnaker.connections.SCPConnection;
35
import uk.ac.manchester.spinnaker.machine.CoreLocation;
36
import uk.ac.manchester.spinnaker.machine.CoreSubsets;
37
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
38
import uk.ac.manchester.spinnaker.messages.model.IOBuffer;
39
import uk.ac.manchester.spinnaker.messages.scp.ClearIOBUF;
40
import uk.ac.manchester.spinnaker.messages.scp.ReadMemory;
41
import uk.ac.manchester.spinnaker.messages.scp.UpdateProvenanceAndExit;
42
import uk.ac.manchester.spinnaker.messages.scp.UpdateRuntime;
43
import uk.ac.manchester.spinnaker.utils.DefaultMap;
44
import uk.ac.manchester.spinnaker.utils.MappableIterable;
45

46
/**
47
 * A process for controlling an application running on a SpiNNaker core. The
48
 * operations on this process <em>do not make sense when applied to a SCAMP
49
 * core;</em> they only apply to application cores.
50
 *
51
 * @author Donal Fellows
52
 */
53
final class RuntimeControlProcess extends TxrxProcess {
54
        private static final int BUF_HEADER_BYTES = 16;
55

56
        private static final int BLOCK_HEADER_BYTES = 16;
57

58
        private static final int WORD = 4;
59

60
        private final Queue<NextRead> nextReads = new ArrayDeque<>();
×
61

62
        private final Queue<ExtraRead> extraReads = new ArrayDeque<>();
×
63

64
        private final Map<CoreLocation, Map<Integer, ByteBuffer>> iobuf =
×
65
                        new DefaultMap<>(TreeMap::new);
66

67
        /**
68
         * @param connectionSelector
69
         *            How to select how to communicate.
70
         * @param retryTracker
71
         *            Object used to track how many retries were used in an
72
         *            operation. May be {@code null} if no suck tracking is
73
         *            required.
74
         */
75
        RuntimeControlProcess(
76
                        ConnectionSelector<? extends SCPConnection> connectionSelector,
77
                        RetryTracker retryTracker) {
78
                super(connectionSelector, retryTracker);
×
79
        }
×
80

81
        /**
82
         * Clear the IOBUF buffers of a core.
83
         *
84
         * @param core
85
         *            the core where the IOBUF is.
86
         * @throws IOException
87
         *             If anything goes wrong with networking.
88
         * @throws ProcessException
89
         *             If SpiNNaker rejects the message.
90
         * @throws InterruptedException
91
         *             If the communications were interrupted.
92
         */
93
        void clearIOBUF(CoreLocation core)
94
                        throws IOException, ProcessException, InterruptedException {
95
                call(new ClearIOBUF(core));
×
96
        }
×
97

98
        /**
99
         * Clear the IOBUF buffers of some cores.
100
         *
101
         * @param coreSubsets
102
         *            the cores where the IOBUFs are.
103
         * @throws IOException
104
         *             If anything goes wrong with networking.
105
         * @throws ProcessException
106
         *             If SpiNNaker rejects the message.
107
         * @throws InterruptedException
108
         *             If the communications were interrupted.
109
         */
110
        void clearIOBUF(CoreSubsets coreSubsets)
111
                        throws IOException, ProcessException, InterruptedException {
112
                for (var core : requireNonNull(coreSubsets,
×
113
                                "must have actual core subset to iterate over")) {
114
                        sendRequest(new ClearIOBUF(core));
×
115
                }
×
116
                finishBatch();
×
117
        }
×
118

119
        /**
120
         * Update the running time configuration of some cores.
121
         *
122
         * @param runTimesteps
123
         *            The number of machine timesteps to run for. {@code null}
124
         *            indicates an infinite run.
125
         * @param currentTime
126
         *            The current simulation time.
127
         * @param syncTimesteps
128
         *            The number of timesteps before we pause to synchronise.
129
         * @param coreSubsets
130
         *            the cores to update the information of.
131
         * @throws IOException
132
         *             If anything goes wrong with networking.
133
         * @throws ProcessException
134
         *             If SpiNNaker rejects the message.
135
         * @throws InterruptedException
136
         *             If the communications were interrupted.
137
         */
138
        void updateRuntime(Integer runTimesteps, int currentTime, int syncTimesteps,
139
                        CoreSubsets coreSubsets)
140
                        throws IOException, ProcessException, InterruptedException {
141
                int runTime = (runTimesteps == null ? 0 : runTimesteps);
×
142
                boolean infiniteRun = runTimesteps == null;
×
143
                for (var core : requireNonNull(coreSubsets,
×
144
                                "must have actual core subset to iterate over")) {
145
                        sendRequest(new UpdateRuntime(core, runTime, infiniteRun,
×
146
                                        currentTime, syncTimesteps));
147
                }
×
148
                finishBatch();
×
149
        }
×
150

151
        /**
152
         * Ask some cores to update their provenance data and exit. It is up to the
153
         * caller to check for the cores' response, which is by changing state to
154
         * the exited state.
155
         *
156
         * @param coreSubsets
157
         *            the cores to update the provenance of.
158
         * @throws IOException
159
         *             If anything goes wrong with networking.
160
         * @throws InterruptedException
161
         *             If communications are interrupted.
162
         */
163
        void updateProvenanceAndExit(CoreSubsets coreSubsets)
164
                        throws IOException, InterruptedException {
165
                for (var core : requireNonNull(coreSubsets,
×
166
                                "must have actual core subset to iterate over")) {
167
                        sendOneWayRequest(new UpdateProvenanceAndExit(core));
×
168
                }
×
169
        }
×
170

171
        private static int chunk(int overall) {
172
                return min(overall, UDP_MESSAGE_MAX_SIZE);
×
173
        }
174

175
        /**
176
         * Get the IOBUF buffers from some cores.
177
         *
178
         * @param size
179
         *            How much to read.
180
         * @param cores
181
         *            Which cores to read from.
182
         * @return The buffers. It is the responsibility of the caller to handle
183
         *         whatever order they are produced in.
184
         * @throws IOException
185
         *             If anything goes wrong with networking.
186
         * @throws ProcessException
187
         *             If SpiNNaker rejects a message.
188
         * @throws InterruptedException
189
         *             If the communications were interrupted.
190
         */
191
        MappableIterable<IOBuffer> readIOBuf(int size, CoreSubsets cores)
192
                        throws ProcessException, IOException, InterruptedException {
193
                // Get the IOBuf address for each core
194
                for (var core : requireNonNull(cores,
×
195
                                "must have actual core subset to iterate over")) {
196
                        sendGet(new ReadMemory(core.getScampCore(),
×
197
                                        getVcpuAddress(core).add(CPU_IOBUF_ADDRESS_OFFSET), WORD),
×
198
                                        bytes -> issueReadForIOBufHead(core, 0,
×
199
                                                        new MemoryLocation(bytes.getInt()),
×
200
                                                        chunk(size + BUF_HEADER_BYTES)));
×
201
                }
×
202
                finishBatch();
×
203

204
                // Run rounds of the process until reading is complete
205
                while (!nextReads.isEmpty() || !extraReads.isEmpty()) {
×
206
                        while (!extraReads.isEmpty()) {
×
207
                                var read = extraReads.remove();
×
208
                                sendGet(read.message(),
×
209
                                                bytes -> saveIOBufTailSection(read, bytes));
×
210
                        }
×
211

212
                        while (!nextReads.isEmpty()) {
×
213
                                var read = nextReads.remove();
×
214
                                sendGet(read.message(), bytes -> {
×
215
                                        // Unpack the IOBuf header
216
                                        var nextAddress = new MemoryLocation(bytes.getInt());
×
217
                                        bytes.getLong(); // Ignore 8 bytes
×
218
                                        int bytesToRead = bytes.getInt();
×
219

220
                                        // Save the rest of the IOBuf
221
                                        int packetBytes = saveIOBufHead(read, bytes, bytesToRead);
×
222

223
                                        // Ask for the rest of the IOBuf buffer to be copied over
224
                                        issueReadsForIOBufTail(read, bytesToRead,
×
225
                                                        read.base.add(packetBytes + BLOCK_HEADER_BYTES),
×
226
                                                        packetBytes);
227

228
                                        // If there is another IOBuf buffer, read this next
229
                                        issueReadForIOBufHead(read.core, read.blockID + 1,
×
230
                                                        nextAddress, read.size);
231
                                });
×
232
                        }
×
233

234
                        finishBatch();
×
235
                }
236

237
                return () -> new Iterator<IOBuffer>() {
×
238
                        private final Iterator<CoreLocation> cores =
×
239
                                        iobuf.keySet().iterator();
×
240

241
                        @Override
242
                        public boolean hasNext() {
243
                                return cores.hasNext();
×
244
                        }
245

246
                        @Override
247
                        public IOBuffer next() {
248
                                var core = cores.next();
×
249
                                return new IOBuffer(core, iobuf.get(core).values());
×
250
                        }
251
                };
252
        }
253

254
        private void issueReadForIOBufHead(CoreLocation core, int blockID,
255
                        MemoryLocation next, int size) {
256
                if (!next.isNull()) {
×
257
                        nextReads.add(new NextRead(core, blockID, next, size));
×
258
                }
259
        }
×
260

261
        private int saveIOBufHead(NextRead read, ByteBuffer bytes,
262
                        int bytesToRead) {
263
                // Create a buffer for the data
264
                var buffer = alloc(bytesToRead);
×
265
                // Put the data from this packet into the buffer
266
                int packetBytes = min(bytes.remaining(), bytesToRead);
×
267
                if (packetBytes > 0) {
×
268
                        buffer.put(bytes);
×
269
                }
270
                iobuf.get(read.core).put(read.blockID, buffer);
×
271
                return packetBytes;
×
272
        }
273

274
        private void issueReadsForIOBufTail(NextRead read, int bytesToRead,
275
                        MemoryLocation baseAddress, int readOffset) {
276
                bytesToRead -= readOffset;
×
277
                // While more reads need to be done to read the data
278
                while (bytesToRead > 0) {
×
279
                        // Read the next bit of memory making up the buffer
280
                        int next = chunk(bytesToRead);
×
281
                        extraReads.add(new ExtraRead(read, baseAddress, next, readOffset));
×
282
                        baseAddress = baseAddress.add(next);
×
283
                        readOffset += next;
×
284
                        bytesToRead -= next;
×
285
                }
×
286
        }
×
287

288
        private void saveIOBufTailSection(ExtraRead read, ByteBuffer bytes) {
289
                var buffer = iobuf.get(read.core).get(read.blockID);
×
290
                synchronized (buffer) {
×
291
                        buffer.position(read.offset);
×
292
                        buffer.put(bytes);
×
293
                }
×
294
        }
×
295

296
        private static class NextRead {
297
                final CoreLocation core;
298

299
                final int blockID;
300

301
                final MemoryLocation base;
302

303
                final int size;
304

305
                NextRead(CoreLocation core, int blockID, MemoryLocation base,
306
                                int size) {
×
307
                        this.core = core;
×
308
                        this.blockID = blockID;
×
309
                        this.base = base;
×
310
                        this.size = size;
×
311
                }
×
312

313
                /**
314
                 * @return the message implied by this object
315
                 */
316
                ReadMemory message() {
317
                        return new ReadMemory(core.getScampCore(), base, size);
×
318
                }
319
        }
320

321
        private static class ExtraRead {
322
                final CoreLocation core;
323

324
                final int blockID;
325

326
                final MemoryLocation base;
327

328
                final int size;
329

330
                final int offset;
331

332
                ExtraRead(NextRead head, MemoryLocation base, int size, int offset) {
×
333
                        this.core = head.core;
×
334
                        this.blockID = head.blockID;
×
335
                        this.base = base;
×
336
                        this.size = size;
×
337
                        this.offset = offset;
×
338
                }
×
339

340
                /**
341
                 * @return the message implied by this object
342
                 */
343
                ReadMemory message() {
344
                        return new ReadMemory(core.getScampCore(), base, size);
×
345
                }
346
        }
347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc