• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LearnLib / learnlib / 6433387082

06 Oct 2023 03:10PM UTC coverage: 92.296% (-0.007%) from 92.303%
6433387082

push

github

mtf90
update Falk's developer id

11573 of 12539 relevant lines covered (92.3%)

1.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractDynamicBatchProcessor.java
1
/* Copyright (C) 2013-2023 TU Dortmund
2
 * This file is part of LearnLib, http://www.learnlib.de/.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package de.learnlib.oracle.parallelism;
17

18
import java.util.ArrayList;
19
import java.util.Collection;
20
import java.util.List;
21
import java.util.concurrent.ExecutionException;
22
import java.util.concurrent.ExecutorService;
23
import java.util.concurrent.Future;
24
import java.util.function.Supplier;
25

26
import com.google.common.base.Throwables;
27
import de.learnlib.api.oracle.parallelism.BatchInterruptedException;
28
import de.learnlib.api.oracle.parallelism.BatchProcessor;
29
import de.learnlib.api.oracle.parallelism.ThreadPool;
30
import de.learnlib.setting.LearnLibProperty;
31
import de.learnlib.setting.LearnLibSettings;
32
import org.checkerframework.checker.index.qual.NonNegative;
33

34
/**
35
 * A batch processor that dynamically distributes queries to worker threads.
36
 *
37
 * @param <Q>
38
 *         query type
39
 * @param <P>
40
 *         (sub-) processor type
41
 */
42
public abstract class AbstractDynamicBatchProcessor<Q, P extends BatchProcessor<Q>>
43
        implements ThreadPool, BatchProcessor<Q> {
44

45
    public static final int BATCH_SIZE;
46
    public static final int POOL_SIZE;
47
    public static final PoolPolicy POOL_POLICY;
48

49
    static {
50
        LearnLibSettings settings = LearnLibSettings.getInstance();
2✔
51

52
        int numProcessors = Runtime.getRuntime().availableProcessors();
2✔
53

54
        BATCH_SIZE = settings.getInt(LearnLibProperty.PARALLEL_BATCH_SIZE_DYNAMIC, 1);
2✔
55
        POOL_SIZE = settings.getInt(LearnLibProperty.PARALLEL_POOL_SIZE, numProcessors);
2✔
56
        POOL_POLICY = settings.getEnumValue(LearnLibProperty.PARALLEL_POOL_POLICY, PoolPolicy.class, PoolPolicy.CACHED);
2✔
57
    }
2✔
58

59
    private final ThreadLocal<P> threadLocalOracle;
60
    private final ExecutorService executor;
61
    private final @NonNegative int batchSize;
62

63
    public AbstractDynamicBatchProcessor(Supplier<? extends P> oracleSupplier,
64
                                         @NonNegative int batchSize,
65
                                         ExecutorService executor) {
2✔
66
        this.threadLocalOracle = ThreadLocal.withInitial(oracleSupplier);
2✔
67
        this.executor = executor;
2✔
68
        this.batchSize = batchSize;
2✔
69
    }
2✔
70

71
    @Override
72
    public void shutdown() {
73
        executor.shutdown();
2✔
74
    }
2✔
75

76
    @Override
77
    public void shutdownNow() {
78
        executor.shutdownNow();
2✔
79
    }
2✔
80

81
    @Override
82
    public void processBatch(Collection<? extends Q> queries) {
83
        if (queries.isEmpty()) {
2✔
84
            return;
2✔
85
        }
86

87
        int numQueries = queries.size();
2✔
88
        int numJobs = (numQueries - 1) / batchSize + 1;
2✔
89
        List<Q> currentBatch = null;
2✔
90

91
        List<Future<?>> futures = new ArrayList<>(numJobs);
2✔
92

93
        for (Q query : queries) {
2✔
94

95
            if (currentBatch == null) {
2✔
96
                currentBatch = new ArrayList<>(batchSize);
2✔
97
            }
98

99
            currentBatch.add(query);
2✔
100
            if (currentBatch.size() == batchSize) {
2✔
101
                Future<?> future = executor.submit(new DynamicQueriesJob<>(currentBatch, threadLocalOracle));
2✔
102
                futures.add(future);
2✔
103
                currentBatch = null;
2✔
104
            }
105
        }
2✔
106

107
        if (currentBatch != null) {
2✔
108
            Future<?> future = executor.submit(new DynamicQueriesJob<>(currentBatch, threadLocalOracle));
2✔
109
            futures.add(future);
2✔
110
        }
111

112
        try {
113
            // Await completion of all jobs
114
            for (Future<?> future : futures) {
2✔
115
                future.get();
2✔
116
            }
2✔
117
        } catch (ExecutionException e) {
2✔
118
            Throwables.throwIfUnchecked(e.getCause());
×
119
            throw new AssertionError("Runnables must not throw checked exceptions", e);
×
120
        } catch (InterruptedException e) {
×
121
            Thread.interrupted();
×
122
            throw new BatchInterruptedException(e);
×
123
        }
2✔
124
    }
2✔
125

126
    protected P getProcessor() {
127
        return threadLocalOracle.get();
2✔
128
    }
129

130
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc