• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #5

27 Jan 2025 01:17PM UTC coverage: 92.442%. First build
#5

push

web-flow
Merge pull request #3 from wz2cool/0.0.5

0.0.5

71 of 82 new or added lines in 5 files covered. (86.59%)

318 of 344 relevant lines covered (92.44%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.06
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
5
import com.github.wz2cool.localqueue.model.message.QueueMessage;
6
import net.openhft.chronicle.queue.ChronicleQueue;
7
import net.openhft.chronicle.queue.ExcerptTailer;
8
import net.openhft.chronicle.queue.RollCycles;
9
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
10

11
import java.util.ArrayList;
12
import java.util.List;
13
import java.util.Objects;
14
import java.util.Optional;
15
import java.util.concurrent.*;
16
import java.util.concurrent.atomic.AtomicInteger;
17
import java.util.concurrent.locks.Lock;
18
import java.util.concurrent.locks.ReentrantLock;
19

20
/**
21
 * simple consumer
22
 *
23
 * @author frank
24
 */
25
public class SimpleConsumer implements IConsumer, AutoCloseable {
26

27
    private final SimpleConsumerConfig config;
28
    private final PositionStore positionStore;
29

30
    private final SingleChronicleQueue queue;
31
    private final ThreadLocal<ExcerptTailer> tailerThreadLocal;
32
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
33
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
34
    private final LinkedBlockingQueue<QueueMessage> messageCache;
35
    private volatile long ackedReadPosition = -1;
1✔
36
    private volatile boolean isReadToCacheRunning = true;
1✔
37
    private volatile boolean isClosing = false;
1✔
38
    private volatile boolean isClosed = false;
1✔
39
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
40

41
    private final Lock internalLock = new ReentrantLock();
1✔
42

43

44
    /**
45
     * constructor
46
     *
47
     * @param config the config of consumer
48
     */
49
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
50
        this.config = config;
1✔
51
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
52
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
53
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
54
        this.tailerThreadLocal = ThreadLocal.withInitial(this::initExcerptTailer);
1✔
55
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
56
        startReadToCache();
1✔
57
    }
1✔
58

59
    @Override
60
    public synchronized QueueMessage take() throws InterruptedException {
61
        return this.messageCache.take();
1✔
62
    }
63

64
    @Override
65
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
66
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
67
        QueueMessage take = this.messageCache.take();
1✔
68
        result.add(take);
1✔
69
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
70
        return result;
1✔
71
    }
72

73
    @Override
74
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
75
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
76
        return Optional.ofNullable(message);
1✔
77
    }
78

79
    @Override
80
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
81
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
82
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
83
        if (Objects.nonNull(poll)) {
1✔
84
            result.add(poll);
1✔
85
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
86
        }
87
        return result;
1✔
88
    }
89

90
    @Override
91
    public synchronized Optional<QueueMessage> poll() {
92
        QueueMessage message = this.messageCache.poll();
1✔
93
        return Optional.ofNullable(message);
1✔
94
    }
95

96
    @Override
97
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
98
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
99
        this.messageCache.drainTo(result, maxBatchSize);
1✔
100
        return result;
1✔
101
    }
102

103
    @Override
104
    public synchronized void ack(final QueueMessage message) {
105
        if (Objects.isNull(message)) {
1✔
106
            return;
1✔
107
        }
108

109
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
110
            return;
×
111
        }
112

113
        this.ackedReadPosition = message.getPosition();
1✔
114
    }
1✔
115

116
    @Override
117
    public synchronized void ack(final List<QueueMessage> messages) {
118
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
119
            return;
1✔
120
        }
121
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
122
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
123
            return;
×
124
        }
125
        this.ackedReadPosition = lastOne.getPosition();
1✔
126
    }
1✔
127

128
    @Override
129
    public boolean moveToPosition(final long position) {
130
        stopReadToCache();
1✔
131
        try {
132
            internalLock.lock();
1✔
133
            return CompletableFuture.supplyAsync(() -> {
1✔
134
                ExcerptTailer tailer = tailerThreadLocal.get();
1✔
135
                boolean moveToResult = tailer.moveToIndex(position);
1✔
136
                if (moveToResult) {
1✔
137
                    positionVersion.incrementAndGet();
1✔
138
                    messageCache.clear();
1✔
139
                    this.ackedReadPosition = position;
1✔
140
                }
141
                return moveToResult;
1✔
142
            }, this.readCacheExecutor).join();
1✔
143
        } finally {
144
            internalLock.unlock();
1✔
145
            startReadToCache();
1✔
146
        }
×
147
    }
148

149
    public long getAckedReadPosition() {
150
        return ackedReadPosition;
1✔
151
    }
152

153
    public boolean isClosed() {
154
        return isClosed;
1✔
155
    }
156

157
    private void stopReadToCache() {
158
        this.isReadToCacheRunning = false;
1✔
159
    }
1✔
160

161
    private void startReadToCache() {
162
        this.isReadToCacheRunning = true;
1✔
163
        readCacheExecutor.execute(this::readToCache);
1✔
164
    }
1✔
165

166
    private void readToCache() {
167
        try {
168
            internalLock.lock();
1✔
169
            long pullInterval = config.getPullInterval();
1✔
170
            while (this.isReadToCacheRunning && !isClosing) {
1✔
171
                try {
172
                    ExcerptTailer tailer = tailerThreadLocal.get();
1✔
173
                    String message = tailer.readText();
1✔
174
                    if (Objects.isNull(message)) {
1✔
175
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
176
                        continue;
1✔
177
                    }
178
                    long lastedReadIndex = tailer.lastReadIndex();
1✔
179
                    QueueMessage queueMessage = new QueueMessage(positionVersion.get(), lastedReadIndex, message);
1✔
180
                    this.messageCache.put(queueMessage);
1✔
181
                } catch (InterruptedException e) {
×
182
                    Thread.currentThread().interrupt();
×
183
                }
1✔
184
            }
185
        } finally {
186
            internalLock.unlock();
1✔
187
        }
1✔
188
    }
1✔
189

190
    private ExcerptTailer initExcerptTailer() {
191
        ExcerptTailer tailer = queue.createTailer();
1✔
192
        Optional<Long> lastPositionOptional = getLastPosition();
1✔
193
        if (lastPositionOptional.isPresent()) {
1✔
194
            Long position = lastPositionOptional.get();
1✔
195
            long beginPosition = position + 1;
1✔
196
            tailer.moveToIndex(beginPosition);
1✔
197
        }
198
        return tailer;
1✔
199
    }
200

201
    /// region position
202

203
    private void flushPosition() {
204
        if (ackedReadPosition != -1) {
1✔
205
            setLastPosition(this.ackedReadPosition);
1✔
206
        }
207
    }
1✔
208

209
    private Optional<Long> getLastPosition() {
210
        Long position = positionStore.get(config.getConsumerId());
1✔
211
        if (position == null) {
1✔
212
            return Optional.empty();
1✔
213
        }
214
        return Optional.of(position);
1✔
215
    }
216

217
    private void setLastPosition(long position) {
218
        positionStore.put(config.getConsumerId(), position);
1✔
219
    }
1✔
220

221
    /// endregion
222

223
    @Override
224
    public synchronized void close() {
225
        isClosing = true;
1✔
226
        this.internalLock.lock();
1✔
227
        stopReadToCache();
1✔
228
        this.internalLock.unlock();
1✔
229

230
        positionStore.close();
1✔
231
        scheduler.shutdown();
1✔
232
        readCacheExecutor.shutdown();
1✔
233
        try {
234
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
235
                scheduler.shutdownNow();
×
236
            }
237
            if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
NEW
238
                readCacheExecutor.shutdownNow();
×
239
            }
NEW
240
        } catch (InterruptedException e) {
×
NEW
241
            scheduler.shutdownNow();
×
NEW
242
            readCacheExecutor.shutdownNow();
×
NEW
243
            Thread.currentThread().interrupt();
×
244
        }
1✔
245
        tailerThreadLocal.remove();
1✔
246
        queue.close();
1✔
247
        isClosed = true;
1✔
248

249
    }
1✔
250
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc