• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #82

06 Feb 2025 03:31PM UTC coverage: 91.284% (-0.3%) from 91.534%
#82

push

wz2cool
support headers

68 of 70 new or added lines in 5 files covered. (97.14%)

3 existing lines in 1 file now uncovered.

775 of 849 relevant lines covered (91.28%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.76
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.event.CloseListener;
5
import com.github.wz2cool.localqueue.helper.ChronicleQueueHelper;
6
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
7
import com.github.wz2cool.localqueue.model.message.internal.HeaderMessage;
8
import com.github.wz2cool.localqueue.model.message.internal.InternalMessage;
9
import net.openhft.chronicle.core.time.TimeProvider;
10
import net.openhft.chronicle.queue.ChronicleQueue;
11
import net.openhft.chronicle.queue.ExcerptAppender;
12
import net.openhft.chronicle.queue.RollCycle;
13
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
14
import org.slf4j.Logger;
15
import org.slf4j.LoggerFactory;
16

17
import java.io.File;
18
import java.io.IOException;
19
import java.nio.file.Files;
20
import java.time.LocalDate;
21
import java.time.format.DateTimeFormatter;
22
import java.util.ArrayList;
23
import java.util.List;
24
import java.util.concurrent.*;
25
import java.util.concurrent.atomic.AtomicBoolean;
26
import java.util.function.Consumer;
27

28
/**
29
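 * Simple producer that buffers offered messages in memory and appends them to a Chronicle Queue from a background flush thread.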
30
 *
31
 * @author frank
32
 */
33
@SuppressWarnings("Duplicates")
34
public class SimpleProducer implements IProducer {
35

36
    // Logger named after the runtime class of this instance.
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
37
    // Roll cycle resolved from config.getRollCycleType() and applied to the queue.
    private final RollCycle defaultRollCycle;
38
    // Time provider resolved from config.getTimeZone() and applied to the queue.
    private final TimeProvider timeProvider;
39
    // Producer configuration supplied to the constructor.
    private final SimpleProducerConfig config;
40
    // Underlying Chronicle Queue that messages are appended to.
    private final SingleChronicleQueue queue;
41
    // In-memory buffer: offer(...) enqueues here, the flush loop drains it.
    private final LinkedBlockingQueue<InternalMessage> messageCache = new LinkedBlockingQueue<>();
1✔
42
    // Must only be used from the flushExecutor thread (it is also created on that thread).
43
    private final ExcerptAppender mainAppender;
44
    // Single thread that creates the appender and runs the flush loop.
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
45
    // Runs the periodic clean-up of old queue files.
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
46
    // Parses the leading yyyyMMdd part of queue file names during clean-up.
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
47
    // Listeners invoked by close().
    private final ConcurrentLinkedQueue<CloseListener> closeListeners = new ConcurrentLinkedQueue<>();
1✔
48

49
    // Cleared by stopFlush() to end the flush loop.
    private final AtomicBoolean isFlushRunning = new AtomicBoolean(true);
1✔
50
    // Set at the start of close(); makes a second close() return early and stops further writes.
    private final AtomicBoolean isClosing = new AtomicBoolean(false);
1✔
51
    // Set at the end of close(), after the close listeners were invoked.
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
1✔
52
    // Monitor shared by close() and doFlushMessages() so writes and closing never overlap.
    private final Object closeLocker = new Object();
1✔
53

54
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
55
        this.config = config;
1✔
56
        this.timeProvider = ChronicleQueueHelper.getTimeProvider(config.getTimeZone());
1✔
57
        this.defaultRollCycle = ChronicleQueueHelper.getRollCycle(config.getRollCycleType());
1✔
58
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir())
1✔
59
                .rollCycle(defaultRollCycle)
1✔
60
                .timeProvider(timeProvider)
1✔
61
                .build();
1✔
62
        this.mainAppender = initMainAppender();
1✔
63
        flushExecutor.execute(this::flush);
1✔
64
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
65
    }
1✔
66

67
    private ExcerptAppender initMainAppender() {
68
        return CompletableFuture.supplyAsync(this.queue::createAppender, this.flushExecutor).join();
1✔
69
    }
70

71
    // region flush to file
72

73
    private void stopFlush() {
74
        isFlushRunning.set(false);
1✔
75
    }
1✔
76

77
    private void flush() {
78
        while (isFlushRunning.get() && !isClosing.get()) {
1✔
79
            flushMessages(config.getFlushBatchSize());
1✔
80
        }
81
    }
1✔
82

83
    // Batch currently being flushed; it is only cleared after a flush returns normally,
    // so a batch whose flush threw is picked up again by the next flushMessages call.
    private final List<InternalMessage> tempFlushMessages = new ArrayList<>();
1✔
84

85
    private void flushMessages(int batchSize) {
86
        try {
87
            logDebug("[flushInternal] start");
1✔
88
            if (tempFlushMessages.isEmpty()) {
1✔
89
                // take 主要作用就是卡主线程
90
                InternalMessage firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
91
                if (firstItem == null) {
1✔
92
                    return;
1✔
93
                }
94
                this.tempFlushMessages.add(firstItem);
1✔
95
                // 如果空了从消息缓存放入待刷消息
96
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
97
            }
98
            doFlushMessages(tempFlushMessages);
1✔
99
            tempFlushMessages.clear();
1✔
100
        } catch (InterruptedException ex) {
×
101
            Thread.currentThread().interrupt();
×
102
        } catch (Exception ex) {
×
103
            logger.error("[flushInternal] flush error", ex);
×
104
        } finally {
105
            logDebug("[flushInternal] end");
1✔
106
        }
1✔
107
    }
1✔
108

109
    private void doFlushMessages(final List<InternalMessage> messages) {
110
        synchronized (closeLocker) {
1✔
111
            try {
112
                logDebug("[flushMessages] start");
1✔
113
                if (isClosing.get()) {
1✔
UNCOV
114
                    logDebug("[flushMessages] producer is closing");
×
UNCOV
115
                    return;
×
116
                }
117

118
                for (InternalMessage message : messages) {
1✔
119
                    long writeTime = System.currentTimeMillis();
1✔
120
                    message.setWriteTime(writeTime);
1✔
121
                    mainAppender.writeBytes(message);
1✔
122
                }
1✔
123
            } finally {
124
                logDebug("[flushMessages] end");
1✔
125
            }
1✔
126
        }
1✔
127
    }
1✔
128

129
    // endregion
130

131

132
    @Override
133
    public boolean offer(String message) {
134
        return offer(null, message);
1✔
135
    }
136

137
    @Override
138
    public boolean offer(String messageKey, String message) {
139
        InternalMessage internalMessage = new InternalMessage();
1✔
140
        internalMessage.setContent(message);
1✔
141
        internalMessage.setMessageKey(messageKey);
1✔
142
        return this.messageCache.offer(internalMessage);
1✔
143
    }
144

145
    @Override
146
    public boolean offer(String tag, String messageKey, String message) {
147
        InternalMessage internalMessage = new InternalMessage();
1✔
148
        internalMessage.setContent(message);
1✔
149
        internalMessage.setMessageKey(messageKey);
1✔
150
        internalMessage.setTag(tag);
1✔
151
        return this.messageCache.offer(internalMessage);
1✔
152
    }
153

154

155
    @Override
156
    public boolean offer(String tag, String messageKey, String message, Consumer<HeaderMessage> headerConsumer) {
157
        InternalMessage internalMessage = new InternalMessage();
1✔
158
        internalMessage.setContent(message);
1✔
159
        internalMessage.setMessageKey(messageKey);
1✔
160
        internalMessage.setTag(tag);
1✔
161
        headerConsumer.accept(internalMessage.getHeaderMessage());
1✔
162
        return this.messageCache.offer(internalMessage);
1✔
163
    }
164

165
    /**
166
     * get the last position
167
     *
168
     * @return this last position
169
     */
170
    public long getLastPosition() {
171
        return this.queue.lastIndex();
1✔
172
    }
173

174
    // region close
175

176
    /**
177
     * is closed.
178
     *
179
     * @return true if the Producer is closed, false otherwise.
180
     */
181
    @Override
182
    public boolean isClosed() {
183
        return isClosed.get();
1✔
184
    }
185

186
    @Override
187
    public void close() {
188
        synchronized (closeLocker) {
1✔
189
            try {
190
                logDebug("[close] start");
1✔
191
                if (isClosing.get()) {
1✔
192
                    logDebug("[close] is closing");
×
193
                    return;
×
194
                }
195
                isClosing.set(true);
1✔
196
                stopFlush();
1✔
197
                if (!queue.isClosed()) {
1✔
198
                    queue.close();
1✔
199
                }
200
                flushExecutor.shutdown();
1✔
201
                scheduler.shutdown();
1✔
202
                try {
203
                    if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
204
                        scheduler.shutdownNow();
×
205
                    }
206
                    if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
UNCOV
207
                        flushExecutor.shutdownNow();
×
208
                    }
209
                } catch (InterruptedException e) {
×
210
                    scheduler.shutdownNow();
×
211
                    flushExecutor.shutdownNow();
×
212
                    Thread.currentThread().interrupt();
×
213
                }
1✔
214
                for (CloseListener closeListener : closeListeners) {
1✔
215
                    closeListener.onClose();
1✔
216
                }
1✔
217
                isClosed.set(true);
1✔
218
            } finally {
219
                logDebug("[close] end");
1✔
220
            }
1✔
221
        }
1✔
222
    }
1✔
223

224
    @Override
225
    public void addCloseListener(CloseListener listener) {
226
        closeListeners.add(listener);
1✔
227
    }
1✔
228

229
    private void cleanUpOldFiles(int keepDays) {
230
        if (keepDays == -1) {
1✔
231
            // no need clean up old files
232
            return;
1✔
233
        }
234
        logDebug("[cleanUpOldFiles] start");
1✔
235
        try {
236
            // Assuming .cq4 is the file extension for Chronicle Queue
237
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
238
            if (files == null || files.length == 0) {
1✔
239
                logDebug("[cleanUpOldFiles] no files found");
1✔
240
                return;
1✔
241
            }
242
            LocalDate now = LocalDate.now();
1✔
243
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
244
            for (File file : files) {
1✔
245
                cleanUpOldFile(file, keepStartDate);
1✔
246
            }
247
        } catch (Exception ex) {
×
248
            logger.error("[cleanUpOldFiles] error", ex);
×
249
        } finally {
250
            logDebug("[cleanUpOldFiles] end");
1✔
251
        }
1✔
252
    }
1✔
253

254

255
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
256
        String fileName = file.getName();
1✔
257
        String dateString = fileName.substring(0, 8);
1✔
258
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
259
        if (localDate.isBefore(keepDate)) {
1✔
260
            Files.deleteIfExists(file.toPath());
1✔
261
            logDebug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
262
        }
263
    }
1✔
264

265
    // endregion
266

267
    // region logger
268
    private void logDebug(String format, String arg) {
269
        if (logger.isDebugEnabled()) {
1✔
270
            logger.debug(format, arg);
×
271
        }
272
    }
1✔
273

274

275
    private void logDebug(String format) {
276
        if (logger.isDebugEnabled()) {
1✔
277
            logger.debug(format);
×
278
        }
279
    }
1✔
280

281
    // endregion
282
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc