• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #42

03 Feb 2025 03:03AM UTC coverage: 91.02% (-0.9%) from 91.958%
#42

push

wz2cool
change locker

36 of 42 new or added lines in 2 files covered. (85.71%)

5 existing lines in 1 file now uncovered.

598 of 657 relevant lines covered (91.02%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.22
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
9
import com.github.wz2cool.localqueue.model.message.QueueMessage;
10
import net.openhft.chronicle.core.time.TimeProvider;
11
import net.openhft.chronicle.queue.ChronicleQueue;
12
import net.openhft.chronicle.queue.ExcerptTailer;
13
import net.openhft.chronicle.queue.RollCycle;
14
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.Objects;
21
import java.util.Optional;
22
import java.util.concurrent.*;
23
import java.util.concurrent.atomic.AtomicBoolean;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicLong;
26

27
/**
28
 * simple consumer
29
 *
30
 * @author frank
31
 */
32
public class SimpleConsumer implements IConsumer {
33

34
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
35
    private final RollCycle defaultRollCycle;
36
    private final TimeProvider timeProvider;
37
    private final SimpleConsumerConfig config;
38
    private final PositionStore positionStore;
39
    private final SingleChronicleQueue queue;
40
    // should only call by readCacheExecutor
41
    private final ExcerptTailer mainTailer;
42
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
43
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
44
    private final LinkedBlockingQueue<QueueMessage> messageCache;
45
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
46
    private final AtomicLong ackedReadPosition = new AtomicLong(-1);
1✔
47
    private final AtomicBoolean isReadToCacheRunning = new AtomicBoolean(true);
1✔
48
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
49
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
50
    private final Object closeLocker = new Object();
1✔
51
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
52

53
    /**
     * Creates a consumer, opens the underlying queue, positions the main tailer,
     * starts the background reader and schedules the periodic position flush.
     *
     * @param config the config of consumer
     */
    public SimpleConsumer(final SimpleConsumerConfig config) {
        this.config = config;
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
        this.positionStore = new PositionStore(config.getPositionFile());
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
        this.queue = ChronicleQueue
                .singleBuilder(config.getDataDir())
                .timeProvider(this.timeProvider)
                .rollCycle(this.defaultRollCycle)
                .build();
        // The tailer is created on the reader executor, so queue and positionStore must exist first.
        this.mainTailer = initMainTailer();
        startReadToCache();
        // First flush runs immediately (initial delay 0), then at the configured interval.
        this.scheduler.scheduleAtFixedRate(
                this::flushPosition,
                0,
                config.getFlushPositionInterval(),
                TimeUnit.MILLISECONDS);
    }
72

73
    @Override
74
    public synchronized QueueMessage take() throws InterruptedException {
75
        return this.messageCache.take();
1✔
76
    }
77

78
    @Override
79
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
80
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
81
        QueueMessage take = this.messageCache.take();
1✔
82
        result.add(take);
1✔
83
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
84
        return result;
1✔
85
    }
86

87
    @Override
88
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
89
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
90
        return Optional.ofNullable(message);
1✔
91
    }
92

93
    @Override
94
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
95
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
96
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
97
        if (Objects.nonNull(poll)) {
1✔
98
            result.add(poll);
1✔
99
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
100
        }
101
        return result;
1✔
102
    }
103

104
    @Override
105
    public synchronized Optional<QueueMessage> poll() {
106
        QueueMessage message = this.messageCache.poll();
1✔
107
        return Optional.ofNullable(message);
1✔
108
    }
109

110
    @Override
111
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
112
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
113
        this.messageCache.drainTo(result, maxBatchSize);
1✔
114
        return result;
1✔
115
    }
116

117
    @Override
118
    public synchronized void ack(final QueueMessage message) {
119
        if (Objects.isNull(message)) {
1✔
120
            return;
1✔
121
        }
122

123
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
124
            return;
×
125
        }
126
        ackedReadPosition.set(message.getPosition());
1✔
127
    }
1✔
128

129
    @Override
130
    public synchronized void ack(final List<QueueMessage> messages) {
131
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
132
            return;
1✔
133
        }
134
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
135
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
136
            return;
×
137
        }
138
        ackedReadPosition.set(lastOne.getPosition());
1✔
139
    }
1✔
140

141
    @Override
142
    public boolean moveToPosition(final long position) {
143
        logDebug("[moveToPosition] start");
1✔
144
        stopReadToCache();
1✔
145
        try {
146
            return moveToPositionInternal(position);
1✔
147
        } finally {
148
            startReadToCache();
1✔
149
            logDebug("[moveToPosition] end");
1✔
150
        }
×
151
    }
152

153
    @Override
154
    public boolean moveToTimestamp(final long timestamp) {
155
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
156
        stopReadToCache();
1✔
157
        try {
158
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
159
            if (!positionOptional.isPresent()) {
1✔
160
                return false;
1✔
161
            }
162
            Long position = positionOptional.get();
1✔
163
            return moveToPositionInternal(position);
1✔
164
        } finally {
165
            startReadToCache();
1✔
166
            logDebug("[moveToTimestamp] end");
1✔
167
        }
×
168
    }
169

170
    @Override
171
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
172
        if (messageKey == null || messageKey.isEmpty()) {
1✔
173
            return Optional.empty();
1✔
174
        }
175
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
176
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
177
            while (true) {
178
                // for performance, ignore read content.
179
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
180
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
181
                if (!readResult) {
1✔
182
                    return Optional.empty();
1✔
183
                }
184
                if (internalReadMessage.getWriteTime() < searchTimestampStart) {
1✔
185
                    continue;
1✔
186
                }
187
                if (internalReadMessage.getWriteTime() > searchTimestampEnd) {
1✔
188
                    return Optional.empty();
×
189
                }
190
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
191
                if (!moveToResult) {
1✔
192
                    return Optional.empty();
×
193
                }
194
                internalReadMessage = new InternalReadMessage();
1✔
195
                readResult = tailer.readBytes(internalReadMessage);
1✔
196
                if (!readResult) {
1✔
197
                    return Optional.empty();
×
198
                }
199
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
200
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
201
                    return Optional.of(queueMessage);
1✔
202
                }
UNCOV
203
            }
×
204
        }
1✔
205
    }
206

207
    private QueueMessage toQueueMessage(final InternalReadMessage internalReadMessage, final long position) {
208
        return new QueueMessage(
1✔
209
                internalReadMessage.getMessageKey(),
1✔
210
                positionVersion.get(),
1✔
211
                position,
212
                internalReadMessage.getContent(),
1✔
213
                internalReadMessage.getWriteTime());
1✔
214
    }
215

216
    private boolean moveToPositionInternal(final long position) {
217
        return CompletableFuture.supplyAsync(() -> {
1✔
218
            synchronized (closeLocker) {
1✔
219
                try {
220
                    if (isClosing.get()) {
1✔
NEW
221
                        logDebug("[moveToPositionInternal] consumer is closing");
×
NEW
222
                        return false;
×
223
                    }
224
                    logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
225
                    boolean moveToResult = mainTailer.moveToIndex(position);
1✔
226
                    if (moveToResult) {
1✔
227
                        positionVersion.incrementAndGet();
1✔
228
                        messageCache.clear();
1✔
229
                        ackedReadPosition.set(position);
1✔
230
                    }
231
                    return moveToResult;
1✔
232
                } finally {
233
                    logDebug("[moveToPositionInternal] end");
1✔
UNCOV
234
                }
×
UNCOV
235
            }
×
236
        }, this.readCacheExecutor).join();
1✔
237
    }
238

239
    private Optional<Long> findPosition(final long timestamp) {
240
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
241
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
242
            moveToNearByTimestamp(tailer, timestamp);
1✔
243
            while (true) {
244
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
245
                boolean resultResult = tailer.readBytes(internalReadMessage);
1✔
246
                if (resultResult) {
1✔
247
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
248
                        return Optional.of(tailer.lastReadIndex());
1✔
249
                    }
250
                } else {
251
                    return Optional.empty();
1✔
252
                }
253
            }
1✔
254
        } finally {
1✔
255
            logDebug("[findPosition] end");
1✔
256
        }
×
257
    }
258

259
    public long getAckedReadPosition() {
260
        return ackedReadPosition.get();
1✔
261
    }
262

263
    @Override
264
    public boolean isClosed() {
265
        return isClosed.get();
1✔
266
    }
267

268
    private void stopReadToCache() {
269
        isReadToCacheRunning.set(false);
1✔
270
    }
1✔
271

272
    private void startReadToCache() {
273
        this.isReadToCacheRunning.set(true);
1✔
274
        readCacheExecutor.execute(this::readToCache);
1✔
275
    }
1✔
276

277
    private void readToCache() {
278
        try {
279
            logDebug("[readToCache] start");
1✔
280
            long pullInterval = config.getPullInterval();
1✔
281
            long fillCacheInterval = config.getFillCacheInterval();
1✔
282
            while (isReadToCacheRunning.get() && !isClosing.get()) {
1✔
283
                synchronized (closeLocker) {
1✔
284
                    try {
285
                        if (isClosing.get()) {
1✔
286
                            logDebug("[readToCache] consumer is closing");
1✔
287
                            return;
1✔
288
                        }
289
                        InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
290
                        boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
291
                        if (!readResult) {
1✔
292
                            TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
293
                            continue;
1✔
294
                        }
295
                        long lastedReadIndex = mainTailer.lastReadIndex();
1✔
296
                        QueueMessage queueMessage = toQueueMessage(internalReadMessage, lastedReadIndex);
1✔
297
                        boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
298
                        if (!offerResult) {
1✔
299
                            // if offer failed, move to last read position
300
                            mainTailer.moveToIndex(lastedReadIndex);
1✔
301
                        }
NEW
302
                    } catch (InterruptedException e) {
×
NEW
303
                        Thread.currentThread().interrupt();
×
304
                    }
1✔
305
                }
1✔
306
            }
307
        } finally {
308
            logDebug("[readToCache] end");
1✔
309
        }
1✔
310
    }
1✔
311

312
    private ExcerptTailer initMainTailer() {
313
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
314
    }
315

316
    private ExcerptTailer initMainTailerInternal() {
317
        try {
318
            logDebug("[initExcerptTailerInternal] start");
1✔
319
            ExcerptTailer tailer = queue.createTailer();
1✔
320
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
321
            if (lastPositionOptional.isPresent()) {
1✔
322
                Long position = lastPositionOptional.get();
1✔
323
                long beginPosition = position + 1;
1✔
324
                tailer.moveToIndex(beginPosition);
1✔
325
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
326
            } else {
1✔
327
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
328
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
329
                    tailer.toEnd();
1✔
330
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
331
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
332
                    tailer.toStart();
1✔
333
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
334
                }
335
            }
336
            return tailer;
1✔
337
        } finally {
338
            logDebug("[initExcerptTailer] end");
1✔
339
        }
×
340

341
    }
342

343
    /// region position
344

345
    private void flushPosition() {
346
        if (ackedReadPosition.get() != -1) {
1✔
347
            setLastPosition(this.ackedReadPosition.get());
1✔
348
        }
349
    }
1✔
350

351
    private Optional<Long> getLastPosition() {
352
        return positionStore.get(config.getConsumerId());
1✔
353
    }
354

355
    private void setLastPosition(long position) {
356
        positionStore.put(config.getConsumerId(), position);
1✔
357
    }
1✔
358

359
    /// endregion
360

361
    @Override
362
    public void close() {
363
        synchronized (closeLocker) {
1✔
364
            try {
365
                logDebug("[close] start");
1✔
366
                if (isClosing.get()) {
1✔
367
                    logDebug("[close] is closing");
1✔
368
                    return;
1✔
369
                }
370
                isClosing.set(true);
1✔
371
                stopReadToCache();
1✔
372
                if (!positionStore.isClosed()) {
1✔
373
                    positionStore.close();
1✔
374
                }
375
                scheduler.shutdown();
1✔
376
                readCacheExecutor.shutdown();
1✔
377
                try {
378
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
379
                        scheduler.shutdownNow();
×
380
                    }
381
                    if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
382
                        readCacheExecutor.shutdownNow();
1✔
383
                    }
384
                } catch (InterruptedException e) {
×
385
                    scheduler.shutdownNow();
×
386
                    readCacheExecutor.shutdownNow();
×
387
                    Thread.currentThread().interrupt();
×
388
                }
1✔
389
                if (!queue.isClosed()) {
1✔
390
                    queue.close();
1✔
391
                }
392

393
                for (CloseListener closeListener : closeListenerList) {
1✔
394
                    closeListener.onClose();
1✔
395
                }
1✔
396
                isClosed.set(true);
1✔
397
            } finally {
398
                logDebug("[close] end");
1✔
399
            }
1✔
400
        }
1✔
401
    }
1✔
402

403
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
404
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
405
        int currentCycle = tailer.cycle();
1✔
406
        if (currentCycle != expectedCycle) {
1✔
UNCOV
407
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
×
UNCOV
408
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
×
409
        }
410
    }
1✔
411

412
    @Override
413
    public void addCloseListener(CloseListener listener) {
414
        closeListenerList.add(listener);
1✔
415
    }
1✔
416

417
    // region logger
418

419
    private void logDebug(String format) {
420
        if (logger.isDebugEnabled()) {
1✔
421
            logger.debug(format);
×
422
        }
423
    }
1✔
424

425
    private void logDebug(String format, Object arg) {
426
        if (logger.isDebugEnabled()) {
1✔
427
            logger.debug(format, arg);
×
428
        }
429
    }
1✔
430

431
    // endregion
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc