• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 7044

08 Sep 2025 01:37PM UTC coverage: 36.281% (-0.03%) from 36.308%
7044

push

github

web-flow
Merge pull request #1306 from SpiNNakerManchester/dependabot/maven/org.webjars-swagger-ui-5.28.1

Bump org.webjars:swagger-ui from 5.28.0 to 5.28.1

1909 of 5896 branches covered (32.38%)

Branch coverage included in aggregate %.

8971 of 24092 relevant lines covered (37.24%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.51
/SpiNNaker-nmpiserv/src/main/java/uk/ac/manchester/spinnaker/nmpi/jobmanager/OutputManagerImpl.java
1
/*
2
 * Copyright (c) 2014 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.nmpi.jobmanager;
17

18
import static java.lang.System.currentTimeMillis;
19
import static java.nio.file.Files.move;
20
import static java.nio.file.Files.probeContentType;
21
import static java.util.Objects.isNull;
22
import static java.util.Objects.nonNull;
23
import static java.util.concurrent.Executors.newScheduledThreadPool;
24
import static java.util.concurrent.TimeUnit.DAYS;
25
import static java.util.concurrent.TimeUnit.MILLISECONDS;
26
import static jakarta.ws.rs.core.Response.ok;
27
import static jakarta.ws.rs.core.Response.serverError;
28
import static jakarta.ws.rs.core.Response.status;
29
import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST;
30
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
31
import static org.slf4j.LoggerFactory.getLogger;
32
import static uk.ac.manchester.spinnaker.utils.ThreadUtils.waitfor;
33

34
import java.io.File;
35
import java.io.FileInputStream;
36
import java.io.FileNotFoundException;
37
import java.io.IOException;
38
import java.io.PrintWriter;
39
import java.net.MalformedURLException;
40
import java.net.URL;
41
import java.util.ArrayList;
42
import java.util.Collection;
43
import java.util.HashMap;
44
import java.util.List;
45
import java.util.Map;
46
import java.util.concurrent.ScheduledExecutorService;
47

48
import jakarta.annotation.PostConstruct;
49
import jakarta.annotation.PreDestroy;
50
import jakarta.ws.rs.WebApplicationException;
51
import jakarta.ws.rs.core.Response;
52

53
import org.slf4j.Logger;
54
import org.springframework.beans.factory.annotation.Value;
55

56
import uk.ac.manchester.spinnaker.nmpi.model.job.nmpi.DataItem;
57
import uk.ac.manchester.spinnaker.nmpi.rest.OutputManager;
58
import uk.ac.manchester.spinnaker.nmpi.rest.UnicoreFileClient;
59

60
/**
61
 * Service for managing Job output files.
62
 */
63
//TODO needs security; Role = OutputHandler
64
public class OutputManagerImpl implements OutputManager {
65
        /**
         * Prefix of the name of the marker file which records that the results
         * of a job have been purged; the job directory name is appended to it.
         */
66
        private static final String PURGED_FILE = ".purged_";
67

68
        /**
         * The root directory that results are stored under; project
         * directories and purge marker files are created directly inside it.
         * Injected from the {@code results.directory} property.
         */
69
        @Value("${results.directory}")
70
        private File resultsDirectory;
71

72
        /** The base URL of the server, used to build the URLs of outputs. */
73
        private final URL baseServerUrl;
74

75
        /** The amount of time results should be kept, in milliseconds. */
76
        private long timeToKeepResults;
77

78
        /**
         * Map from job directory to the token that locks it. All access is
         * done while synchronized on the map itself.
         */
79
        private final Map<File, LockToken> synchronizers = new HashMap<>();
2✔
80

81
        /** The logger. */
82
        private static final Logger logger = getLogger(OutputManagerImpl.class);
2✔
83

84
        /**
85
         * A lock token. Initially locked.
86
         */
87
        private static final class LockToken {
×
88
                /** True if the token is locked. */
89
                private boolean locked = true;
×
90

91
                /** True if the token is waiting for a lock. */
92
                private boolean waiting = false;
×
93

94
                /**
95
                 * Wait until the token is unlocked.
96
                 */
97
                private synchronized void waitForUnlock() {
98
                        waiting = true;
×
99

100
                        // Wait until unlocked
101
                        while (locked) {
×
102
                                waitfor(this);
×
103
                        }
104

105
                        // Now lock again
106
                        locked = true;
×
107
                        waiting = false;
×
108
                }
×
109

110
                /**
111
                 * Unlock the token.
112
                 *
113
                 * @return True if the token is waiting again.
114
                 */
115
                private synchronized boolean unlock() {
116
                        locked = false;
×
117
                        notifyAll();
×
118
                        return waiting;
×
119
                }
120
        }
121

122
        /**
123
         * A class to lock a job.
124
         */
125
        private class JobLock implements AutoCloseable {
126
                /** The directory being locked by this token. */
127
                private File dir;
128

129
                /**
130
                 * Create a new lock for a directory.
131
                 *
132
                 * @param dir
133
                 *            The directory to lock
134
                 */
135
                JobLock(final File dir) {
×
136
                        this.dir = dir;
×
137

138
                        LockToken lock;
139
                        synchronized (synchronizers) {
×
140
                                if (!synchronizers.containsKey(dir)) {
×
141
                                        // Constructed pre-locked
142
                                        synchronizers.put(dir, new LockToken());
×
143
                                        return;
×
144
                                }
145
                                lock = synchronizers.get(dir);
×
146
                        }
×
147

148
                        lock.waitForUnlock();
×
149
                }
×
150

151
                @Override
152
                public void close() {
153
                        synchronized (synchronizers) {
×
154
                                final var lock = synchronizers.get(dir);
×
155
                                if (!lock.unlock()) {
×
156
                                        synchronizers.remove(dir);
×
157
                                }
158
                        }
×
159
                }
×
160
        }
161

162
        /**
         * Create the output manager.
         *
         * @param baseServerUrl
         *            The base URL of the overall service; the URLs reported
         *            for stored outputs are built relative to it.
         */
        public OutputManagerImpl(final URL baseServerUrl) {
                this.baseServerUrl = baseServerUrl;
        }
172

173
        /**
174
         * Set the number of days after a job has finished to keep results.
175
         *
176
         * @param nDaysToKeepResults
177
         *            The number of days to keep the results
178
         */
179
        @Value("${results.purge.days}")
180
        void setPurgeTimeout(final long nDaysToKeepResults) {
181
                timeToKeepResults = MILLISECONDS.convert(nDaysToKeepResults, DAYS);
2✔
182
        }
2✔
183

184
        /**
         * Periodic execution engine: a scheduled pool with a single thread,
         * used to run the purge of old results.
         */
185
        private final ScheduledExecutorService scheduler = newScheduledThreadPool(
2✔
186
                        1);
187

188
        /**
189
         * Arrange for old output to be purged once per day.
190
         */
191
        @PostConstruct
192
        private void initPurgeScheduler() {
193
                scheduler.scheduleAtFixedRate(this::removeOldFiles, 0, 1, DAYS);
2✔
194
        }
2✔
195

196
        /**
197
         * Stop the scheduler running jobs.
198
         */
199
        @PreDestroy
200
        private void stopPurgeScheduler() {
201
                scheduler.shutdown();
1✔
202
        }
1✔
203

204
        /**
205
         * Get the project directory for a given project.
206
         *
207
         * @param projectId
208
         *            The id of the project
209
         * @return The directory of the project
210
         */
211
        private File getProjectDirectory(final String projectId) {
212
                if (isNull(projectId) || projectId.isEmpty()
×
213
                                || projectId.endsWith("/")) {
×
214
                        throw new IllegalArgumentException("bad projectId");
×
215
                }
216
                final var name = new File(projectId).getName();
×
217
                if (name.equals(".") || name.equals("..") || name.isEmpty()) {
×
218
                        throw new IllegalArgumentException("bad projectId");
×
219
                }
220
                return new File(resultsDirectory, name);
×
221
        }
222

223
        @Override
224
        public List<DataItem> addOutputs(final String projectId, final int id,
225
                        final File baseDirectory, final Collection<File> outputs)
226
                        throws IOException {
227
                if (isNull(outputs)) {
×
228
                        return null;
×
229
                }
230

231
                final var pId = new File(projectId).getName();
×
232
                final int pathStart = baseDirectory.getAbsolutePath().length();
×
233
                final var projectDirectory = getProjectDirectory(projectId);
×
234
                final var idDirectory = new File(projectDirectory, String.valueOf(id));
×
235

236
                try (var op = new JobLock(idDirectory)) {
×
237
                        final var outputData = new ArrayList<DataItem>();
×
238
                        for (final var output : outputs) {
×
239
                                if (!output.getAbsolutePath()
×
240
                                                .startsWith(baseDirectory.getAbsolutePath())) {
×
241
                                        throw new IOException("Output file " + output
×
242
                                                        + " is outside base directory " + baseDirectory);
243
                                }
244

245
                                var outputPath = output.getAbsolutePath()
×
246
                                                .substring(pathStart).replace('\\', '/');
×
247
                                if (outputPath.startsWith("/")) {
×
248
                                        outputPath = outputPath.substring(1);
×
249
                                }
250

251
                                final var newOutput = new File(idDirectory, outputPath);
×
252
                                newOutput.getParentFile().mkdirs();
×
253
                                if (newOutput.exists()) {
×
254
                                        if (!newOutput.delete()) {
×
255
                                                logger.warn("Could not delete existing file {};"
×
256
                                                                + " new file will not be used!", newOutput);
257
                                        } else {
258
                                                logger.warn("Overwriting existing file {}", newOutput);
×
259
                                        }
260
                                }
261
                                if (!newOutput.exists()) {
×
262
                                        move(output.toPath(), newOutput.toPath());
×
263
                                }
264
                                final var outputUrl = new URL(baseServerUrl,
×
265
                                                "output/" + pId + "/" + id + "/" + outputPath);
266
                                outputData.add(new DataItem(outputUrl.toExternalForm()));
×
267
                                logger.debug("New output {} mapped to {}",
×
268
                                                newOutput, outputUrl);
269
                        }
×
270

271
                        return outputData;
×
272
                }
273
        }
274

275
        /**
276
         * Get a file as a response to a query.
277
         *
278
         * @param idDirectory
279
         *            The directory of the project
280
         * @param filename
281
         *            The name of the file to be stored
282
         * @param download
283
         *            True if the content type should be set to guarantee that the
284
         *            file is downloaded, False to attempt to guess the content type
285
         * @return The response
286
         */
287
        private Response getResultFile(final File idDirectory,
288
                        final String filename, final boolean download) {
289
                final var resultFile = new File(idDirectory, filename);
×
290
                final var purgeFile = getPurgeFile(idDirectory);
×
291

292
                try (var op = new JobLock(idDirectory)) {
×
293
                        if (purgeFile.exists()) {
×
294
                                logger.debug("{} was purged", idDirectory);
×
295
                                return status(NOT_FOUND).entity("Results from job "
×
296
                                                + idDirectory.getName() + " have been removed")
×
297
                                                .build();
×
298
                        }
299

300
                        if (!resultFile.canRead()) {
×
301
                                logger.debug("{} was not found", resultFile);
×
302
                                return status(NOT_FOUND).build();
×
303
                        }
304

305
                        try {
306
                                if (!download) {
×
307
                                        final var contentType =
×
308
                                                        probeContentType(resultFile.toPath());
×
309
                                        if (nonNull(contentType)) {
×
310
                                                logger.debug("File has content type {}", contentType);
×
311
                                                return ok(resultFile, contentType).build();
×
312
                                        }
313
                                }
314
                        } catch (final IOException e) {
×
315
                                logger.debug("Content type of {} could not be determined",
×
316
                                                resultFile, e);
317
                        }
×
318

319
                        return ok(resultFile).header("Content-Disposition",
×
320
                                        "attachment; filename=" + filename).build();
×
321
                }
×
322
        }
323

324
        /**
325
         * Get the file that marks a directory as purged.
326
         *
327
         * @param directory
328
         *            The directory to find the file in
329
         * @return The purge marker file
330
         */
331
        private File getPurgeFile(final File directory) {
332
                return new File(resultsDirectory, PURGED_FILE + directory.getName());
×
333
        }
334

335
        @Override
336
        public Response getResultFile(final String projectId, final int id,
337
                        final String filename, final boolean download) {
338
                logger.debug("Retrieving {} from {}/{}", filename, projectId, id);
×
339
                final var projectDirectory = getProjectDirectory(projectId);
×
340
                final var idDirectory = new File(projectDirectory, String.valueOf(id));
×
341
                return getResultFile(idDirectory, filename, download);
×
342
        }
343

344
        @Override
345
        public Response getResultFile(final int id, final String filename,
346
                        final boolean download) {
347
                logger.debug("Retrieving {} from {}", filename, id);
×
348
                final var idDirectory = getProjectDirectory(String.valueOf(id));
×
349
                return getResultFile(idDirectory, filename, download);
×
350
        }
351

352
        /**
353
         * Upload files in recursive subdirectories to UniCore.
354
         *
355
         * @param authHeader
356
         *            The authentication to use
357
         * @param directory
358
         *            The directory to start from
359
         * @param fileManager
360
         *            The UniCore client
361
         * @param storageId
362
         *            The id of the UniCore storage
363
         * @param filePath
364
         *            The path in the UniCore storage to upload to
365
         * @throws IOException
366
         *             If something goes wrong
367
         */
368
        private void recursivelyUploadFiles(final String authHeader,
369
                        final File directory,
370
                        final UnicoreFileClient fileManager, final String storageId,
371
                        final String filePath) throws IOException {
372
                final var files = directory.listFiles();
×
373
                if (isNull(files)) {
×
374
                        return;
×
375
                }
376
                for (final var file : files) {
×
377
                        if (file.getName().equals(".") || file.getName().equals("..")
×
378
                                        || file.getName().isEmpty()) {
×
379
                                continue;
×
380
                        }
381
                        final var uploadFileName = filePath + "/" + file.getName();
×
382
                        if (file.isDirectory()) {
×
383
                                recursivelyUploadFiles(authHeader, file, fileManager, storageId,
×
384
                                                uploadFileName);
385
                                continue;
×
386
                        }
387
                        if (!file.isFile()) {
×
388
                                continue;
×
389
                        }
390
                        try (var input = new FileInputStream(file)) {
×
391
                                fileManager.upload(authHeader, storageId, uploadFileName,
×
392
                                                input);
393
                        } catch (final WebApplicationException e) {
×
394
                                throw new IOException("Error uploading file to " + storageId
×
395
                                                + "/" + uploadFileName, e);
396
                        } catch (final FileNotFoundException e) {
×
397
                                // Ignore files which vanish.
398
                        }
×
399
                }
400
        }
×
401

402
        @Override
403
        public Response uploadResultsToHPCServer(final String projectId,
404
                        final int id, final String serverUrl, final String storageId,
405
                        final String filePath, final String userId, final String token) {
406
                final var idDirectory =
×
407
                                new File(getProjectDirectory(projectId), String.valueOf(id));
×
408
                if (!idDirectory.canRead()) {
×
409
                        logger.debug("{} was not found", idDirectory);
×
410
                        return status(NOT_FOUND).build();
×
411
                }
412

413
                try {
414
                        final var authHeader = "Bearer: " + token;
×
415
                        final var fileClient = UnicoreFileClient.createClient(serverUrl);
×
416
                        try (var op = new JobLock(idDirectory)) {
×
417
                                recursivelyUploadFiles(authHeader, idDirectory, fileClient,
×
418
                                                storageId, filePath.replaceAll("/+$", ""));
×
419
                        }
420
                } catch (final MalformedURLException e) {
×
421
                        logger.error("bad user-supplied URL", e);
×
422
                        return status(BAD_REQUEST)
×
423
                                        .entity("The URL specified was malformed").build();
×
424
                } catch (final Throwable e) {
×
425
                        logger.error("failure in upload", e);
×
426
                        return serverError()
×
427
                                        .entity("General error reading or uploading a file")
×
428
                                        .build();
×
429
                }
×
430

431
                return ok("ok").build();
×
432
        }
433

434
        /**
435
         * Recursively remove a directory.
436
         *
437
         * @param directory
438
         *            The directory to remove
439
         */
440
        private void removeDirectory(final File directory) {
441
                for (final var file : directory.listFiles()) {
×
442
                        if (file.isDirectory()) {
×
443
                                removeDirectory(file);
×
444
                        } else {
445
                                file.delete();
×
446
                        }
447
                }
448
                directory.delete();
×
449
        }
×
450

451
        /**
452
         * Remove files that are deemed to have expired.
453
         */
454
        private void removeOldFiles() {
455
                final long startTime = currentTimeMillis();
2✔
456
                for (final var projectDirectory : resultsDirectory.listFiles()) {
2!
457
                        if (projectDirectory.isDirectory()
2✔
458
                                        && removeOldProjectDirectoryContents(startTime,
×
459
                                                        projectDirectory)) {
460
                                logger.info("No more outputs for project {}",
×
461
                                                projectDirectory.getName());
×
462
                                projectDirectory.delete();
×
463
                        }
464
                }
465
        }
×
466

467
        /**
468
         * Remove project contents that are deemed to have expired.
469
         *
470
         * @param startTime
471
         *            The current time being considered
472
         * @param projectDirectory
473
         *            The directory containing the project files
474
         * @return True if every job in the project has been removed
475
         */
476
        private boolean removeOldProjectDirectoryContents(final long startTime,
477
                        final File projectDirectory) {
478
                boolean allJobsRemoved = true;
2✔
479
                for (final var jobDirectory : projectDirectory.listFiles()) {
×
480
                        logger.debug("Determining whether to remove {} "
×
481
                                        + "which is {}ms old of {}", jobDirectory,
482
                                        startTime - jobDirectory.lastModified(),
×
483
                                        timeToKeepResults);
×
484
                        if (jobDirectory.isDirectory() && ((startTime
×
485
                                        - jobDirectory.lastModified()) > timeToKeepResults)) {
×
486
                                logger.info("Removing results for job {}",
×
487
                                                jobDirectory.getName());
×
488
                                try (var op = new JobLock(jobDirectory)) {
×
489
                                        removeDirectory(jobDirectory);
×
490
                                }
491

492
                                try (var purgedFileWriter =
×
493
                                                new PrintWriter(getPurgeFile(jobDirectory))) {
×
494
                                        purgedFileWriter.println(currentTimeMillis());
×
495
                                } catch (final IOException e) {
×
496
                                        logger.error("Error writing purge file", e);
×
497
                                }
×
498
                        } else {
499
                                allJobsRemoved = false;
×
500
                        }
501
                }
502
                return allJobsRemoved;
×
503
        }
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc