• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #40

02 Feb 2025 02:03PM UTC coverage: 91.757% (-0.09%) from 91.85%
#40

push

wz2cool
use closing

9 of 13 new or added lines in 4 files covered. (69.23%)

3 existing lines in 3 files now uncovered.

590 of 643 relevant lines covered (91.76%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleQueue.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.IQueue;
5
import com.github.wz2cool.localqueue.event.CloseListener;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
8
import com.github.wz2cool.localqueue.model.config.SimpleQueueConfig;
9
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
10
import org.slf4j.Logger;
11
import org.slf4j.LoggerFactory;
12

13
import java.util.Map;
14
import java.util.Objects;
15
import java.util.concurrent.ConcurrentHashMap;
16
import java.util.concurrent.ConcurrentLinkedQueue;
17

18
/**
19
 * simple queue
20
 *
21
 * @author frank
22
 */
23
public class SimpleQueue implements IQueue {
24

25
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
26
    private final SimpleQueueConfig config;
27
    private final SimpleProducer simpleProducer;
28
    private final Map<String, SimpleConsumer> consumerMap = new ConcurrentHashMap<>();
1✔
29
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
30
    private volatile boolean isClosing = false;
1✔
31
    private volatile boolean isClosed = false;
1✔
32

33
    public SimpleQueue(SimpleQueueConfig config) {
1✔
34
        this.config = config;
1✔
35
        this.simpleProducer = getProducer();
1✔
36
    }
1✔
37

38
    @Override
39
    public boolean offer(String message) {
40
        return simpleProducer.offer(message);
1✔
41
    }
42

43
    @Override
44
    public boolean offer(String messageKey, String message) {
45
        return simpleProducer.offer(messageKey, message);
×
46
    }
47

48
    @Override
49
    public boolean isClosed() {
NEW
50
        return isClosed;
×
51
    }
52

53
    private SimpleProducer getProducer() {
54
        return new SimpleProducer(new SimpleProducerConfig.Builder()
1✔
55
                .setDataDir(config.getDataDir())
1✔
56
                .setKeepDays(config.getKeepDays())
1✔
57
                .setRollCycleType(config.getRollCycleType())
1✔
58
                .setTimeZone(config.getTimeZone())
1✔
59
                .build());
1✔
60
    }
61

62
    @Override
63
    public synchronized IConsumer getConsumer(final String consumerId) {
64
        return getConsumer(consumerId, ConsumeFromWhere.LAST);
1✔
65
    }
66

67
    @Override
68
    public synchronized IConsumer getConsumer(final String consumerId, final ConsumeFromWhere consumeFromWhere) {
69
        SimpleConsumer consumer = consumerMap.get(consumerId);
1✔
70
        if (Objects.nonNull(consumer)) {
1✔
71
            return consumer;
×
72
        }
73

74
        consumer = new SimpleConsumer(new SimpleConsumerConfig.Builder()
1✔
75
                .setDataDir(config.getDataDir())
1✔
76
                .setConsumerId(consumerId)
1✔
77
                .setConsumeFromWhere(consumeFromWhere)
1✔
78
                .setRollCycleType(config.getRollCycleType())
1✔
79
                .setTimeZone(config.getTimeZone())
1✔
80
                .build());
1✔
81
        consumer.addCloseListener(() -> consumerMap.remove(consumerId));
1✔
82
        consumerMap.put(consumerId, consumer);
1✔
83
        return consumer;
1✔
84
    }
85

86
    @Override
87
    public void close() {
88
        try {
89
            logDebug("[close] start");
1✔
90
            if (isClosing) {
1✔
91
                logDebug("[close] is closing");
1✔
92
                return;
1✔
93
            }
94
            isClosing = true;
1✔
95
            if (!simpleProducer.isClosed()) {
1✔
96
                simpleProducer.close();
1✔
97
            }
98
            for (Map.Entry<String, SimpleConsumer> entry : consumerMap.entrySet()) {
1✔
99
                SimpleConsumer consumer = entry.getValue();
1✔
100
                if (!consumer.isClosed()) {
1✔
101
                    entry.getValue().close();
1✔
102
                }
103
            }
1✔
104
            for (CloseListener listener : closeListeners) {
1✔
105
                listener.onClose();
1✔
106
            }
1✔
107
            isClosed = true;
1✔
108
        } finally {
109
            logDebug("[close] end");
1✔
110
        }
1✔
111
    }
1✔
112

113

114
    @Override
115
    public void addCloseListener(CloseListener listener) {
116
        closeListeners.add(listener);
1✔
117
    }
1✔
118

119
    // region logger
120

121
    private void logDebug(String format) {
122
        if (logger.isDebugEnabled()) {
1✔
123
            logger.debug(format);
×
124
        }
125
    }
1✔
126

127
    // endregion
128
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc