• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #32

01 Feb 2025 03:28AM UTC coverage: 90.877% (+0.8%) from 90.052%
#32

push

wz2cool
multi close

0 of 1 new or added line in 1 file covered. (0.0%)

518 of 570 relevant lines covered (90.88%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.88
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleQueue.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.IQueue;
5
import com.github.wz2cool.localqueue.event.CloseListener;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
8
import com.github.wz2cool.localqueue.model.config.SimpleQueueConfig;
9
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
10
import org.slf4j.Logger;
11
import org.slf4j.LoggerFactory;
12

13
import java.util.Map;
14
import java.util.Objects;
15
import java.util.concurrent.ConcurrentHashMap;
16
import java.util.concurrent.ConcurrentLinkedQueue;
17

18
/**
19
 * simple queue
20
 *
21
 * @author frank
22
 */
23
public class SimpleQueue implements IQueue {
24

25
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
26
    private final SimpleQueueConfig config;
27
    private final SimpleProducer simpleProducer;
28
    private final Map<String, SimpleConsumer> consumerMap = new ConcurrentHashMap<>();
1✔
29
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
30
    private volatile boolean isClosed = false;
1✔
31

32
    public SimpleQueue(SimpleQueueConfig config) {
1✔
33
        this.config = config;
1✔
34
        this.simpleProducer = getProducer();
1✔
35
    }
1✔
36

37
    @Override
38
    public boolean offer(String message) {
39
        return simpleProducer.offer(message);
1✔
40
    }
41

42
    @Override
43
    public boolean offer(String messageKey, String message) {
44
        return simpleProducer.offer(messageKey, message);
×
45
    }
46

47
    private SimpleProducer getProducer() {
48
        return new SimpleProducer(new SimpleProducerConfig.Builder()
1✔
49
                .setDataDir(config.getDataDir())
1✔
50
                .setKeepDays(config.getKeepDays())
1✔
51
                .build());
1✔
52
    }
53

54
    @Override
55
    public synchronized IConsumer getConsumer(final String consumerId) {
56
        return getConsumer(consumerId, ConsumeFromWhere.LAST);
1✔
57
    }
58

59
    @Override
60
    public synchronized IConsumer getConsumer(final String consumerId, final ConsumeFromWhere consumeFromWhere) {
61
        SimpleConsumer consumer = consumerMap.get(consumerId);
1✔
62
        if (Objects.nonNull(consumer)) {
1✔
63
            return consumer;
×
64
        }
65

66
        consumer = new SimpleConsumer(new SimpleConsumerConfig.Builder()
1✔
67
                .setDataDir(config.getDataDir())
1✔
68
                .setConsumerId(consumerId)
1✔
69
                .setConsumeFromWhere(consumeFromWhere)
1✔
70
                .build());
1✔
71
        consumer.addCloseListener(() -> consumerMap.remove(consumerId));
1✔
72
        consumerMap.put(consumerId, consumer);
1✔
73
        return consumer;
1✔
74
    }
75

76
    @Override
77
    public void close() {
78
        try {
79
            logDebug("[close] start");
1✔
80
            if (isClosed) {
1✔
81
                logDebug("[close] already closed");
1✔
82
                return;
1✔
83
            }
84
            if (!simpleProducer.isClosed()) {
1✔
85
                simpleProducer.close();
1✔
86
            }
87
            for (Map.Entry<String, SimpleConsumer> entry : consumerMap.entrySet()) {
1✔
88
                SimpleConsumer consumer = entry.getValue();
1✔
89
                if (!consumer.isClosed()) {
1✔
90
                    entry.getValue().close();
1✔
91
                }
92
            }
1✔
93
            for (CloseListener listener : closeListeners) {
1✔
94
                listener.onClose();
1✔
95
            }
1✔
96
            isClosed = true;
1✔
97
        } finally {
98
            logDebug("[close] end");
1✔
99
        }
1✔
100
    }
1✔
101

102

103
    @Override
104
    public void addCloseListener(CloseListener listener) {
105
        closeListeners.add(listener);
1✔
106
    }
1✔
107

108
    // region logger
109

110
    private void logDebug(String format) {
111
        if (logger.isDebugEnabled()) {
1✔
NEW
112
            logger.debug(format);
×
113
        }
114
    }
1✔
115

116
    // endregion
117
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc