• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #10

29 Jan 2025 05:10AM UTC coverage: 91.94% (+2.1%) from 89.817%
#10

push

wz2cool
move to timestamp

9 of 15 new or added lines in 4 files covered. (60.0%)

1 existing line in 1 file now uncovered.

365 of 397 relevant lines covered (91.94%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.17
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
import com.github.wz2cool.localqueue.model.message.InternalWriteMessage;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
23

24
/**
25
 * simple writer
26
 *
27
 * @author frank
28
 */
29
public class SimpleProducer implements IProducer, AutoCloseable {
30

31
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
32

33
    private final SimpleProducerConfig config;
34
    private final SingleChronicleQueue queue;
35
    private final LinkedBlockingQueue<String> messageCache = new LinkedBlockingQueue<>();
1✔
36
    private final ThreadLocal<ExcerptAppender> appenderThreadLocal;
37
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
38
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
39
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
40
    private final Lock internalLock = new ReentrantLock();
1✔
41

42
    private volatile boolean isFlushRunning = true;
1✔
43
    private volatile boolean isClosing = false;
1✔
44
    private volatile boolean isClosed = false;
1✔
45

46
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
47
        this.config = config;
1✔
48
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
49
        this.appenderThreadLocal = ThreadLocal.withInitial(this.queue::createAppender);
1✔
50
        flushExecutor.execute(this::flush);
1✔
51
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
52
    }
1✔
53

54
    /// region flush to file
55
    private void flush() {
56
        while (isFlushRunning && !isClosing) {
1✔
57
            flushInternal(config.getFlushBatchSize());
1✔
58
        }
59
    }
1✔
60

61
    private void stopFlush() {
62
        isFlushRunning = false;
1✔
63
    }
1✔
64

65
    private final List<String> tempFlushMessages = new ArrayList<>();
1✔
66

67
    private void flushInternal(int batchSize) {
68
        try {
69
            if (tempFlushMessages.isEmpty()) {
1✔
70
                // take 主要作用就是卡主线程
71
                String firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
72
                if (firstItem == null) {
1✔
73
                    return;
1✔
74
                }
75
                this.tempFlushMessages.add(firstItem);
1✔
76
                // 如果空了从消息缓存放入待刷消息
77
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
78
            }
79
            flushInternal(tempFlushMessages);
1✔
80
            tempFlushMessages.clear();
1✔
81
        } catch (InterruptedException ex) {
×
82
            Thread.currentThread().interrupt();
×
83
        }
1✔
84
    }
1✔
85

86
    private void flushInternal(List<String> messages) {
87
        try {
88
            internalLock.lock();
1✔
89
            if (!isFlushRunning || isClosing) {
1✔
UNCOV
90
                return;
×
91
            }
92
            ExcerptAppender appender = appenderThreadLocal.get();
1✔
93
            for (String message : messages) {
1✔
94
                long writeTime = System.currentTimeMillis();
1✔
95
                InternalWriteMessage internalWriteMessage = new InternalWriteMessage();
1✔
96
                internalWriteMessage.setWriteTime(writeTime);
1✔
97
                internalWriteMessage.setContent(message);
1✔
98
                appender.writeBytes(internalWriteMessage);
1✔
99
            }
1✔
100
        } finally {
101
            internalLock.unlock();
1✔
102
        }
1✔
103
    }
1✔
104

105
    /// endregion
106

107

108
    @Override
109
    public boolean offer(String message) {
110
        return this.messageCache.offer(message);
1✔
111
    }
112

113
    /**
114
     * get the last position
115
     *
116
     * @return this last position
117
     */
118
    public long getLastPosition() {
119
        return this.queue.lastIndex();
1✔
120
    }
121

122
    /// region close
123

124
    /**
125
     * is closed.
126
     *
127
     * @return true if the Producer is closed, false otherwise.
128
     */
129
    public boolean isClosed() {
130
        return isClosed;
1✔
131
    }
132

133
    @Override
134
    public void close() {
135
        isClosing = true;
1✔
136
        internalLock.lock();
1✔
137
        stopFlush();
1✔
138
        internalLock.unlock();
1✔
139
        queue.close();
1✔
140
        flushExecutor.shutdown();
1✔
141
        scheduler.shutdown();
1✔
142
        try {
143
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
144
                scheduler.shutdownNow();
×
145
            }
146
            if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
147
                flushExecutor.shutdownNow();
×
148
            }
149
        } catch (InterruptedException e) {
×
150
            scheduler.shutdownNow();
×
151
            flushExecutor.shutdownNow();
×
152
            Thread.currentThread().interrupt();
×
153
        }
1✔
154

155
        appenderThreadLocal.remove();
1✔
156
        isClosed = true;
1✔
157
    }
1✔
158

159
    private void cleanUpOldFiles(int keepDays) {
160
        if (keepDays == -1) {
1✔
161
            // no need clean up old files
162
            return;
1✔
163
        }
164
        logger.debug("[cleanUpOldFiles] start");
1✔
165
        try {
166
            // Assuming .cq4 is the file extension for Chronicle Queue
167
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
168
            if (files == null || files.length == 0) {
1✔
169
                logger.debug("[cleanUpOldFiles] no files found");
1✔
170
                return;
1✔
171
            }
172
            LocalDate now = LocalDate.now();
1✔
173
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
174
            for (File file : files) {
1✔
175
                cleanUpOldFile(file, keepStartDate);
1✔
176
            }
177
        } catch (Exception ex) {
×
178
            logger.error("[cleanUpOldFiles] error", ex);
×
179
        } finally {
180
            logger.debug("[cleanUpOldFiles] end");
1✔
181
        }
1✔
182
    }
1✔
183

184
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
185
        String fileName = file.getName();
1✔
186
        String dateString = fileName.substring(0, 8);
1✔
187
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
188
        if (localDate.isBefore(keepDate)) {
1✔
189
            Files.deleteIfExists(file.toPath());
1✔
190
            logger.debug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
191
        }
192
    }
1✔
193

194
    /// endregion
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc