• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6310285782

26 Sep 2023 08:47AM UTC coverage: 36.367% (-0.5%) from 36.866%
6310285782

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17 and JEE to 9

1675 of 1675 new or added lines in 266 files covered. (100.0%)

8368 of 23010 relevant lines covered (36.37%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.24
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/compat/V1CompatService.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.compat;
17

18
import static com.fasterxml.jackson.databind.PropertyNamingStrategies.SNAKE_CASE;
19
import static java.lang.Thread.interrupted;
20
import static java.util.Objects.nonNull;
21
import static java.util.Objects.requireNonNull;
22
import static java.util.concurrent.Executors.newCachedThreadPool;
23
import static java.util.concurrent.Executors.newFixedThreadPool;
24
import static java.util.concurrent.TimeUnit.MILLISECONDS;
25
import static org.slf4j.LoggerFactory.getLogger;
26

27
import java.io.IOException;
28
import java.io.PipedReader;
29
import java.io.PipedWriter;
30
import java.net.InetSocketAddress;
31
import java.net.ServerSocket;
32
import java.net.Socket;
33
import java.net.SocketException;
34
import java.time.Duration;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Future;
37
import java.util.concurrent.ThreadFactory;
38

39
import org.slf4j.Logger;
40
import org.springframework.beans.factory.ObjectProvider;
41
import org.springframework.beans.factory.annotation.Autowired;
42
import org.springframework.stereotype.Service;
43

44
import com.fasterxml.jackson.databind.ObjectMapper;
45
import com.fasterxml.jackson.databind.json.JsonMapper;
46
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
47
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
48
import com.google.errorprone.annotations.RestrictedApi;
49

50
import jakarta.annotation.PostConstruct;
51
import jakarta.annotation.PreDestroy;
52
import uk.ac.manchester.spinnaker.alloc.ForTestingOnly;
53
import uk.ac.manchester.spinnaker.alloc.SpallocProperties;
54
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.CompatibilityProperties;
55
import uk.ac.manchester.spinnaker.alloc.model.Prototype;
56
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
57
import uk.ac.manchester.spinnaker.utils.ValueHolder;
58

59
/**
60
 * Implementation of the old style Spalloc interface. Delegates handling a
61
 * single connection to a {@link V1CompatTask} instance (manufactured by Spring
62
 * as a {@linkplain Prototype prototype} bean).
63
 *
64
 * @author Donal Fellows
65
 */
66
@Service("spalloc-v1-compatibility-service")
67
@UsedInJavadocOnly(Prototype.class)
68
public class V1CompatService {
69
        private static final Logger log = getLogger(V1CompatService.class);
1✔
70

71
        /** The overall service properties. */
72
        @Autowired
73
        private SpallocProperties mainProps;
74

75
        /**
76
         * Factory for {@linkplain V1CompatTask tasks}. Only use via
77
         * {@link #getTask(Socket) getTask(...)} or the test API.
78
         */
79
        @Autowired
80
        private ObjectProvider<V1CompatTask> taskFactory;
81

82
        /** How we make threads. */
83
        private final ThreadFactory threadFactory;
84

85
        /** The service socket. */
86
        private ServerSocket serv;
87

88
        /** The main network service listener thread. */
89
        private Thread servThread;
90

91
        /**
92
         * How to serialize and deserialize JSON. Would be a bean except then we'd
93
         * be wiring by name.
94
         */
95
        private final ObjectMapper mapper;
96

97
        /** How the majority of threads are launched by the service. */
98
        private ExecutorService executor;
99

100
        /**
         * How long {@link #close()} waits for the executor to terminate after
         * asking it to shut down. Set from the compatibility properties in
         * {@link #open()}.
         */
        private Duration shutdownTimeout;
101

102
        V1CompatService() {
1✔
103
                mapper = JsonMapper.builder().propertyNamingStrategy(SNAKE_CASE)
1✔
104
                                .addModule(new Jdk8Module())
1✔
105
                                .addModule(new JavaTimeModule()).build();
1✔
106
                var group = new ThreadGroup("spalloc-legacy-service");
1✔
107
                var counter = new ValueHolder<>(1);
1✔
108
                threadFactory = r -> {
1✔
109
                        var t = new Thread(group, r,
1✔
110
                                        "spalloc-legacy-" + counter.update(i -> i + 1));
1✔
111
                        t.setUncaughtExceptionHandler((thread, ex) -> log
1✔
112
                                        .error("uncaught exception in {}", thread, ex));
×
113
                        return t;
1✔
114
                };
115
        }
1✔
116

117
        /** A class that can reach into a compat service. */
118
        abstract static class Aware {
119
                private final V1CompatService srv;
120

121
                Aware(V1CompatService service) {
1✔
122
                        srv = requireNonNull(service);
1✔
123
                }
1✔
124

125
                /**
126
                 * @return The executor to use.
127
                 */
128
                protected final ExecutorService getExecutor() {
129
                        return requireNonNull(srv.executor);
1✔
130
                }
131

132
                /**
133
                 * @return The JSON mapper to use if necessary.
134
                 */
135
                protected final ObjectMapper getJsonMapper() {
136
                        return requireNonNull(srv.mapper);
1✔
137
                }
138

139
                /** @return The relevant properties. */
140
                protected final CompatibilityProperties getProperties() {
141
                        return srv.mainProps.getCompat();
×
142
                }
143
        }
144

145
        @PostConstruct
146
        private void open() throws IOException {
147
                var props = mainProps.getCompat();
1✔
148
                if (props.getThreadPoolSize() > 0) {
1✔
149
                        log.info("setting thread pool size to {}",
1✔
150
                                        props.getThreadPoolSize());
1✔
151
                        executor = newFixedThreadPool(props.getThreadPoolSize(),
1✔
152
                                        threadFactory);
153
                } else {
154
                        log.info("using unbounded thread pool");
1✔
155
                        executor = newCachedThreadPool(threadFactory);
1✔
156
                }
157

158
                if (props.isEnable()) {
1✔
159
                        var addr = new InetSocketAddress(props.getHost(), props.getPort());
×
160
                        serv = new ServerSocket();
×
161
                        serv.bind(addr);
×
162
                        servThread = threadFactory.newThread(this::acceptConnections);
×
163
                        servThread.setName("spalloc-legacy-service");
×
164
                        log.info("launching listener thread {} on address {}", servThread,
×
165
                                        addr);
166
                        servThread.start();
×
167
                }
168

169
                this.shutdownTimeout = props.getShutdownTimeout();
1✔
170
        }
1✔
171

172
        @PreDestroy
173
        private void close() throws IOException, InterruptedException {
174
                if (nonNull(serv)) {
×
175
                        log.info("shutting down listener thread {}", servThread);
×
176
                        // Shut down the server socket first; no new clients
177
                        servThread.interrupt();
×
178
                        serv.close();
×
179
                        servThread.join();
×
180
                }
181

182
                // Shut down the clients
183
                var remainingTasks = executor.shutdownNow();
×
184
                if (!remainingTasks.isEmpty()) {
×
185
                        log.warn("there are {} compat tasks outstanding",
×
186
                                        remainingTasks.size());
×
187
                }
188
                if (executor.awaitTermination(shutdownTimeout.toMillis(),
×
189
                                MILLISECONDS)) {
190
                        log.info("compat service stopped");
×
191
                } else {
192
                        log.warn("compat service executor ({}) still running!", executor);
×
193
                }
194
        }
×
195

196
        /**
197
         * Make a task.
198
         *
199
         * @param socket
200
         *            The connected socket that the task will be handling.
201
         * @return The task instance.
202
         */
203
        private V1CompatTask getTask(Socket socket) {
204
                return taskFactory.getObject(this, socket);
×
205
        }
206

207
        /**
208
         * Main service loop. Accepts connections and dispatches them to workers.
209
         */
210
        private void acceptConnections() {
211
                try {
212
                        while (acceptConnection()) {
×
213
                                continue;
×
214
                        }
215
                } finally {
216
                        try {
217
                                serv.close();
×
218
                        } catch (IOException e) {
×
219
                                log.warn("IO error", e);
×
220
                        }
×
221
                }
222
        }
×
223

224
        /**
225
         * Accept a single connection and dispatch it to a worker task in a thread.
226
         *
227
         * @return If {@code false}, we want to stop accepting connections.
228
         */
229
        private boolean acceptConnection() {
230
                try {
231
                        var service = getTask(serv.accept());
×
232
                        executor.execute(service::handleConnection);
×
233
                } catch (SocketException e) {
×
234
                        // Check here; interrupt = shutting down = no errors, please
235
                        if (interrupted()) {
×
236
                                return false;
×
237
                        }
238
                        if (serv.isClosed()) {
×
239
                                return false;
×
240
                        }
241
                        log.warn("IO error", e);
×
242
                } catch (IOException e) {
×
243
                        log.warn("IO error", e);
×
244
                }
×
245
                // If we've been interrupted here, we want the main loop to stop
246
                return !interrupted();
×
247
        }
248

249
        /**
250
         * Not a public API! Operations for testing only.
251
         *
252
         * @hidden
253
         */
254
        @ForTestingOnly
255
        public interface TestAPI {
256
                /**
257
                 * Make an instance of {@link V1CompatTask} that we can talk to.
258
                 *
259
                 * @param in
260
                 *            How to send a message to the task. Should be
261
                 *            <em>unconnected</em>.
262
                 * @param out
263
                 *            How to receive a message from the task. Should be
264
                 *            <em>unconnected</em>.
265
                 * @return A future that can be cancelled to shut things down.
266
                 * @throws Exception
267
                 *             If various things go wrong.
268
                 */
269
                Future<?> launchInstance(PipedWriter in, PipedReader out)
270
                                throws Exception;
271
        }
272

273
        /**
274
         * Not a public API!
275
         *
276
         * @return Test interface.
277
         * @deprecated Only for testing.
278
         * @hidden
279
         */
280
        @ForTestingOnly
281
        @RestrictedApi(explanation = "just for testing", link = "index.html",
282
                        allowedOnPath = ".*/src/test/java/.*")
283
        @Deprecated
284
        public TestAPI getTestApi() {
285
                ForTestingOnly.Utils.checkForTestClassOnStack();
1✔
286
                return new TestAPI() {
1✔
287
                        @Override
288
                        public Future<?> launchInstance(PipedWriter in, PipedReader out)
289
                                        throws Exception {
290
                                var service = taskFactory.getObject(V1CompatService.this,
1✔
291
                                                new PipedReader(in), new PipedWriter(out));
292
                                return executor.submit(service::handleConnection);
1✔
293
                        }
294
                };
295
        }
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc