• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9629

pending completion
#9629

push

travis_ci

web-flow
Add executeGroupByQueryIntervalQuery rpc interface (#10571)

260 of 260 new or added lines in 6 files covered. (100.0%)

79015 of 165199 relevant lines covered (47.83%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.61
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.utils;
21

22
import org.apache.iotdb.commons.exception.IoTDBException;
23
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
24
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
25
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
26
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
27
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
28
import org.apache.iotdb.tsfile.read.common.block.column.Column;
29
import org.apache.iotdb.tsfile.utils.Binary;
30
import org.apache.iotdb.tsfile.utils.BitMap;
31
import org.apache.iotdb.tsfile.utils.BytesUtils;
32
import org.apache.iotdb.tsfile.utils.Pair;
33

34
import java.io.ByteArrayOutputStream;
35
import java.io.DataInputStream;
36
import java.io.DataOutputStream;
37
import java.io.IOException;
38
import java.nio.ByteBuffer;
39
import java.util.ArrayList;
40
import java.util.LinkedList;
41
import java.util.List;
42
import java.util.Optional;
43

44
/** Utilities to convert query results between the thrift format and the TsFile format. */
45
public class QueryDataSetUtils {
46

47
  // Bit OR-ed into a column's bitmap accumulator for every row whose value is not null.
  private static final int FLAG = 0x01;
48

49
  // Private constructor: this class only exposes static helpers and is not meant to be instantiated.
  private QueryDataSetUtils() {}
50

51
  public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize(
52
      IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException {
53
    boolean finished = false;
1✔
54
    int columnNum = queryExecution.getOutputValueColumnCount();
1✔
55
    // one time column and each value column has an actual value buffer and a bitmap value to
56
    // indicate whether it is a null
57
    int columnNumWithTime = columnNum * 2 + 1;
1✔
58
    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
1✔
59
    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
1✔
60
    for (int i = 0; i < columnNumWithTime; i++) {
1✔
61
      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
1✔
62
      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
1✔
63
    }
64

65
    int rowCount = 0;
1✔
66
    int[] valueOccupation = new int[columnNum];
1✔
67

68
    // used to record a bitmap for every 8 points
69
    int[] bitmaps = new int[columnNum];
1✔
70
    while (rowCount < fetchSize) {
1✔
71
      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
1✔
72
      if (!optionalTsBlock.isPresent()) {
1✔
73
        finished = true;
1✔
74
        break;
1✔
75
      }
76
      TsBlock tsBlock = optionalTsBlock.get();
1✔
77
      if (!tsBlock.isEmpty()) {
1✔
78
        int currentCount = tsBlock.getPositionCount();
1✔
79
        serializeTsBlock(
1✔
80
            rowCount,
81
            currentCount,
82
            tsBlock,
83
            columnNum,
84
            dataOutputStreams,
85
            valueOccupation,
86
            bitmaps);
87
        rowCount += currentCount;
1✔
88
      }
89
    }
1✔
90

91
    fillRemainingBitMap(rowCount, columnNum, dataOutputStreams, bitmaps);
1✔
92

93
    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
1✔
94

95
    fillTimeColumn(rowCount, byteArrayOutputStreams, tsQueryDataSet);
1✔
96

97
    fillValueColumnsAndBitMaps(rowCount, byteArrayOutputStreams, valueOccupation, tsQueryDataSet);
1✔
98

99
    return new Pair<>(tsQueryDataSet, finished);
1✔
100
  }
101

102
  public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks)
103
      throws IOException {
104
    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
×
105

106
    // one time column and each value column has an actual value buffer and a bitmap value to
107
    // indicate whether it is a null
108
    int columnNum = 1;
×
109
    int columnNumWithTime = columnNum * 2 + 1;
×
110
    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
×
111
    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
×
112
    for (int i = 0; i < columnNumWithTime; i++) {
×
113
      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
×
114
      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
×
115
    }
116

117
    int rowCount = 0;
×
118
    int[] valueOccupation = new int[columnNum];
×
119

120
    // used to record a bitmap for every 8 points
121
    int[] bitmaps = new int[columnNum];
×
122
    for (TsBlock tsBlock : tsBlocks) {
×
123
      if (tsBlock.isEmpty()) {
×
124
        continue;
×
125
      }
126

127
      int currentCount = tsBlock.getPositionCount();
×
128
      // serialize time column
129
      for (int i = 0; i < currentCount; i++) {
×
130
        // use columnOutput to write byte array
131
        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
×
132
      }
133

134
      // serialize each value column and its bitmap
135
      for (int k = 0; k < columnNum; k++) {
×
136
        // get DataOutputStream for current value column and its bitmap
137
        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
×
138
        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
×
139

140
        Column column = tsBlock.getColumn(k);
×
141
        TSDataType type = column.getDataType();
×
142
        switch (type) {
×
143
          case INT32:
144
            for (int i = 0; i < currentCount; i++) {
×
145
              rowCount++;
×
146
              if (column.isNull(i)) {
×
147
                bitmaps[k] = bitmaps[k] << 1;
×
148
              } else {
149
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
150
                dataOutputStream.writeInt(column.getInt(i));
×
151
                valueOccupation[k] += 4;
×
152
              }
153
              if (rowCount != 0 && rowCount % 8 == 0) {
×
154
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
155
                // we should clear the bitmap every 8 points
156
                bitmaps[k] = 0;
×
157
              }
158
            }
159
            break;
×
160
          case INT64:
161
            for (int i = 0; i < currentCount; i++) {
×
162
              rowCount++;
×
163
              if (column.isNull(i)) {
×
164
                bitmaps[k] = bitmaps[k] << 1;
×
165
              } else {
166
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
167
                dataOutputStream.writeLong(column.getLong(i));
×
168
                valueOccupation[k] += 8;
×
169
              }
170
              if (rowCount != 0 && rowCount % 8 == 0) {
×
171
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
172
                // we should clear the bitmap every 8 points
173
                bitmaps[k] = 0;
×
174
              }
175
            }
176
            break;
×
177
          case FLOAT:
178
            for (int i = 0; i < currentCount; i++) {
×
179
              rowCount++;
×
180
              if (column.isNull(i)) {
×
181
                bitmaps[k] = bitmaps[k] << 1;
×
182
              } else {
183
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
184
                dataOutputStream.writeFloat(column.getFloat(i));
×
185
                valueOccupation[k] += 4;
×
186
              }
187
              if (rowCount != 0 && rowCount % 8 == 0) {
×
188
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
189
                // we should clear the bitmap every 8 points
190
                bitmaps[k] = 0;
×
191
              }
192
            }
193
            break;
×
194
          case DOUBLE:
195
            for (int i = 0; i < currentCount; i++) {
×
196
              rowCount++;
×
197
              if (column.isNull(i)) {
×
198
                bitmaps[k] = bitmaps[k] << 1;
×
199
              } else {
200
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
201
                dataOutputStream.writeDouble(column.getDouble(i));
×
202
                valueOccupation[k] += 8;
×
203
              }
204
              if (rowCount != 0 && rowCount % 8 == 0) {
×
205
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
206
                // we should clear the bitmap every 8 points
207
                bitmaps[k] = 0;
×
208
              }
209
            }
210
            break;
×
211
          case BOOLEAN:
212
            for (int i = 0; i < currentCount; i++) {
×
213
              rowCount++;
×
214
              if (column.isNull(i)) {
×
215
                bitmaps[k] = bitmaps[k] << 1;
×
216
              } else {
217
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
218
                dataOutputStream.writeBoolean(column.getBoolean(i));
×
219
                valueOccupation[k] += 1;
×
220
              }
221
              if (rowCount != 0 && rowCount % 8 == 0) {
×
222
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
223
                // we should clear the bitmap every 8 points
224
                bitmaps[k] = 0;
×
225
              }
226
            }
227
            break;
×
228
          case TEXT:
229
            for (int i = 0; i < currentCount; i++) {
×
230
              rowCount++;
×
231
              if (column.isNull(i)) {
×
232
                bitmaps[k] = bitmaps[k] << 1;
×
233
              } else {
234
                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
×
235
                Binary binary = column.getBinary(i);
×
236
                dataOutputStream.writeInt(binary.getLength());
×
237
                dataOutputStream.write(binary.getValues());
×
238
                valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
×
239
              }
240
              if (rowCount != 0 && rowCount % 8 == 0) {
×
241
                dataBitmapOutputStream.writeByte(bitmaps[k]);
×
242
                // we should clear the bitmap every 8 points
243
                bitmaps[k] = 0;
×
244
              }
245
            }
246
            break;
×
247
          default:
248
            throw new UnSupportedDataTypeException(
×
249
                String.format("Data type %s is not supported.", type));
×
250
        }
251
        if (k != columnNum - 1) {
×
252
          rowCount -= currentCount;
×
253
        }
254
      }
255
    }
×
256
    // feed the remaining bitmap
257
    int remaining = rowCount % 8;
×
258
    for (int k = 0; k < columnNum; k++) {
×
259
      if (remaining != 0) {
×
260
        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
×
261
        dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
×
262
      }
263
    }
264

265
    // calculate the time buffer size
266
    int timeOccupation = rowCount * 8;
×
267
    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
×
268
    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
×
269
    timeBuffer.flip();
×
270
    tsQueryDataSet.setTime(timeBuffer);
×
271

272
    // calculate the bitmap buffer size
273
    int bitmapOccupation = (rowCount + 7) / 8;
×
274

275
    List<ByteBuffer> bitmapList = new LinkedList<>();
×
276
    List<ByteBuffer> valueList = new LinkedList<>();
×
277
    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
×
278
      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
×
279
      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
×
280
      valueBuffer.flip();
×
281
      valueList.add(valueBuffer);
×
282

283
      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
×
284
      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
×
285
      bitmapBuffer.flip();
×
286
      bitmapList.add(bitmapBuffer);
×
287
    }
288
    tsQueryDataSet.setBitmapList(bitmapList);
×
289
    tsQueryDataSet.setValueList(valueList);
×
290
    return tsQueryDataSet;
×
291
  }
292

293
  private static void serializeTsBlock(
294
      int rowCount,
295
      int currentCount,
296
      TsBlock tsBlock,
297
      int columnNum,
298
      DataOutputStream[] dataOutputStreams,
299
      int[] valueOccupation,
300
      int[] bitmaps)
301
      throws IOException {
302
    // serialize time column
303
    for (int i = 0; i < currentCount; i++) {
1✔
304
      // use columnOutput to write byte array
305
      dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
1✔
306
    }
307

308
    // serialize each value column and its bitmap
309
    for (int k = 0; k < columnNum; k++) {
1✔
310
      // get DataOutputStream for current value column and its bitmap
311
      DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
1✔
312
      DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
1✔
313

314
      Column column = tsBlock.getColumn(k);
1✔
315
      TSDataType type = column.getDataType();
1✔
316
      switch (type) {
1✔
317
        case INT32:
318
          doWithInt32Column(
1✔
319
              rowCount,
320
              column,
321
              bitmaps,
322
              k,
323
              dataOutputStream,
324
              valueOccupation,
325
              dataBitmapOutputStream);
326
          break;
1✔
327
        case INT64:
328
          doWithInt64Column(
1✔
329
              rowCount,
330
              column,
331
              bitmaps,
332
              k,
333
              dataOutputStream,
334
              valueOccupation,
335
              dataBitmapOutputStream);
336
          break;
1✔
337
        case FLOAT:
338
          doWithFloatColumn(
1✔
339
              rowCount,
340
              column,
341
              bitmaps,
342
              k,
343
              dataOutputStream,
344
              valueOccupation,
345
              dataBitmapOutputStream);
346
          break;
1✔
347
        case DOUBLE:
348
          doWithDoubleColumn(
1✔
349
              rowCount,
350
              column,
351
              bitmaps,
352
              k,
353
              dataOutputStream,
354
              valueOccupation,
355
              dataBitmapOutputStream);
356
          break;
1✔
357
        case BOOLEAN:
358
          doWithBooleanColumn(
1✔
359
              rowCount,
360
              column,
361
              bitmaps,
362
              k,
363
              dataOutputStream,
364
              valueOccupation,
365
              dataBitmapOutputStream);
366
          break;
1✔
367
        case TEXT:
368
          doWithTextColumn(
1✔
369
              rowCount,
370
              column,
371
              bitmaps,
372
              k,
373
              dataOutputStream,
374
              valueOccupation,
375
              dataBitmapOutputStream);
376
          break;
1✔
377
        default:
378
          throw new UnSupportedDataTypeException(
×
379
              String.format("Data type %s is not supported.", type));
×
380
      }
381
    }
382
  }
1✔
383

384
  private static void doWithInt32Column(
385
      int rowCount,
386
      Column column,
387
      int[] bitmaps,
388
      int columnIndex,
389
      DataOutputStream dataOutputStream,
390
      int[] valueOccupation,
391
      DataOutputStream dataBitmapOutputStream)
392
      throws IOException {
393
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
394
      rowCount++;
1✔
395
      if (column.isNull(i)) {
1✔
396
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
397
      } else {
398
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
399
        dataOutputStream.writeInt(column.getInt(i));
1✔
400
        valueOccupation[columnIndex] += 4;
1✔
401
      }
402
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
403
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
404
        // we should clear the bitmap every 8 points
405
        bitmaps[columnIndex] = 0;
×
406
      }
407
    }
408
  }
1✔
409

410
  private static void doWithInt64Column(
411
      int rowCount,
412
      Column column,
413
      int[] bitmaps,
414
      int columnIndex,
415
      DataOutputStream dataOutputStream,
416
      int[] valueOccupation,
417
      DataOutputStream dataBitmapOutputStream)
418
      throws IOException {
419
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
420
      rowCount++;
1✔
421
      if (column.isNull(i)) {
1✔
422
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
423
      } else {
424
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
425
        dataOutputStream.writeLong(column.getLong(i));
1✔
426
        valueOccupation[columnIndex] += 8;
1✔
427
      }
428
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
429
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
430
        // we should clear the bitmap every 8 points
431
        bitmaps[columnIndex] = 0;
×
432
      }
433
    }
434
  }
1✔
435

436
  private static void doWithFloatColumn(
437
      int rowCount,
438
      Column column,
439
      int[] bitmaps,
440
      int columnIndex,
441
      DataOutputStream dataOutputStream,
442
      int[] valueOccupation,
443
      DataOutputStream dataBitmapOutputStream)
444
      throws IOException {
445
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
446
      rowCount++;
1✔
447
      if (column.isNull(i)) {
1✔
448
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
449
      } else {
450
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
451
        dataOutputStream.writeFloat(column.getFloat(i));
1✔
452
        valueOccupation[columnIndex] += 4;
1✔
453
      }
454
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
455
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
456
        // we should clear the bitmap every 8 points
457
        bitmaps[columnIndex] = 0;
×
458
      }
459
    }
460
  }
1✔
461

462
  private static void doWithDoubleColumn(
463
      int rowCount,
464
      Column column,
465
      int[] bitmaps,
466
      int columnIndex,
467
      DataOutputStream dataOutputStream,
468
      int[] valueOccupation,
469
      DataOutputStream dataBitmapOutputStream)
470
      throws IOException {
471
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
472
      rowCount++;
1✔
473
      if (column.isNull(i)) {
1✔
474
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
475
      } else {
476
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
477
        dataOutputStream.writeDouble(column.getDouble(i));
1✔
478
        valueOccupation[columnIndex] += 8;
1✔
479
      }
480
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
481
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
482
        // we should clear the bitmap every 8 points
483
        bitmaps[columnIndex] = 0;
×
484
      }
485
    }
486
  }
1✔
487

488
  private static void doWithBooleanColumn(
489
      int rowCount,
490
      Column column,
491
      int[] bitmaps,
492
      int columnIndex,
493
      DataOutputStream dataOutputStream,
494
      int[] valueOccupation,
495
      DataOutputStream dataBitmapOutputStream)
496
      throws IOException {
497
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
498
      rowCount++;
1✔
499
      if (column.isNull(i)) {
1✔
500
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
501
      } else {
502
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
503
        dataOutputStream.writeBoolean(column.getBoolean(i));
1✔
504
        valueOccupation[columnIndex] += 1;
1✔
505
      }
506
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
507
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
508
        // we should clear the bitmap every 8 points
509
        bitmaps[columnIndex] = 0;
×
510
      }
511
    }
512
  }
1✔
513

514
  private static void doWithTextColumn(
515
      int rowCount,
516
      Column column,
517
      int[] bitmaps,
518
      int columnIndex,
519
      DataOutputStream dataOutputStream,
520
      int[] valueOccupation,
521
      DataOutputStream dataBitmapOutputStream)
522
      throws IOException {
523
    for (int i = 0, size = column.getPositionCount(); i < size; i++) {
1✔
524
      rowCount++;
1✔
525
      if (column.isNull(i)) {
1✔
526
        bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
1✔
527
      } else {
528
        bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
1✔
529
        Binary binary = column.getBinary(i);
1✔
530
        dataOutputStream.writeInt(binary.getLength());
1✔
531
        dataOutputStream.write(binary.getValues());
1✔
532
        valueOccupation[columnIndex] = valueOccupation[columnIndex] + 4 + binary.getLength();
1✔
533
      }
534
      if (rowCount != 0 && rowCount % 8 == 0) {
1✔
535
        dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
×
536
        // we should clear the bitmap every 8 points
537
        bitmaps[columnIndex] = 0;
×
538
      }
539
    }
540
  }
1✔
541

542
  private static void fillRemainingBitMap(
543
      int rowCount, int columnNum, DataOutputStream[] dataOutputStreams, int[] bitmaps)
544
      throws IOException {
545
    // feed the remaining bitmap
546
    int remaining = rowCount % 8;
1✔
547
    for (int k = 0; k < columnNum; k++) {
1✔
548
      if (remaining != 0) {
1✔
549
        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
1✔
550
        dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
1✔
551
      }
552
    }
553
  }
1✔
554

555
  private static void fillTimeColumn(
556
      int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
557
    // calculate the time buffer size
558
    int timeOccupation = rowCount * 8;
1✔
559
    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
1✔
560
    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
1✔
561
    timeBuffer.flip();
1✔
562
    tsQueryDataSet.setTime(timeBuffer);
1✔
563
  }
1✔
564

565
  private static void fillValueColumnsAndBitMaps(
566
      int rowCount,
567
      ByteArrayOutputStream[] byteArrayOutputStreams,
568
      int[] valueOccupation,
569
      TSQueryDataSet tsQueryDataSet) {
570
    // calculate the bitmap buffer size
571
    int bitmapOccupation = (rowCount + 7) / 8;
1✔
572

573
    List<ByteBuffer> bitmapList = new LinkedList<>();
1✔
574
    List<ByteBuffer> valueList = new LinkedList<>();
1✔
575
    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
1✔
576
      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
1✔
577
      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
1✔
578
      valueBuffer.flip();
1✔
579
      valueList.add(valueBuffer);
1✔
580

581
      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
1✔
582
      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
1✔
583
      bitmapBuffer.flip();
1✔
584
      bitmapList.add(bitmapBuffer);
1✔
585
    }
586
    tsQueryDataSet.setBitmapList(bitmapList);
1✔
587
    tsQueryDataSet.setValueList(valueList);
1✔
588
  }
1✔
589

590
  /**
591
   * To fetch required amounts of data and combine them through List
592
   *
593
   * @param queryExecution used to get TsBlock from and judge whether there is more data.
594
   * @param fetchSize wanted row size
595
   * @return pair.left is serialized TsBlock pair.right indicates if the read finished
596
   * @throws IoTDBException IoTDBException may be thrown if error happened while getting TsBlock
597
   *     from IQueryExecution
598
   */
599
  public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
600
      IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
601
    int rowCount = 0;
1✔
602
    List<ByteBuffer> res = new ArrayList<>();
1✔
603
    while (rowCount < fetchSize) {
1✔
604
      Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult();
1✔
605
      if (!optionalByteBuffer.isPresent()) {
1✔
606
        break;
1✔
607
      }
608
      ByteBuffer byteBuffer = optionalByteBuffer.get();
1✔
609
      byteBuffer.mark();
1✔
610
      int valueColumnCount = byteBuffer.getInt();
1✔
611
      for (int i = 0; i < valueColumnCount; i++) {
1✔
612
        byteBuffer.get();
1✔
613
      }
614
      int positionCount = byteBuffer.getInt();
1✔
615
      byteBuffer.reset();
1✔
616
      if (positionCount != 0) {
1✔
617
        res.add(byteBuffer);
1✔
618
      }
619
      rowCount += positionCount;
1✔
620
    }
1✔
621
    return new Pair<>(res, !queryExecution.hasNextResult());
1✔
622
  }
623

624
  public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
625
    long[] times = new long[size];
1✔
626
    for (int i = 0; i < size; i++) {
1✔
627
      times[i] = buffer.getLong();
1✔
628
    }
629
    return times;
1✔
630
  }
631

632
  public static long[] readTimesFromStream(DataInputStream stream, int size) throws IOException {
633
    long[] times = new long[size];
1✔
634
    for (int i = 0; i < size; i++) {
1✔
635
      times[i] = stream.readLong();
1✔
636
    }
637
    return times;
1✔
638
  }
639

640
  public static Optional<BitMap[]> readBitMapsFromBuffer(ByteBuffer buffer, int columns, int size) {
641
    if (!buffer.hasRemaining()) {
1✔
642
      return Optional.empty();
1✔
643
    }
644
    BitMap[] bitMaps = new BitMap[columns];
1✔
645
    for (int i = 0; i < columns; i++) {
1✔
646
      boolean hasBitMap = BytesUtils.byteToBool(buffer.get());
1✔
647
      if (hasBitMap) {
1✔
648
        byte[] bytes = new byte[size / Byte.SIZE + 1];
1✔
649
        for (int j = 0; j < bytes.length; j++) {
1✔
650
          bytes[j] = buffer.get();
1✔
651
        }
652
        bitMaps[i] = new BitMap(size, bytes);
1✔
653
      }
654
    }
655
    return Optional.of(bitMaps);
1✔
656
  }
657

658
  public static Optional<BitMap[]> readBitMapsFromStream(
659
      DataInputStream stream, int columns, int size) throws IOException {
660
    if (stream.available() <= 0) {
1✔
661
      return Optional.empty();
×
662
    }
663
    BitMap[] bitMaps = new BitMap[columns];
1✔
664
    for (int i = 0; i < columns; i++) {
1✔
665
      boolean hasBitMap = BytesUtils.byteToBool(stream.readByte());
1✔
666
      if (hasBitMap) {
1✔
667
        byte[] bytes = new byte[size / Byte.SIZE + 1];
1✔
668
        for (int j = 0; j < bytes.length; j++) {
1✔
669
          bytes[j] = stream.readByte();
1✔
670
        }
671
        bitMaps[i] = new BitMap(size, bytes);
1✔
672
      }
673
    }
674
    return Optional.of(bitMaps);
1✔
675
  }
676

677
  public static Object[] readTabletValuesFromBuffer(
678
      ByteBuffer buffer, List<Integer> types, int columns, int size) {
679
    TSDataType[] dataTypes = new TSDataType[types.size()];
1✔
680
    for (int i = 0; i < dataTypes.length; i++) {
1✔
681
      dataTypes[i] = TSDataType.values()[types.get(i)];
1✔
682
    }
683
    return readTabletValuesFromBuffer(buffer, dataTypes, columns, size);
1✔
684
  }
685

686
  /**
687
   * Deserialize Tablet Values From Buffer
688
   *
689
   * @param buffer data values
690
   * @param columns column number
691
   * @param size value count in each column
692
   * @throws UnSupportedDataTypeException if TSDataType is unknown, UnSupportedDataTypeException
693
   *     will be thrown.
694
   */
695
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
696
  public static Object[] readTabletValuesFromBuffer(
697
      ByteBuffer buffer, TSDataType[] types, int columns, int size) {
698
    Object[] values = new Object[columns];
1✔
699
    for (int i = 0; i < columns; i++) {
1✔
700
      switch (types[i]) {
1✔
701
        case BOOLEAN:
702
          boolean[] boolValues = new boolean[size];
1✔
703
          for (int index = 0; index < size; index++) {
1✔
704
            boolValues[index] = BytesUtils.byteToBool(buffer.get());
1✔
705
          }
706
          values[i] = boolValues;
1✔
707
          break;
1✔
708
        case INT32:
709
          int[] intValues = new int[size];
1✔
710
          for (int index = 0; index < size; index++) {
1✔
711
            intValues[index] = buffer.getInt();
1✔
712
          }
713
          values[i] = intValues;
1✔
714
          break;
1✔
715
        case INT64:
716
          long[] longValues = new long[size];
1✔
717
          for (int index = 0; index < size; index++) {
1✔
718
            longValues[index] = buffer.getLong();
1✔
719
          }
720
          values[i] = longValues;
1✔
721
          break;
1✔
722
        case FLOAT:
723
          float[] floatValues = new float[size];
1✔
724
          for (int index = 0; index < size; index++) {
1✔
725
            floatValues[index] = buffer.getFloat();
1✔
726
          }
727
          values[i] = floatValues;
1✔
728
          break;
1✔
729
        case DOUBLE:
730
          double[] doubleValues = new double[size];
1✔
731
          for (int index = 0; index < size; index++) {
1✔
732
            doubleValues[index] = buffer.getDouble();
1✔
733
          }
734
          values[i] = doubleValues;
1✔
735
          break;
1✔
736
        case TEXT:
737
          Binary[] binaryValues = new Binary[size];
1✔
738
          for (int index = 0; index < size; index++) {
1✔
739
            int binarySize = buffer.getInt();
1✔
740
            byte[] binaryValue = new byte[binarySize];
1✔
741
            buffer.get(binaryValue);
1✔
742
            binaryValues[index] = new Binary(binaryValue);
1✔
743
          }
744
          values[i] = binaryValues;
1✔
745
          break;
1✔
746
        default:
747
          throw new UnSupportedDataTypeException(
×
748
              String.format("data type %s is not supported when convert data at client", types[i]));
×
749
      }
750
    }
751
    return values;
1✔
752
  }
753

754
  public static Object[] readTabletValuesFromStream(
755
      DataInputStream stream, TSDataType[] types, int columns, int size) throws IOException {
756
    Object[] values = new Object[columns];
1✔
757
    for (int i = 0; i < columns; i++) {
1✔
758
      switch (types[i]) {
1✔
759
        case BOOLEAN:
760
          parseBooleanColumn(size, stream, values, i);
1✔
761
          break;
1✔
762
        case INT32:
763
          parseInt32Column(size, stream, values, i);
1✔
764
          break;
1✔
765
        case INT64:
766
          parseInt64Column(size, stream, values, i);
1✔
767
          break;
1✔
768
        case FLOAT:
769
          parseFloatColumn(size, stream, values, i);
1✔
770
          break;
1✔
771
        case DOUBLE:
772
          parseDoubleColumn(size, stream, values, i);
1✔
773
          break;
1✔
774
        case TEXT:
775
          parseTextColumn(size, stream, values, i);
1✔
776
          break;
1✔
777
        default:
778
          throw new UnSupportedDataTypeException(
×
779
              String.format("data type %s is not supported when convert data at client", types[i]));
×
780
      }
781
    }
782
    return values;
1✔
783
  }
784

785
  private static void parseBooleanColumn(
786
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
787
    boolean[] boolValues = new boolean[size];
1✔
788
    for (int index = 0; index < size; index++) {
1✔
789
      boolValues[index] = BytesUtils.byteToBool(stream.readByte());
1✔
790
    }
791
    values[columnIndex] = boolValues;
1✔
792
  }
1✔
793

794
  private static void parseInt32Column(
795
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
796
    int[] intValues = new int[size];
1✔
797
    for (int index = 0; index < size; index++) {
1✔
798
      intValues[index] = stream.readInt();
1✔
799
    }
800
    values[columnIndex] = intValues;
1✔
801
  }
1✔
802

803
  private static void parseInt64Column(
804
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
805
    long[] longValues = new long[size];
1✔
806
    for (int index = 0; index < size; index++) {
1✔
807
      longValues[index] = stream.readLong();
1✔
808
    }
809
    values[columnIndex] = longValues;
1✔
810
  }
1✔
811

812
  private static void parseFloatColumn(
813
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
814
    float[] floatValues = new float[size];
1✔
815
    for (int index = 0; index < size; index++) {
1✔
816
      floatValues[index] = stream.readFloat();
1✔
817
    }
818
    values[columnIndex] = floatValues;
1✔
819
  }
1✔
820

821
  private static void parseDoubleColumn(
822
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
823
    double[] doubleValues = new double[size];
1✔
824
    for (int index = 0; index < size; index++) {
1✔
825
      doubleValues[index] = stream.readDouble();
1✔
826
    }
827
    values[columnIndex] = doubleValues;
1✔
828
  }
1✔
829

830
  private static void parseTextColumn(
831
      int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
832
    Binary[] binaryValues = new Binary[size];
1✔
833
    for (int index = 0; index < size; index++) {
1✔
834
      int binarySize = stream.readInt();
1✔
835
      byte[] binaryValue = new byte[binarySize];
1✔
836
      int actualReadSize = stream.read(binaryValue);
1✔
837
      if (actualReadSize != binarySize) {
1✔
838
        throw new IllegalStateException(
×
839
            "Expect to read " + binarySize + " bytes, actually read " + actualReadSize + "bytes.");
840
      }
841
      binaryValues[index] = new Binary(binaryValue);
1✔
842
    }
843
    values[columnIndex] = binaryValues;
1✔
844
  }
1✔
845
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc