• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.83
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/spalloc/SpallocJob.java
1
/*
2
 * Copyright (c) 2018 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.spalloc;
17

18
import static java.lang.Math.max;
19
import static java.lang.Math.min;
20
import static java.lang.String.format;
21
import static java.lang.System.currentTimeMillis;
22
import static java.lang.Thread.interrupted;
23
import static java.lang.Thread.sleep;
24
import static java.util.Objects.nonNull;
25
import static java.util.Objects.requireNonNull;
26
import static org.slf4j.LoggerFactory.getLogger;
27
import static uk.ac.manchester.spinnaker.machine.ChipLocation.ZERO_ZERO;
28
import static uk.ac.manchester.spinnaker.spalloc.JobConstants.RECONNECT_DELAY_DEFAULT;
29
import static uk.ac.manchester.spinnaker.spalloc.Utils.makeTimeout;
30
import static uk.ac.manchester.spinnaker.spalloc.Utils.timeLeft;
31
import static uk.ac.manchester.spinnaker.spalloc.Utils.timedOut;
32
import static uk.ac.manchester.spinnaker.spalloc.messages.State.QUEUED;
33
import static uk.ac.manchester.spinnaker.utils.UnitConstants.MSEC_PER_SEC;
34

35
import java.io.IOException;
36
import java.util.List;
37

38
import org.slf4j.Logger;
39

40
import com.google.errorprone.annotations.MustBeClosed;
41

42
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
43
import uk.ac.manchester.spinnaker.machine.MachineDimensions;
44
import uk.ac.manchester.spinnaker.messages.model.Version;
45
import uk.ac.manchester.spinnaker.spalloc.exceptions.JobDestroyedException;
46
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocProtocolTimeoutException;
47
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocServerException;
48
import uk.ac.manchester.spinnaker.spalloc.exceptions.SpallocStateChangeTimeoutException;
49
import uk.ac.manchester.spinnaker.spalloc.messages.BoardCoordinates;
50
import uk.ac.manchester.spinnaker.spalloc.messages.BoardPhysicalCoordinates;
51
import uk.ac.manchester.spinnaker.spalloc.messages.Connection;
52
import uk.ac.manchester.spinnaker.spalloc.messages.JobMachineInfo;
53
import uk.ac.manchester.spinnaker.spalloc.messages.JobState;
54
import uk.ac.manchester.spinnaker.spalloc.messages.State;
55
import uk.ac.manchester.spinnaker.utils.Daemon;
56

57
/**
58
 * A high-level interface for requesting and managing allocations of SpiNNaker
59
 * boards.
60
 * <p>
61
 * Constructing a {@link SpallocJob} object connects to a <a href=
62
 * "https://github.com/SpiNNakerManchester/spalloc_server">spalloc-server</a>
63
 * and requests a number of SpiNNaker boards. The job object may then be used to
64
 * monitor the state of the request, control the boards allocated and determine
65
 * their IP addresses.
66
 * <p>
67
 * In its simplest form, a {@link SpallocJob} can be used as a context manager
68
 * like so:
69
 *
70
 * <pre>
71
 * try (var j = new SpallocJob(new CreateJob(6).owner(me))) {
72
 *     myApplication.boot(j.getHostname(), j.getDimensions());
73
 *     myApplication.run(j.getHostname());
74
 * }
75
 * </pre>
76
 *
77
 * In this example a six-board machine is requested and the
78
 * {@code try}-with-resources context is entered once the allocation has been
79
 * made and the allocated boards are fully powered on. When control leaves the
80
 * block, the job is destroyed and the boards shut down by the server ready for
81
 * another job.
82
 * <p>
83
 * For more fine-grained control, the same functionality is available via
84
 * various methods:
85
 *
86
 * <pre>
87
 * var j = new SpallocJob(new CreateJob(6).owner(me));
88
 * j.waitUntilReady();
89
 * myApplication.boot(j.getHostname(), j.getDimensions());
90
 * myApplication.run(j.getHostname());
91
 * j.destroy();
92
 * </pre>
93
 *
94
 * <b>Note:</b> <blockquote class="note"> More complex applications may wish to
95
 * log the following properties of their job to support later debugging efforts:
96
 * <ul>
97
 * <li>{@code ID} &mdash; May be used to query the state of the job and find out
98
 * its fate if cancelled or destroyed. The <i>spalloc-job</i> command can be
99
 * used to discover the state/fate of the job and <i>spalloc-where-is</i> may be
100
 * used to find out what boards problem chips reside on.
101
 * <li>{@code machineName} and {@code boards} together give a complete record of
102
 * the hardware used by the job. The <i>spalloc-where-is</i> command may be used
103
 * to find out the physical locations of the boards used.
104
 * </ul>
105
 * </blockquote>
106
 *
107
 * @see CreateJob {@code CreateJob}: How to describe the job to create.
108
 */
109
public class SpallocJob implements AutoCloseable, SpallocJobAPI {
110
        private static final Logger log = getLogger(SpallocJob.class);
1✔
111

112
        /** Minimum supported server version. */
113
        private static final Version MIN_VER = new Version(0, 4, 0);
1✔
114

115
        /** Maximum supported server version. */
116
        private static final Version MAX_VER = new Version(2, 0, 0);
1✔
117

118
        private static final int STATUS_CACHE_PERIOD = 500;
119

120
        private SpallocClient client;
121

122
        private int id;
123

124
        private Integer timeout;
125

126
        private Integer keepaliveTime;
127

128
        /** The keepalive thread. */
129
        private Thread keepalive;
130

131
        /** Used to signal that the keepalive thread should stop. */
132
        private volatile boolean stopping;
133

134
        /**
135
         * Cache of information about a job's state. This information can change,
136
         * but not usually extremely rapidly; it has a caching period, implemented
137
         * using the {@link #statusTimestamp} field.
138
         */
139
        private JobState statusCache;
140

141
        /**
142
         * The time when the information in {@link #statusCache} was last collected.
143
         */
144
        private long statusTimestamp;
145

146
        /**
147
         * Cache of information about a machine. This is information which doesn't
148
         * change once it is assigned, so there is no expiry mechanism.
149
         */
150
        private JobMachineInfo machine;
151

152
        /** The status cache period, in ms. Non-constant for tests. */
153
        int statusCachePeriod = STATUS_CACHE_PERIOD;
1✔
154

155
        private int reconnectDelay = f2ms(RECONNECT_DELAY_DEFAULT);
1✔
156

157
        private static final ThreadGroup SPALLOC_WORKERS =
1✔
158
                        new ThreadGroup("spalloc worker threads");
159

160
        private static Configuration config;
161

162
        /**
163
         * Set up where configuration settings come from. By default, this is from a
164
         * file called {@value #DEFAULT_CONFIGURATION_SOURCE}; this method allows
165
         * you to override that (e.g., for testing).
166
         *
167
         * @param filename
168
         *            the base filename (without a path) to load the configuration
169
         *            from. This is expected to be a {@code .ini} file.
170
         * @see Configuration
171
         */
172
        public static void setConfigurationSource(String filename) {
173
                config = new Configuration(filename);
1✔
174
        }
1✔
175

176
        /**
177
         * The name of the default file to load the configuration from.
178
         */
179
        public static final String DEFAULT_CONFIGURATION_SOURCE = "spalloc.ini";
180

181
        static {
182
                setConfigurationSource(DEFAULT_CONFIGURATION_SOURCE);
1✔
183
        }
1✔
184

185
        /**
         * Create a spalloc job that requests a SpiNNaker machine, contacting the
         * given host on the port named in the configuration.
         *
         * @param hostname
         *            The spalloc server host
         * @param timeout
         *            The communications timeout
         * @param builder
         *            The job-creation request builder.
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(String hostname, Integer timeout, CreateJob builder)
                        throws IOException,
                        SpallocServerException,
                        InterruptedException {
                // Only the port comes from the configuration here
                this(hostname, config.getPort(), timeout, builder);
        }
206

207
        /**
         * Create a spalloc job that requests a SpiNNaker machine, contacting the
         * given host with the port and timeout named in the configuration.
         *
         * @param hostname
         *            The spalloc server host
         * @param builder
         *            The job-creation request builder.
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(String hostname, CreateJob builder)
                        throws IOException,
                        SpallocServerException,
                        InterruptedException {
                // The configured timeout is in seconds; f2ms converts it to ms
                this(hostname, config.getPort(), f2ms(config.getTimeout()), builder);
        }
226

227
        /**
         * Create a spalloc job that requests a SpiNNaker machine, taking the
         * server host, port and timeout from the configuration.
         *
         * @param builder
         *            The job-creation request builder.
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(CreateJob builder)
                        throws IOException,
                        SpallocServerException,
                        InterruptedException {
                // Everything about the connection comes from the configuration
                this(config.getHost(), config.getPort(), f2ms(config.getTimeout()),
                                builder);
        }
245

246
        /**
         * Create a spalloc job that requests a SpiNNaker machine, using a
         * caller-supplied client and the timeout named in the configuration.
         *
         * @param client
         *            The spalloc client
         * @param builder
         *            The job-creation request builder.
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws IllegalArgumentException
         *             If a bad builder is given.
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        public SpallocJob(SpallocClient client, CreateJob builder)
                        throws IOException,
                        SpallocServerException,
                        InterruptedException {
                // The configured timeout is in seconds; f2ms converts it to ms
                this(client, f2ms(config.getTimeout()), builder);
        }
266

267
        private static void validateBuilder(CreateJob builder) {
268
                if (requireNonNull(builder, "a builder must be specified")
1✔
269
                                .isTargetDefined()) {
1✔
270
                        return;
1✔
271
                }
272
                var machine = config.getMachine();
×
273
                var tags = config.getTags();
×
274
                if (nonNull(machine)) {
×
275
                        builder.machine(machine);
×
276
                } else if (nonNull(tags)) {
×
277
                        builder.tags(tags);
×
278
                } else {
279
                        throw new IllegalArgumentException(
×
280
                                        "must have either machine or tags specified or able "
281
                                                        + "to be looked up from the configuration");
282
                }
283
        }
×
284

285
        /**
286
         * Create a spalloc job that requests a SpiNNaker machine.
287
         *
288
         * @param client
289
         *            The spalloc client
290
         * @param timeout
291
         *            The communications timeout
292
         * @param builder
293
         *            The job-creation request builder.
294
         * @throws IOException
295
         *             If communications fail.
296
         * @throws SpallocServerException
297
         *             If the spalloc server rejects the operation request.
298
         * @throws InterruptedException
299
         *             If interrupted while waiting.
300
         */
301
        public SpallocJob(SpallocClient client, Integer timeout, CreateJob builder)
302
                        throws IOException, SpallocServerException, InterruptedException {
1✔
303
                validateBuilder(builder);
1✔
304
                this.client = client;
1✔
305
                this.timeout = timeout;
1✔
306
                client.connect();
1✔
307
                reconnectDelay = f2ms(config.getReconnectDelay());
1✔
308
                id = client.createJob(builder, timeout);
1✔
309
                /*
310
                 * We also need the keepalive configuration so we know when to send
311
                 * keepalive messages.
312
                 */
313
                keepaliveTime = f2ms(builder.getKeepAlive());
1✔
314
                log.info("created spalloc job with ID: {}", id);
1✔
315
                launchKeepaliveDaemon();
1✔
316
        }
1✔
317

318
        /**
319
         * Create a spalloc job that requests a SpiNNaker machine.
320
         *
321
         * @param hostname
322
         *            The spalloc server host
323
         * @param port
324
         *            The spalloc server port
325
         * @param timeout
326
         *            The communications timeout
327
         * @param builder
328
         *            The job-creation request builder.
329
         * @throws IOException
330
         *             If communications fail.
331
         * @throws SpallocServerException
332
         *             If the spalloc server rejects the operation request.
333
         * @throws InterruptedException
334
         *             If interrupted while waiting.
335
         * @throws IllegalArgumentException
336
         *             If a bad builder is given.
337
         */
338
        @MustBeClosed
339
        @SuppressWarnings("MustBeClosed")
340
        public SpallocJob(String hostname, Integer port, Integer timeout,
341
                        CreateJob builder)
342
                        throws IOException, SpallocServerException, InterruptedException {
×
343
                validateBuilder(builder);
×
344
                this.client = new SpallocClient(hostname, port, timeout);
×
345
                this.timeout = timeout;
×
346
                client.connect();
×
347
                reconnectDelay = f2ms(config.getReconnectDelay());
×
348
                id = client.createJob(builder, timeout);
×
349
                /*
350
                 * We also need the keepalive configuration so we know when to send
351
                 * keepalive messages.
352
                 */
353
                keepaliveTime = f2ms(builder.getKeepAlive());
×
354
                log.info("created spalloc job with ID: {}", id);
×
355
                launchKeepaliveDaemon();
×
356
        }
×
357

358
        /**
         * Create a job client that resumes an existing job given its ID, taking
         * the server host, port and timeout from the configuration.
         *
         * @param id
         *            The job ID
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws JobDestroyedException
         *             If the job doesn't exist (any more).
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(int id)
                        throws IOException,
                        SpallocServerException,
                        JobDestroyedException,
                        InterruptedException {
                // Everything about the connection comes from the configuration
                this(config.getHost(), config.getPort(), f2ms(config.getTimeout()), id);
        }
377

378
        /**
         * Create a job client that resumes an existing job given its ID,
         * contacting the given host with the port and timeout named in the
         * configuration.
         *
         * @param hostname
         *            The spalloc server host
         * @param id
         *            The job ID
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws JobDestroyedException
         *             If the job doesn't exist (any more).
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(String hostname, int id)
                        throws IOException,
                        SpallocServerException,
                        JobDestroyedException,
                        InterruptedException {
                // The configured timeout is in seconds; f2ms converts it to ms
                this(hostname, config.getPort(), f2ms(config.getTimeout()), id);
        }
400

401
        /**
         * Create a job client that resumes an existing job given its ID,
         * contacting the given host on the port named in the configuration.
         *
         * @param hostname
         *            The spalloc server host
         * @param timeout
         *            The communications timeout
         * @param id
         *            The job ID
         * @throws IOException
         *             If communications fail.
         * @throws SpallocServerException
         *             If the spalloc server rejects the operation request.
         * @throws JobDestroyedException
         *             If the job doesn't exist (any more).
         * @throws InterruptedException
         *             If interrupted while waiting.
         */
        @MustBeClosed
        public SpallocJob(String hostname, Integer timeout, int id)
                        throws IOException,
                        SpallocServerException,
                        JobDestroyedException,
                        InterruptedException {
                // Only the port comes from the configuration here
                this(hostname, config.getPort(), timeout, id);
        }
425

426
        /**
427
         * Converts a "float" number of seconds to milliseconds.
428
         *
429
         * @param obj
430
         *            The number of seconds as a {@link Number} but up-casted to an
431
         *            object for convenience.
432
         * @return The number of milliseconds, suitable for use with Java timing
433
         *         operations.
434
         */
435
        private static Integer f2ms(Object obj) {
436
                if (obj == null) {
1✔
437
                        return null;
×
438
                }
439
                return (int) (((Number) obj).doubleValue() * MSEC_PER_SEC);
1✔
440
        }
441

442
        /**
443
         * Create a job client that resumes an existing job given its ID.
444
         *
445
         * @param hostname
446
         *            The spalloc server host
447
         * @param port
448
         *            The TCP port
449
         * @param timeout
450
         *            The communications timeout
451
         * @param id
452
         *            The job ID
453
         * @throws IOException
454
         *             If communications fail.
455
         * @throws SpallocServerException
456
         *             If the spalloc server rejects the operation request.
457
         * @throws JobDestroyedException
458
         *             If the job doesn't exist (any more).
459
         * @throws InterruptedException
460
         *             If interrupted while waiting.
461
         */
462
        @MustBeClosed
463
        @SuppressWarnings("MustBeClosed")
464
        public SpallocJob(String hostname, int port, Integer timeout, int id)
465
                        throws IOException, SpallocServerException, JobDestroyedException,
466
                        InterruptedException {
×
467
                client = new SpallocClient(hostname, port, timeout);
×
468
                this.timeout = timeout;
×
469
                this.id = id;
×
470
                client.connect();
×
471
                reconnectDelay = f2ms(config.getReconnectDelay());
×
472
                /*
473
                 * If the job no longer exists, we can't get the keepalive interval (and
474
                 * there's nothing to keepalive) so just bail out.
475
                 */
476
                var jobState = getStatus();
×
477
                switch (jobState.getState()) {
×
478
                case DESTROYED:
479
                        if (nonNull(jobState.getReason())) {
×
480
                                throw new JobDestroyedException(format(
×
481
                                                "SpallocJob %d does not exist: %s: %s", id,
×
482
                                                jobState.getState().name(), jobState.getReason()));
×
483
                        }
484
                        // fall through
485
                case UNKNOWN:
486
                        throw new JobDestroyedException(
×
487
                                        format("SpallocJob %d does not exist: %s", id,
×
488
                                                        jobState.getState().name()));
×
489
                default:
490
                        // do nothing
491
                }
492
                // Snag the keepalive interval from the job
493
                keepaliveTime = f2ms(jobState.getKeepalive());
×
494
                log.info("resumed spalloc job with ID: {}", id);
×
495
                launchKeepaliveDaemon();
×
496
        }
×
497

498
        private void launchKeepaliveDaemon() {
499
                log.info("launching keepalive thread for {} with interval {}ms", id,
1✔
500
                                keepaliveTime / 2);
1✔
501
                if (keepalive != null) {
1✔
502
                        log.warn("launching second keepalive thread for {}", id);
×
503
                }
504
                stopping = false;
1✔
505
                keepalive = new Daemon(SPALLOC_WORKERS, this::keepalive,
1✔
506
                                "keepalive for spalloc job " + id);
507
                keepalive.setUncaughtExceptionHandler((th, e) -> {
1✔
508
                        log.warn("unexpected exception in {}", th, e);
×
509
                });
×
510
                keepalive.start();
1✔
511
        }
1✔
512

513
        private void keepalive() {
514
                try {
515
                        while (!stopping) {
1✔
516
                                client.jobKeepAlive(id, timeout);
1✔
517
                                if (!interrupted()) {
1✔
518
                                        sleep(keepaliveTime / 2);
1✔
519
                                }
520
                        }
521
                } catch (IOException | SpallocServerException e) {
×
522
                        log.debug("exception in keepalive; terminating", e);
×
523
                } catch (InterruptedException e) {
1✔
524
                        log.trace("interrupted in keepalive", e);
1✔
525
                }
×
526
        }
1✔
527

528
        @Override
529
        public void close() throws IOException {
530
                stopping = true;
1✔
531
                keepalive.interrupt();
1✔
532
                client.close();
1✔
533
        }
1✔
534

535
        /**
536
         * Reconnect to the server and check version.
537
         * <p>
538
         * If reconnection fails, the error is reported as a warning but no
539
         * exception is raised.
540
         *
541
         * @throws InterruptedException
542
         *             If interrupted while waiting.
543
         */
544
        private void reconnect() throws InterruptedException {
545
                try {
546
                        client.connect(timeout);
×
547
                        assertCompatibleVersion();
×
548
                        log.info("Reconnected to spalloc server successfully");
×
549
                } catch (SpallocServerException | IOException e) {
×
550
                        /*
551
                         * Connect/version command failed... Leave the socket clearly broken
552
                         * so that we retry again
553
                         */
554
                        log.warn("Spalloc server is unreachable ({}), will keep trying...",
×
555
                                        e.getMessage());
×
556
                        try {
557
                                client.close();
×
558
                        } catch (IOException inner) {
×
559
                                // close failed?! Nothing we can do but log and try later
560
                                log.error("problem closing connection", inner);
×
561
                        }
×
562
                }
×
563
        }
×
564

565
        /**
566
         * Assert that the server version is compatible. This client supports from
567
         * 0.4.0 to 2.0.0 (but not including the latter).
568
         *
569
         * @throws IOException
570
         *             If communications fail.
571
         * @throws SpallocServerException
572
         *             If the spalloc server rejects the operation request.
573
         * @throws InterruptedException
574
         *             If interrupted while waiting.
575
         * @throws IllegalStateException
576
         *             If the server is not compatible with this client.
577
         */
578
        protected void assertCompatibleVersion()
579
                        throws IOException, SpallocServerException, InterruptedException {
580
                var v = client.version(timeout);
×
581
                if (MIN_VER.compareTo(v) <= 0 && MAX_VER.compareTo(v) > 0) {
×
582
                        return;
×
583
                }
584
                client.close();
×
585
                throw new IllegalStateException(
×
586
                                "Server version " + v + " is not compatible with this client");
587
        }
588

589
        @Override
590
        public void destroy(String reason)
591
                        throws IOException, SpallocServerException, InterruptedException {
592
                try {
593
                        stopping = true; // Don't need a keepalive any more
1✔
594
                        client.destroyJob(id, reason, timeout);
1✔
595
                } finally {
596
                        close();
1✔
597
                }
598
                purgeStatus();
1✔
599
        }
1✔
600

601
        @Override
602
        public void setPower(Boolean powerOn)
603
                        throws IOException, SpallocServerException, InterruptedException {
604
                if (powerOn == null) {
1✔
605
                        return;
×
606
                }
607
                if (powerOn) {
1✔
608
                        client.powerOnJobBoards(id, timeout);
1✔
609
                } else {
610
                        client.powerOffJobBoards(id, timeout);
×
611
                }
612
                purgeStatus();
1✔
613
        }
1✔
614

615
        @Override
616
        public int getID() {
617
                return id;
1✔
618
        }
619

620
        private JobState getStatus()
621
                        throws IOException, SpallocServerException, InterruptedException {
622
                if (statusCache == null || statusTimestamp < currentTimeMillis()
1✔
623
                                - statusCachePeriod) {
624
                        statusCache = client.getJobState(id, timeout);
1✔
625
                        statusTimestamp = currentTimeMillis();
1✔
626
                }
627
                return statusCache;
1✔
628
        }
629

630
        private void purgeStatus() {
631
                statusCache = null;
1✔
632
        }
1✔
633

634
        @Override
635
        public State getState()
636
                        throws IOException, SpallocServerException, InterruptedException {
637
                return getStatus().getState();
1✔
638
        }
639

640
        @Override
641
        public Boolean getPower()
642
                        throws IOException, SpallocServerException, InterruptedException {
643
                return getStatus().getPower();
1✔
644
        }
645

646
        @Override
647
        public String getDestroyReason()
648
                        throws IOException, SpallocServerException, InterruptedException {
649
                return getStatus().getReason();
×
650
        }
651

652
        private void retrieveMachineInfo()
653
                        throws IOException, SpallocServerException, IllegalStateException,
654
                        InterruptedException {
655
                /*
656
                 * Check the job is still not QUEUED as then machine info is all nulls
657
                 * getJobMachineInfo works if the Job is in State.POWER
658
                 */
659
                switch (getState()) {
1✔
660
                case DESTROYED:
661
                        // Nothing to do; the job's dead
662
                        return;
×
663
                case UNKNOWN: // Shouldn't be possible, but recheck...
664
                case QUEUED:
665
                        // Double check very latest state.
666
                        purgeStatus();
×
667
                        if (getState() == QUEUED) {
×
668
                                throw new IllegalStateException(
×
669
                                                "Job not Ready. Call waitUntilReady first.");
670
                        }
671
                        // fall through
672
                default:
673
                        machine = client.getJobMachineInfo(id, timeout);
1✔
674
                }
675

676
        }
1✔
677

678
        private boolean isMachineInfoInvalid() {
679
                return machine == null || machine.getWidth() == 0;
1✔
680
        }
681

682
        @Override
683
        public List<Connection> getConnections()
684
                        throws IOException, SpallocServerException, InterruptedException {
685
                if (isMachineInfoInvalid()) {
×
686
                        retrieveMachineInfo();
×
687
                }
688
                if (isMachineInfoInvalid()) {
×
689
                        return null;
×
690
                }
691
                return machine.getConnections();
×
692
        }
693

694
        @Override
695
        public String getHostname()
696
                        throws IOException, SpallocServerException, InterruptedException {
697
                for (Connection c : getConnections()) {
×
698
                        if (c.chip().onSameChipAs(ZERO_ZERO)) {
×
699
                                return c.hostname();
×
700
                        }
701
                }
×
702
                return null;
×
703
        }
704

705
        @Override
706
        public MachineDimensions getDimensions()
707
                        throws IOException, SpallocServerException,
708
                        InterruptedException {
709
                if (isMachineInfoInvalid()) {
×
710
                        retrieveMachineInfo();
×
711
                }
712
                if (isMachineInfoInvalid()) {
×
713
                        return null;
×
714
                }
715
                return new MachineDimensions(machine.getWidth(), machine.getHeight());
×
716
        }
717

718
        @Override
719
        public String getMachineName() throws IOException, SpallocServerException,
720
                        InterruptedException {
721
                if (isMachineInfoInvalid()) {
×
722
                        retrieveMachineInfo();
×
723
                }
724
                if (isMachineInfoInvalid()) {
×
725
                        return null;
×
726
                }
727
                return machine.getMachineName();
×
728
        }
729

730
        @Override
731
        public List<BoardCoordinates> getBoards()
732
                        throws IOException, SpallocServerException, IllegalStateException,
733
                        InterruptedException {
734
                if (isMachineInfoInvalid()) {
1✔
735
                        retrieveMachineInfo();
1✔
736
                }
737
                if (isMachineInfoInvalid()) {
1✔
738
                        return null;
×
739
                }
740
                return machine.getBoards();
1✔
741
        }
742

743
        @Override
744
        public State waitForStateChange(State oldState, Integer timeout)
745
                        throws SpallocServerException, InterruptedException {
746
                var finishTime = makeTimeout(timeout);
1✔
747

748
                // We may get disconnected while waiting so keep listening...
749
                while (!timedOut(finishTime)) {
1✔
750
                        try {
751
                                // Watch for changes in this SpallocJob's state
752
                                client.notifyJob(id, true);
1✔
753

754
                                // Wait for job state to change
755
                                while (!timedOut(finishTime)) {
1✔
756
                                        // Has the job changed state?
757
                                        purgeStatus();
1✔
758
                                        var newState = getState();
1✔
759
                                        if (newState != oldState) {
1✔
760
                                                return newState;
1✔
761
                                        }
762

763
                                        // Wait for a state change and keep the job alive
764
                                        if (!doWaitForAChange(finishTime)) {
×
765
                                                /*
766
                                                 * The user's timeout expired while waiting for a state
767
                                                 * change, return the old state and give up.
768
                                                 */
769
                                                return oldState;
×
770
                                        }
771
                                }
×
772
                        } catch (IOException e) {
×
773
                                /*
774
                                 * Something went wrong while communicating with the server,
775
                                 * reconnect after the reconnection delay (or timeout, whichever
776
                                 * came first).
777
                                 */
778
                                try {
779
                                        doReconnect(finishTime);
×
780
                                } catch (IOException e1) {
×
781
                                        log.error("problem when reconnecting after disconnect", e1);
×
782
                                }
×
783
                        }
×
784
                }
785

786
                /*
787
                 * If we get here, the timeout expired without a state change, just
788
                 * return the old state
789
                 */
790
                return oldState;
×
791
        }
792

793
        /**
794
         * Wait for a state change and keep the job alive.
795
         *
796
         * @param finishTime
797
         *            when our timeout expires, or {@code null} for never
798
         * @return True if the state has changed, or false on timeout
799
         * @throws SpallocServerException
800
         *             If the server throws an exception.
801
         * @throws IOException
802
         *             If communications fail.
803
         * @throws InterruptedException
804
         *             If interrupted while waiting.
805
         */
806
        private boolean doWaitForAChange(Long finishTime)
807
                        throws IOException, SpallocServerException, InterruptedException {
808
                /*
809
                 * Since we're about to block holding the client lock, we must be
810
                 * responsible for keeping everything alive.
811
                 */
812
                while (!timedOut(finishTime)) {
×
813
                        client.jobKeepAlive(id, timeout);
×
814

815
                        // Wait for the job to change
816
                        try {
817
                                /*
818
                                 * Block waiting for the job to change no-longer than the
819
                                 * user-specified timeout or half the keepalive interval.
820
                                 */
821
                                Integer waitTimeout;
822
                                if (finishTime != null && keepaliveTime != null) {
×
823
                                        waitTimeout = min(keepaliveTime / 2, timeLeft(finishTime));
×
824
                                } else if (finishTime == null) {
×
825
                                        waitTimeout =
826
                                                        (keepalive == null) ? null : keepaliveTime / 2;
×
827
                                } else {
828
                                        waitTimeout = timeLeft(finishTime);
×
829
                                }
830
                                if (waitTimeout == null || waitTimeout >= 0) {
×
831
                                        client.waitForNotification(waitTimeout);
×
832
                                        return true;
×
833
                                }
834
                        } catch (SpallocProtocolTimeoutException e) {
×
835
                                /*
836
                                 * Its been a while, send a keep-alive since we're still holding
837
                                 * the lock
838
                                 */
839
                        }
×
840
                }
841
                // The user's timeout expired while waiting for a state change
842
                return false;
×
843
        }
844

845
        /**
846
         * Reconnect after the reconnection delay (or timeout, whichever came
847
         * first).
848
         *
849
         * @throws IOException
850
         *             If communications fail.
851
         * @throws InterruptedException
852
         *             If the wait is interrupted.
853
         */
854
        private void doReconnect(Long finishTime)
855
                        throws IOException, InterruptedException {
856
                client.close();
×
857
                int delay;
858
                if (finishTime != null) {
×
859
                        delay = min(timeLeft(finishTime), reconnectDelay);
×
860
                } else {
861
                        delay = reconnectDelay;
×
862
                }
863
                sleep(max(0, delay));
×
864
                reconnect();
×
865
        }
×
866

867
        @Override
868
        public void waitUntilReady(Integer timeout)
869
                        throws JobDestroyedException, IOException, SpallocServerException,
870
                        SpallocStateChangeTimeoutException, InterruptedException {
871
                State curState = null;
1✔
872
                var finishTime = makeTimeout(timeout);
1✔
873
                while (!timedOut(finishTime)) {
1✔
874
                        if (curState == null) {
1✔
875
                                /*
876
                                 * Get initial state (NB: done here such that the command is
877
                                 * never sent if the timeout has already occurred)
878
                                 */
879
                                curState = getState();
1✔
880
                        }
881

882
                        // Are we ready yet?
883
                        switch (curState) {
1✔
884
                        case READY:
885
                                log.info("job:{} is now ready", id);
1✔
886
                                // Now in the ready state! Done successfully.
887
                                return;
1✔
888
                        case QUEUED:
889
                                log.info("job:{} has been queued by the spalloc server", id);
×
890
                                break;
×
891
                        case POWER:
892
                                log.info("waiting for board power commands to "
1✔
893
                                                + "complete for job:{}", id);
1✔
894
                                break;
1✔
895
                        case DESTROYED:
896
                                // In a state which can never become ready
897
                                throw new JobDestroyedException(getDestroyReason());
×
898
                        default: // UNKNOWN
899
                                // Server has forgotten what this job even was...
900
                                throw new JobDestroyedException(
×
901
                                                "Spalloc server no longer recognises job:" + id);
902
                        }
903
                        // Wait for a state change...
904
                        curState = waitForStateChange(curState, timeLeft(finishTime));
1✔
905
                }
906
                // Timed out!
907
                throw new SpallocStateChangeTimeoutException();
×
908
        }
909

910
        @Override
911
        public BoardPhysicalCoordinates whereIs(HasChipLocation chip)
912
                        throws IOException, SpallocServerException, InterruptedException {
913
                var result = client.whereIs(id, chip, timeout);
×
914
                if (result == null) {
×
915
                        throw new IllegalStateException(
×
916
                                        "received null instead of machine location");
917
                }
918
                return result.physical();
×
919
        }
920

921
        /**
922
         * @return The underlying client, allowing access to non-job-related
923
         *         operations.
924
         */
925
        public SpallocClient getClient() {
926
                return client;
×
927
        }
928
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc