• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.24
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/compat/V1CompatService.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.compat;
17

18
import static com.fasterxml.jackson.databind.PropertyNamingStrategies.SNAKE_CASE;
19
import static java.lang.Thread.interrupted;
20
import static java.util.Objects.nonNull;
21
import static java.util.Objects.requireNonNull;
22
import static java.util.concurrent.Executors.newCachedThreadPool;
23
import static java.util.concurrent.Executors.newFixedThreadPool;
24
import static java.util.concurrent.TimeUnit.MILLISECONDS;
25
import static org.slf4j.LoggerFactory.getLogger;
26

27
import java.io.IOException;
28
import java.io.PipedReader;
29
import java.io.PipedWriter;
30
import java.net.InetSocketAddress;
31
import java.net.ServerSocket;
32
import java.net.Socket;
33
import java.net.SocketException;
34
import java.time.Duration;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Future;
37
import java.util.concurrent.ThreadFactory;
38

39
import javax.annotation.PostConstruct;
40
import javax.annotation.PreDestroy;
41

42
import org.slf4j.Logger;
43
import org.springframework.beans.factory.ObjectProvider;
44
import org.springframework.beans.factory.annotation.Autowired;
45
import org.springframework.stereotype.Service;
46

47
import com.fasterxml.jackson.databind.ObjectMapper;
48
import com.fasterxml.jackson.databind.json.JsonMapper;
49
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
50
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
51
import com.google.errorprone.annotations.RestrictedApi;
52

53
import uk.ac.manchester.spinnaker.alloc.ForTestingOnly;
54
import uk.ac.manchester.spinnaker.alloc.SpallocProperties;
55
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.CompatibilityProperties;
56
import uk.ac.manchester.spinnaker.alloc.model.Prototype;
57
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
58
import uk.ac.manchester.spinnaker.utils.ValueHolder;
59

60
/**
61
 * Implementation of the old style Spalloc interface. Delegates handling a
62
 * single connection to a {@link V1CompatTask} instance (manufactured by Spring
63
 * as a {@linkplain Prototype prototype} bean).
64
 *
65
 * @author Donal Fellows
66
 */
67
@Service("spalloc-v1-compatibility-service")
68
@UsedInJavadocOnly(Prototype.class)
69
public class V1CompatService {
70
        private static final Logger log = getLogger(V1CompatService.class);
1✔
71

72
        /** The overall service properties. */
73
        @Autowired
74
        private SpallocProperties mainProps;
75

76
        /**
77
         * Factory for {@linkplain V1CompatTask tasks}. Only use via
78
         * {@link #getTask(Socket) getTask(...)} or the test API.
79
         */
80
        @Autowired
81
        private ObjectProvider<V1CompatTask> taskFactory;
82

83
        /** How we make threads. */
84
        private final ThreadFactory threadFactory;
85

86
        /** The service socket. */
87
        private ServerSocket serv;
88

89
        /** The main network service listener thread. */
90
        private Thread servThread;
91

92
        /**
93
         * How to serialize and deserialize JSON. Would be a bean except then we'd
94
         * be wiring by name.
95
         */
96
        private final ObjectMapper mapper;
97

98
        /** How the majority of threads are launched by the service. */
99
        private ExecutorService executor;
100

101
        private Duration shutdownTimeout;
102

103
        V1CompatService() {
1✔
104
                mapper = JsonMapper.builder().propertyNamingStrategy(SNAKE_CASE)
1✔
105
                                .addModule(new Jdk8Module())
1✔
106
                                .addModule(new JavaTimeModule()).build();
1✔
107
                var group = new ThreadGroup("spalloc-legacy-service");
1✔
108
                var counter = new ValueHolder<>(1);
1✔
109
                threadFactory = r -> {
1✔
110
                        var t = new Thread(group, r,
1✔
111
                                        "spalloc-legacy-" + counter.update(i -> i + 1));
1✔
112
                        t.setUncaughtExceptionHandler((thread, ex) -> log
1✔
113
                                        .error("uncaught exception in {}", thread, ex));
×
114
                        return t;
1✔
115
                };
116
        }
1✔
117

118
        /** A class that can reach into a compat service. */
119
        abstract static class Aware {
120
                private final V1CompatService srv;
121

122
                Aware(V1CompatService service) {
1✔
123
                        srv = requireNonNull(service);
1✔
124
                }
1✔
125

126
                /**
127
                 * @return The executor to use.
128
                 */
129
                protected final ExecutorService getExecutor() {
130
                        return requireNonNull(srv.executor);
1✔
131
                }
132

133
                /**
134
                 * @return The JSON mapper to use if necessary.
135
                 */
136
                protected final ObjectMapper getJsonMapper() {
137
                        return requireNonNull(srv.mapper);
1✔
138
                }
139

140
                /** @return The relevant properties. */
141
                protected final CompatibilityProperties getProperties() {
142
                        return srv.mainProps.getCompat();
×
143
                }
144
        }
145

146
        @PostConstruct
147
        private void open() throws IOException {
148
                var props = mainProps.getCompat();
1✔
149
                if (props.getThreadPoolSize() > 0) {
1✔
150
                        log.info("setting thread pool size to {}",
1✔
151
                                        props.getThreadPoolSize());
1✔
152
                        executor = newFixedThreadPool(props.getThreadPoolSize(),
1✔
153
                                        threadFactory);
154
                } else {
155
                        log.info("using unbounded thread pool");
1✔
156
                        executor = newCachedThreadPool(threadFactory);
1✔
157
                }
158

159
                if (props.isEnable()) {
1✔
160
                        var addr = new InetSocketAddress(props.getHost(), props.getPort());
×
161
                        serv = new ServerSocket();
×
162
                        serv.bind(addr);
×
163
                        servThread = threadFactory.newThread(this::acceptConnections);
×
164
                        servThread.setName("spalloc-legacy-service");
×
165
                        log.info("launching listener thread {} on address {}", servThread,
×
166
                                        addr);
167
                        servThread.start();
×
168
                }
169

170
                this.shutdownTimeout = props.getShutdownTimeout();
1✔
171
        }
1✔
172

173
        @PreDestroy
174
        private void close() throws IOException, InterruptedException {
175
                if (nonNull(serv)) {
×
176
                        log.info("shutting down listener thread {}", servThread);
×
177
                        // Shut down the server socket first; no new clients
178
                        servThread.interrupt();
×
179
                        serv.close();
×
180
                        servThread.join();
×
181
                }
182

183
                // Shut down the clients
184
                var remainingTasks = executor.shutdownNow();
×
185
                if (!remainingTasks.isEmpty()) {
×
186
                        log.warn("there are {} compat tasks outstanding",
×
187
                                        remainingTasks.size());
×
188
                }
189
                if (executor.awaitTermination(shutdownTimeout.toMillis(),
×
190
                                MILLISECONDS)) {
191
                        log.info("compat service stopped");
×
192
                } else {
193
                        log.warn("compat service executor ({}) still running!", executor);
×
194
                }
195
        }
×
196

197
        /**
198
         * Make a task.
199
         *
200
         * @param socket
201
         *            The connected socket that the task will be handling.
202
         * @return The task instance.
203
         */
204
        private V1CompatTask getTask(Socket socket) {
205
                return taskFactory.getObject(this, socket);
×
206
        }
207

208
        /**
209
         * Main service loop. Accepts connections and dispatches them to workers.
210
         */
211
        private void acceptConnections() {
212
                try {
213
                        while (acceptConnection()) {
×
214
                                continue;
×
215
                        }
216
                } finally {
217
                        try {
218
                                serv.close();
×
219
                        } catch (IOException e) {
×
220
                                log.warn("IO error", e);
×
221
                        }
×
222
                }
223
        }
×
224

225
        /**
226
         * Accept a single connection and dispatch it to a worker task in a thread.
227
         *
228
         * @return If {@code false}, we want to stop accepting connections.
229
         */
230
        private boolean acceptConnection() {
231
                try {
232
                        var service = getTask(serv.accept());
×
233
                        executor.execute(service::handleConnection);
×
234
                } catch (SocketException e) {
×
235
                        // Check here; interrupt = shutting down = no errors, please
236
                        if (interrupted()) {
×
237
                                return false;
×
238
                        }
239
                        if (serv.isClosed()) {
×
240
                                return false;
×
241
                        }
242
                        log.warn("IO error", e);
×
243
                } catch (IOException e) {
×
244
                        log.warn("IO error", e);
×
245
                }
×
246
                // If we've been interrupted here, we want the main loop to stop
247
                return !interrupted();
×
248
        }
249

250
        /**
251
         * Not a public API! Operations for testing only.
252
         *
253
         * @hidden
254
         */
255
        @ForTestingOnly
256
        public interface TestAPI {
257
                /**
258
                 * Make an instance of {@link V1CompatTask} that we can talk to.
259
                 *
260
                 * @param in
261
                 *            How to send a message to the task. Should be
262
                 *            <em>unconnected</em>.
263
                 * @param out
264
                 *            How to receive a message from the task. Should be
265
                 *            <em>unconnected</em>.
266
                 * @return A future that can be cancelled to shut things down.
267
                 * @throws Exception
268
                 *             If various things go wrong.
269
                 */
270
                Future<?> launchInstance(PipedWriter in, PipedReader out)
271
                                throws Exception;
272
        }
273

274
        /**
275
         * Not a public API!
276
         *
277
         * @return Test interface.
278
         * @deprecated Only for testing.
279
         * @hidden
280
         */
281
        @ForTestingOnly
282
        @RestrictedApi(explanation = "just for testing", link = "index.html",
283
                        allowedOnPath = ".*/src/test/java/.*")
284
        @Deprecated
285
        public TestAPI getTestApi() {
286
                ForTestingOnly.Utils.checkForTestClassOnStack();
1✔
287
                return new TestAPI() {
1✔
288
                        @Override
289
                        public Future<?> launchInstance(PipedWriter in, PipedReader out)
290
                                        throws Exception {
291
                                var service = taskFactory.getObject(V1CompatService.this,
1✔
292
                                                new PipedReader(in), new PipedWriter(out));
293
                                return executor.submit(service::handleConnection);
1✔
294
                        }
295
                };
296
        }
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc