• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #20

30 Jan 2025 07:56AM UTC coverage: 91.466% (-1.0%) from 92.442%
#20

push

web-flow
Merge pull request #4 from wz2cool/0.1.0

0.1.0

154 of 174 new or added lines in 10 files covered. (88.51%)

6 existing lines in 2 files now uncovered.

418 of 457 relevant lines covered (91.47%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.62
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
5
import com.github.wz2cool.localqueue.model.message.InternalWriteMessage;
6
import net.openhft.chronicle.queue.ChronicleQueue;
7
import net.openhft.chronicle.queue.ExcerptAppender;
8
import net.openhft.chronicle.queue.RollCycles;
9
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
10
import org.slf4j.Logger;
11
import org.slf4j.LoggerFactory;
12

13
import java.io.File;
14
import java.io.IOException;
15
import java.nio.file.Files;
16
import java.time.LocalDate;
17
import java.time.format.DateTimeFormatter;
18
import java.util.ArrayList;
19
import java.util.List;
20
import java.util.concurrent.*;
21
import java.util.concurrent.locks.Lock;
22
import java.util.concurrent.locks.ReentrantLock;
23

24
/**
25
 * simple writer
26
 *
27
 * @author frank
28
 */
29
public class SimpleProducer implements IProducer, AutoCloseable {
30

31
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
32

33
    private final SimpleProducerConfig config;
34
    private final SingleChronicleQueue queue;
35
    private final LinkedBlockingQueue<String> messageCache = new LinkedBlockingQueue<>();
1✔
36
    // should only call by flushExecutor
37
    private final ExcerptAppender mainAppender;
38
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
39
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
40
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
41
    private final Lock internalLock = new ReentrantLock();
1✔
42

43
    private volatile boolean isFlushRunning = true;
1✔
44
    private volatile boolean isClosing = false;
1✔
45
    private volatile boolean isClosed = false;
1✔
46

47
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
48
        this.config = config;
1✔
49
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
50
        this.mainAppender = initMainAppender();
1✔
51
        flushExecutor.execute(this::flush);
1✔
52
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
53
    }
1✔
54

55
    private ExcerptAppender initMainAppender() {
56
        return CompletableFuture.supplyAsync(this.queue::createAppender, this.flushExecutor).join();
1✔
57
    }
58

59
    /// region flush to file
60
    private void flush() {
61
        while (isFlushRunning && !isClosing) {
1✔
62
            flushMessages(config.getFlushBatchSize());
1✔
63
        }
64
    }
1✔
65

66
    private void stopFlush() {
67
        isFlushRunning = false;
1✔
68
    }
1✔
69

70
    private final List<String> tempFlushMessages = new ArrayList<>();
1✔
71

72
    private void flushMessages(int batchSize) {
73
        try {
74
            logDebug("[flushInternal] start");
1✔
75
            if (tempFlushMessages.isEmpty()) {
1✔
76
                // take 主要作用就是卡主线程
77
                String firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
78
                if (firstItem == null) {
1✔
79
                    return;
1✔
80
                }
81
                this.tempFlushMessages.add(firstItem);
1✔
82
                // 如果空了从消息缓存放入待刷消息
83
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
84
            }
85
            flushMessages(tempFlushMessages);
1✔
86
            tempFlushMessages.clear();
1✔
87
        } catch (InterruptedException ex) {
×
88
            Thread.currentThread().interrupt();
×
89
        } finally {
90
            logDebug("[flushInternal] end");
1✔
91
        }
1✔
92
    }
1✔
93

94
    private void flushMessages(final List<String> messages) {
95
        try {
96
            logDebug("[flushMessages] start");
1✔
97
            internalLock.lock();
1✔
98
            if (!isFlushRunning || isClosing) {
1✔
99
                return;
×
100
            }
101

102
            for (String message : messages) {
1✔
103
                long writeTime = System.currentTimeMillis();
1✔
104
                InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
105
                internalWriteMessage.setWriteTime(writeTime);
1✔
106
                internalWriteMessage.setContent(message);
1✔
107
                mainAppender.writeBytes(internalWriteMessage);
1✔
108
            }
1✔
109
        } finally {
110
            internalLock.unlock();
1✔
111
            logDebug("[flushMessages] end");
1✔
112
        }
1✔
113
    }
1✔
114

115
    /// endregion
116

117

118
    @Override
119
    public boolean offer(String message) {
120
        return this.messageCache.offer(message);
1✔
121
    }
122

123
    /**
124
     * get the last position
125
     *
126
     * @return this last position
127
     */
128
    public long getLastPosition() {
129
        return this.queue.lastIndex();
1✔
130
    }
131

132
    /// region close
133

134
    /**
135
     * is closed.
136
     *
137
     * @return true if the Producer is closed, false otherwise.
138
     */
139
    public boolean isClosed() {
140
        return isClosed;
1✔
141
    }
142

143
    @Override
144
    public void close() {
145
        try {
146
            logDebug("[close] start");
1✔
147
            isClosing = true;
1✔
148
            internalLock.lock();
1✔
149
            stopFlush();
1✔
150
            internalLock.unlock();
1✔
151
            queue.close();
1✔
152
            flushExecutor.shutdown();
1✔
153
            scheduler.shutdown();
1✔
154
            try {
155
                if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
NEW
156
                    scheduler.shutdownNow();
×
157
                }
158
                if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
NEW
159
                    flushExecutor.shutdownNow();
×
160
                }
NEW
161
            } catch (InterruptedException e) {
×
UNCOV
162
                scheduler.shutdownNow();
×
UNCOV
163
                flushExecutor.shutdownNow();
×
NEW
164
                Thread.currentThread().interrupt();
×
165
            }
1✔
166
            isClosed = true;
1✔
167
        } finally {
168
            logDebug("[close] end");
1✔
169
        }
1✔
170
    }
1✔
171

172
    private void cleanUpOldFiles(int keepDays) {
173
        if (keepDays == -1) {
1✔
174
            // no need clean up old files
175
            return;
1✔
176
        }
177
        logDebug("[cleanUpOldFiles] start");
1✔
178
        try {
179
            // Assuming .cq4 is the file extension for Chronicle Queue
180
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
181
            if (files == null || files.length == 0) {
1✔
182
                logDebug("[cleanUpOldFiles] no files found");
1✔
183
                return;
1✔
184
            }
185
            LocalDate now = LocalDate.now();
1✔
186
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
187
            for (File file : files) {
1✔
188
                cleanUpOldFile(file, keepStartDate);
1✔
189
            }
190
        } catch (Exception ex) {
×
191
            logger.error("[cleanUpOldFiles] error", ex);
×
192
        } finally {
193
            logDebug("[cleanUpOldFiles] end");
1✔
194
        }
1✔
195
    }
1✔
196

197
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
198
        String fileName = file.getName();
1✔
199
        String dateString = fileName.substring(0, 8);
1✔
200
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
201
        if (localDate.isBefore(keepDate)) {
1✔
202
            Files.deleteIfExists(file.toPath());
1✔
203
            logDebug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
204
        }
205
    }
1✔
206

207
    /// endregion
208

209
    private void logDebug(String format) {
210
        if (logger.isDebugEnabled()) {
1✔
NEW
211
            logDebug(format);
×
212
        }
213
    }
1✔
214

215
    private void logDebug(String format, Object arg) {
216
        if (logger.isDebugEnabled()) {
1✔
NEW
217
            logDebug(format, arg);
×
218
        }
219
    }
1✔
220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc