• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/AllocatorTask.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.allocator;
17

18
import static java.lang.Math.ceil;
19
import static java.lang.Math.max;
20
import static java.lang.Math.min;
21
import static java.lang.Math.sqrt;
22
import static java.lang.String.format;
23
import static java.util.Objects.nonNull;
24
import static org.slf4j.LoggerFactory.getLogger;
25
import static uk.ac.manchester.spinnaker.alloc.Constants.TRIAD_DEPTH;
26
import static uk.ac.manchester.spinnaker.alloc.db.Row.enumerate;
27
import static uk.ac.manchester.spinnaker.alloc.db.Row.integer;
28
import static uk.ac.manchester.spinnaker.alloc.db.Utils.isBusy;
29
import static uk.ac.manchester.spinnaker.alloc.model.JobState.DESTROYED;
30
import static uk.ac.manchester.spinnaker.alloc.model.JobState.POWER;
31
import static uk.ac.manchester.spinnaker.alloc.model.JobState.QUEUED;
32
import static uk.ac.manchester.spinnaker.alloc.model.JobState.READY;
33
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.OFF;
34
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.ON;
35
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
36

37
import java.time.Instant;
38
import java.util.ArrayList;
39
import java.util.Collection;
40
import java.util.EnumSet;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Objects;
44
import java.util.Set;
45
import java.util.stream.Stream;
46

47
import javax.annotation.PostConstruct;
48

49
import org.slf4j.Logger;
50
import org.springframework.beans.factory.annotation.Autowired;
51
import org.springframework.dao.DataAccessException;
52
import org.springframework.scheduling.TaskScheduler;
53
import org.springframework.scheduling.support.CronTrigger;
54
import org.springframework.stereotype.Service;
55

56
import com.google.errorprone.annotations.RestrictedApi;
57

58
import uk.ac.manchester.spinnaker.alloc.ForTestingOnly;
59
import uk.ac.manchester.spinnaker.alloc.ServiceMasterControl;
60
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.AllocatorProperties;
61
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.HistoricalDataProperties;
62
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.KeepaliveProperties;
63
import uk.ac.manchester.spinnaker.alloc.bmp.BMPController;
64
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Connection;
65
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Query;
66
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Update;
67
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAwareBean;
68
import uk.ac.manchester.spinnaker.alloc.db.Row;
69
import uk.ac.manchester.spinnaker.alloc.db.SQLQueries;
70
import uk.ac.manchester.spinnaker.alloc.model.BMPAndMachine;
71
import uk.ac.manchester.spinnaker.alloc.model.BoardAndBMP;
72
import uk.ac.manchester.spinnaker.alloc.model.Direction;
73
import uk.ac.manchester.spinnaker.alloc.model.JobState;
74
import uk.ac.manchester.spinnaker.alloc.model.PowerState;
75
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
76
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
77

78
/**
79
 * The allocation engine. Allocations are performed by running suitable
80
 * (non-trivial) SQL queries on a periodic basis, putting jobs that cannot be
81
 * allocated back on the queue for a later attempt (and increasing their
82
 * priority when it does so). This class is also responsible for destroying jobs
83
 * that are not kept alive sufficiently frequently, and eventually migrating
84
 * records of dead jobs to long-term storage ("tombstoning").
85
 */
86
@Service
87
public class AllocatorTask extends DatabaseAwareBean
1✔
88
                implements PowerController {
89
        /**
90
         * @see #setPower(Connection,int,PowerState)
91
         */
92
        private static final EnumSet<Direction> NO_PERIMETER =
1✔
93
                        EnumSet.noneOf(Direction.class);
1✔
94

95
        private static final Logger log = getLogger(AllocatorTask.class);
1✔
96

97
        private static final Rectangle ONE_BOARD = new Rectangle(1, 1, 1);
1✔
98

99
        /**
100
         * Maximum number of jobs that we actually run the quota check for. Used
101
         * because we are reusing a query, and we'll probably never have that many
102
         * live jobs even on the big machine.
103
         */
104
        private static final Integer NUMBER_OF_JOBS_TO_QUOTA_CHECK = 100000;
1✔
105

106
        @Autowired
107
        private Epochs epochs;
108

109
        @Autowired
110
        private ServiceMasterControl serviceControl;
111

112
        @Autowired
113
        private QuotaManager quotaManager;
114

115
        @Autowired
116
        private AllocatorProperties allocProps;
117

118
        @Autowired
119
        private KeepaliveProperties keepAliveProps;
120

121
        @Autowired
122
        private HistoricalDataProperties historyProps;
123

124
        @Autowired
125
        private ProxyRememberer rememberer;
126

127
        @Autowired
128
        private TaskScheduler scheduler;
129

130
        // Note can't be autowired as circular;
131
        // instead set by setter in postconstruct of BMPController
132
        private BMPController bmpController;
133

134
        /**
135
         * Only called by BMP controller.
136
         *
137
         * @param bmpController
138
         *            The BMP controller
139
         * @hidden
140
         */
141
        public void setBMPController(BMPController bmpController) {
142
                this.bmpController = bmpController;
1✔
143
        }
1✔
144

145
        @PostConstruct
146
        @SuppressWarnings("FutureReturnValueIgnored")
147
        private void init() {
148
                scheduler.scheduleAtFixedRate(this::allocate, allocProps.getPeriod());
1✔
149
                scheduler.scheduleAtFixedRate(this::expireJobs,
1✔
150
                                keepAliveProps.getExpiryPeriod());
1✔
151
                scheduler.schedule(this::tombstone,
1✔
152
                                new CronTrigger(historyProps.getSchedule()));
1✔
153
        }
1✔
154

155
        /**
156
         * Perform update on a job now as a result of a change.
157
         *
158
         * @param jobId
159
         *            The job to update.
160
         * @param sourceState
161
         *            The change source state.
162
         * @param targetState
163
         *            The change target state.
164
         */
165
        public void updateJob(int jobId, JobState sourceState,
166
                        JobState targetState) {
167
                scheduler.schedule(() -> updateJobNow(jobId, sourceState, targetState),
1✔
168
                                Instant.now());
1✔
169
        }
1✔
170

171
        private void updateJobNow(int jobId, JobState sourceState,
172
                        JobState targetState) {
173
                try {
174
                        var updated = execute(
1✔
175
                                        conn -> update(jobId, sourceState, targetState, conn));
1✔
176
                        if (updated) {
1✔
177
                                log.debug("advancing job {} epoch", jobId);
1✔
178
                                epochs.jobChanged(jobId);
1✔
179
                        }
180
                } catch (DataAccessException e) {
×
181
                        if (isBusy(e)) {
×
182
                                log.info("database is busy; "
×
183
                                                + "will try allocation processing later");
184
                                return;
×
185
                        }
186
                        throw e;
×
187
                }
1✔
188
        }
1✔
189

190
        private boolean update(int jobId, JobState sourceState,
191
                        JobState targetState, Connection c) {
192
                try (var getNTasks = c.query(COUNT_CHANGES_FOR_JOB);
1✔
193
                                var setJobState = c.update(SET_STATE_PENDING);
1✔
194
                                var setJobDestroyed = c.update(SET_STATE_DESTROYED)) {
1✔
195
                        // Count pending changes for this state change
196
                        var n = getNTasks.call1(row -> row.getInteger("n_changes"),
1✔
197
                                        jobId, sourceState, targetState).orElseThrow(
1✔
198
                                                        () -> new RuntimeException(
×
199
                                                                        "Error counting job tasks"));
200

201
                        log.debug("Job {} has {} changes remaining", jobId, n);
1✔
202

203
                        // If there are no more pending changes, set the job state to
204
                        // the target state
205
                        if (n == 0) {
1✔
206
                                log.debug("Job {} moving to state {}", jobId, targetState);
1✔
207
                                if (targetState == DESTROYED) {
1✔
208
                                        int rows = setJobDestroyed.call(0, jobId);
1✔
209
                                        if (rows != 1) {
1✔
210
                                                log.warn("unexpected number of rows affected by "
1✔
211
                                                                + "destroy in state update: {}", rows);
1✔
212
                                        }
213
                                        return rows > 0;
1✔
214
                                }
215
                                return setJobState.call(targetState, 0, jobId) > 0;
1✔
216
                        }
217
                        return false;
×
218
                }
1✔
219
        }
220

221
        /**
222
         * Helper class representing a rectangle of triads.
223
         *
224
         * @author Donal Fellows
225
         * @param width
226
         *            Width of rectangle, in triads.
227
         * @param height
228
         *            Height of rectangle, in triads.
229
         * @param depth
230
         *            Depth of rectangle. 1 or 3
231
         */
232
        private record Rectangle(int width, int height, int depth) {
1✔
233
                private Rectangle(Row row) {
234
                        this(row.getInt("max_width"), row.getInt("max_height"),
1✔
235
                                        TRIAD_DEPTH);
236
                }
1✔
237

238
                @Override
239
                public String toString() {
240
                        return format("%dx%dx%d", width, height, depth);
×
241
                }
242

243
                public int getArea() {
244
                        return width * height * depth;
×
245
                }
246
        }
247

248
        /**
249
         * Ask for allocation to happen now.
250
         */
251
        public void scheduleAllocateNow() {
252
                scheduler.schedule(this::allocate, Instant.now());
1✔
253
        }
1✔
254

255
        /**
256
         * Allocate all current requests for resources.
257
         */
258
        public synchronized void allocate() {
259
                if (serviceControl.isPaused()) {
1✔
260
                        return;
1✔
261
                }
262

263
                try {
264
                        var allocated = execute(this::allocate);
×
265
                        allocated.updateEpochs();
×
266
                        allocated.updateBMPs();
×
267
                } catch (DataAccessException e) {
×
268
                        if (isBusy(e)) {
×
269
                                log.info("database is busy; "
×
270
                                                + "will try allocation processing later");
271
                                return;
×
272
                        }
273
                        throw e;
×
274
                }
×
275
        }
×
276

277
        /** Encapsulates the queries and updates used in power control. */
278
        private sealed class PowerSQL extends AbstractSQL
279
                        permits AllocSQL, DestroySQL {
280
                /** Get basic information about a specific job. */
281
                private final Query getJobState;
282

283
                /** Get what boards are allocated to a job (that is queued or ready). */
284
                private final Query getJobBoards;
285

286
                /**
287
                 * Get the links on the perimeter of the allocation to a job. The
288
                 * perimeter is defined as being the links between a board that is part
289
                 * of the allocation and a board that is not; it's <em>not</em> a
290
                 * geometric definition, but rather a relational algebraic one.
291
                 */
292
                private final Query getPerimeter;
293

294
                /** Create a request to change the power status of a board. */
295
                private final Update issuePowerChange;
296

297
                /** Set the state and number of pending changes for a job. */
298
                private final Update setStatePending;
299

300
                /** Set the state to destroyed. */
301
                private final Update setStateDestroyed;
302

303
                PowerSQL(Connection conn) {
1✔
304
                        super(conn);
1✔
305
                        getJobState = conn.query(GET_JOB);
1✔
306
                        getJobBoards = conn.query(GET_JOB_BOARDS);
1✔
307
                        getPerimeter = conn.query(getPerimeterLinks);
1✔
308
                        issuePowerChange = conn.update(ISSUE_CHANGE_FOR_JOB);
1✔
309
                        setStatePending = conn.update(SET_STATE_PENDING);
1✔
310
                        setStateDestroyed = conn.update(SET_STATE_DESTROYED);
1✔
311
                }
1✔
312

313
                @Override
314
                public void close() {
315
                        getJobState.close();
1✔
316
                        getJobBoards.close();
1✔
317
                        getPerimeter.close();
1✔
318
                        issuePowerChange.close();
1✔
319
                        setStatePending.close();
1✔
320
                        setStateDestroyed.close();
1✔
321
                }
1✔
322
        }
323

324
        /** Encapsulates the queries and updates used in allocation. */
325
        @UsedInJavadocOnly(SQLQueries.class)
326
        private final class AllocSQL extends PowerSQL {
327
                /** Increases the importance of a job. */
328
                private final Update bumpImportance;
329

330
                /** Get the list of allocation tasks for jobs in a given state. */
331
                private final Query getTasks;
332

333
                /** Delete an allocation task. */
334
                private final Update delete;
335

336
                /** Find a single free board. */
337
                private final Query findFreeBoard;
338

339
                /**
340
                 * Find a rectangle of triads of boards that may be allocated.
341
                 *
342
                 * @see SQLQueries#FIND_FREE_BOARD
343
                 */
344
                private final Query getRectangles;
345

346
                /**
347
                 * Find a rectangle of triads of boards that may be allocated rooted at
348
                 * a particular board.
349
                 *
350
                 * @see SQLQueries#FIND_FREE_BOARD_AT
351
                 */
352
                private final Query getRectangleAt;
353

354
                /**
355
                 * Count the number of <em>connected</em> boards (i.e., have at least
356
                 * one path over enabled links to the root board of the allocation)
357
                 * within a rectangle of triads. The triads are taken as being full
358
                 * depth.
359
                 */
360
                private final Query countConnectedBoards;
361

362
                /**
363
                 * Find an allocatable board with a specific board ID. (This will have
364
                 * been previously converted from some other form of board coordinates.)
365
                 */
366
                private final Query findSpecificBoard;
367

368
                /**
369
                 * Get the set of boards at some coordinates within a triad rectangle
370
                 * that are connected (i.e., have at least one path over enableable
371
                 * links) to the root board.
372
                 */
373
                private final Query getConnectedBoardIDs;
374

375
                /** Tell a board that it is allocated. */
376
                private final Update allocBoard;
377

378
                /** Tell a job that it is allocated. Doesn't set the state. */
379
                private final Update allocJob;
380

381
                AllocSQL(Connection conn) {
1✔
382
                        super(conn);
1✔
383
                        bumpImportance = conn.update(BUMP_IMPORTANCE);
1✔
384
                        getTasks = conn.query(GET_ALLOCATION_TASKS);
1✔
385
                        delete = conn.update(DELETE_TASK);
1✔
386
                        findFreeBoard = conn.query(FIND_FREE_BOARD);
1✔
387
                        getRectangles = conn.query(findRectangle);
1✔
388
                        getRectangleAt = conn.query(findRectangleAt);
1✔
389
                        countConnectedBoards = conn.query(countConnected);
1✔
390
                        findSpecificBoard = conn.query(FIND_LOCATION);
1✔
391
                        getConnectedBoardIDs = conn.query(getConnectedBoards);
1✔
392
                        allocBoard = conn.update(ALLOCATE_BOARDS_BOARD);
1✔
393
                        allocJob = conn.update(ALLOCATE_BOARDS_JOB);
1✔
394
                }
1✔
395

396
                @Override
397
                public void close() {
398
                        super.close();
1✔
399
                        bumpImportance.close();
1✔
400
                        getTasks.close();
1✔
401
                        delete.close();
1✔
402
                        findFreeBoard.close();
1✔
403
                        getRectangles.close();
1✔
404
                        getRectangleAt.close();
1✔
405
                        countConnectedBoards.close();
1✔
406
                        findSpecificBoard.close();
1✔
407
                        getConnectedBoardIDs.close();
1✔
408
                        allocBoard.close();
1✔
409
                        allocJob.close();
1✔
410
                }
1✔
411
        }
412

413
        /** Encapsulates the queries and updates used in deletion. */
414
        private final class DestroySQL extends PowerSQL {
415
                /** Get basic information about a specific job. */
416
                private final Query getJob = conn.query(GET_JOB);
1✔
417

418
                /** Mark a job as dead. */
419
                private final Update markAsDestroyed = conn.update(DESTROY_JOB);
1✔
420

421
                /** Note the reason why a job is dead. */
422
                private Update recordDestroyReason = conn.update(NOTE_DESTROY_REASON);
1✔
423

424
                /** Delete a request to allocate resources for a job. */
425
                private final Update killAlloc = conn.update(KILL_JOB_ALLOC_TASK);
1✔
426

427
                DestroySQL(Connection conn) {
1✔
428
                        super(conn);
1✔
429
                }
1✔
430

431
                @Override
432
                public void close() {
433
                        super.close();
1✔
434
                        getJob.close();
1✔
435
                        markAsDestroyed.close();
1✔
436
                        recordDestroyReason.close();
1✔
437
                        killAlloc.close();
1✔
438
                }
1✔
439
        }
440

441
        /** Encapsulates the task to do a particular allocation. */
442
        private class AllocTask {
443
                final int id;
444

445
                final int importance;
446

447
                final int jobId;
448

449
                final int machineId;
450

451
                final Rectangle max;
452

453
                final int maxDeadBoards;
454

455
                final Integer numBoards;
456

457
                final Integer width;
458

459
                final Integer height;
460

461
                final Integer root;
462

463
                AllocTask(Row row) {
1✔
464
                        id = row.getInt("req_id");
1✔
465
                        importance = row.getInt("importance");
1✔
466
                        jobId = row.getInt("job_id");
1✔
467
                        machineId = row.getInt("machine_id");
1✔
468
                        max = new Rectangle(row);
1✔
469
                        maxDeadBoards = row.getInt("max_dead_boards");
1✔
470
                        numBoards = row.getInteger("num_boards");
1✔
471
                        width = row.getInteger("width");
1✔
472
                        height = row.getInteger("height");
1✔
473
                        root = row.getInteger("board_id");
1✔
474
                }
1✔
475

476
                Collection<BMPAndMachine> allocate(AllocSQL sql) {
477
                        if (nonNull(numBoards) && numBoards > 0) {
1✔
478
                                // Single-board case gets its own allocator that's better at
479
                                // that
480
                                if (numBoards == 1) {
1✔
481
                                        log.debug("Allocate one board");
1✔
482
                                        return allocateOneBoard(sql, jobId, machineId);
1✔
483
                                }
484
                                var estimate = new DimensionEstimate(numBoards, max);
1✔
485
                                return allocateDimensions(sql, jobId, machineId, estimate,
1✔
486
                                                maxDeadBoards);
487
                        }
488

489
                        if (nonNull(width) && nonNull(height) && nonNull(root)) {
1✔
490
                                return allocateTriadsAt(sql, jobId, machineId, root, width,
×
491
                                                height,        maxDeadBoards);
×
492
                        }
493

494
                        if (nonNull(width) && nonNull(height) && width > 0 && height > 0) {
1✔
495
                                // Special case; user is really just asking for one board
496
                                if (height == 1 && width == 1 && nonNull(maxDeadBoards)
1✔
497
                                                && maxDeadBoards == 2) {
498
                                        return allocateOneBoard(sql, jobId, machineId);
1✔
499
                                }
500
                                var estimate = new DimensionEstimate(width, height, max);
1✔
501
                                log.debug(
1✔
502
                                                "resolved request for {}x{} boards to {}x{} triads "
503
                                                                + "with tolerance {}",
504
                                                width, height, estimate.width, estimate.height,
1✔
505
                                                estimate.tolerance);
1✔
506
                                return allocateDimensions(sql, jobId, machineId, estimate,
1✔
507
                                                maxDeadBoards);
508
                        }
509

510
                        if (nonNull(root)) {
1✔
511
                                // Ignores maxDeadBoards; is a single-board allocate
512
                                return allocateBoard(sql, jobId, machineId, root);
1✔
513
                        }
514

515
                        log.warn("job {} could not be allocated; "
×
516
                                        + "bad request will be cleared from queue", jobId);
×
517
                        return List.of();
×
518
                }
519
        }
520

521
        /**
522
         * A set of information about the allocations that have been made.
523
         */
524
        class Allocations {
525
                /** The BMPs that have been affected by the allocations. **/
526
                final Set<Integer> bmps = new HashSet<>();
1✔
527

528
                /** The Machines that have been affected by the allocations. **/
529
                private final Set<Integer> machines = new HashSet<>();
1✔
530

531
                /** The jobs that have been affected by the allocations. **/
532
                private final List<Integer> jobIds = new ArrayList<>();
1✔
533

534
                Allocations() {
1✔
535
                        // Does nothing
536
                }
1✔
537

538
                Allocations(int jobId, Collection<BMPAndMachine> bmps) {
1✔
539
                        addAll(jobId, bmps);
1✔
540
                }
1✔
541

542
                void addAll(int jobId, Collection<BMPAndMachine> bmps) {
543
                        if (bmps.size() > 0) {
1✔
544
                                jobIds.add(jobId);
1✔
545
                                for (var bm : bmps) {
1✔
546
                                        this.bmps.add(bm.bmpId);
1✔
547
                                        this.machines.add(bm.machineId);
1✔
548
                                }
1✔
549
                        }
550
                }
1✔
551

552
                void updateEpochs() {
553
                        log.debug("Updating jobs {}", jobIds);
1✔
554
                        for (var job : jobIds) {
1✔
555
                                epochs.jobChanged(job);
1✔
556
                        }
1✔
557
                        log.debug("Updating machines {}", machines);
1✔
558
                        for (var m : machines) {
1✔
559
                                epochs.machineChanged(m);
1✔
560
                        }
1✔
561
                }
1✔
562

563
                void updateBMPs() {
564
                        if (!bmps.isEmpty()) {
1✔
565
                                // Poke the BMP controller to start looking!
566
                                log.debug("Triggering BMPs {}", bmps);
1✔
567
                                bmpController.triggerSearch(bmps);
1✔
568
                        }
569
                }
1✔
570

571
                boolean notEmpty() {
572
                        return !jobIds.isEmpty();
1✔
573
                }
574
        }
575

576
        /**
577
         * Allocate all current requests for resources.
578
         *
579
         * @param conn
580
         *            The DB connection
581
         * @return Whether any changes have been done
582
         */
583
        private Allocations allocate(Connection conn) {
584
                try (var sql = new AllocSQL(conn)) {
1✔
585
                        int maxImportance = -1;
1✔
586
                        log.trace("Allocate running");
1✔
587
                        var allocations = new Allocations();
1✔
588
                        for (AllocTask task : sql.getTasks.call(AllocTask::new, QUEUED)) {
1✔
589
                                if (task.importance > maxImportance) {
1✔
590
                                        maxImportance = task.importance;
1✔
591
                                } else if (task.importance < maxImportance
×
592
                                                - allocProps.getImportanceSpan()) {
×
593
                                        // Too much of a span
594
                                        continue;
×
595
                                }
596
                                var handled = task.allocate(sql);
1✔
597
                                allocations.addAll(task.jobId, handled);
1✔
598
                                log.debug("allocate for {} (job {}): {}", task.id,
1✔
599
                                                task.jobId, handled);
1✔
600
                        }
1✔
601
                        /*
602
                         * Those tasks which weren't allocated get their importance bumped
603
                         * so they get considered with higher priority when the allocator
604
                         * runs next time.
605
                         */
606
                        sql.bumpImportance.call();
1✔
607
                        return allocations;
1✔
608
                }
609
        }
610

611
        /**
612
         * Destroy jobs that have missed their keepalive.
613
         */
614
        public void expireJobs() {
615
                if (serviceControl.isPaused()) {
1✔
616
                        return;
1✔
617
                }
618

619
                try {
620
                        var allocated = execute(this::expireJobs);
×
621
                        allocated.updateEpochs();
×
622
                        allocated.updateBMPs();
×
623
                } catch (DataAccessException e) {
×
624
                        if (isBusy(e)) {
×
625
                                log.info("database is busy; "
×
626
                                                + "will try job expiry processing later");
627
                                return;
×
628
                        }
629
                        throw e;
×
630
                }
×
631
        }
×
632

633
        /**
634
         * Destroy jobs that have missed their keepalive.
635
         *
636
         * @param conn
637
         *            How to talk to the DB
638
         * @return Whether any jobs have been expired.
639
         */
640
        private Allocations expireJobs(Connection conn) {
641
                var allocations = new Allocations();
1✔
642
                try (var find = conn.query(FIND_EXPIRED_JOBS)) {
1✔
643
                        var toKill = find.call(integer("job_id"));
1✔
644
                        for (var id : toKill) {
1✔
645
                                allocations.addAll(id,
1✔
646
                                                destroyJob(conn, id, "keepalive expired"));
1✔
647
                        }
1✔
648
                }
649
                try (var find = conn.query(GET_LIVE_JOB_IDS)) {
1✔
650
                        var toKill = find.call(integer("job_id"),
1✔
651
                                        NUMBER_OF_JOBS_TO_QUOTA_CHECK, 0);
1✔
652
                        for (var id : toKill) {
1✔
653
                                if (quotaManager.shouldKillJob(id)) {
×
654
                                        allocations.addAll(id,
×
655
                                                        destroyJob(conn, id, "quota exceeded"));
×
656
                                }
657
                        }
×
658
                }
659
                return allocations;
1✔
660
        }
661

662
        /**
663
         * Migrates long dead jobs to the historical data DB.
664
         */
665
        public void tombstone() {
666
                if (serviceControl.isPaused()) {
×
667
                        return;
×
668
                }
669

670
                try (var conn = getConnection();
×
671
                                var histConn = getHistoricalConnection()) {
×
672
                        var c = tombstone(conn, histConn);
×
673
                        log.info("tombstoning completed: "
×
674
                                        + "moved {} job records and {} allocation records",
675
                                        c.numJobs(), c.numAllocs());
×
676
                } catch (DataAccessException e) {
×
677
                        if (isBusy(e)) {
×
678
                                log.info("database is busy; "
×
679
                                                + "will try job tombstone processing at future date");
680
                                return;
×
681
                        }
682
                        throw e;
×
683
                }
×
684
        }
×
685

686
        /**
687
         * Describes what the first stage of the tombstoner has copied.
688
         *
689
         * @param jobs
690
         *            The jobs that were copied.
691
         * @param allocs
692
         *            The allocations that were copied.
693
         */
694
        record Copied(List<HistoricalJob> jobs, List<HistoricalAlloc> allocs) {
1✔
695
                private Stream<HistoricalAlloc> allocStream() {
696
                        return allocs.stream().filter(Objects::nonNull);
1✔
697
                }
698

699
                private Stream<HistoricalJob> jobStream() {
700
                        return jobs.stream().filter(Objects::nonNull);
1✔
701
                }
702

703
                /**
704
                 * @return The number of job records to copy over to the historical
705
                 *         database.
706
                 */
707
                int numJobs() {
708
                        return jobs.size();
1✔
709
                }
710

711
                /**
712
                 * @return The number of board allocation records to copy over to the
713
                 *         historical database.
714
                 */
715
                int numAllocs() {
716
                        return allocs.size();
1✔
717
                }
718

719
                /**
720
                 * Details of a copied allocation record.
721
                 *
722
                 * @param allocId
723
                 *            Allocation ID
724
                 * @param jobId
725
                 *            Job ID
726
                 * @param boardId
727
                 *            Board ID (the board that was allocated)
728
                 * @param allocTimestamp
729
                 *            When the board was allocated.
730
                 */
731
                record HistoricalAlloc(int allocId, int jobId, int boardId,
×
732
                                Instant allocTimestamp) {
733
                        HistoricalAlloc(Row row) {
734
                                this(row.getInt("alloc_id"), row.getInt("job_id"),
×
735
                                                row.getInt("board_id"),
×
736
                                                row.getInstant("alloc_timestamp"));
×
737
                        }
×
738

739
                        private Object[] args() {
740
                                return new Object[] {
×
741
                                        allocId, jobId, boardId, allocTimestamp
×
742
                                };
743
                        }
744
                }
745

746
                /**
747
                 * Details of a copied job record.
748
                 *
749
                 * @param jobId
750
                 *            Job ID
751
                 * @param machineId
752
                 *            Machine ID
753
                 * @param owner
754
                 *            Whose job was it (user ID)
755
                 * @param createTimestamp
756
                 *            When the job was submitted
757
                 * @param width
758
                 *            Width of requested allocation, in triads
759
                 * @param height
760
                 *            Height of requested allocation, in triads
761
                 * @param depth
762
                 *            Depth of requested allocation; 1 (single board) or 3
763
                 * @param allocatedRoot
764
                 *            ID of board at root of allocation
765
                 * @param keepaliveInterval
766
                 *            How often keep-alive messages should come
767
                 * @param keepaliveHost
768
                 *            IP address of machine keeping job alive
769
                 * @param deathReason
770
                 *            Why did the job terminate?
771
                 * @param deathTimestamp
772
                 *            When did the job terminate
773
                 * @param originalRequest
774
                 *            What was actually asked for. (Original request data)
775
                 * @param allocationTimestamp
776
                 *            When did we complete allocation. Quota consumption was
777
                 *            from this moment to the death timestamp.
778
                 * @param allocationSize
779
                 *            How many boards were allocated
780
                 * @param machineName
781
                 *            Name of allocated machine (convenience; implied by machine
782
                 *            ID)
783
                 * @param userName
784
                 *            Name of user (convenience; implied by owner ID)
785
                 * @param groupId
786
                 *            Group for accounting purposes
787
                 * @param groupName
788
                 *            Name of group (convenience; implied by group ID)
789
                 */
790
                record HistoricalJob(int jobId, int machineId, String owner,
1✔
791
                                Instant createTimestamp, int width, int height, int depth,
792
                                int allocatedRoot, Instant keepaliveInterval,
793
                                String keepaliveHost, String deathReason,
794
                                Instant deathTimestamp, byte[] originalRequest,
795
                                Instant allocationTimestamp, int allocationSize,
796
                                String machineName, String userName, int groupId,
797
                                String groupName) {
798
                        HistoricalJob(Row row) {
799
                                this(row.getInt("job_id"), row.getInt("machine_id"),
1✔
800
                                                row.getString("owner"),
1✔
801
                                                row.getInstant("create_timestamp"), row.getInt("width"),
1✔
802
                                                row.getInt("height"), row.getInt("depth"),
1✔
803
                                                row.getInt("allocated_root"),
1✔
804
                                                row.getInstant("keepalive_interval"),
1✔
805
                                                row.getString("keepalive_host"),
1✔
806
                                                row.getString("death_reason"),
1✔
807
                                                row.getInstant("death_timestamp"),
1✔
808
                                                row.getBytes("original_request"),
1✔
809
                                                row.getInstant("allocation_timestamp"),
1✔
810
                                                row.getInt("allocation_size"),
1✔
811
                                                row.getString("machine_name"),
1✔
812
                                                row.getString("user_name"), row.getInt("group_id"),
1✔
813
                                                row.getString("group_name"));
1✔
814
                        }
1✔
815

816
                        private Object[] args() {
817
                                return new Object[] {
1✔
818
                                        jobId, machineId, owner, createTimestamp, width, height,
1✔
819
                                        depth, allocatedRoot, keepaliveInterval, keepaliveHost,
1✔
820
                                        deathReason, deathTimestamp, originalRequest,
821
                                        allocationTimestamp, allocationSize, machineName, userName,
1✔
822
                                        groupId, groupName
1✔
823
                                };
824
                        }
825
                }
826
        }
827

828
        /**
829
         * Implementation of {@link #tombstone()}. This is done as two transactions
830
         * to help manage the amount of locking (especially multi-DB locking);
831
         * nothing else ought to be updating any of these jobs at the time this task
832
         * usually runs, but we'll still try to keep things minimally locked.
833
         *
834
         * @param conn
835
         *            The DB connection
836
         * @return Description of the tombstoned IDs
837
         */
838
        private Copied tombstone(Connection conn, Connection histConn) {
839
                // No tombstoning without the target DB!
840
                if (!isHistoricalDBAvailable()) {
1✔
841
                        return new Copied(List.of(), List.of());
×
842
                }
843

844
                try (var readJobs = conn.query(READ_HISTORICAL_JOBS);
1✔
845
                                var readAllocs = conn.query(READ_HISTORICAL_ALLOCS);
1✔
846
                                var deleteJobs = conn.update(DELETE_JOB_RECORD);
1✔
847
                                var deleteAllocs = conn.update(DELETE_ALLOC_RECORD);
1✔
848
                                var writeJobs = histConn.update(WRITE_HISTORICAL_JOBS);
1✔
849
                                var writeAllocs = histConn.update(WRITE_HISTORICAL_ALLOCS)) {
1✔
850
                        var grace = historyProps.getGracePeriod();
1✔
851
                        var copied = conn.transaction(() -> new Copied(
1✔
852
                                        readJobs.call(Copied.HistoricalJob::new, grace),
1✔
853
                                        readAllocs.call(Copied.HistoricalAlloc::new, grace)));
1✔
854
                        histConn.transaction(() -> {
1✔
855
                                copied.allocStream().forEach(a -> writeAllocs.call(a.args()));
1✔
856
                                copied.jobStream().forEach(j -> writeJobs.call(j.args()));
1✔
857
                        });
1✔
858
                        conn.transaction(() -> {
1✔
859
                                copied.allocStream()
1✔
860
                                                .forEach(a -> deleteAllocs.call(a.allocId()));
1✔
861
                                copied.jobStream().forEach(j -> deleteJobs.call(j.jobId()));
1✔
862
                        });
1✔
863
                        return copied;
1✔
864
                }
865
        }
866

867
        @Override
868
        public void destroyJob(int id, String reason) {
869
                var allocations = new Allocations(
1✔
870
                                id, execute(conn -> destroyJob(conn, id, reason)));
1✔
871
                allocations.updateEpochs();
1✔
872
                allocations.updateBMPs();
1✔
873
        }
1✔
874

875
        /**
876
         * Destroy a job.
877
         *
878
         * @param conn
879
         *            How to talk to the DB
880
         * @param id
881
         *            The ID of the job
882
         * @param reason
883
         *            Why is the job being destroyed.
884
         * @return Whether the job was destroyed.
885
         */
886
        private Collection<BMPAndMachine> destroyJob(Connection conn, int id,
887
                        String reason) {
888
                JobLifecycle.log.info("destroying job {} \"{}\"", id, reason);
1✔
889
                try (var sql = new DestroySQL(conn)) {
1✔
890
                        if (sql.getJob.call1(enumerate("job_state", JobState.class), id)
1✔
891
                                        .orElse(DESTROYED) == DESTROYED) {
1✔
892
                                log.info("job {} already destroyed", id);
1✔
893
                                /*
894
                                 * Don't do anything if the job doesn't exist or is already
895
                                 * destroyed
896
                                 */
897
                                return List.of();
1✔
898
                        }
899
                        /*
900
                         * Record the reason as a separate step here; state change can take
901
                         * some time.  It also doesn't matter if we do that twice.
902
                         */
903
                        sql.recordDestroyReason.call(reason, id);
1✔
904
                        // Inserts into pending_changes; these run after job is dead
905
                        var bmps = setPower(sql, id, OFF, DESTROYED);
1✔
906
                        sql.killAlloc.call(id);
1✔
907
                        sql.markAsDestroyed.call(reason, id);
1✔
908
                        JobLifecycle.log.info(
1✔
909
                                        "destroyed job {}; reclaiming boards in {} frames", id,
1✔
910
                                        bmps.size());
1✔
911
                        return bmps;
1✔
912
                } finally {
1✔
913
                        quotaManager.finishJob(id);
1✔
914
                        rememberer.killProxies(id);
1✔
915
                }
916
        }
917

918
        /**
919
         * Computes the estimate of what sort of allocation will be required.
920
         * Converts a number of boards into a close-to-square size to search for.
921
         * <p>
922
         * With the big machine's level of resources, that's good enough. For now.
923
         *
924
         * @author Donal Fellows
925
         */
926
        private static final class DimensionEstimate {
927
                /** The estimated width, in triads. */
928
                final int width;
929

930
                /** The estimated height, in triads. */
931
                final int height;
932

933
                /**
934
                 * The number of boards in the rectangle of triads that we can tolerate
935
                 * being down due to overallocation (due to the use of rectangles and
936
                 * triads).
937
                 */
938
                final int tolerance;
939

940
                /**
941
                 * Create an estimate of what to allocate. The old spalloc would take
942
                 * hints at this point on the aspect ratio, but we don't bother; we
943
                 * strongly prefer allocations "nearly square", going for making them
944
                 * slightly taller than wide if necessary.
945
                 *
946
                 * @param numBoards
947
                 *            The number of boards wanted.
948
                 * @param max
949
                 *            The size of the machine.
950
                 */
951
                DimensionEstimate(int numBoards, Rectangle max) {
1✔
952
                        if (numBoards < 1) {
1✔
953
                                throw new IllegalArgumentException(
×
954
                                                "number of boards must be greater than zero");
955
                        }
956
                        int numTriads = ceildiv(numBoards, TRIAD_DEPTH);
1✔
957
                        width = min((int) ceil(sqrt(numTriads)), max.width);
1✔
958
                        height = min(ceildiv(numTriads, width), max.height);
1✔
959
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
1✔
960
                        if (width < 1 || height < 1) {
1✔
961
                                throw new IllegalArgumentException(
×
962
                                                "computed dimensions must be greater than zero");
963
                        }
964
                        if (tolerance < 0) {
1✔
965
                                throw new IllegalArgumentException(
×
966
                                                "that job cannot possibly fit on this machine");
967
                        }
968
                }
1✔
969

970
                /**
971
                 * Create an estimate of what to allocate. This does not need to be
972
                 * "near square".
973
                 *
974
                 * @param w
975
                 *            The width of the allocation requested, in triads.
976
                 * @param h
977
                 *            The height of the allocation requested, in triads.
978
                 * @param max
979
                 *            The size of the machine.
980
                 */
981
                DimensionEstimate(int w, int h, Rectangle max) {
1✔
982
                        if (w < 1 || h < 1) {
1✔
983
                                throw new IllegalArgumentException(
×
984
                                                "dimensions must be greater than zero");
985
                        }
986
                        int numBoards = w * h * TRIAD_DEPTH;
1✔
987
                        width = max(1, min(w, max.width));
1✔
988
                        height = max(1, min(h, max.height));
1✔
989
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
1✔
990
                        if (tolerance < 0) {
1✔
991
                                throw new IllegalArgumentException(
1✔
992
                                                "that job cannot possibly fit on this machine");
993
                        }
994
                }
1✔
995

996
                /** @return The estimated dimensions as a rectangle. */
997
                Rectangle getRect() {
998
                        return new Rectangle(width, height, TRIAD_DEPTH);
1✔
999
                }
1000
        }
1001

1002
        private Collection<BMPAndMachine> allocateOneBoard(AllocSQL sql, int jobId,
1003
                        int machineId) {
1004
                // This is simplified; no subsidiary searching needed
1005
                return sql.findFreeBoard
1✔
1006
                                .call1(row -> setAllocation(sql, jobId,
1✔
1007
                                                ONE_BOARD, machineId, coords(row)), machineId)
1✔
1008
                                .orElse(List.of());
1✔
1009
        }
1010

1011
        private static TriadCoords coords(Row row) {
1012
                int x = row.getInt("x");
1✔
1013
                int y = row.getInt("y");
1✔
1014
                int z = row.getInt("z");
1✔
1015
                return new TriadCoords(x, y, z);
1✔
1016
        }
1017

1018
        private Collection<BMPAndMachine> allocateDimensions(AllocSQL sql,
1019
                        int jobId, int machineId, DimensionEstimate estimate,
1020
                        int userMaxDead) {
1021
                int tolerance = userMaxDead + estimate.tolerance;
1✔
1022
                int minArea =
1✔
1023
                                estimate.width * estimate.height * TRIAD_DEPTH - tolerance;
1024
                for (var root : sql.getRectangles
1✔
1025
                                .call(AllocatorTask::coords, estimate.width, estimate.height,
1✔
1026
                                                machineId, tolerance)) {
1✔
1027
                        if (minArea > 1) {
1✔
1028
                                /*
1029
                                 * Check that a minimum number of boards are reachable from the
1030
                                 * proposed root board. If the root board is isolated, we don't
1031
                                 * care if the rest of the allocation works because the rest of
1032
                                 * the toolchain won't cope.
1033
                                 */
1034
                                int size = connectedSize(sql, machineId, root, estimate);
1✔
1035
                                if (size < minArea) {
1✔
1036
                                        continue;
×
1037
                                }
1038
                        }
1039
                        return setAllocation(sql, jobId, estimate.getRect(), machineId,
1✔
1040
                                        root);
1041
                }
1042
                log.debug("Could not allocate min area {}", minArea);
×
1043
                return List.of();
×
1044
        }
1045

1046
        /**
1047
         * Find the number of boards that are reachable from the proposed root
1048
         * board.
1049
         *
1050
         * @param sql
1051
         *            How to talk to the DB
1052
         * @param machineId
1053
         *            The machine on which the allocation is happening
1054
         * @param root
1055
         *            Root logical coordinates
1056
         * @param width
1057
         *            The width of the planned allocation, in triads
1058
         * @param height
1059
         *            The width of the planned allocation, in triads
1060
         * @return How many boards in the allocation are reachable.
1061
         */
1062
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1063
                        int width, int height) {
1064
                return sql.countConnectedBoards
1✔
1065
                                .call1(integer("connected_size"), machineId, root.x(), root.y(),
1✔
1066
                                                width, height).orElse(-1);
1✔
1067
        }
1068

1069
        /**
1070
         * Find the number of boards that are reachable from the proposed root
1071
         * board.
1072
         *
1073
         * @param sql
1074
         *            How to talk to the DB
1075
         * @param machineId
1076
         *            The machine on which the allocation is happening
1077
         * @param root
1078
         *            Root logical coordinates
1079
         * @param rect
1080
         *            The requested allocation dimensions
1081
         * @return How many boards in the allocation are reachable.
1082
         */
1083
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1084
                        Rectangle rect) {
1085
                return connectedSize(sql, machineId, root, rect.width, rect.height);
×
1086
        }
1087

1088
        /**
1089
         * Find the number of boards that are reachable from the proposed root
1090
         * board.
1091
         *
1092
         * @param sql
1093
         *            How to talk to the DB
1094
         * @param machineId
1095
         *            The machine on which the allocation is happening
1096
         * @param root
1097
         *            Root logical coordinates
1098
         * @param estimate
1099
         *            The planned allocation dimensions
1100
         * @return How many boards in the allocation are reachable.
1101
         */
1102
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1103
                        DimensionEstimate estimate) {
1104
                return connectedSize(sql, machineId, root, estimate.width,
1✔
1105
                                estimate.height);
1106
        }
1107

1108
        private Collection<BMPAndMachine> allocateBoard(AllocSQL sql, int jobId,
1109
                        int machineId, int boardId) {
1110
                return sql.findSpecificBoard
1✔
1111
                                .call1(row -> setAllocation(sql, jobId, ONE_BOARD, machineId,
1✔
1112
                                                coords(row)), machineId, boardId)
1✔
1113
                                .orElse(List.of());
1✔
1114
        }
1115

1116
        private Collection<BMPAndMachine> allocateTriadsAt(AllocSQL sql, int jobId,
1117
                        int machineId, int rootId, int width, int height,
1118
                        int maxDeadBoards) {
1119
                var rect = new Rectangle(width, height, TRIAD_DEPTH);
×
1120
                return sql.getRectangleAt
×
1121
                                .call1(AllocatorTask::coords, rootId, width, height, machineId,
×
1122
                                                maxDeadBoards)
×
1123
                                .filter(root -> connectedSize(sql, machineId, root,
×
1124
                                                rect) >= rect.getArea() - maxDeadBoards)
×
1125
                                .map(root -> setAllocation(sql, jobId, rect, machineId, root))
×
1126
                                .orElse(List.of());
×
1127
        }
1128

1129
        /**
1130
         * Does the actual allocation.
1131
         * <p>
1132
         * At this point, we've checked that there's enough boards <em>and</em> we
1133
         * are running in a transaction. We take particular care that we only
1134
         * actually allocate boards that are reachable from the root board; this is
1135
         * assumed by the SpiNNaker tools, so we'd better conform to that
1136
         * expectation. Fortunately, the bigger the allocation, the more likely that
1137
         * is true (and it is trivially true for single-board allocations.)
1138
         * <p>
1139
         * If you want a multi-board allocation, you'd better be allocating a full
1140
         * triad's-worth of depth or you'll get nothing.
1141
         *
1142
         * @param sql
1143
         *            How to talk to the DB
1144
         * @param jobId
1145
         *            What job are we allocating for
1146
         * @param rect
1147
         *            Proposed rectangle size
1148
         * @param machineId
1149
         *            What machine are we allocating on
1150
         * @param root
1151
         *            Proposed root coordinates
1152
         * @return The BMPs that have been used to make the allocation.
1153
         */
1154
        private Collection<BMPAndMachine> setAllocation(AllocSQL sql, int jobId,
1155
                        Rectangle rect,        int machineId, TriadCoords root) {
1156
                log.debug("performing allocation for {}: {}x{}x{} at {}:{}:{}", jobId,
1✔
1157
                                rect.width, rect.height, rect.depth, root.x(), root.y(),
1✔
1158
                                root.z());
1✔
1159
                var boardsToAllocate = sql.getConnectedBoardIDs.call(
1✔
1160
                                integer("board_id"), machineId, root.x(), root.y(), root.z(),
1✔
1161
                                rect.width, rect.height, rect.depth);
1✔
1162
                if (boardsToAllocate.isEmpty()) {
1✔
1163
                        log.debug("No boards to allocate");
×
1164
                        return List.of();
×
1165
                }
1166
                for (var boardId : boardsToAllocate) {
1✔
1167
                        sql.allocBoard.call(jobId, boardId);
1✔
1168
                }
1✔
1169

1170
                var board = boardsToAllocate.get(0);
1✔
1171
                sql.allocJob.call(rect.width, rect.height, rect.depth,
1✔
1172
                                board, boardsToAllocate.size(), board, jobId);
1✔
1173
                log.info("allocated {} boards to {}; issuing power up commands",
1✔
1174
                                boardsToAllocate.size(), jobId);
1✔
1175
                // Any proxies that existed are now defunct; user must make anew
1176
                rememberer.killProxies(jobId);
1✔
1177
                return setPower(sql, jobId, ON, READY);
1✔
1178
        }
1179

1180
        /**
1181
         * Reset a job after a failure on a BMP.
1182
         *
1183
         * @param jobId
1184
         *            The identifier of the job to reset.
1185
         */
1186
        @SuppressWarnings("FutureReturnValueIgnored")
1187
        public void resetPowerOnFailure(int jobId) {
1188
                scheduler.schedule(() -> setPower(jobId, OFF, QUEUED), Instant.now());
×
1189
        }
×
1190

1191
        @Override
1192
        public boolean setPower(int jobId, PowerState power, JobState targetState) {
1193
                if (targetState == DESTROYED) {
1✔
1194
                        throw new IllegalArgumentException(
×
1195
                                        "job destruction must be done via destroyJob() method");
1196
                }
1197
                var allocations = new Allocations(jobId, execute(conn -> {
1✔
1198
                        try (var sql = new PowerSQL(conn)) {
1✔
1199
                                return setPower(sql, jobId, power, targetState);
1✔
1200
                        }
1201
                }));
1202
                allocations.updateEpochs();
1✔
1203
                allocations.updateBMPs();
1✔
1204
                return allocations.notEmpty();
1✔
1205
        }
1206

1207
        /**
1208
         * Issue a request to change the power for the boards of a job.
1209
         *
1210
         * @param sql
1211
         *            How to talk to the DB
1212
         * @param jobId
1213
         *            The job in question
1214
         * @param power
1215
         *            The power state to switch to
1216
         * @param targetState
1217
         *            The state to put the job in afterwards
1218
         * @return The ids of the BMPs that have been changed
1219
         */
1220
        private Collection<BMPAndMachine> setPower(PowerSQL sql, int jobId,
1221
                        PowerState power, JobState targetState) {
1222
                var sourceState = sql.getJobState.call1(
1✔
1223
                                enumerate("job_state", JobState.class), jobId).orElseThrow();
1✔
1224
                var boards = sql.getJobBoards.call(BoardAndBMP::new, jobId);
1✔
1225
                if (boards.isEmpty()) {
1✔
1226
                        log.debug("No boards for job {}", jobId);
1✔
1227
                        if (targetState == DESTROYED) {
1✔
1228
                                log.debug("no boards for {} in destroy", jobId);
1✔
1229
                        }
1230
                        // Make sure we still update the job!
1231
                        updateJob(jobId, sourceState, targetState);
1✔
1232
                        return List.of();
1✔
1233
                }
1234
                log.debug("{} boards for job {}", boards.size(), jobId);
1✔
1235

1236
                // Number of changes pending, one per board
1237
                int numPending = 0;
1✔
1238

1239
                record Perimeter(int boardId, Direction direction) {
1✔
1240
                        Perimeter(Row row) {
1241
                                this(row.getInt("board_id"),
1✔
1242
                                                row.getEnum("direction", Direction.class));
1✔
1243
                        }
1✔
1244
                }
1245

1246
                var bmps = new HashSet<BMPAndMachine>();
1✔
1247
                if (power == ON) {
1✔
1248
                        /*
1249
                         * This is a bit of a trickier case, as we need to say which links
1250
                         * are to be switched on or, more particularly, which are to be
1251
                         * switched off because they are links to boards that are not
1252
                         * allocated to the job. Off-board links are shut off by default.
1253
                         */
1254
                        var perimeterLinks =
1✔
1255
                                        Row.stream(sql.getPerimeter.call(Perimeter::new, jobId))
1✔
1256
                                                        .toCollectingMap(Direction.class,
1✔
1257
                                                                        Perimeter::boardId, Perimeter::direction);
1258

1259
                        for (var board : boards) {
1✔
1260
                                var toChange = perimeterLinks.getOrDefault(board.boardId,
1✔
1261
                                                NO_PERIMETER);
1262
                                numPending += sql.issuePowerChange.call(jobId,
1✔
1263
                                                board.boardId,
1✔
1264
                                                sourceState, targetState, true,
1✔
1265
                                                !toChange.contains(Direction.N),
1✔
1266
                                                !toChange.contains(Direction.E),
1✔
1267
                                                !toChange.contains(Direction.SE),
1✔
1268
                                                !toChange.contains(Direction.S),
1✔
1269
                                                !toChange.contains(Direction.W),
1✔
1270
                                                !toChange.contains(Direction.NW));
1✔
1271
                                bmps.add(board.bmp);
1✔
1272
                        }
1✔
1273
                } else {
1✔
1274
                        // Powering off; all links switch to off so no perimeter check
1275
                        for (var board : boards) {
1✔
1276
                                numPending += sql.issuePowerChange.call(jobId,
1✔
1277
                                                board.boardId,        sourceState, targetState,
1✔
1278
                                                false, false, false, false,        false, false, false);
1✔
1279
                                bmps.add(board.bmp);
1✔
1280
                        }
1✔
1281
                }
1282

1283
                if (targetState == DESTROYED) {
1✔
1284
                        log.debug("num changes for {} in destroy: {}", jobId, numPending);
1✔
1285
                        log.info("destroying job {} after power change", jobId);
1✔
1286
                        int rows = sql.setStateDestroyed.call(numPending, jobId);
1✔
1287
                        if (rows != 1) {
1✔
1288
                                log.warn("unexpected number of jobs marked destroyed: {}",
×
1289
                                                rows);
×
1290
                        }
1291
                } else {
1✔
1292
                        log.debug("Num changes for target {}: {}", targetState, numPending);
1✔
1293
                        sql.setStatePending.call(
1✔
1294
                                numPending > 0 ? POWER : targetState,
1✔
1295
                                numPending, jobId);
1✔
1296
                }
1297

1298
                return bmps;
1✔
1299
        }
1300

1301
        /**
1302
         * Operations for testing only.
1303
         *
1304
         * @hidden
1305
         */
1306
        @ForTestingOnly
1307
        interface TestAPI {
1308
                /**
1309
                 * Allocate all current requests for resources.
1310
                 *
1311
                 * @return The BMPs updated by the allocation.
1312
                 */
1313
                Allocations allocate();
1314

1315
                /**
1316
                 * Destroy a job.
1317
                 *
1318
                 * @param id
1319
                 *            The ID of the job
1320
                 * @param reason
1321
                 *            Why is the job being destroyed.
1322
                 * @return The BMPs updated by the destruction.
1323
                 */
1324
                Allocations destroyJob(int id, String reason);
1325

1326
                /**
1327
                 * Destroy jobs that have missed their keepalive.
1328
                 *
1329
                 * @return The BMPs updated by the expiry.
1330
                 */
1331
                Allocations expireJobs();
1332
        }
1333

1334
        /** Operations for testing historical database only. */
1335
        @ForTestingOnly
1336
        interface HistTestAPI {
1337

1338

1339
                /**
1340
                 * Implementation of {@link AllocatorTask#tombstone()}.
1341
                 *
1342
                 * @return Description of the tombstoned IDs
1343
                 */
1344
                Copied tombstone();
1345
        }
1346

1347
        /**
1348
         * @param conn
1349
         *            The DB connection
1350
         * @return The test interface.
1351
         * @deprecated This interface is just for testing.
1352
         * @hidden
1353
         */
1354
        @ForTestingOnly
1355
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1356
                        allowedOnPath = ".*/src/test/java/.*")
1357
        @Deprecated
1358
        TestAPI getTestAPI(Connection conn) {
1359
                ForTestingOnly.Utils.checkForTestClassOnStack();
1✔
1360
                return new TestAPI() {
1✔
1361
                        @Override
1362
                        public Allocations allocate() {
1363
                                return AllocatorTask.this.allocate(conn);
1✔
1364
                        }
1365

1366
                        @Override
1367
                        public Allocations destroyJob(int id, String reason) {
1368
                                return new Allocations(id,
1✔
1369
                                                AllocatorTask.this.destroyJob(conn, id, reason));
1✔
1370
                        }
1371

1372
                        @Override
1373
                        public Allocations expireJobs() {
1374
                                return AllocatorTask.this.expireJobs(conn);
1✔
1375
                        }
1376
                };
1377
        }
1378

1379
        /**
1380
         * @param conn
1381
         *            The DB connection
1382
         * @param histConn
1383
         *            The historical DB connection
1384
         * @return The test interface.
1385
         * @deprecated This interface is just for testing.
1386
         */
1387
        @ForTestingOnly
1388
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1389
                        allowedOnPath = ".*/src/test/java/.*")
1390
        @Deprecated
1391
        HistTestAPI getHistTestAPI(Connection conn, Connection histConn) {
1392
                ForTestingOnly.Utils.checkForTestClassOnStack();
1✔
1393
                return new HistTestAPI() {
1✔
1394
                        @Override
1395
                        public Copied tombstone() {
1396
                                return AllocatorTask.this.tombstone(conn, histConn);
1✔
1397
                        }
1398
                };
1399
        }
1400
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc