• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #25

31 Jan 2025 10:14AM UTC coverage: 90.359% (-1.2%) from 91.583%
#25

push

wz2cool
add CloseListener.java

28 of 36 new or added lines in 4 files covered. (77.78%)

1 existing line in 1 file now uncovered.

478 of 529 relevant lines covered (90.36%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.22
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
6
import com.github.wz2cool.localqueue.model.message.InternalWriteMessage;
7
import net.openhft.chronicle.queue.ChronicleQueue;
8
import net.openhft.chronicle.queue.ExcerptAppender;
9
import net.openhft.chronicle.queue.RollCycles;
10
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
11
import org.slf4j.Logger;
12
import org.slf4j.LoggerFactory;
13

14
import java.io.File;
15
import java.io.IOException;
16
import java.nio.file.Files;
17
import java.time.LocalDate;
18
import java.time.format.DateTimeFormatter;
19
import java.util.ArrayList;
20
import java.util.List;
21
import java.util.UUID;
22
import java.util.concurrent.*;
23
import java.util.concurrent.locks.Lock;
24
import java.util.concurrent.locks.ReentrantLock;
25

26
/**
27
 * simple writer
28
 *
29
 * @author frank
30
 */
31
public class SimpleProducer implements IProducer {
32

33
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
34

35
    private final SimpleProducerConfig config;
36
    private final SingleChronicleQueue queue;
37
    private final LinkedBlockingQueue<InternalWriteMessage> messageCache = new LinkedBlockingQueue<>();
1✔
38
    // should only call by flushExecutor
39
    private final ExcerptAppender mainAppender;
40
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
41
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
42
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
43
    private final Lock internalLock = new ReentrantLock();
1✔
44
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
45

46
    private volatile boolean isFlushRunning = true;
1✔
47
    private volatile boolean isClosing = false;
1✔
48
    private volatile boolean isClosed = false;
1✔
49

50
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
51
        this.config = config;
1✔
52
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
53
        this.mainAppender = initMainAppender();
1✔
54
        flushExecutor.execute(this::flush);
1✔
55
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
56
    }
1✔
57

58
    private ExcerptAppender initMainAppender() {
59
        return CompletableFuture.supplyAsync(this.queue::createAppender, this.flushExecutor).join();
1✔
60
    }
61

62
    /// region flush to file
63
    private void flush() {
64
        while (isFlushRunning && !isClosing) {
1✔
65
            flushMessages(config.getFlushBatchSize());
1✔
66
        }
67
    }
1✔
68

69
    private void stopFlush() {
70
        isFlushRunning = false;
1✔
71
    }
1✔
72

73
    private final List<InternalWriteMessage> tempFlushMessages = new ArrayList<>();
1✔
74

75
    private void flushMessages(int batchSize) {
76
        try {
77
            logDebug("[flushInternal] start");
1✔
78
            if (tempFlushMessages.isEmpty()) {
1✔
79
                // take 主要作用就是卡主线程
80
                InternalWriteMessage firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
81
                if (firstItem == null) {
1✔
82
                    return;
1✔
83
                }
84
                this.tempFlushMessages.add(firstItem);
1✔
85
                // 如果空了从消息缓存放入待刷消息
86
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
87
            }
88
            flushMessages(tempFlushMessages);
1✔
89
            tempFlushMessages.clear();
1✔
90
        } catch (InterruptedException ex) {
×
91
            Thread.currentThread().interrupt();
×
92
        } finally {
93
            logDebug("[flushInternal] end");
1✔
94
        }
1✔
95
    }
1✔
96

97
    private void flushMessages(final List<InternalWriteMessage> messages) {
98
        try {
99
            logDebug("[flushMessages] start");
1✔
100
            internalLock.lock();
1✔
101
            if (!isFlushRunning || isClosing) {
1✔
102
                return;
×
103
            }
104

105
            for (InternalWriteMessage message : messages) {
1✔
106
                long writeTime = System.currentTimeMillis();
1✔
107
                message.setWriteTime(writeTime);
1✔
108
                mainAppender.writeBytes(message);
1✔
109
            }
1✔
110
        } finally {
111
            internalLock.unlock();
1✔
112
            logDebug("[flushMessages] end");
1✔
113
        }
1✔
114
    }
1✔
115

116
    /// endregion
117

118

119
    @Override
120
    public boolean offer(String message) {
121
        return offer(null, message);
1✔
122
    }
123

124
    @Override
125
    public boolean offer(String messageKey, String message) {
126
        InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
127
        internalWriteMessage.setContent(message);
1✔
128
        String useMessageKey = messageKey == null ? UUID.randomUUID().toString() : messageKey;
1✔
129
        internalWriteMessage.setMessageKey(useMessageKey);
1✔
130
        return this.messageCache.offer(internalWriteMessage);
1✔
131
    }
132

133
    /**
134
     * get the last position
135
     *
136
     * @return this last position
137
     */
138
    public long getLastPosition() {
139
        return this.queue.lastIndex();
1✔
140
    }
141

142
    /// region close
143

144
    /**
145
     * is closed.
146
     *
147
     * @return true if the Producer is closed, false otherwise.
148
     */
149
    public boolean isClosed() {
150
        return isClosed;
1✔
151
    }
152

153
    @Override
154
    public void close() {
155
        try {
156
            logDebug("[close] start");
1✔
157
            isClosing = true;
1✔
158
            internalLock.lock();
1✔
159
            stopFlush();
1✔
160
            internalLock.unlock();
1✔
161
            if (!queue.isClosed()) {
1✔
162
                queue.close();
1✔
163
            }
164
            flushExecutor.shutdown();
1✔
165
            scheduler.shutdown();
1✔
166
            try {
167
                if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
168
                    scheduler.shutdownNow();
×
169
                }
170
                if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
171
                    flushExecutor.shutdownNow();
×
172
                }
173
            } catch (InterruptedException e) {
×
174
                scheduler.shutdownNow();
×
175
                flushExecutor.shutdownNow();
×
176
                Thread.currentThread().interrupt();
×
177
            }
1✔
178
            for (CloseListener closeListener : closeListeners) {
1✔
NEW
179
                closeListener.onClose();
×
NEW
180
            }
×
181
            isClosed = true;
1✔
182
        } finally {
183
            logDebug("[close] end");
1✔
184
        }
1✔
185
    }
1✔
186

187
    @Override
188
    public void addCloseListener(CloseListener listener) {
NEW
189
        closeListeners.add(listener);
×
NEW
190
    }
×
191

192
    private void cleanUpOldFiles(int keepDays) {
193
        if (keepDays == -1) {
1✔
194
            // no need clean up old files
195
            return;
1✔
196
        }
197
        logDebug("[cleanUpOldFiles] start");
1✔
198
        try {
199
            // Assuming .cq4 is the file extension for Chronicle Queue
200
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
201
            if (files == null || files.length == 0) {
1✔
202
                logDebug("[cleanUpOldFiles] no files found");
1✔
203
                return;
1✔
204
            }
205
            LocalDate now = LocalDate.now();
1✔
206
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
207
            for (File file : files) {
1✔
208
                cleanUpOldFile(file, keepStartDate);
1✔
209
            }
210
        } catch (Exception ex) {
×
211
            logger.error("[cleanUpOldFiles] error", ex);
×
212
        } finally {
213
            logDebug("[cleanUpOldFiles] end");
1✔
214
        }
1✔
215
    }
1✔
216

217
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
218
        String fileName = file.getName();
1✔
219
        String dateString = fileName.substring(0, 8);
1✔
220
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
221
        if (localDate.isBefore(keepDate)) {
1✔
222
            Files.deleteIfExists(file.toPath());
1✔
223
            logDebug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
224
        }
225
    }
1✔
226

227
    /// endregion
228

229
    private void logDebug(String format) {
230
        if (logger.isDebugEnabled()) {
1✔
231
            logDebug(format);
×
232
        }
233
    }
1✔
234

235
    private void logDebug(String format, Object arg) {
236
        if (logger.isDebugEnabled()) {
1✔
237
            logDebug(format, arg);
×
238
        }
239
    }
1✔
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc