• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #65

04 Feb 2025 12:44PM UTC coverage: 91.822% (-0.4%) from 92.193%
#65

push

wz2cool
change selectTag to selector tag

11 of 11 new or added lines in 3 files covered. (100.0%)

3 existing lines in 1 file now uncovered.

741 of 807 relevant lines covered (91.82%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
7
import com.github.wz2cool.localqueue.model.message.InternalWriteMessage;
8
import net.openhft.chronicle.core.time.TimeProvider;
9
import net.openhft.chronicle.queue.ChronicleQueue;
10
import net.openhft.chronicle.queue.ExcerptAppender;
11
import net.openhft.chronicle.queue.RollCycle;
12
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
13
import org.slf4j.Logger;
14
import org.slf4j.LoggerFactory;
15

16
import java.io.File;
17
import java.io.IOException;
18
import java.nio.file.Files;
19
import java.time.LocalDate;
20
import java.time.format.DateTimeFormatter;
21
import java.util.ArrayList;
22
import java.util.List;
23
import java.util.concurrent.*;
24
import java.util.concurrent.atomic.AtomicBoolean;
25

26
/**
27
 * simple writer
28
 *
29
 * @author frank
30
 */
31
public class SimpleProducer implements IProducer {
32

33
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
34
    private final RollCycle defaultRollCycle;
35
    private final TimeProvider timeProvider;
36
    private final SimpleProducerConfig config;
37
    private final SingleChronicleQueue queue;
38
    private final LinkedBlockingQueue<InternalWriteMessage> messageCache = new LinkedBlockingQueue<>();
1✔
39
    // should only call by flushExecutor
40
    private final ExcerptAppender mainAppender;
41
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
42
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
43
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
44
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
45

46
    private final AtomicBoolean isFlushRunning = new AtomicBoolean(true);
1✔
47
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
48
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
49
    private final Object closeLocker = new Object();
1✔
50

51
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
52
        this.config = config;
1✔
53
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
54
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
55
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
56
                .rollCycle(defaultRollCycle)
1✔
57
                .timeProvider(timeProvider)
1✔
58
                .build();
1✔
59
        this.mainAppender = initMainAppender();
1✔
60
        flushExecutor.execute(this::flush);
1✔
61
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
62
    }
1✔
63

64
    private ExcerptAppender initMainAppender() {
65
        return CompletableFuture.supplyAsync(this.queue::createAppender, this.flushExecutor).join();
1✔
66
    }
67

68
    // region flush to file
69

70
    private void stopFlush() {
71
        isFlushRunning.set(false);
1✔
72
    }
1✔
73

74
    private void flush() {
75
        while (isFlushRunning.get() && !isClosing.get()) {
1✔
76
            flushMessages(config.getFlushBatchSize());
1✔
77
        }
78
    }
1✔
79

80
    private final List<InternalWriteMessage> tempFlushMessages = new ArrayList<>();
1✔
81

82
    private void flushMessages(int batchSize) {
83
        try {
84
            logDebug("[flushInternal] start");
1✔
85
            if (tempFlushMessages.isEmpty()) {
1✔
86
                // take 主要作用就是卡主线程
87
                InternalWriteMessage firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
88
                if (firstItem == null) {
1✔
89
                    return;
1✔
90
                }
91
                this.tempFlushMessages.add(firstItem);
1✔
92
                // 如果空了从消息缓存放入待刷消息
93
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
94
            }
95
            doFlushMessages(tempFlushMessages);
1✔
96
            tempFlushMessages.clear();
1✔
97
        } catch (InterruptedException ex) {
×
98
            Thread.currentThread().interrupt();
×
99
        } finally {
100
            logDebug("[flushInternal] end");
1✔
101
        }
1✔
102
    }
1✔
103

104
    private void doFlushMessages(final List<InternalWriteMessage> messages) {
105
        synchronized (closeLocker) {
1✔
106
            try {
107
                logDebug("[flushMessages] start");
1✔
108
                if (isClosing.get()) {
1✔
UNCOV
109
                    logDebug("[flushMessages] producer is closing");
×
UNCOV
110
                    return;
×
111
                }
112

113
                for (InternalWriteMessage message : messages) {
1✔
114
                    long writeTime = System.currentTimeMillis();
1✔
115
                    message.setWriteTime(writeTime);
1✔
116
                    mainAppender.writeBytes(message);
1✔
117
                }
1✔
118
            } finally {
119
                logDebug("[flushMessages] end");
1✔
120
            }
1✔
121
        }
1✔
122
    }
1✔
123

124
    // endregion
125

126

127
    @Override
128
    public boolean offer(String message) {
129
        return offer(null, message);
1✔
130
    }
131

132
    @Override
133
    public boolean offer(String messageKey, String message) {
134
        InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
135
        internalWriteMessage.setContent(message);
1✔
136
        internalWriteMessage.setMessageKey(messageKey);
1✔
137
        return this.messageCache.offer(internalWriteMessage);
1✔
138
    }
139

140
    @Override
141
    public boolean offer(String tag, String messageKey, String message) {
142
        InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
143
        internalWriteMessage.setContent(message);
1✔
144
        internalWriteMessage.setMessageKey(messageKey);
1✔
145
        internalWriteMessage.setTag(tag);
1✔
146
        return this.messageCache.offer(internalWriteMessage);
1✔
147
    }
148

149
    /**
150
     * get the last position
151
     *
152
     * @return this last position
153
     */
154
    public long getLastPosition() {
155
        return this.queue.lastIndex();
1✔
156
    }
157

158
    // region close
159

160
    /**
161
     * is closed.
162
     *
163
     * @return true if the Producer is closed, false otherwise.
164
     */
165
    @Override
166
    public boolean isClosed() {
167
        return isClosed.get();
1✔
168
    }
169

170
    @Override
171
    public void close() {
172
        synchronized (closeLocker) {
1✔
173
            try {
174
                logDebug("[close] start");
1✔
175
                if (isClosing.get()) {
1✔
176
                    logDebug("[close] is closing");
×
177
                    return;
×
178
                }
179
                isClosing.set(true);
1✔
180
                stopFlush();
1✔
181
                if (!queue.isClosed()) {
1✔
182
                    queue.close();
1✔
183
                }
184
                flushExecutor.shutdown();
1✔
185
                scheduler.shutdown();
1✔
186
                try {
187
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
188
                        scheduler.shutdownNow();
×
189
                    }
190
                    if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
191
                        flushExecutor.shutdownNow();
×
192
                    }
193
                } catch (InterruptedException e) {
×
194
                    scheduler.shutdownNow();
×
195
                    flushExecutor.shutdownNow();
×
196
                    Thread.currentThread().interrupt();
×
197
                }
1✔
198
                for (CloseListener closeListener : closeListeners) {
1✔
199
                    closeListener.onClose();
1✔
200
                }
1✔
201
                isClosed.set(true);
1✔
202
            } finally {
203
                logDebug("[close] end");
1✔
204
            }
1✔
205
        }
1✔
206
    }
1✔
207

208
    @Override
209
    public void addCloseListener(CloseListener listener) {
210
        closeListeners.add(listener);
1✔
211
    }
1✔
212

213
    private void cleanUpOldFiles(int keepDays) {
214
        if (keepDays == -1) {
1✔
215
            // no need clean up old files
216
            return;
1✔
217
        }
218
        logDebug("[cleanUpOldFiles] start");
1✔
219
        try {
220
            // Assuming .cq4 is the file extension for Chronicle Queue
221
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
222
            if (files == null || files.length == 0) {
1✔
223
                logDebug("[cleanUpOldFiles] no files found");
1✔
224
                return;
1✔
225
            }
226
            LocalDate now = LocalDate.now();
1✔
227
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
228
            for (File file : files) {
1✔
229
                cleanUpOldFile(file, keepStartDate);
1✔
230
            }
231
        } catch (Exception ex) {
×
232
            logger.error("[cleanUpOldFiles] error", ex);
×
233
        } finally {
234
            logDebug("[cleanUpOldFiles] end");
1✔
235
        }
1✔
236
    }
1✔
237

238

239
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
240
        String fileName = file.getName();
1✔
241
        String dateString = fileName.substring(0, 8);
1✔
242
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
243
        if (localDate.isBefore(keepDate)) {
1✔
244
            Files.deleteIfExists(file.toPath());
1✔
245
            logDebug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
246
        }
247
    }
1✔
248

249
    // endregion
250

251
    // region logger
252
    private void logDebug(String format, String arg) {
253
        if (logger.isDebugEnabled()) {
1✔
254
            logger.debug(format, arg);
×
255
        }
256
    }
1✔
257

258

259
    private void logDebug(String format) {
260
        if (logger.isDebugEnabled()) {
1✔
261
            logger.debug(format);
×
262
        }
263
    }
1✔
264

265
    // endregion
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc