• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

amaembo / streamex / #667

02 Sep 2023 01:21PM UTC coverage: 99.462% (-0.2%) from 99.619%
#667

push

amaembo
One more test for Limiter

5733 of 5764 relevant lines covered (99.46%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.94
/src/main/java/one/util/streamex/OrderedCancellableSpliterator.java
1
/*
2
 * Copyright 2015, 2019 StreamEx contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package one.util.streamex;
17

18
import java.util.ArrayDeque;
19
import java.util.Spliterator;
20
import java.util.function.BiConsumer;
21
import java.util.function.BinaryOperator;
22
import java.util.function.Consumer;
23
import java.util.function.Predicate;
24
import java.util.function.Supplier;
25

26
import static one.util.streamex.Internals.CancelException;
27
import static one.util.streamex.Internals.CloneableSpliterator;
28

29
/**
30
 * @author Tagir Valeev
31
 */
32
/* package */final class OrderedCancellableSpliterator<T, A> extends CloneableSpliterator<A, OrderedCancellableSpliterator<T, A>> {
33
    private Spliterator<T> source;
34
    private final Object lock = new Object();
1✔
35
    private final BiConsumer<A, ? super T> accumulator;
36
    private final Predicate<A> cancelPredicate;
37
    private final BinaryOperator<A> combiner;
38
    private final Supplier<A> supplier;
39
    private volatile boolean localCancelled;
40
    private volatile OrderedCancellableSpliterator<T, A> prefix;
41
    private volatile OrderedCancellableSpliterator<T, A> suffix;
42
    private A payload;
43

44
    OrderedCancellableSpliterator(Spliterator<T> source, Supplier<A> supplier, BiConsumer<A, ? super T> accumulator,
45
            BinaryOperator<A> combiner, Predicate<A> cancelPredicate) {
1✔
46
        this.source = source;
1✔
47
        this.supplier = supplier;
1✔
48
        this.accumulator = accumulator;
1✔
49
        this.combiner = combiner;
1✔
50
        this.cancelPredicate = cancelPredicate;
1✔
51
    }
1✔
52

53
    @Override
54
    public boolean tryAdvance(Consumer<? super A> action) {
55
        Spliterator<T> source = this.source;
1✔
56
        if (source == null || localCancelled) {
1✔
57
            this.source = null;
1✔
58
            return false;
1✔
59
        }
60
        A acc = supplier.get();
1✔
61
        try {
62
            source.forEachRemaining(t -> {
1✔
63
                accumulator.accept(acc, t);
1✔
64
                if (cancelPredicate.test(acc)) {
1✔
65
                    cancelSuffix();
1✔
66
                    throw new CancelException();
1✔
67
                }
68
                if (localCancelled) {
1✔
69
                    throw new CancelException();
1✔
70
                }
71
            });
1✔
72
        } catch (CancelException ex) {
1✔
73
            if (localCancelled) {
1✔
74
                return false;
1✔
75
            }
76
        }
1✔
77
        this.source = null;
1✔
78
        A result = acc;
1✔
79
        while (true) {
80
            if (prefix == null && suffix == null) {
1✔
81
                action.accept(result);
1✔
82
                return true;
1✔
83
            }
84
            ArrayDeque<A> res = new ArrayDeque<>();
1✔
85
            res.offer(result);
1✔
86
            synchronized (lock) {
1✔
87
                if (localCancelled)
1✔
88
                    return false;
1✔
89
                OrderedCancellableSpliterator<T, A> s = prefix;
1✔
90
                while (s != null) {
1✔
91
                    if (s.payload == null)
1✔
92
                        break;
1✔
93
                    res.offerFirst(s.payload);
1✔
94
                    s = s.prefix;
1✔
95
                }
96
                prefix = s;
1✔
97
                if (s != null) {
1✔
98
                    s.suffix = this;
1✔
99
                }
100
                s = suffix;
1✔
101
                while (s != null) {
1✔
102
                    if (s.payload == null)
1✔
103
                        break;
1✔
104
                    res.offerLast(s.payload);
1✔
105
                    s = s.suffix;
1✔
106
                }
107
                suffix = s;
1✔
108
                if (s != null) {
1✔
109
                    s.prefix = this;
1✔
110
                }
111
                if (res.size() == 1) {
1✔
112
                    if (prefix == null && suffix == null) {
1✔
113
                        action.accept(result);
×
114
                        return true;
×
115
                    }
116
                    this.payload = result;
1✔
117
                    break;
1✔
118
                }
119
            }
1✔
120
            result = res.pollFirst();
1✔
121
            while (!res.isEmpty()) {
1✔
122
                result = combiner.apply(result, res.pollFirst());
1✔
123
                if (cancelPredicate.test(result)) {
1✔
124
                    cancelSuffix();
1✔
125
                }
126
            }
127
        }
1✔
128
        return false;
1✔
129
    }
130

131
    private void cancelSuffix() {
132
        if (this.suffix == null)
1✔
133
            return;
1✔
134
        synchronized (lock) {
1✔
135
            OrderedCancellableSpliterator<T, A> suffix = this.suffix;
1✔
136
            while (suffix != null && !suffix.localCancelled) {
1✔
137
                suffix.prefix = null;
1✔
138
                suffix.localCancelled = true;
1✔
139
                suffix = suffix.suffix;
1✔
140
            }
141
            this.suffix = null;
1✔
142
        }
1✔
143
    }
1✔
144

145
    @Override
146
    public void forEachRemaining(Consumer<? super A> action) {
147
        tryAdvance(action);
1✔
148
    }
1✔
149

150
    @Override
151
    public Spliterator<A> trySplit() {
152
        if (source == null || localCancelled) {
1✔
153
            source = null;
1✔
154
            return null;
1✔
155
        }
156
        Spliterator<T> prefix = source.trySplit();
1✔
157
        if (prefix == null) {
1✔
158
            return null;
1✔
159
        }
160
        synchronized (lock) {
1✔
161
            OrderedCancellableSpliterator<T, A> result = doClone();
1✔
162
            result.source = prefix;
1✔
163
            this.prefix = result;
1✔
164
            result.suffix = this;
1✔
165
            OrderedCancellableSpliterator<T, A> prefixPrefix = result.prefix;
1✔
166
            if (prefixPrefix != null)
1✔
167
                prefixPrefix.suffix = result;
1✔
168
            return result;
1✔
169
        }
170
    }
171

172
    @Override
173
    public long estimateSize() {
174
        return source == null ? 0 : source.estimateSize();
1✔
175
    }
176

177
    @Override
178
    public int characteristics() {
179
        return source == null ? SIZED : ORDERED;
1✔
180
    }
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc