• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13989614444

20 Mar 2025 11:43AM UTC coverage: 38.216% (-0.4%) from 38.579%
13989614444

Pull #1222

github

rowleya
Fix issues from review
Pull Request #1222: More spalloc rest calls

70 of 815 new or added lines in 33 files covered. (8.59%)

29 existing lines in 13 files now uncovered.

9181 of 24024 relevant lines covered (38.22%)

1.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/alloc/client/ProxiedTransceiver.java
1
/*
2
 * Copyright (c) 2022 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.client;
17

18
import static uk.ac.manchester.spinnaker.messages.Constants.UDP_MESSAGE_MAX_SIZE;
19
import static uk.ac.manchester.spinnaker.machine.ChipLocation.ZERO_ZERO;
20
import static uk.ac.manchester.spinnaker.utils.InetFactory.getByNameQuietly;
21

22
import java.io.EOFException;
23
import java.io.File;
24
import java.io.FileInputStream;
25
import java.io.IOException;
26
import java.io.InputStream;
27
import java.net.InetAddress;
28
import java.nio.ByteBuffer;
29
import java.util.ArrayList;
30
import java.util.HashMap;
31
import java.util.List;
32
import java.util.Map;
33

34
import com.google.errorprone.annotations.CheckReturnValue;
35
import com.google.errorprone.annotations.MustBeClosed;
36

37
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.Job;
38
import uk.ac.manchester.spinnaker.connections.EIEIOConnection;
39
import uk.ac.manchester.spinnaker.connections.SCPConnection;
40
import uk.ac.manchester.spinnaker.connections.model.Connection;
41
import uk.ac.manchester.spinnaker.machine.ChipLocation;
42
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
43
import uk.ac.manchester.spinnaker.machine.MachineVersion;
44
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
45
import uk.ac.manchester.spinnaker.storage.BufferManagerStorage;
46
import uk.ac.manchester.spinnaker.storage.StorageException;
47
import uk.ac.manchester.spinnaker.storage.BufferManagerStorage.Region;
48
import uk.ac.manchester.spinnaker.transceiver.ParallelSafe;
49
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
50
import uk.ac.manchester.spinnaker.transceiver.SpinnmanException;
51
import uk.ac.manchester.spinnaker.transceiver.Transceiver;
52

53
/** A transceiver that routes messages across the proxy. */
54
final class ProxiedTransceiver extends Transceiver {
55

56
        /** Size of the buffer for moving bytes around. */
57
        private static final int BUFFER_SIZE = 1024;
58

59
        private final Job job;
60

61
        private final ProxyProtocolClient websocket;
62

NEW
63
        private final Map<InetAddress, ChipLocation> hostToChip = new HashMap<>();
×
64

65
        /**
66
         * @param version
67
         *            The version of the machine connected to.
68
         * @param connections
69
         *            The proxied connections we will use.
70
         * @param hostToChip
71
         *            The mapping from addresses to chip locations, to enable
72
         *            manufacturing of proxied {@link EIEIOConnection}s.
73
         * @param websocket
74
         *            The proxy handle.
75
         * @throws SpinnmanException
76
         * @throws IOException
77
         *             If we couldn't finish setting up our networking.
78
         * @throws InterruptedException
79
         *             If communications are interrupted.
80
         * @throws SpinnmanExcception
81
         *             If SpiNNaker rejects a message.
82
         */
83
        @MustBeClosed
84
        ProxiedTransceiver(Job job,
85
                        ProxyProtocolClient websocket)
86
                        throws IOException, SpinnmanException, InterruptedException {
NEW
87
                super(getVersion(job), getConnections(job, websocket));
×
NEW
88
                this.job = job;
×
89
                this.websocket = websocket;
×
90

NEW
91
                for (Connection conn : getConnections()) {
×
NEW
92
                        if (conn instanceof ProxiedSCPConnection) {
×
NEW
93
                                ProxiedSCPConnection pConn = (ProxiedSCPConnection) conn;
×
NEW
94
                                hostToChip.put(pConn.getRemoteIPAddress(), pConn.getChip());
×
95
                        }
NEW
96
                }
×
NEW
97
        }
×
98

99
        private static MachineVersion getVersion(Job job) throws IOException {
NEW
100
                var machine = job.machine();
×
NEW
101
                return MachineVersion.bySize(machine.getWidth(), machine.getHeight());
×
102
        }
103

104
        private static List<Connection> getConnections(Job job,
105
                        ProxyProtocolClient ws) throws IOException, InterruptedException {
NEW
106
                var conns = new ArrayList<Connection>();
×
NEW
107
                var machine = job.machine();
×
NEW
108
                InetAddress bootChipAddress = null;
×
NEW
109
                for (var bc : machine.getConnections()) {
×
NEW
110
                        var chipAddr = getByNameQuietly(bc.getHostname());
×
NEW
111
                        var chipLoc = bc.getChip().asChipLocation();
×
NEW
112
                        conns.add(new ProxiedSCPConnection(chipLoc, ws, chipAddr));
×
NEW
113
                        if (chipLoc.equals(ZERO_ZERO)) {
×
NEW
114
                                bootChipAddress = chipAddr;
×
115
                        }
NEW
116
                }
×
NEW
117
                if (bootChipAddress != null) {
×
NEW
118
                        conns.add(new ProxiedBootConnection(ws, bootChipAddress));
×
119
                }
NEW
120
                return conns;
×
121
        }
122

123
        /** {@inheritDoc} */
124
        @Override
125
        public void close() throws IOException {
126
                super.close();
×
127
                websocket.close();
×
128
        }
×
129

130
        @Override
131
        public SCPConnection createScpConnection(ChipLocation chip,
132
                        InetAddress addr) throws IOException {
133
                try {
134
                        return new ProxiedSCPConnection(chip, websocket, addr);
×
135
                } catch (InterruptedException e) {
×
136
                        throw new IOException("failed to proxy connection", e);
×
137
                }
138
        }
139

140
        @Override
141
        protected EIEIOConnection newEieioConnection(InetAddress localHost,
142
                        Integer localPort) throws IOException {
143
                try {
144
                        return new ProxiedEIEIOListenerConnection(hostToChip, websocket);
×
145
                } catch (InterruptedException e) {
×
146
                        throw new IOException("failed to proxy connection", e);
×
147
                }
148
        }
149

150
        @Override
151
        @ParallelSafe
152
        public void writeMemory(HasCoreLocation core, MemoryLocation baseAddress,
153
                        InputStream dataStream, int numBytes)
154
                        throws IOException, ProcessException, InterruptedException {
155
                // If this will use a single message, just use SCP
NEW
156
                if (numBytes <= UDP_MESSAGE_MAX_SIZE) {
×
NEW
157
                        super.writeMemory(core, baseAddress, dataStream, numBytes);
×
158
                } else {
NEW
159
                        ByteBuffer data = ByteBuffer.allocate(numBytes);
×
NEW
160
                        byte[] buffer = new byte[BUFFER_SIZE];
×
NEW
161
                        int remaining = numBytes;
×
NEW
162
                        while (remaining > 0) {
×
NEW
163
                                int toRead = Math.min(remaining, buffer.length);
×
NEW
164
                                int read = dataStream.read(buffer, 0, toRead);
×
NEW
165
                                if (read < 0) {
×
NEW
166
                                        throw new EOFException();
×
167
                                }
NEW
168
                                data.put(buffer, 0, read);
×
NEW
169
                                remaining -= read;
×
NEW
170
                        }
×
NEW
171
                        this.job.writeMemory(core, baseAddress, data);
×
172
                }
NEW
173
        }
×
174

175
        @Override
176
        @ParallelSafe
177
        public void writeMemory(HasCoreLocation core, MemoryLocation baseAddress,
178
                        File dataFile)
179
                        throws IOException, ProcessException, InterruptedException {
180
                // If this will use a single message, just use SCP
NEW
181
                if (dataFile.length() <= UDP_MESSAGE_MAX_SIZE) {
×
NEW
182
                        super.writeMemory(core, baseAddress, dataFile);
×
183
                } else {
NEW
184
                        try (var stream = new FileInputStream(dataFile)) {
×
NEW
185
                                writeMemory(core, baseAddress, stream, (int) dataFile.length());
×
186
                        }
187
                }
NEW
188
        }
×
189

190
        @Override
191
        @ParallelSafe
192
        public void writeMemory(HasCoreLocation core, MemoryLocation baseAddress,
193
                        ByteBuffer data)
194
                        throws IOException, ProcessException, InterruptedException {
195
                // If this will use a single message, just use SCP
NEW
196
                if (data.remaining() <= UDP_MESSAGE_MAX_SIZE) {
×
NEW
197
                        super.writeMemory(core, baseAddress, data);
×
198
                } else {
NEW
199
                        this.job.writeMemory(core, baseAddress, data);
×
200
                }
NEW
201
        }
×
202

203
        @Override
204
        @CheckReturnValue
205
        @ParallelSafe
206
        public ByteBuffer readMemory(HasCoreLocation core,
207
                        MemoryLocation baseAddress, int length)
208
                        throws IOException, ProcessException, InterruptedException {
209
                // If this will use a single message, just use SCP
NEW
210
                if (length <= UDP_MESSAGE_MAX_SIZE) {
×
NEW
211
                        return super.readMemory(core, baseAddress, length);
×
212
                } else {
NEW
213
                        return job.readMemory(core, baseAddress, length);
×
214
                }
215
        }
216

217
        @Override
218
        public void readRegion(Region region, BufferManagerStorage storage)
219
                        throws IOException, ProcessException, StorageException,
220
                        InterruptedException {
NEW
221
                if (region.size < UDP_MESSAGE_MAX_SIZE) {
×
NEW
222
                        super.readRegion(region, storage);
×
223
                } else {
NEW
224
                        var buffer = job.readMemory(
×
225
                                        region.core, region.startAddress, region.size);
NEW
226
                        storage.addRecordingContents(region, buffer);
×
227
                }
NEW
228
        }
×
229
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc