• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #41

02 Feb 2025 03:13PM UTC coverage: 91.958% (+0.2%) from 91.757%
#41

push

wz2cool
use atomic instead of volatile

105 of 124 new or added lines in 4 files covered. (84.68%)

2 existing lines in 2 files now uncovered.

606 of 659 relevant lines covered (91.96%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.38
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
9
import com.github.wz2cool.localqueue.model.message.QueueMessage;
10
import net.openhft.chronicle.core.time.TimeProvider;
11
import net.openhft.chronicle.queue.ChronicleQueue;
12
import net.openhft.chronicle.queue.ExcerptTailer;
13
import net.openhft.chronicle.queue.RollCycle;
14
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17

18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.Objects;
21
import java.util.Optional;
22
import java.util.concurrent.*;
23
import java.util.concurrent.atomic.AtomicBoolean;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicLong;
26
import java.util.concurrent.locks.Lock;
27
import java.util.concurrent.locks.ReentrantLock;
28

29
/**
30
 * simple consumer
31
 *
32
 * @author frank
33
 */
34
public class SimpleConsumer implements IConsumer {
35

36
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
37
    private final RollCycle defaultRollCycle;
38
    private final TimeProvider timeProvider;
39
    private final SimpleConsumerConfig config;
40
    private final PositionStore positionStore;
41
    private final SingleChronicleQueue queue;
42
    // should only call by readCacheExecutor
43
    private final ExcerptTailer mainTailer;
44
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
45
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
46
    private final LinkedBlockingQueue<QueueMessage> messageCache;
47
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
48
    private final AtomicLong ackedReadPosition = new AtomicLong(-1);
1✔
49
    private final AtomicBoolean isReadToCacheRunning = new AtomicBoolean(true);
1✔
50
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
51
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
52
    private final Object closeLocker = new Object();
1✔
53
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
54
    private final Lock internalLock = new ReentrantLock();
1✔
55

56
    /**
57
     * constructor
58
     *
59
     * @param config the config of consumer
60
     */
61
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
62
        this.config = config;
1✔
63
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
64
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
65
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
66
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
67
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
68
                .timeProvider(timeProvider)
1✔
69
                .rollCycle(defaultRollCycle)
1✔
70
                .build();
1✔
71
        this.mainTailer = initMainTailer();
1✔
72
        startReadToCache();
1✔
73
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
74
    }
1✔
75

76
    @Override
77
    public synchronized QueueMessage take() throws InterruptedException {
78
        return this.messageCache.take();
1✔
79
    }
80

81
    @Override
82
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
83
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
84
        QueueMessage take = this.messageCache.take();
1✔
85
        result.add(take);
1✔
86
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
87
        return result;
1✔
88
    }
89

90
    @Override
91
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
92
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
93
        return Optional.ofNullable(message);
1✔
94
    }
95

96
    @Override
97
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
98
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
99
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
100
        if (Objects.nonNull(poll)) {
1✔
101
            result.add(poll);
1✔
102
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
103
        }
104
        return result;
1✔
105
    }
106

107
    @Override
108
    public synchronized Optional<QueueMessage> poll() {
109
        QueueMessage message = this.messageCache.poll();
1✔
110
        return Optional.ofNullable(message);
1✔
111
    }
112

113
    @Override
114
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
115
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
116
        this.messageCache.drainTo(result, maxBatchSize);
1✔
117
        return result;
1✔
118
    }
119

120
    @Override
121
    public synchronized void ack(final QueueMessage message) {
122
        if (Objects.isNull(message)) {
1✔
123
            return;
1✔
124
        }
125

126
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
127
            return;
×
128
        }
129
        ackedReadPosition.set(message.getPosition());
1✔
130
    }
1✔
131

132
    @Override
133
    public synchronized void ack(final List<QueueMessage> messages) {
134
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
135
            return;
1✔
136
        }
137
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
138
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
139
            return;
×
140
        }
141
        ackedReadPosition.set(lastOne.getPosition());
1✔
142
    }
1✔
143

144
    @Override
145
    public boolean moveToPosition(final long position) {
146
        logDebug("[moveToPosition] start");
1✔
147
        stopReadToCache();
1✔
148
        try {
149
            internalLock.lock();
1✔
150
            return moveToPositionInternal(position);
1✔
151
        } finally {
152
            internalLock.unlock();
1✔
153
            startReadToCache();
1✔
154
            logDebug("[moveToPosition] end");
1✔
155
        }
×
156
    }
157

158
    @Override
159
    public boolean moveToTimestamp(final long timestamp) {
160
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
161
        stopReadToCache();
1✔
162
        try {
163
            internalLock.lock();
1✔
164
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
165
            if (!positionOptional.isPresent()) {
1✔
166
                return false;
1✔
167
            }
168
            Long position = positionOptional.get();
1✔
169
            return moveToPositionInternal(position);
1✔
170
        } finally {
171
            internalLock.unlock();
1✔
172
            startReadToCache();
1✔
173
            logDebug("[moveToTimestamp] end");
1✔
174
        }
×
175
    }
176

177
    @Override
178
    public Optional<QueueMessage> get(final String messageKey) {
179
        return get(messageKey, 0L, Long.MAX_VALUE);
1✔
180
    }
181

182
    @Override
183
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
184
        if (messageKey == null || messageKey.isEmpty()) {
1✔
185
            return Optional.empty();
1✔
186
        }
187
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
188
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
189
            while (true) {
190
                // for performance, ignore read content.
191
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
192
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
193
                if (!readResult) {
1✔
194
                    return Optional.empty();
1✔
195
                }
196
                if (internalReadMessage.getWriteTime() < searchTimestampStart) {
1✔
197
                    continue;
1✔
198
                }
199
                if (internalReadMessage.getWriteTime() > searchTimestampEnd) {
1✔
200
                    return Optional.empty();
×
201
                }
202
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
203
                if (!moveToResult) {
1✔
204
                    return Optional.empty();
×
205
                }
206
                internalReadMessage = new InternalReadMessage();
1✔
207
                readResult = tailer.readBytes(internalReadMessage);
1✔
208
                if (!readResult) {
1✔
209
                    return Optional.empty();
×
210
                }
211
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
212
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
213
                    return Optional.of(queueMessage);
1✔
214
                }
215
            }
1✔
216
        }
1✔
217
    }
218

219
    private QueueMessage toQueueMessage(final InternalReadMessage internalReadMessage, final long position) {
220
        return new QueueMessage(
1✔
221
                internalReadMessage.getMessageKey(),
1✔
222
                positionVersion.get(),
1✔
223
                position,
224
                internalReadMessage.getContent(),
1✔
225
                internalReadMessage.getWriteTime());
1✔
226
    }
227

228
    private boolean moveToPositionInternal(final long position) {
229
        return CompletableFuture.supplyAsync(() -> {
1✔
230
            try {
231
                logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
232
                boolean moveToResult = mainTailer.moveToIndex(position);
1✔
233
                if (moveToResult) {
1✔
234
                    positionVersion.incrementAndGet();
1✔
235
                    messageCache.clear();
1✔
236
                    ackedReadPosition.set(position);
1✔
237
                }
238
                return moveToResult;
1✔
239
            } finally {
240
                logDebug("[moveToPositionInternal] end");
1✔
241
            }
×
242
        }, this.readCacheExecutor).join();
1✔
243
    }
244

245
    private Optional<Long> findPosition(final long timestamp) {
246
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
247
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
248
            moveToNearByTimestamp(tailer, timestamp);
1✔
249
            while (true) {
250
                InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
251
                boolean resultResult = tailer.readBytes(internalReadMessage);
1✔
252
                if (resultResult) {
1✔
253
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
254
                        return Optional.of(tailer.lastReadIndex());
1✔
255
                    }
256
                } else {
257
                    return Optional.empty();
1✔
258
                }
259
            }
1✔
260
        } finally {
1✔
261
            logDebug("[findPosition] end");
1✔
262
        }
×
263
    }
264

265
    public long getAckedReadPosition() {
266
        return ackedReadPosition.get();
1✔
267
    }
268

269
    @Override
270
    public boolean isClosed() {
271
        return isClosed.get();
1✔
272
    }
273

274
    private void stopReadToCache() {
275
        isReadToCacheRunning.set(false);
1✔
276
    }
1✔
277

278
    private void startReadToCache() {
279
        this.isReadToCacheRunning.set(true);
1✔
280
        readCacheExecutor.execute(this::readToCache);
1✔
281
    }
1✔
282

283
    private void readToCache() {
284
        try {
285
            logDebug("[readToCache] start");
1✔
286
            internalLock.lock();
1✔
287
            long pullInterval = config.getPullInterval();
1✔
288
            long fillCacheInterval = config.getFillCacheInterval();
1✔
289
            while (isReadToCacheRunning.get() && !isClosing.get()) {
1✔
290
                try {
291
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
292
                    boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
293
                    if (!readResult) {
1✔
294
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
295
                        continue;
1✔
296
                    }
297
                    long lastedReadIndex = mainTailer.lastReadIndex();
1✔
298
                    QueueMessage queueMessage = toQueueMessage(internalReadMessage, lastedReadIndex);
1✔
299
                    boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
300
                    if (!offerResult) {
1✔
301
                        // if offer failed, move to last read position
302
                        mainTailer.moveToIndex(lastedReadIndex);
1✔
303
                    }
304
                } catch (InterruptedException e) {
×
305
                    Thread.currentThread().interrupt();
×
306
                }
1✔
307
            }
308
        } finally {
309
            internalLock.unlock();
1✔
310
            logDebug("[readToCache] end");
1✔
311
        }
1✔
312
    }
1✔
313

314
    private ExcerptTailer initMainTailer() {
315
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
316
    }
317

318
    private ExcerptTailer initMainTailerInternal() {
319
        try {
320
            logDebug("[initExcerptTailerInternal] start");
1✔
321
            ExcerptTailer tailer = queue.createTailer();
1✔
322
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
323
            if (lastPositionOptional.isPresent()) {
1✔
324
                Long position = lastPositionOptional.get();
1✔
325
                long beginPosition = position + 1;
1✔
326
                tailer.moveToIndex(beginPosition);
1✔
327
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
328
            } else {
1✔
329
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
330
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
331
                    tailer.toEnd();
1✔
332
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
333
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
334
                    tailer.toStart();
1✔
335
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
336
                }
337
            }
338
            return tailer;
1✔
339
        } finally {
340
            logDebug("[initExcerptTailer] end");
1✔
341
        }
×
342

343
    }
344

345
    /// region position
346

347
    private void flushPosition() {
348
        if (ackedReadPosition.get() != -1) {
1✔
349
            setLastPosition(this.ackedReadPosition.get());
1✔
350
        }
351
    }
1✔
352

353
    private Optional<Long> getLastPosition() {
354
        return positionStore.get(config.getConsumerId());
1✔
355
    }
356

357
    private void setLastPosition(long position) {
358
        positionStore.put(config.getConsumerId(), position);
1✔
359
    }
1✔
360

361
    /// endregion
362

363
    @Override
364
    public void close() {
365
        synchronized (closeLocker) {
1✔
366
            try {
367
                logDebug("[close] start");
1✔
368
                if (isClosing.get()) {
1✔
369
                    logDebug("[close] is closing");
1✔
370
                    return;
1✔
371
                }
372
                isClosing.set(true);
1✔
373
                this.internalLock.lock();
1✔
374
                stopReadToCache();
1✔
375
                this.internalLock.unlock();
1✔
376
                if (!positionStore.isClosed()) {
1✔
377
                    positionStore.close();
1✔
378
                }
379
                scheduler.shutdown();
1✔
380
                readCacheExecutor.shutdown();
1✔
381
                try {
382
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
NEW
383
                        scheduler.shutdownNow();
×
384
                    }
385
                    if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
NEW
386
                        readCacheExecutor.shutdownNow();
×
387
                    }
NEW
388
                } catch (InterruptedException e) {
×
NEW
389
                    scheduler.shutdownNow();
×
UNCOV
390
                    readCacheExecutor.shutdownNow();
×
NEW
391
                    Thread.currentThread().interrupt();
×
392
                }
1✔
393
                if (!queue.isClosed()) {
1✔
394
                    queue.close();
1✔
395
                }
396

397
                for (CloseListener closeListener : closeListenerList) {
1✔
398
                    closeListener.onClose();
1✔
399
                }
1✔
400
                isClosed.set(true);
1✔
401
            } finally {
402
                logDebug("[close] end");
1✔
403
            }
1✔
404
        }
1✔
405
    }
1✔
406

407
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
408
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
409
        int currentCycle = tailer.cycle();
1✔
410
        if (currentCycle != expectedCycle) {
1✔
411
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
1✔
412
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
1✔
413
        }
414
    }
1✔
415

416
    @Override
417
    public void addCloseListener(CloseListener listener) {
418
        closeListenerList.add(listener);
1✔
419
    }
1✔
420

421
    // region logger
422

423
    private void logDebug(String format) {
424
        if (logger.isDebugEnabled()) {
1✔
425
            logger.debug(format);
×
426
        }
427
    }
1✔
428

429
    private void logDebug(String format, Object arg) {
430
        if (logger.isDebugEnabled()) {
1✔
431
            logger.debug(format, arg);
×
432
        }
433
    }
1✔
434

435
    // endregion
436
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc