• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9733

pending completion
#9733

push

travis_ci

web-flow
[To rel/1.2] Add compression and encoding type check for FastCompactionPerformer (#10712)

32 of 32 new or added lines in 4 files covered. (100.0%)

79232 of 165563 relevant lines covered (47.86%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.96
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.tsfile.write.chunk;
20

21
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
22
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
23
import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
24
import org.apache.iotdb.tsfile.exception.write.PageException;
25
import org.apache.iotdb.tsfile.file.header.PageHeader;
26
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
27
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
28
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
29
import org.apache.iotdb.tsfile.read.common.block.column.Column;
30
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
31
import org.apache.iotdb.tsfile.utils.Binary;
32
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
33
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
34
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
35
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
36

37
import java.io.IOException;
38
import java.nio.ByteBuffer;
39
import java.nio.charset.StandardCharsets;
40
import java.util.ArrayList;
41
import java.util.List;
42

43
public class AlignedChunkWriterImpl implements IChunkWriter {
44

45
  private final TimeChunkWriter timeChunkWriter;
46
  private final List<ValueChunkWriter> valueChunkWriterList;
47
  private int valueIndex;
48

49
  // Used for batch writing
50
  private long remainingPointsNumber;
51

52
  /** @param schema schema of this measurement */
53
  public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
1✔
54
    timeChunkWriter =
1✔
55
        new TimeChunkWriter(
56
            schema.getMeasurementId(),
1✔
57
            schema.getCompressor(),
1✔
58
            schema.getTimeTSEncoding(),
1✔
59
            schema.getTimeEncoder());
1✔
60

61
    List<String> valueMeasurementIdList = schema.getSubMeasurementsList();
1✔
62
    List<TSDataType> valueTSDataTypeList = schema.getSubMeasurementsTSDataTypeList();
1✔
63
    List<TSEncoding> valueTSEncodingList = schema.getSubMeasurementsTSEncodingList();
1✔
64
    List<Encoder> valueEncoderList = schema.getSubMeasurementsEncoderList();
1✔
65

66
    valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
1✔
67
    for (int i = 0; i < valueMeasurementIdList.size(); i++) {
1✔
68
      valueChunkWriterList.add(
1✔
69
          new ValueChunkWriter(
70
              valueMeasurementIdList.get(i),
1✔
71
              schema.getCompressor(),
1✔
72
              valueTSDataTypeList.get(i),
1✔
73
              valueTSEncodingList.get(i),
1✔
74
              valueEncoderList.get(i)));
1✔
75
    }
76

77
    this.valueIndex = 0;
1✔
78
    this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
1✔
79
  }
1✔
80

81
  /**
82
   * This is used to rewrite file. The encoding and compression of the time column should be the
83
   * same as the source file.
84
   *
85
   * @param timeSchema time schema
86
   * @param valueSchemaList value schema list
87
   */
88
  public AlignedChunkWriterImpl(
89
      IMeasurementSchema timeSchema, List<IMeasurementSchema> valueSchemaList) {
1✔
90
    timeChunkWriter =
1✔
91
        new TimeChunkWriter(
92
            timeSchema.getMeasurementId(),
1✔
93
            timeSchema.getCompressor(),
1✔
94
            timeSchema.getEncodingType(),
1✔
95
            timeSchema.getTimeEncoder());
1✔
96

97
    valueChunkWriterList = new ArrayList<>(valueSchemaList.size());
1✔
98
    for (int i = 0; i < valueSchemaList.size(); i++) {
1✔
99
      valueChunkWriterList.add(
1✔
100
          new ValueChunkWriter(
101
              valueSchemaList.get(i).getMeasurementId(),
1✔
102
              valueSchemaList.get(i).getCompressor(),
1✔
103
              valueSchemaList.get(i).getType(),
1✔
104
              valueSchemaList.get(i).getEncodingType(),
1✔
105
              valueSchemaList.get(i).getValueEncoder()));
1✔
106
    }
107

108
    this.valueIndex = 0;
1✔
109
    this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
1✔
110
  }
1✔
111

112
  /**
113
   * This is used to write 0-level file. The compression of the time column is 'LZ4' in the
114
   * configuration by default. The encoding of the time column is 'TS_2DIFF' in the configuration by
115
   * default.
116
   *
117
   * @param schemaList value schema list
118
   */
119
  public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
1✔
120
    TSEncoding timeEncoding =
121
        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
1✔
122
    TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
1✔
123
    CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor();
1✔
124
    timeChunkWriter =
1✔
125
        new TimeChunkWriter(
126
            "",
127
            timeCompression,
128
            timeEncoding,
129
            TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType));
1✔
130

131
    valueChunkWriterList = new ArrayList<>(schemaList.size());
1✔
132
    for (int i = 0; i < schemaList.size(); i++) {
1✔
133
      valueChunkWriterList.add(
1✔
134
          new ValueChunkWriter(
135
              schemaList.get(i).getMeasurementId(),
1✔
136
              schemaList.get(i).getCompressor(),
1✔
137
              schemaList.get(i).getType(),
1✔
138
              schemaList.get(i).getEncodingType(),
1✔
139
              schemaList.get(i).getValueEncoder()));
1✔
140
    }
141

142
    this.valueIndex = 0;
1✔
143

144
    this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
1✔
145
  }
1✔
146

147
  public void write(long time, int value, boolean isNull) {
148
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
149
  }
1✔
150

151
  public void write(long time, long value, boolean isNull) {
152
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
153
  }
1✔
154

155
  public void write(long time, boolean value, boolean isNull) {
156
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
157
  }
1✔
158

159
  public void write(long time, float value, boolean isNull) {
160
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
161
  }
1✔
162

163
  public void write(long time, double value, boolean isNull) {
164
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
165
  }
1✔
166

167
  public void write(long time, Binary value, boolean isNull) {
168
    valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
1✔
169
  }
1✔
170

171
  public void write(long time, int value, boolean isNull, int valueIndex) {
172
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
173
  }
×
174

175
  public void write(long time, long value, boolean isNull, int valueIndex) {
176
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
177
  }
×
178

179
  public void write(long time, boolean value, boolean isNull, int valueIndex) {
180
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
181
  }
×
182

183
  public void write(long time, float value, boolean isNull, int valueIndex) {
184
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
185
  }
×
186

187
  public void write(long time, double value, boolean isNull, int valueIndex) {
188
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
189
  }
×
190

191
  public void write(long time, Binary value, boolean isNull, int valueIndex) {
192
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
193
  }
×
194

195
  public void write(long time, TsPrimitiveType[] points) {
196
    valueIndex = 0;
1✔
197
    for (TsPrimitiveType point : points) {
1✔
198
      ValueChunkWriter writer = valueChunkWriterList.get(valueIndex++);
1✔
199
      switch (writer.getDataType()) {
1✔
200
        case INT64:
201
          writer.write(time, point != null ? point.getLong() : Long.MAX_VALUE, point == null);
1✔
202
          break;
1✔
203
        case INT32:
204
          writer.write(time, point != null ? point.getInt() : Integer.MAX_VALUE, point == null);
1✔
205
          break;
1✔
206
        case FLOAT:
207
          writer.write(time, point != null ? point.getFloat() : Float.MAX_VALUE, point == null);
1✔
208
          break;
1✔
209
        case DOUBLE:
210
          writer.write(time, point != null ? point.getDouble() : Double.MAX_VALUE, point == null);
1✔
211
          break;
1✔
212
        case BOOLEAN:
213
          writer.write(time, point != null ? point.getBoolean() : false, point == null);
1✔
214
          break;
1✔
215
        case TEXT:
216
          writer.write(
1✔
217
              time,
218
              point != null ? point.getBinary() : new Binary("".getBytes(StandardCharsets.UTF_8)),
1✔
219
              point == null);
220
          break;
221
      }
222
    }
223
    write(time);
1✔
224
  }
1✔
225

226
  public void write(long time) {
227
    valueIndex = 0;
1✔
228
    timeChunkWriter.write(time);
1✔
229
    if (checkPageSizeAndMayOpenANewPage()) {
1✔
230
      writePageToPageBuffer();
1✔
231
    }
232
  }
1✔
233

234
  public void writeTime(long time) {
235
    timeChunkWriter.write(time);
×
236
  }
×
237

238
  public void write(TimeColumn timeColumn, Column[] valueColumns, int batchSize) {
239
    if (remainingPointsNumber < batchSize) {
1✔
240
      int pointsHasWritten = (int) remainingPointsNumber;
1✔
241
      batchWrite(timeColumn, valueColumns, pointsHasWritten, 0);
1✔
242
      batchWrite(timeColumn, valueColumns, batchSize - pointsHasWritten, pointsHasWritten);
1✔
243
    } else {
1✔
244
      batchWrite(timeColumn, valueColumns, batchSize, 0);
1✔
245
    }
246
  }
1✔
247

248
  private void batchWrite(
249
      TimeColumn timeColumn, Column[] valueColumns, int batchSize, int arrayOffset) {
250
    valueIndex = 0;
1✔
251
    long[] times = timeColumn.getTimes();
1✔
252

253
    for (Column column : valueColumns) {
1✔
254
      ValueChunkWriter chunkWriter = valueChunkWriterList.get(valueIndex++);
1✔
255
      TSDataType tsDataType = chunkWriter.getDataType();
1✔
256
      switch (tsDataType) {
1✔
257
        case TEXT:
258
          chunkWriter.write(times, column.getBinaries(), column.isNull(), batchSize, arrayOffset);
1✔
259
          break;
1✔
260
        case DOUBLE:
261
          chunkWriter.write(times, column.getDoubles(), column.isNull(), batchSize, arrayOffset);
1✔
262
          break;
1✔
263
        case BOOLEAN:
264
          chunkWriter.write(times, column.getBooleans(), column.isNull(), batchSize, arrayOffset);
1✔
265
          break;
1✔
266
        case INT64:
267
          chunkWriter.write(times, column.getLongs(), column.isNull(), batchSize, arrayOffset);
1✔
268
          break;
1✔
269
        case INT32:
270
          chunkWriter.write(times, column.getInts(), column.isNull(), batchSize, arrayOffset);
1✔
271
          break;
1✔
272
        case FLOAT:
273
          chunkWriter.write(times, column.getFloats(), column.isNull(), batchSize, arrayOffset);
1✔
274
          break;
1✔
275
        default:
276
          throw new UnsupportedOperationException("Unknown data type " + tsDataType);
×
277
      }
278
    }
279

280
    write(times, batchSize, arrayOffset);
1✔
281
  }
1✔
282

283
  public void write(long[] time, int batchSize, int arrayOffset) {
284
    valueIndex = 0;
1✔
285
    timeChunkWriter.write(time, batchSize, arrayOffset);
1✔
286
    if (checkPageSizeAndMayOpenANewPage()) {
1✔
287
      writePageToPageBuffer();
1✔
288
    }
289

290
    remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
1✔
291
  }
1✔
292

293
  public void writeByColumn(long time, int value, boolean isNull) {
294
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
295
  }
×
296

297
  public void writeByColumn(long time, long value, boolean isNull) {
298
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
299
  }
×
300

301
  public void writeByColumn(long time, boolean value, boolean isNull) {
302
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
303
  }
×
304

305
  public void writeByColumn(long time, float value, boolean isNull) {
306
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
307
  }
×
308

309
  public void writeByColumn(long time, double value, boolean isNull) {
310
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
311
  }
×
312

313
  public void writeByColumn(long time, Binary value, boolean isNull) {
314
    valueChunkWriterList.get(valueIndex).write(time, value, isNull);
×
315
  }
×
316

317
  public void nextColumn() {
318
    valueIndex++;
×
319
  }
×
320

321
  /**
322
   * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
323
   * to pageBuffer
324
   */
325
  private boolean checkPageSizeAndMayOpenANewPage() {
326
    if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
1✔
327
      return true;
1✔
328
    }
329
    for (ValueChunkWriter writer : valueChunkWriterList) {
1✔
330
      if (writer.checkPageSizeAndMayOpenANewPage()) {
1✔
331
        return true;
1✔
332
      }
333
    }
1✔
334
    return false;
1✔
335
  }
336

337
  private void writePageToPageBuffer() {
338
    timeChunkWriter.writePageToPageBuffer();
1✔
339
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
340
      valueChunkWriter.writePageToPageBuffer();
1✔
341
    }
1✔
342
  }
1✔
343

344
  public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader header)
345
      throws PageException {
346
    timeChunkWriter.writePageHeaderAndDataIntoBuff(data, header);
×
347
  }
×
348

349
  public void writePageHeaderAndDataIntoValueBuff(
350
      ByteBuffer data, PageHeader header, int valueIndex) throws PageException {
351
    valueChunkWriterList.get(valueIndex).writePageHeaderAndDataIntoBuff(data, header);
×
352
  }
×
353

354
  @Override
355
  public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
356
    timeChunkWriter.writeToFileWriter(tsfileWriter);
1✔
357
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
358
      valueChunkWriter.writeToFileWriter(tsfileWriter);
1✔
359
    }
1✔
360
  }
1✔
361

362
  @Override
363
  public long estimateMaxSeriesMemSize() {
364
    long estimateMaxSeriesMemSize = timeChunkWriter.estimateMaxSeriesMemSize();
1✔
365
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
366
      estimateMaxSeriesMemSize += valueChunkWriter.estimateMaxSeriesMemSize();
1✔
367
    }
1✔
368
    return estimateMaxSeriesMemSize;
1✔
369
  }
370

371
  public long getSerializedChunkSize() {
372
    long currentChunkSize = timeChunkWriter.getCurrentChunkSize();
1✔
373
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
374
      currentChunkSize += valueChunkWriter.getCurrentChunkSize();
1✔
375
    }
1✔
376
    return currentChunkSize;
1✔
377
  }
378

379
  @Override
380
  public void sealCurrentPage() {
381
    timeChunkWriter.sealCurrentPage();
1✔
382
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
383
      valueChunkWriter.sealCurrentPage();
1✔
384
    }
1✔
385
  }
1✔
386

387
  public void sealCurrentTimePage() {
388
    timeChunkWriter.sealCurrentPage();
×
389
  }
×
390

391
  public void sealCurrentValuePage(int valueIndex) {
392
    valueChunkWriterList.get(valueIndex).sealCurrentPage();
×
393
  }
×
394

395
  @Override
396
  public void clearPageWriter() {
397
    timeChunkWriter.clearPageWriter();
×
398
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
×
399
      valueChunkWriter.clearPageWriter();
×
400
    }
×
401
  }
×
402

403
  @Override
404
  public boolean checkIsChunkSizeOverThreshold(
405
      long size, long pointNum, boolean returnTrueIfChunkEmpty) {
406
    if ((returnTrueIfChunkEmpty && timeChunkWriter.getPointNum() == 0)
1✔
407
        || (timeChunkWriter.getPointNum() >= pointNum
1✔
408
            || timeChunkWriter.estimateMaxSeriesMemSize() >= size)) {
1✔
409
      return true;
1✔
410
    }
411
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
1✔
412
      if (valueChunkWriter.estimateMaxSeriesMemSize() >= size) {
1✔
413
        return true;
1✔
414
      }
415
    }
1✔
416
    return false;
1✔
417
  }
418

419
  @Override
420
  public boolean checkIsUnsealedPageOverThreshold(
421
      long size, long pointNum, boolean returnTrueIfPageEmpty) {
422
    if ((returnTrueIfPageEmpty && timeChunkWriter.getPageWriter().getPointNumber() == 0)
×
423
        || timeChunkWriter.checkIsUnsealedPageOverThreshold(size, pointNum)) {
×
424
      return true;
×
425
    }
426
    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
×
427
      if (valueChunkWriter.checkIsUnsealedPageOverThreshold(size)) {
×
428
        return true;
×
429
      }
430
    }
×
431
    return false;
×
432
  }
433

434
  public ValueChunkWriter getValueChunkWriterByIndex(int valueIndex) {
435
    return valueChunkWriterList.get(valueIndex);
1✔
436
  }
437

438
  /** Test only */
439
  public TimeChunkWriter getTimeChunkWriter() {
440
    return timeChunkWriter;
1✔
441
  }
442

443
  /** Test only */
444
  public List<ValueChunkWriter> getValueChunkWriterList() {
445
    return valueChunkWriterList;
1✔
446
  }
447
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc