• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LearnLib / learnlib / 6433387082

06 Oct 2023 03:10PM UTC coverage: 92.296% (-0.007%) from 92.303%
6433387082

push

github

mtf90
update Falk's developer id

11573 of 12539 relevant lines covered (92.3%)

1.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.71
/oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractStaticBatchProcessor.java
1
/* Copyright (C) 2013-2023 TU Dortmund
2
 * This file is part of LearnLib, http://www.learnlib.de/.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package de.learnlib.oracle.parallelism;
17

18
import java.util.ArrayList;
19
import java.util.Collection;
20
import java.util.Iterator;
21
import java.util.List;
22
import java.util.concurrent.ExecutionException;
23
import java.util.concurrent.ExecutorService;
24
import java.util.concurrent.Executors;
25
import java.util.concurrent.Future;
26

27
import com.google.common.base.Throwables;
28
import de.learnlib.api.oracle.parallelism.BatchInterruptedException;
29
import de.learnlib.api.oracle.parallelism.BatchProcessor;
30
import de.learnlib.api.oracle.parallelism.ThreadPool;
31
import de.learnlib.setting.LearnLibProperty;
32
import de.learnlib.setting.LearnLibSettings;
33
import net.automatalib.commons.smartcollections.ArrayStorage;
34
import org.checkerframework.checker.index.qual.NonNegative;
35

36
/**
37
 * A batch processor that statically distributes a set of queries among several threads.
38
 * <p>
39
 * An incoming set of queries is divided into a given number of batches, such that the sizes of all batches differ by at
40
 * most one. This keeps the required synchronization effort low, but if some batches are "harder" (for whatever reason)
41
 * than others, the load can be very unbalanced.
42
 *
43
 * @param <Q>
44
 *         query type
45
 * @param <P>
46
 *         (sub-) processor type
47
 */
48
public abstract class AbstractStaticBatchProcessor<Q, P extends BatchProcessor<Q>>
49
        implements ThreadPool, BatchProcessor<Q> {
50

51
    private static final int DEFAULT_MIN_BATCH_SIZE = 10;
52
    public static final int MIN_BATCH_SIZE;
53
    public static final int NUM_INSTANCES;
54
    public static final PoolPolicy POOL_POLICY;
55

56
    static {
57
        LearnLibSettings settings = LearnLibSettings.getInstance();
2✔
58

59
        int numCores = Runtime.getRuntime().availableProcessors();
2✔
60

61
        MIN_BATCH_SIZE = settings.getInt(LearnLibProperty.PARALLEL_BATCH_SIZE_STATIC, DEFAULT_MIN_BATCH_SIZE);
2✔
62
        NUM_INSTANCES = settings.getInt(LearnLibProperty.PARALLEL_POOL_SIZE, numCores);
2✔
63
        POOL_POLICY = settings.getEnumValue(LearnLibProperty.PARALLEL_POOL_SIZE, PoolPolicy.class, PoolPolicy.CACHED);
2✔
64
    }
2✔
65

66
    private final @NonNegative int minBatchSize;
67
    private final ArrayStorage<P> oracles;
68
    private final ExecutorService executor;
69

70
    public AbstractStaticBatchProcessor(Collection<? extends P> oracles,
71
                                        @NonNegative int minBatchSize,
72
                                        PoolPolicy policy) {
2✔
73

74
        this.oracles = new ArrayStorage<>(oracles);
2✔
75

76
        switch (policy) {
2✔
77
            case FIXED:
78
                this.executor = Executors.newFixedThreadPool(this.oracles.size() - 1);
2✔
79
                break;
2✔
80
            case CACHED:
81
                this.executor = Executors.newCachedThreadPool();
2✔
82
                break;
2✔
83
            default:
84
                throw new IllegalArgumentException("Illegal pool policy: " + policy);
×
85
        }
86
        this.minBatchSize = minBatchSize;
2✔
87
    }
2✔
88

89
    @Override
90
    public void processBatch(Collection<? extends Q> queries) {
91
        int num = queries.size();
2✔
92
        if (num == 0) {
2✔
93
            return;
2✔
94
        }
95

96
        int numBatches = (num - minBatchSize) / minBatchSize + 1;
2✔
97
        if (numBatches > oracles.size()) {
2✔
98
            numBatches = oracles.size();
2✔
99
        }
100

101
        // One batch is always executed in the local thread. This saves the thread creation
102
        // overhead for the common case where the batch size is quite small.
103
        int externalBatches = numBatches - 1;
2✔
104

105
        if (externalBatches == 0) {
2✔
106
            processQueriesLocally(queries);
2✔
107
            return;
2✔
108
        }
109

110
        // Calculate the number of full and non-full batches. The difference in size
111
        // will never exceed one (cf. pidgeonhole principle)
112
        int fullBatchSize = (num - 1) / numBatches + 1;
2✔
113
        int nonFullBatches = fullBatchSize * numBatches - num;
2✔
114

115
        List<Future<?>> futures = new ArrayList<>(externalBatches);
2✔
116

117
        Iterator<? extends Q> queryIt = queries.iterator();
2✔
118

119
        // Start the threads for the external batches
120
        for (int i = 0; i < externalBatches; i++) {
2✔
121
            int bs = fullBatchSize;
2✔
122
            if (i < nonFullBatches) {
2✔
123
                bs--;
2✔
124
            }
125
            List<Q> batch = new ArrayList<>(bs);
2✔
126
            for (int j = 0; j < bs; j++) {
2✔
127
                batch.add(queryIt.next());
2✔
128
            }
129

130
            Runnable job = new StaticQueriesJob<>(batch, oracles.get(i + 1));
2✔
131
            Future<?> future = executor.submit(job);
2✔
132
            futures.add(future);
2✔
133
        }
134

135
        // Finally, prepare and process the batch for the oracle executed in this thread.
136
        List<Q> localBatch = new ArrayList<>(fullBatchSize);
2✔
137
        for (int j = 0; j < fullBatchSize; j++) {
2✔
138
            localBatch.add(queryIt.next());
2✔
139
        }
140

141
        processQueriesLocally(localBatch);
2✔
142

143
        try {
144
            for (Future<?> f : futures) {
2✔
145
                f.get();
2✔
146
            }
2✔
147
        } catch (ExecutionException ex) {
×
148
            Throwables.throwIfUnchecked(ex.getCause());
×
149
            throw new AssertionError("Runnable must not throw checked exceptions", ex);
×
150
        } catch (InterruptedException ex) {
×
151
            Thread.interrupted();
×
152
            throw new BatchInterruptedException(ex);
×
153
        }
2✔
154
    }
2✔
155

156
    private void processQueriesLocally(Collection<? extends Q> localBatch) {
157
        oracles.get(0).processBatch(localBatch);
2✔
158
    }
2✔
159

160
    @Override
161
    public void shutdown() {
162
        executor.shutdown();
2✔
163
    }
2✔
164

165
    @Override
166
    public void shutdownNow() {
167
        executor.shutdownNow();
2✔
168
    }
2✔
169

170
    protected P getProcessor() {
171
        return oracles.get(0);
2✔
172
    }
173

174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc