• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #17

30 Jan 2025 07:26AM UTC coverage: 91.466% (-1.0%) from 92.494%
#17

push

wz2cool
ops code

34 of 35 new or added lines in 2 files covered. (97.14%)

27 existing lines in 2 files now uncovered.

418 of 457 relevant lines covered (91.47%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.66
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
5
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
6
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
7
import com.github.wz2cool.localqueue.model.message.QueueMessage;
8
import net.openhft.chronicle.queue.ChronicleQueue;
9
import net.openhft.chronicle.queue.ExcerptTailer;
10
import net.openhft.chronicle.queue.RollCycles;
11
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
12
import org.slf4j.Logger;
13
import org.slf4j.LoggerFactory;
14

15
import java.util.ArrayList;
16
import java.util.List;
17
import java.util.Objects;
18
import java.util.Optional;
19
import java.util.concurrent.*;
20
import java.util.concurrent.atomic.AtomicInteger;
21
import java.util.concurrent.locks.Lock;
22
import java.util.concurrent.locks.ReentrantLock;
23

24
/**
25
 * simple consumer
26
 *
27
 * @author frank
28
 */
29
public class SimpleConsumer implements IConsumer, AutoCloseable {
30

31
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
32
    private final SimpleConsumerConfig config;
33
    private final PositionStore positionStore;
34
    private final SingleChronicleQueue queue;
35
    // should only call by readCacheExecutor
36
    private final ExcerptTailer mainTailer;
37
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
38
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
39
    private final LinkedBlockingQueue<QueueMessage> messageCache;
40
    private volatile long ackedReadPosition = -1;
1✔
41
    private volatile boolean isReadToCacheRunning = true;
1✔
42
    private volatile boolean isClosing = false;
1✔
43
    private volatile boolean isClosed = false;
1✔
44
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
45
    private final Lock internalLock = new ReentrantLock();
1✔
46

47
    /**
48
     * constructor
49
     *
50
     * @param config the config of consumer
51
     */
52
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
53
        this.config = config;
1✔
54
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
55
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
56
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
57
        this.mainTailer = initMainTailer();
1✔
58
        startReadToCache();
1✔
59
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
60
    }
1✔
61

62
    @Override
63
    public synchronized QueueMessage take() throws InterruptedException {
64
        return this.messageCache.take();
1✔
65
    }
66

67
    @Override
68
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
69
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
70
        QueueMessage take = this.messageCache.take();
1✔
71
        result.add(take);
1✔
72
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
73
        return result;
1✔
74
    }
75

76
    @Override
77
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
78
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
79
        return Optional.ofNullable(message);
1✔
80
    }
81

82
    @Override
83
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
84
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
85
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
86
        if (Objects.nonNull(poll)) {
1✔
87
            result.add(poll);
1✔
88
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
89
        }
90
        return result;
1✔
91
    }
92

93
    @Override
94
    public synchronized Optional<QueueMessage> poll() {
95
        QueueMessage message = this.messageCache.poll();
1✔
96
        return Optional.ofNullable(message);
1✔
97
    }
98

99
    @Override
100
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
101
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
102
        this.messageCache.drainTo(result, maxBatchSize);
1✔
103
        return result;
1✔
104
    }
105

106
    @Override
107
    public synchronized void ack(final QueueMessage message) {
108
        if (Objects.isNull(message)) {
1✔
109
            return;
1✔
110
        }
111

112
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
UNCOV
113
            return;
×
114
        }
115

116
        this.ackedReadPosition = message.getPosition();
1✔
117
    }
1✔
118

119
    @Override
120
    public synchronized void ack(final List<QueueMessage> messages) {
121
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
122
            return;
1✔
123
        }
124
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
125
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
UNCOV
126
            return;
×
127
        }
128
        this.ackedReadPosition = lastOne.getPosition();
1✔
129
    }
1✔
130

131
    @Override
132
    public boolean moveToPosition(final long position) {
133
        logDebug("[moveToPosition] start");
1✔
134
        stopReadToCache();
1✔
135
        try {
136
            internalLock.lock();
1✔
137
            return moveToPositionInternal(position);
1✔
138
        } finally {
139
            internalLock.unlock();
1✔
140
            startReadToCache();
1✔
141
            logDebug("[moveToPosition] end");
1✔
UNCOV
142
        }
×
143
    }
144

145
    @Override
146
    public boolean moveToTimestamp(final long timestamp) {
147
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
148
        stopReadToCache();
1✔
149
        try {
150
            internalLock.lock();
1✔
151
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
152
            if (!positionOptional.isPresent()) {
1✔
153
                return false;
1✔
154
            }
155
            Long position = positionOptional.get();
1✔
156
            return moveToPositionInternal(position);
1✔
157
        } finally {
158
            internalLock.unlock();
1✔
159
            startReadToCache();
1✔
160
            logDebug("[moveToTimestamp] end");
1✔
UNCOV
161
        }
×
162
    }
163

164
    private boolean moveToPositionInternal(final long position) {
165
        return CompletableFuture.supplyAsync(() -> {
1✔
166
            try {
167
                logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
168
                boolean moveToResult = mainTailer.moveToIndex(position);
1✔
169
                if (moveToResult) {
1✔
170
                    positionVersion.incrementAndGet();
1✔
171
                    messageCache.clear();
1✔
172
                    this.ackedReadPosition = position;
1✔
173
                }
174
                return moveToResult;
1✔
175
            } finally {
176
                logDebug("[moveToPositionInternal] end");
1✔
UNCOV
177
            }
×
178
        }, this.readCacheExecutor).join();
1✔
179
    }
180

181
    private Optional<Long> findPosition(final long timestamp) {
182
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
183
        try (ExcerptTailer excerptTailer = queue.createTailer()) {
1✔
184
            while (true) {
185
                InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
186
                boolean resultResult = excerptTailer.readBytes(internalReadMessage);
1✔
187
                if (resultResult) {
1✔
188
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
189
                        return Optional.of(excerptTailer.lastReadIndex());
1✔
190
                    }
191
                } else {
192
                    return Optional.empty();
1✔
193
                }
194
            }
1✔
195
        } finally {
1✔
196
            logDebug("[findPosition] end");
1✔
UNCOV
197
        }
×
198
    }
199

200
    public long getAckedReadPosition() {
201
        return ackedReadPosition;
1✔
202
    }
203

204
    public boolean isClosed() {
205
        return isClosed;
1✔
206
    }
207

208
    private void stopReadToCache() {
209
        this.isReadToCacheRunning = false;
1✔
210
    }
1✔
211

212
    private void startReadToCache() {
213
        this.isReadToCacheRunning = true;
1✔
214
        readCacheExecutor.execute(this::readToCache);
1✔
215
    }
1✔
216

217
    private void readToCache() {
218
        try {
219
            logDebug("[readToCache] start");
1✔
220
            internalLock.lock();
1✔
221
            long pullInterval = config.getPullInterval();
1✔
222
            long fillCacheInterval = config.getFillCacheInterval();
1✔
223
            while (this.isReadToCacheRunning && !isClosing) {
1✔
224
                try {
225
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
226
                    boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
227
                    if (!readResult) {
1✔
228
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
229
                        continue;
1✔
230
                    }
231
                    long lastedReadIndex = mainTailer.lastReadIndex();
1✔
232
                    QueueMessage queueMessage = new QueueMessage(
1✔
233
                            positionVersion.get(),
1✔
234
                            lastedReadIndex,
235
                            internalReadMessage.getContent(),
1✔
236
                            internalReadMessage.getWriteTime());
1✔
237
                    boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
238
                    if (!offerResult) {
1✔
239
                        // if offer failed, move to last read position
240
                        mainTailer.moveToIndex(lastedReadIndex);
1✔
241
                    }
UNCOV
242
                } catch (InterruptedException e) {
×
UNCOV
243
                    Thread.currentThread().interrupt();
×
244
                }
1✔
245
            }
246
        } finally {
247
            internalLock.unlock();
1✔
248
            logDebug("[readToCache] end");
1✔
249
        }
1✔
250
    }
1✔
251

252
    private ExcerptTailer initMainTailer() {
253
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
254
    }
255

256
    private ExcerptTailer initMainTailerInternal() {
257
        try {
258
            logDebug("[initExcerptTailerInternal] start");
1✔
259
            ExcerptTailer tailer = queue.createTailer();
1✔
260
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
261
            if (lastPositionOptional.isPresent()) {
1✔
262
                Long position = lastPositionOptional.get();
1✔
263
                long beginPosition = position + 1;
1✔
264
                tailer.moveToIndex(beginPosition);
1✔
265
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
266
            } else {
1✔
267
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
268
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
269
                    tailer.toEnd();
1✔
270
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
271
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
272
                    tailer.toStart();
1✔
273
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
274
                }
275
            }
276
            return tailer;
1✔
277
        } finally {
278
            logDebug("[initExcerptTailer] end");
1✔
UNCOV
279
        }
×
280

281
    }
282

283
    /// region position
284

285
    private void flushPosition() {
286
        if (ackedReadPosition != -1) {
1✔
287
            setLastPosition(this.ackedReadPosition);
1✔
288
        }
289
    }
1✔
290

291
    private Optional<Long> getLastPosition() {
292
        Long position = positionStore.get(config.getConsumerId());
1✔
293
        if (position == null) {
1✔
294
            return Optional.empty();
1✔
295
        }
296
        return Optional.of(position);
1✔
297
    }
298

299
    private void setLastPosition(long position) {
300
        positionStore.put(config.getConsumerId(), position);
1✔
301
    }
1✔
302

303
    /// endregion
304

305
    @Override
306
    public void close() {
307
        logDebug("[close] start");
1✔
308
        try {
309
            isClosing = true;
1✔
310
            this.internalLock.lock();
1✔
311
            stopReadToCache();
1✔
312
            this.internalLock.unlock();
1✔
313

314
            positionStore.close();
1✔
315
            scheduler.shutdown();
1✔
316
            readCacheExecutor.shutdown();
1✔
317
            try {
318
                if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
319
                    scheduler.shutdownNow();
×
320
                }
321
                if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
322
                    readCacheExecutor.shutdownNow();
×
323
                }
UNCOV
324
            } catch (InterruptedException e) {
×
UNCOV
325
                scheduler.shutdownNow();
×
UNCOV
326
                readCacheExecutor.shutdownNow();
×
UNCOV
327
                Thread.currentThread().interrupt();
×
328
            }
1✔
329
            queue.close();
1✔
330
            isClosed = true;
1✔
331
        } finally {
332
            logDebug("[close] end");
1✔
333
        }
1✔
334
    }
1✔
335

336
    private void logDebug(String format) {
337
        if (logger.isDebugEnabled()) {
1✔
UNCOV
338
            logger.debug(format);
×
339
        }
340
    }
1✔
341

342
    private void logDebug(String format, Object arg) {
343
        if (logger.isDebugEnabled()) {
1✔
NEW
344
            logger.debug(format, arg);
×
345
        }
346
    }
1✔
347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc