apache / iotdb / #9733

Build #9733 · push · travis_ci · web-flow · pending completion
[To rel/1.2] Add compression and encoding type check for FastCompactionPerformer (#10712)

32 of 32 new or added lines in 4 files covered. (100.0%)

79232 of 165563 relevant lines covered (47.86%)

0.48 hits per line

Source File

/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java — 60.32% covered
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.tsfile.write.chunk;

import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.TimePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class TimeChunkWriter {

  private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);

  private final String measurementId;

  private final TSEncoding encodingType;

  private final CompressionType compressionType;

  /** all pages of this chunk. */
  private final PublicBAOS pageBuffer;

  private int numOfPages;

  /** writes data into the current page. */
  private TimePageWriter pageWriter;

  /** page size threshold. */
  private final long pageSizeThreshold;

  private final int maxNumberOfPointsInPage;

  /** value count in the current page. */
  private int valueCountInOnePageForNextCheck;

  // initial value for valueCountInOnePageForNextCheck
  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;

  /** statistics of this chunk. */
  private TimeStatistics statistics;

  /** first page info. */
  private int sizeWithoutStatistic;

  private Statistics<?> firstPageStatistics;

  public TimeChunkWriter(
      String measurementId,
      CompressionType compressionType,
      TSEncoding encodingType,
      Encoder timeEncoder) {
    this.measurementId = measurementId;
    this.encodingType = encodingType;
    this.compressionType = compressionType;
    this.pageBuffer = new PublicBAOS();

    this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
    this.maxNumberOfPointsInPage =
        TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
    // defer the first memory-usage check until enough data has accumulated
    // to make an initial prediction
    this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;

    // init statistics for this chunk and page
    this.statistics = new TimeStatistics();

    this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
  }

  public void write(long time) {
    pageWriter.write(time);
  }

  public void write(long[] timestamps, int batchSize, int arrayOffset) {
    pageWriter.write(timestamps, batchSize, arrayOffset);
  }

  /**
   * Check the occupied memory size; if it exceeds the page size threshold, report that the
   * caller should construct a page and put it into pageBuffer.
   */
  public boolean checkPageSizeAndMayOpenANewPage() {
    if (pageWriter.getPointNumber() >= maxNumberOfPointsInPage) {
      logger.debug("current line count reaches the upper bound, write page {}", measurementId);
      return true;
    } else if (pageWriter.getPointNumber()
        >= valueCountInOnePageForNextCheck) { // need to check memory size
      // not checking the memory used for every value
      long currentPageSize = pageWriter.estimateMaxMemSize();
      if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
        // we will write the current page
        logger.debug(
            "enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
            measurementId,
            pageSizeThreshold,
            currentPageSize,
            pageWriter.getPointNumber());
        valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
        return true;
      } else {
        // reset valueCountInOnePageForNextCheck for the next page
        valueCountInOnePageForNextCheck =
            (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
      }
    }
    return false;
  }

  public long getRemainingPointNumberForCurrentPage() {
    return maxNumberOfPointsInPage - pageWriter.getPointNumber();
  }

  /**
   * Seal the current page into pageBuffer. The first page is written without its statistics,
   * since a single-page chunk keeps them only in the chunk header; if a second page arrives,
   * the buffer is rewritten once to splice the first page's statistics back into its header.
   */
  public void writePageToPageBuffer() {
    try {
      if (numOfPages == 0) { // record the firstPageStatistics
        this.firstPageStatistics = pageWriter.getStatistics();
        this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
        byte[] b = pageBuffer.toByteArray();
        pageBuffer.reset();
        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
        firstPageStatistics.serialize(pageBuffer);
        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
        firstPageStatistics = null;
      } else {
        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
      }

      // update statistics of this chunk
      numOfPages++;
      this.statistics.mergeStatistics(pageWriter.getStatistics());
    } catch (IOException e) {
      logger.error("met error in pageWriter.writePageHeaderAndDataIntoBuff, ignore this page:", e);
    } finally {
      // clear the start timestamp for the next initialization
      pageWriter.reset();
    }
  }

  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
      throws PageException {
    // write the page header to pageBuffer
    try {
      logger.debug(
          "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
      // serialize pageHeader; see the writePageToPageBuffer method
      if (numOfPages == 0) { // record the firstPageStatistics
        this.firstPageStatistics = header.getStatistics();
        this.sizeWithoutStatistic +=
            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        this.sizeWithoutStatistic +=
            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
        byte[] b = pageBuffer.toByteArray();
        pageBuffer.reset();
        pageBuffer.write(b, 0, this.sizeWithoutStatistic);
        firstPageStatistics.serialize(pageBuffer);
        pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
        header.getStatistics().serialize(pageBuffer);
        firstPageStatistics = null;
      } else {
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
        header.getStatistics().serialize(pageBuffer);
      }
      logger.debug(
          "finish to flush a page header {} of time page into buffer, buffer position {} ",
          header,
          pageBuffer.size());

      statistics.mergeStatistics(header.getStatistics());

    } catch (IOException e) {
      throw new PageException("IO Exception in writeDataPageHeader, ignore this page", e);
    }
    numOfPages++;
    // write page content to temp PBAOS
    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
      channel.write(data);
    } catch (IOException e) {
      throw new PageException(e);
    }
  }

  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
    sealCurrentPage();
    writeAllPagesOfChunkToTsFile(tsfileWriter);

    // re-init this chunk writer
    pageBuffer.reset();
    numOfPages = 0;
    sizeWithoutStatistic = 0;
    firstPageStatistics = null;
    this.statistics = new TimeStatistics();
  }

  public long estimateMaxSeriesMemSize() {
    return pageBuffer.size()
        + pageWriter.estimateMaxMemSize()
        + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
        + pageWriter.getStatistics().getSerializedSize();
  }

  public long getCurrentChunkSize() {
    if (pageBuffer.size() == 0) {
      return 0;
    }
    // return the serialized size of the chunk header + all pages
    return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
        + (long) pageBuffer.size();
  }

  public void sealCurrentPage() {
    if (pageWriter != null && pageWriter.getPointNumber() > 0) {
      writePageToPageBuffer();
    }
  }

  public void clearPageWriter() {
    pageWriter = null;
  }

  public int getNumOfPages() {
    return numOfPages;
  }

  public TSDataType getDataType() {
    return TSDataType.VECTOR;
  }

  public long getPointNum() {
    return statistics.getCount() + pageWriter.getPointNumber();
  }

  /**
   * Write all pages of this chunk to the specified IOWriter.
   *
   * @param writer the specified IOWriter
   * @throws IOException exception in IO
   */
  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
    if (statistics.getCount() == 0) {
      return;
    }

    // start to write this column chunk
    writer.startFlushChunk(
        measurementId,
        compressionType,
        TSDataType.VECTOR,
        encodingType,
        statistics,
        pageBuffer.size(),
        numOfPages,
        TsFileConstant.TIME_COLUMN_MASK);

    long dataOffset = writer.getPos();

    // write all pages of this column
    writer.writeBytesToStream(pageBuffer);

    int dataSize = (int) (writer.getPos() - dataOffset);
    if (dataSize != pageBuffer.size()) {
      throw new IOException(
          "Bytes written is inconsistent with the size of data: "
              + dataSize
              + " != "
              + pageBuffer.size());
    }

    writer.endCurrentChunk();
  }

  /** only used for test. */
  public PublicBAOS getPageBuffer() {
    return pageBuffer;
  }

  public TimePageWriter getPageWriter() {
    return pageWriter;
  }

  public boolean checkIsUnsealedPageOverThreshold(long size, long pointNum) {
    return pageWriter.getPointNumber() >= pointNum || pageWriter.estimateMaxMemSize() >= size;
  }
}
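
Editorial note on usage (not part of the covered file): the sketch below shows the call order this class expects, using only the public methods listed above. The DeltaBinaryEncoder.LongDeltaEncoder choice, the TsFileIOWriter(File) constructor, and the startChunkGroup/endChunkGroup/endFile calls are assumptions about the surrounding rel/1.2 tsfile module, so treat this as a minimal sketch rather than canonical API usage.

import org.apache.iotdb.tsfile.encoding.encoder.DeltaBinaryEncoder;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import java.io.File;
import java.io.IOException;

public class TimeChunkWriterSketch {
  public static void main(String[] args) throws IOException {
    // Assumed API: TsFileIOWriter(File) opens a TsFile for writing and
    // startChunkGroup takes a device id string in rel/1.2.
    TsFileIOWriter writer = new TsFileIOWriter(new File("sketch.tsfile"));
    writer.startChunkGroup("root.sg.d1");

    // Time column of an aligned device; the empty measurementId and the
    // long-delta time encoder are illustrative assumptions.
    TimeChunkWriter timeChunkWriter =
        new TimeChunkWriter(
            "",
            CompressionType.LZ4,
            TSEncoding.TS_2DIFF,
            new DeltaBinaryEncoder.LongDeltaEncoder());

    for (long t = 0; t < 100_000; t++) {
      timeChunkWriter.write(t);
      // checkPageSizeAndMayOpenANewPage only reports that the current page
      // is full; the caller seals it into the page buffer.
      if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
        timeChunkWriter.writePageToPageBuffer();
      }
    }

    // Seals the open page, flushes all buffered pages as one time chunk,
    // and resets the writer for reuse.
    timeChunkWriter.writeToFileWriter(writer);

    writer.endChunkGroup();
    writer.endFile();
  }
}

Because writeAllPagesOfChunkToTsFile flushes the chunk with TsFileConstant.TIME_COLUMN_MASK and TSDataType.VECTOR, a real caller pairs this writer with value chunk writers for the same device; the standalone loop above only illustrates the page-sealing protocol.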