/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.tsfile.write.chunk;

import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

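/**
 * Serializes the value column of an aligned timeseries into a chunk, page by page.
 *
 * <p>A minimal usage sketch; {@code valueEncoder} and {@code tsFileIOWriter} are assumed
 * to be created elsewhere, and this is only one possible call sequence:
 *
 * <pre>{@code
 * ValueChunkWriter chunkWriter =
 *     new ValueChunkWriter(
 *         "s1", CompressionType.SNAPPY, TSDataType.INT64, TSEncoding.PLAIN, valueEncoder);
 * chunkWriter.write(1L, 100L, false); // a non-null point at timestamp 1
 * chunkWriter.write(2L, 0L, true);    // a null placeholder at timestamp 2
 * if (chunkWriter.checkPageSizeAndMayOpenANewPage()) {
 *   chunkWriter.writePageToPageBuffer(); // seal the full page before continuing
 * }
 * chunkWriter.writeToFileWriter(tsFileIOWriter); // flush the whole chunk
 * }</pre>
 */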
public class ValueChunkWriter {

  private static final Logger logger = LoggerFactory.getLogger(ValueChunkWriter.class);

  private final String measurementId;

  private final TSEncoding encodingType;

  private final TSDataType dataType;

  private final CompressionType compressionType;

  /** all pages of this chunk. */
  private final PublicBAOS pageBuffer;

  private int numOfPages;

  /** write data into current page */
  private ValuePageWriter pageWriter;

  /** page size threshold. */
  private final long pageSizeThreshold;

  private final int maxNumberOfPointsInPage;

  /** value count in current page. */
  private int valueCountInOnePageForNextCheck;

  // initial value for valueCountInOnePageForNextCheck
  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 1500;

  /** statistic of this chunk. */
  private Statistics<? extends Serializable> statistics;

  /** first page info */
  private int sizeWithoutStatistic;

  private Statistics<?> firstPageStatistics;

  public ValueChunkWriter(
      String measurementId,
      CompressionType compressionType,
      TSDataType dataType,
      TSEncoding encodingType,
      Encoder valueEncoder) {
    this.measurementId = measurementId;
    this.encodingType = encodingType;
    this.dataType = dataType;
    this.compressionType = compressionType;
    this.pageBuffer = new PublicBAOS();
    this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
    this.maxNumberOfPointsInPage =
        TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
    this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;

    // init statistics for this chunk and page
    this.statistics = Statistics.getStatsByType(dataType);

    this.pageWriter =
        new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
  }
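
  /*
   * Single-point write overloads, one per supported data type. Each delegates to the
   * current ValuePageWriter; when isNull is true, the value argument is only a placeholder.
   */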
  public void write(long time, long value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }

  public void write(long time, int value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }

  public void write(long time, boolean value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }

  public void write(long time, float value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }

  public void write(long time, double value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }

  public void write(long time, Binary value, boolean isNull) {
    pageWriter.write(time, value, isNull);
  }
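
  /*
   * Batch write overloads: batchSize and pos select the slice of the input arrays to
   * write, and each overload again delegates to the current ValuePageWriter.
   */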
  public void write(long[] timestamps, int[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }

  public void write(long[] timestamps, long[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }

  public void write(long[] timestamps, boolean[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }

  public void write(long[] timestamps, float[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }

  public void write(long[] timestamps, double[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }

  public void write(long[] timestamps, Binary[] values, boolean[] isNull, int batchSize, int pos) {
    pageWriter.write(timestamps, values, isNull, batchSize, pos);
  }
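
  /**
   * Appends an empty page to the page buffer. The first page of a chunk is serialized
   * without statistics (a single-page chunk only needs chunk-level statistics), so if
   * exactly one non-empty page has been written, its deferred statistics are spliced
   * back into its header before the empty page is appended.
   */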
  public void writeEmptyPageToPageBuffer() throws IOException {
    if (numOfPages == 1 && firstPageStatistics != null) {
      // the first page is not an empty page, so splice its statistics back in
      byte[] b = pageBuffer.toByteArray();
      pageBuffer.reset();
      pageBuffer.write(b, 0, this.sizeWithoutStatistic);
      firstPageStatistics.serialize(pageBuffer);
      pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
      firstPageStatistics = null;
    }
    pageWriter.writeEmptyPageIntoBuff(pageBuffer);
    numOfPages++;
  }
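
  /**
   * Seals the current page into the page buffer and merges its statistics into the
   * chunk statistics. The first page is written without statistics; once a second page
   * arrives, the deferred first-page statistics are inserted retroactively so that
   * every page of a multi-page chunk carries its own statistics.
   */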
  public void writePageToPageBuffer() {
    try {
      if (numOfPages == 0) {
        if (pageWriter.getStatistics().getCount() != 0) {
          // record the firstPageStatistics if this is not an empty page
          this.firstPageStatistics = pageWriter.getStatistics();
        }
        this.sizeWithoutStatistic = pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
        if (firstPageStatistics != null) { // the previous page may have been an empty page
          byte[] b = pageBuffer.toByteArray();
          pageBuffer.reset();
          pageBuffer.write(b, 0, this.sizeWithoutStatistic);
          firstPageStatistics.serialize(pageBuffer);
          pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
        }
        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
        firstPageStatistics = null;
      } else {
        pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, false);
      }

      // update statistics of this chunk
      numOfPages++;
      this.statistics.mergeStatistics(pageWriter.getStatistics());
    } catch (IOException e) {
      logger.error("Error in pageWriter.writePageHeaderAndDataIntoBuff, ignoring this page:", e);
    } finally {
      // clear the start timestamp for the next page initialization
      pageWriter.reset(dataType);
    }
  }
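
  /**
   * Writes an already-serialized page (for example, a page copied verbatim during
   * compaction) into the page buffer, applying the same deferred first-page-statistics
   * handling as writePageToPageBuffer. A page whose uncompressedSize is 0 is an empty
   * page and carries no statistics.
   */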
  public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
      throws PageException {
    // write the page header to pageBuffer
    try {
      logger.debug(
          "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
      // serialize the page header; see writePageToPageBuffer for the layout
      if (numOfPages == 0) { // record the firstPageStatistics
        this.sizeWithoutStatistic +=
            ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        if (header.getStatistics() == null) {
          this.firstPageStatistics = null;
        } else {
          this.firstPageStatistics = header.getStatistics();
          this.sizeWithoutStatistic +=
              ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
        }
      } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
        if (firstPageStatistics != null) {
          byte[] b = pageBuffer.toByteArray();
          pageBuffer.reset();
          pageBuffer.write(b, 0, this.sizeWithoutStatistic);
          firstPageStatistics.serialize(pageBuffer);
          pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
        }
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        if (header.getUncompressedSize() != 0) {
          ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
          header.getStatistics().serialize(pageBuffer);
        }
        firstPageStatistics = null;
      } else {
        ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
        if (header.getUncompressedSize() != 0) {
          ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
          header.getStatistics().serialize(pageBuffer);
        }
      }
      logger.debug(
          "finish to flush a page header {} of value page into buffer, buffer position {} ",
          header,
          pageBuffer.size());

      if (header.getStatistics() != null) {
        statistics.mergeStatistics(header.getStatistics());
      }
    } catch (IOException e) {
      throw new PageException("IO Exception in writeDataPageHeader, ignore this page", e);
    }
    numOfPages++;
    // write page content to temp PBAOS
    try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
      channel.write(data);
    } catch (IOException e) {
      throw new PageException(e);
    }
  }
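
  /**
   * Seals the current page, flushes every buffered page of this chunk to the given
   * writer, and then resets this chunk writer so it can be reused for the next chunk.
   */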
  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
    sealCurrentPage();
    writeAllPagesOfChunkToTsFile(tsfileWriter);

    // reinit this chunk writer
    pageBuffer.reset();
    numOfPages = 0;
    sizeWithoutStatistic = 0;
    firstPageStatistics = null;
    this.statistics = Statistics.getStatsByType(dataType);
  }
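
  /**
   * Upper-bound estimate of this chunk writer's memory footprint: the sealed pages in
   * pageBuffer plus the worst case for the in-progress page (its data, a header without
   * statistics, and its serialized statistics).
   */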
  public long estimateMaxSeriesMemSize() {
    return pageBuffer.size()
        + pageWriter.estimateMaxMemSize()
        + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics()
        + pageWriter.getStatistics().getSerializedSize();
  }
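
  /**
   * Returns the size this chunk would currently occupy on disk: 0 if nothing has been
   * buffered, only a chunk header if the chunk holds no values, otherwise the chunk
   * header plus all buffered pages.
   */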
  public long getCurrentChunkSize() {
    // If all subsequent writes are out of order, the statistics count of this chunk
    // stays 0 and the chunk is never flushed.
    if (pageBuffer.size() == 0) {
      return 0;
    }

    // Empty chunk: the pageBuffer may store only empty bitmaps, in which case only the
    // chunk header will be flushed.
    if (statistics.getCount() == 0) {
      return ChunkHeader.getSerializedSize(measurementId, 0);
    }

    // the serialized size of the chunk header plus all pages
    return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
        + (long) pageBuffer.size();
  }
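
  /**
   * Decides whether the current page should be sealed, either because its point count
   * reached the cap or because its estimated size exceeds the threshold. To avoid
   * estimating memory on every write, the next check is extrapolated linearly: for
   * example, if 1500 points occupy about 32 KB against a 64 KB threshold, the next
   * size check is deferred until roughly (64 / 32) * 1500 = 3000 points.
   */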
  public boolean checkPageSizeAndMayOpenANewPage() {
    if (pageWriter.getPointNumber() == maxNumberOfPointsInPage) {
      logger.debug("current line count reaches the upper bound, write page {}", measurementId);
      return true;
    } else if (pageWriter.getPointNumber()
        >= valueCountInOnePageForNextCheck) { // need to check memory size
      // do not check the memory usage for every single value
      long currentPageSize = pageWriter.estimateMaxMemSize();
      if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
        // write the current page
        logger.debug(
            "enough size, write page {}, pageSizeThreshold:{}, currentPageSize:{}, valueCountInOnePage:{}",
            measurementId,
            pageSizeThreshold,
            currentPageSize,
            pageWriter.getPointNumber());
        valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
        return true;
      } else {
        // reset valueCountInOnePageForNextCheck for the next check
        valueCountInOnePageForNextCheck =
            (int) (((float) pageSizeThreshold / currentPageSize) * pageWriter.getPointNumber());
      }
    }
    return false;
  }

  public void sealCurrentPage() {
    // a page may contain no points yet still hold null bitmaps, so serialize it as
    // long as it has any data
    if (pageWriter != null && pageWriter.getSize() != 0) {
      writePageToPageBuffer();
    }
  }

  public void clearPageWriter() {
    pageWriter = null;
  }

  public int getNumOfPages() {
    return numOfPages;
  }

  public TSDataType getDataType() {
    return dataType;
  }

  /**
   * Write all pages of this chunk to the specified IOWriter.
   *
   * @param writer the specified IOWriter
   * @throws IOException exception in IO
   */
  public void writeAllPagesOfChunkToTsFile(TsFileIOWriter writer) throws IOException {
    if (statistics.getCount() == 0) {
      if (pageBuffer.size() == 0) {
        return;
      }
      // To keep different chunkgroups in a tsfile aligned on the same chunks, or when all
      // data of this timeseries has been deleted, an empty valueChunk may appear in a
      // chunkGroup during compaction. To save disk space, only the chunkHeader is
      // serialized for the empty valueChunk, with a dataSize of 0.
      writer.startFlushChunk(
          measurementId,
          compressionType,
          dataType,
          encodingType,
          statistics,
          0,
          0,
          TsFileConstant.VALUE_COLUMN_MASK);
      writer.endCurrentChunk();
      return;
    }

    // start to write this column chunk
    writer.startFlushChunk(
        measurementId,
        compressionType,
        dataType,
        encodingType,
        statistics,
        pageBuffer.size(),
        numOfPages,
        TsFileConstant.VALUE_COLUMN_MASK);

    long dataOffset = writer.getPos();

    // write all pages of this column
    writer.writeBytesToStream(pageBuffer);

    int dataSize = (int) (writer.getPos() - dataOffset);
    if (dataSize != pageBuffer.size()) {
      throw new IOException(
          "Bytes written is inconsistent with the size of data: "
              + dataSize
              + " != "
              + pageBuffer.size());
    }

    writer.endCurrentChunk();
  }

  public String getMeasurementId() {
    return measurementId;
  }

  public TSEncoding getEncodingType() {
    return encodingType;
  }

  public CompressionType getCompressionType() {
    return compressionType;
  }

  /** only used for tests */
  public PublicBAOS getPageBuffer() {
    return pageBuffer;
  }

  public boolean checkIsUnsealedPageOverThreshold(long size) {
    return pageWriter.estimateMaxMemSize() >= size;
  }

  public ValuePageWriter getPageWriter() {
    return pageWriter;
  }
}