• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 14556105732

21 Mar 2025 04:17PM UTC coverage: 38.266%. Remained the same
14556105732

push

github

web-flow
Merge pull request #1222 from SpiNNakerManchester/more_spalloc_rest_calls

More spalloc rest calls

70 of 815 new or added lines in 33 files covered. (8.59%)

1689 existing lines in 35 files now uncovered.

9193 of 24024 relevant lines covered (38.27%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.94
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/alloc/client/SpallocClientFactory.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.client;
17

18
import static com.fasterxml.jackson.databind.PropertyNamingStrategies.KEBAB_CASE;
19
import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
20
import static java.lang.Integer.toUnsignedString;
21
import static java.lang.String.format;
22
import static java.lang.Thread.sleep;
23
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
24
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
25
import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
26
import static java.net.URLEncoder.encode;
27
import static java.nio.charset.StandardCharsets.UTF_8;
28
import static java.nio.ByteOrder.LITTLE_ENDIAN;
29
import static java.util.Collections.synchronizedMap;
30
import static java.util.Objects.isNull;
31
import static java.util.Objects.nonNull;
32
import static java.util.stream.Collectors.joining;
33
import static java.util.stream.Collectors.toList;
34
import static org.apache.commons.io.IOUtils.readLines;
35
import static org.slf4j.LoggerFactory.getLogger;
36
import static uk.ac.manchester.spinnaker.alloc.client.ClientUtils.asDir;
37

38
import java.io.BufferedReader;
39
import java.io.FileNotFoundException;
40
import java.io.IOException;
41
import java.io.InputStream;
42
import java.io.InputStreamReader;
43
import java.io.OutputStreamWriter;
44
import java.net.HttpURLConnection;
45
import java.net.URI;
46
import java.net.URISyntaxException;
47
import java.nio.ByteBuffer;
48
import java.nio.channels.Channels;
49
import java.util.ArrayList;
50
import java.util.Collection;
51
import java.util.HashMap;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.stream.Stream;
55

56
import org.apache.commons.io.IOUtils;
57
import org.slf4j.Logger;
58

59
import com.fasterxml.jackson.databind.json.JsonMapper;
60
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
61
import com.google.errorprone.annotations.MustBeClosed;
62
import com.google.errorprone.annotations.concurrent.GuardedBy;
63

64
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.Job;
65
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.Machine;
66
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.SpallocException;
67
import uk.ac.manchester.spinnaker.machine.ChipLocation;
68
import uk.ac.manchester.spinnaker.machine.CoreLocation;
69
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
70
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
71
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
72
import uk.ac.manchester.spinnaker.machine.board.PhysicalCoords;
73
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
74
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
75
import uk.ac.manchester.spinnaker.messages.model.Version;
76
import uk.ac.manchester.spinnaker.storage.ProxyInformation;
77
import uk.ac.manchester.spinnaker.transceiver.SpinnmanException;
78
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
79
import uk.ac.manchester.spinnaker.utils.Daemon;
80

81
/**
82
 * A factory for clients to connect to the Spalloc service.
83
 * <p>
84
 * <strong>Implementation Note:</strong> Neither this class nor the client
85
 * classes it creates maintain state that needs to be closed explicitly
86
 * <em>except</em> for
87
 * {@linkplain SpallocClient.Job#getTransceiver() transceivers}, as transceivers
88
 * usually need to be closed.
89
 *
90
 * @author Donal Fellows
91
 */
92
public class SpallocClientFactory {
93
        private static final Logger log = getLogger(SpallocClientFactory.class);
3✔
94

95
        private static final String CONTENT_TYPE = "Content-Type";
96

97
        private static final String TEXT_PLAIN = "text/plain; charset=UTF-8";
98

99
        private static final String APPLICATION_JSON = "application/json";
100

101
        private static final String FORM_ENCODED =
102
                        "application/x-www-form-urlencoded";
103

104
        private static final URI KEEPALIVE = URI.create("keepalive");
3✔
105

106
        private static final URI MACHINE = URI.create("machine");
3✔
107

108
        private static final URI POWER = URI.create("power");
3✔
109

110
        private static final URI WAIT_FLAG = URI.create("?wait=true");
3✔
111

112
        private static final URI MEMORY = URI.create("memory");
3✔
113

114
        private static final URI FAST_DATA_WRITE = URI.create("fast-data-write");
3✔
115

116
        private static final URI FAST_DATA_READ = URI.create("fast-data-read");
3✔
117

118
        // Amount to divide keepalive interval by to get actual keep alive delay
119
        private static final int KEEPALIVE_DIVIDER = 2;
120

121
        /** Used to convert to/from JSON. */
122
        static final JsonMapper JSON_MAPPER = JsonMapper.builder()
3✔
123
                        .findAndAddModules().disable(WRITE_DATES_AS_TIMESTAMPS)
3✔
124
                        .addModule(new JavaTimeModule())
3✔
125
                        .propertyNamingStrategy(KEBAB_CASE).build();
3✔
126

127
        private final URI baseUrl;
128

129
        /**
130
         * Cache of machines, which don't expire.
131
         */
132
        private static final Map<String, Machine> MACHINE_MAP =
3✔
133
                        synchronizedMap(new HashMap<>());
3✔
134

135
        /**
136
         * Create a factory that can talk to a given service.
137
         *
138
         * @param baseUrl
139
         *            Where the server is.
140
         */
UNCOV
141
        public SpallocClientFactory(URI baseUrl) {
×
UNCOV
142
                this.baseUrl = asDir(baseUrl);
×
UNCOV
143
        }
×
144

145
        /**
146
         * Get a handle to a job given its proxy access information (derived from a
147
         * database query).
148
         *
149
         * @param proxy
150
         *            The proxy information from the database. Handles {@code null}.
151
         * @return The job handle, or {@code null} if {@code proxy==null}.
152
         * @throws IOException
153
         *             If connecting to the job fails.
154
         */
155
        public static Job getJobFromProxyInfo(ProxyInformation proxy)
156
                        throws IOException {
UNCOV
157
                if (proxy == null) {
×
UNCOV
158
                        return null;
×
159
                }
UNCOV
160
                log.info("Using proxy {} for connections", proxy.spallocUrl);
×
UNCOV
161
                return new SpallocClientFactory(URI.create(proxy.spallocUrl))
×
UNCOV
162
                                .getJob(proxy.jobUrl, proxy.headers, proxy.cookies);
×
163
        }
164

165
        /**
166
         * Read an object from a stream.
167
         *
168
         * @param <T>
169
         *            The type of the object to read.
170
         * @param is
171
         *            The stream
172
         * @param cls
173
         *            The class of object to read.
174
         * @return The object
175
         * @throws IOException
176
         *             If an I/O error happens or the content on the stream can't be
177
         *             made into an instance of the given class.
178
         */
179
        static <T> T readJson(InputStream is, Class<T> cls) throws IOException {
UNCOV
180
                BufferedReader streamReader = new BufferedReader(
×
181
                                new InputStreamReader(is, "UTF-8"));
UNCOV
182
                StringBuilder responseStrBuilder = new StringBuilder();
×
183

184
                String inputStr;
UNCOV
185
                while ((inputStr = streamReader.readLine()) != null) {
×
UNCOV
186
                        responseStrBuilder.append(inputStr);
×
187
                }
UNCOV
188
                String json = responseStrBuilder.toString();
×
189

190
                try {
UNCOV
191
                        return JSON_MAPPER.readValue(json, cls);
×
192
                } catch (IOException e) {
×
UNCOV
193
                        log.error("Error while reading json {}", json);
×
UNCOV
194
                        throw e;
×
195
                }
196
        }
197

198
        /**
199
         * Outputs a form to a connection in
200
         * {@code application/x-www-form-urlencoded} format.
201
         *
202
         * @param connection
203
         *            The connection. Must have the right verb set.
204
         * @param map
205
         *            The contents of the form.
206
         * @throws IOException
207
         *             If I/O fails.
208
         */
209
        static void writeForm(HttpURLConnection connection, Map<String, String> map)
210
                        throws IOException {
UNCOV
211
                var form = map.entrySet().stream()
×
UNCOV
212
                                .map(e -> e.getKey() + "=" + encode(e.getValue(), UTF_8))
×
UNCOV
213
                                .collect(joining("&"));
×
214

UNCOV
215
                connection.setDoOutput(true);
×
UNCOV
216
                connection.setRequestProperty(CONTENT_TYPE, FORM_ENCODED);
×
UNCOV
217
                try (var w =
×
UNCOV
218
                                new OutputStreamWriter(connection.getOutputStream(), UTF_8)) {
×
UNCOV
219
                        w.write(form);
×
220
                }
221
        }
×
222

223
        /**
224
         * Outputs an object to a connection in {@code application/json} format.
225
         *
226
         * @param connection
227
         *            The connection. Must have the right verb set.
228
         * @param object
229
         *            The object to write.
230
         * @throws IOException
231
         *             If I/O fails.
232
         */
233
        static void writeObject(HttpURLConnection connection, Object object)
234
                        throws IOException {
UNCOV
235
                connection.setDoOutput(true);
×
UNCOV
236
                connection.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
×
UNCOV
237
                try (var out = connection.getOutputStream()) {
×
UNCOV
238
                        JSON_MAPPER.writeValue(out, object);
×
239
                }
UNCOV
240
        }
×
241

242
        /**
243
         * Outputs a string to a connection in {@code text/plain} format.
244
         *
245
         * @param connection
246
         *            The connection. Must have the right verb set.
247
         * @param string
248
         *            The string to write.
249
         * @throws IOException
250
         *             If I/O fails.
251
         */
252
        static void writeString(HttpURLConnection connection, String string)
253
                        throws IOException {
UNCOV
254
                connection.setDoOutput(true);
×
UNCOV
255
                connection.setRequestProperty(CONTENT_TYPE, TEXT_PLAIN);
×
UNCOV
256
                try (var w = new OutputStreamWriter(connection.getOutputStream(),
×
257
                                UTF_8)) {
UNCOV
258
                        w.write(string);
×
259
                }
UNCOV
260
        }
×
261

262
        /**
263
         * Checks for errors in the response.
264
         *
265
         * @param conn
266
         *            The HTTP connection
267
         * @param errorMessage
268
         *            The message to use on error (describes what did not work at a
269
         *            higher level)
270
         * @return The input stream so any non-error response content can be
271
         *         obtained.
272
         * @throws IOException
273
         *             If things go wrong with comms.
274
         * @throws FileNotFoundException
275
         *             on a {@link HttpURLConnection#HTTP_NOT_FOUND}
276
         * @throws SpallocException
277
         *             on other server errors
278
         */
279
        @MustBeClosed
280
        static InputStream checkForError(HttpURLConnection conn,
281
                        String errorMessage) throws IOException {
UNCOV
282
                if (conn.getResponseCode() == HTTP_NOT_FOUND) {
×
283
                        // Special case
UNCOV
284
                        throw new FileNotFoundException(errorMessage);
×
285
                }
UNCOV
286
                if (conn.getResponseCode() >= HTTP_BAD_REQUEST) {
×
UNCOV
287
                        throw new SpallocException(conn.getErrorStream(),
×
UNCOV
288
                                        conn.getResponseCode());
×
289
                }
UNCOV
290
                return conn.getInputStream();
×
291
        }
292

293
        /**
294
         * Checks for errors in the response without expecting response content.
295
         *
296
         * @param conn
297
         *            The HTTP connection
298
         * @param errorMessage
299
         *            The message to use on error (describes what did not work at a
300
         *            higher level)
301
         * @throws IOException
302
         *             If things go wrong with comms.
303
         * @throws FileNotFoundException
304
         *             on a {@link HttpURLConnection#HTTP_NOT_FOUND}
305
         * @throws SpallocException
306
         *             on other server errors
307
         */
308
        static void checkForErrorNoResponse(HttpURLConnection conn,
309
                        String errorMessage) throws IOException {
NEW
310
                if (conn.getResponseCode() == HTTP_NOT_FOUND) {
×
311
                        // Special case
NEW
312
                        throw new FileNotFoundException(errorMessage);
×
313
                }
NEW
314
                if (conn.getResponseCode() >= HTTP_BAD_REQUEST) {
×
NEW
315
                        throw new SpallocException(conn.getErrorStream(),
×
NEW
316
                                        conn.getResponseCode());
×
317
                }
NEW
318
        }
×
319

320
        /**
321
         * Create a client and log in.
322
         *
323
         * @param username
324
         *            The username to log in with.
325
         * @param password
326
         *            The password to log in with.
327
         * @return The client API for the given server.
328
         * @throws IOException
329
         *             If the server doesn't respond or logging in fails.
330
         */
331
        public SpallocClient login(String username, String password)
332
                        throws IOException {
UNCOV
333
                var s = new ClientSession(baseUrl, username, password);
×
334

UNCOV
335
                return new ClientImpl(s, s.discoverRoot());
×
336
        }
337

338
        /**
339
         * Get direct access to a Job.
340
         *
341
         * @param uri
342
         *            The URI of the job
343
         * @param headers
344
         *            The headers to read authentication from.
345
         * @param cookies
346
         *            The cookies to read authentication from.
347
         * @return A job.
348
         * @throws IOException
349
         *             If there is an error communicating with the server.
350
         */
351
        public Job getJob(String uri, Map<String, String> headers,
352
                        Map<String, String> cookies) throws IOException {
353
                var u = URI.create(uri);
×
UNCOV
354
                var s = new ClientSession(baseUrl, headers, cookies);
×
355
                var c = new ClientImpl(s, s.discoverRoot());
×
UNCOV
356
                log.info("Connecting to job on {}", u);
×
UNCOV
357
                return c.job(u);
×
358
        }
359

360
        private abstract static class Common {
361
                private final SpallocClient client;
362

363
                final Session s;
364

UNCOV
365
                Common(SpallocClient client, Session s) {
×
UNCOV
366
                        this.client = client != null ? client : (SpallocClient) this;
×
UNCOV
367
                        this.s = s;
×
UNCOV
368
                }
×
369

370
                final Machine getMachine(String name) throws IOException {
UNCOV
371
                        Machine m = MACHINE_MAP.get(name);
×
372
                        if (m == null) {
×
UNCOV
373
                                client.listMachines();
×
UNCOV
374
                                m = MACHINE_MAP.get(name);
×
375
                        }
UNCOV
376
                        if (m == null) {
×
UNCOV
377
                                throw new IOException("Machine " + name + " not found");
×
378
                        }
UNCOV
379
                        return m;
×
380
                }
381

382
                private WhereIs whereis(HttpURLConnection conn) throws IOException {
UNCOV
383
                        try (var is = checkForError(conn,
×
384
                                        "couldn't get board information")) {
UNCOV
385
                                if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
UNCOV
386
                                        throw new FileNotFoundException("machine not allocated");
×
387
                                }
UNCOV
388
                                return readJson(is, WhereIs.class);
×
389
                        } finally {
390
                                s.trackCookie(conn);
×
391
                        }
392
                }
393

394
                final WhereIs whereis(URI uri) throws IOException {
UNCOV
395
                        return s.withRenewal(() -> {
×
UNCOV
396
                                var conn = s.connection(uri);
×
UNCOV
397
                                var w = whereis(conn);
×
UNCOV
398
                                w.setMachineHandle(getMachine(w.getMachineName()));
×
UNCOV
399
                                w.clearMachineRef();
×
UNCOV
400
                                return w;
×
401
                        });
402
                }
403
        }
404

405
        private static final class ClientImpl extends Common
406
                        implements SpallocClient {
407
                private Version v;
408

409
                private URI jobs;
410

411
                private URI machines;
412

413
                private ClientImpl(Session s, RootInfo ri) throws IOException {
414
                        super(null, s);
×
UNCOV
415
                        this.v = ri.version;
×
416
                        this.jobs = asDir(ri.jobsURI);
×
UNCOV
417
                        this.machines = asDir(ri.machinesURI);
×
UNCOV
418
                }
×
419

420
                @Override
421
                public Version getVersion() {
422
                        return v;
×
423
                }
424

425
                /**
426
                 * Slightly convoluted class to fetch jobs. The complication means we
427
                 * get the initial failure exception nice and early, while we're ready
428
                 * for it. This code would be quite a lot simpler if we didn't want to
429
                 * get the exception during construction.
430
                 */
431
                private class JobLister extends ListFetchingIter<URI> {
432
                        private URI next;
433

434
                        private List<URI> first;
435

436
                        JobLister(URI initial) throws IOException {
×
437
                                var first = getJobList(s.connection(initial));
×
UNCOV
438
                                next = first.next;
×
UNCOV
439
                                this.first = first.jobs;
×
UNCOV
440
                        }
×
441

442
                        private Jobs getJobList(HttpURLConnection conn) throws IOException {
UNCOV
443
                                try (var is = checkForError(conn, "couldn't list jobs")) {
×
UNCOV
444
                                        return readJson(is, Jobs.class);
×
445
                                } finally {
UNCOV
446
                                        s.trackCookie(conn);
×
447
                                }
448
                        }
449

450
                        @Override
451
                        List<URI> fetchNext() throws IOException {
452
                                if (nonNull(first)) {
×
453
                                        try {
454
                                                return first;
×
455
                                        } finally {
UNCOV
456
                                                first = null;
×
457
                                        }
458
                                }
459
                                var j = getJobList(s.connection(next));
×
UNCOV
460
                                next = j.next;
×
UNCOV
461
                                return j.jobs;
×
462
                        }
463

464
                        @Override
465
                        boolean canFetchMore() {
UNCOV
466
                                if (nonNull(first)) {
×
UNCOV
467
                                        return true;
×
468
                                }
UNCOV
469
                                return nonNull(next);
×
470
                        }
471
                }
472

473
                private Stream<Job> listJobs(URI flags) throws IOException {
474
                        var basicData = new JobLister(
×
475
                                        nonNull(flags) ? jobs.resolve(flags) : jobs);
×
476
                        return basicData.stream().flatMap(Collection::stream)
×
477
                                        .map(this::job);
×
478
                }
479

480
                @Override
481
                public List<Job> listJobs(boolean wait) throws IOException {
UNCOV
482
                        return s.withRenewal(() -> listJobs(WAIT_FLAG)).collect(toList());
×
483
                }
484

485
                @Override
486
                public Stream<Job> listJobsWithDeleted(boolean wait)
487
                                throws IOException {
UNCOV
488
                        var opts = new StringBuilder("?deleted=true");
×
489
                        if (wait) {
×
UNCOV
490
                                opts.append("&wait=true");
×
491
                        }
UNCOV
492
                        return s.withRenewal(() -> listJobs(URI.create(opts.toString())));
×
493
                }
494

495
                @Override
496
                public Job createJob(CreateJob createInstructions) throws IOException {
497
                        var uri = s.withRenewal(() -> {
×
498
                                var conn = s.connection(jobs, true);
×
UNCOV
499
                                writeObject(conn, createInstructions);
×
500
                                // Get the response entity... and discard it
UNCOV
501
                                try (var is = checkForError(conn, "job create failed")) {
×
UNCOV
502
                                        readLines(is, UTF_8);
×
503
                                        // But we do want the Location header
504
                                        return URI.create(conn.getHeaderField("Location"));
×
505
                                } finally {
506
                                        s.trackCookie(conn);
×
507
                                }
508
                        });
UNCOV
509
                        var job = job(uri);
×
UNCOV
510
                        job.startKeepalive(
×
511
                                        createInstructions.getKeepaliveInterval().toMillis()
×
512
                                        / KEEPALIVE_DIVIDER);
513
                        return job;
×
514
                }
515

516
                JobImpl job(URI uri) {
UNCOV
517
                        return new JobImpl(this, s, asDir(uri));
×
518
                }
519

520
                @Override
521
                public List<Machine> listMachines() throws IOException {
UNCOV
522
                        return s.withRenewal(() -> {
×
UNCOV
523
                                var conn = s.connection(machines);
×
UNCOV
524
                                try (var is = checkForError(conn, "list machines failed")) {
×
525
                                        var ms = readJson(is, Machines.class);
×
526
                                        // Assume we can cache this
527
                                        for (var bmd : ms.machines) {
×
UNCOV
528
                                                log.debug("Machine {} found", bmd.name);
×
529
                                                MACHINE_MAP.put(bmd.name,
×
530
                                                                new MachineImpl(this, s, bmd));
UNCOV
531
                                        }
×
UNCOV
532
                                        return new ArrayList<Machine>(MACHINE_MAP.values());
×
533
                                } finally {
534
                                        s.trackCookie(conn);
×
535
                                }
536
                        });
537
                }
538
        }
539

540
        private static final class JobImpl extends Common implements Job {
541
                private final URI uri;
542

543
                private volatile boolean dead;
544

545
                @GuardedBy("lock")
546
                private ProxyProtocolClient proxy;
547

548
                private final Object lock = new Object();
×
549

550
                JobImpl(SpallocClient client, Session session, URI uri) {
UNCOV
551
                        super(client, session);
×
UNCOV
552
                        this.uri = uri;
×
UNCOV
553
                        this.dead = false;
×
554
                }
×
555

556
                @Override
557
                public JobDescription describe(boolean wait) throws IOException {
UNCOV
558
                        return s.withRenewal(() -> {
×
559
                                var conn = wait ? s.connection(uri, WAIT_FLAG)
×
560
                                                : s.connection(uri);
×
561
                                try (var is = checkForError(conn, "couldn't get job state")) {
×
562
                                        return readJson(is, JobDescription.class);
×
563
                                } finally {
564
                                        s.trackCookie(conn);
×
565
                                }
566
                        });
567
                }
568

569
                @Override
570
                public void keepalive() throws IOException {
571
                        s.withRenewal(() -> {
×
UNCOV
572
                                var conn = s.connection(uri, KEEPALIVE, true);
×
UNCOV
573
                                conn.setRequestMethod("PUT");
×
UNCOV
574
                                writeString(conn, "alive");
×
UNCOV
575
                                try (var is = checkForError(conn, "couldn't keep job alive")) {
×
UNCOV
576
                                        return readLines(is, UTF_8);
×
577
                                        // Ignore the output
578
                                } finally {
UNCOV
579
                                        s.trackCookie(conn);
×
580
                                }
581
                        });
UNCOV
582
                }
×
583

584
                public void startKeepalive(long delayMs) {
585
                        if (dead) {
×
UNCOV
586
                                throw new IllegalStateException("job is already deleted");
×
587
                        }
588
                        var t = new Daemon(() -> {
×
589
                                try {
590
                                        while (true) {
591
                                                sleep(delayMs);
×
UNCOV
592
                                                if (dead) {
×
UNCOV
593
                                                        break;
×
594
                                                }
595
                                                keepalive();
×
596
                                        }
597
                                } catch (IOException e) {
×
598
                                        log.warn("failed to keep job alive for {}", this, e);
×
599
                                } catch (InterruptedException e) {
×
600
                                        // If interrupted, we're simply done
601
                                }
×
UNCOV
602
                        });
×
UNCOV
603
                        t.setName("keepalive for " + uri);
×
UNCOV
604
                        t.setUncaughtExceptionHandler((th, e) -> {
×
UNCOV
605
                                log.warn("unexpected exception in {}", th, e);
×
UNCOV
606
                        });
×
UNCOV
607
                        t.start();
×
608
                }
×
609

610
                @Override
611
                public void delete(String reason) throws IOException {
612
                        dead = true;
×
613
                        s.withRenewal(() -> {
×
UNCOV
614
                                var conn = s.connection(uri, "?reason=" + encode(reason, UTF_8),
×
615
                                                true);
616
                                conn.setRequestMethod("DELETE");
×
UNCOV
617
                                try (var is = checkForError(conn, "couldn't delete job")) {
×
UNCOV
618
                                        readLines(is, UTF_8);
×
619
                                        // Ignore the output
620
                                } finally {
UNCOV
621
                                        s.trackCookie(conn);
×
622
                                }
623
                                return this;
×
624
                        });
625
                        synchronized (lock) {
×
UNCOV
626
                                if (haveProxy()) {
×
UNCOV
627
                                        proxy.close();
×
628
                                        proxy = null;
×
629
                                }
630
                        }
×
UNCOV
631
                }
×
632

633
                @Override
634
                public AllocatedMachine machine() throws IOException {
635
                        var am = s.withRenewal(() -> {
×
636
                                var conn = s.connection(uri, MACHINE);
×
UNCOV
637
                                try (var is = checkForError(conn,
×
638
                                                "couldn't get allocation description")) {
639
                                        if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
640
                                                throw new IOException("machine not allocated");
×
641
                                        }
642
                                        return readJson(is, AllocatedMachine.class);
×
643
                                } finally {
644
                                        s.trackCookie(conn);
×
645
                                }
646
                        });
UNCOV
647
                        am.setMachine(getMachine(am.getMachineName()));
×
UNCOV
648
                        return am;
×
649
                }
650

651
                @Override
652
                public boolean getPower() throws IOException {
653
                        return s.withRenewal(() -> {
×
654
                                var conn = s.connection(uri, POWER);
×
655
                                try (var is = checkForError(conn, "couldn't get power state")) {
×
UNCOV
656
                                        if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
UNCOV
657
                                                throw new IOException("machine not allocated");
×
658
                                        }
UNCOV
659
                                        return "ON".equals(readJson(is, Power.class).power);
×
660
                                } finally {
UNCOV
661
                                        s.trackCookie(conn);
×
662
                                }
663
                        });
664
                }
665

666
                @Override
667
                public boolean setPower(boolean switchOn) throws IOException {
668
                        var power = new Power();
×
UNCOV
669
                        power.power = (switchOn ? "ON" : "OFF");
×
UNCOV
670
                        boolean powered = s.withRenewal(() -> {
×
UNCOV
671
                                var conn = s.connection(uri, POWER, true);
×
672
                                conn.setRequestMethod("PUT");
×
673
                                writeObject(conn, power);
×
674
                                try (var is = checkForError(conn, "couldn't set power state")) {
×
UNCOV
675
                                        if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
676
                                                throw new IOException("machine not allocated");
×
677
                                        }
UNCOV
678
                                        return "ON".equals(readJson(is, Power.class).power);
×
679
                                } finally {
UNCOV
680
                                        s.trackCookie(conn);
×
681
                                }
682
                        });
UNCOV
683
                        if (!powered) {
×
684
                                // If someone turns the power off, close the proxy
685
                                synchronized (lock) {
×
UNCOV
686
                                        if (haveProxy()) {
×
UNCOV
687
                                                proxy.close();
×
UNCOV
688
                                                proxy = null;
×
689
                                        }
690
                                }
×
691
                        }
692
                        return powered;
×
693
                }
694

695
                @Override
696
                public WhereIs whereIs(HasChipLocation chip) throws IOException {
UNCOV
697
                        return whereis(uri.resolve(
×
698
                                        format("chip?x=%d&y=%d", chip.getX(), chip.getY())));
×
699
                }
700

701
                @GuardedBy("lock")
702
                private boolean haveProxy() {
UNCOV
703
                        return nonNull(proxy) && proxy.isOpen();
×
704
                }
705

706
                /**
707
                 * @return The websocket-based proxy.
708
                 * @throws IOException
709
                 *             if we can't connect
710
                 * @throws InterruptedException
711
                 *             if we're interrupted while connecting
712
                 */
713
                private ProxyProtocolClient getProxy()
714
                                throws IOException, InterruptedException {
715
                        synchronized (lock) {
×
UNCOV
716
                                if (haveProxy()) {
×
717
                                        return proxy;
×
718
                                }
UNCOV
719
                        }
×
720
                        var wssAddr = describe(false).getProxyAddress();
×
UNCOV
721
                        if (isNull(wssAddr)) {
×
722
                                throw new IOException("machine not allocated");
×
723
                        }
724
                        synchronized (lock) {
×
725
                                if (!haveProxy()) {
×
UNCOV
726
                                        proxy = s.withRenewal(() -> s.websocket(wssAddr));
×
727
                                }
UNCOV
728
                                return proxy;
×
729
                        }
730
                }
731

732
                @MustBeClosed
733
                @Override
734
                public TransceiverInterface getTransceiver()
735
                                throws IOException, InterruptedException, SpinnmanException {
UNCOV
736
                        var ws = getProxy();
×
NEW
737
                        return new ProxiedTransceiver(this, ws);
×
738
                }
739

740
                @Override
741
                public String toString() {
UNCOV
742
                        return "Job(" + uri + ")";
×
743
                }
744

745
                @Override
746
                public void writeMemory(HasChipLocation chip,
747
                                MemoryLocation baseAddress, ByteBuffer data)
748
                                throws IOException {
749
                        try {
NEW
750
                                s.withRenewal(() -> {
×
NEW
751
                                        var conn = s.connection(uri,
×
NEW
752
                                                        new URI(MEMORY + "?x=" + chip.getX()
×
NEW
753
                                                                        + "&y=" + chip.getY()
×
754
                                                                        + "&address="
NEW
755
                                                                        + toUnsignedString(baseAddress.address)),
×
756
                                                        true);
NEW
757
                                        conn.setDoOutput(true);
×
NEW
758
                                        conn.setRequestMethod("POST");
×
NEW
759
                                        conn.setRequestProperty(
×
760
                                                        "Content-Type", "application/octet-stream");
NEW
761
                                        try (var os = conn.getOutputStream();
×
NEW
762
                                                        var channel = Channels.newChannel(os)) {
×
NEW
763
                                                channel.write(data);
×
764
                                        }
NEW
765
                                        checkForErrorNoResponse(conn, "Couldn't write memory");
×
NEW
766
                                        return null;
×
767
                                });
NEW
768
                        } catch (URISyntaxException e) {
×
NEW
769
                                throw new IOException(e);
×
NEW
770
                        }
×
NEW
771
                }
×
772

773
                @Override
774
                public ByteBuffer readMemory(HasChipLocation chip,
775
                                MemoryLocation baseAddress, int length)
776
                                throws IOException {
777
                        try {
NEW
778
                                return s.withRenewal(() -> {
×
NEW
779
                                        var conn = s.connection(uri,
×
NEW
780
                                                        new URI(MEMORY + "?x=" + chip.getX()
×
NEW
781
                                                                        + "&y=" + chip.getY()
×
782
                                                                        + "&address="
NEW
783
                                                                        + toUnsignedString(baseAddress.address)
×
784
                                                                        + "&size=" + length));
NEW
785
                                        conn.setRequestMethod("GET");
×
NEW
786
                                        conn.setRequestProperty(
×
787
                                                        "Accept", "application/octet-stream");
NEW
788
                                        try (var is = checkForError(conn, "couldn't read memory")) {
×
NEW
789
                                                var buffer = ByteBuffer.allocate(length);
×
NEW
790
                                                var channel = Channels.newChannel(is);
×
NEW
791
                                                IOUtils.readFully(channel, buffer);
×
NEW
792
                                                buffer.rewind();
×
NEW
793
                                                return buffer.asReadOnlyBuffer().order(LITTLE_ENDIAN);
×
794
                                        }
795
                                });
NEW
796
                        } catch (URISyntaxException e) {
×
NEW
797
                                throw new IOException(e);
×
798
                        }
799
                }
800

801
                @Override
802
                public void fastWriteData(CoreLocation gathererCore,
803
                                IPTag iptag, HasChipLocation chip, MemoryLocation baseAddress,
804
                                ByteBuffer data) throws IOException {
805
                        try {
NEW
806
                                s.withRenewal(() -> {
×
NEW
807
                                        var conn = s.connection(uri,
×
808
                                                        new URI(FAST_DATA_WRITE
NEW
809
                                                                        + "?gather_x=" + gathererCore.getX()
×
NEW
810
                                                                        + "&gather_y=" + gathererCore.getY()
×
NEW
811
                                                                        + "&gather_p=" + gathererCore.getP()
×
NEW
812
                                                                        + "&eth_x=" + iptag.getDestination().getX()
×
NEW
813
                                                                        + "&eth_y=" + iptag.getDestination().getY()
×
814
                                                                        + "&eth_address="
NEW
815
                                                                        + iptag.getBoardAddress().getHostAddress()
×
NEW
816
                                                                        + "&iptag=" + iptag.getTag()
×
NEW
817
                                                                        + "&x=" + chip.getX()
×
NEW
818
                                                                        + "&y=" + chip.getY()
×
819
                                                                        + "&address="
NEW
820
                                                                        + toUnsignedString(baseAddress.address)),
×
821
                                                        true);
NEW
822
                                        conn.setDoOutput(true);
×
NEW
823
                                        conn.setRequestMethod("POST");
×
NEW
824
                                        conn.setRequestProperty(
×
825
                                                        "Content-Type", "application/octet-stream");
NEW
826
                                        try (var os = conn.getOutputStream();
×
NEW
827
                                                        var channel = Channels.newChannel(os)) {
×
NEW
828
                                                channel.write(data);
×
829
                                        }
NEW
830
                                        checkForErrorNoResponse(conn, "Couldn't fast write memory");
×
NEW
831
                                        return null;
×
832
                                });
NEW
833
                        } catch (URISyntaxException e) {
×
NEW
834
                                throw new IOException(e);
×
NEW
835
                        }
×
NEW
836
                }
×
837

838
                @Override
839
                public ByteBuffer fastReadData(ChipLocation gathererChip,
840
                                IPTag iptag, HasCoreLocation monitorCore,
841
                                MemoryLocation baseAddress, int length) throws IOException {
842
                        try {
NEW
843
                                return s.withRenewal(() -> {
×
NEW
844
                                        var conn = s.connection(uri,
×
845
                                                        new URI(FAST_DATA_READ
NEW
846
                                                                        + "?gather_x=" + gathererChip.getX()
×
NEW
847
                                                                        + "&gather_y=" + gathererChip.getY()
×
NEW
848
                                                                        + "&eth_x=" + iptag.getDestination().getX()
×
NEW
849
                                                                        + "&eth_y=" + iptag.getDestination().getY()
×
850
                                                                        + "&eth_address="
NEW
851
                                                                        + iptag.getBoardAddress().getHostAddress()
×
NEW
852
                                                                        + "&iptag=" + iptag.getTag()
×
NEW
853
                                                                        + "&x=" + monitorCore.getX()
×
NEW
854
                                                                        + "&y=" + monitorCore.getY()
×
NEW
855
                                                                        + "&p=" + monitorCore.getP()
×
856
                                                                        + "&address="
NEW
857
                                                                        + toUnsignedString(baseAddress.address)
×
858
                                                                        + "&size=" + length));
NEW
859
                                        conn.setRequestMethod("GET");
×
NEW
860
                                        conn.setRequestProperty(
×
861
                                                        "Accept", "application/octet-stream");
NEW
862
                                        try (var is = checkForError(conn, "couldn't read memory")) {
×
NEW
863
                                                var buffer = ByteBuffer.allocate(length);
×
NEW
864
                                                var channel = Channels.newChannel(is);
×
NEW
865
                                                IOUtils.readFully(channel, buffer);
×
NEW
866
                                                buffer.rewind();
×
NEW
867
                                                return buffer.asReadOnlyBuffer().order(LITTLE_ENDIAN);
×
868
                                        }
869
                                });
NEW
870
                        } catch (URISyntaxException e) {
×
NEW
871
                                throw new IOException(e);
×
872
                        }
873
                }
874
        }
875

876
        private static final class MachineImpl extends Common implements Machine {
877
                private static final int TRIAD = 3;
878

879
                private final BriefMachineDescription bmd;
880

881
                private List<BoardCoords> deadBoards;
882

883
                private List<DeadLink> deadLinks;
884

885
                MachineImpl(SpallocClient client, Session session,
886
                                BriefMachineDescription bmd) {
UNCOV
887
                        super(client, session);
×
UNCOV
888
                        this.bmd = bmd;
×
UNCOV
889
                        this.deadBoards = List.copyOf(bmd.deadBoards);
×
UNCOV
890
                        this.deadLinks = List.copyOf(bmd.deadLinks);
×
891
                }
×
892

893
                @Override
894
                public String getName() {
UNCOV
895
                        return bmd.name;
×
896
                }
897

898
                @Override
899
                public List<String> getTags() {
900
                        return bmd.tags;
×
901
                }
902

903
                @Override
904
                public int getWidth() {
UNCOV
905
                        return bmd.width;
×
906
                }
907

908
                @Override
909
                public int getHeight() {
910
                        return bmd.height;
×
911
                }
912

913
                @Override
914
                public int getLiveBoardCount() {
915
                        return bmd.width * bmd.height * TRIAD - bmd.deadBoards.size();
×
916
                }
917

918
                @Override
919
                public List<BoardCoords> getDeadBoards() {
920
                        return deadBoards;
×
921
                }
922

923
                @Override
924
                public List<DeadLink> getDeadLinks() {
UNCOV
925
                        return deadLinks;
×
926
                }
927

928
                @Override
929
                public void waitForChange() throws IOException {
930
                        var nbmd = s.withRenewal(() -> {
×
UNCOV
931
                                var conn = s.connection(bmd.uri, WAIT_FLAG);
×
932
                                try (var is = checkForError(conn,
×
933
                                                "couldn't wait for state change")) {
934
                                        return readJson(is, BriefMachineDescription.class);
×
935
                                } finally {
UNCOV
936
                                        s.trackCookie(conn);
×
937
                                }
938
                        });
939
                        this.deadBoards = List.copyOf(nbmd.deadBoards);
×
940
                        this.deadLinks = List.copyOf(nbmd.deadLinks);
×
941
                }
×
942

943
                @Override
944
                public WhereIs getBoard(TriadCoords coords) throws IOException {
945
                        return whereis(
×
946
                                        bmd.uri.resolve(format("logical-board?x=%d&y=%d&z=%d",
×
UNCOV
947
                                                        coords.x, coords.y, coords.z)));
×
948
                }
949

950
                @Override
951
                public WhereIs getBoard(PhysicalCoords coords) throws IOException {
UNCOV
952
                        return whereis(bmd.uri.resolve(
×
UNCOV
953
                                        format("physical-board?cabinet=%d&frame=%d&board=%d",
×
UNCOV
954
                                                        coords.c, coords.f, coords.b)));
×
955
                }
956

957
                @Override
958
                public WhereIs getBoard(HasChipLocation chip) throws IOException {
959
                        return whereis(bmd.uri.resolve(
×
960
                                        format("chip?x=%d&y=%d", chip.getX(), chip.getY())));
×
961
                }
962

963
                @Override
964
                public WhereIs getBoard(String address) throws IOException {
965
                        return whereis(bmd.uri.resolve(
×
966
                                        format("board-ip?address=%s", encode(address, UTF_8))));
×
967
                }
968
        }
969
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc