• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #25

31 Jan 2025 10:14AM UTC coverage: 90.359% (-1.2%) from 91.583%
#25

push

wz2cool
add CloseListener.java

28 of 36 new or added lines in 4 files covered. (77.78%)

1 existing line in 1 file now uncovered.

478 of 529 relevant lines covered (90.36%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.78
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleQueue.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IConsumer;
4
import com.github.wz2cool.localqueue.IQueue;
5
import com.github.wz2cool.localqueue.event.CloseListener;
6
import com.github.wz2cool.localqueue.model.config.SimpleConsumerConfig;
7
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
8
import com.github.wz2cool.localqueue.model.config.SimpleQueueConfig;
9
import com.github.wz2cool.localqueue.model.enums.ConsumeFromWhere;
10

11
import java.util.Map;
12
import java.util.Objects;
13
import java.util.concurrent.ConcurrentHashMap;
14
import java.util.concurrent.ConcurrentLinkedQueue;
15

16
/**
17
 * simple queue
18
 *
19
 * @author frank
20
 */
21
public class SimpleQueue implements IQueue {
22

23
    private final SimpleQueueConfig config;
24
    private final SimpleProducer simpleProducer;
25
    private final Map<String, SimpleConsumer> consumerMap = new ConcurrentHashMap<>();
1✔
26
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
27

28
    public SimpleQueue(SimpleQueueConfig config) {
1✔
29
        this.config = config;
1✔
30
        this.simpleProducer = getProducer();
1✔
31
    }
1✔
32

33
    @Override
34
    public boolean offer(String message) {
35
        return simpleProducer.offer(message);
1✔
36
    }
37

38
    @Override
39
    public boolean offer(String messageKey, String message) {
NEW
40
        return simpleProducer.offer(messageKey, message);
×
41
    }
42

43
    private SimpleProducer getProducer() {
44
        return new SimpleProducer(new SimpleProducerConfig.Builder()
1✔
45
                .setDataDir(config.getDataDir())
1✔
46
                .setKeepDays(config.getKeepDays())
1✔
47
                .build());
1✔
48
    }
49

50
    @Override
51
    public synchronized IConsumer getConsumer(final String consumerId) {
52
        return getConsumer(consumerId, ConsumeFromWhere.LAST);
1✔
53
    }
54

55
    @Override
56
    public synchronized IConsumer getConsumer(final String consumerId, final ConsumeFromWhere consumeFromWhere) {
57
        SimpleConsumer consumer = consumerMap.get(consumerId);
1✔
58
        if (Objects.nonNull(consumer)) {
1✔
59
            return consumer;
×
60
        }
61

62
        consumer = new SimpleConsumer(new SimpleConsumerConfig.Builder()
1✔
63
                .setDataDir(config.getDataDir())
1✔
64
                .setConsumerId(consumerId)
1✔
65
                .setConsumeFromWhere(consumeFromWhere)
1✔
66
                .build());
1✔
67
        consumer.addCloseListener(() -> consumerMap.remove(consumerId));
1✔
68
        consumerMap.put(consumerId, consumer);
1✔
69
        return consumer;
1✔
70
    }
71

72
    @Override
73
    public void close() {
74
        if (!simpleProducer.isClosed()) {
1✔
75
            simpleProducer.close();
1✔
76
        }
77
        for (Map.Entry<String, SimpleConsumer> entry : consumerMap.entrySet()) {
1✔
78
            SimpleConsumer consumer = entry.getValue();
1✔
79
            if (!consumer.isClosed()) {
1✔
80
                entry.getValue().close();
1✔
81
            }
82
        }
1✔
83
        for (CloseListener listener : closeListeners) {
1✔
NEW
84
            listener.onClose();
×
UNCOV
85
        }
×
86
    }
1✔
87

88
    @Override
89
    public void addCloseListener(CloseListener listener) {
NEW
90
        closeListeners.add(listener);
×
NEW
91
    }
×
92
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc