• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13502930948

24 Feb 2025 04:31PM UTC coverage: 38.515% (-0.01%) from 38.525%
13502930948

push

github

rowleya
Client and server side of Fast Data Write

653 of 981 new or added lines in 8 files covered. (66.56%)

4 existing lines in 2 files now uncovered.

9190 of 23861 relevant lines covered (38.51%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.27
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/alloc/client/SpallocClientFactory.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.client;
17

18
import static com.fasterxml.jackson.databind.PropertyNamingStrategies.KEBAB_CASE;
19
import static com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS;
20
import static java.lang.Integer.toUnsignedString;
21
import static java.lang.String.format;
22
import static java.lang.Thread.sleep;
23
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
24
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
25
import static java.net.HttpURLConnection.HTTP_NO_CONTENT;
26
import static java.net.URLEncoder.encode;
27
import static java.nio.charset.StandardCharsets.UTF_8;
28
import static java.nio.ByteOrder.LITTLE_ENDIAN;
29
import static java.util.Collections.synchronizedMap;
30
import static java.util.Objects.isNull;
31
import static java.util.Objects.nonNull;
32
import static java.util.stream.Collectors.joining;
33
import static java.util.stream.Collectors.toList;
34
import static org.apache.commons.io.IOUtils.readLines;
35
import static org.slf4j.LoggerFactory.getLogger;
36
import static uk.ac.manchester.spinnaker.alloc.client.ClientUtils.asDir;
37

38
import java.io.BufferedReader;
39
import java.io.FileNotFoundException;
40
import java.io.IOException;
41
import java.io.InputStream;
42
import java.io.InputStreamReader;
43
import java.io.OutputStreamWriter;
44
import java.net.HttpURLConnection;
45
import java.net.URI;
46
import java.net.URISyntaxException;
47
import java.nio.ByteBuffer;
48
import java.nio.channels.Channels;
49
import java.util.ArrayList;
50
import java.util.Collection;
51
import java.util.HashMap;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.stream.Stream;
55

56
import org.apache.commons.io.IOUtils;
57
import org.slf4j.Logger;
58

59
import com.fasterxml.jackson.databind.json.JsonMapper;
60
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
61
import com.google.errorprone.annotations.MustBeClosed;
62
import com.google.errorprone.annotations.concurrent.GuardedBy;
63

64
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.Job;
65
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.Machine;
66
import uk.ac.manchester.spinnaker.alloc.client.SpallocClient.SpallocException;
67
import uk.ac.manchester.spinnaker.machine.ChipLocation;
68
import uk.ac.manchester.spinnaker.machine.CoreLocation;
69
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
70
import uk.ac.manchester.spinnaker.machine.MemoryLocation;
71
import uk.ac.manchester.spinnaker.machine.board.PhysicalCoords;
72
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
73
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
74
import uk.ac.manchester.spinnaker.messages.model.Version;
75
import uk.ac.manchester.spinnaker.storage.ProxyInformation;
76
import uk.ac.manchester.spinnaker.transceiver.SpinnmanException;
77
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
78
import uk.ac.manchester.spinnaker.utils.Daemon;
79

80
/**
81
 * A factory for clients to connect to the Spalloc service.
82
 * <p>
83
 * <strong>Implementation Note:</strong> Neither this class nor the client
84
 * classes it creates maintain state that needs to be closed explicitly
85
 * <em>except</em> for
86
 * {@linkplain SpallocClient.Job#getTransceiver() transceivers}, as transceivers
87
 * usually need to be closed.
88
 *
89
 * @author Donal Fellows
90
 */
91
public class SpallocClientFactory {
92
    private static final Logger log = getLogger(SpallocClientFactory.class);
3✔
93

94
    private static final String CONTENT_TYPE = "Content-Type";
95

96
    private static final String TEXT_PLAIN = "text/plain; charset=UTF-8";
97

98
    private static final String APPLICATION_JSON = "application/json";
99

100
    private static final String FORM_ENCODED =
101
            "application/x-www-form-urlencoded";
102

103
    private static final URI KEEPALIVE = URI.create("keepalive");
3✔
104

105
    private static final URI MACHINE = URI.create("machine");
3✔
106

107
    private static final URI POWER = URI.create("power");
3✔
108

109
    private static final URI WAIT_FLAG = URI.create("?wait=true");
3✔
110

111
    private static final URI MEMORY = URI.create("memory");
3✔
112

113
    private static final URI FAST_DATA_WRITE = URI.create("fast-data-write");
3✔
114

115
    private static final URI FAST_DATA_READ = URI.create("fast-data-read");
3✔
116

117
    // Amount to divide keepalive interval by to get actual keep alive delay
118
    private static final int KEEPALIVE_DIVIDER = 2;
119

120
    /** Used to convert to/from JSON. */
121
    static final JsonMapper JSON_MAPPER = JsonMapper.builder()
3✔
122
            .findAndAddModules().disable(WRITE_DATES_AS_TIMESTAMPS)
3✔
123
            .addModule(new JavaTimeModule())
3✔
124
            .propertyNamingStrategy(KEBAB_CASE).build();
3✔
125

126
    private final URI baseUrl;
127

128
    /**
129
     * Cache of machines, which don't expire.
130
     */
131
    private static final Map<String, Machine> MACHINE_MAP =
3✔
132
            synchronizedMap(new HashMap<>());
3✔
133

134
    /**
135
     * Create a factory that can talk to a given service.
136
     *
137
     * @param baseUrl
138
     *            Where the server is.
139
     */
140
    public SpallocClientFactory(URI baseUrl) {
×
141
        this.baseUrl = asDir(baseUrl);
×
142
    }
×
143

144
    /**
145
     * Get a handle to a job given its proxy access information (derived from a
146
     * database query).
147
     *
148
     * @param proxy
149
     *            The proxy information from the database. Handles {@code null}.
150
     * @return The job handle, or {@code null} if {@code proxy==null}.
151
     * @throws IOException
152
     *             If connecting to the job fails.
153
     */
154
    public static Job getJobFromProxyInfo(ProxyInformation proxy)
155
            throws IOException {
156
        if (proxy == null) {
×
157
            return null;
×
158
        }
159
        log.info("Using proxy {} for connections", proxy.spallocUrl);
×
160
        return new SpallocClientFactory(URI.create(proxy.spallocUrl))
×
161
                .getJob(proxy.jobUrl, proxy.headers, proxy.cookies);
×
162
    }
163

164
    /**
165
     * Read an object from a stream.
166
     *
167
     * @param <T>
168
     *            The type of the object to read.
169
     * @param is
170
     *            The stream
171
     * @param cls
172
     *            The class of object to read.
173
     * @return The object
174
     * @throws IOException
175
     *             If an I/O error happens or the content on the stream can't be
176
     *             made into an instance of the given class.
177
     */
178
    static <T> T readJson(InputStream is, Class<T> cls) throws IOException {
179
        BufferedReader streamReader = new BufferedReader(
×
180
                new InputStreamReader(is, "UTF-8"));
181
        StringBuilder responseStrBuilder = new StringBuilder();
×
182

183
        String inputStr;
184
        while ((inputStr = streamReader.readLine()) != null) {
×
185
            responseStrBuilder.append(inputStr);
×
186
        }
187
        String json = responseStrBuilder.toString();
×
188

189
        try {
190
            return JSON_MAPPER.readValue(json, cls);
×
191
        } catch (IOException e) {
×
192
            log.error("Error while reading json {}", json);
×
193
            throw e;
×
194
        }
195
    }
196

197
    /**
198
     * Outputs a form to a connection in
199
     * {@code application/x-www-form-urlencoded} format.
200
     *
201
     * @param connection
202
     *            The connection. Must have the right verb set.
203
     * @param map
204
     *            The contents of the form.
205
     * @throws IOException
206
     *             If I/O fails.
207
     */
208
    static void writeForm(HttpURLConnection connection, Map<String, String> map)
209
            throws IOException {
210
        var form = map.entrySet().stream()
×
211
                .map(e -> e.getKey() + "=" + encode(e.getValue(), UTF_8))
×
212
                .collect(joining("&"));
×
213

214
        connection.setDoOutput(true);
×
215
        connection.setRequestProperty(CONTENT_TYPE, FORM_ENCODED);
×
216
        try (var w =
×
217
                new OutputStreamWriter(connection.getOutputStream(), UTF_8)) {
×
218
            w.write(form);
×
219
        }
220
    }
×
221

222
    /**
223
     * Outputs an object to a connection in {@code application/json} format.
224
     *
225
     * @param connection
226
     *            The connection. Must have the right verb set.
227
     * @param object
228
     *            The object to write.
229
     * @throws IOException
230
     *             If I/O fails.
231
     */
232
    static void writeObject(HttpURLConnection connection, Object object)
233
            throws IOException {
234
        connection.setDoOutput(true);
×
235
        connection.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
×
236
        try (var out = connection.getOutputStream()) {
×
237
            JSON_MAPPER.writeValue(out, object);
×
238
        }
239
    }
×
240

241
    /**
242
     * Outputs a string to a connection in {@code text/plain} format.
243
     *
244
     * @param connection
245
     *            The connection. Must have the right verb set.
246
     * @param string
247
     *            The string to write.
248
     * @throws IOException
249
     *             If I/O fails.
250
     */
251
    static void writeString(HttpURLConnection connection, String string)
252
            throws IOException {
253
        connection.setDoOutput(true);
×
254
        connection.setRequestProperty(CONTENT_TYPE, TEXT_PLAIN);
×
255
        try (var w = new OutputStreamWriter(connection.getOutputStream(),
×
256
                UTF_8)) {
257
            w.write(string);
×
258
        }
259
    }
×
260

261
    /**
262
     * Checks for errors in the response.
263
     *
264
     * @param conn
265
     *            The HTTP connection
266
     * @param errorMessage
267
     *            The message to use on error (describes what did not work at a
268
     *            higher level)
269
     * @return The input stream so any non-error response content can be
270
     *         obtained.
271
     * @throws IOException
272
     *             If things go wrong with comms.
273
     * @throws FileNotFoundException
274
     *             on a {@link HttpURLConnection#HTTP_NOT_FOUND}
275
     * @throws SpallocException
276
     *             on other server errors
277
     */
278
    @MustBeClosed
279
    static InputStream checkForError(HttpURLConnection conn,
280
            String errorMessage) throws IOException {
281
        if (conn.getResponseCode() == HTTP_NOT_FOUND) {
×
282
            // Special case
283
            throw new FileNotFoundException(errorMessage);
×
284
        }
285
        if (conn.getResponseCode() >= HTTP_BAD_REQUEST) {
×
286
            throw new SpallocException(conn.getErrorStream(),
×
287
                    conn.getResponseCode());
×
288
        }
289
        return conn.getInputStream();
×
290
    }
291

292
    /**
293
     * Create a client and log in.
294
     *
295
     * @param username
296
     *            The username to log in with.
297
     * @param password
298
     *            The password to log in with.
299
     * @return The client API for the given server.
300
     * @throws IOException
301
     *             If the server doesn't respond or logging in fails.
302
     */
303
    public SpallocClient login(String username, String password)
304
            throws IOException {
305
        var s = new ClientSession(baseUrl, username, password);
×
306

307
        return new ClientImpl(s, s.discoverRoot());
×
308
    }
309

310
    /**
311
     * Get direct access to a Job.
312
     *
313
     * @param uri
314
     *            The URI of the job
315
     * @param headers
316
     *            The headers to read authentication from.
317
     * @param cookies
318
     *            The cookies to read authentication from.
319
     * @return A job.
320
     * @throws IOException
321
     *             If there is an error communicating with the server.
322
     */
323
    public Job getJob(String uri, Map<String, String> headers,
324
            Map<String, String> cookies) throws IOException {
325
        var u = URI.create(uri);
×
326
        var s = new ClientSession(baseUrl, headers, cookies);
×
327
        var c = new ClientImpl(s, s.discoverRoot());
×
328
        log.info("Connecting to job on {}", u);
×
329
        return c.job(u);
×
330
    }
331

332
    private abstract static class Common {
333
        private final SpallocClient client;
334

335
        final Session s;
336

337
        Common(SpallocClient client, Session s) {
×
338
            this.client = client != null ? client : (SpallocClient) this;
×
339
            this.s = s;
×
340
        }
×
341

342
        final Machine getMachine(String name) throws IOException {
343
            Machine m = MACHINE_MAP.get(name);
×
344
            if (m == null) {
×
345
                client.listMachines();
×
346
                m = MACHINE_MAP.get(name);
×
347
            }
348
            if (m == null) {
×
349
                throw new IOException("Machine " + name + " not found");
×
350
            }
351
            return m;
×
352
        }
353

354
        private WhereIs whereis(HttpURLConnection conn) throws IOException {
355
            try (var is = checkForError(conn,
×
356
                    "couldn't get board information")) {
357
                if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
358
                    throw new FileNotFoundException("machine not allocated");
×
359
                }
360
                return readJson(is, WhereIs.class);
×
361
            } finally {
362
                s.trackCookie(conn);
×
363
            }
364
        }
365

366
        final WhereIs whereis(URI uri) throws IOException {
367
            return s.withRenewal(() -> {
×
368
                var conn = s.connection(uri);
×
369
                var w = whereis(conn);
×
370
                w.setMachineHandle(getMachine(w.getMachineName()));
×
371
                w.clearMachineRef();
×
372
                return w;
×
373
            });
374
        }
375
    }
376

377
    private static final class ClientImpl extends Common
378
            implements SpallocClient {
379
        private Version v;
380

381
        private URI jobs;
382

383
        private URI machines;
384

385
        private ClientImpl(Session s, RootInfo ri) throws IOException {
386
            super(null, s);
×
387
            this.v = ri.version;
×
388
            this.jobs = asDir(ri.jobsURI);
×
389
            this.machines = asDir(ri.machinesURI);
×
390
        }
×
391

392
        @Override
393
        public Version getVersion() {
394
            return v;
×
395
        }
396

397
        /**
398
         * Slightly convoluted class to fetch jobs. The complication means we
399
         * get the initial failure exception nice and early, while we're ready
400
         * for it. This code would be quite a lot simpler if we didn't want to
401
         * get the exception during construction.
402
         */
403
        private class JobLister extends ListFetchingIter<URI> {
404
            private URI next;
405

406
            private List<URI> first;
407

408
            JobLister(URI initial) throws IOException {
×
409
                var first = getJobList(s.connection(initial));
×
410
                next = first.next;
×
411
                this.first = first.jobs;
×
412
            }
×
413

414
            private Jobs getJobList(HttpURLConnection conn) throws IOException {
415
                try (var is = checkForError(conn, "couldn't list jobs")) {
×
416
                    return readJson(is, Jobs.class);
×
417
                } finally {
418
                    s.trackCookie(conn);
×
419
                }
420
            }
421

422
            @Override
423
            List<URI> fetchNext() throws IOException {
424
                if (nonNull(first)) {
×
425
                    try {
426
                        return first;
×
427
                    } finally {
428
                        first = null;
×
429
                    }
430
                }
431
                var j = getJobList(s.connection(next));
×
432
                next = j.next;
×
433
                return j.jobs;
×
434
            }
435

436
            @Override
437
            boolean canFetchMore() {
438
                if (nonNull(first)) {
×
439
                    return true;
×
440
                }
441
                return nonNull(next);
×
442
            }
443
        }
444

445
        private Stream<Job> listJobs(URI flags) throws IOException {
446
            var basicData = new JobLister(
×
447
                    nonNull(flags) ? jobs.resolve(flags) : jobs);
×
448
            return basicData.stream().flatMap(Collection::stream)
×
449
                    .map(this::job);
×
450
        }
451

452
        @Override
453
        public List<Job> listJobs(boolean wait) throws IOException {
454
            return s.withRenewal(() -> listJobs(WAIT_FLAG)).collect(toList());
×
455
        }
456

457
        @Override
458
        public Stream<Job> listJobsWithDeleted(boolean wait)
459
                throws IOException {
460
            var opts = new StringBuilder("?deleted=true");
×
461
            if (wait) {
×
462
                opts.append("&wait=true");
×
463
            }
464
            return s.withRenewal(() -> listJobs(URI.create(opts.toString())));
×
465
        }
466

467
        @Override
468
        public Job createJob(CreateJob createInstructions) throws IOException {
469
            var uri = s.withRenewal(() -> {
×
470
                var conn = s.connection(jobs, true);
×
471
                writeObject(conn, createInstructions);
×
472
                // Get the response entity... and discard it
473
                try (var is = checkForError(conn, "job create failed")) {
×
474
                    readLines(is, UTF_8);
×
475
                    // But we do want the Location header
476
                    return URI.create(conn.getHeaderField("Location"));
×
477
                } finally {
478
                    s.trackCookie(conn);
×
479
                }
480
            });
481
            var job = job(uri);
×
482
            job.startKeepalive(
×
483
                    createInstructions.getKeepaliveInterval().toMillis()
×
484
                    / KEEPALIVE_DIVIDER);
485
            return job;
×
486
        }
487

488
        JobImpl job(URI uri) {
489
            return new JobImpl(this, s, asDir(uri));
×
490
        }
491

492
        @Override
493
        public List<Machine> listMachines() throws IOException {
494
            return s.withRenewal(() -> {
×
495
                var conn = s.connection(machines);
×
496
                try (var is = checkForError(conn, "list machines failed")) {
×
497
                    var ms = readJson(is, Machines.class);
×
498
                    // Assume we can cache this
499
                    for (var bmd : ms.machines) {
×
500
                        log.debug("Machine {} found", bmd.name);
×
501
                        MACHINE_MAP.put(bmd.name,
×
502
                                new MachineImpl(this, s, bmd));
503
                    }
×
504
                    return new ArrayList<Machine>(MACHINE_MAP.values());
×
505
                } finally {
506
                    s.trackCookie(conn);
×
507
                }
508
            });
509
        }
510
    }
511

512
    private static final class JobImpl extends Common implements Job {
513
        private final URI uri;
514

515
        private volatile boolean dead;
516

517
        @GuardedBy("lock")
518
        private ProxyProtocolClient proxy;
519

520
        private final Object lock = new Object();
×
521

522
        JobImpl(SpallocClient client, Session session, URI uri) {
523
            super(client, session);
×
524
            this.uri = uri;
×
525
            this.dead = false;
×
526
        }
×
527

528
        @Override
529
        public JobDescription describe(boolean wait) throws IOException {
530
            return s.withRenewal(() -> {
×
531
                var conn = wait ? s.connection(uri, WAIT_FLAG)
×
532
                        : s.connection(uri);
×
533
                try (var is = checkForError(conn, "couldn't get job state")) {
×
534
                    return readJson(is, JobDescription.class);
×
535
                } finally {
536
                    s.trackCookie(conn);
×
537
                }
538
            });
539
        }
540

541
        @Override
542
        public void keepalive() throws IOException {
543
            s.withRenewal(() -> {
×
544
                var conn = s.connection(uri, KEEPALIVE, true);
×
545
                conn.setRequestMethod("PUT");
×
546
                writeString(conn, "alive");
×
547
                try (var is = checkForError(conn, "couldn't keep job alive")) {
×
548
                    return readLines(is, UTF_8);
×
549
                    // Ignore the output
550
                } finally {
551
                    s.trackCookie(conn);
×
552
                }
553
            });
554
        }
×
555

556
        public void startKeepalive(long delayMs) {
557
            if (dead) {
×
558
                throw new IllegalStateException("job is already deleted");
×
559
            }
560
            var t = new Daemon(() -> {
×
561
                try {
562
                    while (true) {
563
                        sleep(delayMs);
×
564
                        if (dead) {
×
565
                            break;
×
566
                        }
567
                        keepalive();
×
568
                    }
569
                } catch (IOException e) {
×
570
                    log.warn("failed to keep job alive for {}", this, e);
×
571
                } catch (InterruptedException e) {
×
572
                    // If interrupted, we're simply done
573
                }
×
574
            });
×
575
            t.setName("keepalive for " + uri);
×
576
            t.setUncaughtExceptionHandler((th, e) -> {
×
577
                log.warn("unexpected exception in {}", th, e);
×
578
            });
×
579
            t.start();
×
580
        }
×
581

582
        @Override
583
        public void delete(String reason) throws IOException {
584
            dead = true;
×
585
            s.withRenewal(() -> {
×
586
                var conn = s.connection(uri, "?reason=" + encode(reason, UTF_8),
×
587
                        true);
588
                conn.setRequestMethod("DELETE");
×
589
                try (var is = checkForError(conn, "couldn't delete job")) {
×
590
                    readLines(is, UTF_8);
×
591
                    // Ignore the output
592
                } finally {
593
                    s.trackCookie(conn);
×
594
                }
595
                return this;
×
596
            });
597
            synchronized (lock) {
×
598
                if (haveProxy()) {
×
599
                    proxy.close();
×
600
                    proxy = null;
×
601
                }
602
            }
×
603
        }
×
604

605
        @Override
606
        public AllocatedMachine machine() throws IOException {
607
            var am = s.withRenewal(() -> {
×
608
                var conn = s.connection(uri, MACHINE);
×
609
                try (var is = checkForError(conn,
×
610
                        "couldn't get allocation description")) {
611
                    if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
612
                        throw new IOException("machine not allocated");
×
613
                    }
614
                    return readJson(is, AllocatedMachine.class);
×
615
                } finally {
616
                    s.trackCookie(conn);
×
617
                }
618
            });
619
            am.setMachine(getMachine(am.getMachineName()));
×
620
            return am;
×
621
        }
622

623
        @Override
624
        public boolean getPower() throws IOException {
625
            return s.withRenewal(() -> {
×
626
                var conn = s.connection(uri, POWER);
×
627
                try (var is = checkForError(conn, "couldn't get power state")) {
×
628
                    if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
629
                        throw new IOException("machine not allocated");
×
630
                    }
631
                    return "ON".equals(readJson(is, Power.class).power);
×
632
                } finally {
633
                    s.trackCookie(conn);
×
634
                }
635
            });
636
        }
637

638
        @Override
639
        public boolean setPower(boolean switchOn) throws IOException {
640
            var power = new Power();
×
641
            power.power = (switchOn ? "ON" : "OFF");
×
642
            boolean powered = s.withRenewal(() -> {
×
643
                var conn = s.connection(uri, POWER, true);
×
644
                conn.setRequestMethod("PUT");
×
645
                writeObject(conn, power);
×
646
                try (var is = checkForError(conn, "couldn't set power state")) {
×
647
                    if (conn.getResponseCode() == HTTP_NO_CONTENT) {
×
648
                        throw new IOException("machine not allocated");
×
649
                    }
650
                    return "ON".equals(readJson(is, Power.class).power);
×
651
                } finally {
652
                    s.trackCookie(conn);
×
653
                }
654
            });
655
            if (!powered) {
×
656
                // If someone turns the power off, close the proxy
657
                synchronized (lock) {
×
658
                    if (haveProxy()) {
×
659
                        proxy.close();
×
660
                        proxy = null;
×
661
                    }
662
                }
×
663
            }
664
            return powered;
×
665
        }
666

667
        @Override
668
        public WhereIs whereIs(HasChipLocation chip) throws IOException {
669
            return whereis(uri.resolve(
×
670
                    format("chip?x=%d&y=%d", chip.getX(), chip.getY())));
×
671
        }
672

673
        @GuardedBy("lock")
674
        private boolean haveProxy() {
675
            return nonNull(proxy) && proxy.isOpen();
×
676
        }
677

678
        /**
679
         * @return The websocket-based proxy.
680
         * @throws IOException
681
         *             if we can't connect
682
         * @throws InterruptedException
683
         *             if we're interrupted while connecting
684
         */
685
        private ProxyProtocolClient getProxy()
686
                throws IOException, InterruptedException {
687
            synchronized (lock) {
×
688
                if (haveProxy()) {
×
689
                    return proxy;
×
690
                }
691
            }
×
692
            var wssAddr = describe(false).getProxyAddress();
×
693
            if (isNull(wssAddr)) {
×
694
                throw new IOException("machine not allocated");
×
695
            }
696
            synchronized (lock) {
×
697
                if (!haveProxy()) {
×
698
                    proxy = s.withRenewal(() -> s.websocket(wssAddr));
×
699
                }
700
                return proxy;
×
701
            }
702
        }
703

704
        @MustBeClosed
705
        @Override
706
        public TransceiverInterface getTransceiver()
707
                throws IOException, InterruptedException, SpinnmanException {
708
            var ws = getProxy();
×
709
            return new ProxiedTransceiver(this, ws);
×
710
        }
711

712
        @Override
713
        public String toString() {
714
            return "Job(" + uri + ")";
×
715
        }
716

717
        @Override
718
        public void writeMemory(HasChipLocation chip,
719
                MemoryLocation baseAddress, ByteBuffer data)
720
                throws IOException {
721
            try {
722
                s.withRenewal(() -> {
×
723
                    var conn = s.connection(uri,
×
724
                            new URI(MEMORY + "?x=" + chip.getX()
×
725
                                    + "&y=" + chip.getY()
×
726
                                    + "&address="
727
                                    + toUnsignedString(baseAddress.address)),
×
728
                            true);
729
                    conn.setDoOutput(true);
×
730
                    conn.setRequestMethod("POST");
×
731
                    conn.setRequestProperty(
×
732
                            "Content-Type", "application/octet-stream");
733
                    try (var os = conn.getOutputStream();
×
734
                         var channel = Channels.newChannel(os)) {
×
735
                        channel.write(data);
×
736
                    }
737
                    try (var is = checkForError(conn, "couldn't write memory")) {
×
738
                        // Do Nothing
739
                    }
×
740
                    return null;
×
741
                });
742
            } catch (URISyntaxException e) {
×
743
                throw new IOException(e);
×
744
            }
×
745
        }
×
746

747
        @Override
748
        public ByteBuffer readMemory(HasChipLocation chip,
749
                MemoryLocation baseAddress, int length)
750
                throws IOException {
751
            try {
752
                return s.withRenewal(() -> {
×
753
                    var conn = s.connection(uri,
×
754
                            new URI(MEMORY + "?x=" + chip.getX()
×
755
                                    + "&y=" + chip.getY()
×
756
                                    + "&address="
757
                                    + toUnsignedString(baseAddress.address)
×
758
                                    + "&size=" + length));
759
                    conn.setRequestMethod("GET");
×
760
                    conn.setRequestProperty(
×
761
                            "Accept", "application/octet-stream");
762
                    try (var is = checkForError(conn, "couldn't read memory")) {
×
763
                        var buffer = ByteBuffer.allocate(length);
×
764
                        var channel = Channels.newChannel(is);
×
765
                        IOUtils.readFully(channel, buffer);
×
766
                        buffer.rewind();
×
767
                        return buffer.asReadOnlyBuffer().order(LITTLE_ENDIAN);
×
768
                    }
769
                });
770
            } catch (URISyntaxException e) {
×
771
                throw new IOException(e);
×
772
            }
773
        }
774

775
        @Override
776
        public void fastWriteData(CoreLocation gathererCore,
777
                ChipLocation ethernetChip, String ethernetAddress,
778
                IPTag iptag, HasChipLocation chip, MemoryLocation baseAddress,
779
                ByteBuffer data) throws IOException {
780
            try {
NEW
781
                s.withRenewal(() -> {
×
NEW
782
                    var conn = s.connection(uri,
×
783
                            new URI(FAST_DATA_WRITE
NEW
784
                                    + "?gather_x=" + gathererCore.getX()
×
NEW
785
                                    + "&gather_y=" + gathererCore.getY()
×
NEW
786
                                    + "&gather_p=" + gathererCore.getP()
×
NEW
787
                                    + "&eth_x=" + ethernetChip.getX()
×
NEW
788
                                    + "&eth_y=" + ethernetChip.getY()
×
789
                                    + "&eth_address=" + ethernetAddress
NEW
790
                                    + "&iptag=" + iptag.getTag()
×
NEW
791
                                    + "&x=" + chip.getX()
×
NEW
792
                                    + "&y=" + chip.getY()
×
793
                                    + "&address="
NEW
794
                                    + toUnsignedString(baseAddress.address)),
×
795
                            true);
NEW
796
                    conn.setDoOutput(true);
×
NEW
797
                    conn.setRequestMethod("POST");
×
NEW
798
                    conn.setRequestProperty(
×
799
                            "Content-Type", "application/octet-stream");
NEW
800
                    try (var os = conn.getOutputStream();
×
NEW
801
                         var channel = Channels.newChannel(os)) {
×
NEW
802
                        channel.write(data);
×
803
                    }
NEW
804
                    try (var is = checkForError(conn,
×
805
                            "couldn't fast write memory")) {
806
                        // Do Nothing
NEW
807
                    }
×
NEW
808
                    return null;
×
809
                });
NEW
810
            } catch (URISyntaxException e) {
×
NEW
811
                throw new IOException(e);
×
NEW
812
            }
×
NEW
813
        }
×
814
    }
815

816
    private static final class MachineImpl extends Common implements Machine {
817
        private static final int TRIAD = 3;
818

819
        private final BriefMachineDescription bmd;
820

821
        private List<BoardCoords> deadBoards;
822

823
        private List<DeadLink> deadLinks;
824

825
        MachineImpl(SpallocClient client, Session session,
826
                BriefMachineDescription bmd) {
827
            super(client, session);
×
828
            this.bmd = bmd;
×
829
            this.deadBoards = List.copyOf(bmd.deadBoards);
×
830
            this.deadLinks = List.copyOf(bmd.deadLinks);
×
831
        }
×
832

833
        @Override
834
        public String getName() {
835
            return bmd.name;
×
836
        }
837

838
        @Override
839
        public List<String> getTags() {
840
            return bmd.tags;
×
841
        }
842

843
        @Override
844
        public int getWidth() {
845
            return bmd.width;
×
846
        }
847

848
        @Override
849
        public int getHeight() {
850
            return bmd.height;
×
851
        }
852

853
        @Override
854
        public int getLiveBoardCount() {
855
            return bmd.width * bmd.height * TRIAD - bmd.deadBoards.size();
×
856
        }
857

858
        @Override
859
        public List<BoardCoords> getDeadBoards() {
860
            return deadBoards;
×
861
        }
862

863
        @Override
864
        public List<DeadLink> getDeadLinks() {
865
            return deadLinks;
×
866
        }
867

868
        @Override
869
        public void waitForChange() throws IOException {
870
            var nbmd = s.withRenewal(() -> {
×
871
                var conn = s.connection(bmd.uri, WAIT_FLAG);
×
872
                try (var is = checkForError(conn,
×
873
                        "couldn't wait for state change")) {
874
                    return readJson(is, BriefMachineDescription.class);
×
875
                } finally {
876
                    s.trackCookie(conn);
×
877
                }
878
            });
879
            this.deadBoards = List.copyOf(nbmd.deadBoards);
×
880
            this.deadLinks = List.copyOf(nbmd.deadLinks);
×
881
        }
×
882

883
        @Override
884
        public WhereIs getBoard(TriadCoords coords) throws IOException {
885
            return whereis(
×
886
                    bmd.uri.resolve(format("logical-board?x=%d&y=%d&z=%d",
×
887
                            coords.x, coords.y, coords.z)));
×
888
        }
889

890
        @Override
891
        public WhereIs getBoard(PhysicalCoords coords) throws IOException {
892
            return whereis(bmd.uri.resolve(
×
893
                    format("physical-board?cabinet=%d&frame=%d&board=%d",
×
894
                            coords.c, coords.f, coords.b)));
×
895
        }
896

897
        @Override
898
        public WhereIs getBoard(HasChipLocation chip) throws IOException {
899
            return whereis(bmd.uri.resolve(
×
900
                    format("chip?x=%d&y=%d", chip.getX(), chip.getY())));
×
901
        }
902

903
        @Override
904
        public WhereIs getBoard(String address) throws IOException {
905
            return whereis(bmd.uri.resolve(
×
906
                    format("board-ip?address=%s", encode(address, UTF_8))));
×
907
        }
908
    }
909
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc