• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #40

02 Feb 2025 02:03PM UTC coverage: 91.757% (-0.09%) from 91.85%
#40

push

wz2cool
use closing

9 of 13 new or added lines in 4 files covered. (69.23%)

3 existing lines in 3 files now uncovered.

590 of 643 relevant lines covered (91.76%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.8
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
7
import com.github.wz2cool.localqueue.model.message.InternalWriteMessage;
8
import net.openhft.chronicle.core.time.TimeProvider;
9
import net.openhft.chronicle.queue.ChronicleQueue;
10
import net.openhft.chronicle.queue.ExcerptAppender;
11
import net.openhft.chronicle.queue.RollCycle;
12
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
13
import org.slf4j.Logger;
14
import org.slf4j.LoggerFactory;
15

16
import java.io.File;
17
import java.io.IOException;
18
import java.nio.file.Files;
19
import java.time.LocalDate;
20
import java.time.format.DateTimeFormatter;
21
import java.util.ArrayList;
22
import java.util.List;
23
import java.util.UUID;
24
import java.util.concurrent.*;
25
import java.util.concurrent.locks.Lock;
26
import java.util.concurrent.locks.ReentrantLock;
27

28
/**
29
 * simple writer
30
 *
31
 * @author frank
32
 */
33
public class SimpleProducer implements IProducer {
34

35
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
36
    private final RollCycle defaultRollCycle;
37
    private final TimeProvider timeProvider;
38
    private final SimpleProducerConfig config;
39
    private final SingleChronicleQueue queue;
40
    private final LinkedBlockingQueue<InternalWriteMessage> messageCache = new LinkedBlockingQueue<>();
1✔
41
    // should only call by flushExecutor
42
    private final ExcerptAppender mainAppender;
43
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
44
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
45
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
46
    private final Lock internalLock = new ReentrantLock();
1✔
47
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
48

49
    private volatile boolean isFlushRunning = true;
1✔
50
    private volatile boolean isClosing = false;
1✔
51
    private volatile boolean isClosed = false;
1✔
52

53
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
54
        this.config = config;
1✔
55
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
56
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
57
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
58
                .rollCycle(defaultRollCycle)
1✔
59
                .timeProvider(timeProvider)
1✔
60
                .build();
1✔
61
        this.mainAppender = initMainAppender();
1✔
62
        flushExecutor.execute(this::flush);
1✔
63
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
64
    }
1✔
65

66
    private ExcerptAppender initMainAppender() {
67
        return CompletableFuture.supplyAsync(this.queue::createAppender, this.flushExecutor).join();
1✔
68
    }
69

70
    // region flush to file
71
    private void flush() {
72
        while (isFlushRunning && !isClosing) {
1✔
73
            flushMessages(config.getFlushBatchSize());
1✔
74
        }
75
    }
1✔
76

77
    private void stopFlush() {
78
        isFlushRunning = false;
1✔
79
    }
1✔
80

81
    private final List<InternalWriteMessage> tempFlushMessages = new ArrayList<>();
1✔
82

83
    private void flushMessages(int batchSize) {
84
        try {
85
            logDebug("[flushInternal] start");
1✔
86
            if (tempFlushMessages.isEmpty()) {
1✔
87
                // take 主要作用就是卡主线程
88
                InternalWriteMessage firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
89
                if (firstItem == null) {
1✔
90
                    return;
1✔
91
                }
92
                this.tempFlushMessages.add(firstItem);
1✔
93
                // 如果空了从消息缓存放入待刷消息
94
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
95
            }
96
            flushMessages(tempFlushMessages);
1✔
97
            tempFlushMessages.clear();
1✔
98
        } catch (InterruptedException ex) {
×
99
            Thread.currentThread().interrupt();
×
100
        } finally {
101
            logDebug("[flushInternal] end");
1✔
102
        }
1✔
103
    }
1✔
104

105
    private void flushMessages(final List<InternalWriteMessage> messages) {
106
        try {
107
            logDebug("[flushMessages] start");
1✔
108
            internalLock.lock();
1✔
109
            if (!isFlushRunning || isClosing) {
1✔
110
                return;
×
111
            }
112

113
            for (InternalWriteMessage message : messages) {
1✔
114
                long writeTime = System.currentTimeMillis();
1✔
115
                message.setWriteTime(writeTime);
1✔
116
                mainAppender.writeBytes(message);
1✔
117
            }
1✔
118
        } finally {
119
            internalLock.unlock();
1✔
120
            logDebug("[flushMessages] end");
1✔
121
        }
1✔
122
    }
1✔
123

124
    // endregion
125

126

127
    @Override
128
    public boolean offer(String message) {
129
        return offer(null, message);
1✔
130
    }
131

132
    @Override
133
    public boolean offer(String messageKey, String message) {
134
        InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
135
        internalWriteMessage.setContent(message);
1✔
136
        String useMessageKey = messageKey == null ? UUID.randomUUID().toString() : messageKey;
1✔
137
        internalWriteMessage.setMessageKey(useMessageKey);
1✔
138
        return this.messageCache.offer(internalWriteMessage);
1✔
139
    }
140

141
    /**
142
     * get the last position
143
     *
144
     * @return this last position
145
     */
146
    public long getLastPosition() {
147
        return this.queue.lastIndex();
1✔
148
    }
149

150
    // region close
151

152
    /**
153
     * is closed.
154
     *
155
     * @return true if the Producer is closed, false otherwise.
156
     */
157
    @Override
158
    public boolean isClosed() {
159
        return isClosed;
1✔
160
    }
161

162
    @Override
163
    public void close() {
164
        try {
165
            logDebug("[close] start");
1✔
166
            if (isClosing) {
1✔
NEW
167
                logDebug("[close] is closing");
×
UNCOV
168
                return;
×
169
            }
170
            isClosing = true;
1✔
171
            internalLock.lock();
1✔
172
            stopFlush();
1✔
173
            internalLock.unlock();
1✔
174
            if (!queue.isClosed()) {
1✔
175
                queue.close();
1✔
176
            }
177
            flushExecutor.shutdown();
1✔
178
            scheduler.shutdown();
1✔
179
            try {
180
                if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
181
                    scheduler.shutdownNow();
×
182
                }
183
                if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
184
                    flushExecutor.shutdownNow();
×
185
                }
186
            } catch (InterruptedException e) {
×
187
                scheduler.shutdownNow();
×
188
                flushExecutor.shutdownNow();
×
189
                Thread.currentThread().interrupt();
×
190
            }
1✔
191
            for (CloseListener closeListener : closeListeners) {
1✔
192
                closeListener.onClose();
1✔
193
            }
1✔
194
            isClosed = true;
1✔
195
        } finally {
196
            logDebug("[close] end");
1✔
197
        }
1✔
198
    }
1✔
199

200
    @Override
201
    public void addCloseListener(CloseListener listener) {
202
        closeListeners.add(listener);
1✔
203
    }
1✔
204

205
    private void cleanUpOldFiles(int keepDays) {
206
        if (keepDays == -1) {
1✔
207
            // no need clean up old files
208
            return;
1✔
209
        }
210
        logDebug("[cleanUpOldFiles] start");
1✔
211
        try {
212
            // Assuming .cq4 is the file extension for Chronicle Queue
213
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
214
            if (files == null || files.length == 0) {
1✔
215
                logDebug("[cleanUpOldFiles] no files found");
1✔
216
                return;
1✔
217
            }
218
            LocalDate now = LocalDate.now();
1✔
219
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
220
            for (File file : files) {
1✔
221
                cleanUpOldFile(file, keepStartDate);
1✔
222
            }
223
        } catch (Exception ex) {
×
224
            logger.error("[cleanUpOldFiles] error", ex);
×
225
        } finally {
226
            logDebug("[cleanUpOldFiles] end");
1✔
227
        }
1✔
228
    }
1✔
229

230
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
231
        String fileName = file.getName();
1✔
232
        String dateString = fileName.substring(0, 8);
1✔
233
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
234
        if (localDate.isBefore(keepDate)) {
1✔
235
            Files.deleteIfExists(file.toPath());
1✔
236
            logDebug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
237
        }
238
    }
1✔
239

240
    // endregion
241

242
    // region logger
243

244
    private void logDebug(String format) {
245
        if (logger.isDebugEnabled()) {
1✔
246
            logDebug(format);
×
247
        }
248
    }
1✔
249

250
    private void logDebug(String format, Object arg) {
251
        if (logger.isDebugEnabled()) {
1✔
252
            logDebug(format, arg);
×
253
        }
254
    }
1✔
255

256
    // endregion
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc