• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tomdesair / tus-java-server / 27465232708

13 Jun 2026 11:17AM UTC coverage: 92.468% (-2.1%) from 94.575%
27465232708

Pull #80

github

web-flow
Merge 3c0ac21fb into 73dcffedd
Pull Request #80: feat: Add upload lock contention resolution for HEAD requests

603 of 694 branches covered (86.89%)

Branch coverage included in aggregate %.

150 of 197 new or added lines in 7 files covered. (76.14%)

1 existing line in 1 file now uncovered.

1754 of 1855 relevant lines covered (94.56%)

5.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.41
/src/main/java/me/desair/tus/server/upload/disk/DiskLockingService.java
1
package me.desair.tus.server.upload.disk;
2

3
import java.io.File;
4
import java.io.IOException;
5
import java.io.InputStream;
6
import java.lang.ref.WeakReference;
7
import java.nio.file.DirectoryStream;
8
import java.nio.file.Files;
9
import java.nio.file.Path;
10
import java.nio.file.attribute.FileTime;
11
import java.util.Map;
12
import java.util.concurrent.ConcurrentHashMap;
13
import me.desair.tus.server.exception.TusException;
14
import me.desair.tus.server.exception.UploadAlreadyLockedException;
15
import me.desair.tus.server.upload.UploadId;
16
import me.desair.tus.server.upload.UploadIdFactory;
17
import me.desair.tus.server.upload.UploadLock;
18
import me.desair.tus.server.upload.UploadLockingService;
19
import me.desair.tus.server.util.InterruptibleInputStream;
20
import org.apache.commons.lang3.Validate;
21
import org.slf4j.Logger;
22
import org.slf4j.LoggerFactory;
23

24
/**
25
 * {@link UploadLockingService} implementation that uses the file system for implementing locking
26
 * <br>
27
 * File locking can also apply to shared network drives. This way the framework supports clustering
28
 * as long as the upload storage directory is mounted as a shared (network) drive. <br>
29
 * File locks are also automatically released on application (JVM) shutdown. This means the file
30
 * locking is not persistent, which avoids stale-lock and cleanup issues.
31
 */
32
public class DiskLockingService extends AbstractDiskBasedService implements UploadLockingService {
33

34
  private static final Logger log = LoggerFactory.getLogger(DiskLockingService.class);
6✔
35
  private static final String LOCK_SUB_DIRECTORY = "locks";
36

37
  /** Registry tracking active request input streams in the current JVM. */
38
  private static final ConcurrentHashMap<String, WeakReference<InterruptibleInputStream>>
39
      activeLocks = new ConcurrentHashMap<>();
8✔
40

41
  private static Thread watchdogThread = null;
4✔
42
  private static final Object watchdogLock = new Object();
10✔
43

44
  private UploadIdFactory idFactory;
45

46
  /**
   * Creates a locking service whose lock files live in the {@code locks} sub directory of the
   * given storage path. No {@link UploadIdFactory} is assigned by this constructor; one has to be
   * provided through {@link #setIdFactory(UploadIdFactory)} before URIs can be resolved.
   *
   * @param storagePath root directory of the upload storage
   */
  public DiskLockingService(String storagePath) {
    super(storagePath + File.separatorChar + LOCK_SUB_DIRECTORY);
  }
2✔
49

50
  /**
   * Creates a locking service for the given storage path that resolves upload ids with a custom
   * {@link UploadIdFactory}.
   *
   * @param idFactory factory used to read upload ids from request URIs; must not be null
   * @param storagePath root directory of the upload storage
   */
  public DiskLockingService(UploadIdFactory idFactory, String storagePath) {
    this(storagePath);
    // Validate.notNull hands back its argument, so validation and assignment collapse into one.
    this.idFactory = Validate.notNull(idFactory, "The IdFactory cannot be null");
  }
2✔
56

57
  /**
58
   * Attempts to lock the upload resource. Wraps the lock in a RegisteredLock decorator to manage
59
   * cleanup of stop files and the active lock registry.
60
   */
61
  @Override
62
  public UploadLock lockUploadByUri(String requestUri) throws TusException, IOException {
63

64
    UploadId id = idFactory.readUploadId(requestUri);
10✔
65

66
    UploadLock lock = null;
4✔
67

68
    Path lockPath = getLockPath(id);
8✔
69
    // If lockPath is not null, we know this is a valid Upload URI
70
    if (lockPath != null) {
4✔
71
      FileBasedLock baseLock = new FileBasedLock(requestUri, lockPath);
12✔
72
      Path stopFilePath = baseLock.getLockPath().resolveSibling(id.toString() + ".stop");
14✔
73
      lock = new RegisteredLock(baseLock, id.toString(), stopFilePath);
18✔
74
    }
75
    return lock;
4✔
76
  }
77

78
  /** Cleans up stale locks and stop files in the storage directory. */
79
  @Override
80
  public void cleanupStaleLocks() throws IOException {
81
    try (DirectoryStream<Path> locksStream = Files.newDirectoryStream(getStoragePath())) {
8✔
82
      for (Path path : locksStream) {
16✔
83

84
        FileTime lastModifiedTime = Files.getLastModifiedTime(path);
5✔
85
        if (lastModifiedTime.toMillis() < System.currentTimeMillis() - 10000L) {
7✔
86
          String fileName = path.getFileName().toString();
4✔
87
          if (fileName.endsWith(".stop")) {
4!
UNCOV
88
            Files.deleteIfExists(path);
×
89
          } else {
90
            UploadId id = new UploadId(fileName);
5✔
91
            if (!isLocked(id)) {
4✔
92
              Files.deleteIfExists(path);
3✔
93
              Files.deleteIfExists(path.resolveSibling(fileName + ".stop"));
6✔
94
            }
95
          }
96
        }
97
      }
1✔
98
    }
99
  }
2✔
100

101
  /** Checks whether the upload is locked by attempting to obtain a short-lived file lock. */
102
  @Override
103
  public boolean isLocked(UploadId id) {
104
    boolean locked = false;
4✔
105
    Path lockPath = getLockPath(id);
8✔
106

107
    if (lockPath != null) {
4!
108
      // Try to obtain a lock to see if the upload is currently locked
109
      try (UploadLock lock = new FileBasedLock(id.toString(), lockPath)) {
14✔
110

111
        // We got the lock, so it means no one else is locking it.
112
        locked = false;
4✔
113

114
      } catch (UploadAlreadyLockedException | IOException e) {
1✔
115
        // There was already a lock
116
        locked = true;
2✔
117
      }
2✔
118
    }
119

120
    return locked;
4✔
121
  }
122

123
  @Override
124
  public void setIdFactory(UploadIdFactory idFactory) {
125
    Validate.notNull(idFactory, "The IdFactory cannot be null");
6✔
126
    this.idFactory = idFactory;
3✔
127
  }
1✔
128

129
  /**
130
   * Registers an active request input stream so that it can be interrupted if lock contention
131
   * occurs.
132
   */
133
  @Override
134
  public void registerInputStream(String requestUri, InputStream inputStream) {
135
    if (inputStream == null) {
4!
NEW
136
      return;
×
137
    }
138
    UploadId id = idFactory.readUploadId(requestUri);
10✔
139
    if (id == null) {
4!
NEW
140
      return;
×
141
    }
142
    if (inputStream instanceof InterruptibleInputStream) {
6!
143
      activeLocks.put(id.toString(), new WeakReference<>((InterruptibleInputStream) inputStream));
20✔
144
      startWatchdogIfNecessary();
4✔
145
    }
146
  }
2✔
147

148
  /**
149
   * Requests that the lock for the given URI be released, interrupting the active stream and
150
   * creating a stop file to signal other replicas.
151
   */
152
  @Override
153
  public void requestLockRelease(String requestUri) {
154
    UploadId id = idFactory.readUploadId(requestUri);
10✔
155
    if (id == null) {
4!
NEW
156
      return;
×
157
    }
158
    String idStr = id.toString();
6✔
159

160
    // 1. Release JVM-local lock if active
161
    WeakReference<InterruptibleInputStream> streamRef = activeLocks.get(idStr);
10✔
162
    if (streamRef != null) {
4!
163
      InterruptibleInputStream stream = streamRef.get();
8✔
164
      if (stream != null) {
4!
165
        stream.interrupt();
4✔
166
      }
167
      activeLocks.remove(idStr);
8✔
168
    }
169

170
    // 2. Create the stop file to signal other replicas
171
    Path stopFilePath = getStopPath(id);
8✔
172
    if (stopFilePath != null) {
4!
173
      try {
174
        Path parentDir = stopFilePath.getParent();
6✔
175
        if (parentDir != null && !Files.exists(parentDir)) {
14!
NEW
176
          Files.createDirectories(parentDir);
×
177
        }
178
        Files.write(stopFilePath, new byte[0]);
14✔
NEW
179
      } catch (IOException e) {
×
NEW
180
        log.warn("Unable to create stop file " + stopFilePath, e);
×
181
      }
2✔
182
    }
183
  }
2✔
184

185
  /** Spawns a new background watchdog thread if none is currently active. */
186
  private void startWatchdogIfNecessary() {
187
    synchronized (watchdogLock) {
8✔
188
      if (watchdogThread == null || !watchdogThread.isAlive()) {
7!
189
        watchdogThread = new Thread(new LockWatchdogRunnable(), "tus-lock-watchdog");
18✔
190
        watchdogThread.setDaemon(true);
6✔
191
        // Set lowest priority to ensure request threads are prioritized by the OS scheduler
192
        watchdogThread.setPriority(Thread.MIN_PRIORITY);
6✔
193
        watchdogThread.start();
4✔
194
      }
195
    }
6✔
196
  }
2✔
197

198
  private Path getLockPath(UploadId id) {
199
    return getPathInStorageDirectory(id);
8✔
200
  }
201

202
  /**
203
   * Resolves the stop file path based on the upload ID, ensuring files reside in the correct
204
   * directory.
205
   */
206
  private Path getStopPath(UploadId id) {
207
    Path lockPath = getPathInStorageDirectory(id);
8✔
208
    return lockPath == null ? null : lockPath.resolveSibling(id.toString() + ".stop");
16!
209
  }
210

211
  /**
212
   * Runnable implementation for the background watchdog thread. This thread polls the storage
213
   * directory for ".stop" files created by other processes or replicas. If a stop file is found for
214
   * an active local upload, it interrupts the request stream to release the lock. The thread
215
   * self-terminates when there are no more active local locks to monitor.
216
   */
217
  private class LockWatchdogRunnable implements Runnable {
12✔
218
    @Override
219
    public void run() {
220
      try {
221
        while (true) {
222
          try {
223
            Thread.sleep(1000L);
4✔
NEW
224
          } catch (InterruptedException e) {
×
NEW
225
            Thread.currentThread().interrupt();
×
NEW
226
            break;
×
227
          }
2✔
228

229
          // Check stop files for each active lock
230
          for (Map.Entry<String, WeakReference<InterruptibleInputStream>> entry :
231
              activeLocks.entrySet()) {
18✔
232
            String idStr = entry.getKey();
4✔
233
            WeakReference<InterruptibleInputStream> ref = entry.getValue();
4✔
234
            InterruptibleInputStream stream = ref.get();
4✔
235

236
            if (stream == null) {
2!
NEW
237
              activeLocks.remove(idStr);
×
NEW
238
              continue;
×
239
            }
240

241
            Path stopFilePath = getStopPath(new UploadId(idStr));
8✔
242
            if (stopFilePath != null && Files.exists(stopFilePath)) {
7!
243
              try {
244
                log.info(
4✔
245
                    "Watchdog detected stop file for upload ID {}. Interrupting stream.", idStr);
246
                stream.interrupt();
2✔
NEW
247
              } catch (Throwable t) {
×
NEW
248
                log.warn("Error interrupting stream for ID " + idStr, t);
×
249
              }
1✔
250
              activeLocks.remove(idStr);
4✔
251
              try {
252
                Files.deleteIfExists(stopFilePath);
3✔
NEW
253
              } catch (IOException e) {
×
254
                // ignore
255
              }
1✔
256
            }
257
          }
1✔
258

259
          // Thread-safe check to decide whether to exit
260
          if (activeLocks.isEmpty()) {
6!
261
            synchronized (watchdogLock) {
8✔
262
              if (activeLocks.isEmpty()) {
6!
263
                watchdogThread = null;
4✔
264
                break;
6✔
265
              }
NEW
266
            }
×
267
          }
268
        }
NEW
269
      } catch (Throwable t) {
×
NEW
270
        log.error("Lock watchdog encountered an unexpected error", t);
×
NEW
271
        synchronized (watchdogLock) {
×
NEW
272
          watchdogThread = null;
×
NEW
273
        }
×
274
      }
2✔
275
    }
2✔
276
  }
277

278
  /**
279
   * Decorator around UploadLock to manage local map registry cleanup and stop file deletion when
280
   * the lock is released or closed.
281
   */
282
  private class RegisteredLock implements UploadLock {
283
    private final UploadLock delegate;
284
    private final String uploadIdStr;
285
    private final Path stopFilePath;
286

287
    public RegisteredLock(UploadLock delegate, String uploadIdStr, Path stopFilePath) {
10✔
288
      this.delegate = delegate;
6✔
289
      this.uploadIdStr = uploadIdStr;
6✔
290
      this.stopFilePath = stopFilePath;
6✔
291
    }
2✔
292

293
    @Override
294
    public String getUploadUri() {
NEW
295
      return delegate.getUploadUri();
×
296
    }
297

298
    @Override
299
    public void release() {
300
      try {
301
        delegate.release();
3✔
302
      } finally {
303
        activeLocks.remove(uploadIdStr);
5✔
304
        if (stopFilePath != null) {
3!
305
          try {
306
            Files.deleteIfExists(stopFilePath);
4✔
NEW
307
          } catch (IOException e) {
×
NEW
308
            log.warn("Unable to delete stop file " + stopFilePath, e);
×
309
          }
1✔
310
        }
311
      }
312
    }
1✔
313

314
    @Override
315
    public void close() throws IOException {
316
      try {
317
        delegate.close();
3✔
318
      } finally {
319
        activeLocks.remove(uploadIdStr);
5✔
320
        if (stopFilePath != null) {
3!
321
          try {
322
            Files.deleteIfExists(stopFilePath);
4✔
NEW
323
          } catch (IOException e) {
×
NEW
324
            log.warn("Unable to delete stop file " + stopFilePath, e);
×
325
          }
1✔
326
        }
327
      }
328
    }
1✔
329
  }
330
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc