• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sonus21 / rqueue / 2707

01 Mar 2024 11:33AM UTC coverage: 91.237% (-0.1%) from 91.373%
2707

push

circleci

Sonu Kumar
released 3.1.1

5362 of 5877 relevant lines covered (91.24%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.47
/rqueue-core/src/main/java/com/github/sonus21/rqueue/utils/QueueThreadPool.java
1
/*
2
 * Copyright (c) 2021-2023 Sonu Kumar
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * You may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and limitations under the License.
14
 *
15
 */
16

17
package com.github.sonus21.rqueue.utils;
18

19
import com.github.sonus21.rqueue.core.RqueueMessage;
20
import com.github.sonus21.rqueue.listener.QueueDetail;
21
import java.util.concurrent.Semaphore;
22
import java.util.concurrent.TimeUnit;
23
import lombok.extern.slf4j.Slf4j;
24
import org.springframework.core.task.AsyncTaskExecutor;
25
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
26

27
@Slf4j
1✔
28
public final class QueueThreadPool {
29

30
  private final AsyncTaskExecutor taskExecutor;
31
  private final boolean defaultExecutor;
32
  private final Semaphore semaphore;
33
  private final int maxJobsCount;
34

35
  public QueueThreadPool(
36
      AsyncTaskExecutor taskExecutor, boolean defaultExecutor, int maxJobsCount) {
1✔
37
    this.taskExecutor = taskExecutor;
1✔
38
    this.defaultExecutor = defaultExecutor;
1✔
39
    this.maxJobsCount = maxJobsCount;
1✔
40
    this.semaphore = new Semaphore(maxJobsCount);
1✔
41
  }
1✔
42

43
  public void release() {
44
    release(1);
1✔
45
  }
1✔
46

47
  public void release(int n) {
48
    if (n > 0) {
1✔
49
      semaphore.release(n);
1✔
50
    }
51
  }
1✔
52

53
  public boolean acquire(int n, long timeout) throws InterruptedException {
54
    if (log.isDebugEnabled() && taskExecutor instanceof ThreadPoolTaskExecutor) {
1✔
55
      ThreadPoolTaskExecutor executor = ((ThreadPoolTaskExecutor) taskExecutor);
×
56
      log.debug("Current active threads {}", executor.getActiveCount());
×
57
    }
58
    return semaphore.tryAcquire(n, timeout, TimeUnit.MILLISECONDS);
1✔
59
  }
60

61
  public void execute(Runnable r) {
62
    this.taskExecutor.execute(r);
1✔
63
  }
1✔
64

65
  public int availableThreads() {
66
    return semaphore.availablePermits();
1✔
67
  }
68

69
  public boolean allTasksCompleted() {
70
    int permits = availableThreads();
1✔
71
    if (permits > maxJobsCount) {
1✔
72
      log.error("More number of release is called");
×
73
    }
74
    return permits >= maxJobsCount;
1✔
75
  }
76

77
  public String destroy() {
78
    if (!defaultExecutor) {
1✔
79
      return null;
1✔
80
    }
81
    if (taskExecutor instanceof ThreadPoolTaskExecutor) {
1✔
82
      ThreadPoolTaskExecutor executor = ((ThreadPoolTaskExecutor) taskExecutor);
1✔
83
      executor.destroy();
1✔
84
      return executor.getThreadNamePrefix();
1✔
85
    }
86
    return null;
×
87
  }
88

89
  public void taskRejected(QueueDetail queueDetail, RqueueMessage message) {
90
    log.warn(
×
91
        "Task rejected by executor Queue: {}, Message: {}",
92
        queueDetail.getName(),
×
93
        message.getMessage());
×
94
  }
×
95
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc