• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6135134805

08 Sep 2023 01:28PM UTC coverage: 36.986% (-0.1%) from 37.105%
6135134805

push

github

web-flow
Merge pull request #1035 from SpiNNakerManchester/retry_proxy

Retry proxy

8684 of 23479 relevant lines covered (36.99%)

1.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.52
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/proxy/SpinWSHandler.java
1
/*
2
 * Copyright (c) 2022 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.proxy;
17

18
import static java.lang.Integer.parseInt;
19
import static java.nio.ByteOrder.LITTLE_ENDIAN;
20
import static java.util.concurrent.Executors.newCachedThreadPool;
21
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
22
import static org.slf4j.LoggerFactory.getLogger;
23
import static uk.ac.manchester.spinnaker.alloc.proxy.Utils.getFieldFromTemplate;
24

25
import java.io.EOFException;
26
import java.io.IOException;
27
import java.net.InetAddress;
28
import java.net.URI;
29
import java.net.UnknownHostException;
30
import java.util.Map;
31
import java.util.Optional;
32
import java.util.concurrent.ExecutorService;
33

34
import javax.annotation.PostConstruct;
35
import javax.annotation.PreDestroy;
36

37
import org.slf4j.Logger;
38
import org.springframework.beans.factory.annotation.Autowired;
39
import org.springframework.http.server.ServerHttpRequest;
40
import org.springframework.http.server.ServerHttpResponse;
41
import org.springframework.security.core.context.SecurityContextHolder;
42
import org.springframework.stereotype.Component;
43
import org.springframework.web.socket.BinaryMessage;
44
import org.springframework.web.socket.CloseStatus;
45
import org.springframework.web.socket.WebSocketHandler;
46
import org.springframework.web.socket.WebSocketSession;
47
import org.springframework.web.socket.handler.BinaryWebSocketHandler;
48
import org.springframework.web.socket.server.HandshakeInterceptor;
49
import org.springframework.web.util.UriTemplate;
50

51
import uk.ac.manchester.spinnaker.alloc.SpallocProperties;
52
import uk.ac.manchester.spinnaker.alloc.allocator.SpallocAPI;
53
import uk.ac.manchester.spinnaker.alloc.allocator.SpallocAPI.Job;
54
import uk.ac.manchester.spinnaker.alloc.security.Permit;
55
import uk.ac.manchester.spinnaker.alloc.web.RequestFailedException;
56
import uk.ac.manchester.spinnaker.alloc.web.RequestFailedException.NotFound;
57
import uk.ac.manchester.spinnaker.utils.Daemon;
58
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
59

60
/**
61
 * Initial handler for web sockets. Maps a particular websocket to a
62
 * {@linkplain ProxyCore proxy handler} that processes messages received and
63
 * sends messages the other way.
64
 *
65
 * @author Donal Fellows
66
 */
67
@Component
68
public class SpinWSHandler extends BinaryWebSocketHandler
69
                implements HandshakeInterceptor {
70
        private static final Logger log = getLogger(SpinWSHandler.class);
3✔
71

72
        @Autowired
73
        private SpallocAPI spallocCore;
74

75
        @Autowired
76
        private SpallocProperties properties;
77

78
        private InetAddress localHost;
79

80
        private ExecutorService executor;
81

82
        private final ConnectionIDIssuer idIssuer = new ConnectionIDIssuer();
3✔
83

84
        SpinWSHandler() {
3✔
85
                var group = new ThreadGroup("ws/udp workers");
3✔
86
                executor =
3✔
87
                                newCachedThreadPool(r -> new Daemon(group, r, "ws/udp worker"));
3✔
88
        }
3✔
89

90
        /** The path that we match in this handler. */
91
        public static final String PATH = "proxy/{id:\\d+}";
92

93
        /** The name of the field in the {@link #PATH}. */
94
        private static final String ID = "id";
95

96
        /** The {@link #PATH} as a template. */
97
        private final UriTemplate template = new UriTemplate(PATH);
3✔
98

99
        /**
100
         * Type-safe session property descriptor.
101
         *
102
         * @param <T>
103
         *            The type of the property.
104
         */
105
        private static final class Property<T> {
106
                private final String name;
107

108
                private final Class<T> cls;
109

110
                /**
111
                 * @param name
112
                 *            The name of the property.
113
                 * @param cls
114
                 *            The class of the property.
115
                 */
116
                Property(String name, Class<T> cls) {
3✔
117
                        this.name = SpinWSHandler.class + ":" + name;
3✔
118
                        this.cls = cls;
3✔
119
                }
3✔
120

121
                T get(WebSocketSession session) {
122
                        return cls.cast(session.getAttributes().get(name));
×
123
                }
124

125
                void put(Map<String, Object> attributes, T value) {
126
                        attributes.put(name, value);
×
127
                }
×
128
        }
129

130
        private static final Property<Job> JOB =
3✔
131
                        new Property<>("job", Job.class);
132

133
        private static final Property<ProxyCore> PROXY =
3✔
134
                        new Property<>("proxy", ProxyCore.class);
135

136
        // -----------------------------------------------------------
137
        // Satisfy the APIs that we use to plug into Spring
138

139
        @PostConstruct
140
        private void initLocalHost() throws UnknownHostException {
141
                var props = properties.getProxy();
3✔
142
                if (!props.getLocalHost().isBlank()) {
3✔
143
                        this.localHost = InetAddress.getByName(props.getLocalHost());
×
144
                }
145
        }
3✔
146

147
        @PreDestroy
148
        private void stopPool() {
149
                /*
150
                 * The threads inside don't need to be explicitly stopped as they're all
151
                 * daemon threads.
152
                 */
153
                executor.shutdown();
×
154
        }
×
155

156
        /**
157
         * Look up the job that websocket is being created for and attach it to the
158
         * websocket's attributes. {@inheritDoc}
159
         *
160
         * @return Whether we found the job and attached it.
161
         */
162
        @Override
163
        public boolean beforeHandshake(ServerHttpRequest request,
164
                        ServerHttpResponse response, WebSocketHandler wsHandler,
165
                        Map<String, Object> attributes) {
166
                var j = lookUpJobFromPath(request);
×
167
                // If we have a job, remember it and succeed
168
                j.ifPresent(job -> JOB.put(attributes, job));
×
169
                log.debug("Before handshake with request uri {}, job {}",
×
170
                                request.getURI(), j);
×
171
                return j.isPresent();
×
172
        }
173

174
        /** Empty method to satisfy interface. {@inheritDoc} */
175
        @Override
176
        public void afterHandshake(ServerHttpRequest request,
177
                        ServerHttpResponse response, WebSocketHandler wsHandler,
178
                        Exception exception) {
179
                log.debug("Handshake done for uri {}", request.getURI());
×
180
        }
×
181

182
        /**
183
         * Websocket established; connect to the proxy handler.
184
         *
185
         * @param session
186
         *            The websocket session to connect.
187
         */
188
        @Override
189
        public void afterConnectionEstablished(WebSocketSession session) {
190
                log.debug("Websocket session {} established", session);
×
191
                initProxyCore(session, JOB.get(session));
×
192
        }
×
193

194
        /**
195
         * Websocket closed; disconnect the proxy handler.
196
         *
197
         * @param session
198
         *            The websocket session to disconnect.
199
         * @param status
200
         *            Why the session closed. (Unimportant for this code)
201
         */
202
        @Override
203
        public void afterConnectionClosed(WebSocketSession session,
204
                        CloseStatus status) {
205
                log.debug("Websocket session {} closed", session);
×
206
                closed(session, PROXY.get(session), JOB.get(session));
×
207
        }
×
208

209
        /**
210
         * Websocket message received, dispatch it to the proxy for handling.
211
         *
212
         * @param session
213
         *            The websocket session that the message was received on.
214
         * @param message
215
         *            The received message.
216
         */
217
        @Override
218
        protected void handleBinaryMessage(WebSocketSession session,
219
                        BinaryMessage message) throws Exception {
220
                delegateToProxy(message, PROXY.get(session));
×
221
        }
×
222

223
        /**
224
         * Log exceptions from the transport level. Except for {@link EOFException}
225
         * because that's just fine and comes up in a normal disconnect.
226
         *
227
         * @param session
228
         *            The websocket session that had the error.
229
         * @param exception
230
         *            What happened.
231
         */
232
        @Override
233
        public void handleTransportError(WebSocketSession session,
234
                        Throwable exception) {
235
                if (!(exception instanceof EOFException)) {
×
236
                        // We don't log EOFException
237
                        log.warn("transport error for {}", session, exception);
×
238
                } else {
239
                        log.debug("End of web socket session {}", session, exception);
×
240
                }
241
                // Don't need to close; afterConnectionClosed() will be called next
242
        }
×
243

244
        // -----------------------------------------------------------
245
        // General implementation methods
246

247
        private Optional<Job> lookUpJobFromPath(ServerHttpRequest request) {
248
                return getFieldFromTemplate(template, request.getURI(), ID)
×
249
                                // Convert to integer
250
                                .flatMap(Utils::parseInteger)
×
251
                                // IDs are only ever valid if positive
252
                                .filter(Utils::positive)
×
253
                                // Do the lookup of the job
254
                                .flatMap(this::getJob);
×
255
        }
256

257
        /**
258
         * How to look up the job. Note that the underlying object is security
259
         * aware, so we need the authorisation step.
260
         *
261
         * @param jobId
262
         *            The job identifier
263
         * @return The job, if one is known and the current user is allowed to read
264
         *         details of it (owner or admin).
265
         */
266
        private Optional<Job> getJob(int jobId) {
267
                var permit = new Permit(SecurityContextHolder.getContext());
×
268
                // How to look up a job; the permit is needed!
269
                return permit.authorize(() -> spallocCore.getJob(permit, jobId));
×
270
        }
271

272
        /**
273
         * Connection established and job looked up. Make a proxy.
274
         *
275
         * @param session
276
         *            The Websocket session
277
         * @param job
278
         *            The job extracted from the websocket path
279
         * @throws NotFound
280
         *             If the job ID can't be mapped to a job
281
         * @throws RequestFailedException
282
         *             If the job doesn't have an allocated machine
283
         */
284
        @UsedInJavadocOnly(NotFound.class)
285
        protected final void initProxyCore(WebSocketSession session, Job job) {
286
                var proxy = job.getMachine()
×
287
                                .map(machine -> new ProxyCore(session, machine.getConnections(),
×
288
                                                executor, idIssuer::issueId,
×
289
                                                properties.getProxy().isLogWriteCounts(), localHost))
×
290
                                .orElseThrow(
×
291
                                                () -> new RequestFailedException(SERVICE_UNAVAILABLE,
×
292
                                                                "job not in state where proxying permitted"));
293
                PROXY.put(session.getAttributes(), proxy);
×
294
                job.rememberProxy(proxy);
×
295
                log.debug("user {} has web socket {} connected for job {}",
×
296
                                session.getPrincipal(), session, job.getId());
×
297
        }
×
298

299
        /**
300
         * Connection closed.
301
         *
302
         * @param session
303
         *            The Websocket session
304
         * @param proxy
305
         *            The proxy handler for our custom protocol
306
         * @param job
307
         *            What job we are working for
308
         */
309
        protected final void closed(WebSocketSession session, ProxyCore proxy,
310
                        Job job) {
311
                if (proxy != null) {
×
312
                        proxy.close();
×
313
                        job.forgetProxy(proxy);
×
314
                }
315
                log.debug("user {} has disconnected web socket {}",
×
316
                                session.getPrincipal(), session);
×
317
        }
×
318

319
        /**
320
         * Message was sent to us.
321
         *
322
         * @param message
323
         *            The body of the message.
324
         * @param proxy
325
         *            The socket proxy will be handling the message
326
         * @throws IOException
327
         *             If the proxy fails to handle the message in a bad way
328
         */
329
        protected final void delegateToProxy(BinaryMessage message, ProxyCore proxy)
330
                        throws IOException {
331
                proxy.handleClientMessage(message.getPayload().order(LITTLE_ENDIAN));
×
332
        }
×
333

334
        /**
335
         * Handles giving each proxy connection its own ID. We give them unique IDs
336
         * so that if someone opens multiple websockets to the same job, they can't
337
         * get the same ID for the connections underneath; that would be just too
338
         * confusing!
339
         */
340
        private static final class ConnectionIDIssuer {
341
                private int id;
342

343
                /**
344
                 * Issue an ID.
345
                 *
346
                 * @return A new ID. Never zero.
347
                 */
348
                private synchronized int issueId() {
349
                        int thisId;
350
                        do {
351
                                thisId = ++id;
×
352
                        } while (thisId == 0);
×
353
                        return thisId;
×
354
                }
355
        }
356
}
357

358
abstract class Utils {
359
        private Utils() {
360
        }
361

362
        /**
363
         * Match the template against the path of the URI and extract a field.
364
         *
365
         * @param template
366
         *            The template
367
         * @param uri
368
         *            The URI
369
         * @param key
370
         *            The name of the field to extract
371
         * @return The content of the field, if present. Otherwise
372
         *         {@link Optional#empty()}.
373
         */
374
        static Optional<String> getFieldFromTemplate(UriTemplate template,
375
                        URI uri, String key) {
376
                var templateResults = template.match(uri.getPath());
×
377
                if (templateResults == null) {
×
378
                        return Optional.empty();
×
379
                }
380
                var val = templateResults.get(key);
×
381
                return Optional.ofNullable(val);
×
382
        }
383

384
        /**
385
         * Parse a string as a decimal integer.
386
         *
387
         * @param val
388
         *            The string to parse.
389
         * @return The integer, or {@link Optional#empty()} if the parse fails.
390
         */
391
        static Optional<Integer> parseInteger(String val) {
392
                try {
393
                        return Optional.of(parseInt(val));
×
394
                } catch (NumberFormatException ignored) {
×
395
                        // Do nothing here
396
                }
397
                return Optional.empty();
×
398
        }
399

400
        static boolean positive(int n) {
401
                return n > 0;
×
402
        }
403
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc