• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LearnLib / learnlib / 6805330902

08 Nov 2023 11:40PM UTC coverage: 93.335% (+0.008%) from 93.327%
6805330902

push

github

mtf90
typo

11736 of 12574 relevant lines covered (93.34%)

1.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/oracles/parallelism/src/main/java/de/learnlib/oracle/parallelism/AbstractDynamicBatchProcessor.java
1
/* Copyright (C) 2013-2023 TU Dortmund
2
 * This file is part of LearnLib, http://www.learnlib.de/.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package de.learnlib.oracle.parallelism;
17

18
import java.util.ArrayList;
19
import java.util.Collection;
20
import java.util.List;
21
import java.util.concurrent.ExecutionException;
22
import java.util.concurrent.ExecutorService;
23
import java.util.concurrent.Future;
24
import java.util.function.Supplier;
25

26
import com.google.common.base.Throwables;
27
import de.learnlib.setting.LearnLibProperty;
28
import de.learnlib.setting.LearnLibSettings;
29
import org.checkerframework.checker.index.qual.NonNegative;
30

31
/**
32
 * A batch processor that dynamically distributes queries to worker threads.
33
 *
34
 * @param <Q>
35
 *         query type
36
 * @param <P>
37
 *         (sub-) processor type
38
 */
39
public abstract class AbstractDynamicBatchProcessor<Q, P extends BatchProcessor<Q>>
40
        implements ThreadPool, BatchProcessor<Q> {
41

42
    public static final int BATCH_SIZE;
43
    public static final int POOL_SIZE;
44
    public static final PoolPolicy POOL_POLICY;
45

46
    static {
47
        LearnLibSettings settings = LearnLibSettings.getInstance();
2✔
48

49
        int numProcessors = Runtime.getRuntime().availableProcessors();
2✔
50

51
        BATCH_SIZE = settings.getInt(LearnLibProperty.PARALLEL_BATCH_SIZE_DYNAMIC, 1);
2✔
52
        POOL_SIZE = settings.getInt(LearnLibProperty.PARALLEL_POOL_SIZE, numProcessors);
2✔
53
        POOL_POLICY = settings.getEnumValue(LearnLibProperty.PARALLEL_POOL_POLICY, PoolPolicy.class, PoolPolicy.CACHED);
2✔
54
    }
2✔
55

56
    private final ThreadLocal<P> threadLocalOracle;
57
    private final ExecutorService executor;
58
    private final @NonNegative int batchSize;
59

60
    public AbstractDynamicBatchProcessor(Supplier<? extends P> oracleSupplier,
61
                                         @NonNegative int batchSize,
62
                                         ExecutorService executor) {
2✔
63
        this.threadLocalOracle = ThreadLocal.withInitial(oracleSupplier);
2✔
64
        this.executor = executor;
2✔
65
        this.batchSize = batchSize;
2✔
66
    }
2✔
67

68
    @Override
69
    public void shutdown() {
70
        executor.shutdown();
2✔
71
    }
2✔
72

73
    @Override
74
    public void shutdownNow() {
75
        executor.shutdownNow();
2✔
76
    }
2✔
77

78
    @Override
79
    public void processBatch(Collection<? extends Q> queries) {
80
        if (queries.isEmpty()) {
2✔
81
            return;
2✔
82
        }
83

84
        int numQueries = queries.size();
2✔
85
        int numJobs = (numQueries - 1) / batchSize + 1;
2✔
86
        List<Q> currentBatch = null;
2✔
87

88
        List<Future<?>> futures = new ArrayList<>(numJobs);
2✔
89

90
        for (Q query : queries) {
2✔
91

92
            if (currentBatch == null) {
2✔
93
                currentBatch = new ArrayList<>(batchSize);
2✔
94
            }
95

96
            currentBatch.add(query);
2✔
97
            if (currentBatch.size() == batchSize) {
2✔
98
                Future<?> future = executor.submit(new DynamicQueriesJob<>(currentBatch, threadLocalOracle));
2✔
99
                futures.add(future);
2✔
100
                currentBatch = null;
2✔
101
            }
102
        }
2✔
103

104
        if (currentBatch != null) {
2✔
105
            Future<?> future = executor.submit(new DynamicQueriesJob<>(currentBatch, threadLocalOracle));
2✔
106
            futures.add(future);
2✔
107
        }
108

109
        try {
110
            // Await completion of all jobs
111
            for (Future<?> future : futures) {
2✔
112
                future.get();
2✔
113
            }
2✔
114
        } catch (ExecutionException e) {
2✔
115
            Throwables.throwIfUnchecked(e.getCause());
×
116
            throw new AssertionError("Runnables must not throw checked exceptions", e);
×
117
        } catch (InterruptedException e) {
×
118
            Thread.interrupted();
×
119
            throw new BatchInterruptedException(e);
×
120
        }
2✔
121
    }
2✔
122

123
    protected P getProcessor() {
124
        return threadLocalOracle.get();
2✔
125
    }
126

127
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc