• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13363029881

07 Feb 2025 12:42PM UTC coverage: 38.579% (-0.004%) from 38.583%
13363029881

push

github

web-flow
Merge pull request #1220 from SpiNNakerManchester/fix_tombstone_again

Fix it and test it this time

9183 of 23803 relevant lines covered (38.58%)

1.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.39
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/AllocatorTask.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.allocator;
17

18
import static java.lang.Math.ceil;
19
import static java.lang.Math.max;
20
import static java.lang.Math.min;
21
import static java.lang.Math.sqrt;
22
import static java.lang.String.format;
23
import static java.util.Objects.nonNull;
24
import static org.slf4j.LoggerFactory.getLogger;
25
import static uk.ac.manchester.spinnaker.alloc.Constants.TRIAD_DEPTH;
26
import static uk.ac.manchester.spinnaker.alloc.db.Row.enumerate;
27
import static uk.ac.manchester.spinnaker.alloc.db.Row.integer;
28
import static uk.ac.manchester.spinnaker.alloc.db.Utils.isBusy;
29
import static uk.ac.manchester.spinnaker.alloc.model.JobState.DESTROYED;
30
import static uk.ac.manchester.spinnaker.alloc.model.JobState.POWER;
31
import static uk.ac.manchester.spinnaker.alloc.model.JobState.QUEUED;
32
import static uk.ac.manchester.spinnaker.alloc.model.JobState.READY;
33
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.OFF;
34
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.ON;
35
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
36

37
import java.time.Instant;
38
import java.util.ArrayList;
39
import java.util.Collection;
40
import java.util.EnumSet;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Objects;
44
import java.util.Set;
45
import java.util.stream.Stream;
46

47
import javax.annotation.PostConstruct;
48

49
import org.slf4j.Logger;
50
import org.springframework.beans.factory.annotation.Autowired;
51
import org.springframework.dao.DataAccessException;
52
import org.springframework.scheduling.TaskScheduler;
53
import org.springframework.scheduling.support.CronTrigger;
54
import org.springframework.stereotype.Service;
55

56
import com.google.errorprone.annotations.RestrictedApi;
57

58
import uk.ac.manchester.spinnaker.alloc.ForTestingOnly;
59
import uk.ac.manchester.spinnaker.alloc.ServiceMasterControl;
60
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.AllocatorProperties;
61
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.HistoricalDataProperties;
62
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.KeepaliveProperties;
63
import uk.ac.manchester.spinnaker.alloc.bmp.BMPController;
64
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Connection;
65
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Query;
66
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Update;
67
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAwareBean;
68
import uk.ac.manchester.spinnaker.alloc.db.Row;
69
import uk.ac.manchester.spinnaker.alloc.db.SQLQueries;
70
import uk.ac.manchester.spinnaker.alloc.model.BMPAndMachine;
71
import uk.ac.manchester.spinnaker.alloc.model.BoardAndBMP;
72
import uk.ac.manchester.spinnaker.alloc.model.Direction;
73
import uk.ac.manchester.spinnaker.alloc.model.JobState;
74
import uk.ac.manchester.spinnaker.alloc.model.PowerState;
75
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
76
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
77

78
/**
79
 * The allocation engine. Allocations are performed by running suitable
80
 * (non-trivial) SQL queries on a periodic basis, putting jobs that cannot be
81
 * allocated back on the queue for a later attempt (and increasing their
82
 * priority when it does so). This class is also responsible for destroying jobs
83
 * that are not kept alive sufficiently frequently, and eventually migrating
84
 * records of dead jobs to long-term storage ("tombstoning").
85
 */
86
@Service
87
public class AllocatorTask extends DatabaseAwareBean
3✔
88
                implements PowerController {
89
        /**
90
         * @see #setPower(Connection,int,PowerState)
91
         */
92
        private static final EnumSet<Direction> NO_PERIMETER =
3✔
93
                        EnumSet.noneOf(Direction.class);
3✔
94

95
        private static final Logger log = getLogger(AllocatorTask.class);
3✔
96

97
        private static final Rectangle ONE_BOARD = new Rectangle(1, 1, 1);
3✔
98

99
        /**
100
         * Maximum number of jobs that we actually run the quota check for. Used
101
         * because we are reusing a query, and we'll probably never have that many
102
         * live jobs even on the big machine.
103
         */
104
        private static final Integer NUMBER_OF_JOBS_TO_QUOTA_CHECK = 100000;
3✔
105

106
        private static final String DESTROY_ON_POWER_ERROR =
107
                        "Error changing power state!  Please contact an administrator.";
108

109
        @Autowired
110
        private Epochs epochs;
111

112
        @Autowired
113
        private ServiceMasterControl serviceControl;
114

115
        @Autowired
116
        private QuotaManager quotaManager;
117

118
        @Autowired
119
        private AllocatorProperties allocProps;
120

121
        @Autowired
122
        private KeepaliveProperties keepAliveProps;
123

124
        @Autowired
125
        private HistoricalDataProperties historyProps;
126

127
        @Autowired
128
        private ProxyRememberer rememberer;
129

130
        @Autowired
131
        private TaskScheduler scheduler;
132

133
        // Note can't be autowired as circular;
134
        // instead set by setter in postconstruct of BMPController
135
        private BMPController bmpController;
136

137
        public void setBMPController(BMPController bmpController) {
138
                this.bmpController = bmpController;
3✔
139
        }
3✔
140

141
        @PostConstruct
142
        @SuppressWarnings("FutureReturnValueIgnored")
143
        private void init() {
144
                scheduler.scheduleAtFixedRate(() -> allocate(),        allocProps.getPeriod());
3✔
145
                scheduler.scheduleAtFixedRate(() -> expireJobs(),
3✔
146
                                keepAliveProps.getExpiryPeriod());
3✔
147
                scheduler.schedule(() -> tombstone(),
3✔
148
                                new CronTrigger(historyProps.getSchedule()));
3✔
149
        }
3✔
150

151
        /**
152
         * Perform update on a job now as a result of a change.
153
         *
154
         * @param jobId
155
         *            The job to update.
156
         * @param sourceState
157
         *            The change source state.
158
         * @param targetState
159
         *            The change target state.
160
         */
161
        public void updateJob(int jobId, JobState sourceState,
162
                        JobState targetState) {
163
                scheduler.schedule(() -> updateJobNow(jobId, sourceState, targetState),
3✔
164
                                Instant.now());
3✔
165
        }
3✔
166

167
        private void updateJobNow(int jobId, JobState sourceState,
168
                        JobState targetState) {
169
                try {
170
                        var updated = execute(
3✔
171
                                        conn -> update(jobId, sourceState, targetState, conn));
3✔
172
                        if (updated) {
3✔
173
                                log.debug("advancing job {} epoch", jobId);
3✔
174
                                epochs.jobChanged(jobId);
3✔
175
                        }
176
                } catch (DataAccessException e) {
×
177
                        if (isBusy(e)) {
×
178
                                log.info("database is busy; "
×
179
                                                + "will try allocation processing later");
180
                                return;
×
181
                        }
182
                        throw e;
×
183
                }
3✔
184
        }
3✔
185

186
        private boolean update(int jobId, JobState sourceState,
187
                        JobState targetState, Connection c) {
188
                log.debug("Updating job {} from {} to {}", jobId, sourceState,
3✔
189
                                targetState);
190
                try (var getChangeStatus = c.query(COUNT_CHANGES_FOR_JOB);
3✔
191
                                var setJobState = c.update(SET_STATE_PENDING);
3✔
192
                                var setJobDestroyed = c.update(SET_STATE_DESTROYED);
3✔
193
                                var deleteChanges = c.update(DELETE_PENDING);
3✔
194
                                var deleteTask = c.update(DELETE_TASK)) {
3✔
195
                        // Count pending changes for this state change
196
                        var status = getChangeStatus.call1(ChangeStatus::new,
3✔
197
                                        jobId, sourceState, targetState).orElseThrow(
3✔
198
                                                        () -> new RuntimeException(
×
199
                                                                        "Error counting job tasks"));
200

201
                        log.debug("Job {} has {} changes remaining", jobId,
3✔
202
                                        status.nChanges);
3✔
203

204
                        // If the remaining things are errors, react (if there are errors,
205
                        // eventually non-errors will be deleted)
206
                        if (status.nErrors > 0 && (status.nErrors == status.nChanges)) {
3✔
207
                                log.info("Job {} changes resulted in errors.", jobId);
3✔
208

209
                                // We can delete the changes now as we know the issues
210
                                deleteChanges.call(jobId, sourceState, targetState);
3✔
211

212
                                // If we are going to destroyed, we can mostly ignore errors,
213
                                // and similarly if we are going to queue it again anyway
214
                                if (targetState == DESTROYED || targetState == QUEUED) {
3✔
215
                                        return true;
×
216
                                }
217

218
                                // If the job was ready before we tried to do this, we have to
219
                                // destroy the job with an error!
220
                                if (sourceState == READY) {
3✔
221
                                        destroyJob(c, jobId, DESTROY_ON_POWER_ERROR);
×
222
                                        return true;
×
223
                                }
224

225
                                // If the job was not ready, we need to re-queue and reallocate
226
                                // boards
227
                                scheduler.schedule(() -> setPower(jobId, OFF, QUEUED),
3✔
228
                                                Instant.now());
3✔
229
                                return false;
3✔
230
                        } else if (status.nChanges > 0) {
3✔
231
                                // There are still changes happening - let them finish first
232
                                // even if there are errors as safer to do once everything
233
                                // is done.
234
                                return false;
×
235
                        }
236

237
                        // If there are no more pending changes and no errors,
238
                        // set the job state to the target state
239
                        log.debug("Job {} moving to state {}", jobId, targetState);
3✔
240

241
                        // If destroyed, we don't need to set the state as it will be gone
242
                        if (targetState == DESTROYED) {
3✔
243
                                int rows = setJobDestroyed.call(jobId);
3✔
244
                                if (rows != 1) {
3✔
245
                                        log.warn("unexpected number of rows affected by "
3✔
246
                                                        + "destroy in state update: {}", rows);
3✔
247
                                }
248
                                return rows > 0;
3✔
249

250
                        // If ready, we can now delete the job request too
251
                        } else if (targetState == READY) {
3✔
252
                                int rows = deleteTask.call(jobId);
3✔
253
                                if (rows != 1) {
3✔
254
                                        log.warn("unexpected number of rows affected by ready in "
×
255
                                                        + "state update: {}", rows);
×
256
                                }
257
                        }
258

259
                        return setJobState.call(targetState, jobId) > 0;
3✔
260
                }
3✔
261
        }
262

263
        /**
264
         * Helper class representing a rectangle of triads.
265
         *
266
         * @author Donal Fellows
267
         */
268
        private static final class Rectangle {
269
                final int width;
270

271
                final int height;
272

273
                /** Depth of rectangle. 1 or 3 */
274
                final int depth;
275

276
                private Rectangle(int width, int height, int depth) {
3✔
277
                        this.width = width;
3✔
278
                        this.height = height;
3✔
279
                        this.depth = depth;
3✔
280
                }
3✔
281

282
                private Rectangle(Row row) {
283
                        this(row.getInt("max_width"), row.getInt("max_height"),
3✔
284
                                        TRIAD_DEPTH);
285
                }
3✔
286

287
                @Override
288
                public String toString() {
289
                        return format("%dx%dx%d", width, height, depth);
×
290
                }
291

292
                public int getArea() {
293
                        return width * height * depth;
×
294
                }
295
        }
296

297
        /**
298
         * Ask for allocation to happen now.
299
         */
300
        public void scheduleAllocateNow() {
301
                scheduler.schedule(this::allocate, Instant.now());
3✔
302
        }
3✔
303

304
        /**
305
         * Allocate all current requests for resources.
306
         */
307
        public synchronized void allocate() {
308
                if (serviceControl.isPaused()) {
3✔
309
                        return;
3✔
310
                }
311

312
                try {
313
                        var allocated = execute(this::allocate);
×
314
                        allocated.updateEpochs();
×
315
                        allocated.updateBMPs();
×
316
                } catch (DataAccessException e) {
×
317
                        if (isBusy(e)) {
×
318
                                log.info("database is busy; "
×
319
                                                + "will try allocation processing later");
320
                                return;
×
321
                        }
322
                        throw e;
×
323
                }
×
324
        }
×
325

326
        private class Perimeter {
327
                int boardId;
328

329
                Direction direction;
330

331
                Perimeter(Row row) {
3✔
332
                        boardId = row.getInt("board_id");
3✔
333
                        direction = row.getEnum("direction", Direction.class);
3✔
334
                }
3✔
335
        }
336

337
        private class ChangeStatus {
338
                private final int nChanges;
339

340
                private final int nErrors;
341

342
                ChangeStatus(Row row) {
3✔
343
                        nChanges = row.getInt("n_changes");
3✔
344
                        nErrors = row.getInt("n_errors");
3✔
345
                }
3✔
346
        }
347

348
        /** Encapsulates the queries and updates used in power control. */
349
        private class PowerSQL extends AbstractSQL {
350
                /** Get basic information about a specific job. */
351
                private final Query getJobState;
352

353
                /** Get what boards are allocated to a job (that is queued or ready). */
354
                private final Query getJobBoards;
355

356
                /**
357
                 * Get the links on the perimeter of the allocation to a job. The
358
                 * perimeter is defined as being the links between a board that is part
359
                 * of the allocation and a board that is not; it's <em>not</em> a
360
                 * geometric definition, but rather a relational algebraic one.
361
                 */
362
                private final Query getPerimeter;
363

364
                /** Create a request to change the power status of a board. */
365
                private final Update issuePowerChange;
366

367
                /** Set the state and number of pending changes for a job. */
368
                private final Update setStatePending;
369

370
                /** Set the state to destroyed. */
371
                private final Update setStateDestroyed;
372

373
                PowerSQL(Connection conn) {
3✔
374
                        super(conn);
3✔
375
                        getJobState = conn.query(GET_JOB);
3✔
376
                        getJobBoards = conn.query(GET_JOB_BOARDS);
3✔
377
                        getPerimeter = conn.query(getPerimeterLinks);
3✔
378
                        issuePowerChange = conn.update(issueChangeForJob);
3✔
379
                        setStatePending = conn.update(SET_STATE_PENDING);
3✔
380
                        setStateDestroyed = conn.update(SET_STATE_DESTROYED);
3✔
381
                }
3✔
382

383
                @Override
384
                public void close() {
385
                        getJobState.close();
3✔
386
                        getJobBoards.close();
3✔
387
                        getPerimeter.close();
3✔
388
                        issuePowerChange.close();
3✔
389
                        setStatePending.close();
3✔
390
                        setStateDestroyed.close();
3✔
391
                }
3✔
392
        }
393

394
        /** Encapsulates the queries and updates used in allocation. */
395
        @UsedInJavadocOnly(SQLQueries.class)
396
        private final class AllocSQL extends PowerSQL {
397
                /** Increases the importance of a job. */
398
                private final Update bumpImportance;
399

400
                /** Get the list of allocation tasks for jobs in a given state. */
401
                private final Query getTasks;
402

403
                /** Find a single free board. */
404
                private final Query findFreeBoard;
405

406
                /**
407
                 * Find a rectangle of triads of boards that may be allocated.
408
                 *
409
                 * @see SQLQueries#FIND_FREE_BOARD
410
                 */
411
                private final Query getRectangles;
412

413
                /**
414
                 * Find a rectangle of triads of boards that may be allocated rooted at
415
                 * a particular board.
416
                 *
417
                 * @see SQLQueries#FIND_FREE_BOARD_AT
418
                 */
419
                private final Query getRectangleAt;
420

421
                /**
422
                 * Count the number of <em>connected</em> boards (i.e., have at least
423
                 * one path over enabled links to the root board of the allocation)
424
                 * within a rectangle of triads. The triads are taken as being full
425
                 * depth.
426
                 */
427
                private final Query countConnectedBoards;
428

429
                /**
430
                 * Find an allocatable board with a specific board ID. (This will have
431
                 * been previously converted from some other form of board coordinates.)
432
                 */
433
                private final Query findSpecificBoard;
434

435
                /**
436
                 * Get the set of boards at some coordinates within a triad rectangle
437
                 * that are connected (i.e., have at least one path over enableable
438
                 * links) to the root board.
439
                 */
440
                private final Query getConnectedBoardIDs;
441

442
                /** Tell a board that it is allocated. */
443
                private final Update allocBoard;
444

445
                /** Tell a job that it is allocated. Doesn't set the state. */
446
                private final Update allocJob;
447

448
                AllocSQL(Connection conn) {
3✔
449
                        super(conn);
3✔
450
                        bumpImportance = conn.update(BUMP_IMPORTANCE);
3✔
451
                        getTasks = conn.query(getAllocationTasks);
3✔
452
                        findFreeBoard = conn.query(FIND_FREE_BOARD);
3✔
453
                        getRectangles = conn.query(findRectangle);
3✔
454
                        getRectangleAt = conn.query(findRectangleAt);
3✔
455
                        countConnectedBoards = conn.query(countConnected);
3✔
456
                        findSpecificBoard = conn.query(FIND_LOCATION);
3✔
457
                        getConnectedBoardIDs = conn.query(getConnectedBoards);
3✔
458
                        allocBoard = conn.update(ALLOCATE_BOARDS_BOARD);
3✔
459
                        allocJob = conn.update(ALLOCATE_BOARDS_JOB);
3✔
460
                }
3✔
461

462
                @Override
463
                public void close() {
464
                        super.close();
3✔
465
                        bumpImportance.close();
3✔
466
                        getTasks.close();
3✔
467
                        findFreeBoard.close();
3✔
468
                        getRectangles.close();
3✔
469
                        getRectangleAt.close();
3✔
470
                        countConnectedBoards.close();
3✔
471
                        findSpecificBoard.close();
3✔
472
                        getConnectedBoardIDs.close();
3✔
473
                        allocBoard.close();
3✔
474
                        allocJob.close();
3✔
475
                }
3✔
476
        }
477

478
        /** Encapsulates the queries and updates used in deletion. */
479
        private final class DestroySQL extends PowerSQL {
480
                /** Get basic information about a specific job. */
481
                private final Query getJob = conn.query(GET_JOB);
3✔
482

483
                /** Mark a job as dead. */
484
                private final Update markAsDestroyed = conn.update(DESTROY_JOB);
3✔
485

486
                /** Note the reason why a job is dead. */
487
                private Update recordDestroyReason = conn.update(NOTE_DESTROY_REASON);
3✔
488

489
                /** Delete a request to allocate resources for a job. */
490
                private final Update killAlloc = conn.update(KILL_JOB_ALLOC_TASK);
3✔
491

492
                DestroySQL(Connection conn) {
3✔
493
                        super(conn);
3✔
494
                }
3✔
495

496
                @Override
497
                public void close() {
498
                        super.close();
3✔
499
                        getJob.close();
3✔
500
                        markAsDestroyed.close();
3✔
501
                        recordDestroyReason.close();
3✔
502
                        killAlloc.close();
3✔
503
                }
3✔
504
        }
505

506
        private class AllocTask {
507
                final int id;
508

509
                final int importance;
510

511
                final int jobId;
512

513
                final int machineId;
514

515
                final Rectangle max;
516

517
                final int maxDeadBoards;
518

519
                final Integer numBoards;
520

521
                final Integer width;
522

523
                final Integer height;
524

525
                final Integer root;
526

527
                AllocTask(Row row) {
3✔
528
                        id = row.getInt("req_id");
3✔
529
                        importance = row.getInt("importance");
3✔
530
                        jobId = row.getInt("job_id");
3✔
531
                        machineId = row.getInt("machine_id");
3✔
532
                        max = new Rectangle(row);
3✔
533
                        maxDeadBoards = row.getInt("max_dead_boards");
3✔
534
                        numBoards = row.getInteger("num_boards");
3✔
535
                        width = row.getInteger("width");
3✔
536
                        height = row.getInteger("height");
3✔
537
                        root = row.getInteger("board_id");
3✔
538
                }
3✔
539

540
                Collection<BMPAndMachine> allocate(AllocSQL sql) {
541
                        if (nonNull(numBoards) && numBoards > 0) {
3✔
542
                                // Single-board case gets its own allocator that's better at
543
                                // that
544
                                if (numBoards == 1) {
3✔
545
                                        log.debug("Allocate one board");
3✔
546
                                        return allocateOneBoard(sql, jobId, machineId);
3✔
547
                                }
548
                                var estimate = new DimensionEstimate(numBoards, max);
3✔
549
                                return allocateDimensions(sql, jobId, machineId, estimate,
3✔
550
                                                maxDeadBoards);
551
                        }
552

553
                        if (nonNull(width) && nonNull(height) && nonNull(root)) {
3✔
554
                                return allocateTriadsAt(sql, jobId, machineId, root, width,
×
555
                                                height,        maxDeadBoards);
×
556
                        }
557

558
                        if (nonNull(width) && nonNull(height) && width > 0 && height > 0) {
3✔
559
                                // Special case; user is really just asking for one board
560
                                if (height == 1 && width == 1 && nonNull(maxDeadBoards)
3✔
561
                                                && maxDeadBoards == 2) {
562
                                        return allocateOneBoard(sql, jobId, machineId);
3✔
563
                                }
564
                                var estimate = new DimensionEstimate(width, height, max);
3✔
565
                                log.debug(
3✔
566
                                                "resolved request for {}x{} boards to {}x{} triads "
567
                                                                + "with tolerance {}",
568
                                                width, height, estimate.width, estimate.height,
3✔
569
                                                estimate.tolerance);
3✔
570
                                return allocateDimensions(sql, jobId, machineId, estimate,
3✔
571
                                                maxDeadBoards);
572
                        }
573

574
                        if (nonNull(root)) {
3✔
575
                                // Ignores maxDeadBoards; is a single-board allocate
576
                                return allocateBoard(sql, jobId, machineId, root);
3✔
577
                        }
578

579
                        log.warn("job {} could not be allocated; "
×
580
                                        + "bad request will be cleared from queue", jobId);
×
581
                        return List.of();
×
582
                }
583
        }
584

585
        /**
586
         * A set of information about the allocations that have been made.
587
         */
588
        class Allocations {
589

590
                /** The BMPs that have been affected by the allocations. **/
591
                final Set<Integer> bmps = new HashSet<>();
3✔
592

593
                /** The Machines that have been affected by the allocations. **/
594
                final Set<Integer> machines = new HashSet<>();
3✔
595

596
                /** The jobs that have been affected by the allocations. **/
597
                final List<Integer> jobIds = new ArrayList<>();
3✔
598

599
                Allocations() {
3✔
600
                        // Does nothing
601
                }
3✔
602

603
                Allocations(int jobId, Collection<BMPAndMachine> bmps) {
3✔
604
                        addAll(jobId, bmps);
3✔
605
                }
3✔
606

607
                void addAll(int jobId, Collection<BMPAndMachine> bmps) {
608
                        if (bmps.size() > 0) {
3✔
609
                                jobIds.add(jobId);
3✔
610
                                for (var bm : bmps) {
3✔
611
                                        this.bmps.add(bm.bmpId);
3✔
612
                                        this.machines.add(bm.machineId);
3✔
613
                                }
3✔
614
                        }
615
                }
3✔
616

617
                void updateEpochs() {
618
                        log.debug("Updating jobs {}", jobIds);
3✔
619
                        for (var job : jobIds) {
3✔
620
                                epochs.jobChanged(job);
3✔
621
                        }
3✔
622
                        log.debug("Updating machines {}", machines);
3✔
623
                        for (var m : machines) {
3✔
624
                                epochs.machineChanged(m);
3✔
625
                        }
3✔
626
                }
3✔
627

628
                void updateBMPs() {
629
                        if (!bmps.isEmpty() && bmpController != null) {
3✔
630
                                // Poke the BMP controller to start looking!
631
                                log.debug("Triggering BMPs {}", bmps);
3✔
632
                                bmpController.triggerSearch(bmps);
3✔
633
                        }
634
                }
3✔
635

636
                boolean notEmpty() {
637
                        return !jobIds.isEmpty();
3✔
638
                }
639
        }
640

641
        /**
642
         * Allocate all current requests for resources.
643
         *
644
         * @param conn
645
         *            The DB connection
646
         * @return Whether any changes have been done
647
         */
648
        private Allocations allocate(Connection conn) {
649
                try (var sql = new AllocSQL(conn)) {
3✔
650
                        int maxImportance = -1;
3✔
651
                        log.trace("Allocate running");
3✔
652
                        var allocations = new Allocations();
3✔
653
                        var tasks = sql.getTasks.call(AllocTask::new, QUEUED);
3✔
654
                        log.debug("allocate for {} tasks", tasks.size());
3✔
655
                        for (AllocTask task : tasks) {
3✔
656
                                if (task.importance > maxImportance) {
3✔
657
                                        maxImportance = task.importance;
3✔
658
                                } else if (task.importance < maxImportance
×
659
                                                - allocProps.getImportanceSpan()) {
×
660
                                        // Too much of a span
661
                                        continue;
×
662
                                }
663
                                var handled = task.allocate(sql);
3✔
664
                                allocations.addAll(task.jobId, handled);
3✔
665
                                log.debug("allocate for {} (job {}): {}", task.id,
3✔
666
                                                task.jobId, handled);
3✔
667
                        }
3✔
668
                        /*
669
                         * Those tasks which weren't allocated get their importance bumped
670
                         * so they get considered with higher priority when the allocator
671
                         * runs next time.
672
                         */
673
                        sql.bumpImportance.call();
3✔
674
                        return allocations;
3✔
675
                }
676
        }
677

678
        /**
679
         * Destroy jobs that have missed their keepalive.
680
         */
681
        public void expireJobs() {
682
                if (serviceControl.isPaused()) {
3✔
683
                        return;
3✔
684
                }
685

686
                try {
687
                        var allocated = execute(this::expireJobs);
×
688
                        allocated.updateEpochs();
×
689
                        allocated.updateBMPs();
×
690
                } catch (DataAccessException e) {
×
691
                        if (isBusy(e)) {
×
692
                                log.info("database is busy; "
×
693
                                                + "will try job expiry processing later");
694
                                return;
×
695
                        }
696
                        throw e;
×
697
                }
×
698
        }
×
699

700
        /**
701
         * Destroy jobs that have missed their keepalive.
702
         *
703
         * @param conn
704
         *            How to talk to the DB
705
         * @return Whether any jobs have been expired.
706
         */
707
        private Allocations expireJobs(Connection conn) {
708
                var allocations = new Allocations();
3✔
709
                try (var find = conn.query(FIND_EXPIRED_JOBS)) {
3✔
710
                        var toKill = find.call(integer("job_id"));
3✔
711
                        for (var id : toKill) {
3✔
712
                                allocations.addAll(id,
3✔
713
                                                destroyJob(conn, id, "keepalive expired"));
3✔
714
                        }
3✔
715
                }
716
                try (var find = conn.query(GET_LIVE_JOB_IDS)) {
3✔
717
                        var toKill = find.call(integer("job_id"),
3✔
718
                                        NUMBER_OF_JOBS_TO_QUOTA_CHECK, 0);
3✔
719
                        for (var id : toKill) {
3✔
720
                                if (quotaManager.shouldKillJob(id)) {
×
721
                                        allocations.addAll(id,
×
722
                                                        destroyJob(conn, id, "quota exceeded"));
×
723
                                }
724
                        }
×
725
                }
726
                return allocations;
3✔
727
        }
728

729
        /**
730
         * Migrates long dead jobs to the historical data DB.
731
         */
732
        public void tombstone() {
733
                if (serviceControl.isPaused()) {
×
734
                        return;
×
735
                }
736

737
                try (var conn = getConnection();
×
738
                                var histConn = getHistoricalConnection()) {
×
739
                        var c = tombstone(conn, histConn);
×
740
                        log.info("tombstoning completed: "
×
741
                                        + "moved {} job records and {} allocation records",
742
                                        c.numJobs(), c.numAllocs());
×
743
                } catch (DataAccessException e) {
×
744
                        if (isBusy(e)) {
×
745
                                log.info("database is busy; "
×
746
                                                + "will try job tombstone processing at future date");
747
                                return;
×
748
                        }
749
                        throw e;
×
750
                }
×
751
        }
×
752

753
        /**
754
         * Describes what the first stage of the tombstoner has copied.
755
         */
756
        static final class Copied {
757
                private final List<HistoricalJob> jobs;
758

759
                private final List<HistoricalAlloc> allocs;
760

761
                private Copied(List<HistoricalJob> jobs, List<HistoricalAlloc> allocs) {
3✔
762
                        this.jobs = jobs;
3✔
763
                        this.allocs = allocs;
3✔
764
                }
3✔
765

766
                private Stream<HistoricalAlloc> allocs() {
767
                        return allocs.stream().filter(Objects::nonNull);
3✔
768
                }
769

770
                private Stream<HistoricalJob> jobs() {
771
                        return jobs.stream().filter(Objects::nonNull);
3✔
772
                }
773

774
                private Stream<Integer> nmpiJobs() {
775
                        return jobs().map(j -> j.nmpiJobId).filter(Objects::nonNull);
3✔
776
                }
777

778
                private Stream<Integer> nmpiSessions() {
779
                        return jobs().map(j -> j.nmpiSessionId).filter(Objects::nonNull);
3✔
780
                }
781

782
                /**
783
                 * @return The number of job records to copy over to the historical
784
                 *         database.
785
                 */
786
                int numJobs() {
787
                        return jobs.size();
3✔
788
                }
789

790
                /**
791
                 * @return The number of board allocation records to copy over to the
792
                 *         historical database.
793
                 */
794
                int numAllocs() {
795
                        return allocs.size();
3✔
796
                }
797
        }
798

799
        private class HistoricalAlloc {
800
                int allocId;
801

802
                int jobId;
803

804
                int boardId;
805

806
                Instant allocTimestamp;
807

808
                HistoricalAlloc(Row row) {
×
809
                        allocId = row.getInt("alloc_id");
×
810
                        jobId = row.getInt("job_id");
×
811
                        boardId = row.getInt("board_id");
×
812
                        allocTimestamp = row.getInstant("alloc_timestamp");
×
813
                }
×
814

815
                Object[] args() {
816
                        return new Object[] {
×
817
                                allocId, jobId, boardId, allocTimestamp
×
818
                        };
819
                }
820
        }
821

822
        private class HistoricalJob {
823
                int jobId;
824

825
                int machineId;
826

827
                String owner;
828

829
                Instant createTimestamp;
830

831
                int width;
832

833
                int height;
834

835
                int depth;
836

837
                int allocatedRoot;
838

839
                Instant keepaliveInterval;
840

841
                String keepaliveHost;
842

843
                String deathReason;
844

845
                Instant deathTimestamp;
846

847
                byte[] originalRequest;
848

849
                Instant allocationTimestamp;
850

851
                int allocationSize;
852

853
                String machineName;
854

855
                String userName;
856

857
                int groupId;
858

859
                String groupName;
860

861
                Integer nmpiJobId;
862

863
                Integer nmpiSessionId;
864

865
                HistoricalJob(Row row) {
3✔
866
                        jobId = row.getInt("job_id");
3✔
867
                        machineId = row.getInt("machine_id");
3✔
868
                        owner = row.getString("owner");
3✔
869
                        createTimestamp = row.getInstant("create_timestamp");
3✔
870
                        width = row.getInt("width");
3✔
871
                        height = row.getInt("height");
3✔
872
                        depth = row.getInt("depth");
3✔
873
                        allocatedRoot = row.getInt("allocated_root");
3✔
874
                        keepaliveInterval = row.getInstant("keepalive_interval");
3✔
875
                        keepaliveHost = row.getString("keepalive_host");
3✔
876
                        deathReason = row.getString("death_reason");
3✔
877
                        deathTimestamp = row.getInstant("death_timestamp");
3✔
878
                        originalRequest = row.getBytes("original_request");
3✔
879
                        allocationTimestamp = row.getInstant("allocation_timestamp");
3✔
880
                        allocationSize = row.getInt("allocation_size");
3✔
881
                        machineName = row.getString("machine_name");
3✔
882
                        userName = row.getString("user_name");
3✔
883
                        groupId = row.getInt("group_id");
3✔
884
                        groupName = row.getString("group_name");
3✔
885
                        nmpiJobId = row.getInteger("nmpi_job_id");
3✔
886
                        nmpiSessionId = row.getInteger("nmpi_session_id");
3✔
887
                }
3✔
888

889
                Object[] args() {
890
                        return new Object[] {
3✔
891
                                jobId, machineId, owner, createTimestamp,
3✔
892
                                width, height, depth, allocatedRoot, keepaliveInterval,
3✔
893
                                keepaliveHost, deathReason, deathTimestamp, originalRequest,
894
                                allocationTimestamp, allocationSize, machineName, userName,
3✔
895
                                groupId, groupName, nmpiJobId, nmpiSessionId
3✔
896
                        };
897
                }
898
        }
899

900
        /**
901
         * Implementation of {@link #tombstone()}. This is done as two transactions
902
         * to help manage the amount of locking (especially multi-DB locking);
903
         * nothing else ought to be updating any of these jobs at the time this task
904
         * usually runs, but we'll still try to keep things minimally locked.
905
         *
906
         * @param conn
907
         *            The DB connection
908
         * @return Description of the tombstoned IDs
909
         */
910
        private Copied tombstone(Connection conn, Connection histConn) {
911
                // No tombstoning without the target DB!
912
                if (!isHistoricalDBAvailable()) {
3✔
913
                        return new Copied(List.of(), List.of());
×
914
                }
915

916
                try (var readJobs = conn.query(READ_HISTORICAL_JOBS);
3✔
917
                                var readAllocs = conn.query(READ_HISTORICAL_ALLOCS);
3✔
918
                                var deleteJobs = conn.update(DELETE_JOB_RECORD);
3✔
919
                                var deleteAllocs = conn.update(DELETE_ALLOC_RECORD);
3✔
920
                                var deleteNMPIJob = conn.update(DELETE_NMPI_JOB);
3✔
921
                                var deleteNMPISession = conn.update(DELETE_NMPI_SESSION);
3✔
922
                                var writeJobs = histConn.update(WRITE_HISTORICAL_JOBS);
3✔
923
                                var writeAllocs = histConn.update(WRITE_HISTORICAL_ALLOCS)) {
3✔
924
                        var grace = historyProps.getGracePeriod();
3✔
925
                        var copied = conn.transaction(
3✔
926
                                        () -> new Copied(readJobs.call(HistoricalJob::new, grace),
3✔
927
                                                        readAllocs.call(HistoricalAlloc::new, grace)));
3✔
928
                        histConn.transaction(() -> {
3✔
929
                                copied.allocs().forEach((a) -> writeAllocs.call(a.args()));
3✔
930
                                copied.jobs().forEach((j) -> writeJobs.call(j.args()));
3✔
931
                        });
3✔
932
                        conn.transaction(() -> {
3✔
933
                                copied.nmpiJobs().forEach(deleteNMPIJob::call);
3✔
934
                                copied.nmpiSessions().forEach(deleteNMPISession::call);
3✔
935
                                copied.allocs().forEach((a) -> deleteAllocs.call(a.allocId));
3✔
936
                                copied.jobs().forEach((j) -> deleteJobs.call(j.jobId));
3✔
937
                        });
3✔
938
                        return copied;
3✔
939
                }
940
        }
941

942
        @Override
943
        public void destroyJob(int id, String reason) {
944
                var allocations = new Allocations(
3✔
945
                                id, execute(conn -> destroyJob(conn, id, reason)));
3✔
946
                allocations.updateEpochs();
3✔
947
                allocations.updateBMPs();
3✔
948
        }
3✔
949

950
        /**
951
         * Destroy a job.
952
         *
953
         * @param conn
954
         *            How to talk to the DB
955
         * @param id
956
         *            The ID of the job
957
         * @param reason
958
         *            Why is the job being destroyed.
959
         * @return Whether the job was destroyed.
960
         */
961
        private Collection<BMPAndMachine> destroyJob(Connection conn, int id,
962
                        String reason) {
963
                JobLifecycle.log.info("destroying job {} \"{}\"", id, reason);
3✔
964
                try (var sql = new DestroySQL(conn)) {
3✔
965
                        if (sql.getJob.call1(enumerate("job_state", JobState.class), id)
3✔
966
                                        .orElse(DESTROYED) == DESTROYED) {
3✔
967
                                log.info("job {} already destroyed", id);
3✔
968
                                /*
969
                                 * Don't do anything if the job doesn't exist or is already
970
                                 * destroyed
971
                                 */
972
                                return List.of();
3✔
973
                        }
974
                        /*
975
                         * Record the reason as a separate step here; state change can take
976
                         * some time.  It also doesn't matter if we do that twice.
977
                         */
978
                        sql.recordDestroyReason.call(reason, id);
3✔
979
                        // Inserts into pending_changes; these run after job is dead
980
                        var bmps = setPower(sql, id, OFF, DESTROYED);
3✔
981
                        sql.killAlloc.call(id);
3✔
982
                        sql.markAsDestroyed.call(reason, id);
3✔
983
                        log.info("job {} marked as destroyed", id);
3✔
984
                        return bmps;
3✔
985
                } finally {
3✔
986
                        quotaManager.finishJob(id);
3✔
987
                        rememberer.killProxies(id);
3✔
988
                }
989
        }
990

991
        /**
992
         * Computes the estimate of what sort of allocation will be required.
993
         * Converts a number of boards into a close-to-square size to search for.
994
         * <p>
995
         * With the big machine's level of resources, that's good enough. For now.
996
         *
997
         * @author Donal Fellows
998
         */
999
        private static final class DimensionEstimate {
1000
                /** The estimated width, in triads. */
1001
                final int width;
1002

1003
                /** The estimated height, in triads. */
1004
                final int height;
1005

1006
                /**
1007
                 * The number of boards in the rectangle of triads that we can tolerate
1008
                 * being down due to overallocation (due to the use of rectangles and
1009
                 * triads).
1010
                 */
1011
                final int tolerance;
1012

1013
                /**
1014
                 * Create an estimate of what to allocate. The old spalloc would take
1015
                 * hints at this point on the aspect ratio, but we don't bother; we
1016
                 * strongly prefer allocations "nearly square", going for making them
1017
                 * slightly taller than wide if necessary.
1018
                 *
1019
                 * @param numBoards
1020
                 *            The number of boards wanted.
1021
                 * @param max
1022
                 *            The size of the machine.
1023
                 */
1024
                DimensionEstimate(int numBoards, Rectangle max) {
3✔
1025
                        if (numBoards < 1) {
3✔
1026
                                throw new IllegalArgumentException(
×
1027
                                                "number of boards must be greater than zero");
1028
                        }
1029
                        int numTriads = ceildiv(numBoards, TRIAD_DEPTH);
3✔
1030
                        width = min((int) ceil(sqrt(numTriads)), max.width);
3✔
1031
                        height = min(ceildiv(numTriads, width), max.height);
3✔
1032
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
3✔
1033
                        if (width < 1 || height < 1) {
3✔
1034
                                throw new IllegalArgumentException(
×
1035
                                                "computed dimensions must be greater than zero");
1036
                        }
1037
                        if (tolerance < 0) {
3✔
1038
                                throw new IllegalArgumentException(
×
1039
                                                "that job cannot possibly fit on this machine");
1040
                        }
1041
                }
3✔
1042

1043
                /**
1044
                 * Create an estimate of what to allocate. This does not need to be
1045
                 * "near square".
1046
                 *
1047
                 * @param w
1048
                 *            The width of the allocation requested, in triads.
1049
                 * @param h
1050
                 *            The height of the allocation requested, in triads.
1051
                 * @param max
1052
                 *            The size of the machine.
1053
                 */
1054
                DimensionEstimate(int w, int h, Rectangle max) {
3✔
1055
                        if (w < 1 || h < 1) {
3✔
1056
                                throw new IllegalArgumentException(
×
1057
                                                "dimensions must be greater than zero");
1058
                        }
1059
                        int numBoards = w * h * TRIAD_DEPTH;
3✔
1060
                        width = max(1, min(w, max.width));
3✔
1061
                        height = max(1, min(h, max.height));
3✔
1062
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
3✔
1063
                        if (tolerance < 0) {
3✔
1064
                                throw new IllegalArgumentException(
3✔
1065
                                                "that job cannot possibly fit on this machine");
1066
                        }
1067
                }
3✔
1068

1069
                /** @return The estimated dimensions as a rectangle. */
1070
                Rectangle getRect() {
1071
                        return new Rectangle(width, height, TRIAD_DEPTH);
3✔
1072
                }
1073
        }
1074

1075
        private Collection<BMPAndMachine> allocateOneBoard(AllocSQL sql, int jobId,
1076
                        int machineId) {
1077
                // This is simplified; no subsidiary searching needed
1078
                return sql.findFreeBoard
3✔
1079
                                .call1(row -> setAllocation(sql, jobId,
3✔
1080
                                                ONE_BOARD, machineId, coords(row)), machineId)
3✔
1081
                                .orElse(List.of());
3✔
1082
        }
1083

1084
        private static TriadCoords coords(Row row) {
1085
                int x = row.getInt("x");
3✔
1086
                int y = row.getInt("y");
3✔
1087
                int z = row.getInt("z");
3✔
1088
                return new TriadCoords(x, y, z);
3✔
1089
        }
1090

1091
        private Collection<BMPAndMachine> allocateDimensions(AllocSQL sql,
1092
                        int jobId, int machineId, DimensionEstimate estimate,
1093
                        int userMaxDead) {
1094
                int tolerance = userMaxDead + estimate.tolerance;
3✔
1095
                int minArea =
3✔
1096
                                estimate.width * estimate.height * TRIAD_DEPTH - tolerance;
1097
                for (var root : sql.getRectangles
3✔
1098
                                .call(AllocatorTask::coords, estimate.width, estimate.height,
3✔
1099
                                                machineId, tolerance)) {
3✔
1100
                        if (minArea > 1) {
3✔
1101
                                /*
1102
                                 * Check that a minimum number of boards are reachable from the
1103
                                 * proposed root board. If the root board is isolated, we don't
1104
                                 * care if the rest of the allocation works because the rest of
1105
                                 * the toolchain won't cope.
1106
                                 */
1107
                                int size = connectedSize(sql, machineId, root, estimate);
3✔
1108
                                if (size < minArea) {
3✔
1109
                                        continue;
×
1110
                                }
1111
                        }
1112
                        return setAllocation(sql, jobId, estimate.getRect(), machineId,
3✔
1113
                                        root);
1114
                }
1115
                log.debug("Could not allocate min area {}", minArea);
×
1116
                return List.of();
×
1117
        }
1118

1119
        /**
1120
         * Find the number of boards that are reachable from the proposed root
1121
         * board.
1122
         *
1123
         * @param sql
1124
         *            How to talk to the DB
1125
         * @param machineId
1126
         *            The machine on which the allocation is happening
1127
         * @param root
1128
         *            Root logical coordinates
1129
         * @param width
1130
         *            The width of the planned allocation, in triads
1131
         * @param height
1132
         *            The width of the planned allocation, in triads
1133
         * @return How many boards in the allocation are reachable.
1134
         */
1135
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1136
                        int width, int height) {
1137
                return sql.countConnectedBoards
3✔
1138
                                .call1(integer("connected_size"), machineId, root.x, root.y,
3✔
1139
                                                width, height).orElse(-1);
3✔
1140
        }
1141

1142
        /**
1143
         * Find the number of boards that are reachable from the proposed root
1144
         * board.
1145
         *
1146
         * @param sql
1147
         *            How to talk to the DB
1148
         * @param machineId
1149
         *            The machine on which the allocation is happening
1150
         * @param root
1151
         *            Root logical coordinates
1152
         * @param rect
1153
         *            The requested allocation dimensions
1154
         * @return How many boards in the allocation are reachable.
1155
         */
1156
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1157
                        Rectangle rect) {
1158
                return connectedSize(sql, machineId, root, rect.width, rect.height);
×
1159
        }
1160

1161
        /**
1162
         * Find the number of boards that are reachable from the proposed root
1163
         * board.
1164
         *
1165
         * @param sql
1166
         *            How to talk to the DB
1167
         * @param machineId
1168
         *            The machine on which the allocation is happening
1169
         * @param root
1170
         *            Root logical coordinates
1171
         * @param estimate
1172
         *            The planned allocation dimensions
1173
         * @return How many boards in the allocation are reachable.
1174
         */
1175
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1176
                        DimensionEstimate estimate) {
1177
                return connectedSize(sql, machineId, root, estimate.width,
3✔
1178
                                estimate.height);
1179
        }
1180

1181
        private Collection<BMPAndMachine> allocateBoard(AllocSQL sql, int jobId,
1182
                        int machineId, int boardId) {
1183
                return sql.findSpecificBoard
3✔
1184
                                .call1(row -> setAllocation(sql, jobId, ONE_BOARD, machineId,
3✔
1185
                                                coords(row)), machineId, boardId)
3✔
1186
                                .orElse(List.of());
3✔
1187
        }
1188

1189
        private Collection<BMPAndMachine> allocateTriadsAt(AllocSQL sql, int jobId,
1190
                        int machineId, int rootId, int width, int height,
1191
                        int maxDeadBoards) {
1192
                var rect = new Rectangle(width, height, TRIAD_DEPTH);
×
1193
                return sql.getRectangleAt
×
1194
                                .call1(AllocatorTask::coords, rootId, width, height, machineId,
×
1195
                                                maxDeadBoards)
×
1196
                                .filter(root -> connectedSize(sql, machineId, root,
×
1197
                                                rect) >= rect.getArea() - maxDeadBoards)
×
1198
                                .map(root -> setAllocation(sql, jobId, rect, machineId, root))
×
1199
                                .orElse(List.of());
×
1200
        }
1201

1202
        /**
1203
         * Does the actual allocation.
1204
         * <p>
1205
         * At this point, we've checked that there's enough boards <em>and</em> we
1206
         * are running in a transaction. We take particular care that we only
1207
         * actually allocate boards that are reachable from the root board; this is
1208
         * assumed by the SpiNNaker tools, so we'd better conform to that
1209
         * expectation. Fortunately, the bigger the allocation, the more likely that
1210
         * is true (and it is trivially true for single-board allocations.)
1211
         * <p>
1212
         * If you want a multi-board allocation, you'd better be allocating a full
1213
         * triad's-worth of depth or you'll get nothing.
1214
         *
1215
         * @param sql
1216
         *            How to talk to the DB
1217
         * @param jobId
1218
         *            What job are we allocating for
1219
         * @param rect
1220
         *            Proposed rectangle size
1221
         * @param machineId
1222
         *            What machine are we allocating on
1223
         * @param root
1224
         *            Proposed root coordinates
1225
         * @return The BMPs that have been used to make the allocation.
1226
         */
1227
        private Collection<BMPAndMachine> setAllocation(AllocSQL sql, int jobId,
1228
                        Rectangle rect,        int machineId, TriadCoords root) {
1229
                log.debug("performing allocation for {}: {}x{}x{} at {}:{}:{}", jobId,
3✔
1230
                                rect.width, rect.height, rect.depth, root.x, root.y, root.z);
3✔
1231
                var boardsToAllocate = sql.getConnectedBoardIDs
3✔
1232
                                .call(integer("board_id"), machineId, root.x, root.y, root.z,
3✔
1233
                                                rect.width, rect.height, rect.depth);
3✔
1234
                if (boardsToAllocate.isEmpty()) {
3✔
1235
                        log.debug("No boards to allocate");
×
1236
                        return List.of();
×
1237
                }
1238
                for (var boardId : boardsToAllocate) {
3✔
1239
                        sql.allocBoard.call(jobId, boardId);
3✔
1240
                }
3✔
1241

1242
                var board = boardsToAllocate.get(0);
3✔
1243
                sql.allocJob.call(rect.width, rect.height, rect.depth,
3✔
1244
                                board, boardsToAllocate.size(), board, jobId);
3✔
1245
                log.info("allocated {} boards to {}; issuing power up commands",
3✔
1246
                                boardsToAllocate.size(), jobId);
3✔
1247
                // Any proxies that existed are now defunct; user must make anew
1248
                rememberer.killProxies(jobId);
3✔
1249
                return setPower(sql, jobId, ON, READY);
3✔
1250
        }
1251

1252
        @Override
1253
        public boolean setPower(int jobId, PowerState power, JobState targetState) {
1254
                if (targetState == DESTROYED) {
3✔
1255
                        throw new IllegalArgumentException(
×
1256
                                        "job destruction must be done via destroyJob() method");
1257
                }
1258
                var allocations = new Allocations(jobId, execute(conn -> {
3✔
1259
                        try (var sql = new PowerSQL(conn)) {
3✔
1260
                                return setPower(sql, jobId, power, targetState);
3✔
1261
                        }
1262
                }));
1263
                allocations.updateEpochs();
3✔
1264
                allocations.updateBMPs();
3✔
1265
                return allocations.notEmpty();
3✔
1266
        }
1267

1268
        /**
1269
         * Issue a request to change the power for the boards of a job.
1270
         *
1271
         * @param sql
1272
         *            How to talk to the DB
1273
         * @param jobId
1274
         *            The job in question
1275
         * @param power
1276
         *            The power state to switch to
1277
         * @param targetState
1278
         *            The state to put the job in afterwards
1279
         * @return The ids of the BMPs that have been changed
1280
         */
1281
        private Collection<BMPAndMachine> setPower(PowerSQL sql, int jobId,
1282
                        PowerState power, JobState targetState) {
1283
                var sourceState = sql.getJobState.call1(
3✔
1284
                                enumerate("job_state", JobState.class), jobId).orElseThrow();
3✔
1285
                var boards = sql.getJobBoards.call(BoardAndBMP::new, jobId);
3✔
1286
                if (boards.isEmpty()) {
3✔
1287
                        log.debug("No boards for job {}", jobId);
3✔
1288
                        if (targetState == DESTROYED) {
3✔
1289
                                log.debug("no boards for {} in destroy", jobId);
3✔
1290
                        }
1291
                        // Make sure we still update the job!
1292
                        updateJob(jobId, sourceState, targetState);
3✔
1293
                        return List.of();
3✔
1294
                }
1295
                log.debug("{} boards for job {}", boards.size(), jobId);
3✔
1296

1297
                // Number of changes pending, one per board
1298
                int numPending = 0;
3✔
1299

1300
                var bmps = new HashSet<BMPAndMachine>();
3✔
1301
                if (power == ON) {
3✔
1302
                        /*
1303
                         * This is a bit of a trickier case, as we need to say which links
1304
                         * are to be switched on or, more particularly, which are to be
1305
                         * switched off because they are links to boards that are not
1306
                         * allocated to the job. Off-board links are shut off by default.
1307
                         */
1308
                        var perimeterLinks = Row.stream(
3✔
1309
                                        sql.getPerimeter.call(Perimeter::new, jobId))
3✔
1310
                                        .toCollectingMap(Direction.class, (p) -> p.boardId,
3✔
1311
                                                        (p) -> p.direction);
3✔
1312

1313
                        for (var board : boards) {
3✔
1314
                                var toChange = perimeterLinks.getOrDefault(board.boardId,
3✔
1315
                                                NO_PERIMETER);
1316
                                numPending += sql.issuePowerChange.call(jobId,
3✔
1317
                                                board.boardId,
3✔
1318
                                                sourceState, targetState, true,
3✔
1319
                                                !toChange.contains(Direction.N),
3✔
1320
                                                !toChange.contains(Direction.E),
3✔
1321
                                                !toChange.contains(Direction.SE),
3✔
1322
                                                !toChange.contains(Direction.S),
3✔
1323
                                                !toChange.contains(Direction.W),
3✔
1324
                                                !toChange.contains(Direction.NW));
3✔
1325
                                bmps.add(board.bmp);
3✔
1326
                        }
3✔
1327
                } else {
3✔
1328
                        // Powering off; all links switch to off so no perimeter check
1329
                        for (var board : boards) {
3✔
1330
                                numPending += sql.issuePowerChange.call(jobId,
3✔
1331
                                                board.boardId,        sourceState, targetState,
3✔
1332
                                                false, false, false, false,        false, false, false);
3✔
1333
                                bmps.add(board.bmp);
3✔
1334
                        }
3✔
1335
                }
1336

1337
                if (targetState == DESTROYED) {
3✔
1338
                        log.debug("num changes for {} in destroy: {}", jobId, numPending);
3✔
1339
                        log.info("destroying job {} after power change", jobId);
3✔
1340
                        int rows = sql.setStateDestroyed.call(jobId);
3✔
1341
                        if (rows != 1) {
3✔
1342
                                log.warn("unexpected number of jobs marked destroyed: {}",
×
1343
                                                rows);
×
1344
                        }
1345
                } else {
3✔
1346
                        log.debug("Num changes for target {}: {}", targetState, numPending);
3✔
1347
                        sql.setStatePending.call(
3✔
1348
                                numPending > 0 ? POWER : targetState, jobId);
3✔
1349
                }
1350

1351
                return bmps;
3✔
1352
        }
1353

1354
        /**
1355
         * Operations for testing only.
1356
         *
1357
         * @hidden
1358
         */
1359
        @ForTestingOnly
1360
        interface TestAPI {
1361
                /**
1362
                 * Allocate all current requests for resources.
1363
                 *
1364
                 * @return The BMPs updated by the allocation.
1365
                 */
1366
                Allocations allocate();
1367

1368
                /**
1369
                 * Destroy a job.
1370
                 *
1371
                 * @param id
1372
                 *            The ID of the job
1373
                 * @param reason
1374
                 *            Why is the job being destroyed.
1375
                 * @return The BMPs updated by the destruction.
1376
                 */
1377
                Allocations destroyJob(int id, String reason);
1378

1379
                /**
1380
                 * Destroy jobs that have missed their keepalive.
1381
                 *
1382
                 * @return The BMPs updated by the expiry.
1383
                 */
1384
                Allocations expireJobs();
1385
        }
1386

1387
        /** Operations for testing historical database only. */
1388
        @ForTestingOnly
1389
        interface HistTestAPI {
1390

1391

1392
                /**
1393
                 * Implementation of {@link AllocatorTask#tombstone()}.
1394
                 *
1395
                 * @return Description of the tombstoned IDs
1396
                 */
1397
                Copied tombstone();
1398
        }
1399

1400
        /**
1401
         * @param conn
1402
         *            The DB connection
1403
         * @return The test interface.
1404
         * @deprecated This interface is just for testing.
1405
         * @hidden
1406
         */
1407
        @ForTestingOnly
1408
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1409
                        allowedOnPath = ".*/src/test/java/.*")
1410
        @Deprecated
1411
        TestAPI getTestAPI(Connection conn) {
1412
                ForTestingOnly.Utils.checkForTestClassOnStack();
3✔
1413
                return new TestAPI() {
3✔
1414
                        @Override
1415
                        public Allocations allocate() {
1416
                                return AllocatorTask.this.allocate(conn);
3✔
1417
                        }
1418

1419
                        @Override
1420
                        public Allocations destroyJob(int id, String reason) {
1421
                                return new Allocations(id,
3✔
1422
                                                AllocatorTask.this.destroyJob(conn, id, reason));
3✔
1423
                        }
1424

1425
                        @Override
1426
                        public Allocations expireJobs() {
1427
                                return AllocatorTask.this.expireJobs(conn);
3✔
1428
                        }
1429
                };
1430
        }
1431

1432
        /**
1433
         * @param conn
1434
         *            The DB connection
1435
         * @param histConn
1436
         *            The historical DB connection
1437
         * @return The test interface.
1438
         * @deprecated This interface is just for testing.
1439
         */
1440
        @ForTestingOnly
1441
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1442
                        allowedOnPath = ".*/src/test/java/.*")
1443
        @Deprecated
1444
        HistTestAPI getHistTestAPI(Connection conn, Connection histConn) {
1445
                ForTestingOnly.Utils.checkForTestClassOnStack();
3✔
1446
                return new HistTestAPI() {
3✔
1447
                        @Override
1448
                        public Copied tombstone() {
1449
                                return AllocatorTask.this.tombstone(conn, histConn);
3✔
1450
                        }
1451
                };
1452
        }
1453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc