• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2697

01 Mar 2024 11:19AM UTC coverage: 91.373% (-0.05%) from 91.424%
2697

push

circleci

web-flow
Explicit PathVariable name in queue related endpoints (#219)

5370 of 5877 relevant lines covered (91.37%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.38
/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/ThreadUtils.java
1
/*
2
 * Copyright (c) 2020-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.utils;
18

19
import java.util.ArrayList;
20
import java.util.Collection;
21
import java.util.List;
22
import java.util.concurrent.CancellationException;
23
import java.util.concurrent.ExecutionException;
24
import java.util.concurrent.Future;
25
import java.util.concurrent.ScheduledFuture;
26
import java.util.concurrent.TimeUnit;
27
import java.util.concurrent.TimeoutException;
28
import org.slf4j.Logger;
29
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
30
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
31

32
public final class ThreadUtils {
33

34
  private ThreadUtils() {
35
  }
36

37
  public static ThreadPoolTaskScheduler createTaskScheduler(
38
      int poolSize, String threadPrefix, int terminationTime) {
39
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
1✔
40
    scheduler.setBeanName(threadPrefix.substring(0, threadPrefix.length() - 1));
1✔
41
    scheduler.setPoolSize(poolSize);
1✔
42
    scheduler.setThreadNamePrefix(threadPrefix);
1✔
43
    scheduler.setAwaitTerminationSeconds(terminationTime);
1✔
44
    scheduler.setRemoveOnCancelPolicy(true);
1✔
45
    scheduler.afterPropertiesSet();
1✔
46
    return scheduler;
1✔
47
  }
48

49
  public static ThreadPoolTaskExecutor createTaskExecutor(
50
      String beanName, String threadPrefix, int corePoolSize, int maxPoolSize, int queueCapacity) {
51
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
1✔
52
    threadPoolTaskExecutor.setThreadNamePrefix(threadPrefix);
1✔
53
    threadPoolTaskExecutor.setBeanName(beanName);
1✔
54
    if (corePoolSize > 0) {
1✔
55
      threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
1✔
56
      threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
1✔
57
      threadPoolTaskExecutor.setMaxPoolSize(Math.max(corePoolSize, maxPoolSize));
1✔
58
      threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
1✔
59
    }
60
    threadPoolTaskExecutor.afterPropertiesSet();
1✔
61
    return threadPoolTaskExecutor;
1✔
62
  }
63

64
  private static void waitForShutdown(
65
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
66
    boolean completedOrCancelled = future.isCancelled() || future.isDone();
1✔
67
    if (completedOrCancelled) {
1✔
68
      return;
×
69
    }
70
    try {
71
      future.get(waitTimeInMillis, TimeUnit.MILLISECONDS);
1✔
72
    } catch (ExecutionException | TimeoutException | CancellationException e) {
×
73
      log.debug(msg, msgParams, e);
×
74
    } catch (InterruptedException e) {
×
75
      Thread.currentThread().interrupt();
×
76
    }
1✔
77
  }
1✔
78

79
  public static void waitForTermination(
80
      Logger log, Future<?> future, long waitTimeInMillis, String msg, Object... msgParams) {
81
    if (future == null || future.isCancelled() || future.isDone()) {
1✔
82
      return;
1✔
83
    }
84
    if (future instanceof ScheduledFuture) {
1✔
85
      ScheduledFuture<?> f = (ScheduledFuture<?>) future;
1✔
86
      if (f.getDelay(TimeUnit.MILLISECONDS) > Constants.MIN_DELAY) {
1✔
87
        f.cancel(false);
1✔
88
        return;
1✔
89
      }
90
    }
91
    waitForShutdown(log, future, waitTimeInMillis, msg, msgParams);
1✔
92
  }
1✔
93

94
  public static boolean waitForWorkerTermination(
95
      final Collection<QueueThreadPool> queueThreadPools, long waitTime) {
96
    long endTime = System.currentTimeMillis() + waitTime;
1✔
97
    List<QueueThreadPool> remaining = new ArrayList<>(queueThreadPools);
1✔
98
    while (System.currentTimeMillis() < endTime && !remaining.isEmpty()) {
1✔
99
      List<QueueThreadPool> newRemaining = new ArrayList<>();
1✔
100
      for (QueueThreadPool queueThreadPool : remaining) {
1✔
101
        if (!queueThreadPool.allTasksCompleted()) {
1✔
102
          newRemaining.add(queueThreadPool);
1✔
103
        }
104
      }
1✔
105
      if (!newRemaining.isEmpty()) {
1✔
106
        TimeoutUtils.sleep(10);
1✔
107
      }
108
      remaining = newRemaining;
1✔
109
    }
1✔
110
    return remaining.isEmpty();
1✔
111
  }
112

113
  public static String getWorkerName(String name) {
114
    String camelCase = StringUtils.getBeanName(name);
1✔
115
    return camelCase + "Listener";
1✔
116
  }
117
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc