• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6074875230

04 Sep 2023 02:30PM UTC coverage: 36.847% (-0.2%) from 37.017%
6074875230

push

github

web-flow
Merge pull request #992 from SpiNNakerManchester/debug_compat

Fix the notification of compat jobs

35 of 35 new or added lines in 4 files covered. (100.0%)

8649 of 23473 relevant lines covered (36.85%)

0.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.95
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/Epochs.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.allocator;
17

18
import static java.util.Objects.nonNull;
19
import static org.slf4j.LoggerFactory.getLogger;
20

21
import java.time.Duration;
22
import java.util.Collection;
23
import java.util.ConcurrentModificationException;
24
import java.util.HashMap;
25
import java.util.HashSet;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.Set;
29
import java.util.WeakHashMap;
30
import java.util.function.BiConsumer;
31

32
import javax.annotation.PostConstruct;
33

34
import org.slf4j.Logger;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.scheduling.TaskScheduler;
37
import org.springframework.stereotype.Service;
38

39
import com.google.errorprone.annotations.concurrent.GuardedBy;
40

41
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
42

43
/**
44
 * Manages waiting for values.
45
 *
46
 * @author Donal Fellows
47
 * @author Andrew Rowley
48
 */
49
@Service
50
public class Epochs {
3✔
51
        private static final Logger log = getLogger(Epochs.class);
3✔
52

53
        /** How long to wait between cleaning of the maps. */
54
        private static final Duration CLEANING_SCHEDULE = Duration.ofSeconds(30);
3✔
55

56
        @Autowired
57
        private TaskScheduler scheduler;
58

59
        /**
60
         * Mapping from job ID to set of epoch handles waiting on it for a state
61
         * change.
62
         */
63
        private final EpochMap jobs = new EpochMap();
3✔
64

65
        /**
66
         * Mapping from machine ID to set of epoch handles waiting on it for a state
67
         * change.
68
         */
69
        private final EpochMap machines = new EpochMap();
3✔
70

71
        /**
72
         * Mapping from board ID to set of epoch handles waiting on it for blacklist
73
         * handling.
74
         */
75
        private final EpochMap blacklists = new EpochMap();
3✔
76

77
        @PostConstruct
78
        private void init() {
79
                scheduler.scheduleAtFixedRate(this::checkEmpties, CLEANING_SCHEDULE);
3✔
80
        }
3✔
81

82
        /**
83
         * The maps contain weak reference maps, where Epochs are removed when they
84
         * are no longer referenced, but the map will still contain the empty weak
85
         * reference map unless removed. This tasklet handles that cleanup.
86
         */
87
        private void checkEmpties() {
88
                if (jobs.checkEmptyValues()) {
3✔
89
                        log.debug("Job map now contains jobs {}", jobs.getIds());
3✔
90
                }
91
                if (machines.checkEmptyValues()) {
3✔
92
                        log.debug("Machine map now contains machines {}",
×
93
                                        machines.getIds());
×
94
                }
95
                if (blacklists.checkEmptyValues()) {
3✔
96
                        log.debug("Blacklist map now contains boards {}",
3✔
97
                                        blacklists.getIds());
3✔
98
                }
99
        }
3✔
100

101
        /**
102
         * Get a job epoch for a job.
103
         *
104
         * @param jobId
105
         *            The job identifier.
106
         * @return The epoch handle.
107
         */
108
        public Epoch getJobsEpoch(int jobId) {
109
                return new Epoch(jobs, jobId);
3✔
110
        }
111

112
        /**
113
         * Get a job epoch for a list of jobs.
114
         *
115
         * @param jobIds
116
         *            The job identifiers.
117
         * @return The epoch handle.
118
         */
119
        public Epoch getJobsEpoch(List<Integer> jobIds) {
120
                return new Epoch(jobs, jobIds);
3✔
121
        }
122

123
        /**
124
         * Indicate a change in a job. Will wake any thread waiting on changes to
125
         * the job epoch with {@link Epoch#waitForChange(Duration) waitForChange()}
126
         * on a {@code Epoch} handle.
127
         *
128
         * @param job
129
         *            The job that has changed
130
         */
131
        public void jobChanged(int job) {
132
                jobs.changed(job);
3✔
133
        }
3✔
134

135
        /**
136
         * Get a machine epoch for a machine.
137
         *
138
         * @param machineId
139
         *            The identifier of the machine.
140
         * @return The epoch handle.
141
         */
142
        public Epoch getMachineEpoch(int machineId) {
143
                return new Epoch(machines, machineId);
3✔
144
        }
145

146
        /**
147
         * Get a machine epoch for a set of machines.
148
         *
149
         * @param machineIds
150
         *            The identifiers of the machine.
151
         * @return The epoch handle.
152
         */
153
        public Epoch getMachinesEpoch(List<Integer> machineIds) {
154
                return new Epoch(machines, machineIds);
3✔
155
        }
156

157
        /**
158
         * Indicate a change in a machine. Will wake any thread waiting on changes
159
         * to the machine epoch with {@link Epoch#waitForChange(Duration)
160
         * waitForChange()} on a {@code Epoch} handle.
161
         *
162
         * @param machine
163
         *            The machine that has changed
164
         */
165
        public void machineChanged(int machine) {
166
                machines.changed(machine);
3✔
167
        }
3✔
168

169
        /**
170
         * Get a blacklist epoch for a board.
171
         *
172
         * @param boardId
173
         *            The id of the board.
174
         * @return The epoch handle.
175
         */
176
        public Epoch getBlacklistEpoch(int boardId) {
177
                return new Epoch(blacklists, boardId);
3✔
178
        }
179

180
        /**
181
         * Indicate a change in a blacklist. Will wake any thread waiting on changes
182
         * to the blacklist epoch with {@link Epoch#waitForChange(Duration)
183
         * waitForChange()} on a {@code Epoch} handle.
184
         *
185
         * @param board
186
         *            The board that has changed.
187
         */
188
        public void blacklistChanged(int board) {
189
                blacklists.changed(board);
3✔
190
        }
3✔
191

192
        /**
193
         * A waitable epoch checkpoint.
194
         *
195
         * @author Donal Fellows
196
         * @author Andrew Rowley
197
         */
198
        public final class Epoch {
199
                private final EpochMap map;
200

201
                private final List<Integer> ids;
202

203
                private final Set<Integer> changed = new HashSet<>();
3✔
204

205
                private Epoch(EpochMap map, int id) {
3✔
206
                        this.map = map;
3✔
207
                        this.ids = List.of(id);
3✔
208
                        map.addAll(this, ids);
3✔
209
                }
3✔
210

211
                private Epoch(EpochMap map, List<Integer> ids) {
3✔
212
                        if (ids.isEmpty()) {
3✔
213
                                log.warn("empty ID list; will never wake");
×
214
                        }
215
                        this.map = map;
3✔
216
                        this.ids = List.copyOf(ids);
3✔
217
                        map.addAll(this, ids);
3✔
218
                }
3✔
219

220
                synchronized void updateChanged(int id) {
221
                        log.debug("Change to {}, id {}", this, id);
3✔
222
                        changed.add(id);
3✔
223
                        notifyAll();
3✔
224
                }
3✔
225

226
                /**
227
                 * Wait, for up to {@code timeout}, for a change.
228
                 *
229
                 * @param timeout
230
                 *            The time to wait, in milliseconds.
231
                 * @return Whether the item has changed or not.
232
                 * @throws InterruptedException
233
                 *             If the wait is interrupted.
234
                 */
235
                public boolean waitForChange(Duration timeout)
236
                                throws InterruptedException {
237
                        return !getChanged(timeout).isEmpty();
3✔
238
                }
239

240
                /**
241
                 * Get the set of changed items.
242
                 *
243
                 * @param timeout
244
                 *            The timeout to wait for one item to change.
245
                 * @return The changed items.
246
                 * @throws InterruptedException
247
                 *             If the wait is interrupted.
248
                 */
249
                public Collection<Integer> getChanged(Duration timeout)
250
                                throws InterruptedException {
251
                        synchronized (this) {
3✔
252
                                log.debug("Waiting for change to {}", this);
3✔
253
                                wait(timeout.toMillis());
3✔
254
                                log.debug("After wait, changed: {}", changed);
3✔
255
                                return Set.copyOf(changed);
3✔
256
                        }
257
                }
258

259
                /**
260
                 * Check if this epoch is the current one.
261
                 *
262
                 * @return Whether this is a valid epoch.
263
                 */
264
                public boolean isValid() {
265
                        return map.containsAnyKey(ids);
3✔
266
                }
267
        }
268
}
269

270
/**
271
 * A weak mapping from an ID to the epoch handles that care about it. Handles
272
 * will be removed when they get garbage collected.
273
 *
274
 * @author Donal Fellows
275
 */
276
class EpochMap {
3✔
277
        /** The value in {@link #map} leaves. Shared. Unimportant. */
278
        private static final Object OBJ = new Object();
3✔
279

280
        /** A map from integers to weak sets of epochs. */
281
        @GuardedBy("this")
3✔
282
        private final Map<Integer, Map<Epochs.Epoch, Object>> map = new HashMap<>();
283

284
        synchronized boolean checkEmptyValues() {
285
                return map.entrySet().removeIf(entry -> entry.getValue().isEmpty());
3✔
286
        }
287

288
        void changed(int id) {
289
                var items = getSet(id);
3✔
290
                if (nonNull(items)) {
3✔
291
                        synchronized (items) {
3✔
292
                                for (var item : items) {
3✔
293
                                        item.updateChanged(id);
3✔
294
                                }
3✔
295
                        }
3✔
296
                }
297
        }
3✔
298

299
        /**
300
         * Take the set of epochs for a particular ID.
301
         *
302
         * @param id
303
         *            The key into the map.
304
         * @return The removed set of epochs. Empty if the key is absent.
305
         */
306
        @UsedInJavadocOnly({
307
                BiConsumer.class, ConcurrentModificationException.class
308
        })
309
        private synchronized Set<Epochs.Epoch> getSet(Integer id) {
310
                var weakmap = map.get(id);
3✔
311
                if (weakmap == null) {
3✔
312
                        return null;
3✔
313
                }
314
                // Copy the set here while still synchronized to avoid
315
                // ConcurrentModificationException when updated elsewhere.
316
                return Set.copyOf(weakmap.keySet());
3✔
317
        }
318

319
        synchronized void addAll(Epochs.Epoch epoch, List<Integer> ids) {
320
                for (var id : ids) {
3✔
321
                        map.computeIfAbsent(id, key -> new WeakHashMap<>()).put(epoch, OBJ);
3✔
322
                }
3✔
323
        }
3✔
324

325
        @SuppressWarnings("GuardedBy")
326
        synchronized boolean containsAnyKey(Collection<Integer> ids) {
327
                return ids.stream().allMatch(map::containsKey);
3✔
328
        }
329

330
        synchronized Collection<Integer> getIds() {
331
                return map.keySet();
3✔
332
        }
333
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc