• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #95

06 Jul 2025 07:50AM UTC coverage: 89.302% (+3.7%) from 85.65%
#95

push

wz2cool
fix null reference issue

19 of 19 new or added lines in 2 files covered. (100.0%)

3 existing lines in 1 file now uncovered.

793 of 888 relevant lines covered (89.3%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.41
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.QueueMessage;
9
import com.github.wz2cool.localqueue.model.message.internal.InternalMessage;
10
import com.github.wz2cool.localqueue.model.page.PageInfo;
11
import com.github.wz2cool.localqueue.model.page.SortDirection;
12
import com.github.wz2cool.localqueue.model.page.UpDown;
13

14
import java.util.ArrayList;
15
import java.util.Arrays;
16
import java.util.Collections;
17
import java.util.List;
18
import java.util.Objects;
19
import java.util.Optional;
20
import java.util.Set;
21
import java.util.concurrent.CompletableFuture;
22
import java.util.concurrent.ConcurrentHashMap;
23
import java.util.concurrent.ConcurrentLinkedQueue;
24
import java.util.concurrent.ExecutorService;
25
import java.util.concurrent.Executors;
26
import java.util.concurrent.LinkedBlockingQueue;
27
import java.util.concurrent.ScheduledExecutorService;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.atomic.AtomicBoolean;
30
import java.util.concurrent.atomic.AtomicInteger;
31
import java.util.concurrent.atomic.AtomicLong;
32

33
import net.openhft.chronicle.core.time.TimeProvider;
34
import net.openhft.chronicle.queue.ChronicleQueue;
35
import net.openhft.chronicle.queue.ExcerptTailer;
36
import net.openhft.chronicle.queue.RollCycle;
37
import net.openhft.chronicle.queue.TailerDirection;
38
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
39
import org.slf4j.Logger;
40
import org.slf4j.LoggerFactory;
41

42
/**
43
 * simple consumer
44
 *
45
 * @author frank
46
 */
47
public class SimpleConsumer implements IConsumer {
48

49
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
50
    private final RollCycle defaultRollCycle;
51
    private final TimeProvider timeProvider;
52
    private final Set<String> matchTags;
53
    private final SimpleConsumerConfig config;
54
    private final PositionStore positionStore;
55
    private final SingleChronicleQueue queue;
56
    // should only call by readCacheExecutor
57
    private final ExcerptTailer mainTailer;
58
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
59
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
60
    private final LinkedBlockingQueue<QueueMessage> messageCache;
61
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
62
    private final AtomicLong ackedReadPosition = new AtomicLong(-1);
1✔
63
    private final AtomicBoolean isReadToCacheRunning = new AtomicBoolean(true);
1✔
64
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
65
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
66
    private final Object closeLocker = new Object();
1✔
67
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
68

69
    /**
70
     * constructor
71
     *
72
     * @param config the config of consumer
73
     */
74
    public SimpleConsumer(final SimpleConsumerConfig config) {
1✔
75
        this.config = config;
1✔
76
        this.matchTags = getMatchTags(config.getSelectorTag());
1✔
77
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
78
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
1✔
79
        this.positionStore = new PositionStore(config.getPositionFile());
1✔
80
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
81
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
82
                .timeProvider(timeProvider)
1✔
83
                .rollCycle(defaultRollCycle)
1✔
84
                .build();
1✔
85
        this.mainTailer = initMainTailer();
1✔
86
        startReadToCache();
1✔
87
        scheduler.scheduleAtFixedRate(this::flushPosition, 0, config.getFlushPositionInterval(), TimeUnit.MILLISECONDS);
1✔
88
    }
1✔
89

90

91
    private final List<QueueMessage> pendingMessages = Collections.synchronizedList(new ArrayList<>());
1✔
92

93
    @Override
94
    public synchronized QueueMessage take() throws InterruptedException {
95
        if (!pendingMessages.isEmpty()) {
1✔
96
            return pendingMessages.get(0);
×
97
        }
98
        QueueMessage message = this.messageCache.take();
1✔
99
        pendingMessages.add(message);
1✔
100
        return message;
1✔
101
    }
102

103
    @Override
104
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
105
        if (!pendingMessages.isEmpty()) {
1✔
106
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
1✔
107
        }
108
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
109
        QueueMessage take = this.messageCache.take();
1✔
110
        result.add(take);
1✔
111
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
112
        pendingMessages.addAll(result);
1✔
113
        return result;
1✔
114
    }
115

116
    @Override
117
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
118
        if (!pendingMessages.isEmpty()) {
1✔
119
            return Optional.of(pendingMessages.get(0));
×
120
        }
121
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
122
        if (Objects.nonNull(message)) {
1✔
123
            pendingMessages.add(message);
1✔
124
        }
125
        return Optional.ofNullable(message);
1✔
126
    }
127

128
    @Override
129
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
130
        if (!pendingMessages.isEmpty()) {
1✔
131
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
132
        }
133
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
134
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
135
        if (Objects.nonNull(poll)) {
1✔
136
            result.add(poll);
1✔
137
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
138
            pendingMessages.addAll(result);
1✔
139
        }
140
        return result;
1✔
141
    }
142

143
    @Override
144
    public synchronized Optional<QueueMessage> poll() {
145
        if (!pendingMessages.isEmpty()) {
1✔
146
            return Optional.of(pendingMessages.get(0));
×
147
        }
148
        QueueMessage message = this.messageCache.poll();
1✔
149
        if (Objects.nonNull(message)) {
1✔
150
            pendingMessages.add(message);
1✔
151
        }
152
        return Optional.ofNullable(message);
1✔
153
    }
154

155
    @Override
156
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
157
        if (!pendingMessages.isEmpty()) {
1✔
158
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
159
        }
160
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
161
        this.messageCache.drainTo(result, maxBatchSize);
1✔
162
        pendingMessages.addAll(result);
1✔
163
        return result;
1✔
164
    }
165

166
    @Override
167
    public synchronized void ack(final QueueMessage message) {
168
        if (Objects.isNull(message)) {
1✔
169
            return;
1✔
170
        }
171

172
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
173
            return;
×
174
        }
175
        ackedReadPosition.set(message.getPosition());
1✔
176
        pendingMessages.remove(message);
1✔
177
    }
1✔
178

179
    @Override
180
    public synchronized void ack(final List<QueueMessage> messages) {
181
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
182
            return;
1✔
183
        }
184
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
185
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
186
            return;
×
187
        }
188
        ackedReadPosition.set(lastOne.getPosition());
1✔
189
        pendingMessages.removeAll(messages);
1✔
190
    }
1✔
191

192
    @Override
193
    public boolean moveToPosition(final long position) {
194
        logDebug("[moveToPosition] start");
1✔
195
        stopReadToCache();
1✔
196
        try {
197
            return moveToPositionInternal(position);
1✔
198
        } finally {
199
            startReadToCache();
1✔
200
            logDebug("[moveToPosition] end");
1✔
201
        }
×
202
    }
203

204
    @Override
205
    public boolean moveToTimestamp(final long timestamp) {
206
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
207
        stopReadToCache();
1✔
208
        try {
209
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
210
            if (!positionOptional.isPresent()) {
1✔
211
                return false;
1✔
212
            }
213
            Long position = positionOptional.get();
1✔
214
            boolean moveToResult = moveToPositionInternal(position);
1✔
215
            logger.info("[moveToTimestamp] timestamp: {}, moveToResult: {}", timestamp, moveToResult);
1✔
216
            return moveToResult;
1✔
217
        } finally {
218
            startReadToCache();
1✔
219
            logDebug("[moveToTimestamp] end");
1✔
220
        }
×
221
    }
222

223
    @Override
224
    public Optional<QueueMessage> get(final long position) {
225
        if (position < 0) {
1✔
226
            return Optional.empty();
×
227
        }
228
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
229
            tailer.moveToIndex(position);
1✔
230
            InternalMessage internalMessage = new InternalMessage();
1✔
231
            boolean readResult = tailer.readBytes(internalMessage);
1✔
232
            if (readResult) {
1✔
233
                return Optional.of(toQueueMessage(internalMessage, position));
1✔
234
            } else {
235
                return Optional.empty();
×
236
            }
237
        }
1✔
238
    }
239

240
    @Override
241
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
242
        if (messageKey == null || messageKey.isEmpty()) {
1✔
243
            return Optional.empty();
1✔
244
        }
245
        // reuse this message
246
        InternalMessage internalMessage = new InternalMessage();
1✔
247
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
248
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
249
            while (true) {
250
                // for performance, ignore read content.
251
                boolean readResult = tailer.readBytes(internalMessage);
1✔
252
                if (!readResult) {
1✔
253
                    return Optional.empty();
1✔
254
                }
255
                if (internalMessage.getWriteTime() < searchTimestampStart) {
1✔
256
                    continue;
1✔
257
                }
258
                if (internalMessage.getWriteTime() > searchTimestampEnd) {
1✔
259
                    return Optional.empty();
×
260
                }
261
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
262
                if (!moveToResult) {
1✔
263
                    return Optional.empty();
×
264
                }
265
                readResult = tailer.readBytes(internalMessage);
1✔
266
                if (!readResult) {
1✔
267
                    return Optional.empty();
×
268
                }
269
                QueueMessage queueMessage = toQueueMessage(internalMessage, tailer.lastReadIndex());
1✔
270
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
271
                    return Optional.of(queueMessage);
1✔
272
                }
273
            }
×
274
        }
1✔
275
    }
276

277
    private Set<String> getMatchTags(String selectorTag) {
278
        logDebug("[getMatchTags] start, selectorTag: {}", selectorTag);
1✔
279
        ConcurrentHashMap.KeySetView<String, Boolean> mySet = ConcurrentHashMap.newKeySet();
1✔
280
        if (selectorTag == null || selectorTag.isEmpty()) {
1✔
281
            return mySet;
×
282
        }
283

284
        String[] tags = selectorTag.split("\\|\\|");
1✔
285
        mySet.addAll(Arrays.asList(tags));
1✔
286
        return mySet;
1✔
287
    }
288

289
    private QueueMessage toQueueMessage(final InternalMessage internalMessage, final long position) {
290
        return new QueueMessage(
1✔
291
                internalMessage.getTag(),
1✔
292
                internalMessage.getMessageKey(),
1✔
293
                positionVersion.get(),
1✔
294
                position,
295
                internalMessage.getContent(),
1✔
296
                internalMessage.getWriteTime(),
1✔
297
                internalMessage.getHeaderMessage());
1✔
298
    }
299

300
    private boolean moveToPositionInternal(final long position) {
301
        return CompletableFuture.supplyAsync(() -> {
1✔
302
            synchronized (closeLocker) {
1✔
303
                try {
304
                    if (isClosing.get()) {
1✔
305
                        logDebug("[moveToPositionInternal] consumer is closing");
×
306
                        return false;
×
307
                    }
308
                    logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
309
                    boolean moveToResult = mainTailer.moveToIndex(position);
1✔
310
                    if (moveToResult) {
1✔
311
                        positionVersion.incrementAndGet();
1✔
312
                        messageCache.clear();
1✔
313
                        ackedReadPosition.set(position);
1✔
314
                    }
315
                    logger.info("[local-queue] move to position: {}, result: {}", position, moveToResult);
1✔
316
                    return moveToResult;
1✔
317
                } finally {
318
                    logDebug("[moveToPositionInternal] end");
1✔
319
                }
×
320
            }
×
321
        }, this.readCacheExecutor).join();
1✔
322
    }
323

324

325
    @Override
326
    public Optional<Long> findPosition(final long timestamp) {
327
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
328
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
329
            moveToNearByTimestamp(tailer, timestamp);
1✔
330
            // reuse this message.
331
            InternalMessage internalMessage = new InternalMessage(true);
1✔
332
            while (true) {
333
                boolean resultResult = tailer.readBytes(internalMessage);
1✔
334
                if (resultResult) {
1✔
335
                    if (internalMessage.getWriteTime() >= timestamp) {
1✔
336
                        return Optional.of(tailer.lastReadIndex());
1✔
337
                    }
338
                } else {
339
                    return Optional.empty();
1✔
340
                }
341
            }
1✔
342
        } finally {
1✔
343
            logDebug("[findPosition] end");
1✔
344
        }
×
345
    }
346

347
    public long getAckedReadPosition() {
348
        return ackedReadPosition.get();
1✔
349
    }
350

351
    @Override
352
    public boolean isClosed() {
353
        return isClosed.get();
1✔
354
    }
355

356
    private void stopReadToCache() {
357
        isReadToCacheRunning.set(false);
1✔
358
    }
1✔
359

360
    private void startReadToCache() {
361
        this.isReadToCacheRunning.set(true);
1✔
362
        readCacheExecutor.execute(this::readToCache);
1✔
363
    }
1✔
364

365
    private void readToCache() {
366
        try {
367
            logDebug("[readToCache] start");
1✔
368
            long pullInterval = config.getPullInterval();
1✔
369
            long fillCacheInterval = config.getFillCacheInterval();
1✔
370
            // reuse this message.
371
            InternalMessage internalMessage = new InternalMessage(this.matchTags);
1✔
372
            while (isReadToCacheRunning.get()) {
1✔
373
                synchronized (closeLocker) {
1✔
374
                    try {
375
                        if (isClosing.get()) {
1✔
UNCOV
376
                            logDebug("[readToCache] consumer is closing");
×
UNCOV
377
                            return;
×
378
                        }
379

380
                        boolean readResult = mainTailer.readBytes(internalMessage);
1✔
381
                        if (!readResult) {
1✔
382
                            TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
383
                            continue;
1✔
384
                        }
385
                        String messageTag = internalMessage.getTag() == null ? "*" : internalMessage.getTag();
1✔
386
                        if (matchTags.contains("*") || matchTags.contains(messageTag)) {
1✔
387
                            long lastedReadIndex = mainTailer.lastReadIndex();
1✔
388
                            QueueMessage queueMessage = toQueueMessage(internalMessage, lastedReadIndex);
1✔
389
                            boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
390
                            if (!offerResult) {
1✔
391
                                // if offer failed, move to last read position
392
                                mainTailer.moveToIndex(lastedReadIndex);
1✔
393
                            }
394
                        }
395
                    } catch (InterruptedException e) {
×
396
                        Thread.currentThread().interrupt();
×
397
                    } catch (Exception e) {
×
398
                        logger.error("[local-queue] read to cache error", e);
×
399
                    }
1✔
400
                }
1✔
401
            }
402
        } finally {
403
            logDebug("[readToCache] end");
1✔
404
        }
1✔
405
    }
1✔
406

407
    private ExcerptTailer initMainTailer() {
408
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
409
    }
410

411
    private ExcerptTailer initMainTailerInternal() {
412
        try {
413
            logDebug("[initExcerptTailerInternal] start");
1✔
414
            ExcerptTailer tailer = queue.createTailer();
1✔
415
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
416
            if (lastPositionOptional.isPresent()) {
1✔
417
                Long position = lastPositionOptional.get();
1✔
418
                long beginPosition = position + 1;
1✔
419
                tailer.moveToIndex(beginPosition);
1✔
420
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
421
            } else {
1✔
422
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
423
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
424
                    tailer.toEnd();
1✔
425
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
426
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
427
                    tailer.toStart();
1✔
428
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
429
                }
430
            }
431
            return tailer;
1✔
432
        } finally {
433
            logDebug("[initExcerptTailer] end");
1✔
434
        }
×
435

436
    }
437

438
    /// region position
439

440
    private void flushPosition() {
441
        try {
442
            if (ackedReadPosition.get() != -1) {
1✔
443
                setLastPosition(this.ackedReadPosition.get());
1✔
444
            }
445
        } catch (Exception e) {
×
446
            logger.error("flushPosition Exception", e);
×
447
        }
1✔
448
    }
1✔
449

450
    private Optional<Long> getLastPosition() {
451
        return positionStore.get(config.getConsumerId());
1✔
452
    }
453

454
    private void setLastPosition(long position) {
455
        positionStore.put(config.getConsumerId(), position);
1✔
456
    }
1✔
457

458
    /// endregion
459

460
    @SuppressWarnings("Duplicates")
461
    @Override
462
    public void close() {
463
        logDebug("[close] start");
1✔
464
        if (isClosing.get()) {
1✔
465
            logDebug("[close] is closing");
1✔
466
            return;
1✔
467
        }
468
        isClosing.set(true);
1✔
469
        stopReadToCache();
1✔
470
        synchronized (closeLocker) {
1✔
471
            try {
472
                if (!positionStore.isClosed()) {
1✔
473
                    positionStore.close();
1✔
474
                }
475
                scheduler.shutdown();
1✔
476
                readCacheExecutor.shutdown();
1✔
477
                try {
478
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
479
                        scheduler.shutdownNow();
×
480
                    }
481
                    if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
482
                        readCacheExecutor.shutdownNow();
×
483
                    }
484
                } catch (InterruptedException e) {
×
485
                    scheduler.shutdownNow();
×
486
                    readCacheExecutor.shutdownNow();
×
487
                    Thread.currentThread().interrupt();
×
488
                }
1✔
489
                if (!queue.isClosed()) {
1✔
490
                    queue.close();
1✔
491
                }
492

493
                for (CloseListener closeListener : closeListenerList) {
1✔
494
                    closeListener.onClose();
1✔
495
                }
1✔
496
                isClosed.set(true);
1✔
497
            } finally {
498
                logDebug("[close] end");
1✔
499
            }
1✔
500
        }
1✔
501
    }
1✔
502

503
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
504
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
505
        int currentCycle = tailer.cycle();
1✔
506
        if (currentCycle != expectedCycle) {
1✔
507
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
1✔
508
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
1✔
509
        }
510
    }
1✔
511

512
    @Override
513
    public void addCloseListener(CloseListener listener) {
514
        closeListenerList.add(listener);
1✔
515
    }
1✔
516

517
    // region page
518

519
    @Override
520
    public PageInfo<QueueMessage> getPage(SortDirection sortDirection, int pageSize) {
521
        return getPage(-1, sortDirection, pageSize);
1✔
522
    }
523

524
    @SuppressWarnings("Duplicates")
525
    @Override
526
    public PageInfo<QueueMessage> getPage(long moveToPosition, SortDirection sortDirection, int pageSize) {
527
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
528
            if (moveToPosition != -1) {
1✔
529
                tailer.moveToIndex(moveToPosition);
1✔
530
            }
531
            if (sortDirection == SortDirection.DESC) {
1✔
532
                tailer.toEnd();
1✔
533
                tailer.direction(TailerDirection.BACKWARD);
1✔
534
            }
535
            List<QueueMessage> data = new ArrayList<>();
1✔
536
            long start = -1;
1✔
537
            long end = -1;
1✔
538
            // reuse this message.
539
            InternalMessage internalMessage = new InternalMessage();
1✔
540
            for (int i = 0; i < pageSize; i++) {
1✔
541
                boolean readResult = tailer.readBytes(internalMessage);
1✔
542
                if (!readResult) {
1✔
543
                    break;
1✔
544
                }
545
                QueueMessage queueMessage = toQueueMessage(internalMessage, tailer.lastReadIndex());
1✔
546
                data.add(queueMessage);
1✔
547
                if (i == 0) {
1✔
548
                    start = tailer.lastReadIndex();
1✔
549
                }
550
                end = tailer.lastReadIndex();
1✔
551
            }
552
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
553
        }
1✔
554
    }
555

556
    @SuppressWarnings("Duplicates")
557
    @Override
558
    public PageInfo<QueueMessage> getPage(PageInfo<QueueMessage> prevPageInfo, UpDown upDown) {
559
        SortDirection sortDirection = prevPageInfo.getSortDirection();
1✔
560
        int pageSize = prevPageInfo.getPageSize();
1✔
561
        long start = prevPageInfo.getStart();
1✔
562
        long end = prevPageInfo.getEnd();
1✔
563
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
564
            TailerDirection tailerDirection = getTailerDirection(sortDirection, upDown);
1✔
565
            tailer.direction(tailerDirection);
1✔
566
            if (sortDirection == SortDirection.DESC) {
1✔
567
                if (upDown == UpDown.DOWN) {
1✔
568
                    tailer.moveToIndex(end - 1);
1✔
569
                } else {
570
                    tailer.moveToIndex(start + 1);
1✔
571
                }
572
            } else {
573
                if (upDown == UpDown.DOWN) {
1✔
574
                    tailer.moveToIndex(end + 1);
1✔
575
                } else {
576
                    tailer.moveToIndex(start - 1);
1✔
577
                }
578
            }
579
            List<QueueMessage> data = new ArrayList<>();
1✔
580
            // reuse this message.
581
            InternalMessage internalMessage = new InternalMessage();
1✔
582
            for (int i = 0; i < pageSize; i++) {
1✔
583
                boolean readResult = tailer.readBytes(internalMessage);
1✔
584
                if (!readResult) {
1✔
585
                    break;
×
586
                }
587
                QueueMessage queueMessage = toQueueMessage(internalMessage, tailer.lastReadIndex());
1✔
588
                data.add(queueMessage);
1✔
589
                if (i == 0) {
1✔
590
                    start = tailer.lastReadIndex();
1✔
591
                }
592
                end = tailer.lastReadIndex();
1✔
593
            }
594
            if (upDown == UpDown.UP) {
1✔
595
                Collections.reverse(data);
1✔
596
            }
597
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
598
        }
1✔
599
    }
600

601
    private TailerDirection getTailerDirection(SortDirection sortDirection, UpDown upDown) {
602
        if (sortDirection == SortDirection.DESC && upDown == UpDown.DOWN) {
1✔
603
            return TailerDirection.BACKWARD;
1✔
604
        }
605
        if (sortDirection == SortDirection.DESC && upDown == UpDown.UP) {
1✔
606
            return TailerDirection.FORWARD;
1✔
607
        }
608
        if (sortDirection == SortDirection.ASC && upDown == UpDown.DOWN) {
1✔
609
            return TailerDirection.FORWARD;
1✔
610
        }
611
        if (sortDirection == SortDirection.ASC && upDown == UpDown.UP) {
1✔
612
            return TailerDirection.BACKWARD;
1✔
613
        }
614
        return TailerDirection.FORWARD;
×
615
    }
616

617
    // endregion
618

619
    // region logger
620

621
    private void logDebug(String format) {
622
        if (logger.isDebugEnabled()) {
1✔
623
            logger.debug(format);
×
624
        }
625
    }
1✔
626

627
    private void logDebug(String format, Object arg) {
628
        if (logger.isDebugEnabled()) {
1✔
629
            logger.debug(format, arg);
×
630
        }
631
    }
1✔
632

633
    // endregion
634
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc