• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #9

29 Jan 2025 03:00AM UTC coverage: 89.817% (-3.1%) from 92.954%
#9

push

wz2cool
region collapse

344 of 383 relevant lines covered (89.82%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.27
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
5
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
6
import com.github.wz2cool.localqueue.model.message.QueueMessage;
7
import net.openhft.chronicle.queue.ChronicleQueue;
8
import net.openhft.chronicle.queue.ExcerptTailer;
9
import net.openhft.chronicle.queue.RollCycles;
10
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
11

12
import java.util.ArrayList;
13
import java.util.List;
14
import java.util.Objects;
15
import java.util.Optional;
16
import java.util.concurrent.*;
17
import java.util.concurrent.atomic.AtomicInteger;
18
import java.util.concurrent.locks.Lock;
19
import java.util.concurrent.locks.ReentrantLock;
20

21
/**
22
 * simple consumer
23
 *
24
 * @author frank
25
 */
26
public class SimpleConsumer implements IConsumer, AutoCloseable {
27

28
    private final SimpleConsumerConfig config;
29
    private final PositionStore positionStore;
30

31
    private final SingleChronicleQueue queue;
32
    private final ThreadLocal<ExcerptTailer> tailerThreadLocal;
33
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
34
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
35
    private final LinkedBlockingQueue<QueueMessage> messageCache;
36
    private volatile long ackedReadPosition = -1;
1✔
37
    private volatile boolean isReadToCacheRunning = true;
1✔
38
    private volatile boolean isClosing = false;
1✔
39
    private volatile boolean isClosed = false;
1✔
40
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
41

42
    private final Lock internalLock = new ReentrantLock();
1✔
43

44

45
    /**
46
     * constructor
47
     *
48
     * @param config the config of consumer
49
     */
50
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
51
        this.config = config;
1✔
52
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
53
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
54
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
55
        this.tailerThreadLocal = ThreadLocal.withInitial(this::initExcerptTailer);
1✔
56
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
57
        startReadToCache();
1✔
58
    }
1✔
59

60
    @Override
61
    public synchronized QueueMessage take() throws InterruptedException {
62
        return this.messageCache.take();
1✔
63
    }
64

65
    @Override
66
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
67
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
68
        QueueMessage take = this.messageCache.take();
1✔
69
        result.add(take);
1✔
70
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
71
        return result;
1✔
72
    }
73

74
    @Override
75
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
76
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
77
        return Optional.ofNullable(message);
1✔
78
    }
79

80
    @Override
81
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
82
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
83
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
84
        if (Objects.nonNull(poll)) {
1✔
85
            result.add(poll);
1✔
86
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
87
        }
88
        return result;
1✔
89
    }
90

91
    @Override
92
    public synchronized Optional<QueueMessage> poll() {
93
        QueueMessage message = this.messageCache.poll();
1✔
94
        return Optional.ofNullable(message);
1✔
95
    }
96

97
    @Override
98
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
99
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
100
        this.messageCache.drainTo(result, maxBatchSize);
1✔
101
        return result;
1✔
102
    }
103

104
    @Override
105
    public synchronized void ack(final QueueMessage message) {
106
        if (Objects.isNull(message)) {
1✔
107
            return;
1✔
108
        }
109

110
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
111
            return;
×
112
        }
113

114
        this.ackedReadPosition = message.getPosition();
1✔
115
    }
1✔
116

117
    @Override
118
    public synchronized void ack(final List<QueueMessage> messages) {
119
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
120
            return;
1✔
121
        }
122
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
123
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
124
            return;
×
125
        }
126
        this.ackedReadPosition = lastOne.getPosition();
1✔
127
    }
1✔
128

129
    @Override
130
    public boolean moveToPosition(final long position) {
131
        stopReadToCache();
1✔
132
        try {
133
            internalLock.lock();
1✔
134
            return CompletableFuture.supplyAsync(() -> {
1✔
135
                ExcerptTailer tailer = tailerThreadLocal.get();
1✔
136
                boolean moveToResult = tailer.moveToIndex(position);
1✔
137
                if (moveToResult) {
1✔
138
                    positionVersion.incrementAndGet();
1✔
139
                    messageCache.clear();
1✔
140
                    this.ackedReadPosition = position;
1✔
141
                }
142
                return moveToResult;
1✔
143
            }, this.readCacheExecutor).join();
1✔
144
        } finally {
145
            internalLock.unlock();
1✔
146
            startReadToCache();
1✔
147
        }
×
148
    }
149

150
    @Override
151
    public boolean moveToTimestamp(long timestamp) {
152
        Optional<Long> positionOptional = findPosition(timestamp);
×
153
        if (!positionOptional.isPresent()) {
×
154
            return false;
×
155
        }
156
        Long position = positionOptional.get();
×
157
        return moveToPosition(position);
×
158
    }
159

160
    private Optional<Long> findPosition(final long timestamp) {
161
        try (ExcerptTailer excerptTailer = initExcerptTailer()) {
×
162
            while (true) {
163
                InternalReadMessage internalReadMessage = new InternalReadMessage();
×
164
                boolean resultResult = excerptTailer.readBytes(internalReadMessage);
×
165
                if (resultResult) {
×
166
                    if (internalReadMessage.getWriteTime() >= timestamp) {
×
167
                        return Optional.of(excerptTailer.lastReadIndex());
×
168
                    }
169
                } else {
170
                    return Optional.empty();
×
171
                }
172
            }
×
173
        }
×
174
    }
175

176
    public long getAckedReadPosition() {
177
        return ackedReadPosition;
1✔
178
    }
179

180
    public boolean isClosed() {
181
        return isClosed;
1✔
182
    }
183

184
    private void stopReadToCache() {
185
        this.isReadToCacheRunning = false;
1✔
186
    }
1✔
187

188
    private void startReadToCache() {
189
        this.isReadToCacheRunning = true;
1✔
190
        readCacheExecutor.execute(this::readToCache);
1✔
191
    }
1✔
192

193
    private void readToCache() {
194
        try {
195
            internalLock.lock();
1✔
196
            long pullInterval = config.getPullInterval();
1✔
197
            while (this.isReadToCacheRunning && !isClosing) {
1✔
198
                try {
199
                    ExcerptTailer tailer = tailerThreadLocal.get();
1✔
200
                    InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
201
                    boolean readResult = tailer.readBytes(internalReadMessage);
1✔
202
                    if (!readResult) {
1✔
203
                        TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
204
                        continue;
1✔
205
                    }
206
                    long lastedReadIndex = tailer.lastReadIndex();
1✔
207
                    QueueMessage queueMessage = new QueueMessage(
1✔
208
                            positionVersion.get(),
1✔
209
                            lastedReadIndex,
210
                            internalReadMessage.getContent(),
1✔
211
                            internalReadMessage.getWriteTime());
1✔
212
                    this.messageCache.put(queueMessage);
1✔
213
                } catch (InterruptedException e) {
×
214
                    Thread.currentThread().interrupt();
×
215
                }
1✔
216
            }
217
        } finally {
218
            internalLock.unlock();
1✔
219
        }
1✔
220
    }
1✔
221

222

223
    private ExcerptTailer initExcerptTailer() {
224
        ExcerptTailer tailer = queue.createTailer();
1✔
225
        Optional<Long> lastPositionOptional = getLastPosition();
1✔
226
        if (lastPositionOptional.isPresent()) {
1✔
227
            Long position = lastPositionOptional.get();
1✔
228
            long beginPosition = position + 1;
1✔
229
            tailer.moveToIndex(beginPosition);
1✔
230
        }
231
        return tailer;
1✔
232
    }
233

234
    /// region position
235

236
    private void flushPosition() {
237
        if (ackedReadPosition != -1) {
1✔
238
            setLastPosition(this.ackedReadPosition);
1✔
239
        }
240
    }
1✔
241

242
    private Optional<Long> getLastPosition() {
243
        Long position = positionStore.get(config.getConsumerId());
1✔
244
        if (position == null) {
1✔
245
            return Optional.empty();
1✔
246
        }
247
        return Optional.of(position);
1✔
248
    }
249

250
    private void setLastPosition(long position) {
251
        positionStore.put(config.getConsumerId(), position);
1✔
252
    }
1✔
253

254
    /// endregion
255

256
    @Override
257
    public synchronized void close() {
258
        isClosing = true;
1✔
259
        this.internalLock.lock();
1✔
260
        stopReadToCache();
1✔
261
        this.internalLock.unlock();
1✔
262

263
        positionStore.close();
1✔
264
        scheduler.shutdown();
1✔
265
        readCacheExecutor.shutdown();
1✔
266
        try {
267
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
268
                scheduler.shutdownNow();
×
269
            }
270
            if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
271
                readCacheExecutor.shutdownNow();
×
272
            }
273
        } catch (InterruptedException e) {
×
274
            scheduler.shutdownNow();
×
275
            readCacheExecutor.shutdownNow();
×
276
            Thread.currentThread().interrupt();
×
277
        }
1✔
278
        tailerThreadLocal.remove();
1✔
279
        queue.close();
1✔
280
        isClosed = true;
1✔
281

282
    }
1✔
283
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc