• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9647

pending completion
#9647

push

travis_ci

web-flow
[IOTDB-6075] Pipe: File resource races when different tsfile load operations concurrently modify the same tsfile at receiver (#10629)

* Add a parameter in iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties to control the connection timeout.
* Set the pipe connection timeout between sender and receiver to 15 minutes so that long-running load operations can complete.
* Redesign the pipe receiver's dir to avoid file resource races when different tsfile load operations concurrently modify the same tsfile.

184 of 184 new or added lines in 9 files covered. (100.0%)

79058 of 165585 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.84
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.utils.datastructure;
21

22
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
23
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
24
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
25
import org.apache.iotdb.db.utils.MathUtils;
26
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
27
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
28
import org.apache.iotdb.tsfile.read.TimeValuePair;
29
import org.apache.iotdb.tsfile.read.common.TimeRange;
30
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
31
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
32
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
33
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
34
import org.apache.iotdb.tsfile.utils.Binary;
35
import org.apache.iotdb.tsfile.utils.BitMap;
36
import org.apache.iotdb.tsfile.utils.Pair;
37
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
38
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
39

40
import java.io.DataInputStream;
41
import java.io.IOException;
42
import java.util.ArrayList;
43
import java.util.List;
44
import java.util.Objects;
45

46
import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
47
import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
48
import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
49
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
50
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
51

52
public abstract class AlignedTVList extends TVList {
53

54
  // Data types of this aligned tvList
55
  protected List<TSDataType> dataTypes;
56

57
  // Record total memory size of binary column
58
  protected long[] memoryBinaryChunkSize;
59

60
  // Data type list -> list of TVList, add 1 when expanded -> primitive array of basic type
61
  // Index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
62
  protected List<List<Object>> values;
63

64
  // List of index array, add 1 when expanded -> data point index array
65
  // Index relation: arrayIndex -> elementIndex
66
  // Used in sort method, sort only changes indices
67
  protected List<int[]> indices;
68

69
  // Data type list -> list of BitMap, add 1 when expanded -> BitMap(maybe null), marked means the
70
  // Value is null
71
  // Index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
72
  protected List<List<BitMap>> bitMaps;
73

74
  // If a sensor chunk size of Text datatype reaches the threshold, this flag will be set true
75
  boolean reachMaxChunkSizeFlag;
76

77
  AlignedTVList(List<TSDataType> types) {
78
    super();
1✔
79
    indices = new ArrayList<>(types.size());
1✔
80
    dataTypes = types;
1✔
81
    memoryBinaryChunkSize = new long[dataTypes.size()];
1✔
82
    reachMaxChunkSizeFlag = false;
1✔
83

84
    values = new ArrayList<>(types.size());
1✔
85
    for (int i = 0; i < types.size(); i++) {
1✔
86
      values.add(new ArrayList<>());
1✔
87
    }
88
  }
1✔
89

90
  public static AlignedTVList newAlignedList(List<TSDataType> dataTypes) {
91
    switch (TVLIST_SORT_ALGORITHM) {
1✔
92
      case QUICK:
93
        return new QuickAlignedTVList(dataTypes);
×
94
      case BACKWARD:
95
        return new BackAlignedTVList(dataTypes);
×
96
      default:
97
        return new TimAlignedTVList(dataTypes);
1✔
98
    }
99
  }
100

101
  @Override
102
  public TVList getTvListByColumnIndex(List<Integer> columnIndex, List<TSDataType> dataTypeList) {
103
    List<List<Object>> values = new ArrayList<>();
1✔
104
    List<List<BitMap>> bitMaps = null;
1✔
105
    for (int i = 0; i < columnIndex.size(); i++) {
1✔
106
      // columnIndex == -1 means querying a non-exist column, add null column here
107
      if (columnIndex.get(i) == -1) {
1✔
108
        values.add(null);
1✔
109
      } else {
110
        values.add(this.values.get(columnIndex.get(i)));
1✔
111
        if (this.bitMaps != null && this.bitMaps.get(columnIndex.get(i)) != null) {
1✔
112
          if (bitMaps == null) {
1✔
113
            bitMaps = new ArrayList<>(columnIndex.size());
1✔
114
            for (int j = 0; j < columnIndex.size(); j++) {
1✔
115
              bitMaps.add(null);
1✔
116
            }
117
          }
118
          bitMaps.set(i, this.bitMaps.get(columnIndex.get(i)));
1✔
119
        }
120
      }
121
    }
122
    AlignedTVList alignedTvList = AlignedTVList.newAlignedList(dataTypeList);
1✔
123
    alignedTvList.timestamps = this.timestamps;
1✔
124
    alignedTvList.indices = this.indices;
1✔
125
    alignedTvList.values = values;
1✔
126
    alignedTvList.bitMaps = bitMaps;
1✔
127
    alignedTvList.rowCount = this.rowCount;
1✔
128
    return alignedTvList;
1✔
129
  }
130

131
  @Override
132
  public AlignedTVList clone() {
133
    AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
1✔
134
    cloneAs(cloneList);
1✔
135
    System.arraycopy(
1✔
136
        memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size());
1✔
137
    for (int[] indicesArray : indices) {
1✔
138
      cloneList.indices.add(cloneIndex(indicesArray));
1✔
139
    }
1✔
140
    for (int i = 0; i < values.size(); i++) {
1✔
141
      List<Object> columnValues = values.get(i);
1✔
142
      for (Object valueArray : columnValues) {
1✔
143
        cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray));
1✔
144
      }
1✔
145
      // Clone bitmap in columnIndex
146
      if (bitMaps != null && bitMaps.get(i) != null) {
1✔
147
        List<BitMap> columnBitMaps = bitMaps.get(i);
1✔
148
        if (cloneList.bitMaps == null) {
1✔
149
          cloneList.bitMaps = new ArrayList<>(dataTypes.size());
1✔
150
          for (int j = 0; j < dataTypes.size(); j++) {
1✔
151
            cloneList.bitMaps.add(null);
1✔
152
          }
153
        }
154
        if (cloneList.bitMaps.get(i) == null) {
1✔
155
          List<BitMap> cloneColumnBitMaps = new ArrayList<>();
1✔
156
          for (BitMap bitMap : columnBitMaps) {
1✔
157
            cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone());
1✔
158
          }
1✔
159
          cloneList.bitMaps.set(i, cloneColumnBitMaps);
1✔
160
        }
161
      }
162
    }
163
    return cloneList;
1✔
164
  }
165

166
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
167
  @Override
168
  public void putAlignedValue(long timestamp, Object[] value) {
169
    checkExpansion();
1✔
170
    int arrayIndex = rowCount / ARRAY_SIZE;
1✔
171
    int elementIndex = rowCount % ARRAY_SIZE;
1✔
172
    maxTime = Math.max(maxTime, timestamp);
1✔
173
    timestamps.get(arrayIndex)[elementIndex] = timestamp;
1✔
174
    for (int i = 0; i < values.size(); i++) {
1✔
175
      Object columnValue = value[i];
1✔
176
      List<Object> columnValues = values.get(i);
1✔
177
      if (columnValue == null) {
1✔
178
        markNullValue(i, arrayIndex, elementIndex);
1✔
179
      }
180
      switch (dataTypes.get(i)) {
1✔
181
        case TEXT:
182
          ((Binary[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
183
              columnValue != null ? (Binary) columnValue : Binary.EMPTY_VALUE;
1✔
184
          memoryBinaryChunkSize[i] +=
1✔
185
              columnValue != null
1✔
186
                  ? getBinarySize((Binary) columnValue)
1✔
187
                  : getBinarySize(Binary.EMPTY_VALUE);
1✔
188
          if (memoryBinaryChunkSize[i] >= targetChunkSize) {
1✔
189
            reachMaxChunkSizeFlag = true;
×
190
          }
191
          break;
192
        case FLOAT:
193
          ((float[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
194
              columnValue != null ? (float) columnValue : Float.MIN_VALUE;
1✔
195
          break;
1✔
196
        case INT32:
197
          ((int[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
198
              columnValue != null ? (int) columnValue : Integer.MIN_VALUE;
1✔
199
          break;
1✔
200
        case INT64:
201
          ((long[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
202
              columnValue != null ? (long) columnValue : Long.MIN_VALUE;
1✔
203
          break;
1✔
204
        case DOUBLE:
205
          ((double[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
206
              columnValue != null ? (double) columnValue : Double.MIN_VALUE;
1✔
207
          break;
1✔
208
        case BOOLEAN:
209
          ((boolean[]) columnValues.get(arrayIndex))[elementIndex] =
1✔
210
              columnValue != null && (boolean) columnValue;
1✔
211
          break;
1✔
212
        default:
213
          break;
214
      }
215
    }
216
    indices.get(arrayIndex)[elementIndex] = rowCount;
1✔
217
    rowCount++;
1✔
218
    if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
1✔
219
      sorted = false;
1✔
220
    }
221
  }
1✔
222

223
  @Override
224
  public Object getAlignedValue(int index) {
225
    return getAlignedValueForQuery(index, null, null);
1✔
226
  }
227

228
  @Override
229
  protected TimeValuePair getTimeValuePair(
230
      int index, long time, Integer floatPrecision, TSEncoding encoding) {
231
    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
232
  }
233

234
  @Override
235
  public TimeValuePair getTimeValuePair(int index) {
236
    return new TimeValuePair(
×
237
        getTime(index), (TsPrimitiveType) getAlignedValueForQuery(index, null, null));
×
238
  }
239

240
  private Object getAlignedValueForQuery(
241
      int index, Integer floatPrecision, List<TSEncoding> encodingList) {
242
    if (index >= rowCount) {
1✔
243
      throw new ArrayIndexOutOfBoundsException(index);
×
244
    }
245
    int arrayIndex = index / ARRAY_SIZE;
1✔
246
    int elementIndex = index % ARRAY_SIZE;
1✔
247
    int valueIndex = indices.get(arrayIndex)[elementIndex];
1✔
248
    return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
1✔
249
  }
250

251
  private TsPrimitiveType getAlignedValueByValueIndex(
252
      int valueIndex,
253
      int[] validIndexesForTimeDuplicatedRows,
254
      Integer floatPrecision,
255
      List<TSEncoding> encodingList) {
256
    if (valueIndex >= rowCount) {
1✔
257
      throw new ArrayIndexOutOfBoundsException(valueIndex);
×
258
    }
259
    TsPrimitiveType[] vector = new TsPrimitiveType[values.size()];
1✔
260
    for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) {
1✔
261
      List<Object> columnValues = values.get(columnIndex);
1✔
262
      int validValueIndex;
263
      if (validIndexesForTimeDuplicatedRows != null) {
1✔
264
        validValueIndex = validIndexesForTimeDuplicatedRows[columnIndex];
×
265
      } else {
266
        validValueIndex = valueIndex;
1✔
267
      }
268
      int arrayIndex = validValueIndex / ARRAY_SIZE;
1✔
269
      int elementIndex = validValueIndex % ARRAY_SIZE;
1✔
270
      if (columnValues == null || isNullValue(validValueIndex, columnIndex)) {
1✔
271
        continue;
1✔
272
      }
273
      switch (dataTypes.get(columnIndex)) {
1✔
274
        case TEXT:
275
          Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex];
1✔
276
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.TEXT, valueT);
1✔
277
          break;
1✔
278
        case FLOAT:
279
          float valueF = ((float[]) columnValues.get(arrayIndex))[elementIndex];
1✔
280
          if (floatPrecision != null
1✔
281
              && encodingList != null
282
              && !Float.isNaN(valueF)
×
283
              && (encodingList.get(columnIndex) == TSEncoding.RLE
×
284
                  || encodingList.get(columnIndex) == TSEncoding.TS_2DIFF)) {
×
285
            valueF = MathUtils.roundWithGivenPrecision(valueF, floatPrecision);
×
286
          }
287
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.FLOAT, valueF);
1✔
288
          break;
1✔
289
        case INT32:
290
          int valueI = ((int[]) columnValues.get(arrayIndex))[elementIndex];
1✔
291
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.INT32, valueI);
1✔
292
          break;
1✔
293
        case INT64:
294
          long valueL = ((long[]) columnValues.get(arrayIndex))[elementIndex];
1✔
295
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.INT64, valueL);
1✔
296
          break;
1✔
297
        case DOUBLE:
298
          double valueD = ((double[]) columnValues.get(arrayIndex))[elementIndex];
1✔
299
          if (floatPrecision != null
1✔
300
              && encodingList != null
301
              && !Double.isNaN(valueD)
×
302
              && (encodingList.get(columnIndex) == TSEncoding.RLE
×
303
                  || encodingList.get(columnIndex) == TSEncoding.TS_2DIFF)) {
×
304
            valueD = MathUtils.roundWithGivenPrecision(valueD, floatPrecision);
×
305
          }
306
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.DOUBLE, valueD);
1✔
307
          break;
1✔
308
        case BOOLEAN:
309
          boolean valueB = ((boolean[]) columnValues.get(arrayIndex))[elementIndex];
1✔
310
          vector[columnIndex] = TsPrimitiveType.getByType(TSDataType.BOOLEAN, valueB);
1✔
311
          break;
1✔
312
        default:
313
          throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
314
      }
315
    }
316
    return TsPrimitiveType.getByType(TSDataType.VECTOR, vector);
1✔
317
  }
318

319
  public void extendColumn(TSDataType dataType) {
320
    if (bitMaps == null) {
1✔
321
      bitMaps = new ArrayList<>(values.size());
1✔
322
      for (int i = 0; i < values.size(); i++) {
1✔
323
        bitMaps.add(null);
1✔
324
      }
325
    }
326
    List<Object> columnValue = new ArrayList<>();
1✔
327
    List<BitMap> columnBitMaps = new ArrayList<>();
1✔
328
    for (int i = 0; i < timestamps.size(); i++) {
1✔
329
      switch (dataType) {
1✔
330
        case TEXT:
331
          columnValue.add(getPrimitiveArraysByType(TSDataType.TEXT));
1✔
332
          break;
1✔
333
        case FLOAT:
334
          columnValue.add(getPrimitiveArraysByType(TSDataType.FLOAT));
×
335
          break;
×
336
        case INT32:
337
          columnValue.add(getPrimitiveArraysByType(TSDataType.INT32));
1✔
338
          break;
1✔
339
        case INT64:
340
          columnValue.add(getPrimitiveArraysByType(TSDataType.INT64));
×
341
          break;
×
342
        case DOUBLE:
343
          columnValue.add(getPrimitiveArraysByType(TSDataType.DOUBLE));
×
344
          break;
×
345
        case BOOLEAN:
346
          columnValue.add(getPrimitiveArraysByType(TSDataType.BOOLEAN));
×
347
          break;
×
348
        default:
349
          break;
350
      }
351
      BitMap bitMap = new BitMap(ARRAY_SIZE);
1✔
352
      // The following code is for these 2 kinds of scenarios.
353

354
      // Eg1: If rowCount=5 and ARRAY_SIZE=2, we need to supply 3 bitmaps for the extending column.
355
      // The first 2 bitmaps should mark all bits to represent 4 nulls and the 3rd bitmap should
356
      // mark
357
      // the 1st bit to represent 1 null value.
358

359
      // Eg2: If rowCount=4 and ARRAY_SIZE=2, we need to supply 2 bitmaps for the extending column.
360
      // These 2 bitmaps should mark all bits to represent 4 nulls.
361
      if (i == timestamps.size() - 1 && rowCount % ARRAY_SIZE != 0) {
1✔
362
        for (int j = 0; j < rowCount % ARRAY_SIZE; j++) {
1✔
363
          bitMap.mark(j);
1✔
364
        }
365
      } else {
366
        bitMap.markAll();
1✔
367
      }
368
      columnBitMaps.add(bitMap);
1✔
369
    }
370
    this.bitMaps.add(columnBitMaps);
1✔
371
    this.values.add(columnValue);
1✔
372
    this.dataTypes.add(dataType);
1✔
373

374
    long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
1✔
375
    memoryBinaryChunkSize = new long[dataTypes.size()];
1✔
376
    System.arraycopy(
1✔
377
        tmpValueChunkRawSize, 0, memoryBinaryChunkSize, 0, tmpValueChunkRawSize.length);
378
  }
1✔
379

380
  /**
381
   * Get the int value at the given position in AlignedTvList.
382
   *
383
   * @param rowIndex value index inside this column
384
   * @param columnIndex index of the column
385
   * @return the value at this position in VectorTvList
386
   */
387
  public int getIntByValueIndex(int rowIndex, int columnIndex) {
388
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
389
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
390
    List<Object> columnValues = values.get(columnIndex);
1✔
391
    return ((int[]) columnValues.get(arrayIndex))[elementIndex];
1✔
392
  }
393

394
  /**
395
   * Get the long value at the given position in VectorTvList.
396
   *
397
   * @param rowIndex value index inside this column
398
   * @param columnIndex index of the column
399
   * @return the value at this position in VectorTvList
400
   */
401
  public long getLongByValueIndex(int rowIndex, int columnIndex) {
402
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
403
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
404
    List<Object> columnValues = values.get(columnIndex);
1✔
405
    return ((long[]) columnValues.get(arrayIndex))[elementIndex];
1✔
406
  }
407

408
  /**
409
   * Get the float value at the given position in VectorTvList.
410
   *
411
   * @param rowIndex value index inside this column
412
   * @param columnIndex index of the column
413
   * @return the value at this position in VectorTvList
414
   */
415
  public float getFloatByValueIndex(int rowIndex, int columnIndex) {
416
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
417
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
418
    List<Object> columnValues = values.get(columnIndex);
1✔
419
    return ((float[]) columnValues.get(arrayIndex))[elementIndex];
1✔
420
  }
421

422
  /**
423
   * Get the double value at the given position in VectorTvList.
424
   *
425
   * @param rowIndex value index inside this column
426
   * @param columnIndex index of the column
427
   * @return the value at this position in VectorTvList
428
   */
429
  public double getDoubleByValueIndex(int rowIndex, int columnIndex) {
430
    int arrayIndex = rowIndex / ARRAY_SIZE;
×
431
    int elementIndex = rowIndex % ARRAY_SIZE;
×
432
    List<Object> columnValues = values.get(columnIndex);
×
433
    return ((double[]) columnValues.get(arrayIndex))[elementIndex];
×
434
  }
435

436
  /**
437
   * Get the Binary value at the given position in VectorTvList.
438
   *
439
   * @param rowIndex value index inside this column
440
   * @param columnIndex index of the column
441
   * @return the value at this position in VectorTvList
442
   */
443
  public Binary getBinaryByValueIndex(int rowIndex, int columnIndex) {
444
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
445
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
446
    List<Object> columnValues = values.get(columnIndex);
1✔
447
    return ((Binary[]) columnValues.get(arrayIndex))[elementIndex];
1✔
448
  }
449

450
  /**
451
   * Get the boolean value at the given position in VectorTvList.
452
   *
453
   * @param rowIndex value index inside this column
454
   * @param columnIndex index of the column
455
   * @return the value at this position in VectorTvList
456
   */
457
  public boolean getBooleanByValueIndex(int rowIndex, int columnIndex) {
458
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
459
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
460
    List<Object> columnValues = values.get(columnIndex);
1✔
461
    return ((boolean[]) columnValues.get(arrayIndex))[elementIndex];
1✔
462
  }
463

464
  /**
465
   * Get whether value is null at the given position in AlignedTvList.
466
   *
467
   * @param rowIndex value index inside this column
468
   * @param columnIndex index of the column
469
   * @return boolean
470
   */
471
  public boolean isNullValue(int rowIndex, int columnIndex) {
472
    if (rowIndex >= rowCount) {
1✔
473
      return false;
×
474
    }
475
    if (values.get(columnIndex) == null) {
1✔
476
      return true;
1✔
477
    }
478
    if (bitMaps == null
1✔
479
        || bitMaps.get(columnIndex) == null
1✔
480
        || bitMaps.get(columnIndex).get(rowIndex / ARRAY_SIZE) == null) {
1✔
481
      return false;
1✔
482
    }
483
    int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
484
    int elementIndex = rowIndex % ARRAY_SIZE;
1✔
485
    List<BitMap> columnBitMaps = bitMaps.get(columnIndex);
1✔
486
    return columnBitMaps.get(arrayIndex).isMarked(elementIndex);
1✔
487
  }
488

489
  public List<List<Object>> getValues() {
490
    return values;
×
491
  }
492

493
  public List<TSDataType> getTsDataTypes() {
494
    return dataTypes;
×
495
  }
496

497
  @Override
498
  public int delete(long lowerBound, long upperBound) {
499
    int deletedNumber = 0;
1✔
500
    for (int i = 0; i < dataTypes.size(); i++) {
1✔
501
      deletedNumber += delete(lowerBound, upperBound, i).left;
1✔
502
    }
503
    return deletedNumber;
1✔
504
  }
505

506
  /**
507
   * Delete points in a specific column.
508
   *
509
   * @param lowerBound deletion lower bound
510
   * @param upperBound deletion upper bound
511
   * @param columnIndex column index to be deleted
512
   * @return Delete info pair. Left: deletedNumber int; right: ifDeleteColumn boolean
513
   */
514
  public Pair<Integer, Boolean> delete(long lowerBound, long upperBound, int columnIndex) {
515
    int deletedNumber = 0;
1✔
516
    boolean deleteColumn = true;
1✔
517
    for (int i = 0; i < rowCount; i++) {
1✔
518
      long time = getTime(i);
1✔
519
      if (time >= lowerBound && time <= upperBound) {
1✔
520
        int originRowIndex = getValueIndex(i);
1✔
521
        int arrayIndex = originRowIndex / ARRAY_SIZE;
1✔
522
        int elementIndex = originRowIndex % ARRAY_SIZE;
1✔
523
        if (dataTypes.get(columnIndex) == TSDataType.TEXT) {
1✔
524
          Binary value = ((Binary[]) values.get(columnIndex).get(arrayIndex))[elementIndex];
1✔
525
          if (value != null) {
1✔
526
            memoryBinaryChunkSize[columnIndex] -= getBinarySize(value);
1✔
527
          }
528
        }
529
        markNullValue(columnIndex, arrayIndex, elementIndex);
1✔
530
        deletedNumber++;
1✔
531
      } else {
1✔
532
        deleteColumn = false;
1✔
533
      }
534
    }
535
    return new Pair<>(deletedNumber, deleteColumn);
1✔
536
  }
537

538
  public void deleteColumn(int columnIndex) {
539
    dataTypes.remove(columnIndex);
1✔
540

541
    long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
1✔
542
    memoryBinaryChunkSize = new long[dataTypes.size()];
1✔
543
    int copyIndex = 0;
1✔
544
    for (int i = 0; i < tmpValueChunkRawSize.length; i++) {
1✔
545
      if (i == columnIndex) {
1✔
546
        continue;
1✔
547
      }
548
      memoryBinaryChunkSize[copyIndex++] = tmpValueChunkRawSize[i];
1✔
549
    }
550

551
    for (Object array : values.get(columnIndex)) {
1✔
552
      PrimitiveArrayManager.release(array);
1✔
553
    }
1✔
554
    values.remove(columnIndex);
1✔
555
    bitMaps.remove(columnIndex);
1✔
556
  }
1✔
557

558
  protected void set(int index, long timestamp, int value) {
559
    int arrayIndex = index / ARRAY_SIZE;
1✔
560
    int elementIndex = index % ARRAY_SIZE;
1✔
561
    timestamps.get(arrayIndex)[elementIndex] = timestamp;
1✔
562
    indices.get(arrayIndex)[elementIndex] = value;
1✔
563
  }
1✔
564

565
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
566
  protected int[] cloneIndex(int[] array) {
567
    int[] cloneArray = new int[array.length];
1✔
568
    System.arraycopy(array, 0, cloneArray, 0, array.length);
1✔
569
    return cloneArray;
1✔
570
  }
571

572
  protected Object cloneValue(TSDataType type, Object value) {
573
    switch (type) {
1✔
574
      case TEXT:
575
        Binary[] valueT = (Binary[]) value;
×
576
        Binary[] cloneT = new Binary[valueT.length];
×
577
        System.arraycopy(valueT, 0, cloneT, 0, valueT.length);
×
578
        return cloneT;
×
579
      case FLOAT:
580
        float[] valueF = (float[]) value;
×
581
        float[] cloneF = new float[valueF.length];
×
582
        System.arraycopy(valueF, 0, cloneF, 0, valueF.length);
×
583
        return cloneF;
×
584
      case INT32:
585
        int[] valueI = (int[]) value;
×
586
        int[] cloneI = new int[valueI.length];
×
587
        System.arraycopy(valueI, 0, cloneI, 0, valueI.length);
×
588
        return cloneI;
×
589
      case INT64:
590
        long[] valueL = (long[]) value;
1✔
591
        long[] cloneL = new long[valueL.length];
1✔
592
        System.arraycopy(valueL, 0, cloneL, 0, valueL.length);
1✔
593
        return cloneL;
1✔
594
      case DOUBLE:
595
        double[] valueD = (double[]) value;
×
596
        double[] cloneD = new double[valueD.length];
×
597
        System.arraycopy(valueD, 0, cloneD, 0, valueD.length);
×
598
        return cloneD;
×
599
      case BOOLEAN:
600
        boolean[] valueB = (boolean[]) value;
×
601
        boolean[] cloneB = new boolean[valueB.length];
×
602
        System.arraycopy(valueB, 0, cloneB, 0, valueB.length);
×
603
        return cloneB;
×
604
      default:
605
        return null;
×
606
    }
607
  }
608

609
  @Override
610
  public void clearValue() {
611
    if (indices != null) {
1✔
612
      for (int[] dataArray : indices) {
1✔
613
        PrimitiveArrayManager.release(dataArray);
1✔
614
      }
1✔
615
      indices.clear();
1✔
616
    }
617
    for (int i = 0; i < dataTypes.size(); i++) {
1✔
618
      List<Object> columnValues = values.get(i);
1✔
619
      if (columnValues != null) {
1✔
620
        for (Object dataArray : columnValues) {
1✔
621
          PrimitiveArrayManager.release(dataArray);
1✔
622
        }
1✔
623
        columnValues.clear();
1✔
624
      }
625
      if (bitMaps != null) {
1✔
626
        List<BitMap> columnBitMaps = bitMaps.get(i);
1✔
627
        if (columnBitMaps != null) {
1✔
628
          columnBitMaps.clear();
1✔
629
        }
630
      }
631
      memoryBinaryChunkSize[i] = 0;
1✔
632
    }
633
  }
1✔
634

635
  @Override
636
  protected void expandValues() {
637
    indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
1✔
638
    for (int i = 0; i < dataTypes.size(); i++) {
1✔
639
      values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i)));
1✔
640
      if (bitMaps != null && bitMaps.get(i) != null) {
1✔
641
        bitMaps.get(i).add(null);
1✔
642
      }
643
    }
644
  }
1✔
645

646
  /**
647
   * Get the row index value in index column.
648
   *
649
   * @param index row index
650
   */
651
  @Override
652
  public int getValueIndex(int index) {
653
    if (index >= rowCount) {
1✔
654
      throw new ArrayIndexOutOfBoundsException(index);
×
655
    }
656
    int arrayIndex = index / ARRAY_SIZE;
1✔
657
    int elementIndex = index % ARRAY_SIZE;
1✔
658
    return indices.get(arrayIndex)[elementIndex];
1✔
659
  }
660

661
  /**
662
   * Get the valid original row index in a column by a given time duplicated original row index
663
   * list.
664
   *
665
   * @param timeDuplicatedOriginRowIndexList The row index list that the time of all indexes are
666
   *     same.
667
   * @param columnIndex The index of a given column.
668
   * @return The original row index of the latest non-null value, or the first row index if all
669
   *     values in given columns are null.
670
   */
671
  public int getValidRowIndexForTimeDuplicatedRows(
672
      List<Integer> timeDuplicatedOriginRowIndexList, int columnIndex) {
673
    int validRowIndex = timeDuplicatedOriginRowIndexList.get(0);
×
674
    for (int originRowIndex : timeDuplicatedOriginRowIndexList) {
×
675
      if (!isNullValue(originRowIndex, columnIndex)) {
×
676
        validRowIndex = originRowIndex;
×
677
      }
678
    }
×
679
    return validRowIndex;
×
680
  }
681

682
  protected TimeValuePair getTimeValuePair(
683
      int index, long time, Integer floatPrecision, List<TSEncoding> encodingList) {
684
    return new TimeValuePair(
×
685
        time, (TsPrimitiveType) getAlignedValueForQuery(index, floatPrecision, encodingList));
×
686
  }
687

688
  @Override
689
  protected void releaseLastValueArray() {
690
    PrimitiveArrayManager.release(indices.remove(indices.size() - 1));
×
691
    for (List<Object> valueList : values) {
×
692
      PrimitiveArrayManager.release(valueList.remove(valueList.size() - 1));
×
693
    }
×
694
  }
×
695

696
  @Override
697
  public boolean reachMaxChunkSizeThreshold() {
698
    return reachMaxChunkSizeFlag;
1✔
699
  }
700

701
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
702
  @Override
703
  public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, int start, int end) {
704
    checkExpansion();
1✔
705
    int idx = start;
1✔
706

707
    updateMaxTimeAndSorted(time, start, end);
1✔
708

709
    while (idx < end) {
1✔
710
      int inputRemaining = end - idx;
1✔
711
      int arrayIdx = rowCount / ARRAY_SIZE;
1✔
712
      int elementIdx = rowCount % ARRAY_SIZE;
1✔
713
      int internalRemaining = ARRAY_SIZE - elementIdx;
1✔
714
      if (internalRemaining >= inputRemaining) {
1✔
715
        // the remaining inputs can fit the last array, copy all remaining inputs into last array
716
        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining);
1✔
717
        arrayCopy(value, idx, arrayIdx, elementIdx, inputRemaining);
1✔
718
        for (int i = 0; i < inputRemaining; i++) {
1✔
719
          indices.get(arrayIdx)[elementIdx + i] = rowCount;
1✔
720
          for (int j = 0; j < values.size(); j++) {
1✔
721
            if (value[j] == null
1✔
722
                || bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)) {
1✔
723
              markNullValue(j, arrayIdx, elementIdx + i);
1✔
724
            }
725
          }
726
          rowCount++;
1✔
727
        }
728
        break;
1✔
729
      } else {
730
        // the remaining inputs cannot fit the last array, fill the last array and create a new
731
        // one and enter the next loop
732
        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining);
1✔
733
        arrayCopy(value, idx, arrayIdx, elementIdx, internalRemaining);
1✔
734
        for (int i = 0; i < internalRemaining; i++) {
1✔
735
          indices.get(arrayIdx)[elementIdx + i] = rowCount;
1✔
736
          for (int j = 0; j < values.size(); j++) {
1✔
737
            if (value[j] == null
1✔
738
                || bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)) {
1✔
739
              markNullValue(j, arrayIdx, elementIdx + i);
1✔
740
            }
741
          }
742
          rowCount++;
1✔
743
        }
744
        idx += internalRemaining;
1✔
745
        checkExpansion();
1✔
746
      }
747
    }
1✔
748
  }
1✔
749

750
  private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex, int remaining) {
751
    for (int i = 0; i < values.size(); i++) {
1✔
752
      if (value[i] == null) {
1✔
753
        continue;
×
754
      }
755
      List<Object> columnValues = values.get(i);
1✔
756
      switch (dataTypes.get(i)) {
1✔
757
        case TEXT:
758
          Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex));
1✔
759
          System.arraycopy(value[i], idx, arrayT, elementIndex, remaining);
1✔
760

761
          // update raw size of Text chunk
762
          for (int i1 = 0; i1 < remaining; i1++) {
1✔
763
            memoryBinaryChunkSize[i] +=
1✔
764
                arrayT[elementIndex + i1] != null ? getBinarySize(arrayT[elementIndex + i1]) : 0;
1✔
765
          }
766
          if (memoryBinaryChunkSize[i] > targetChunkSize) {
1✔
767
            reachMaxChunkSizeFlag = true;
×
768
          }
769
          break;
770
        case FLOAT:
771
          float[] arrayF = ((float[]) columnValues.get(arrayIndex));
1✔
772
          System.arraycopy(value[i], idx, arrayF, elementIndex, remaining);
1✔
773
          break;
1✔
774
        case INT32:
775
          int[] arrayI = ((int[]) columnValues.get(arrayIndex));
1✔
776
          System.arraycopy(value[i], idx, arrayI, elementIndex, remaining);
1✔
777
          break;
1✔
778
        case INT64:
779
          long[] arrayL = ((long[]) columnValues.get(arrayIndex));
1✔
780
          System.arraycopy(value[i], idx, arrayL, elementIndex, remaining);
1✔
781
          break;
1✔
782
        case DOUBLE:
783
          double[] arrayD = ((double[]) columnValues.get(arrayIndex));
×
784
          System.arraycopy(value[i], idx, arrayD, elementIndex, remaining);
×
785
          break;
×
786
        case BOOLEAN:
787
          boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex));
1✔
788
          System.arraycopy(value[i], idx, arrayB, elementIndex, remaining);
1✔
789
          break;
1✔
790
        default:
791
          break;
792
      }
793
    }
794
  }
1✔
795

796
  private void markNullValue(int columnIndex, int arrayIndex, int elementIndex) {
797
    // init BitMaps if doesn't have
798
    if (bitMaps == null) {
1✔
799
      bitMaps = new ArrayList<>(dataTypes.size());
1✔
800
      for (int i = 0; i < dataTypes.size(); i++) {
1✔
801
        bitMaps.add(null);
1✔
802
      }
803
    }
804

805
    // if the bitmap in columnIndex is null, init the bitmap of this column from the beginning
806
    if (bitMaps.get(columnIndex) == null) {
1✔
807
      List<BitMap> columnBitMaps = new ArrayList<>();
1✔
808
      for (int i = 0; i < values.get(columnIndex).size(); i++) {
1✔
809
        columnBitMaps.add(new BitMap(ARRAY_SIZE));
1✔
810
      }
811
      bitMaps.set(columnIndex, columnBitMaps);
1✔
812
    }
813

814
    // if the bitmap in arrayIndex is null, init the bitmap
815
    if (bitMaps.get(columnIndex).get(arrayIndex) == null) {
1✔
816
      bitMaps.get(columnIndex).set(arrayIndex, new BitMap(ARRAY_SIZE));
1✔
817
    }
818

819
    // mark the null value in the current bitmap
820
    bitMaps.get(columnIndex).get(arrayIndex).mark(elementIndex);
1✔
821
  }
1✔
822

823
  @Override
824
  public TSDataType getDataType() {
825
    return TSDataType.VECTOR;
×
826
  }
827

828
  /**
829
   * Get the single alignedTVList array mem cost by give types.
830
   *
831
   * @param types the types in the vector
832
   * @return AlignedTvListArrayMemSize
833
   */
834
  public static long alignedTvListArrayMemCost(TSDataType[] types) {
835
    long size = 0;
1✔
836
    // value array mem size
837
    for (TSDataType type : types) {
1✔
838
      if (type != null) {
1✔
839
        size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
1✔
840
      }
841
    }
842
    // size is 0 when all types are null
843
    if (size == 0) {
1✔
844
      return size;
×
845
    }
846
    // time array mem size
847
    size += PrimitiveArrayManager.ARRAY_SIZE * 8L;
1✔
848
    // index array mem size
849
    size += PrimitiveArrayManager.ARRAY_SIZE * 4L;
1✔
850
    // array headers mem size
851
    size += (long) NUM_BYTES_ARRAY_HEADER * (2 + types.length);
1✔
852
    // Object references size in ArrayList
853
    size += (long) NUM_BYTES_OBJECT_REF * (2 + types.length);
1✔
854
    return size;
1✔
855
  }
856

857
  /** Build TsBlock by column. */
858
  public TsBlock buildTsBlock(
859
      int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>> deletionList) {
860
    TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
1✔
861
    // Time column
862
    TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
1✔
863
    int validRowCount = 0;
1✔
864
    boolean[] timeDuplicateInfo = null;
1✔
865
    // time column
866
    for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) {
1✔
867
      if (sortedRowIndex == rowCount - 1
1✔
868
          || getTime(sortedRowIndex) != getTime(sortedRowIndex + 1)) {
1✔
869
        timeBuilder.writeLong(getTime(sortedRowIndex));
1✔
870
        validRowCount++;
1✔
871
      } else {
872
        if (Objects.isNull(timeDuplicateInfo)) {
1✔
873
          timeDuplicateInfo = new boolean[rowCount];
1✔
874
        }
875
        timeDuplicateInfo[sortedRowIndex] = true;
1✔
876
      }
877
    }
878

879
    // value columns
880
    for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
1✔
881
      int deleteCursor = 0;
1✔
882
      // Pair of Time and Index
883
      Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
1✔
884
      if (Objects.nonNull(timeDuplicateInfo)) {
1✔
885
        lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
1✔
886
      }
887
      ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
1✔
888
      for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) {
1✔
889
        // skip time duplicated rows
890
        if (Objects.nonNull(timeDuplicateInfo)) {
1✔
891
          if (!isNullValue(getValueIndex(sortedRowIndex), columnIndex)) {
1✔
892
            lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex);
1✔
893
            lastValidPointIndexForTimeDupCheck.right = getValueIndex(sortedRowIndex);
1✔
894
          }
895
          if (timeDuplicateInfo[sortedRowIndex]) {
1✔
896
            continue;
1✔
897
          }
898
        }
899
        // The part of code solves the following problem:
900
        // Time: 1,2,2,3
901
        // Value: 1,2,null,null
902
        // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1)
903
        // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value
904
        // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2!=air.left:2, write(T:2,V:2)
905
        // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2, write(T:3,V:null)
906
        int originRowIndex;
907
        if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
1✔
908
            && (getTime(sortedRowIndex) == lastValidPointIndexForTimeDupCheck.left)) {
1✔
909
          originRowIndex = lastValidPointIndexForTimeDupCheck.right;
1✔
910
        } else {
911
          originRowIndex = getValueIndex(sortedRowIndex);
1✔
912
        }
913
        if (isNullValue(originRowIndex, columnIndex)
1✔
914
            || isPointDeleted(
1✔
915
                getTime(sortedRowIndex),
1✔
916
                Objects.isNull(deletionList) ? null : deletionList.get(columnIndex),
1✔
917
                deleteCursor)) {
1✔
918
          valueBuilder.appendNull();
1✔
919
          continue;
1✔
920
        }
921
        switch (dataTypes.get(columnIndex)) {
1✔
922
          case BOOLEAN:
923
            valueBuilder.writeBoolean(getBooleanByValueIndex(originRowIndex, columnIndex));
1✔
924
            break;
1✔
925
          case INT32:
926
            valueBuilder.writeInt(getIntByValueIndex(originRowIndex, columnIndex));
1✔
927
            break;
1✔
928
          case INT64:
929
            valueBuilder.writeLong(getLongByValueIndex(originRowIndex, columnIndex));
1✔
930
            break;
1✔
931
          case FLOAT:
932
            valueBuilder.writeFloat(
1✔
933
                roundValueWithGivenPrecision(
1✔
934
                    getFloatByValueIndex(originRowIndex, columnIndex),
1✔
935
                    floatPrecision,
936
                    encodingList.get(columnIndex)));
1✔
937
            break;
1✔
938
          case DOUBLE:
939
            valueBuilder.writeDouble(
×
940
                roundValueWithGivenPrecision(
×
941
                    getDoubleByValueIndex(originRowIndex, columnIndex),
×
942
                    floatPrecision,
943
                    encodingList.get(columnIndex)));
×
944
            break;
×
945
          case TEXT:
946
            valueBuilder.writeBinary(getBinaryByValueIndex(originRowIndex, columnIndex));
1✔
947
            break;
1✔
948
          default:
949
            break;
950
        }
951
      }
952
    }
953
    builder.declarePositions(validRowCount);
1✔
954
    return builder.build();
1✔
955
  }
956

957
  protected void writeValidValuesIntoTsBlock(
958
      TsBlockBuilder builder,
959
      int floatPrecision,
960
      TSEncoding encoding,
961
      List<TimeRange> deletionList) {
962
    throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
963
  }
964

965
  @Override
966
  public int serializedSize() {
967
    int size = (1 + dataTypes.size()) * Byte.BYTES + 2 * Integer.BYTES;
1✔
968
    // time
969
    size += rowCount * Long.BYTES;
1✔
970
    // value
971
    for (int columnIndex = 0; columnIndex < values.size(); ++columnIndex) {
1✔
972
      switch (dataTypes.get(columnIndex)) {
1✔
973
        case TEXT:
974
          for (int rowIdx = 0; rowIdx < rowCount; ++rowIdx) {
×
975
            size += ReadWriteIOUtils.sizeToWrite(getBinaryByValueIndex(rowIdx, columnIndex));
×
976
          }
977
          break;
×
978
        case FLOAT:
979
          size += rowCount * Float.BYTES;
×
980
          break;
×
981
        case INT32:
982
          size += rowCount * Integer.BYTES;
×
983
          break;
×
984
        case INT64:
985
          size += rowCount * Long.BYTES;
1✔
986
          break;
1✔
987
        case DOUBLE:
988
          size += rowCount * Double.BYTES;
×
989
          break;
×
990
        case BOOLEAN:
991
          size += rowCount * Byte.BYTES;
1✔
992
          break;
1✔
993
        default:
994
          throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
995
      }
996
    }
997
    // bitmap
998
    size += rowCount * dataTypes.size() * Byte.BYTES;
1✔
999
    return size;
1✔
1000
  }
1001

1002
  @Override
1003
  public void serializeToWAL(IWALByteBufferView buffer) {
1004
    WALWriteUtils.write(TSDataType.VECTOR, buffer);
1✔
1005
    buffer.putInt(dataTypes.size());
1✔
1006
    for (TSDataType dataType : dataTypes) {
1✔
1007
      buffer.put(dataType.serialize());
1✔
1008
    }
1✔
1009
    buffer.putInt(rowCount);
1✔
1010
    // time
1011
    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
1✔
1012
      buffer.putLong(getTime(rowIndex));
1✔
1013
    }
1014
    // serialize value and bitmap by column
1015
    for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) {
1✔
1016
      List<Object> columnValues = values.get(columnIndex);
1✔
1017
      for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
1✔
1018
        int arrayIndex = rowIndex / ARRAY_SIZE;
1✔
1019
        int elementIndex = rowIndex % ARRAY_SIZE;
1✔
1020
        // value
1021
        switch (dataTypes.get(columnIndex)) {
1✔
1022
          case TEXT:
1023
            Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex];
×
1024
            // In some scenario, the Binary in AlignedTVList will be null if this field is empty in
1025
            // current row. We need to handle this scenario to get rid of NPE. See the similar issue
1026
            // here: https://github.com/apache/iotdb/pull/9884
1027
            // Furthermore, we use an empty Binary as a placeholder here. It won't lead to data
1028
            // error because whether this field is null or not is decided by the bitMap rather than
1029
            // the object's value here.
1030
            if (valueT != null) {
×
1031
              WALWriteUtils.write(valueT, buffer);
×
1032
            } else {
1033
              WALWriteUtils.write(new Binary(new byte[0]), buffer);
×
1034
            }
1035
            break;
×
1036
          case FLOAT:
1037
            float valueF = ((float[]) columnValues.get(arrayIndex))[elementIndex];
×
1038
            buffer.putFloat(valueF);
×
1039
            break;
×
1040
          case INT32:
1041
            int valueI = ((int[]) columnValues.get(arrayIndex))[elementIndex];
×
1042
            buffer.putInt(valueI);
×
1043
            break;
×
1044
          case INT64:
1045
            long valueL = ((long[]) columnValues.get(arrayIndex))[elementIndex];
1✔
1046
            buffer.putLong(valueL);
1✔
1047
            break;
1✔
1048
          case DOUBLE:
1049
            double valueD = ((double[]) columnValues.get(arrayIndex))[elementIndex];
×
1050
            buffer.putDouble(valueD);
×
1051
            break;
×
1052
          case BOOLEAN:
1053
            boolean valueB = ((boolean[]) columnValues.get(arrayIndex))[elementIndex];
1✔
1054
            WALWriteUtils.write(valueB, buffer);
1✔
1055
            break;
1✔
1056
          default:
1057
            throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
1058
        }
1059
        // bitmap
1060
        WALWriteUtils.write(isNullValue(rowIndex, columnIndex), buffer);
1✔
1061
      }
1062
    }
1063
  }
1✔
1064

1065
  public static AlignedTVList deserialize(DataInputStream stream) throws IOException {
1066
    int dataTypeNum = stream.readInt();
×
1067
    List<TSDataType> dataTypes = new ArrayList<>(dataTypeNum);
×
1068
    for (int columnIndex = 0; columnIndex < dataTypeNum; ++columnIndex) {
×
1069
      dataTypes.add(ReadWriteIOUtils.readDataType(stream));
×
1070
    }
1071

1072
    int rowCount = stream.readInt();
×
1073
    // time
1074
    long[] times = new long[rowCount];
×
1075
    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1076
      times[rowIndex] = stream.readLong();
×
1077
    }
1078
    // read value and bitmap by column
1079
    Object[] values = new Object[dataTypeNum];
×
1080
    BitMap[] bitMaps = new BitMap[dataTypeNum];
×
1081
    for (int columnIndex = 0; columnIndex < dataTypeNum; ++columnIndex) {
×
1082
      BitMap bitMap = new BitMap(rowCount);
×
1083
      Object valuesOfOneColumn;
1084
      switch (dataTypes.get(columnIndex)) {
×
1085
        case TEXT:
1086
          Binary[] binaryValues = new Binary[rowCount];
×
1087
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1088
            binaryValues[rowIndex] = ReadWriteIOUtils.readBinary(stream);
×
1089
            if (ReadWriteIOUtils.readBool(stream)) {
×
1090
              bitMap.mark(rowIndex);
×
1091
            }
1092
          }
1093
          valuesOfOneColumn = binaryValues;
×
1094
          break;
×
1095
        case FLOAT:
1096
          float[] floatValues = new float[rowCount];
×
1097
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1098
            floatValues[rowIndex] = stream.readFloat();
×
1099
            if (ReadWriteIOUtils.readBool(stream)) {
×
1100
              bitMap.mark(rowIndex);
×
1101
            }
1102
          }
1103
          valuesOfOneColumn = floatValues;
×
1104
          break;
×
1105
        case INT32:
1106
          int[] intValues = new int[rowCount];
×
1107
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1108
            intValues[rowIndex] = stream.readInt();
×
1109
            if (ReadWriteIOUtils.readBool(stream)) {
×
1110
              bitMap.mark(rowIndex);
×
1111
            }
1112
          }
1113
          valuesOfOneColumn = intValues;
×
1114
          break;
×
1115
        case INT64:
1116
          long[] longValues = new long[rowCount];
×
1117
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1118
            longValues[rowIndex] = stream.readLong();
×
1119
            if (ReadWriteIOUtils.readBool(stream)) {
×
1120
              bitMap.mark(rowIndex);
×
1121
            }
1122
          }
1123
          valuesOfOneColumn = longValues;
×
1124
          break;
×
1125
        case DOUBLE:
1126
          double[] doubleValues = new double[rowCount];
×
1127
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1128
            doubleValues[rowIndex] = stream.readDouble();
×
1129
            if (ReadWriteIOUtils.readBool(stream)) {
×
1130
              bitMap.mark(rowIndex);
×
1131
            }
1132
          }
1133
          valuesOfOneColumn = doubleValues;
×
1134
          break;
×
1135
        case BOOLEAN:
1136
          boolean[] booleanValues = new boolean[rowCount];
×
1137
          for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
×
1138
            booleanValues[rowIndex] = ReadWriteIOUtils.readBool(stream);
×
1139
            if (ReadWriteIOUtils.readBool(stream)) {
×
1140
              bitMap.mark(rowIndex);
×
1141
            }
1142
          }
1143
          valuesOfOneColumn = booleanValues;
×
1144
          break;
×
1145
        default:
1146
          throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
×
1147
      }
1148
      values[columnIndex] = valuesOfOneColumn;
×
1149
      bitMaps[columnIndex] = bitMap;
×
1150
    }
1151

1152
    AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
×
1153
    tvList.putAlignedValues(times, values, bitMaps, 0, rowCount);
×
1154
    return tvList;
×
1155
  }
1156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc