• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #12

29 Jan 2025 08:22AM UTC coverage: 93.366% (+0.5%) from 92.857%
#12

push

wz2cool
add consumeFrom

1 of 1 new or added line in 1 file covered. (100.0%)

11 existing lines in 3 files now uncovered.

380 of 407 relevant lines covered (93.37%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.16
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
5
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
6
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
7
import com.github.wz2cool.localqueue.model.message.QueueMessage;
8
import net.openhft.chronicle.queue.ChronicleQueue;
9
import net.openhft.chronicle.queue.ExcerptTailer;
10
import net.openhft.chronicle.queue.RollCycles;
11
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
12

13
import java.util.*;
14
import java.util.concurrent.*;
15
import java.util.concurrent.atomic.AtomicInteger;
16
import java.util.concurrent.locks.Lock;
17
import java.util.concurrent.locks.ReentrantLock;
18

19
/**
20
 * simple consumer
21
 *
22
 * @author frank
23
 */
24
public class SimpleConsumer implements IConsumer, AutoCloseable {
25

26
    private final SimpleConsumerConfig config;
27
    private final PositionStore positionStore;
28

29
    private final SingleChronicleQueue queue;
30
    private final ThreadLocal<ExcerptTailer> tailerThreadLocal;
31
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
32
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
33
    private final LinkedBlockingQueue<QueueMessage> messageCache;
34
    private volatile long ackedReadPosition = -1;
1✔
35
    private volatile boolean isReadToCacheRunning = true;
1✔
36
    private volatile boolean isClosing = false;
1✔
37
    private volatile boolean isClosed = false;
1✔
38
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
39

40
    private final Lock internalLock = new ReentrantLock();
1✔
41

42

43
    /**
44
     * constructor
45
     *
46
     * @param config the config of consumer
47
     */
48
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
49
        this.config = config;
1✔
50
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
51
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
52
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
53
        this.tailerThreadLocal = ThreadLocal.withInitial(this::initExcerptTailer);
1✔
54
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
55
        startReadToCache();
1✔
56
    }
1✔
57

58
    @Override
59
    public synchronized QueueMessage take() throws InterruptedException {
60
        return this.messageCache.take();
1✔
61
    }
62

63
    @Override
64
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
65
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
66
        QueueMessage take = this.messageCache.take();
1✔
67
        result.add(take);
1✔
68
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
69
        return result;
1✔
70
    }
71

72
    @Override
73
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
74
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
75
        return Optional.ofNullable(message);
1✔
76
    }
77

78
    @Override
79
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
80
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
81
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
82
        if (Objects.nonNull(poll)) {
1✔
83
            result.add(poll);
1✔
84
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
85
        }
86
        return result;
1✔
87
    }
88

89
    @Override
90
    public synchronized Optional<QueueMessage> poll() {
91
        QueueMessage message = this.messageCache.poll();
1✔
92
        return Optional.ofNullable(message);
1✔
93
    }
94

95
    @Override
96
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
97
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
98
        this.messageCache.drainTo(result, maxBatchSize);
1✔
99
        return result;
1✔
100
    }
101

102
    @Override
103
    public synchronized void ack(final QueueMessage message) {
104
        if (Objects.isNull(message)) {
1✔
105
            return;
1✔
106
        }
107

108
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
UNCOV
109
            return;
×
110
        }
111

112
        this.ackedReadPosition = message.getPosition();
1✔
113
    }
1✔
114

115
    @Override
116
    public synchronized void ack(final List<QueueMessage> messages) {
117
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
118
            return;
1✔
119
        }
120
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
121
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
UNCOV
122
            return;
×
123
        }
124
        this.ackedReadPosition = lastOne.getPosition();
1✔
125
    }
1✔
126

127
    @Override
128
    public boolean moveToPosition(final long position) {
129
        stopReadToCache();
1✔
130
        try {
131
            internalLock.lock();
1✔
132
            return CompletableFuture.supplyAsync(() -> {
1✔
133
                ExcerptTailer tailer = tailerThreadLocal.get();
1✔
134
                boolean moveToResult = tailer.moveToIndex(position);
1✔
135
                if (moveToResult) {
1✔
136
                    positionVersion.incrementAndGet();
1✔
137
                    messageCache.clear();
1✔
138
                    this.ackedReadPosition = position;
1✔
139
                }
140
                return moveToResult;
1✔
141
            }, this.readCacheExecutor).join();
1✔
142
        } finally {
143
            internalLock.unlock();
1✔
144
            startReadToCache();
1✔
UNCOV
145
        }
×
146
    }
147

148
    @Override
149
    public boolean moveToTimestamp(long timestamp) {
150
        Optional<Long> positionOptional = findPosition(timestamp);
1✔
151
        if (!positionOptional.isPresent()) {
1✔
152
            return false;
1✔
153
        }
154
        Long position = positionOptional.get();
1✔
155
        return moveToPosition(position);
1✔
156
    }
157

158
    private Optional<Long> findPosition(final long timestamp) {
159
        try (ExcerptTailer excerptTailer = initExcerptTailer()) {
1✔
160
            while (true) {
161
                InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
162
                boolean resultResult = excerptTailer.readBytes(internalReadMessage);
1✔
163
                if (resultResult) {
1✔
164
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
165
                        return Optional.of(excerptTailer.lastReadIndex());
1✔
166
                    }
167
                } else {
168
                    return Optional.empty();
1✔
169
                }
170
            }
1✔
171
        }
1✔
172
    }
173

174
    public long getAckedReadPosition() {
175
        return ackedReadPosition;
1✔
176
    }
177

178
    public boolean isClosed() {
179
        return isClosed;
1✔
180
    }
181

182
    private void stopReadToCache() {
183
        this.isReadToCacheRunning = false;
1✔
184
    }
1✔
185

186
    private void startReadToCache() {
187
        this.isReadToCacheRunning = true;
1✔
188
        readCacheExecutor.execute(this::readToCache);
1✔
189
    }
1✔
190

191
    private void readToCache() {
192
        try {
193
            internalLock.lock();
1✔
194
            long pullInterval = config.getPullInterval();
1✔
195
            while (this.isReadToCacheRunning && !isClosing) {
1✔
196
                try {
197
                    // when cache is full, sleep
198
                    if (this.messageCache.remainingCapacity() == 0) {
1✔
199
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
200
                        continue;
1✔
201
                    }
202
                    ExcerptTailer tailer = tailerThreadLocal.get();
1✔
203
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
204
                    boolean readResult = tailer.readBytes(internalReadMessage);
1✔
205
                    System.out.println("cache: " + readResult);
1✔
206
                    if (!readResult) {
1✔
207
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
208
                        continue;
1✔
209
                    }
210
                    long lastedReadIndex = tailer.lastReadIndex();
1✔
211
                    QueueMessage queueMessage = new QueueMessage(
1✔
212
                            positionVersion.get(),
1✔
213
                            lastedReadIndex,
214
                            internalReadMessage.getContent(),
1✔
215
                            internalReadMessage.getWriteTime());
1✔
216
                    boolean offerResult = this.messageCache.offer(queueMessage, pullInterval, TimeUnit.MILLISECONDS);
1✔
217
                    if (!offerResult) {
1✔
218
                        // if offer failed, move to last read position
UNCOV
219
                        tailer.moveToIndex(lastedReadIndex);
×
220
                    }
UNCOV
221
                } catch (InterruptedException e) {
×
UNCOV
222
                    Thread.currentThread().interrupt();
×
223
                }
1✔
224
            }
225
        } finally {
226
            internalLock.unlock();
1✔
227
        }
1✔
228
    }
1✔
229

230

231
    private ExcerptTailer initExcerptTailer() {
232
        ExcerptTailer tailer = queue.createTailer();
1✔
233
        Optional<Long> lastPositionOptional = getLastPosition();
1✔
234
        if (lastPositionOptional.isPresent()) {
1✔
235
            Long position = lastPositionOptional.get();
1✔
236
            long beginPosition = position + 1;
1✔
237
            tailer.moveToIndex(beginPosition);
1✔
238
        } else {
1✔
239
            ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
240
            if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
241
                tailer.toEnd();
1✔
242
            } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
243
                tailer.toStart();
1✔
244
            }
245
        }
246
        return tailer;
1✔
247
    }
248

249
    /// region position
250

251
    private void flushPosition() {
252
        if (ackedReadPosition != -1) {
1✔
253
            setLastPosition(this.ackedReadPosition);
1✔
254
        }
255
    }
1✔
256

257
    private Optional<Long> getLastPosition() {
258
        Long position = positionStore.get(config.getConsumerId());
1✔
259
        if (position == null) {
1✔
260
            return Optional.empty();
1✔
261
        }
262
        return Optional.of(position);
1✔
263
    }
264

265
    private void setLastPosition(long position) {
266
        positionStore.put(config.getConsumerId(), position);
1✔
267
    }
1✔
268

269
    /// endregion
270

271
    @Override
272
    public synchronized void close() {
273
        isClosing = true;
1✔
274
        this.internalLock.lock();
1✔
275
        stopReadToCache();
1✔
276
        this.internalLock.unlock();
1✔
277

278
        positionStore.close();
1✔
279
        scheduler.shutdown();
1✔
280
        readCacheExecutor.shutdown();
1✔
281
        try {
282
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
283
                scheduler.shutdownNow();
×
284
            }
285
            if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
286
                readCacheExecutor.shutdownNow();
×
287
            }
UNCOV
288
        } catch (InterruptedException e) {
×
289
            scheduler.shutdownNow();
×
290
            readCacheExecutor.shutdownNow();
×
291
            Thread.currentThread().interrupt();
×
292
        }
1✔
293
        tailerThreadLocal.remove();
1✔
294
        queue.close();
1✔
295
        isClosed = true;
1✔
296

297
    }
1✔
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc