• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 15136401884

20 May 2025 11:32AM UTC coverage: 37.528% (-0.8%) from 38.278%
15136401884

Pull #1227

github

rowleya
Make sure all errors are caught
Pull Request #1227: Emergency stop

114 of 152 new or added lines in 10 files covered. (75.0%)

243 existing lines in 6 files now uncovered.

9062 of 24147 relevant lines covered (37.53%)

1.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.13
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/AllocatorTask.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.allocator;
17

18
import static java.lang.Math.ceil;
19
import static java.lang.Math.max;
20
import static java.lang.Math.min;
21
import static java.lang.Math.sqrt;
22
import static java.lang.String.format;
23
import static java.util.Objects.nonNull;
24
import static org.slf4j.LoggerFactory.getLogger;
25
import static uk.ac.manchester.spinnaker.alloc.Constants.TRIAD_DEPTH;
26
import static uk.ac.manchester.spinnaker.alloc.db.Row.enumerate;
27
import static uk.ac.manchester.spinnaker.alloc.db.Row.integer;
28
import static uk.ac.manchester.spinnaker.alloc.db.Utils.isBusy;
29
import static uk.ac.manchester.spinnaker.alloc.model.JobState.DESTROYED;
30
import static uk.ac.manchester.spinnaker.alloc.model.JobState.POWER;
31
import static uk.ac.manchester.spinnaker.alloc.model.JobState.QUEUED;
32
import static uk.ac.manchester.spinnaker.alloc.model.JobState.READY;
33
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.OFF;
34
import static uk.ac.manchester.spinnaker.alloc.model.PowerState.ON;
35
import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv;
36

37
import java.time.Instant;
38
import java.util.ArrayList;
39
import java.util.Collection;
40
import java.util.EnumSet;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Objects;
44
import java.util.Set;
45
import java.util.concurrent.ScheduledFuture;
46
import java.util.stream.Stream;
47

48
import javax.annotation.PostConstruct;
49

50
import org.slf4j.Logger;
51
import org.springframework.beans.factory.annotation.Autowired;
52
import org.springframework.dao.DataAccessException;
53
import org.springframework.scheduling.TaskScheduler;
54
import org.springframework.scheduling.support.CronTrigger;
55
import org.springframework.stereotype.Service;
56

57
import com.google.errorprone.annotations.RestrictedApi;
58
import com.google.errorprone.annotations.concurrent.GuardedBy;
59

60
import uk.ac.manchester.spinnaker.alloc.ForTestingOnly;
61
import uk.ac.manchester.spinnaker.alloc.ServiceMasterControl;
62
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.AllocatorProperties;
63
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.HistoricalDataProperties;
64
import uk.ac.manchester.spinnaker.alloc.SpallocProperties.KeepaliveProperties;
65
import uk.ac.manchester.spinnaker.alloc.bmp.BMPController;
66
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Connection;
67
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Query;
68
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAPI.Update;
69
import uk.ac.manchester.spinnaker.alloc.db.DatabaseAwareBean;
70
import uk.ac.manchester.spinnaker.alloc.db.Row;
71
import uk.ac.manchester.spinnaker.alloc.db.SQLQueries;
72
import uk.ac.manchester.spinnaker.alloc.model.BMPAndMachine;
73
import uk.ac.manchester.spinnaker.alloc.model.BoardAndBMP;
74
import uk.ac.manchester.spinnaker.alloc.model.Direction;
75
import uk.ac.manchester.spinnaker.alloc.model.JobState;
76
import uk.ac.manchester.spinnaker.alloc.model.PowerState;
77
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
78
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
79

80
/**
81
 * The allocation engine. Allocations are performed by running suitable
82
 * (non-trivial) SQL queries on a periodic basis, putting jobs that cannot be
83
 * allocated back on the queue for a later attempt (and increasing their
84
 * priority when it does so). This class is also responsible for destroying jobs
85
 * that are not kept alive sufficiently frequently, and eventually migrating
86
 * records of dead jobs to long-term storage ("tombstoning").
87
 */
88
@Service
89
public class AllocatorTask extends DatabaseAwareBean
3✔
90
                implements PowerController {
91
        /**
92
         * @see #setPower(Connection,int,PowerState)
93
         */
94
        private static final EnumSet<Direction> NO_PERIMETER =
3✔
95
                        EnumSet.noneOf(Direction.class);
3✔
96

97
        private static final Logger log = getLogger(AllocatorTask.class);
3✔
98

99
        private static final Rectangle ONE_BOARD = new Rectangle(1, 1, 1);
3✔
100

101
        /**
102
         * Maximum number of jobs that we actually run the quota check for. Used
103
         * because we are reusing a query, and we'll probably never have that many
104
         * live jobs even on the big machine.
105
         */
106
        private static final Integer NUMBER_OF_JOBS_TO_QUOTA_CHECK = 100000;
3✔
107

108
        private static final String DESTROY_ON_POWER_ERROR =
109
                        "Error changing power state!  Please contact an administrator.";
110

111
        @Autowired
112
        private Epochs epochs;
113

114
        @Autowired
115
        private ServiceMasterControl serviceControl;
116

117
        @Autowired
118
        private QuotaManager quotaManager;
119

120
        @Autowired
121
        private AllocatorProperties allocProps;
122

123
        @Autowired
124
        private KeepaliveProperties keepAliveProps;
125

126
        @Autowired
127
        private HistoricalDataProperties historyProps;
128

129
        @Autowired
130
        private JobObjectRememberer rememberer;
131

132
        @Autowired
133
        private TaskScheduler scheduler;
134

135
        @GuardedBy("itself")
3✔
136
        private final List<ScheduledFuture<?>> futures = new ArrayList<>();
137

138
        @GuardedBy("futures")
3✔
139
        private ScheduledFuture<?> allocateFuture = null;
140

141
        @GuardedBy("futures")
3✔
142
        private boolean emergencyStop = false;
143

144
        // Note: cannot be autowired because the dependency is circular;
145
        // instead set by setter in postconstruct of BMPController
146
        private BMPController bmpController;
147

148
        public void setBMPController(BMPController bmpController) {
149
                this.bmpController = bmpController;
3✔
150
        }
3✔
151

152
        @PostConstruct
153
        @SuppressWarnings("FutureReturnValueIgnored")
154
        private void init() {
155
                synchronized (futures) {
3✔
156
                        futures.add(scheduler.scheduleAtFixedRate(() -> allocate(),
3✔
157
                                        allocProps.getPeriod()));
3✔
158
                        futures.add(scheduler.scheduleAtFixedRate(() -> expireJobs(),
3✔
159
                                        keepAliveProps.getExpiryPeriod()));
3✔
160
                        futures.add(scheduler.schedule(() -> tombstone(),
3✔
161
                                        new CronTrigger(historyProps.getSchedule())));
3✔
162
                }
3✔
163
        }
3✔
164

165
        /**
166
         * Perform update on a job now as a result of a change.
167
         *
168
         * @param jobId
169
         *            The job to update.
170
         * @param sourceState
171
         *            The change source state.
172
         * @param targetState
173
         *            The change target state.
174
         */
175
        public void updateJob(int jobId, JobState sourceState,
176
                        JobState targetState) {
177
                synchronized (futures) {
3✔
178
                        // No need to update the job in emergency stop mode, as we are
179
                        // going to cancel them all!  This is non-critical though; anything
180
                        // that gets through here should be OK.
181
                        if (emergencyStop) {
3✔
NEW
182
                                log.warn("emergency stop; not updating job {}", jobId);
×
NEW
183
                                return;
×
184
                        }
185
                        scheduler.schedule(
3✔
186
                                        () -> updateJobNow(jobId, sourceState, targetState),
3✔
187
                                        Instant.now());
3✔
188
                }
3✔
189
        }
3✔
190

191
        private void updateJobNow(int jobId, JobState sourceState,
192
                        JobState targetState) {
193
                try {
194
                        var updated = execute(
3✔
195
                                        conn -> update(jobId, sourceState, targetState, conn));
3✔
196
                        if (updated) {
3✔
197
                                log.debug("advancing job {} epoch", jobId);
3✔
198
                                epochs.jobChanged(jobId);
3✔
199
                        }
200
                } catch (DataAccessException e) {
×
201
                        if (isBusy(e)) {
×
202
                                log.info("database is busy; "
×
203
                                                + "will try allocation processing later");
204
                                return;
×
205
                        }
206
                        throw e;
×
207
                }
3✔
208
        }
3✔
209

210
        private boolean update(int jobId, JobState sourceState,
211
                        JobState targetState, Connection c) {
212
                log.debug("Updating job {} from {} to {}", jobId, sourceState,
3✔
213
                                targetState);
214
                try (var getChangeStatus = c.query(COUNT_CHANGES_FOR_JOB);
3✔
215
                                var setJobState = c.update(SET_STATE_PENDING);
3✔
216
                                var setJobDestroyed = c.update(SET_STATE_DESTROYED);
3✔
217
                                var deleteChanges = c.update(DELETE_PENDING);
3✔
218
                                var deleteTask = c.update(DELETE_TASK)) {
3✔
219
                        // Count pending changes for this state change
220
                        var status = getChangeStatus.call1(ChangeStatus::new,
3✔
221
                                        jobId, sourceState, targetState).orElseThrow(
3✔
222
                                                        () -> new RuntimeException(
×
223
                                                                        "Error counting job tasks"));
224

225
                        log.debug("Job {} has {} changes remaining", jobId,
3✔
226
                                        status.nChanges);
3✔
227

228
                        // If the remaining things are errors, react (if there are errors,
229
                        // eventually non-errors will be deleted)
230
                        if (status.nErrors > 0 && (status.nErrors == status.nChanges)) {
3✔
231
                                log.info("Job {} changes resulted in errors.", jobId);
3✔
232

233
                                // We can delete the changes now as we know the issues
234
                                deleteChanges.call(jobId, sourceState, targetState);
3✔
235

236
                                // If we are going to destroyed, we can mostly ignore errors,
237
                                // and similarly if we are going to queue it again anyway
238
                                if (targetState == DESTROYED || targetState == QUEUED) {
3✔
239
                                        return true;
×
240
                                }
241

242
                                // If the job was ready before we tried to do this, we have to
243
                                // destroy the job with an error!
244
                                if (sourceState == READY) {
3✔
245
                                        destroyJob(c, jobId, DESTROY_ON_POWER_ERROR);
×
246
                                        return true;
×
247
                                }
248

249
                                // If the job was not ready, we need to re-queue and reallocate
250
                                // boards, but not if in emergency stop mode
251
                                synchronized (futures) {
3✔
252
                                        if (emergencyStop) {
3✔
NEW
253
                                                log.warn("emergency stop; not requeuing job {}", jobId);
×
NEW
254
                                                return false;
×
255
                                        }
256
                                        // The power request will get through here, but will be
257
                                        // stopped later.  This is a request for off anyway, so non
258
                                        // critical.
259
                                        scheduler.schedule(() -> setPower(jobId, OFF, QUEUED),
3✔
260
                                                        Instant.now());
3✔
261
                                }
3✔
262
                                return false;
3✔
263
                        } else if (status.nChanges > 0) {
3✔
264
                                // There are still changes happening - let them finish first
265
                                // even if there are errors as safer to do once everything
266
                                // is done.
267
                                return false;
×
268
                        }
269

270
                        // If there are no more pending changes and no errors,
271
                        // set the job state to the target state
272
                        log.debug("Job {} moving to state {}", jobId, targetState);
3✔
273

274
                        // If destroyed, we don't need to set the state as it will be gone
275
                        if (targetState == DESTROYED) {
3✔
276
                                int rows = setJobDestroyed.call(jobId);
3✔
277
                                if (rows != 1) {
3✔
278
                                        log.warn("unexpected number of rows affected by "
3✔
279
                                                        + "destroy in state update: {}", rows);
3✔
280
                                }
281
                                return rows > 0;
3✔
282

283
                        // If ready, we can now delete the job request too
284
                        } else if (targetState == READY) {
3✔
285
                                int rows = deleteTask.call(jobId);
3✔
286
                                if (rows != 1) {
3✔
287
                                        log.warn("unexpected number of rows affected by ready in "
3✔
288
                                                        + "state update: {}", rows);
3✔
289
                                }
290
                        }
291

292
                        return setJobState.call(targetState, jobId) > 0;
3✔
293
                }
3✔
294
        }
295

296
        public void emergencyStop() {
297
                synchronized (futures) {
3✔
298
                        emergencyStop = true;
3✔
299
                        for (var future : futures) {
3✔
300
                                future.cancel(true);
3✔
301
                        }
3✔
302
                        if (allocateFuture != null) {
3✔
NEW
303
                                allocateFuture.cancel(true);
×
304
                        }
305
                }
3✔
306
                bmpController.emergencyStop();
3✔
307
                destroyAllJobs();
3✔
308
        }
3✔
309

310
        /**
311
         * Helper class representing a rectangle of triads.
312
         *
313
         * @author Donal Fellows
314
         */
315
        private static final class Rectangle {
316
                final int width;
317

318
                final int height;
319

320
                /** Depth of rectangle. 1 or 3 */
321
                final int depth;
322

323
                private Rectangle(int width, int height, int depth) {
3✔
324
                        this.width = width;
3✔
325
                        this.height = height;
3✔
326
                        this.depth = depth;
3✔
327
                }
3✔
328

329
                private Rectangle(Row row) {
330
                        this(row.getInt("max_width"), row.getInt("max_height"),
3✔
331
                                        TRIAD_DEPTH);
332
                }
3✔
333

334
                @Override
335
                public String toString() {
336
                        return format("%dx%dx%d", width, height, depth);
×
337
                }
338

339
                public int getArea() {
340
                        return width * height * depth;
×
341
                }
342
        }
343

344
        /**
345
         * Ask for allocation to happen now.
346
         */
347
        public void scheduleAllocateNow() {
348
                synchronized (futures) {
3✔
349
                        if (emergencyStop) {
3✔
NEW
350
                                log.warn("emergency stop; not scheduling allocation");
×
NEW
351
                                return;
×
352
                        }
353
                        allocateFuture = scheduler.schedule(this::allocate, Instant.now());
3✔
354
                }
3✔
355
        }
3✔
356

357
        /**
358
         * Allocate all current requests for resources.
359
         */
360
        public synchronized void allocate() {
361
                if (serviceControl.isPaused()) {
3✔
362
                        return;
3✔
363
                }
364

365
                try {
366
                        var allocated = execute(this::allocate);
×
367
                        allocated.updateEpochs();
×
368
                        allocated.updateBMPs();
×
369
                } catch (DataAccessException e) {
×
370
                        if (isBusy(e)) {
×
371
                                log.info("database is busy; "
×
372
                                                + "will try allocation processing later");
373
                                return;
×
374
                        }
375
                        throw e;
×
376
                }
×
377
        }
×
378

379
        private class Perimeter {
380
                int boardId;
381

382
                Direction direction;
383

384
                Perimeter(Row row) {
3✔
385
                        boardId = row.getInt("board_id");
3✔
386
                        direction = row.getEnum("direction", Direction.class);
3✔
387
                }
3✔
388
        }
389

390
        private class ChangeStatus {
391
                private final int nChanges;
392

393
                private final int nErrors;
394

395
                ChangeStatus(Row row) {
3✔
396
                        nChanges = row.getInt("n_changes");
3✔
397
                        nErrors = row.getInt("n_errors");
3✔
398
                }
3✔
399
        }
400

401
        /** Encapsulates the queries and updates used in power control. */
402
        private class PowerSQL extends AbstractSQL {
403
                /** Get basic information about a specific job. */
404
                private final Query getJobState;
405

406
                /** Get what boards are allocated to a job (that is queued or ready). */
407
                private final Query getJobBoards;
408

409
                /**
410
                 * Get the links on the perimeter of the allocation to a job. The
411
                 * perimeter is defined as being the links between a board that is part
412
                 * of the allocation and a board that is not; it's <em>not</em> a
413
                 * geometric definition, but rather a relational algebraic one.
414
                 */
415
                private final Query getPerimeter;
416

417
                /** Create a request to change the power status of a board. */
418
                private final Update issuePowerChange;
419

420
                /** Set the state and number of pending changes for a job. */
421
                private final Update setStatePending;
422

423
                /** Set the state to destroyed. */
424
                private final Update setStateDestroyed;
425

426
                PowerSQL(Connection conn) {
3✔
427
                        super(conn);
3✔
428
                        getJobState = conn.query(GET_JOB);
3✔
429
                        getJobBoards = conn.query(GET_JOB_BOARDS);
3✔
430
                        getPerimeter = conn.query(getPerimeterLinks);
3✔
431
                        issuePowerChange = conn.update(issueChangeForJob);
3✔
432
                        setStatePending = conn.update(SET_STATE_PENDING);
3✔
433
                        setStateDestroyed = conn.update(SET_STATE_DESTROYED);
3✔
434
                }
3✔
435

436
                @Override
437
                public void close() {
438
                        getJobState.close();
3✔
439
                        getJobBoards.close();
3✔
440
                        getPerimeter.close();
3✔
441
                        issuePowerChange.close();
3✔
442
                        setStatePending.close();
3✔
443
                        setStateDestroyed.close();
3✔
444
                }
3✔
445
        }
446

447
        /** Encapsulates the queries and updates used in allocation. */
448
        @UsedInJavadocOnly(SQLQueries.class)
449
        private final class AllocSQL extends PowerSQL {
450
                /** Increases the importance of a job. */
451
                private final Update bumpImportance;
452

453
                /** Get the list of allocation tasks for jobs in a given state. */
454
                private final Query getTasks;
455

456
                /** Find a single free board. */
457
                private final Query findFreeBoard;
458

459
                /**
460
                 * Find a rectangle of triads of boards that may be allocated.
461
                 *
462
                 * @see SQLQueries#FIND_FREE_BOARD
463
                 */
464
                private final Query getRectangles;
465

466
                /**
467
                 * Find a rectangle of triads of boards that may be allocated rooted at
468
                 * a particular board.
469
                 *
470
                 * @see SQLQueries#FIND_FREE_BOARD_AT
471
                 */
472
                private final Query getRectangleAt;
473

474
                /**
475
                 * Count the number of <em>connected</em> boards (i.e., have at least
476
                 * one path over enabled links to the root board of the allocation)
477
                 * within a rectangle of triads. The triads are taken as being full
478
                 * depth.
479
                 */
480
                private final Query countConnectedBoards;
481

482
                /**
483
                 * Find an allocatable board with a specific board ID. (This will have
484
                 * been previously converted from some other form of board coordinates.)
485
                 */
486
                private final Query findSpecificBoard;
487

488
                /**
489
                 * Get the set of boards at some coordinates within a triad rectangle
490
                 * that are connected (i.e., have at least one path over enableable
491
                 * links) to the root board.
492
                 */
493
                private final Query getConnectedBoardIDs;
494

495
                /** Tell a board that it is allocated. */
496
                private final Update allocBoard;
497

498
                /** Tell a job that it is allocated. Doesn't set the state. */
499
                private final Update allocJob;
500

501
                AllocSQL(Connection conn) {
3✔
502
                        super(conn);
3✔
503
                        bumpImportance = conn.update(BUMP_IMPORTANCE);
3✔
504
                        getTasks = conn.query(getAllocationTasks);
3✔
505
                        findFreeBoard = conn.query(FIND_FREE_BOARD);
3✔
506
                        getRectangles = conn.query(findRectangle);
3✔
507
                        getRectangleAt = conn.query(findRectangleAt);
3✔
508
                        countConnectedBoards = conn.query(countConnected);
3✔
509
                        findSpecificBoard = conn.query(FIND_LOCATION);
3✔
510
                        getConnectedBoardIDs = conn.query(getConnectedBoards);
3✔
511
                        allocBoard = conn.update(ALLOCATE_BOARDS_BOARD);
3✔
512
                        allocJob = conn.update(ALLOCATE_BOARDS_JOB);
3✔
513
                }
3✔
514

515
                @Override
516
                public void close() {
517
                        super.close();
3✔
518
                        bumpImportance.close();
3✔
519
                        getTasks.close();
3✔
520
                        findFreeBoard.close();
3✔
521
                        getRectangles.close();
3✔
522
                        getRectangleAt.close();
3✔
523
                        countConnectedBoards.close();
3✔
524
                        findSpecificBoard.close();
3✔
525
                        getConnectedBoardIDs.close();
3✔
526
                        allocBoard.close();
3✔
527
                        allocJob.close();
3✔
528
                }
3✔
529
        }
530

531
        /** Encapsulates the queries and updates used in deletion. */
532
        private final class DestroySQL extends PowerSQL {
533

534
                /** Get basic information about a specific job. */
535
                private final Query getJob = conn.query(GET_JOB);
3✔
536

537
                /** Mark a job as dead. */
538
                private final Update markAsDestroyed = conn.update(DESTROY_JOB);
3✔
539

540
                /** Note the reason why a job is dead. */
541
                private Update recordDestroyReason = conn.update(NOTE_DESTROY_REASON);
3✔
542

543
                /** Delete a request to allocate resources for a job. */
544
                private final Update killAlloc = conn.update(KILL_JOB_ALLOC_TASK);
3✔
545

546
                DestroySQL(Connection conn) {
3✔
547
                        super(conn);
3✔
548
                }
3✔
549

550
                @Override
551
                public void close() {
552
                        super.close();
3✔
553
                        getJob.close();
3✔
554
                        markAsDestroyed.close();
3✔
555
                        recordDestroyReason.close();
3✔
556
                        killAlloc.close();
3✔
557
                }
3✔
558
        }
559

560
        private class AllocTask {
561
                final int id;
562

563
                final int importance;
564

565
                final int jobId;
566

567
                final int machineId;
568

569
                final Rectangle max;
570

571
                final int maxDeadBoards;
572

573
                final Integer numBoards;
574

575
                final Integer width;
576

577
                final Integer height;
578

579
                final Integer root;
580

581
                AllocTask(Row row) {
3✔
582
                        id = row.getInt("req_id");
3✔
583
                        importance = row.getInt("importance");
3✔
584
                        jobId = row.getInt("job_id");
3✔
585
                        machineId = row.getInt("machine_id");
3✔
586
                        max = new Rectangle(row);
3✔
587
                        maxDeadBoards = row.getInt("max_dead_boards");
3✔
588
                        numBoards = row.getInteger("num_boards");
3✔
589
                        width = row.getInteger("width");
3✔
590
                        height = row.getInteger("height");
3✔
591
                        root = row.getInteger("board_id");
3✔
592
                }
3✔
593

594
                Collection<BMPAndMachine> allocate(AllocSQL sql) {
595
                        if (nonNull(numBoards) && numBoards > 0) {
3✔
596
                                // Single-board case gets its own allocator that's better at
597
                                // that
598
                                if (numBoards == 1) {
3✔
599
                                        log.debug("Allocate one board");
3✔
600
                                        return allocateOneBoard(sql, jobId, machineId);
3✔
601
                                }
602
                                var estimate = new DimensionEstimate(numBoards, max);
3✔
603
                                return allocateDimensions(sql, jobId, machineId, estimate,
3✔
604
                                                maxDeadBoards);
605
                        }
606

607
                        if (nonNull(width) && nonNull(height) && nonNull(root)) {
3✔
608
                                return allocateTriadsAt(sql, jobId, machineId, root, width,
×
609
                                                height,        maxDeadBoards);
×
610
                        }
611

612
                        if (nonNull(width) && nonNull(height) && width > 0 && height > 0) {
3✔
613
                                // Special case; user is really just asking for one board
614
                                if (height == 1 && width == 1 && nonNull(maxDeadBoards)
3✔
615
                                                && maxDeadBoards == 2) {
616
                                        return allocateOneBoard(sql, jobId, machineId);
3✔
617
                                }
618
                                var estimate = new DimensionEstimate(width, height, max);
3✔
619
                                log.debug(
3✔
620
                                                "resolved request for {}x{} boards to {}x{} triads "
621
                                                                + "with tolerance {}",
622
                                                width, height, estimate.width, estimate.height,
3✔
623
                                                estimate.tolerance);
3✔
624
                                return allocateDimensions(sql, jobId, machineId, estimate,
3✔
625
                                                maxDeadBoards);
626
                        }
627

628
                        if (nonNull(root)) {
3✔
629
                                // Ignores maxDeadBoards; is a single-board allocate
630
                                return allocateBoard(sql, jobId, machineId, root);
3✔
631
                        }
632

633
                        log.warn("job {} could not be allocated; "
×
634
                                        + "bad request will be cleared from queue", jobId);
×
635
                        return List.of();
×
636
                }
637
        }
638

639
        /**
640
         * A set of information about the allocations that have been made.
641
         */
642
        class Allocations {
643

644
                /** The BMPs that have been affected by the allocations. **/
645
                final Set<Integer> bmps = new HashSet<>();
3✔
646

647
                /** The Machines that have been affected by the allocations. **/
648
                final Set<Integer> machines = new HashSet<>();
3✔
649

650
                /** The jobs that have been affected by the allocations. **/
651
                final List<Integer> jobIds = new ArrayList<>();
3✔
652

653
                Allocations() {
3✔
654
                        // Does nothing
655
                }
3✔
656

657
                Allocations(int jobId, Collection<BMPAndMachine> bmps) {
3✔
658
                        addAll(jobId, bmps);
3✔
659
                }
3✔
660

661
                void addAll(int jobId, Collection<BMPAndMachine> bmps) {
662
                        if (bmps.size() > 0) {
3✔
663
                                jobIds.add(jobId);
3✔
664
                                for (var bm : bmps) {
3✔
665
                                        this.bmps.add(bm.bmpId);
3✔
666
                                        this.machines.add(bm.machineId);
3✔
667
                                }
3✔
668
                        }
669
                }
3✔
670

671
                void updateEpochs() {
672
                        log.debug("Updating jobs {}", jobIds);
3✔
673
                        for (var job : jobIds) {
3✔
674
                                epochs.jobChanged(job);
3✔
675
                        }
3✔
676
                        log.debug("Updating machines {}", machines);
3✔
677
                        for (var m : machines) {
3✔
678
                                epochs.machineChanged(m);
3✔
679
                        }
3✔
680
                }
3✔
681

682
                void updateBMPs() {
683
                        if (!bmps.isEmpty() && bmpController != null) {
3✔
684
                                // Poke the BMP controller to start looking!
685
                                log.debug("Triggering BMPs {}", bmps);
3✔
686
                                bmpController.triggerSearch(bmps);
3✔
687
                        }
688
                }
3✔
689

690
                boolean notEmpty() {
691
                        return !jobIds.isEmpty();
3✔
692
                }
693
        }
694

695
        /**
696
         * Allocate all current requests for resources.
697
         *
698
         * @param conn
699
         *            The DB connection
700
         * @return Whether any changes have been done
701
         */
702
        private Allocations allocate(Connection conn) {
703
                try (var sql = new AllocSQL(conn)) {
3✔
704
                        int maxImportance = -1;
3✔
705
                        log.trace("Allocate running");
3✔
706
                        var allocations = new Allocations();
3✔
707
                        var tasks = sql.getTasks.call(AllocTask::new, QUEUED);
3✔
708
                        log.debug("allocate for {} tasks", tasks.size());
3✔
709
                        for (AllocTask task : tasks) {
3✔
710
                                if (task.importance > maxImportance) {
3✔
711
                                        maxImportance = task.importance;
3✔
712
                                } else if (task.importance < maxImportance
×
713
                                                - allocProps.getImportanceSpan()) {
×
714
                                        // Too much of a span
715
                                        continue;
×
716
                                }
717
                                var handled = task.allocate(sql);
3✔
718
                                allocations.addAll(task.jobId, handled);
3✔
719
                                log.debug("allocate for {} (job {}): {}", task.id,
3✔
720
                                                task.jobId, handled);
3✔
721
                        }
3✔
722
                        /*
723
                         * Those tasks which weren't allocated get their importance bumped
724
                         * so they get considered with higher priority when the allocator
725
                         * runs next time.
726
                         */
727
                        sql.bumpImportance.call();
3✔
728
                        return allocations;
3✔
729
                }
730
        }
731

732
        /**
733
         * Destroy jobs that have missed their keepalive.
734
         */
735
        public void expireJobs() {
736
                if (serviceControl.isPaused()) {
3✔
737
                        return;
3✔
738
                }
739

740
                try {
741
                        var allocated = execute(this::expireJobs);
×
742
                        allocated.updateEpochs();
×
743
                        allocated.updateBMPs();
×
744
                } catch (DataAccessException e) {
×
745
                        if (isBusy(e)) {
×
746
                                log.info("database is busy; "
×
747
                                                + "will try job expiry processing later");
748
                                return;
×
749
                        }
750
                        throw e;
×
751
                }
×
752
        }
×
753

754
        /**
755
         * Destroy jobs that have missed their keepalive.
756
         *
757
         * @param conn
758
         *            How to talk to the DB
759
         * @return Whether any jobs have been expired.
760
         */
761
        private Allocations expireJobs(Connection conn) {
762
                var allocations = new Allocations();
3✔
763
                try (var find = conn.query(FIND_EXPIRED_JOBS)) {
3✔
764
                        var toKill = find.call(integer("job_id"));
3✔
765
                        for (var id : toKill) {
3✔
766
                                allocations.addAll(id,
3✔
767
                                                destroyJob(conn, id, "keepalive expired"));
3✔
768
                        }
3✔
769
                }
770
                try (var find = conn.query(GET_LIVE_JOB_IDS)) {
3✔
771
                        var toKill = find.call(integer("job_id"),
3✔
772
                                        NUMBER_OF_JOBS_TO_QUOTA_CHECK, 0);
3✔
773
                        for (var id : toKill) {
3✔
774
                                if (quotaManager.shouldKillJob(id)) {
×
775
                                        allocations.addAll(id,
×
776
                                                        destroyJob(conn, id, "quota exceeded"));
×
777
                                }
778
                        }
×
779
                }
780
                return allocations;
3✔
781
        }
782

783
        /**
784
         * Migrates long dead jobs to the historical data DB.
785
         */
786
        public void tombstone() {
787
                if (serviceControl.isPaused()) {
×
788
                        return;
×
789
                }
790

791
                try (var conn = getConnection();
×
792
                                var histConn = getHistoricalConnection()) {
×
793
                        var c = tombstone(conn, histConn);
×
794
                        log.info("tombstoning completed: "
×
795
                                        + "moved {} job records and {} allocation records",
796
                                        c.numJobs(), c.numAllocs());
×
797
                } catch (DataAccessException e) {
×
798
                        if (isBusy(e)) {
×
799
                                log.info("database is busy; "
×
800
                                                + "will try job tombstone processing at future date");
801
                                return;
×
802
                        }
803
                        throw e;
×
804
                }
×
805
        }
×
806

807
        /**
808
         * Describes what the first stage of the tombstoner has copied.
809
         */
810
        static final class Copied {
811
                private final List<HistoricalJob> jobs;
812

813
                private final List<HistoricalAlloc> allocs;
814

815
                private Copied(List<HistoricalJob> jobs, List<HistoricalAlloc> allocs) {
3✔
816
                        this.jobs = jobs;
3✔
817
                        this.allocs = allocs;
3✔
818
                }
3✔
819

820
                private Stream<HistoricalAlloc> allocs() {
821
                        return allocs.stream().filter(Objects::nonNull);
3✔
822
                }
823

824
                private Stream<HistoricalJob> jobs() {
825
                        return jobs.stream().filter(Objects::nonNull);
3✔
826
                }
827

828
                private Stream<Integer> nmpiJobs() {
829
                        return jobs().map(j -> j.nmpiJobId).filter(Objects::nonNull);
3✔
830
                }
831

832
                private Stream<Integer> nmpiSessions() {
833
                        return jobs().map(j -> j.nmpiSessionId).filter(Objects::nonNull);
3✔
834
                }
835

836
                /**
837
                 * @return The number of job records to copy over to the historical
838
                 *         database.
839
                 */
840
                int numJobs() {
841
                        return jobs.size();
3✔
842
                }
843

844
                /**
845
                 * @return The number of board allocation records to copy over to the
846
                 *         historical database.
847
                 */
848
                int numAllocs() {
849
                        return allocs.size();
3✔
850
                }
851
        }
852

853
        private class HistoricalAlloc {
854
                int allocId;
855

856
                int jobId;
857

858
                int boardId;
859

860
                Instant allocTimestamp;
861

862
                HistoricalAlloc(Row row) {
×
863
                        allocId = row.getInt("alloc_id");
×
864
                        jobId = row.getInt("job_id");
×
865
                        boardId = row.getInt("board_id");
×
866
                        allocTimestamp = row.getInstant("alloc_timestamp");
×
867
                }
×
868

869
                Object[] args() {
870
                        return new Object[] {
×
871
                                allocId, jobId, boardId, allocTimestamp
×
872
                        };
873
                }
874
        }
875

876
        private class HistoricalJob {
877
                int jobId;
878

879
                int machineId;
880

881
                String owner;
882

883
                Instant createTimestamp;
884

885
                int width;
886

887
                int height;
888

889
                int depth;
890

891
                int allocatedRoot;
892

893
                Instant keepaliveInterval;
894

895
                String keepaliveHost;
896

897
                String deathReason;
898

899
                Instant deathTimestamp;
900

901
                byte[] originalRequest;
902

903
                Instant allocationTimestamp;
904

905
                int allocationSize;
906

907
                String machineName;
908

909
                String userName;
910

911
                int groupId;
912

913
                String groupName;
914

915
                Integer nmpiJobId;
916

917
                Integer nmpiSessionId;
918

919
                HistoricalJob(Row row) {
3✔
920
                        jobId = row.getInt("job_id");
3✔
921
                        machineId = row.getInt("machine_id");
3✔
922
                        owner = row.getString("owner");
3✔
923
                        createTimestamp = row.getInstant("create_timestamp");
3✔
924
                        width = row.getInt("width");
3✔
925
                        height = row.getInt("height");
3✔
926
                        depth = row.getInt("depth");
3✔
927
                        allocatedRoot = row.getInt("allocated_root");
3✔
928
                        keepaliveInterval = row.getInstant("keepalive_interval");
3✔
929
                        keepaliveHost = row.getString("keepalive_host");
3✔
930
                        deathReason = row.getString("death_reason");
3✔
931
                        deathTimestamp = row.getInstant("death_timestamp");
3✔
932
                        originalRequest = row.getBytes("original_request");
3✔
933
                        allocationTimestamp = row.getInstant("allocation_timestamp");
3✔
934
                        allocationSize = row.getInt("allocation_size");
3✔
935
                        machineName = row.getString("machine_name");
3✔
936
                        userName = row.getString("user_name");
3✔
937
                        groupId = row.getInt("group_id");
3✔
938
                        groupName = row.getString("group_name");
3✔
939
                        nmpiJobId = row.getInteger("nmpi_job_id");
3✔
940
                        nmpiSessionId = row.getInteger("nmpi_session_id");
3✔
941
                }
3✔
942

943
                Object[] args() {
944
                        return new Object[] {
3✔
945
                                jobId, machineId, owner, createTimestamp,
3✔
946
                                width, height, depth, allocatedRoot, keepaliveInterval,
3✔
947
                                keepaliveHost, deathReason, deathTimestamp, originalRequest,
948
                                allocationTimestamp, allocationSize, machineName, userName,
3✔
949
                                groupId, groupName, nmpiJobId, nmpiSessionId
3✔
950
                        };
951
                }
952
        }
953

954
        /**
955
         * Implementation of {@link #tombstone()}. This is done as two transactions
956
         * to help manage the amount of locking (especially multi-DB locking);
957
         * nothing else ought to be updating any of these jobs at the time this task
958
         * usually runs, but we'll still try to keep things minimally locked.
959
         *
960
         * @param conn
961
         *            The DB connection
962
         * @return Description of the tombstoned IDs
963
         */
964
        private Copied tombstone(Connection conn, Connection histConn) {
965
                // No tombstoning without the target DB!
966
                if (!isHistoricalDBAvailable()) {
3✔
967
                        return new Copied(List.of(), List.of());
×
968
                }
969

970
                try (var readJobs = conn.query(READ_HISTORICAL_JOBS);
3✔
971
                                var readAllocs = conn.query(READ_HISTORICAL_ALLOCS);
3✔
972
                                var deleteJobs = conn.update(DELETE_JOB_RECORD);
3✔
973
                                var deleteAllocs = conn.update(DELETE_ALLOC_RECORD);
3✔
974
                                var deleteNMPIJob = conn.update(DELETE_NMPI_JOB);
3✔
975
                                var deleteNMPISession = conn.update(DELETE_NMPI_SESSION);
3✔
976
                                var writeJobs = histConn.update(WRITE_HISTORICAL_JOBS);
3✔
977
                                var writeAllocs = histConn.update(WRITE_HISTORICAL_ALLOCS)) {
3✔
978
                        var grace = historyProps.getGracePeriod();
3✔
979
                        var copied = conn.transaction(
3✔
980
                                        () -> new Copied(readJobs.call(HistoricalJob::new, grace),
3✔
981
                                                        readAllocs.call(HistoricalAlloc::new, grace)));
3✔
982
                        histConn.transaction(() -> {
3✔
983
                                copied.allocs().forEach((a) -> writeAllocs.call(a.args()));
3✔
984
                                copied.jobs().forEach((j) -> writeJobs.call(j.args()));
3✔
985
                        });
3✔
986
                        conn.transaction(() -> {
3✔
987
                                copied.nmpiJobs().forEach(deleteNMPIJob::call);
3✔
988
                                copied.nmpiSessions().forEach(deleteNMPISession::call);
3✔
989
                                copied.allocs().forEach((a) -> deleteAllocs.call(a.allocId));
3✔
990
                                copied.jobs().forEach((j) -> deleteJobs.call(j.jobId));
3✔
991
                        });
3✔
992
                        return copied;
3✔
993
                }
994
        }
995

996
        @Override
997
        public void destroyJob(int id, String reason) {
998
                var allocations = new Allocations(
3✔
999
                                id, execute(conn -> destroyJob(conn, id, reason)));
3✔
1000
                allocations.updateEpochs();
3✔
1001
                allocations.updateBMPs();
3✔
1002
        }
3✔
1003

1004
        private void destroyAllJobs() {
1005
                var allocations = new Allocations();
3✔
1006
                allocations.jobIds.addAll(execute(conn -> {
3✔
1007
                        try (var getAll = conn.query(GET_ALL_LIVE_JOBS);
3✔
1008
                                        var destroyAll = conn.update(DESTROY_ALL_LIVE_JOBS);
3✔
1009
                                        var killAllAllocs = conn.update(KILL_ALL_JOB_ALLOC_TASK)) {
3✔
1010
                                var jobs = getAll.call(integer("job_id"));
3✔
1011
                                killAllAllocs.call();
3✔
1012
                                destroyAll.call(
3✔
1013
                                                "The machine has been shut down due to an emergency");
1014
                                return jobs;
3✔
1015
                        }
1016
                }));
1017
                allocations.updateEpochs();
3✔
1018
        }
3✔
1019

1020
        /**
1021
         * Destroy a job.
1022
         *
1023
         * @param conn
1024
         *            How to talk to the DB
1025
         * @param id
1026
         *            The ID of the job
1027
         * @param reason
1028
         *            Why is the job being destroyed.
1029
         * @return Whether the job was destroyed.
1030
         */
1031
        private Collection<BMPAndMachine> destroyJob(Connection conn, int id,
1032
                        String reason) {
1033
                JobLifecycle.log.info("destroying job {} \"{}\"", id, reason);
3✔
1034
                try (var sql = new DestroySQL(conn)) {
3✔
1035
                        if (sql.getJob.call1(enumerate("job_state", JobState.class), id)
3✔
1036
                                        .orElse(DESTROYED) == DESTROYED) {
3✔
1037
                                log.info("job {} already destroyed", id);
3✔
1038
                                /*
1039
                                 * Don't do anything if the job doesn't exist or is already
1040
                                 * destroyed
1041
                                 */
1042
                                return List.of();
3✔
1043
                        }
1044
                        /*
1045
                         * Record the reason as a separate step here; state change can take
1046
                         * some time.  It also doesn't matter if we do that twice.
1047
                         */
1048
                        sql.recordDestroyReason.call(reason, id);
3✔
1049
                        // Inserts into pending_changes; these run after job is dead
1050
                        var bmps = setPower(sql, id, OFF, DESTROYED);
3✔
1051
                        sql.killAlloc.call(id);
3✔
1052
                        sql.markAsDestroyed.call(reason, id);
3✔
1053
                        log.info("job {} marked as destroyed", id);
3✔
1054
                        return bmps;
3✔
1055
                } finally {
3✔
1056
                        quotaManager.finishJob(id);
3✔
1057
                        rememberer.closeJob(id);
3✔
1058
                }
1059
        }
1060

1061
        /**
1062
         * Computes the estimate of what sort of allocation will be required.
1063
         * Converts a number of boards into a close-to-square size to search for.
1064
         * <p>
1065
         * With the big machine's level of resources, that's good enough. For now.
1066
         *
1067
         * @author Donal Fellows
1068
         */
1069
        private static final class DimensionEstimate {
1070
                /** The estimated width, in triads. */
1071
                final int width;
1072

1073
                /** The estimated height, in triads. */
1074
                final int height;
1075

1076
                /**
1077
                 * The number of boards in the rectangle of triads that we can tolerate
1078
                 * being down due to overallocation (due to the use of rectangles and
1079
                 * triads).
1080
                 */
1081
                final int tolerance;
1082

1083
                /**
1084
                 * Create an estimate of what to allocate. The old spalloc would take
1085
                 * hints at this point on the aspect ratio, but we don't bother; we
1086
                 * strongly prefer allocations "nearly square", going for making them
1087
                 * slightly taller than wide if necessary.
1088
                 *
1089
                 * @param numBoards
1090
                 *            The number of boards wanted.
1091
                 * @param max
1092
                 *            The size of the machine.
1093
                 */
1094
                DimensionEstimate(int numBoards, Rectangle max) {
3✔
1095
                        if (numBoards < 1) {
3✔
1096
                                throw new IllegalArgumentException(
×
1097
                                                "number of boards must be greater than zero");
1098
                        }
1099
                        int numTriads = ceildiv(numBoards, TRIAD_DEPTH);
3✔
1100
                        width = min((int) ceil(sqrt(numTriads)), max.width);
3✔
1101
                        height = min(ceildiv(numTriads, width), max.height);
3✔
1102
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
3✔
1103
                        if (width < 1 || height < 1) {
3✔
1104
                                throw new IllegalArgumentException(
×
1105
                                                "computed dimensions must be greater than zero");
1106
                        }
1107
                        if (tolerance < 0) {
3✔
1108
                                throw new IllegalArgumentException(
×
1109
                                                "that job cannot possibly fit on this machine");
1110
                        }
1111
                }
3✔
1112

1113
                /**
1114
                 * Create an estimate of what to allocate. This does not need to be
1115
                 * "near square".
1116
                 *
1117
                 * @param w
1118
                 *            The width of the allocation requested, in triads.
1119
                 * @param h
1120
                 *            The height of the allocation requested, in triads.
1121
                 * @param max
1122
                 *            The size of the machine.
1123
                 */
1124
                DimensionEstimate(int w, int h, Rectangle max) {
3✔
1125
                        if (w < 1 || h < 1) {
3✔
1126
                                throw new IllegalArgumentException(
×
1127
                                                "dimensions must be greater than zero");
1128
                        }
1129
                        int numBoards = w * h * TRIAD_DEPTH;
3✔
1130
                        width = max(1, min(w, max.width));
3✔
1131
                        height = max(1, min(h, max.height));
3✔
1132
                        tolerance = (width * height * TRIAD_DEPTH) - numBoards;
3✔
1133
                        if (tolerance < 0) {
3✔
1134
                                throw new IllegalArgumentException(
3✔
1135
                                                "that job cannot possibly fit on this machine");
1136
                        }
1137
                }
3✔
1138

1139
                /** @return The estimated dimensions as a rectangle. */
1140
                Rectangle getRect() {
1141
                        return new Rectangle(width, height, TRIAD_DEPTH);
3✔
1142
                }
1143
        }
1144

1145
        private Collection<BMPAndMachine> allocateOneBoard(AllocSQL sql, int jobId,
1146
                        int machineId) {
1147
                // This is simplified; no subsidiary searching needed
1148
                return sql.findFreeBoard
3✔
1149
                                .call1(row -> setAllocation(sql, jobId,
3✔
1150
                                                ONE_BOARD, machineId, coords(row)), machineId)
3✔
1151
                                .orElse(List.of());
3✔
1152
        }
1153

1154
        private static TriadCoords coords(Row row) {
1155
                int x = row.getInt("x");
3✔
1156
                int y = row.getInt("y");
3✔
1157
                int z = row.getInt("z");
3✔
1158
                return new TriadCoords(x, y, z);
3✔
1159
        }
1160

1161
        private Collection<BMPAndMachine> allocateDimensions(AllocSQL sql,
1162
                        int jobId, int machineId, DimensionEstimate estimate,
1163
                        int userMaxDead) {
1164
                int tolerance = userMaxDead + estimate.tolerance;
3✔
1165
                int minArea =
3✔
1166
                                estimate.width * estimate.height * TRIAD_DEPTH - tolerance;
1167
                for (var root : sql.getRectangles
3✔
1168
                                .call(AllocatorTask::coords, estimate.width, estimate.height,
3✔
1169
                                                machineId, tolerance)) {
3✔
1170
                        if (minArea > 1) {
3✔
1171
                                /*
1172
                                 * Check that a minimum number of boards are reachable from the
1173
                                 * proposed root board. If the root board is isolated, we don't
1174
                                 * care if the rest of the allocation works because the rest of
1175
                                 * the toolchain won't cope.
1176
                                 */
1177
                                int size = connectedSize(sql, machineId, root, estimate);
3✔
1178
                                if (size < minArea) {
3✔
1179
                                        continue;
×
1180
                                }
1181
                        }
1182
                        return setAllocation(sql, jobId, estimate.getRect(), machineId,
3✔
1183
                                        root);
1184
                }
1185
                log.debug("Could not allocate min area {}", minArea);
×
1186
                return List.of();
×
1187
        }
1188

1189
        /**
1190
         * Find the number of boards that are reachable from the proposed root
1191
         * board.
1192
         *
1193
         * @param sql
1194
         *            How to talk to the DB
1195
         * @param machineId
1196
         *            The machine on which the allocation is happening
1197
         * @param root
1198
         *            Root logical coordinates
1199
         * @param width
1200
         *            The width of the planned allocation, in triads
1201
         * @param height
1202
         *            The width of the planned allocation, in triads
1203
         * @return How many boards in the allocation are reachable.
1204
         */
1205
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1206
                        int width, int height) {
1207
                return sql.countConnectedBoards
3✔
1208
                                .call1(integer("connected_size"), machineId, root.x, root.y,
3✔
1209
                                                width, height).orElse(-1);
3✔
1210
        }
1211

1212
        /**
1213
         * Find the number of boards that are reachable from the proposed root
1214
         * board.
1215
         *
1216
         * @param sql
1217
         *            How to talk to the DB
1218
         * @param machineId
1219
         *            The machine on which the allocation is happening
1220
         * @param root
1221
         *            Root logical coordinates
1222
         * @param rect
1223
         *            The requested allocation dimensions
1224
         * @return How many boards in the allocation are reachable.
1225
         */
1226
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1227
                        Rectangle rect) {
1228
                return connectedSize(sql, machineId, root, rect.width, rect.height);
×
1229
        }
1230

1231
        /**
1232
         * Find the number of boards that are reachable from the proposed root
1233
         * board.
1234
         *
1235
         * @param sql
1236
         *            How to talk to the DB
1237
         * @param machineId
1238
         *            The machine on which the allocation is happening
1239
         * @param root
1240
         *            Root logical coordinates
1241
         * @param estimate
1242
         *            The planned allocation dimensions
1243
         * @return How many boards in the allocation are reachable.
1244
         */
1245
        private int connectedSize(AllocSQL sql, int machineId, TriadCoords root,
1246
                        DimensionEstimate estimate) {
1247
                return connectedSize(sql, machineId, root, estimate.width,
3✔
1248
                                estimate.height);
1249
        }
1250

1251
        private Collection<BMPAndMachine> allocateBoard(AllocSQL sql, int jobId,
1252
                        int machineId, int boardId) {
1253
                return sql.findSpecificBoard
3✔
1254
                                .call1(row -> setAllocation(sql, jobId, ONE_BOARD, machineId,
3✔
1255
                                                coords(row)), machineId, boardId)
3✔
1256
                                .orElse(List.of());
3✔
1257
        }
1258

1259
        private Collection<BMPAndMachine> allocateTriadsAt(AllocSQL sql, int jobId,
1260
                        int machineId, int rootId, int width, int height,
1261
                        int maxDeadBoards) {
1262
                var rect = new Rectangle(width, height, TRIAD_DEPTH);
×
1263
                return sql.getRectangleAt
×
1264
                                .call1(AllocatorTask::coords, rootId, width, height, machineId,
×
1265
                                                maxDeadBoards)
×
1266
                                .filter(root -> connectedSize(sql, machineId, root,
×
1267
                                                rect) >= rect.getArea() - maxDeadBoards)
×
1268
                                .map(root -> setAllocation(sql, jobId, rect, machineId, root))
×
1269
                                .orElse(List.of());
×
1270
        }
1271

1272
        /**
1273
         * Does the actual allocation.
1274
         * <p>
1275
         * At this point, we've checked that there's enough boards <em>and</em> we
1276
         * are running in a transaction. We take particular care that we only
1277
         * actually allocate boards that are reachable from the root board; this is
1278
         * assumed by the SpiNNaker tools, so we'd better conform to that
1279
         * expectation. Fortunately, the bigger the allocation, the more likely that
1280
         * is true (and it is trivially true for single-board allocations.)
1281
         * <p>
1282
         * If you want a multi-board allocation, you'd better be allocating a full
1283
         * triad's-worth of depth or you'll get nothing.
1284
         *
1285
         * @param sql
1286
         *            How to talk to the DB
1287
         * @param jobId
1288
         *            What job are we allocating for
1289
         * @param rect
1290
         *            Proposed rectangle size
1291
         * @param machineId
1292
         *            What machine are we allocating on
1293
         * @param root
1294
         *            Proposed root coordinates
1295
         * @return The BMPs that have been used to make the allocation.
1296
         */
1297
        private Collection<BMPAndMachine> setAllocation(AllocSQL sql, int jobId,
1298
                        Rectangle rect,        int machineId, TriadCoords root) {
1299
                log.debug("performing allocation for {}: {}x{}x{} at {}:{}:{}", jobId,
3✔
1300
                                rect.width, rect.height, rect.depth, root.x, root.y, root.z);
3✔
1301
                var boardsToAllocate = sql.getConnectedBoardIDs
3✔
1302
                                .call(integer("board_id"), machineId, root.x, root.y, root.z,
3✔
1303
                                                rect.width, rect.height, rect.depth);
3✔
1304
                if (boardsToAllocate.isEmpty()) {
3✔
1305
                        log.debug("No boards to allocate");
×
1306
                        return List.of();
×
1307
                }
1308
                for (var boardId : boardsToAllocate) {
3✔
1309
                        sql.allocBoard.call(jobId, boardId);
3✔
1310
                }
3✔
1311

1312
                var board = boardsToAllocate.get(0);
3✔
1313
                sql.allocJob.call(rect.width, rect.height, rect.depth,
3✔
1314
                                board, boardsToAllocate.size(), board, jobId);
3✔
1315
                log.info("allocated {} boards to {}; issuing power up commands",
3✔
1316
                                boardsToAllocate.size(), jobId);
3✔
1317
                // Any proxies that existed are now defunct; user must make anew
1318
                rememberer.closeJob(jobId);
3✔
1319
                return setPower(sql, jobId, ON, READY);
3✔
1320
        }
1321

1322
        @Override
1323
        public boolean setPower(int jobId, PowerState power, JobState targetState) {
1324
                if (targetState == DESTROYED) {
3✔
1325
                        throw new IllegalArgumentException(
×
1326
                                        "job destruction must be done via destroyJob() method");
1327
                }
1328
                var allocations = new Allocations(jobId, execute(conn -> {
3✔
1329
                        try (var sql = new PowerSQL(conn)) {
3✔
1330
                                return setPower(sql, jobId, power, targetState);
3✔
1331
                        }
1332
                }));
1333
                allocations.updateEpochs();
3✔
1334
                allocations.updateBMPs();
3✔
1335
                return allocations.notEmpty();
3✔
1336
        }
1337

1338
        /**
1339
         * Issue a request to change the power for the boards of a job.
1340
         *
1341
         * @param sql
1342
         *            How to talk to the DB
1343
         * @param jobId
1344
         *            The job in question
1345
         * @param power
1346
         *            The power state to switch to
1347
         * @param targetState
1348
         *            The state to put the job in afterwards
1349
         * @return The ids of the BMPs that have been changed
1350
         */
1351
        private Collection<BMPAndMachine> setPower(PowerSQL sql, int jobId,
1352
                        PowerState power, JobState targetState) {
1353
                var sourceState = sql.getJobState.call1(
3✔
1354
                                enumerate("job_state", JobState.class), jobId).orElseThrow();
3✔
1355
                var boards = sql.getJobBoards.call(BoardAndBMP::new, jobId);
3✔
1356
                if (boards.isEmpty()) {
3✔
1357
                        log.debug("No boards for job {}", jobId);
3✔
1358
                        if (targetState == DESTROYED) {
3✔
1359
                                log.debug("no boards for {} in destroy", jobId);
3✔
1360
                        }
1361
                        // Make sure we still update the job!
1362
                        updateJob(jobId, sourceState, targetState);
3✔
1363
                        return List.of();
3✔
1364
                }
1365
                log.debug("{} boards for job {}", boards.size(), jobId);
3✔
1366

1367
                // Number of changes pending, one per board
1368
                int numPending = 0;
3✔
1369

1370
                var bmps = new HashSet<BMPAndMachine>();
3✔
1371
                if (power == ON) {
3✔
1372
                        /*
1373
                         * This is a bit of a trickier case, as we need to say which links
1374
                         * are to be switched on or, more particularly, which are to be
1375
                         * switched off because they are links to boards that are not
1376
                         * allocated to the job. Off-board links are shut off by default.
1377
                         */
1378
                        var perimeterLinks = Row.stream(
3✔
1379
                                        sql.getPerimeter.call(Perimeter::new, jobId))
3✔
1380
                                        .toCollectingMap(Direction.class, (p) -> p.boardId,
3✔
1381
                                                        (p) -> p.direction);
3✔
1382

1383
                        for (var board : boards) {
3✔
1384
                                var toChange = perimeterLinks.getOrDefault(board.boardId,
3✔
1385
                                                NO_PERIMETER);
1386
                                numPending += sql.issuePowerChange.call(jobId,
3✔
1387
                                                board.boardId,
3✔
1388
                                                sourceState, targetState, true,
3✔
1389
                                                !toChange.contains(Direction.N),
3✔
1390
                                                !toChange.contains(Direction.E),
3✔
1391
                                                !toChange.contains(Direction.SE),
3✔
1392
                                                !toChange.contains(Direction.S),
3✔
1393
                                                !toChange.contains(Direction.W),
3✔
1394
                                                !toChange.contains(Direction.NW));
3✔
1395
                                bmps.add(board.bmp);
3✔
1396
                        }
3✔
1397
                } else {
3✔
1398
                        // Powering off; all links switch to off so no perimeter check
1399
                        for (var board : boards) {
3✔
1400
                                numPending += sql.issuePowerChange.call(jobId,
3✔
1401
                                                board.boardId,        sourceState, targetState,
3✔
1402
                                                false, false, false, false,        false, false, false);
3✔
1403
                                bmps.add(board.bmp);
3✔
1404
                        }
3✔
1405
                }
1406

1407
                if (targetState == DESTROYED) {
3✔
1408
                        log.debug("num changes for {} in destroy: {}", jobId, numPending);
3✔
1409
                        log.info("destroying job {} after power change", jobId);
3✔
1410
                        int rows = sql.setStateDestroyed.call(jobId);
3✔
1411
                        if (rows != 1) {
3✔
1412
                                log.warn("unexpected number of jobs marked destroyed: {}",
×
1413
                                                rows);
×
1414
                        }
1415
                } else {
3✔
1416
                        log.debug("Num changes for target {}: {}", targetState, numPending);
3✔
1417
                        sql.setStatePending.call(
3✔
1418
                                numPending > 0 ? POWER : targetState, jobId);
3✔
1419
                }
1420

1421
                return bmps;
3✔
1422
        }
1423

1424
        /**
1425
         * Operations for testing only.
1426
         *
1427
         * @hidden
1428
         */
1429
        @ForTestingOnly
1430
        interface TestAPI {
1431
                /**
1432
                 * Allocate all current requests for resources.
1433
                 *
1434
                 * @return The BMPs updated by the allocation.
1435
                 */
1436
                Allocations allocate();
1437

1438
                /**
1439
                 * Destroy a job.
1440
                 *
1441
                 * @param id
1442
                 *            The ID of the job
1443
                 * @param reason
1444
                 *            Why is the job being destroyed.
1445
                 * @return The BMPs updated by the destruction.
1446
                 */
1447
                Allocations destroyJob(int id, String reason);
1448

1449
                /**
1450
                 * Destroy jobs that have missed their keepalive.
1451
                 *
1452
                 * @return The BMPs updated by the expiry.
1453
                 */
1454
                Allocations expireJobs();
1455

1456
                /**
1457
                 * Stop the allocator.
1458
                 */
1459
                void emergencyStop();
1460

1461
                /**
1462
                 * Restart allocations after stopping, to allow more tests!
1463
                 */
1464
                void restartAfterStop();
1465
        }
1466

1467
        /** Operations for testing historical database only. */
1468
        @ForTestingOnly
1469
        interface HistTestAPI {
1470

1471

1472
                /**
1473
                 * Implementation of {@link AllocatorTask#tombstone()}.
1474
                 *
1475
                 * @return Description of the tombstoned IDs
1476
                 */
1477
                Copied tombstone();
1478
        }
1479

1480
        /**
1481
         * @param conn
1482
         *            The DB connection
1483
         * @return The test interface.
1484
         * @deprecated This interface is just for testing.
1485
         * @hidden
1486
         */
1487
        @ForTestingOnly
1488
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1489
                        allowedOnPath = ".*/src/test/java/.*")
1490
        @Deprecated
1491
        TestAPI getTestAPI(Connection conn) {
1492
                ForTestingOnly.Utils.checkForTestClassOnStack();
3✔
1493
                return new TestAPI() {
3✔
1494
                        @Override
1495
                        public Allocations allocate() {
1496
                                return AllocatorTask.this.allocate(conn);
3✔
1497
                        }
1498

1499
                        @Override
1500
                        public Allocations destroyJob(int id, String reason) {
1501
                                return new Allocations(id,
3✔
1502
                                                AllocatorTask.this.destroyJob(conn, id, reason));
3✔
1503
                        }
1504

1505
                        @Override
1506
                        public Allocations expireJobs() {
1507
                                return AllocatorTask.this.expireJobs(conn);
3✔
1508
                        }
1509

1510
                        @Override
1511
                        public void emergencyStop() {
1512
                                AllocatorTask.this.emergencyStop();
3✔
1513
                        }
3✔
1514

1515
                        @Override
1516
                        public void restartAfterStop() {
1517
                                synchronized (AllocatorTask.this.futures) {
3✔
1518
                                        AllocatorTask.this.emergencyStop = false;
3✔
1519
                                        AllocatorTask.this.futures.clear();
3✔
1520
                                        AllocatorTask.this.init();
3✔
1521
                                }
3✔
1522
                        }
3✔
1523
                };
1524
        }
1525

1526
        /**
1527
         * @param conn
1528
         *            The DB connection
1529
         * @param histConn
1530
         *            The historical DB connection
1531
         * @return The test interface.
1532
         * @deprecated This interface is just for testing.
1533
         */
1534
        @ForTestingOnly
1535
        @RestrictedApi(explanation = "just for testing", link = "index.html",
1536
                        allowedOnPath = ".*/src/test/java/.*")
1537
        @Deprecated
1538
        HistTestAPI getHistTestAPI(Connection conn, Connection histConn) {
1539
                ForTestingOnly.Utils.checkForTestClassOnStack();
3✔
1540
                return new HistTestAPI() {
3✔
1541
                        @Override
1542
                        public Copied tombstone() {
1543
                                return AllocatorTask.this.tombstone(conn, histConn);
3✔
1544
                        }
1545
                };
1546
        }
1547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc