• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #40

02 Feb 2025 02:03PM UTC coverage: 91.757% (-0.09%) from 91.85%
#40

push

wz2cool
use closing

9 of 13 new or added lines in 4 files covered. (69.23%)

3 existing lines in 3 files now uncovered.

590 of 643 relevant lines covered (91.76%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.52
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
9
import com.github.wz2cool.localqueue.model.message.QueueMessage;
10
import net.openhft.chronicle.core.time.TimeProvider;
11
import net.openhft.chronicle.queue.ChronicleQueue;
12
import net.openhft.chronicle.queue.ExcerptTailer;
13
import net.openhft.chronicle.queue.RollCycle;
14
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.Objects;
21
import java.util.Optional;
22
import java.util.concurrent.*;
23
import java.util.concurrent.atomic.AtomicInteger;
24
import java.util.concurrent.locks.Lock;
25
import java.util.concurrent.locks.ReentrantLock;
26

27
/**
28
 * simple consumer
29
 *
30
 * @author frank
31
 */
32
public class SimpleConsumer implements IConsumer {
33

34
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
35
    private final RollCycle defaultRollCycle;
36
    private final TimeProvider timeProvider;
37
    private final SimpleConsumerConfig config;
38
    private final PositionStore positionStore;
39
    private final SingleChronicleQueue queue;
40
    // should only call by readCacheExecutor
41
    private final ExcerptTailer mainTailer;
42
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
43
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
44
    private final LinkedBlockingQueue<QueueMessage> messageCache;
45
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
46
    private volatile long ackedReadPosition = -1;
1✔
47
    private volatile boolean isReadToCacheRunning = true;
1✔
48
    private volatile boolean isClosing = false;
1✔
49
    private volatile boolean isClosed = false;
1✔
50
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
51
    private final Lock internalLock = new ReentrantLock();
1✔
52

53
    /**
54
     * constructor
55
     *
56
     * @param config the config of consumer
57
     */
58
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
59
        this.config = config;
1✔
60
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
61
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
62
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
63
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
64
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
65
                .timeProvider(timeProvider)
1✔
66
                .rollCycle(defaultRollCycle)
1✔
67
                .build();
1✔
68
        this.mainTailer = initMainTailer();
1✔
69
        startReadToCache();
1✔
70
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
71
    }
1✔
72

73
    @Override
74
    public synchronized QueueMessage take() throws InterruptedException {
75
        return this.messageCache.take();
1✔
76
    }
77

78
    @Override
79
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
80
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
81
        QueueMessage take = this.messageCache.take();
1✔
82
        result.add(take);
1✔
83
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
84
        return result;
1✔
85
    }
86

87
    @Override
88
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
89
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
90
        return Optional.ofNullable(message);
1✔
91
    }
92

93
    @Override
94
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
95
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
96
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
97
        if (Objects.nonNull(poll)) {
1✔
98
            result.add(poll);
1✔
99
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
100
        }
101
        return result;
1✔
102
    }
103

104
    @Override
105
    public synchronized Optional<QueueMessage> poll() {
106
        QueueMessage message = this.messageCache.poll();
1✔
107
        return Optional.ofNullable(message);
1✔
108
    }
109

110
    @Override
111
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
112
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
113
        this.messageCache.drainTo(result, maxBatchSize);
1✔
114
        return result;
1✔
115
    }
116

117
    @Override
118
    public synchronized void ack(final QueueMessage message) {
119
        if (Objects.isNull(message)) {
1✔
120
            return;
1✔
121
        }
122

123
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
124
            return;
×
125
        }
126

127
        this.ackedReadPosition = message.getPosition();
1✔
128
    }
1✔
129

130
    @Override
131
    public synchronized void ack(final List<QueueMessage> messages) {
132
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
133
            return;
1✔
134
        }
135
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
136
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
137
            return;
×
138
        }
139
        this.ackedReadPosition = lastOne.getPosition();
1✔
140
    }
1✔
141

142
    @Override
143
    public boolean moveToPosition(final long position) {
144
        logDebug("[moveToPosition] start");
1✔
145
        stopReadToCache();
1✔
146
        try {
147
            internalLock.lock();
1✔
148
            return moveToPositionInternal(position);
1✔
149
        } finally {
150
            internalLock.unlock();
1✔
151
            startReadToCache();
1✔
152
            logDebug("[moveToPosition] end");
1✔
153
        }
×
154
    }
155

156
    @Override
157
    public boolean moveToTimestamp(final long timestamp) {
158
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
159
        stopReadToCache();
1✔
160
        try {
161
            internalLock.lock();
1✔
162
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
163
            if (!positionOptional.isPresent()) {
1✔
164
                return false;
1✔
165
            }
166
            Long position = positionOptional.get();
1✔
167
            return moveToPositionInternal(position);
1✔
168
        } finally {
169
            internalLock.unlock();
1✔
170
            startReadToCache();
1✔
171
            logDebug("[moveToTimestamp] end");
1✔
172
        }
×
173
    }
174

175
    @Override
176
    public Optional<QueueMessage> get(final String messageKey) {
177
        return get(messageKey, 0L, Long.MAX_VALUE);
1✔
178
    }
179

180
    @Override
181
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
182
        if (messageKey == null || messageKey.isEmpty()) {
1✔
183
            return Optional.empty();
1✔
184
        }
185
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
186
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
187
            while (true) {
188
                // for performance, ignore read content.
189
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
190
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
191
                if (!readResult) {
1✔
192
                    return Optional.empty();
1✔
193
                }
194
                if (internalReadMessage.getWriteTime() < searchTimestampStart) {
1✔
195
                    continue;
1✔
196
                }
197
                if (internalReadMessage.getWriteTime() > searchTimestampEnd) {
1✔
198
                    return Optional.empty();
×
199
                }
200
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
201
                if (!moveToResult) {
1✔
202
                    return Optional.empty();
×
203
                }
204
                internalReadMessage = new InternalReadMessage();
1✔
205
                readResult = tailer.readBytes(internalReadMessage);
1✔
206
                if (!readResult) {
1✔
207
                    return Optional.empty();
×
208
                }
209
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
210
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
211
                    return Optional.of(queueMessage);
1✔
212
                }
213
            }
1✔
214
        }
1✔
215
    }
216

217
    private QueueMessage toQueueMessage(final InternalReadMessage internalReadMessage, final long position) {
218
        return new QueueMessage(
1✔
219
                internalReadMessage.getMessageKey(),
1✔
220
                positionVersion.get(),
1✔
221
                position,
222
                internalReadMessage.getContent(),
1✔
223
                internalReadMessage.getWriteTime());
1✔
224
    }
225

226
    private boolean moveToPositionInternal(final long position) {
227
        return CompletableFuture.supplyAsync(() -> {
1✔
228
            try {
229
                logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
230
                boolean moveToResult = mainTailer.moveToIndex(position);
1✔
231
                if (moveToResult) {
1✔
232
                    positionVersion.incrementAndGet();
1✔
233
                    messageCache.clear();
1✔
234
                    this.ackedReadPosition = position;
1✔
235
                }
236
                return moveToResult;
1✔
237
            } finally {
238
                logDebug("[moveToPositionInternal] end");
1✔
239
            }
×
240
        }, this.readCacheExecutor).join();
1✔
241
    }
242

243
    private Optional<Long> findPosition(final long timestamp) {
244
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
245
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
246
            moveToNearByTimestamp(tailer, timestamp);
1✔
247
            while (true) {
248
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
249
                boolean resultResult = tailer.readBytes(internalReadMessage);
1✔
250
                if (resultResult) {
1✔
251
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
252
                        return Optional.of(tailer.lastReadIndex());
1✔
253
                    }
254
                } else {
255
                    return Optional.empty();
1✔
256
                }
257
            }
1✔
258
        } finally {
1✔
259
            logDebug("[findPosition] end");
1✔
260
        }
×
261
    }
262

263
    public long getAckedReadPosition() {
264
        return ackedReadPosition;
1✔
265
    }
266

267
    @Override
268
    public boolean isClosed() {
269
        return isClosed;
1✔
270
    }
271

272
    private void stopReadToCache() {
273
        this.isReadToCacheRunning = false;
1✔
274
    }
1✔
275

276
    private void startReadToCache() {
277
        this.isReadToCacheRunning = true;
1✔
278
        readCacheExecutor.execute(this::readToCache);
1✔
279
    }
1✔
280

281
    private void readToCache() {
282
        try {
283
            logDebug("[readToCache] start");
1✔
284
            internalLock.lock();
1✔
285
            long pullInterval = config.getPullInterval();
1✔
286
            long fillCacheInterval = config.getFillCacheInterval();
1✔
287
            while (this.isReadToCacheRunning && !isClosing) {
1✔
288
                try {
289
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
290
                    boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
291
                    if (!readResult) {
1✔
292
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
293
                        continue;
1✔
294
                    }
295
                    long lastedReadIndex = mainTailer.lastReadIndex();
1✔
296
                    QueueMessage queueMessage = toQueueMessage(internalReadMessage, lastedReadIndex);
1✔
297
                    boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
298
                    if (!offerResult) {
1✔
299
                        // if offer failed, move to last read position
300
                        mainTailer.moveToIndex(lastedReadIndex);
1✔
301
                    }
302
                } catch (InterruptedException e) {
×
303
                    Thread.currentThread().interrupt();
×
304
                }
1✔
305
            }
306
        } finally {
307
            internalLock.unlock();
1✔
308
            logDebug("[readToCache] end");
1✔
309
        }
1✔
310
    }
1✔
311

312
    private ExcerptTailer initMainTailer() {
313
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
314
    }
315

316
    private ExcerptTailer initMainTailerInternal() {
317
        try {
318
            logDebug("[initExcerptTailerInternal] start");
1✔
319
            ExcerptTailer tailer = queue.createTailer();
1✔
320
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
321
            if (lastPositionOptional.isPresent()) {
1✔
322
                Long position = lastPositionOptional.get();
1✔
323
                long beginPosition = position + 1;
1✔
324
                tailer.moveToIndex(beginPosition);
1✔
325
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
326
            } else {
1✔
327
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
328
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
329
                    tailer.toEnd();
1✔
330
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
331
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
332
                    tailer.toStart();
1✔
333
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
334
                }
335
            }
336
            return tailer;
1✔
337
        } finally {
338
            logDebug("[initExcerptTailer] end");
1✔
339
        }
×
340

341
    }
342

343
    /// region position
344

345
    private void flushPosition() {
346
        if (ackedReadPosition != -1) {
1✔
347
            setLastPosition(this.ackedReadPosition);
1✔
348
        }
349
    }
1✔
350

351
    private Optional<Long> getLastPosition() {
352
        Long position = positionStore.get(config.getConsumerId());
1✔
353
        if (position == null) {
1✔
354
            return Optional.empty();
1✔
355
        }
356
        return Optional.of(position);
1✔
357
    }
358

359
    private void setLastPosition(long position) {
360
        positionStore.put(config.getConsumerId(), position);
1✔
361
    }
1✔
362

363
    /// endregion
364

365
    @Override
366
    public void close() {
367
        try {
368
            logDebug("[close] start");
1✔
369
            if (isClosing) {
1✔
NEW
370
                logDebug("[close] is closing");
×
UNCOV
371
                return;
×
372
            }
373
            isClosing = true;
1✔
374
            this.internalLock.lock();
1✔
375
            stopReadToCache();
1✔
376
            this.internalLock.unlock();
1✔
377
            if (!positionStore.isClosed()) {
1✔
378
                positionStore.close();
1✔
379
            }
380
            scheduler.shutdown();
1✔
381
            readCacheExecutor.shutdown();
1✔
382
            try {
383
                if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
384
                    scheduler.shutdownNow();
×
385
                }
386
                if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
387
                    readCacheExecutor.shutdownNow();
×
388
                }
389
            } catch (InterruptedException e) {
×
390
                scheduler.shutdownNow();
×
391
                readCacheExecutor.shutdownNow();
×
392
                Thread.currentThread().interrupt();
×
393
            }
1✔
394
            if (!queue.isClosed()) {
1✔
395
                queue.close();
1✔
396
            }
397

398
            for (CloseListener closeListener : closeListenerList) {
1✔
399
                closeListener.onClose();
1✔
400
            }
1✔
401
            isClosed = true;
1✔
402
        } finally {
403
            logDebug("[close] end");
1✔
404
        }
1✔
405
    }
1✔
406

407
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
408
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
409
        int currentCycle = tailer.cycle();
1✔
410
        if (currentCycle != expectedCycle) {
1✔
411
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
1✔
412
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
1✔
413
        }
414
    }
1✔
415

416
    @Override
417
    public void addCloseListener(CloseListener listener) {
418
        closeListenerList.add(listener);
1✔
419
    }
1✔
420

421
    // region logger
422

423
    private void logDebug(String format) {
424
        if (logger.isDebugEnabled()) {
1✔
425
            logger.debug(format);
×
426
        }
427
    }
1✔
428

429
    private void logDebug(String format, Object arg) {
430
        if (logger.isDebugEnabled()) {
1✔
431
            logger.debug(format, arg);
×
432
        }
433
    }
1✔
434

435
    // endregion
436
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc