• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wz2cool / local-queue / #5

27 Jan 2025 01:17PM UTC coverage: 92.442%. First build
#5

push

web-flow
Merge pull request #3 from wz2cool/0.0.5

0.0.5

71 of 82 new or added lines in 5 files covered. (86.59%)

318 of 344 relevant lines covered (92.44%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.64
/src/main/java/com/github/wz2cool/localqueue/impl/SimpleProducer.java
1
package com.github.wz2cool.localqueue.impl;
2

3
import com.github.wz2cool.localqueue.IProducer;
4
import com.github.wz2cool.localqueue.model.config.SimpleProducerConfig;
5
import net.openhft.chronicle.queue.ChronicleQueue;
6
import net.openhft.chronicle.queue.ExcerptAppender;
7
import net.openhft.chronicle.queue.RollCycles;
8
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
9
import org.slf4j.Logger;
10
import org.slf4j.LoggerFactory;
11

12
import java.io.File;
13
import java.io.IOException;
14
import java.nio.file.Files;
15
import java.time.LocalDate;
16
import java.time.format.DateTimeFormatter;
17
import java.util.ArrayList;
18
import java.util.List;
19
import java.util.concurrent.*;
20
import java.util.concurrent.locks.Lock;
21
import java.util.concurrent.locks.ReentrantLock;
22

23
/**
24
 * simple writer
25
 *
26
 * @author frank
27
 */
28
public class SimpleProducer implements IProducer, AutoCloseable {
29

30
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
1✔
31

32
    private final SimpleProducerConfig config;
33
    private final SingleChronicleQueue queue;
34
    private final LinkedBlockingQueue<String> messageCache = new LinkedBlockingQueue<>();
1✔
35
    private final ThreadLocal<ExcerptAppender> appenderThreadLocal;
36
    private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
1✔
37
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
38
    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1✔
39
    private final Lock internalLock = new ReentrantLock();
1✔
40
    private volatile boolean isFlushRunning = true;
1✔
41
    private volatile boolean isClosing = false;
1✔
42
    private volatile boolean isClosed = false;
1✔
43

44
    public SimpleProducer(final SimpleProducerConfig config) {
1✔
45
        this.config = config;
1✔
46
        this.queue = ChronicleQueue.singleBuilder(config.getDataDir()).rollCycle(RollCycles.FAST_DAILY).build();
1✔
47
        this.appenderThreadLocal = ThreadLocal.withInitial(this.queue::createAppender);
1✔
48
        flushExecutor.execute(this::flush);
1✔
49
        scheduler.scheduleAtFixedRate(() -> cleanUpOldFiles(config.getKeepDays()), 0, 1, TimeUnit.HOURS);
1✔
50
    }
1✔
51

52

53
    /// region flush to file
54
    private void flush() {
55
        while (isFlushRunning && !isClosing) {
1✔
56
            flushInternal(config.getFlushBatchSize());
1✔
57
        }
58
    }
1✔
59

60
    private void stopFlush() {
61
        isFlushRunning = false;
1✔
62
    }
1✔
63

64
    private final List<String> tempFlushMessages = new ArrayList<>();
1✔
65

66
    private void flushInternal(int batchSize) {
67
        try {
68
            if (tempFlushMessages.isEmpty()) {
1✔
69
                // take 主要作用就是卡主线程
70
                String firstItem = this.messageCache.poll(config.getFlushInterval(), TimeUnit.MILLISECONDS);
1✔
71
                if (firstItem == null) {
1✔
72
                    return;
1✔
73
                }
74
                this.tempFlushMessages.add(firstItem);
1✔
75
                // 如果空了从消息缓存放入待刷消息
76
                this.messageCache.drainTo(tempFlushMessages, batchSize - 1);
1✔
77
            }
78
            flushInternal(tempFlushMessages);
1✔
79
            tempFlushMessages.clear();
1✔
80
        } catch (InterruptedException ex) {
×
81
            Thread.currentThread().interrupt();
×
82
        }
1✔
83
    }
1✔
84

85
    private void flushInternal(List<String> messages) {
86
        try {
87
            internalLock.lock();
1✔
88
            if (!isFlushRunning || isClosing) {
1✔
89
                return;
×
90
            }
91
            ExcerptAppender appender = appenderThreadLocal.get();
1✔
92
            for (String message : messages) {
1✔
93
                appender.writeText(message);
1✔
94
            }
1✔
95
        } finally {
96
            internalLock.unlock();
1✔
97
        }
1✔
98
    }
1✔
99

100
    /// endregion
101

102

103
    @Override
104
    public boolean offer(String message) {
105
        return this.messageCache.offer(message);
1✔
106
    }
107

108
    /**
109
     * get the last position
110
     *
111
     * @return this last position
112
     */
113
    public long getLastPosition() {
114
        return this.queue.lastIndex();
1✔
115
    }
116

117
    /// region close
118

119
    /**
120
     * is closed.
121
     *
122
     * @return true if the Producer is closed, false otherwise.
123
     */
124
    public boolean isClosed() {
125
        return isClosed;
1✔
126
    }
127

128
    @Override
129
    public void close() {
130
        isClosing = true;
1✔
131
        internalLock.lock();
1✔
132
        stopFlush();
1✔
133
        internalLock.unlock();
1✔
134
        queue.close();
1✔
135
        flushExecutor.shutdown();
1✔
136
        scheduler.shutdown();
1✔
137
        try {
138
            if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
139
                scheduler.shutdownNow();
×
140
            }
141
            if (!flushExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
1✔
142
                flushExecutor.shutdownNow();
×
143
            }
NEW
144
        } catch (InterruptedException e) {
×
NEW
145
            scheduler.shutdownNow();
×
NEW
146
            flushExecutor.shutdownNow();
×
NEW
147
            Thread.currentThread().interrupt();
×
148
        }
1✔
149

150
        appenderThreadLocal.remove();
1✔
151
        isClosed = true;
1✔
152
    }
1✔
153

154
    private void cleanUpOldFiles(int keepDays) {
155
        if (keepDays == -1) {
1✔
156
            // no need clean up old files
157
            return;
1✔
158
        }
159
        logger.debug("[cleanUpOldFiles] start");
1✔
160
        try {
161
            // Assuming .cq4 is the file extension for Chronicle Queue
162
            File[] files = config.getDataDir().listFiles((dir, name) -> name.endsWith(".cq4"));
1✔
163
            if (files == null || files.length == 0) {
1✔
164
                logger.debug("[cleanUpOldFiles] no files found");
1✔
165
                return;
1✔
166
            }
167
            LocalDate now = LocalDate.now();
1✔
168
            LocalDate keepStartDate = now.minusDays(keepDays);
1✔
169
            for (File file : files) {
1✔
170
                cleanUpOldFile(file, keepStartDate);
1✔
171
            }
172
        } catch (Exception ex) {
×
173
            logger.error("[cleanUpOldFiles] error", ex);
×
174
        } finally {
175
            logger.debug("[cleanUpOldFiles] end");
1✔
176
        }
1✔
177
    }
1✔
178

179
    private void cleanUpOldFile(final File file, final LocalDate keepDate) throws IOException {
180
        String fileName = file.getName();
1✔
181
        String dateString = fileName.substring(0, 8);
1✔
182
        LocalDate localDate = LocalDate.parse(dateString, this.dateFormatter);
1✔
183
        if (localDate.isBefore(keepDate)) {
1✔
184
            Files.deleteIfExists(file.toPath());
1✔
185
            logger.debug("[cleanUpOldFile] Deleted old file: {}", file.getName());
1✔
186
        }
187
    }
1✔
188

189
    /// endregion
190
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc