• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #66

04 Feb 2025 08:39AM UTC coverage: 91.822% (-0.5%) from 92.337%
#66

push

web-flow
Merge pull request #10 from wz2cool/0.2.2

0.2.2

20 of 25 new or added lines in 1 file covered. (80.0%)

1 existing line in 1 file now uncovered.

741 of 807 relevant lines covered (91.82%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.18
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleConsumer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
8
import com.github.wz2cool.localqueue.model.message.InternalReadMessage;
9
import com.github.wz2cool.localqueue.model.message.QueueMessage;
10
import com.github.wz2cool.localqueue.model.page.PageInfo;
11
import com.github.wz2cool.localqueue.model.page.SortDirection;
12
import com.github.wz2cool.localqueue.model.page.UpDown;
13
import net.openhft.chronicle.core.time.TimeProvider;
14
import net.openhft.chronicle.queue.ChronicleQueue;
15
import net.openhft.chronicle.queue.ExcerptTailer;
16
import net.openhft.chronicle.queue.RollCycle;
17
import net.openhft.chronicle.queue.TailerDirection;
18
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
19
import org.slf4j.Logger;
20
import org.slf4j.LoggerFactory;
21

22
import java.util.*;
23
import java.util.concurrent.*;
24
import java.util.concurrent.atomic.AtomicBoolean;
25
import java.util.concurrent.atomic.AtomicInteger;
26
import java.util.concurrent.atomic.AtomicLong;
27

28
/**
29
 * simple consumer
30
 *
31
 * @author frank
32
 */
33
public class SimpleConsumer implements IConsumer {
34

35
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
36
    private final RollCycle defaultRollCycle;
37
    private final TimeProvider timeProvider;
38
    private final Set<String> matchTags;
39
    private final SimpleConsumerConfig config;
40
    private final PositionStore positionStore;
41
    private final SingleChronicleQueue queue;
42
    // should only call by readCacheExecutor
43
    private final ExcerptTailer mainTailer;
44
    private final ExecutorService readCacheExecutor = Executors.newSingleThreadExecutor();
1✔
45
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
46
    private final LinkedBlockingQueue<QueueMessage> messageCache;
47
    private final ConcurrentLinkedQueue<CloseListener> closeListenerList = new ConcurrentLinkedQueue<>();
1✔
48
    private final AtomicLong ackedReadPosition = new AtomicLong(-1);
1✔
49
    private final AtomicBoolean isReadToCacheRunning = new AtomicBoolean(true);
1✔
50
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
51
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
52
    private final Object closeLocker = new Object();
1✔
53
    private final AtomicInteger positionVersion = new AtomicInteger(0);
1✔
54

55
    /**
     * Builds a consumer from the given configuration: opens the Chronicle queue in the
     * configured data directory, positions the main tailer, starts the background task that
     * fills the message cache, and schedules the periodic flush of the acknowledged position.
     *
     * @param config the config of consumer
     */
    public SimpleConsumer(final SimpleConsumerConfig config) {
        this.config = config;
        this.matchTags = getMatchTags(config.getSelectTag());
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
        this.messageCache = new LinkedBlockingQueue<>(config.getCacheSize());
        this.positionStore = new PositionStore(config.getPositionFile());
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
        // The queue uses the same time provider and roll cycle that moveToNearByTimestamp relies on.
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
                .timeProvider(this.timeProvider)
                .rollCycle(this.defaultRollCycle)
                .build();
        // The tailer is created on the read-cache executor (see initMainTailer).
        this.mainTailer = initMainTailer();
        startReadToCache();
        scheduler.scheduleAtFixedRate(
                this::flushPosition,
                0,
                config.getFlushPositionInterval(),
                TimeUnit.MILLISECONDS);
    }
1✔
75

76

77
    private final List<QueueMessage> pendingMessages = Collections.synchronizedList(new ArrayList<>());
1✔
78

79
    @Override
80
    public synchronized QueueMessage take() throws InterruptedException {
81
        if (!pendingMessages.isEmpty()) {
1✔
NEW
82
            return pendingMessages.get(0);
×
83
        }
84
        QueueMessage message = this.messageCache.take();
1✔
85
        pendingMessages.add(message);
1✔
86
        return message;
1✔
87
    }
88

89
    @Override
90
    public synchronized List<QueueMessage> batchTake(int maxBatchSize) throws InterruptedException {
91
        if (!pendingMessages.isEmpty()) {
1✔
92
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
1✔
93
        }
94
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
95
        QueueMessage take = this.messageCache.take();
1✔
96
        result.add(take);
1✔
97
        this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
98
        pendingMessages.addAll(result);
1✔
99
        return result;
1✔
100
    }
101

102
    @Override
103
    public synchronized Optional<QueueMessage> take(long timeout, TimeUnit unit) throws InterruptedException {
104
        if (!pendingMessages.isEmpty()) {
1✔
NEW
105
            return Optional.of(pendingMessages.get(0));
×
106
        }
107
        QueueMessage message = this.messageCache.poll(timeout, unit);
1✔
108
        if (Objects.nonNull(message)) {
1✔
109
            pendingMessages.add(message);
1✔
110
        }
111
        return Optional.ofNullable(message);
1✔
112
    }
113

114
    @Override
115
    public synchronized List<QueueMessage> batchTake(int maxBatchSize, long timeout, TimeUnit unit) throws InterruptedException {
116
        if (!pendingMessages.isEmpty()) {
1✔
NEW
117
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
118
        }
119
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
120
        QueueMessage poll = this.messageCache.poll(timeout, unit);
1✔
121
        if (Objects.nonNull(poll)) {
1✔
122
            result.add(poll);
1✔
123
            this.messageCache.drainTo(result, maxBatchSize - 1);
1✔
124
            pendingMessages.addAll(result);
1✔
125
        }
126
        return result;
1✔
127
    }
128

129
    @Override
130
    public synchronized Optional<QueueMessage> poll() {
131
        if (!pendingMessages.isEmpty()) {
1✔
NEW
132
            return Optional.of(pendingMessages.get(0));
×
133
        }
134
        QueueMessage message = this.messageCache.poll();
1✔
135
        if (Objects.nonNull(message)) {
1✔
136
            pendingMessages.add(message);
1✔
137
        }
138
        return Optional.ofNullable(message);
1✔
139
    }
140

141
    @Override
142
    public synchronized List<QueueMessage> batchPoll(int maxBatchSize) {
143
        if (!pendingMessages.isEmpty()) {
1✔
NEW
144
            return pendingMessages.subList(0, Math.min(maxBatchSize, pendingMessages.size()));
×
145
        }
146
        List<QueueMessage> result = new ArrayList<>(maxBatchSize);
1✔
147
        this.messageCache.drainTo(result, maxBatchSize);
1✔
148
        pendingMessages.addAll(result);
1✔
149
        return result;
1✔
150
    }
151

152
    @Override
153
    public synchronized void ack(final QueueMessage message) {
154
        if (Objects.isNull(message)) {
1✔
155
            return;
1✔
156
        }
157

158
        if (message.getPositionVersion() != positionVersion.get()) {
1✔
159
            return;
×
160
        }
161
        ackedReadPosition.set(message.getPosition());
1✔
162
        pendingMessages.remove(message);
1✔
163
    }
1✔
164

165
    @Override
166
    public synchronized void ack(final List<QueueMessage> messages) {
167
        if (Objects.isNull(messages) || messages.isEmpty()) {
1✔
168
            return;
1✔
169
        }
170
        QueueMessage lastOne = messages.get(messages.size() - 1);
1✔
171
        if (lastOne.getPositionVersion() != positionVersion.get()) {
1✔
172
            return;
×
173
        }
174
        ackedReadPosition.set(lastOne.getPosition());
1✔
175
        pendingMessages.removeAll(messages);
1✔
176
    }
1✔
177

178
    @Override
179
    public boolean moveToPosition(final long position) {
180
        logDebug("[moveToPosition] start");
1✔
181
        stopReadToCache();
1✔
182
        try {
183
            return moveToPositionInternal(position);
1✔
184
        } finally {
185
            startReadToCache();
1✔
186
            logDebug("[moveToPosition] end");
1✔
187
        }
×
188
    }
189

190
    @Override
191
    public boolean moveToTimestamp(final long timestamp) {
192
        logDebug("[moveToTimestamp] start, timestamp: {}", timestamp);
1✔
193
        stopReadToCache();
1✔
194
        try {
195
            Optional<Long> positionOptional = findPosition(timestamp);
1✔
196
            if (!positionOptional.isPresent()) {
1✔
197
                return false;
1✔
198
            }
199
            Long position = positionOptional.get();
1✔
200
            return moveToPositionInternal(position);
1✔
201
        } finally {
202
            startReadToCache();
1✔
203
            logDebug("[moveToTimestamp] end");
1✔
204
        }
×
205
    }
206

207
    @Override
208
    public Optional<QueueMessage> get(final long position) {
209
        if (position < 0) {
1✔
210
            return Optional.empty();
×
211
        }
212
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
213
            tailer.moveToIndex(position);
1✔
214
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
215
            boolean readResult = tailer.readBytes(internalReadMessage);
1✔
216
            if (readResult) {
1✔
217
                return Optional.of(toQueueMessage(internalReadMessage, position));
1✔
218
            } else {
219
                return Optional.empty();
×
220
            }
221
        }
1✔
222
    }
223

224
    @Override
225
    public Optional<QueueMessage> get(final String messageKey, long searchTimestampStart, long searchTimestampEnd) {
226
        if (messageKey == null || messageKey.isEmpty()) {
1✔
227
            return Optional.empty();
1✔
228
        }
229
        // reuse this message
230
        InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
231
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
232
            moveToNearByTimestamp(tailer, searchTimestampStart);
1✔
233
            while (true) {
234
                // for performance, ignore read content.
235
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
236
                if (!readResult) {
1✔
237
                    return Optional.empty();
1✔
238
                }
239
                if (internalReadMessage.getWriteTime() < searchTimestampStart) {
1✔
240
                    continue;
1✔
241
                }
242
                if (internalReadMessage.getWriteTime() > searchTimestampEnd) {
1✔
243
                    return Optional.empty();
×
244
                }
245
                boolean moveToResult = tailer.moveToIndex(tailer.lastReadIndex());
1✔
246
                if (!moveToResult) {
1✔
247
                    return Optional.empty();
×
248
                }
249
                readResult = tailer.readBytes(internalReadMessage);
1✔
250
                if (!readResult) {
1✔
251
                    return Optional.empty();
×
252
                }
253
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
254
                if (Objects.equals(messageKey, queueMessage.getMessageKey())) {
1✔
255
                    return Optional.of(queueMessage);
1✔
256
                }
257
            }
×
258
        }
1✔
259
    }
260

261
    private Set<String> getMatchTags(String selectTag) {
262
        logDebug("[getMatchTags] start, selectTag: {}", selectTag);
1✔
263
        ConcurrentHashMap.KeySetView<String, Boolean> mySet = ConcurrentHashMap.newKeySet();
1✔
264
        if (selectTag == null || selectTag.isEmpty()) {
1✔
265
            return mySet;
×
266
        }
267

268
        String[] tags = selectTag.split("\\|\\|");
1✔
269
        mySet.addAll(Arrays.asList(tags));
1✔
270
        return mySet;
1✔
271
    }
272

273
    private QueueMessage toQueueMessage(final InternalReadMessage internalReadMessage, final long position) {
274
        return new QueueMessage(
1✔
275
                internalReadMessage.getTag(),
1✔
276
                internalReadMessage.getMessageKey(),
1✔
277
                positionVersion.get(),
1✔
278
                position,
279
                internalReadMessage.getContent(),
1✔
280
                internalReadMessage.getWriteTime());
1✔
281
    }
282

283
    private boolean moveToPositionInternal(final long position) {
284
        return CompletableFuture.supplyAsync(() -> {
1✔
285
            synchronized (closeLocker) {
1✔
286
                try {
287
                    if (isClosing.get()) {
1✔
288
                        logDebug("[moveToPositionInternal] consumer is closing");
×
289
                        return false;
×
290
                    }
291
                    logDebug("[moveToPositionInternal] start, position: {}", position);
1✔
292
                    boolean moveToResult = mainTailer.moveToIndex(position);
1✔
293
                    if (moveToResult) {
1✔
294
                        positionVersion.incrementAndGet();
1✔
295
                        messageCache.clear();
1✔
296
                        ackedReadPosition.set(position);
1✔
297
                    }
298
                    return moveToResult;
1✔
299
                } finally {
300
                    logDebug("[moveToPositionInternal] end");
1✔
301
                }
×
302
            }
×
303
        }, this.readCacheExecutor).join();
1✔
304
    }
305

306

307
    @Override
308
    public Optional<Long> findPosition(final long timestamp) {
309
        logDebug("[findPosition] start, timestamp: {}", timestamp);
1✔
310
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
311
            moveToNearByTimestamp(tailer, timestamp);
1✔
312
            // reuse this message.
313
            InternalReadMessage internalReadMessage = new InternalReadMessage(true);
1✔
314
            while (true) {
315
                boolean resultResult = tailer.readBytes(internalReadMessage);
1✔
316
                if (resultResult) {
1✔
317
                    if (internalReadMessage.getWriteTime() >= timestamp) {
1✔
318
                        return Optional.of(tailer.lastReadIndex());
1✔
319
                    }
320
                } else {
321
                    return Optional.empty();
1✔
322
                }
323
            }
1✔
324
        } finally {
1✔
325
            logDebug("[findPosition] end");
1✔
326
        }
×
327
    }
328

329
    public long getAckedReadPosition() {
330
        return ackedReadPosition.get();
1✔
331
    }
332

333
    @Override
334
    public boolean isClosed() {
335
        return isClosed.get();
1✔
336
    }
337

338
    private void stopReadToCache() {
339
        isReadToCacheRunning.set(false);
1✔
340
    }
1✔
341

342
    private void startReadToCache() {
343
        this.isReadToCacheRunning.set(true);
1✔
344
        readCacheExecutor.execute(this::readToCache);
1✔
345
    }
1✔
346

347
    private void readToCache() {
348
        try {
349
            logDebug("[readToCache] start");
1✔
350
            long pullInterval = config.getPullInterval();
1✔
351
            long fillCacheInterval = config.getFillCacheInterval();
1✔
352
            // reuse this message.
353
            InternalReadMessage internalReadMessage = new InternalReadMessage(this.matchTags);
1✔
354
            while (isReadToCacheRunning.get()) {
1✔
355
                synchronized (closeLocker) {
1✔
356
                    try {
357
                        if (isClosing.get()) {
1✔
358
                            logDebug("[readToCache] consumer is closing");
1✔
359
                            return;
1✔
360
                        }
361

362
                        boolean readResult = mainTailer.readBytes(internalReadMessage);
1✔
363
                        if (!readResult) {
1✔
364
                            TimeUnit.MILLISECONDS.sleep(pullInterval);
1✔
365
                            continue;
1✔
366
                        }
367
                        String messageTag = internalReadMessage.getTag() == null ? "*" : internalReadMessage.getTag();
1✔
368
                        if (matchTags.contains("*") || matchTags.contains(messageTag)) {
1✔
369
                            long lastedReadIndex = mainTailer.lastReadIndex();
1✔
370
                            QueueMessage queueMessage = toQueueMessage(internalReadMessage, lastedReadIndex);
1✔
371
                            boolean offerResult = this.messageCache.offer(queueMessage, fillCacheInterval, TimeUnit.MILLISECONDS);
1✔
372
                            if (!offerResult) {
1✔
373
                                // if offer failed, move to last read position
374
                                mainTailer.moveToIndex(lastedReadIndex);
1✔
375
                            }
376
                        }
377
                    } catch (InterruptedException e) {
×
378
                        Thread.currentThread().interrupt();
×
379
                    }
1✔
380
                }
1✔
381
            }
382
        } finally {
383
            logDebug("[readToCache] end");
1✔
384
        }
1✔
385
    }
1✔
386

387
    private ExcerptTailer initMainTailer() {
388
        return CompletableFuture.supplyAsync(this::initMainTailerInternal, this.readCacheExecutor).join();
1✔
389
    }
390

391
    private ExcerptTailer initMainTailerInternal() {
392
        try {
393
            logDebug("[initExcerptTailerInternal] start");
1✔
394
            ExcerptTailer tailer = queue.createTailer();
1✔
395
            Optional<Long> lastPositionOptional = getLastPosition();
1✔
396
            if (lastPositionOptional.isPresent()) {
1✔
397
                Long position = lastPositionOptional.get();
1✔
398
                long beginPosition = position + 1;
1✔
399
                tailer.moveToIndex(beginPosition);
1✔
400
                logDebug("[initExcerptTailerInternal] find last position and move to position: {}", beginPosition);
1✔
401
            } else {
1✔
402
                ConsumeFromWhere consumeFromWhere = this.config.getConsumeFromWhere();
1✔
403
                if (consumeFromWhere == ConsumeFromWhere.LAST) {
1✔
404
                    tailer.toEnd();
1✔
405
                    logDebug("[initExcerptTailerInternal] move to end");
1✔
406
                } else if (consumeFromWhere == ConsumeFromWhere.FIRST) {
1✔
407
                    tailer.toStart();
1✔
408
                    logDebug("[initExcerptTailerInternal] move to start");
1✔
409
                }
410
            }
411
            return tailer;
1✔
412
        } finally {
413
            logDebug("[initExcerptTailer] end");
1✔
414
        }
×
415

416
    }
417

418
    /// region position
419

420
    private void flushPosition() {
421
        if (ackedReadPosition.get() != -1) {
1✔
422
            setLastPosition(this.ackedReadPosition.get());
1✔
423
        }
424
    }
1✔
425

426
    private Optional<Long> getLastPosition() {
427
        return positionStore.get(config.getConsumerId());
1✔
428
    }
429

430
    private void setLastPosition(long position) {
431
        positionStore.put(config.getConsumerId(), position);
1✔
432
    }
1✔
433

434
    /// endregion
435

436
    @SuppressWarnings("Duplicates")
437
    @Override
438
    public void close() {
439
        synchronized (closeLocker) {
1✔
440
            try {
441
                logDebug("[close] start");
1✔
442
                if (isClosing.get()) {
1✔
443
                    logDebug("[close] is closing");
1✔
444
                    return;
1✔
445
                }
446
                isClosing.set(true);
1✔
447
                stopReadToCache();
1✔
448
                if (!positionStore.isClosed()) {
1✔
449
                    positionStore.close();
1✔
450
                }
451
                scheduler.shutdown();
1✔
452
                readCacheExecutor.shutdown();
1✔
453
                try {
454
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
455
                        scheduler.shutdownNow();
×
456
                    }
457
                    if (!readCacheExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
458
                        readCacheExecutor.shutdownNow();
1✔
459
                    }
460
                } catch (InterruptedException e) {
×
461
                    scheduler.shutdownNow();
×
462
                    readCacheExecutor.shutdownNow();
×
463
                    Thread.currentThread().interrupt();
×
464
                }
1✔
465
                if (!queue.isClosed()) {
1✔
466
                    queue.close();
1✔
467
                }
468

469
                for (CloseListener closeListener : closeListenerList) {
1✔
470
                    closeListener.onClose();
1✔
471
                }
1✔
472
                isClosed.set(true);
1✔
473
            } finally {
474
                logDebug("[close] end");
1✔
475
            }
1✔
476
        }
1✔
477
    }
1✔
478

479
    private void moveToNearByTimestamp(ExcerptTailer tailer, long timestamp) {
480
        int expectedCycle = ChronicleQueueHelper.cycle(defaultRollCycle, timeProvider, timestamp);
1✔
481
        int currentCycle = tailer.cycle();
1✔
482
        if (currentCycle != expectedCycle) {
1✔
483
            boolean moveToCycleResult = tailer.moveToCycle(expectedCycle);
1✔
484
            logDebug("[moveToNearByTimestamp] moveToCycleResult: {}", moveToCycleResult);
1✔
485
        }
486
    }
1✔
487

488
    @Override
489
    public void addCloseListener(CloseListener listener) {
490
        closeListenerList.add(listener);
1✔
491
    }
1✔
492

493

494
    // region page
495

496
    @Override
497
    public PageInfo<QueueMessage> getPage(SortDirection sortDirection, int pageSize) {
498
        return getPage(-1, sortDirection, pageSize);
1✔
499
    }
500

501
    @SuppressWarnings("Duplicates")
502
    @Override
503
    public PageInfo<QueueMessage> getPage(long moveToPosition, SortDirection sortDirection, int pageSize) {
504
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
505
            if (moveToPosition != -1) {
1✔
506
                tailer.moveToIndex(moveToPosition);
1✔
507
            }
508
            if (sortDirection == SortDirection.DESC) {
1✔
509
                tailer.toEnd();
1✔
510
                tailer.direction(TailerDirection.BACKWARD);
1✔
511
            }
512
            List<QueueMessage> data = new ArrayList<>();
1✔
513
            long start = -1;
1✔
514
            long end = -1;
1✔
515
            // reuse this message.
516
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
517
            for (int i = 0; i < pageSize; i++) {
1✔
518
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
519
                if (!readResult) {
1✔
520
                    break;
1✔
521
                }
522
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
523
                data.add(queueMessage);
1✔
524
                if (i == 0) {
1✔
525
                    start = tailer.lastReadIndex();
1✔
526
                }
527
                end = tailer.lastReadIndex();
1✔
528
            }
529
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
530
        }
1✔
531
    }
532

533
    @SuppressWarnings("Duplicates")
534
    @Override
535
    public PageInfo<QueueMessage> getPage(PageInfo<QueueMessage> prevPageInfo, UpDown upDown) {
536
        SortDirection sortDirection = prevPageInfo.getSortDirection();
1✔
537
        int pageSize = prevPageInfo.getPageSize();
1✔
538
        long start = prevPageInfo.getStart();
1✔
539
        long end = prevPageInfo.getEnd();
1✔
540
        try (ExcerptTailer tailer = queue.createTailer()) {
1✔
541
            TailerDirection tailerDirection = getTailerDirection(sortDirection, upDown);
1✔
542
            tailer.direction(tailerDirection);
1✔
543
            if (sortDirection == SortDirection.DESC) {
1✔
544
                if (upDown == UpDown.DOWN) {
1✔
545
                    tailer.moveToIndex(end - 1);
1✔
546
                } else {
547
                    tailer.moveToIndex(start + 1);
1✔
548
                }
549
            } else {
550
                if (upDown == UpDown.DOWN) {
1✔
551
                    tailer.moveToIndex(end + 1);
1✔
552
                } else {
553
                    tailer.moveToIndex(start - 1);
1✔
554
                }
555
            }
556
            List<QueueMessage> data = new ArrayList<>();
1✔
557
            // reuse this message.
558
            InternalReadMessage internalReadMessage = new InternalReadMessage();
1✔
559
            for (int i = 0; i < pageSize; i++) {
1✔
560
                boolean readResult = tailer.readBytes(internalReadMessage);
1✔
561
                if (!readResult) {
1✔
562
                    break;
×
563
                }
564
                QueueMessage queueMessage = toQueueMessage(internalReadMessage, tailer.lastReadIndex());
1✔
565
                data.add(queueMessage);
1✔
566
                if (i == 0) {
1✔
567
                    start = tailer.lastReadIndex();
1✔
568
                }
569
                end = tailer.lastReadIndex();
1✔
570
            }
571
            if (upDown == UpDown.UP) {
1✔
572
                Collections.reverse(data);
1✔
573
            }
574
            return new PageInfo<>(start, end, data, sortDirection, pageSize);
1✔
575
        }
1✔
576
    }
577

578
    private TailerDirection getTailerDirection(SortDirection sortDirection, UpDown upDown) {
579
        if (sortDirection == SortDirection.DESC && upDown == UpDown.DOWN) {
1✔
580
            return TailerDirection.BACKWARD;
1✔
581
        }
582
        if (sortDirection == SortDirection.DESC && upDown == UpDown.UP) {
1✔
583
            return TailerDirection.FORWARD;
1✔
584
        }
585
        if (sortDirection == SortDirection.ASC && upDown == UpDown.DOWN) {
1✔
586
            return TailerDirection.FORWARD;
1✔
587
        }
588
        if (sortDirection == SortDirection.ASC && upDown == UpDown.UP) {
1✔
589
            return TailerDirection.BACKWARD;
1✔
590
        }
591
        return TailerDirection.FORWARD;
×
592
    }
593

594

595
    // endregion
596

597
    // region logger
598

599
    private void logDebug(String format) {
600
        if (logger.isDebugEnabled()) {
1✔
601
            logger.debug(format);
×
602
        }
603
    }
1✔
604

605
    private void logDebug(String format, Object arg) {
606
        if (logger.isDebugEnabled()) {
1✔
607
            logger.debug(format, arg);
×
608
        }
609
    }
1✔
610

611
    // endregion
612
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc