• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LearnLib / automatalib / 13138848026

04 Feb 2025 02:53PM UTC coverage: 92.108% (+2.2%) from 89.877%
13138848026

push

github

mtf90
[maven-release-plugin] prepare release automatalib-0.12.0

16609 of 18032 relevant lines covered (92.11%)

1.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/commons/util/src/main/java/net/automatalib/common/util/concurrent/ScalingThreadPoolExecutor.java
1
/* Copyright (C) 2013-2025 TU Dortmund University
2
 * This file is part of AutomataLib <https://automatalib.net>.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package net.automatalib.common.util.concurrent;
17

18
import java.util.concurrent.LinkedBlockingQueue;
19
import java.util.concurrent.RejectedExecutionException;
20
import java.util.concurrent.RejectedExecutionHandler;
21
import java.util.concurrent.ThreadPoolExecutor;
22
import java.util.concurrent.TimeUnit;
23
import java.util.concurrent.atomic.AtomicInteger;
24

25
/**
 * A {@link ThreadPoolExecutor} that internally uses a {@link ScalingLinkedBlockingQueue} to manage scheduled tasks.
 * This allows us to manage a dynamically sized thread pool that actually spawns new threads when the pool still allows
 * for it.
 * <p>
 * Additionally, this implementation keeps track of the current number of active threads by using an
 * {@link AtomicInteger} counter, rather than querying its list of worker threads.
 * <p>
 * Tasks that are rejected because the pool is saturated are forcefully enqueued (see {@link ForceEnqueuingHandler}).
 * Tasks that are submitted after the executor has been shut down are rejected with a
 * {@link RejectedExecutionException}, as with a regular {@link ThreadPoolExecutor}.
 * <p>
 * For further information, see
 * <a href="https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile">
 * this blog post</a>.
 */
public final class ScalingThreadPoolExecutor extends ThreadPoolExecutor {

    /** Number of tasks currently being executed, maintained via the before-/afterExecute hooks. */
    private final AtomicInteger activeCount = new AtomicInteger();

    /**
     * Creates a new scaling executor backed by a {@link ScalingLinkedBlockingQueue} and a
     * {@link ForceEnqueuingHandler}.
     *
     * @param corePoolSize
     *         the number of threads to keep in the pool, even if they are idle
     * @param maximumPoolSize
     *         the maximum number of threads to allow in the pool
     * @param keepAliveTime
     *         the maximum time that excess idle threads will wait for new tasks before terminating
     * @param unit
     *         the time unit of {@code keepAliveTime}
     */
    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ScalingLinkedBlockingQueue());

        // the queue needs a back-reference to this executor, which is only available after super(...) returned
        ((ScalingLinkedBlockingQueue) getQueue()).setTpe(this);
        setRejectedExecutionHandler(new ForceEnqueuingHandler());
    }

    /**
     * Returns the number of tasks that are currently being executed, as tracked by the internal counter.
     *
     * @return the current value of the active-task counter
     */
    @Override
    public int getActiveCount() {
        return activeCount.get();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        activeCount.incrementAndGet();
        // as recommended by ThreadPoolExecutor, chain to super at the end of this hook
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // as recommended by ThreadPoolExecutor, chain to super at the beginning of this hook
        super.afterExecute(r, t);
        activeCount.decrementAndGet();
    }

    /**
     * A modified {@link LinkedBlockingQueue} that rejects offering new elements if its associated
     * {@link ScalingThreadPoolExecutor} still has threads to spare. This means we will force a creation of a new thread
     * whenever there is room for parallel processing.
     * <p>
     * To prevent {@link RejectedExecutionException}s when reaching the limit of the thread pool, this class is only
     * useful in combination with a {@link ForceEnqueuingHandler} that forcefully enqueues the scheduled task to this
     * (otherwise unbounded) queue.
     */
    @SuppressWarnings("PMD.NonSerializableClass") // not publicly exposed
    private static final class ScalingLinkedBlockingQueue extends LinkedBlockingQueue<Runnable> {

        private ThreadPoolExecutor tpe;

        void setTpe(ThreadPoolExecutor tpe) {
            this.tpe = tpe;
        }

        /**
         * Only accepts the element if the number of active plus queued tasks has reached the maximum pool size of
         * the associated executor. Otherwise the offer is declined so that the executor attempts to start a new
         * worker thread for the task.
         */
        @Override
        public boolean offer(Runnable r) {
            return tpe.getActiveCount() + size() >= tpe.getMaximumPoolSize() && super.offer(r);
        }
    }

    /**
     * A {@link RejectedExecutionHandler} that forces the enqueuing of rejected tasks to the queue of a
     * {@link ThreadPoolExecutor}. Mainly useful in combination with {@link ScalingLinkedBlockingQueue}.
     * <p>
     * Tasks are not enqueued if the executor has been shut down. In this case a {@link RejectedExecutionException}
     * is thrown, because a forcefully enqueued task might otherwise never be executed.
     */
    private static final class ForceEnqueuingHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // a rejection may also be caused by a shutdown, in which case enqueuing would silently lose the task
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Task " + r + " rejected: executor has been shut down");
            }

            try {
                // put() bypasses the overridden offer() and therefore always enqueues
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // restore the interrupt status so that callers can still observe the interruption
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }

            // the executor may have been shut down concurrently; roll back if the task is still queued
            if (executor.isShutdown() && executor.remove(r)) {
                throw new RejectedExecutionException("Task " + r + " rejected: executor has been shut down");
            }
        }
    }

}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc