• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

amaembo / streamex / #667

02 Sep 2023 01:21PM UTC coverage: 99.462% (-0.2%) from 99.619%
#667

push

amaembo
One more test for Limiter

5733 of 5764 relevant lines covered (99.46%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.0
/src/main/java/one/util/streamex/UnorderedCancellableSpliterator.java
1
/*
2
 * Copyright 2015, 2019 StreamEx contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package one.util.streamex;
17

18
import java.util.Spliterator;
19
import java.util.concurrent.ConcurrentLinkedQueue;
20
import java.util.concurrent.atomic.AtomicBoolean;
21
import java.util.concurrent.atomic.AtomicInteger;
22
import java.util.function.BiConsumer;
23
import java.util.function.BinaryOperator;
24
import java.util.function.Consumer;
25
import java.util.function.Predicate;
26
import java.util.function.Supplier;
27

28
import static one.util.streamex.Internals.CancelException;
29
import static one.util.streamex.Internals.CloneableSpliterator;
30

31
/**
32
 * @author Tagir Valeev
33
 */
34
/* package */class UnorderedCancellableSpliterator<T, A> extends CloneableSpliterator<A, UnorderedCancellableSpliterator<T, A>> {
35
    private volatile Spliterator<T> source;
36
    private final BiConsumer<A, ? super T> accumulator;
37
    private final Predicate<A> cancelPredicate;
38
    private final Supplier<A> supplier;
39
    private final ConcurrentLinkedQueue<A> partialResults = new ConcurrentLinkedQueue<>();
1✔
40
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
1✔
41
    private final AtomicInteger nPeers = new AtomicInteger(1);
1✔
42
    private final BinaryOperator<A> combiner;
43
    private boolean flag;
44

45
    UnorderedCancellableSpliterator(Spliterator<T> source, Supplier<A> supplier, BiConsumer<A, ? super T> accumulator,
46
            BinaryOperator<A> combiner, Predicate<A> cancelPredicate) {
1✔
47
        this.source = source;
1✔
48
        this.supplier = supplier;
1✔
49
        this.accumulator = accumulator;
1✔
50
        this.combiner = combiner;
1✔
51
        this.cancelPredicate = cancelPredicate;
1✔
52
    }
1✔
53

54
    private boolean checkCancel(A acc) {
55
        if (cancelPredicate.test(acc)) {
1✔
56
            if (cancelled.compareAndSet(false, true)) {
1✔
57
                flag = true;
1✔
58
                return true;
1✔
59
            }
60
        }
61
        if (cancelled.get()) {
1✔
62
            flag = false;
×
63
            return true;
×
64
        }
65
        return false;
1✔
66
    }
67

68
    private boolean handleCancel(Consumer<? super A> action, A acc) {
69
        source = null;
1✔
70
        if (flag) {
1✔
71
            action.accept(acc);
1✔
72
            return true;
1✔
73
        }
74
        return false;
×
75
    }
76

77
    @Override
78
    public boolean tryAdvance(Consumer<? super A> action) {
79
        Spliterator<T> source = this.source;
1✔
80
        if (source == null || cancelled.get()) {
1✔
81
            this.source = null;
1✔
82
            return false;
1✔
83
        }
84
        A acc = supplier.get();
1✔
85
        if (checkCancel(acc))
1✔
86
            return handleCancel(action, acc);
1✔
87
        try {
88
            source.forEachRemaining(t -> {
1✔
89
                accumulator.accept(acc, t);
1✔
90
                if (checkCancel(acc))
1✔
91
                    throw new CancelException();
1✔
92
            });
1✔
93
        } catch (CancelException ex) {
1✔
94
            return handleCancel(action, acc);
1✔
95
        }
1✔
96
        A result = acc;
1✔
97
        while (true) {
98
            A acc2 = partialResults.poll();
1✔
99
            if (acc2 == null)
1✔
100
                break;
1✔
101
            result = combiner.apply(result, acc2);
1✔
102
            if (checkCancel(result))
1✔
103
                return handleCancel(action, result);
1✔
104
        }
1✔
105
        partialResults.offer(result);
1✔
106
        this.source = null;
1✔
107
        if (nPeers.decrementAndGet() == 0) {
1✔
108
            result = partialResults.poll();
1✔
109
            // non-cancelled finish
110
            while (true) {
111
                A acc2 = partialResults.poll();
1✔
112
                if (acc2 == null)
1✔
113
                    break;
1✔
114
                result = combiner.apply(result, acc2);
×
115
                if (cancelPredicate.test(result))
×
116
                    break;
×
117
            }
×
118
            this.source = null;
1✔
119
            action.accept(result);
1✔
120
            return true;
1✔
121
        }
122
        return false;
1✔
123
    }
124

125
    @Override
126
    public void forEachRemaining(Consumer<? super A> action) {
127
        tryAdvance(action);
1✔
128
    }
1✔
129

130
    @Override
131
    public Spliterator<A> trySplit() {
132
        if (source == null || cancelled.get()) {
1✔
133
            source = null;
×
134
            return null;
×
135
        }
136
        Spliterator<T> prefix = source.trySplit();
1✔
137
        if (prefix == null) {
1✔
138
            return null;
1✔
139
        }
140
        UnorderedCancellableSpliterator<T, A> result = doClone();
1✔
141
        result.source = prefix;
1✔
142
        nPeers.incrementAndGet();
1✔
143
        return result;
1✔
144
    }
145

146
    @Override
147
    public long estimateSize() {
148
        return source == null ? 0 : source.estimateSize();
1✔
149
    }
150

151
    @Override
152
    public int characteristics() {
153
        return source == null ? SIZED : 0;
1✔
154
    }
155
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc