• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #41

02 Feb 2025 03:13PM UTC coverage: 91.958% (+0.2%) from 91.757%
#41

push

wz2cool
use atomic instead of volatile

105 of 124 new or added lines in 4 files covered. (84.68%)

2 existing lines in 2 files now uncovered.

606 of 659 relevant lines covered (91.96%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.55
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleQueue.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.IQueue;
5
import com.github.wz2cool.localqueue.event.CloseListener;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
8
import com.github.wz2cool.localqueue.model.config.SimpleQueueConfig;
9
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
10
import org.slf4j.Logger;
11
import org.slf4j.LoggerFactory;
12

13
import java.util.Map;
14
import java.util.Objects;
15
import java.util.concurrent.ConcurrentHashMap;
16
import java.util.concurrent.ConcurrentLinkedQueue;
17
import java.util.concurrent.atomic.AtomicBoolean;
18

19
/**
20
 * simple queue
21
 *
22
 * @author frank
23
 */
24
public class SimpleQueue implements IQueue {
25

26
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
27
    private final SimpleQueueConfig config;
28
    private final SimpleProducer simpleProducer;
29
    private final Map<String, SimpleConsumer> consumerMap = new ConcurrentHashMap<>();
1✔
30
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
31

32
    private final Object closeLocker = new Object();
1✔
33
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
34
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
35

36
    /**
     * Creates a queue backed by the given configuration and builds its single producer.
     *
     * @param config queue configuration; stored as-is (no null check is performed here)
     */
    public SimpleQueue(SimpleQueueConfig config) {
1✔
37
        this.config = config;
1✔
38
        // Must run after config is assigned: getProducer() reads this.config.
        this.simpleProducer = getProducer();
1✔
39
    }
1✔
40

41
    /**
     * Offers a message to the queue by delegating to the internal producer.
     *
     * @param message the message handed to the producer
     * @return whatever {@code simpleProducer.offer(message)} returns
     */
    @Override
42
    public boolean offer(String message) {
43
        return simpleProducer.offer(message);
1✔
44
    }
45

46
    /**
     * Offers a keyed message to the queue by delegating to the internal producer.
     *
     * @param messageKey the key handed to the producer together with the message
     * @param message    the message handed to the producer
     * @return whatever {@code simpleProducer.offer(messageKey, message)} returns
     */
    @Override
47
    public boolean offer(String messageKey, String message) {
48
        return simpleProducer.offer(messageKey, message);
×
49
    }
50

51
    /**
     * Reports whether this queue has finished closing.
     *
     * @return the current value of the {@code isClosed} flag, which {@code close()} sets to
     *         {@code true} only as its last step
     */
    @Override
52
    public boolean isClosed() {
NEW
53
        return isClosed.get();
×
54
    }
55

56
    /**
     * Builds a new {@link SimpleProducer} from the queue configuration.
     * <p>
     * Despite the "get" name this creates a fresh producer on every call; within this class it
     * is called once, from the constructor. The data dir, keep days, roll cycle type and time
     * zone are copied from {@code config}.
     *
     * @return a newly constructed producer
     */
    private SimpleProducer getProducer() {
57
        return new SimpleProducer(new SimpleProducerConfig.Builder()
1✔
58
                .setDataDir(config.getDataDir())
1✔
59
                .setKeepDays(config.getKeepDays())
1✔
60
                .setRollCycleType(config.getRollCycleType())
1✔
61
                .setTimeZone(config.getTimeZone())
1✔
62
                .build());
1✔
63
    }
64

65
    /**
     * Returns the consumer registered under the given id, creating it if necessary, using
     * {@link ConsumeFromWhere#LAST} as the start position for a newly created consumer.
     *
     * @param consumerId id used as the key in the consumer map
     * @return the existing or newly created consumer
     */
    @Override
66
    public synchronized IConsumer getConsumer(final String consumerId) {
67
        return getConsumer(consumerId, ConsumeFromWhere.LAST);
1✔
68
    }
69

70
    /**
     * Returns the consumer registered under the given id, creating and registering one if the
     * id is not present in {@code consumerMap}.
     * <p>
     * If a consumer already exists for the id it is returned unchanged, so
     * {@code consumeFromWhere} only takes effect when a new consumer is created.
     * <p>
     * NOTE(review): this method does not check {@code isClosing}/{@code isClosed}, and it locks
     * on {@code this} while {@code close()} locks on {@code closeLocker}, so a consumer can
     * apparently be created during or after close - confirm whether that is intended.
     *
     * @param consumerId       id used as the key in the consumer map
     * @param consumeFromWhere start position passed to a newly created consumer
     * @return the existing or newly created consumer
     */
    @Override
71
    public synchronized IConsumer getConsumer(final String consumerId, final ConsumeFromWhere consumeFromWhere) {
72
        SimpleConsumer consumer = consumerMap.get(consumerId);
1✔
73
        if (Objects.nonNull(consumer)) {
1✔
74
            return consumer;
×
75
        }
76

77
        consumer = new SimpleConsumer(new SimpleConsumerConfig.Builder()
1✔
78
                .setDataDir(config.getDataDir())
1✔
79
                .setConsumerId(consumerId)
1✔
80
                .setConsumeFromWhere(consumeFromWhere)
1✔
81
                .setRollCycleType(config.getRollCycleType())
1✔
82
                .setTimeZone(config.getTimeZone())
1✔
83
                .build());
1✔
84
        // When the consumer reports a close, drop it from the map so a later getConsumer()
        // with the same id builds a fresh instance.
        consumer.addCloseListener(() -> {
1✔
85
            SimpleConsumer removeItem = consumerMap.remove(consumerId);
1✔
86
            // NOTE(review): remove() returns null when the id is no longer mapped, which would
            // make the next line throw NullPointerException - confirm SimpleConsumer fires its
            // close listeners at most once, or guard against null.
            removeItem.close();
1✔
87
        });
1✔
88
        consumerMap.put(consumerId, consumer);
1✔
89
        return consumer;
1✔
90
    }
91

92
    /**
     * Closes the queue: closes the producer, closes every consumer still in
     * {@code consumerMap}, notifies the registered close listeners, then marks the queue closed.
     * <p>
     * The whole sequence runs while holding {@code closeLocker}. A call made after
     * {@code isClosing} has been set returns immediately without doing any work.
     * <p>
     * NOTE(review): if the producer, a consumer or a listener throws, the exception propagates
     * (the {@code finally} only logs), {@code isClosing} stays {@code true} and {@code isClosed}
     * stays {@code false}, so later calls return early and the queue never reports closed.
     */
    @Override
93
    public void close() {
94
        synchronized (closeLocker) {
1✔
95
            try {
96
                logDebug("[close] start");
1✔
97
                // Already closing or closed: nothing more to do.
                if (isClosing.get()) {
1✔
98
                    logDebug("[close] is closing");
1✔
99
                    return;
1✔
100
                }
101
                isClosing.set(true);
1✔
102
                if (!simpleProducer.isClosed()) {
1✔
103
                    simpleProducer.close();
1✔
104
                }
105
                // consumerMap is a ConcurrentHashMap, so entries may be removed while iterating;
                // presumably each consumer's close listener (registered in getConsumer) does
                // exactly that - verify against SimpleConsumer.close().
                for (Map.Entry<String, SimpleConsumer> entry : consumerMap.entrySet()) {
1✔
106
                    SimpleConsumer consumer = entry.getValue();
1✔
107
                    if (!consumer.isClosed()) {
1✔
108
                        entry.getValue().close();
1✔
109
                    }
110
                }
1✔
111
                // NOTE(review): listeners are invoked while closeLocker is still held.
                for (CloseListener listener : closeListeners) {
1✔
112
                    listener.onClose();
1✔
113
                }
1✔
114
                isClosed.set(true);
1✔
115
            } finally {
116
                logDebug("[close] end");
1✔
117
            }
1✔
118
        }
1✔
119
    }
1✔
120

121

122
    /**
     * Registers a listener whose {@code onClose()} is invoked by {@code close()}.
     * <p>
     * NOTE(review): a listener added after {@code close()} has run is stored but is never
     * invoked by this class - confirm callers do not rely on late registration.
     *
     * @param listener the listener to append to the listener queue
     */
    @Override
123
    public void addCloseListener(CloseListener listener) {
124
        closeListeners.add(listener);
1✔
125
    }
1✔
126

127
    // region logger
128

129
    /**
     * Logs the given text at debug level if debug logging is enabled.
     *
     * @param format the text passed to {@code logger.debug}; no arguments are supplied with it
     */
    private void logDebug(String format) {
130
        if (logger.isDebugEnabled()) {
1✔
131
            logger.debug(format);
×
132
        }
133
    }
1✔
134

135
    // endregion
136
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc