• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #88

11 Feb 2025 01:57PM UTC coverage: 90.82% (-0.4%) from 91.187%
#88

push

web-flow
Merge pull request #16 from wz2cool/0.2.5-1

fix close slow issue

10 of 12 new or added lines in 2 files covered. (83.33%)

3 existing lines in 1 file now uncovered.

742 of 817 relevant lines covered (90.82%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.37
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
9
import com.github.wz2cool.localqueue.model.message.QueueMessage;
10
import com.github.wz2cool.localqueue.model.page.PageInfo;
11
import com.github.wz2cool.localqueue.model.page.SortDirection;
12
import com.github.wz2cool.localqueue.model.page.UpDown;
13

14
import java.util.ArrayList;
15
import java.util.Arrays;
16
import java.util.Collections;
17
import java.util.List;
18
import java.util.Objects;
19
import java.util.Optional;
20
import java.util.Set;
21
import java.util.concurrent.CompletableFuture;
22
import java.util.concurrent.ConcurrentHashMap;
23
import java.util.concurrent.ConcurrentLinkedQueue;
24
import java.util.concurrent.ExecutorService;
25
import java.util.concurrent.Executors;
26
import java.util.concurrent.LinkedBlockingQueue;
27
import java.util.concurrent.ScheduledExecutorService;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.atomic.AtomicBoolean;
30
import java.util.concurrent.atomic.AtomicInteger;
31
import java.util.concurrent.atomic.AtomicLong;
32

33
import net.openhft.chronicle.core.time.TimeProvider;
34
import net.openhft.chronicle.queue.ChronicleQueue;
35
import net.openhft.chronicle.queue.ExcerptTailer;
36
import net.openhft.chronicle.queue.RollCycle;
37
import net.openhft.chronicle.queue.TailerDirection;
38
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
39
import org.slf4j.Logger;
40
import org.slf4j.LoggerFactory;
41

42
/**
43
 * simple consumer
44
 *
45
 * @author frank
46
 */
47
public class SimpleConsumer implements IConsumer {
48

49
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
50
    private final RollCycle defaultRollCycle;
51
    private final TimeProvider timeProvider;
52
    private final Set<String> matchTags;
53
    private final SimpleConsumerConfig config;
54
    private final PositionStore positionStore;
55
    private final SingleChronicleQueue queue;
56
    // should only call by readCacheExecutor
57
    private final ExcerptTailer mainTailer;
58
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
59
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
60
    private final LinkedBlockingQueue<QueueMessage> messageCache;
61
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
62
    private final AtomicLong ackedReadPosition = new AtomicLong(-1);
1✔
63
    private final AtomicBoolean isReadToCacheRunning = new AtomicBoolean(true);
1✔
64
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
65
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
66
    private final Object closeLocker = new Object();
1✔
67
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
68

69
    /**
70
     * constructor
71
     *
72
     * @param config the config of consumer
73
     */
74
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
75
        this.config = config;
1✔
76
        this.matchTags = getMatchTags(config.getSelectorTag());
1✔
77
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
78
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
79
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
80
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
81
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
82
                .timeProvider(timeProvider)
1✔
83
                .rollCycle(defaultRollCycle)
1✔
84
                .build();
1✔
85
        this.mainTailer = initMainTailer();
1✔
86
        startReadToCache();
1✔
87
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
88
    }
1✔
89

90

91
    private final List<QueueMessage> pendingMessages = Collections.synchronizedList(new ArrayList<>());
1✔
92

93
    @Override
94
    public synchronized QueueMessage take() throws InterruptedException {
95
        if (!pendingMessages.isEmpty()) {
1✔
96
            return pendingMessages.get(0);
×
97
        }
98
        QueueMessage message = this.messageCache.take();
1✔
99
        pendingMessages.add(message);
1✔
100
        return message;
1✔
101
    }
102

103
    @Override
104
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
105
        if (!pendingMessages.isEmpty()) {
1✔
106
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
1✔
107
        }
108
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
109
        QueueMessage take = this.messageCache.take();
1✔
110
        result.add(take);
1✔
111
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
112
        pendingMessages.addAll(result);
1✔
113
        return result;
1✔
114
    }
115

116
    @Override
117
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
118
        if (!pendingMessages.isEmpty()) {
1✔
119
            return Optional.of(pendingMessages.get(0));
×
120
        }
121
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
122
        if (Objects.nonNull(message)) {
1✔
123
            pendingMessages.add(message);
1✔
124
        }
125
        return Optional.ofNullable(message);
1✔
126
    }
127

128
    @Override
129
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
130
        if (!pendingMessages.isEmpty()) {
1✔
131
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
132
        }
133
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
134
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
135
        if (Objects.nonNull(poll)) {
1✔
136
            result.add(poll);
1✔
137
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
138
            pendingMessages.addAll(result);
1✔
139
        }
140
        return result;
1✔
141
    }
142

143
    @Override
144
    public synchronized Optional<QueueMessage> poll() {
145
        if (!pendingMessages.isEmpty()) {
1✔
146
            return Optional.of(pendingMessages.get(0));
×
147
        }
148
        QueueMessage message = this.messageCache.poll();
1✔
149
        if (Objects.nonNull(message)) {
1✔
150
            pendingMessages.add(message);
1✔
151
        }
152
        return Optional.ofNullable(message);
1✔
153
    }
154

155
    @Override
156
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
157
        if (!pendingMessages.isEmpty()) {
1✔
158
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
159
        }
160
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
161
        this.messageCache.drainTo(result, maxBatchSize);
1✔
162
        pendingMessages.addAll(result);
1✔
163
        return result;
1✔
164
    }
165

166
    @Override
167
    public synchronized void ack(final QueueMessage message) {
168
        if (Objects.isNull(message)) {
1✔
169
            return;
1✔
170
        }
171

172
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
173
            return;
×
174
        }
175
        ackedReadPosition.set(message.getPosition());
1✔
176
        pendingMessages.remove(message);
1✔
177
    }
1✔
178

179
    @Override
180
    public synchronized void ack(final List<QueueMessage> messages) {
181
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
182
            return;
1✔
183
        }
184
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
185
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
186
            return;
×
187
        }
188
        ackedReadPosition.set(lastOne.getPosition());
1✔
189
        pendingMessages.removeAll(messages);
1✔
190
    }
1✔
191

192
    @Override
193
    public boolean moveToPosition(final long position) {
194
        logDebug("[moveToPosition] start");
1✔
195
        stopReadToCache();
1✔
196
        try {
197
            return moveToPositionInternal(position);
1✔
198
        } finally {
199
            startReadToCache();
1✔
200
            logDebug("[moveToPosition] end");
1✔
201
        }
×
202
    }
203

204
    @Override
205
    public boolean moveToTimestamp(final long timestamp) {
206
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
207
        stopReadToCache();
1✔
208
        try {
209
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
210
            if (!positionOptional.isPresent()) {
1✔
211
                return false;
1✔
212
            }
213
            Long position = positionOptional.get();
1✔
214
            boolean moveToResult = moveToPositionInternal(position);
1✔
215
            logger.info("[moveToTimestamp] timestamp: {}, moveToResult: {}", timestamp, moveToResult);
1✔
216
            return moveToResult;
1✔
217
        } finally {
218
            startReadToCache();
1✔
219
            logDebug("[moveToTimestamp] end");
1✔
220
        }
×
221
    }
222

223
    @Override
224
    public Optional<QueueMessage> get(final long position) {
225
        if (position < 0) {
1✔
226
            return Optional.empty();
×
227
        }
228
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
229
            tailer.moveToIndex(position);
1✔
230
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
231
            boolean readResult = tailer.readBytes(internalReadMessage);
1✔
232
            if (readResult) {
1✔
233
                return Optional.of(toQueueMessage(internalReadMessage, position));
1✔
234
            } else {
235
                return Optional.empty();
×
236
            }
237
        }
1✔
238
    }
239

240
    @Override
241
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
242
        if (messageKey == null || messageKey.isEmpty()) {
1✔
243
            return Optional.empty();
1✔
244
        }
245
        // reuse this message
246
        InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
247
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
248
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
249
            while (true) {
250
                // for performance, ignore read content.
251
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
252
                if (!readResult) {
1✔
253
                    return Optional.empty();
1✔
254
                }
255
                if (internalReadMessage.getWriteTime() < searchTimestampStart) {
1✔
256
                    continue;
1✔
257
                }
258
                if (internalReadMessage.getWriteTime() > searchTimestampEnd) {
1✔
259
                    return Optional.empty();
×
260
                }
261
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
262
                if (!moveToResult) {
1✔
263
                    return Optional.empty();
×
264
                }
265
                readResult = tailer.readBytes(internalReadMessage);
1✔
266
                if (!readResult) {
1✔
267
                    return Optional.empty();
×
268
                }
269
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
270
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
271
                    return Optional.of(queueMessage);
1✔
272
                }
273
            }
×
274
        }
1✔
275
    }
276

277
    private Set<String> getMatchTags(String selectorTag) {
278
        logDebug("[getMatchTags] start, selectorTag: {}", selectorTag);
1✔
279
        ConcurrentHashMap.KeySetView<String, Boolean> mySet = ConcurrentHashMap.newKeySet();
1✔
280
        if (selectorTag == null || selectorTag.isEmpty()) {
1✔
281
            return mySet;
×
282
        }
283

284
        String[] tags = selectorTag.split("\\|\\|");
1✔
285
        mySet.addAll(Arrays.asList(tags));
1✔
286
        return mySet;
1✔
287
    }
288

289
    private QueueMessage toQueueMessage(final InternalReadMessage internalReadMessage, final long position) {
290
        return new QueueMessage(
1✔
291
                internalReadMessage.getTag(),
1✔
292
                internalReadMessage.getMessageKey(),
1✔
293
                positionVersion.get(),
1✔
294
                position,
295
                internalReadMessage.getContent(),
1✔
296
                internalReadMessage.getWriteTime());
1✔
297
    }
298

299
    private boolean moveToPositionInternal(final long position) {
300
        return CompletableFuture.supplyAsync(() -> {
1✔
301
            synchronized (closeLocker) {
1✔
302
                try {
303
                    if (isClosing.get()) {
1✔
304
                        logDebug("[moveToPositionInternal] consumer is closing");
×
305
                        return false;
×
306
                    }
307
                    logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
308
                    boolean moveToResult = mainTailer.moveToIndex(position);
1✔
309
                    if (moveToResult) {
1✔
310
                        positionVersion.incrementAndGet();
1✔
311
                        messageCache.clear();
1✔
312
                        ackedReadPosition.set(position);
1✔
313
                    }
314
                    logger.info("[local-queue] move to position: {}, result: {}", position, moveToResult);
1✔
315
                    return moveToResult;
1✔
316
                } finally {
317
                    logDebug("[moveToPositionInternal] end");
1✔
318
                }
×
319
            }
×
320
        }, this.readCacheExecutor).join();
1✔
321
    }
322

323

324
    @Override
325
    public Optional<Long> findPosition(final long timestamp) {
326
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
327
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
328
            moveToNearByTimestamp(tailer, timestamp);
1✔
329
            // reuse this message.
330
            InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
331
            while (true) {
332
                boolean resultResult = tailer.readBytes(internalReadMessage);
1✔
333
                if (resultResult) {
1✔
334
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
335
                        return Optional.of(tailer.lastReadIndex());
1✔
336
                    }
337
                } else {
338
                    return Optional.empty();
1✔
339
                }
340
            }
1✔
341
        } finally {
1✔
342
            logDebug("[findPosition] end");
1✔
343
        }
×
344
    }
345

346
    public long getAckedReadPosition() {
347
        return ackedReadPosition.get();
1✔
348
    }
349

350
    @Override
351
    public boolean isClosed() {
352
        return isClosed.get();
1✔
353
    }
354

355
    private void stopReadToCache() {
356
        isReadToCacheRunning.set(false);
1✔
357
    }
1✔
358

359
    private void startReadToCache() {
360
        this.isReadToCacheRunning.set(true);
1✔
361
        readCacheExecutor.execute(this::readToCache);
1✔
362
    }
1✔
363

364
    private void readToCache() {
365
        try {
366
            logDebug("[readToCache] start");
1✔
367
            long pullInterval = config.getPullInterval();
1✔
368
            long fillCacheInterval = config.getFillCacheInterval();
1✔
369
            // reuse this message.
370
            InternalReadMessage internalReadMessage = new InternalReadMessage(this.matchTags);
1✔
371
            while (isReadToCacheRunning.get()) {
1✔
372
                synchronized (closeLocker) {
1✔
373
                    try {
374
                        if (isClosing.get()) {
1✔
UNCOV
375
                            logDebug("[readToCache] consumer is closing");
×
UNCOV
376
                            return;
×
377
                        }
378

379
                        boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
380
                        if (!readResult) {
1✔
381
                            TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
382
                            continue;
1✔
383
                        }
384
                        String messageTag = internalReadMessage.getTag() == null ? "*" : internalReadMessage.getTag();
1✔
385
                        if (matchTags.contains("*") || matchTags.contains(messageTag)) {
1✔
386
                            long lastedReadIndex = mainTailer.lastReadIndex();
1✔
387
                            QueueMessage queueMessage = toQueueMessage(internalReadMessage, lastedReadIndex);
1✔
388
                            boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
389
                            if (!offerResult) {
1✔
390
                                // if offer failed, move to last read position
391
                                mainTailer.moveToIndex(lastedReadIndex);
1✔
392
                            }
393
                        }
394
                    } catch (InterruptedException e) {
×
395
                        Thread.currentThread().interrupt();
×
396
                    } catch (Exception e) {
×
397
                        logger.error("[local-queue] read to cache error", e);
×
398
                    }
1✔
399
                }
1✔
400
            }
401
        } finally {
402
            logDebug("[readToCache] end");
1✔
403
        }
1✔
404
    }
1✔
405

406
    private ExcerptTailer initMainTailer() {
407
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
408
    }
409

410
    private ExcerptTailer initMainTailerInternal() {
411
        try {
412
            logDebug("[initExcerptTailerInternal] start");
1✔
413
            ExcerptTailer tailer = queue.createTailer();
1✔
414
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
415
            if (lastPositionOptional.isPresent()) {
1✔
416
                Long position = lastPositionOptional.get();
1✔
417
                long beginPosition = position + 1;
1✔
418
                tailer.moveToIndex(beginPosition);
1✔
419
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
420
            } else {
1✔
421
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
422
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
423
                    tailer.toEnd();
1✔
424
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
425
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
426
                    tailer.toStart();
1✔
427
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
428
                }
429
            }
430
            return tailer;
1✔
431
        } finally {
432
            logDebug("[initExcerptTailer] end");
1✔
433
        }
×
434

435
    }
436

437
    /// region position
438

439
    private void flushPosition() {
440
        try {
441
            if (ackedReadPosition.get() != -1) {
1✔
442
                setLastPosition(this.ackedReadPosition.get());
1✔
443
            }
444
        } catch (Exception e) {
×
445
            logger.error("flushPosition Exception", e);
×
446
        }
1✔
447
    }
1✔
448

449
    private Optional<Long> getLastPosition() {
450
        return positionStore.get(config.getConsumerId());
1✔
451
    }
452

453
    private void setLastPosition(long position) {
454
        positionStore.put(config.getConsumerId(), position);
1✔
455
    }
1✔
456

457
    /// endregion
458

459
    @SuppressWarnings("Duplicates")
460
    @Override
461
    public void close() {
462
        logDebug("[close] start");
1✔
463
        if (isClosing.get()) {
1✔
464
            logDebug("[close] is closing");
1✔
465
            return;
1✔
466
        }
467
        isClosing.set(true);
1✔
468
        stopReadToCache();
1✔
469
        synchronized (closeLocker) {
1✔
470
            try {
471
                if (!positionStore.isClosed()) {
1✔
472
                    positionStore.close();
1✔
473
                }
474
                scheduler.shutdown();
1✔
475
                readCacheExecutor.shutdown();
1✔
476
                try {
477
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
478
                        scheduler.shutdownNow();
×
479
                    }
480
                    if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
481
                        readCacheExecutor.shutdownNow();
×
482
                    }
483
                } catch (InterruptedException e) {
×
484
                    scheduler.shutdownNow();
×
485
                    readCacheExecutor.shutdownNow();
×
486
                    Thread.currentThread().interrupt();
×
487
                }
1✔
488
                if (!queue.isClosed()) {
1✔
489
                    queue.close();
1✔
490
                }
491

492
                for (CloseListener closeListener : closeListenerList) {
1✔
493
                    closeListener.onClose();
1✔
494
                }
1✔
495
                isClosed.set(true);
1✔
496
            } finally {
497
                logDebug("[close] end");
1✔
498
            }
1✔
499
        }
1✔
500
    }
1✔
501

502
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
503
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
504
        int currentCycle = tailer.cycle();
1✔
505
        if (currentCycle != expectedCycle) {
1✔
506
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
1✔
507
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
1✔
508
        }
509
    }
1✔
510

511
    @Override
512
    public void addCloseListener(CloseListener listener) {
513
        closeListenerList.add(listener);
1✔
514
    }
1✔
515

516
    // region page
517

518
    @Override
519
    public PageInfo<QueueMessage> getPage(SortDirection sortDirection, int pageSize) {
520
        return getPage(-1, sortDirection, pageSize);
1✔
521
    }
522

523
    @SuppressWarnings("Duplicates")
524
    @Override
525
    public PageInfo<QueueMessage> getPage(long moveToPosition, SortDirection sortDirection, int pageSize) {
526
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
527
            if (moveToPosition != -1) {
1✔
528
                tailer.moveToIndex(moveToPosition);
1✔
529
            }
530
            if (sortDirection == SortDirection.DESC) {
1✔
531
                tailer.toEnd();
1✔
532
                tailer.direction(TailerDirection.BACKWARD);
1✔
533
            }
534
            List<QueueMessage> data = new ArrayList<>();
1✔
535
            long start = -1;
1✔
536
            long end = -1;
1✔
537
            // reuse this message.
538
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
539
            for (int i = 0; i < pageSize; i++) {
1✔
540
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
541
                if (!readResult) {
1✔
542
                    break;
1✔
543
                }
544
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
545
                data.add(queueMessage);
1✔
546
                if (i == 0) {
1✔
547
                    start = tailer.lastReadIndex();
1✔
548
                }
549
                end = tailer.lastReadIndex();
1✔
550
            }
551
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
552
        }
1✔
553
    }
554

555
    @SuppressWarnings("Duplicates")
556
    @Override
557
    public PageInfo<QueueMessage> getPage(PageInfo<QueueMessage> prevPageInfo, UpDown upDown) {
558
        SortDirection sortDirection = prevPageInfo.getSortDirection();
1✔
559
        int pageSize = prevPageInfo.getPageSize();
1✔
560
        long start = prevPageInfo.getStart();
1✔
561
        long end = prevPageInfo.getEnd();
1✔
562
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
563
            TailerDirection tailerDirection = getTailerDirection(sortDirection, upDown);
1✔
564
            tailer.direction(tailerDirection);
1✔
565
            if (sortDirection == SortDirection.DESC) {
1✔
566
                if (upDown == UpDown.DOWN) {
1✔
567
                    tailer.moveToIndex(end - 1);
1✔
568
                } else {
569
                    tailer.moveToIndex(start + 1);
1✔
570
                }
571
            } else {
572
                if (upDown == UpDown.DOWN) {
1✔
573
                    tailer.moveToIndex(end + 1);
1✔
574
                } else {
575
                    tailer.moveToIndex(start - 1);
1✔
576
                }
577
            }
578
            List<QueueMessage> data = new ArrayList<>();
1✔
579
            // reuse this message.
580
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
581
            for (int i = 0; i < pageSize; i++) {
1✔
582
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
583
                if (!readResult) {
1✔
584
                    break;
×
585
                }
586
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
587
                data.add(queueMessage);
1✔
588
                if (i == 0) {
1✔
589
                    start = tailer.lastReadIndex();
1✔
590
                }
591
                end = tailer.lastReadIndex();
1✔
592
            }
593
            if (upDown == UpDown.UP) {
1✔
594
                Collections.reverse(data);
1✔
595
            }
596
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
597
        }
1✔
598
    }
599

600
    private TailerDirection getTailerDirection(SortDirection sortDirection, UpDown upDown) {
601
        if (sortDirection == SortDirection.DESC && upDown == UpDown.DOWN) {
1✔
602
            return TailerDirection.BACKWARD;
1✔
603
        }
604
        if (sortDirection == SortDirection.DESC && upDown == UpDown.UP) {
1✔
605
            return TailerDirection.FORWARD;
1✔
606
        }
607
        if (sortDirection == SortDirection.ASC && upDown == UpDown.DOWN) {
1✔
608
            return TailerDirection.FORWARD;
1✔
609
        }
610
        if (sortDirection == SortDirection.ASC && upDown == UpDown.UP) {
1✔
611
            return TailerDirection.BACKWARD;
1✔
612
        }
613
        return TailerDirection.FORWARD;
×
614
    }
615

616
    // endregion
617

618
    // region logger
619

620
    private void logDebug(String format) {
621
        if (logger.isDebugEnabled()) {
1✔
622
            logger.debug(format);
×
623
        }
624
    }
1✔
625

626
    private void logDebug(String format, Object arg) {
627
        if (logger.isDebugEnabled()) {
1✔
628
            logger.debug(format, arg);
×
629
        }
630
    }
1✔
631

632
    // endregion
633
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc