• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #10

29 Jan 2025 05:10AM UTC coverage: 91.94% (+2.1%) from 89.817%
#10

push

wz2cool
move to timestamp

9 of 15 new or added lines in 4 files covered. (60.0%)

1 existing line in 1 file now uncovered.

365 of 397 relevant lines covered (91.94%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.67
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
5
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
6
import com.github.wz2cool.localqueue.model.message.QueueMessage;
7
import net.openhft.chronicle.queue.ChronicleQueue;
8
import net.openhft.chronicle.queue.ExcerptTailer;
9
import net.openhft.chronicle.queue.RollCycles;
10
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
11

12
import java.util.ArrayList;
13
import java.util.List;
14
import java.util.Objects;
15
import java.util.Optional;
16
import java.util.concurrent.*;
17
import java.util.concurrent.atomic.AtomicInteger;
18
import java.util.concurrent.locks.Lock;
19
import java.util.concurrent.locks.ReentrantLock;
20

21
/**
22
 * simple consumer
23
 *
24
 * @author frank
25
 */
26
public class SimpleConsumer implements IConsumer, AutoCloseable {
27

28
    private final SimpleConsumerConfig config;
29
    private final PositionStore positionStore;
30

31
    private final SingleChronicleQueue queue;
32
    private final ThreadLocal<ExcerptTailer> tailerThreadLocal;
33
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
34
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
35
    private final LinkedBlockingQueue<QueueMessage> messageCache;
36
    private volatile long ackedReadPosition = -1;
1✔
37
    private volatile boolean isReadToCacheRunning = true;
1✔
38
    private volatile boolean isClosing = false;
1✔
39
    private volatile boolean isClosed = false;
1✔
40
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
41

42
    private final Lock internalLock = new ReentrantLock();
1✔
43

44

45
    /**
46
     * constructor
47
     *
48
     * @param config the config of consumer
49
     */
50
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
51
        this.config = config;
1✔
52
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
53
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
54
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
55
        this.tailerThreadLocal = ThreadLocal.withInitial(this::initExcerptTailer);
1✔
56
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
57
        startReadToCache();
1✔
58
    }
1✔
59

60
    @Override
61
    public synchronized QueueMessage take() throws InterruptedException {
62
        return this.messageCache.take();
1✔
63
    }
64

65
    @Override
66
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
67
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
68
        QueueMessage take = this.messageCache.take();
1✔
69
        result.add(take);
1✔
70
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
71
        return result;
1✔
72
    }
73

74
    @Override
75
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
76
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
77
        return Optional.ofNullable(message);
1✔
78
    }
79

80
    @Override
81
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
82
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
83
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
84
        if (Objects.nonNull(poll)) {
1✔
85
            result.add(poll);
1✔
86
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
87
        }
88
        return result;
1✔
89
    }
90

91
    @Override
92
    public synchronized Optional<QueueMessage> poll() {
93
        QueueMessage message = this.messageCache.poll();
1✔
94
        return Optional.ofNullable(message);
1✔
95
    }
96

97
    @Override
98
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
99
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
100
        this.messageCache.drainTo(result, maxBatchSize);
1✔
101
        return result;
1✔
102
    }
103

104
    @Override
105
    public synchronized void ack(final QueueMessage message) {
106
        if (Objects.isNull(message)) {
1✔
107
            return;
1✔
108
        }
109

110
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
111
            return;
×
112
        }
113

114
        this.ackedReadPosition = message.getPosition();
1✔
115
    }
1✔
116

117
    @Override
118
    public synchronized void ack(final List<QueueMessage> messages) {
119
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
120
            return;
1✔
121
        }
122
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
123
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
124
            return;
×
125
        }
126
        this.ackedReadPosition = lastOne.getPosition();
1✔
127
    }
1✔
128

129
    @Override
130
    public boolean moveToPosition(final long position) {
131
        stopReadToCache();
1✔
132
        try {
133
            if (isClosing) {
1✔
NEW
134
                return false;
×
135
            }
136
            internalLock.lock();
1✔
137
            if (isClosing) {
1✔
NEW
138
                return false;
×
139
            }
140
            return CompletableFuture.supplyAsync(() -> {
1✔
141
                ExcerptTailer tailer = tailerThreadLocal.get();
1✔
142
                boolean moveToResult = tailer.moveToIndex(position);
1✔
143
                if (moveToResult) {
1✔
144
                    positionVersion.incrementAndGet();
1✔
145
                    messageCache.clear();
1✔
146
                    this.ackedReadPosition = position;
1✔
147
                }
148
                return moveToResult;
1✔
149
            }, this.readCacheExecutor).join();
1✔
150
        } finally {
151
            internalLock.unlock();
1✔
152
            startReadToCache();
1✔
153
        }
×
154
    }
155

156
    @Override
157
    public boolean moveToTimestamp(long timestamp) {
158
        Optional<Long> positionOptional = findPosition(timestamp);
1✔
159
        if (!positionOptional.isPresent()) {
1✔
160
            return false;
1✔
161
        }
162
        Long position = positionOptional.get();
1✔
163
        return moveToPosition(position);
1✔
164
    }
165

166
    private Optional<Long> findPosition(final long timestamp) {
167
        try (ExcerptTailer excerptTailer = initExcerptTailer()) {
1✔
168
            while (true) {
169
                InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
170
                boolean resultResult = excerptTailer.readBytes(internalReadMessage);
1✔
171
                if (resultResult) {
1✔
172
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
173
                        return Optional.of(excerptTailer.lastReadIndex());
1✔
174
                    }
175
                } else {
176
                    return Optional.empty();
1✔
177
                }
178
            }
1✔
179
        }
1✔
180
    }
181

182
    public long getAckedReadPosition() {
183
        return ackedReadPosition;
1✔
184
    }
185

186
    public boolean isClosed() {
187
        return isClosed;
1✔
188
    }
189

190
    private void stopReadToCache() {
191
        this.isReadToCacheRunning = false;
1✔
192
    }
1✔
193

194
    private void startReadToCache() {
195
        this.isReadToCacheRunning = true;
1✔
196
        readCacheExecutor.execute(this::readToCache);
1✔
197
    }
1✔
198

199
    private void readToCache() {
200
        try {
201
            internalLock.lock();
1✔
202
            long pullInterval = config.getPullInterval();
1✔
203
            while (this.isReadToCacheRunning && !isClosing) {
1✔
204
                try {
205
                    // when cache is full, sleep
206
                    if (this.messageCache.remainingCapacity() == 0) {
1✔
207
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
208
                        continue;
1✔
209
                    }
210
                    ExcerptTailer tailer = tailerThreadLocal.get();
1✔
211
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
212
                    boolean readResult = tailer.readBytes(internalReadMessage);
1✔
213
                    if (!readResult) {
1✔
214
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
215
                        continue;
1✔
216
                    }
217
                    long lastedReadIndex = tailer.lastReadIndex();
1✔
218
                    QueueMessage queueMessage = new QueueMessage(
1✔
219
                            positionVersion.get(),
1✔
220
                            lastedReadIndex,
221
                            internalReadMessage.getContent(),
1✔
222
                            internalReadMessage.getWriteTime());
1✔
223
                    boolean offerResult = this.messageCache.offer(queueMessage, pullInterval, TimeUnit.MILLISECONDS);
1✔
224
                    if (!offerResult) {
1✔
225
                        // if offer failed, move to last read position
NEW
226
                        tailer.moveToIndex(lastedReadIndex);
×
227
                    }
228
                } catch (InterruptedException e) {
×
229
                    Thread.currentThread().interrupt();
×
230
                }
1✔
231
            }
232
        } finally {
233
            internalLock.unlock();
1✔
234
        }
1✔
235
    }
1✔
236

237

238
    private ExcerptTailer initExcerptTailer() {
239
        ExcerptTailer tailer = queue.createTailer();
1✔
240
        Optional<Long> lastPositionOptional = getLastPosition();
1✔
241
        if (lastPositionOptional.isPresent()) {
1✔
242
            Long position = lastPositionOptional.get();
1✔
243
            long beginPosition = position + 1;
1✔
244
            tailer.moveToIndex(beginPosition);
1✔
245
        }
246
        return tailer;
1✔
247
    }
248

249
    /// region position
250

251
    private void flushPosition() {
252
        if (ackedReadPosition != -1) {
1✔
253
            setLastPosition(this.ackedReadPosition);
1✔
254
        }
255
    }
1✔
256

257
    private Optional<Long> getLastPosition() {
258
        Long position = positionStore.get(config.getConsumerId());
1✔
259
        if (position == null) {
1✔
260
            return Optional.empty();
1✔
261
        }
262
        return Optional.of(position);
1✔
263
    }
264

265
    private void setLastPosition(long position) {
266
        positionStore.put(config.getConsumerId(), position);
1✔
267
    }
1✔
268

269
    /// endregion
270

271
    @Override
272
    public synchronized void close() {
273
        isClosing = true;
1✔
274
        this.internalLock.lock();
1✔
275
        stopReadToCache();
1✔
276
        this.internalLock.unlock();
1✔
277

278
        positionStore.close();
1✔
279
        scheduler.shutdown();
1✔
280
        readCacheExecutor.shutdown();
1✔
281
        try {
282
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
283
                scheduler.shutdownNow();
×
284
            }
285
            if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
286
                readCacheExecutor.shutdownNow();
×
287
            }
288
        } catch (InterruptedException e) {
×
289
            scheduler.shutdownNow();
×
290
            readCacheExecutor.shutdownNow();
×
291
            Thread.currentThread().interrupt();
×
292
        }
1✔
293
        tailerThreadLocal.remove();
1✔
294
        queue.close();
1✔
295
        isClosed = true;
1✔
296

297
    }
1✔
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc