• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.83
/SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/Epochs.java
1
/*
2
 * Copyright (c) 2021 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.alloc.allocator;
17

18
import static java.util.Objects.nonNull;
19
import static org.slf4j.LoggerFactory.getLogger;
20

21
import java.time.Duration;
22
import java.util.Collection;
23
import java.util.HashMap;
24
import java.util.HashSet;
25
import java.util.List;
26
import java.util.Map;
27
import java.util.Set;
28
import java.util.WeakHashMap;
29

30
import javax.annotation.PostConstruct;
31

32
import org.slf4j.Logger;
33
import org.springframework.beans.factory.annotation.Autowired;
34
import org.springframework.scheduling.TaskScheduler;
35
import org.springframework.stereotype.Service;
36

37
import com.google.errorprone.annotations.concurrent.GuardedBy;
38

39
/**
40
 * Manages waiting for values.
41
 *
42
 * @author Donal Fellows
43
 * @author Andrew Rowley
44
 */
45
@Service
46
public class Epochs {
1✔
47
        private static final Logger log = getLogger(Epochs.class);
1✔
48

49
        /** How long to wait between cleaning of the maps. */
50
        private static final Duration CLEANING_SCHEDULE = Duration.ofSeconds(30);
1✔
51

52
        @Autowired
53
        private TaskScheduler scheduler;
54

55
        /**
56
         * Mapping from job ID to set of epoch handles waiting on it for a state
57
         * change.
58
         */
59
        private final EpochMap jobs = new EpochMap();
1✔
60

61
        /**
62
         * Mapping from machine ID to set of epoch handles waiting on it for a state
63
         * change.
64
         */
65
        private final EpochMap machines = new EpochMap();
1✔
66

67
        /**
68
         * Mapping from board ID to set of epoch handles waiting on it for blacklist
69
         * handling.
70
         */
71
        private final EpochMap blacklists = new EpochMap();
1✔
72

73
        @PostConstruct
74
        private void init() {
75
                scheduler.scheduleAtFixedRate(this::checkEmpties, CLEANING_SCHEDULE);
1✔
76
        }
1✔
77

78
        /**
79
         * The maps contain weak reference maps, where Epochs are removed when they
80
         * are no longer referenced, but the map will still contain the empty weak
81
         * reference map unless removed. This tasklet handles that cleanup.
82
         */
83
        private void checkEmpties() {
84
                if (jobs.checkEmptyValues()) {
1✔
85
                        log.debug("Job map now contains jobs {}", jobs.getIds());
1✔
86
                }
87
                if (machines.checkEmptyValues()) {
1✔
88
                        log.debug("Machine map now contains machines {}",
1✔
89
                                        machines.getIds());
1✔
90
                }
91
                if (blacklists.checkEmptyValues()) {
1✔
92
                        log.debug("Blacklist map now contains boards {}",
×
93
                                        blacklists.getIds());
×
94
                }
95
        }
1✔
96

97
        /**
98
         * Get a job epoch for a job.
99
         *
100
         * @param jobId
101
         *            The job identifier.
102
         * @return The epoch handle.
103
         */
104
        public Epoch getJobsEpoch(int jobId) {
105
                return new Epoch(jobs, jobId);
1✔
106
        }
107

108
        /**
109
         * Get a job epoch for a list of jobs.
110
         *
111
         * @param jobIds
112
         *            The job identifiers.
113
         * @return The epoch handle.
114
         */
115
        public Epoch getJobsEpoch(List<Integer> jobIds) {
116
                return new Epoch(jobs, jobIds);
1✔
117
        }
118

119
        /**
120
         * Indicate a change in a job. Will wake any thread waiting on changes to
121
         * the job epoch with {@link Epoch#waitForChange(Duration) waitForChange()}
122
         * on a {@code Epoch} handle.
123
         *
124
         * @param job
125
         *            The job that has changed
126
         */
127
        public void jobChanged(int job) {
128
                jobs.changed(job);
1✔
129
        }
1✔
130

131
        /**
132
         * Get a machine epoch for a machine.
133
         *
134
         * @param machineId
135
         *            The identifier of the machine.
136
         * @return The epoch handle.
137
         */
138
        public Epoch getMachineEpoch(int machineId) {
139
                return new Epoch(machines, machineId);
1✔
140
        }
141

142
        /**
143
         * Get a machine epoch for a set of machines.
144
         *
145
         * @param machineIds
146
         *            The identifiers of the machine.
147
         * @return The epoch handle.
148
         */
149
        public Epoch getMachinesEpoch(List<Integer> machineIds) {
150
                return new Epoch(machines, machineIds);
1✔
151
        }
152

153
        /**
154
         * Indicate a change in a machine. Will wake any thread waiting on changes
155
         * to the machine epoch with {@link Epoch#waitForChange(Duration)
156
         * waitForChange()} on a {@code Epoch} handle.
157
         *
158
         * @param machine
159
         *            The machine that has changed
160
         */
161
        public void machineChanged(int machine) {
162
                machines.changed(machine);
1✔
163
        }
1✔
164

165
        /**
166
         * Get a blacklist epoch for a board.
167
         *
168
         * @param boardId
169
         *            The id of the board.
170
         * @return The epoch handle.
171
         */
172
        public Epoch getBlacklistEpoch(int boardId) {
173
                return new Epoch(blacklists, boardId);
1✔
174
        }
175

176
        /**
177
         * Indicate a change in a blacklist. Will wake any thread waiting on changes
178
         * to the blacklist epoch with {@link Epoch#waitForChange(Duration)
179
         * waitForChange()} on a {@code Epoch} handle.
180
         *
181
         * @param board
182
         *            The board that has changed.
183
         */
184
        public void blacklistChanged(int board) {
185
                blacklists.changed(board);
1✔
186
        }
1✔
187

188
        /**
189
         * A waitable epoch checkpoint.
190
         *
191
         * @author Donal Fellows
192
         * @author Andrew Rowley
193
         */
194
        public static final class Epoch {
195
                private final EpochMap map;
196

197
                private final List<Integer> ids;
198

199
                private final Set<Integer> changed = new HashSet<>();
1✔
200

201
                private Epoch(EpochMap map, int id) {
1✔
202
                        this.map = map;
1✔
203
                        this.ids = List.of(id);
1✔
204
                        map.addAll(this, ids);
1✔
205
                }
1✔
206

207
                private Epoch(EpochMap map, List<Integer> ids) {
1✔
208
                        if (ids.isEmpty()) {
1✔
209
                                log.warn("empty ID list; will never wake");
×
210
                        }
211
                        this.map = map;
1✔
212
                        this.ids = List.copyOf(ids);
1✔
213
                        map.addAll(this, ids);
1✔
214
                }
1✔
215

216
                synchronized void updateChanged(int id) {
217
                        log.debug("Change to {}, id {}", this, id);
1✔
218
                        changed.add(id);
1✔
219
                        notifyAll();
1✔
220
                }
1✔
221

222
                /**
223
                 * Wait, for up to {@code timeout}, for a change.
224
                 *
225
                 * @param timeout
226
                 *            The time to wait, in milliseconds.
227
                 * @return Whether the item has changed or not.
228
                 * @throws InterruptedException
229
                 *             If the wait is interrupted.
230
                 */
231
                public boolean waitForChange(Duration timeout)
232
                                throws InterruptedException {
233
                        return !getChanged(timeout).isEmpty();
1✔
234
                }
235

236
                /**
237
                 * Get the set of changed items.
238
                 *
239
                 * @param timeout
240
                 *            The timeout to wait for one item to change.
241
                 * @return The changed items.
242
                 * @throws InterruptedException
243
                 *             If the wait is interrupted.
244
                 */
245
                public Collection<Integer> getChanged(Duration timeout)
246
                                throws InterruptedException {
247
                        synchronized (this) {
1✔
248
                                log.debug("Waiting for change to {}", this);
1✔
249
                                wait(timeout.toMillis());
1✔
250
                                log.debug("After wait, changed: {}", changed);
1✔
251
                                return Set.copyOf(changed);
1✔
252
                        }
253
                }
254

255
                /**
256
                 * Check if this epoch is the current one.
257
                 *
258
                 * @return Whether this is a valid epoch.
259
                 */
260
                public boolean isValid() {
261
                        return map.containsAnyKey(ids);
1✔
262
                }
263
        }
264
}
265

266
/**
267
 * A weak mapping from an ID to the epoch handles that care about it. Handles
268
 * will be removed when they get garbage collected.
269
 *
270
 * @author Donal Fellows
271
 */
272
class EpochMap {
1✔
273
        /** The value in {@link #map} leaves. Shared. Unimportant. */
274
        private static final Object OBJ = new Object();
1✔
275

276
        /** A map from integers to weak sets of epochs. */
277
        @GuardedBy("this")
1✔
278
        private final Map<Integer, Map<Epochs.Epoch, Object>> map = new HashMap<>();
279

280
        synchronized boolean checkEmptyValues() {
281
                return map.entrySet().removeIf(entry -> entry.getValue().isEmpty());
1✔
282
        }
283

284
        void changed(int id) {
285
                var items = getSet(id);
1✔
286
                if (nonNull(items)) {
1✔
287
                        for (var item : items) {
1✔
288
                                item.updateChanged(id);
1✔
289
                        }
1✔
290
                }
291
        }
1✔
292

293
        /**
294
         * Take the set of epochs for a particular ID.
295
         *
296
         * @param id
297
         *            The key into the map.
298
         * @return The removed set of epochs. Empty if the key is absent.
299
         */
300
        private synchronized Set<Epochs.Epoch> getSet(Integer id) {
301
                var weakmap = map.get(id);
1✔
302
                if (weakmap == null) {
1✔
303
                        return null;
1✔
304
                }
305
                // Copy the set here while still synchronized to avoid
306
                // ConcurrentModificationException when updated elsewhere.
307
                return Set.copyOf(weakmap.keySet());
1✔
308
        }
309

310
        synchronized void addAll(Epochs.Epoch epoch, List<Integer> ids) {
311
                for (var id : ids) {
1✔
312
                        map.computeIfAbsent(id, __ -> new WeakHashMap<>()).put(epoch, OBJ);
1✔
313
                }
1✔
314
        }
1✔
315

316
        @SuppressWarnings("GuardedBy")
317
        synchronized boolean containsAnyKey(Collection<Integer> ids) {
318
                return ids.stream().allMatch(map::containsKey);
1✔
319
        }
320

321
        synchronized Collection<Integer> getIds() {
322
                return map.keySet();
1✔
323
        }
324
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc