• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.58
/source/common/src/tdatablock.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tdatablock.h"
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21
#include "tglobal.h"
22

23
#define MALLOC_ALIGN_BYTES 32
24

25
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
96,452,655✔
26
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
96,452,655!
27
    if (pColumnInfoData->reassigned) {
17,791,008!
28
      int32_t totalSize = 0;
×
29
      for (int32_t row = 0; row < numOfRows; ++row) {
×
30
        char*   pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
×
31
        int32_t colSize = 0;
×
32
        if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
×
33
          colSize = getJsonValueLen(pColData);
×
34
        } else {
35
          colSize = varDataTLen(pColData);
×
36
        }
37
        totalSize += colSize;
×
38
      }
39
      return totalSize;
×
40
    }
41
    return pColumnInfoData->varmeta.length;
17,791,008✔
42
  } else {
43
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
78,661,647✔
44
      return 0;
19,045✔
45
    } else {
46
      return pColumnInfoData->info.bytes * numOfRows;
78,642,602✔
47
    }
48
  }
49
}
50

51
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
187,080,453✔
52
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
374,160,906!
53
    return 0;
×
54
  }
55

56
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
187,080,453!
57
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
1,096,548!
58
    return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
×
59
  else
60
    return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
1,096,548!
61
}
62

63
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
1,266,843,377✔
64
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,266,843,377!
65
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
74,773,800✔
66
  } else {
67
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
1,192,069,577!
68
           BitmapLen(numOfRows);
1,192,069,577✔
69
  }
70
}
71

72
int32_t getJsonValueLen(const char* data) {
27,936✔
73
  int32_t dataLen = 0;
27,936✔
74
  if (*data == TSDB_DATA_TYPE_NULL) {
27,936✔
75
    dataLen = CHAR_BYTES;
1,587✔
76
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
26,349✔
77
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
4,098✔
78
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
22,251✔
79
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
3,552✔
80
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
18,699✔
81
    dataLen = CHAR_BYTES + CHAR_BYTES;
1,102✔
82
  } else if (tTagIsJson(data)) {  // json string
17,597!
83
    dataLen = ((STag*)(data))->len;
17,601✔
84
  } else {
85
    uError("Invalid data type:%d in Json", *data);
×
86
  }
87
  return dataLen;
27,939✔
88
}
89

90
static int32_t getDataLen(int32_t type, const char* pData) {
2,147,483,647✔
91
  int32_t dataLen = 0;
2,147,483,647✔
92
  if (type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
93
    dataLen = getJsonValueLen(pData);
17,637✔
94
  } else {
95
    dataLen = varDataTLen(pData);
2,147,483,647✔
96
  }
97
  return dataLen;
2,147,483,647✔
98
}
99

100
// Stores one value into row `rowIndex` of a column.
//
//   isNull || pData == NULL : the row is marked NULL (offset -1 for var-length columns, null bit
//                             for fixed-width ones) and hasNull is set.
//   var-length column       : the value is appended at varmeta.length; if the row already holds a
//                             positive offset, varmeta.length is first rewound to that offset so
//                             the new value is written from there. The data buffer grows by a
//                             factor of 1.5 until the value fits.
//   fixed-width column      : info.bytes bytes are copied into the row slot and its null bit is
//                             cleared.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the grown buffer would exceed UINT32_MAX bytes,
// or terrno if the reallocation fails.
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  if (isNull || pData == NULL) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t type = pColumnInfoData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    int32_t dataLen = getDataLen(type, pData);
    if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
      pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
    }

    SVarColAttr* pAttr = &pColumnInfoData->varmeta;

    // Do the size arithmetic in 64 bits: a 32-bit sum could wrap, and a 32-bit newSize could
    // never compare greater than UINT32_MAX, which made the overflow check below unreachable.
    uint64_t required = (uint64_t)pAttr->length + (uint32_t)dataLen;
    if (pAttr->allocLen < required) {
      uint64_t newSize = pAttr->allocLen;
      if (newSize <= 1) {
        newSize = 8;
      }

      while (newSize < required) {
        newSize += newSize / 2;  // grow by 1.5x, rounded down
        if (newSize > UINT32_MAX) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
      if (buf == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = buf;
      pAttr->allocLen = (uint32_t)newSize;
    }

    uint32_t len = pColumnInfoData->varmeta.length;
    pColumnInfoData->varmeta.offset[rowIndex] = len;

    // memmove rather than memcpy: the source is allowed to overlap the destination here
    (void)memmove(pColumnInfoData->pData + len, pData, dataLen);
    pColumnInfoData->varmeta.length += dataLen;
  } else {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
  }

  return 0;
}
155

156
// Sets the value of row `rowIndex`. For var-length columns the row's offset is reset to -1
// first, so colDataSetValHelp() always appends the value instead of rewinding to an earlier
// offset. Returns whatever colDataSetValHelp() returns.
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  const bool isVarCol = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);
  if (isVarCol) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;
  }

  return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
163

164
// Sets the value of row `rowIndex` without resetting the row's existing offset, so a var-length
// row that already has a positive offset is rewritten from that offset by colDataSetValHelp().
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
167

168
// Stores a raw (header-less) byte string of `varDataLen` bytes into row `rowIndex` of a
// var-length column; the VARSTR header is written by this function.
//
// Returns TSDB_CODE_INVALID_PARA if the column is not var-length or varDataLen is negative,
// TSDB_CODE_OUT_OF_MEMORY if the grown buffer would exceed UINT32_MAX bytes, terrno if the
// reallocation fails, and TSDB_CODE_SUCCESS otherwise. When isNull is set or pVarData is NULL
// the row is marked NULL (offset -1) and hasNull is set.
int32_t varColSetVarData(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pVarData, int32_t varDataLen,
                         bool isNull) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (isNull || pVarData == NULL) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    pColumnInfoData->hasNull = true;
    return TSDB_CODE_SUCCESS;
  }

  // a negative length would be turned into a huge size_t by memmove below
  if (varDataLen < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t dataLen = VARSTR_HEADER_SIZE + varDataLen;
  if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
    // the row already has a value: rewind so the new value is written from its offset
    pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
  }

  SVarColAttr* pAttr = &pColumnInfoData->varmeta;

  // 64-bit arithmetic so that neither the sum nor the 1.5x growth can wrap, and so that the
  // UINT32_MAX check is actually reachable.
  uint64_t required = (uint64_t)pAttr->length + (uint32_t)dataLen;
  if (pAttr->allocLen < required) {
    uint64_t newSize = pAttr->allocLen;
    if (newSize <= 1) {
      newSize = 8;
    }

    while (newSize < required) {
      newSize += newSize / 2;  // grow by 1.5x, rounded down
      if (newSize > UINT32_MAX) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
    if (buf == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = buf;
    pAttr->allocLen = (uint32_t)newSize;
  }

  uint32_t len = pColumnInfoData->varmeta.length;
  pColumnInfoData->varmeta.offset[rowIndex] = len;

  (void)memmove(varDataVal(pColumnInfoData->pData + len), pVarData, varDataLen);
  varDataSetLen(pColumnInfoData->pData + len, varDataLen);
  pColumnInfoData->varmeta.length += dataLen;
  return TSDB_CODE_SUCCESS;
}
216

217
// Makes row dstRowIdx hold the same value as row srcRowIdx.
//   - var-length column : only the offset is copied (both rows then point at the same bytes)
//                         and the column is flagged `reassigned`; pData is not used.
//   - fixed-width column: info.bytes bytes are copied from pData into the destination slot and
//                         its null bit is cleared.
// Always returns 0.
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
                           const char* pData) {
  int32_t colType = pColumnInfoData->info.type;

  if (!IS_VAR_DATA_TYPE(colType)) {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
    return 0;
  }

  pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
  pColumnInfoData->reassigned = true;
  return 0;
}
230

231
// Ensures the data buffer of a var-length column can hold at least newSize bytes.
// Does nothing for fixed-width columns or when the buffer is already large enough.
// Returns TSDB_CODE_SUCCESS, or terrno when the reallocation fails.
static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
  const bool mustGrow =
      IS_VAR_DATA_TYPE(pColumnInfoData->info.type) && (pColumnInfoData->varmeta.allocLen < newSize);
  if (!mustGrow) {
    return TSDB_CODE_SUCCESS;
  }

  char* pNew = taosMemoryRealloc(pColumnInfoData->pData, newSize);
  if (pNew == NULL) {
    return terrno;
  }

  pColumnInfoData->varmeta.allocLen = newSize;
  pColumnInfoData->pData = pNew;
  return TSDB_CODE_SUCCESS;
}
252

253
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
38,803,892✔
254
                            int32_t itemLen, int32_t numOfRows, bool trimValue) {
255
  if (pColumnInfoData->info.bytes < itemLen) {
38,803,892!
256
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen,
×
257
          pColumnInfoData->info.bytes, trimValue);
258
    if (trimValue) {
×
259
      itemLen = pColumnInfoData->info.bytes;
×
260
    } else {
261
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
262
    }
263
  }
264

265
  size_t   start = 1;
38,803,892✔
266
  int32_t  t = 0;
38,803,892✔
267
  int32_t  count = log(numOfRows) / log(2);
38,803,892✔
268
  uint32_t startOffset =
38,803,892✔
269
      (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
38,803,892!
270

271
  // the first item
272
  memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
38,803,892✔
273

274
  while (t < count) {
126,043,214✔
275
    int32_t xlen = 1 << t;
87,239,322✔
276
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
87,239,322✔
277
           xlen * itemLen);
87,239,322✔
278
    t += 1;
87,239,322✔
279
    start += xlen;
87,239,322✔
280
  }
281

282
  // the tail part
283
  if (numOfRows > start) {
38,803,892✔
284
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
14,401,900✔
285
           (numOfRows - start) * itemLen);
14,401,900✔
286
  }
287

288
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
38,803,892!
289
    for (int32_t i = 0; i < numOfRows; ++i) {
1,265,607,643✔
290
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
1,254,774,760✔
291
    }
292

293
    pColumnInfoData->varmeta.length += numOfRows * itemLen;
10,832,883✔
294
  }
295

296
  return TSDB_CODE_SUCCESS;
38,803,892✔
297
}
298

299
// Writes the same value into numOfRows consecutive rows starting at currentRow.
// For var-length columns the item length is read from the value itself and the data buffer is
// grown first when numOfRows copies would not fit behind varmeta.length.
// Returns the colDataReserve() error if growing fails, otherwise the result of doCopyNItems().
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                         bool trimValue) {
  int32_t itemLen = pColumnInfoData->info.bytes;

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return doCopyNItems(pColumnInfoData, currentRow, pData, itemLen, numOfRows, trimValue);
  }

  if (pColumnInfoData->info.type != TSDB_DATA_TYPE_JSON) {
    itemLen = varDataTLen(pData);
  } else {
    itemLen = getJsonValueLen(pData);
  }

  if (pColumnInfoData->varmeta.allocLen < (numOfRows * itemLen + pColumnInfoData->varmeta.length)) {
    int32_t rc = colDataReserve(pColumnInfoData, (numOfRows * itemLen + pColumnInfoData->varmeta.length));
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return doCopyNItems(pColumnInfoData, currentRow, pData, itemLen, numOfRows, trimValue);
}
318

319
// Marks numOfRows consecutive rows starting at currentRow as NULL and sets hasNull.
//   - var-length column : every offset in the range is filled with -1 (all bytes 0xFF).
//   - fixed-width column: the null bits are set one by one; for 16 rows or more the bits up to
//                         the next byte boundary are set individually, whole bytes in between are
//                         filled with 0xFF, and the remaining bits are set individually.
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
  pColumnInfoData->hasNull = true;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
    return;
  }

  int32_t k = 0;

  if (numOfRows >= 16) {
    // leading bits, until currentRow + k sits on a byte boundary
    while (k < numOfRows && BitPos(currentRow + k)) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + k);
      ++k;
    }

    // whole bytes in the middle
    int32_t fullBytes = (numOfRows - k) / 8;
    memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + k), 0xFF, fullBytes);
    k += fullBytes * 8;
  }

  // trailing bits (or every bit when fewer than 16 rows were requested)
  while (k < numOfRows) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + k);
    ++k;
  }
}
777,993✔
349

350
// Stores pData once at currentRow (via colDataSetVal) and makes the following numOfRows - 1 rows
// share that value by copying the row's offset into each of them; the column is then flagged
// `reassigned`. The function indexes varmeta.offset without checking the column type.
// Returns the colDataSetVal() error code, or 0 on success.
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
                               uint32_t numOfRows) {
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
  if (code) {
    return code;
  }

  if (numOfRows > 1) {
    int32_t*      pOffset = pColumnInfoData->varmeta.offset;
    const int32_t shared = pOffset[currentRow];

    // Assign element by element: memset() would replicate only the low byte of the offset into
    // every byte of the destination, producing wrong offsets for most values.
    for (uint32_t i = 1; i < numOfRows; ++i) {
      pOffset[currentRow + i] = shared;
    }
    pColumnInfoData->reassigned = true;
  }

  return code;
}
365

366
// Fills numOfRows consecutive rows starting at currentRow with one value.
//   isNull              : the rows are marked NULL and 0 is returned.
//   var-length column   : delegated to colDataCopyAndReassign().
//   fixed-width column  : the value is written once and then replicated by copying the
//                         already-filled region, doubling it on each pass. Null bits are not
//                         touched on this path.
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                          bool isNull) {
  if (isNull) {
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
    pColumnInfoData->hasNull = true;
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
  }

  int32_t  itemBytes = pColumnInfoData->info.bytes;
  int32_t  writePos = currentRow * itemBytes;
  uint32_t filled = 1;

  void* pFirst = pColumnInfoData->pData + writePos;
  memcpy(pFirst, pData, itemBytes);
  writePos += filled * itemBytes;

  while (filled < numOfRows) {
    int32_t doubled = filled << 1;
    // copy as many items as are already filled, or fewer on the last pass
    int32_t batch = doubled > numOfRows ? (numOfRows - filled) : filled;

    memcpy(pColumnInfoData->pData + writePos, pFirst, batch * itemBytes);
    writePos += batch * itemBytes;
    filled += batch;
  }

  return TSDB_CODE_SUCCESS;
}
398

399
// Appends the null bits of the first numOfRow2 rows of pSource behind the first numOfRow1 rows
// of pColumnInfoData->nullbitmap. The destination bitmap must already be large enough for
// numOfRow1 + numOfRow2 rows.
//
// When numOfRow1 ends on a byte boundary the source bytes are copied as they are. Otherwise the
// low bits of the last destination byte are cleared and every source byte is split in two: its
// high part completes the previous destination byte, its low part starts the next one.
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) {
    return;
  }

  uint32_t totalRows = numOfRow1 + numOfRow2;
  uint32_t usedBits = BitPos(numOfRow1);  // bits already occupied in the last destination byte

  if (usedBits == 0) {  // byte aligned, no shifting required
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint32_t freeBits = 8 - usedBits;
  uint8_t* pSrcBits = (uint8_t*)pSource->nullbitmap;

  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] &= (0xFF << freeBits);         // clear the unused bits
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (pSrcBits[0] >> usedBits);  // fill them from the source

  if (BitmapLen(numOfRow1) == BitmapLen(totalRows)) {
    return;  // everything fitted into the last existing byte
  }

  int32_t  srcLen = BitmapLen(numOfRow2);
  uint8_t* pTail = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
  int32_t  tailLen = BitmapLen(totalRows) - BitmapLen(numOfRow1);
  memset(pTail, 0, tailLen);

  for (int32_t k = 0; k < srcLen; ++k) {  // bounded by the source bitmap size
    if (k >= 1) {
      pTail[k - 1] |= (pSrcBits[k] >> usedBits);  // high part completes the previous byte
    }

    if (k >= tailLen) {  // bounded by the destination bitmap size
      return;
    }

    pTail[k] |= (pSrcBits[k] << freeBits);  // low part starts this byte
  }
}
440

441
// Appends the first numOfRow2 rows of pSource behind the first numOfRow1 rows of pColumnInfoData.
//
// *capacity is the row capacity of the destination on entry; when the merged row count exceeds
// it, the per-row storage is reallocated and *capacity is raised to the merged row count.
// hasNull of the destination is set when the source has it set.
//
// Returns numOfRow1 + numOfRow2 on success (numOfRow1 when numOfRow2 is 0),
// TSDB_CODE_INVALID_PARA when the column types differ, or terrno when a reallocation fails.
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t mergedRows = numOfRow1 + numOfRow2;

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (mergedRows > (*capacity)) {
      // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
      char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, mergedRows * pColumnInfoData->info.bytes);
      if (pNewData == NULL) {
        return terrno;
      }
      pColumnInfoData->pData = pNewData;

      if (BitmapLen(numOfRow1) < BitmapLen(mergedRows)) {
        char* pNewBitmap = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(mergedRows));
        if (pNewBitmap == NULL) {
          return terrno;
        }

        // the newly added bitmap bytes start out as "not null"
        uint32_t added = BitmapLen(mergedRows) - BitmapLen(numOfRow1);
        memset(pNewBitmap + BitmapLen(numOfRow1), 0, added);
        pColumnInfoData->nullbitmap = pNewBitmap;
      }

      *capacity = mergedRows;
    }

    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

    if (pSource->pData) {
      int32_t dstPos = pColumnInfoData->info.bytes * numOfRow1;
      memcpy(pColumnInfoData->pData + dstPos, pSource->pData, pSource->info.bytes * numOfRow2);
    }

    return numOfRow1 + numOfRow2;
  }

  // var-length column: grow the offset table first
  if (mergedRows > (*capacity)) {
    char* pNewOffsets = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
    if (pNewOffsets == NULL) {
      return terrno;
    }

    *capacity = mergedRows;
    pColumnInfoData->varmeta.offset = (int32_t*)pNewOffsets;
  }

  // source offsets are rebased onto the end of the destination data; NULL rows stay -1
  for (int32_t r = 0; r < numOfRow2; ++r) {
    if (pSource->varmeta.offset[r] != -1) {
      pColumnInfoData->varmeta.offset[r + numOfRow1] = pSource->varmeta.offset[r] + pColumnInfoData->varmeta.length;
    } else {
      pColumnInfoData->varmeta.offset[r + numOfRow1] = -1;
    }
  }

  // copy data
  uint32_t srcLen = pSource->varmeta.length;
  uint32_t dstLen = pColumnInfoData->varmeta.length;
  if (pColumnInfoData->varmeta.allocLen < srcLen + dstLen) {
    char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, srcLen + dstLen);
    if (pNewData == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = pNewData;
    pColumnInfoData->varmeta.allocLen = srcLen + dstLen;
  }

  if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
    memcpy(pColumnInfoData->pData + dstLen, pSource->pData, srcLen);
  }
  pColumnInfoData->varmeta.length = srcLen + dstLen;

  return numOfRow1 + numOfRow2;
}
525

526
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
131,068,295✔
527
                      const SDataBlockInfo* pBlockInfo) {
528
  if (pColumnInfoData->info.type != pSource->info.type || (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows)) {
131,068,295!
529
    return TSDB_CODE_INVALID_PARA;
×
530
  }
531

532
  if (numOfRows <= 0) {
131,068,295✔
533
    return numOfRows;
14,143✔
534
  }
535

536
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
155,535,494!
537
    int32_t newLen = pSource->varmeta.length;
24,462,323✔
538
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
24,462,323✔
539
    if (pColumnInfoData->varmeta.allocLen < newLen) {
24,462,323✔
540
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
19,409,021!
541
      if (tmp == NULL) {
19,428,040!
542
        return terrno;
×
543
      }
544

545
      pColumnInfoData->pData = tmp;
19,428,040✔
546
      pColumnInfoData->varmeta.allocLen = newLen;
19,428,040✔
547
    }
548

549
    pColumnInfoData->varmeta.length = newLen;
24,481,342✔
550
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
24,481,342✔
551
      memcpy(pColumnInfoData->pData, pSource->pData, newLen);
23,014,485✔
552
    }
553
  } else {
554
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
106,591,829✔
555
    if (pSource->pData != NULL) {
106,591,829!
556
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
106,620,829✔
557
    }
558
  }
559

560
  pColumnInfoData->hasNull = pSource->hasNull;
131,073,171✔
561
  pColumnInfoData->info = pSource->info;
131,073,171✔
562
  return 0;
131,073,171✔
563
}
564

565
// Copies numOfRows rows of pSrc, starting at srcIdx, into pDst starting at dstIdx.
//
// Returns TSDB_CODE_INVALID_PARA when type or byte width differ or the source is flagged
// `reassigned`, numOfRows itself when it is <= 0, terrno when a reallocation fails, 0 otherwise.
//
// var-length columns: the values are appended behind pDst->varmeta.length with a single memcpy
// that starts at the first non-null source value, i.e. the copied source values are treated as
// one contiguous byte range. NULL source rows get offset -1.
// fixed-width columns: null bits are transferred only when the source has hasNull set (a whole
// bitmap byte at a time when source and destination are byte aligned), then the row data is
// copied in one memcpy.
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
                           int32_t numOfRows) {
  if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRows <= 0) {
    return numOfRows;
  }

  if (IS_VAR_DATA_TYPE(pDst->info.type)) {
    const bool srcHasNull = pSrc->hasNull;
    int32_t    totalLen = 0;
    void*      pFirstVal = NULL;

    for (int32_t i = 0; i < numOfRows; ++i) {
      // null checks are performed only when the source is flagged as containing NULLs
      if (srcHasNull && colDataIsNull_var(pSrc, srcIdx + i)) {
        pDst->varmeta.offset[dstIdx + i] = -1;
        pDst->hasNull = true;
        continue;
      }

      char* pVal = colDataGetVarData(pSrc, srcIdx + i);
      if (NULL == pFirstVal) {
        pFirstVal = pVal;
      }

      int32_t valLen = 0;
      if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
        valLen = getJsonValueLen(pVal);
      } else {
        valLen = varDataTLen(pVal);
      }

      pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + totalLen;
      totalLen += valLen;
    }

    if (totalLen > 0) {
      // copy data
      if (pDst->varmeta.allocLen < pDst->varmeta.length + totalLen) {
        char* pNewData = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + totalLen);
        if (pNewData == NULL) {
          return terrno;
        }

        pDst->pData = pNewData;
        pDst->varmeta.allocLen = pDst->varmeta.length + totalLen;
      }

      if (srcHasNull) {
        memcpy(pDst->pData + pDst->varmeta.length, pFirstVal, totalLen);
      } else {
        memcpy(pDst->pData + pDst->varmeta.length, colDataGetVarData(pSrc, srcIdx), totalLen);
      }
      pDst->varmeta.length = pDst->varmeta.length + totalLen;
    }

    return 0;
  }

  if (pSrc->hasNull) {
    const bool sameBitPos = (BitPos(dstIdx) == BitPos(srcIdx));

    for (int32_t i = 0; i < numOfRows; ++i) {
      // fast path: both sides byte aligned and a whole bitmap byte is still to be copied
      if (sameBitPos && 0 == BitPos(dstIdx) && (i + (1 << NBIT) <= numOfRows)) {
        BMCharPos(pDst->nullbitmap, dstIdx + i) = BMCharPos(pSrc->nullbitmap, srcIdx + i);
        if (BMCharPos(pDst->nullbitmap, dstIdx + i)) {
          pDst->hasNull = true;
        }
        i += (1 << NBIT) - 1;
        continue;
      }

      // slow path: one bit at a time
      if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
        colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
        pDst->hasNull = true;
      } else {
        colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
      }
    }
  }

  if (pSrc->pData != NULL) {
    memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
           pDst->info.bytes * numOfRows);
  }

  return 0;
}
670

671
// Returns the number of entries in the block's column array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  return numOfCols;
}
699,655,665✔
672

673
// Returns the row count recorded in the block info.
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
  size_t numOfRows = pBlock->info.rows;
  return numOfRows;
}
23,861,303✔
674

675
// Recomputes info.window from the first and last row of the timestamp column.
// tsColumnIndex == -1 selects column 0. Only the first and last rows are inspected; skey/ekey
// become the smaller/larger of the two.
//
// Returns -1 when the block has no columns, otherwise 0. Nothing is updated when the block is
// NULL, empty, not loaded, the column cannot be fetched, or the column is not a timestamp.
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  if (pDataBlock == NULL) {
    return 0;
  }
  if (pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols == 0) {
    return -1;
  }

  int32_t colIdx = tsColumnIndex;
  if (colIdx == -1) {
    colIdx = 0;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
  if (pTsCol == NULL || pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  TSKEY firstTs = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastTs = *(TSKEY*)colDataGetData(pTsCol, (pDataBlock->info.rows - 1));

  pDataBlock->info.window.skey = TMIN(firstTs, lastTs);
  pDataBlock->info.window.ekey = TMAX(firstTs, lastTs);

  return 0;
}
704

705
// Records the primary-key range of the block in info.pks[0] and info.pks[1], taken from the
// first and last row of column pkColumnIndex. With asc set, pks[0] comes from the first row and
// pks[1] from the last; otherwise the two are swapped.
//
// Numeric keys are converted to int64_t; VARCHAR keys are copied into the existing pks[].pData
// buffers (no size check is performed here). Other column types leave the range untouched.
//
// Returns -1 when the block has no columns, terrno when the column cannot be fetched, else 0.
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols == 0) {
    return -1;
  }

  SDataBlockInfo*  pInfo = &pDataBlock->info;
  SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
  if (pPkCol == NULL) {
    return terrno;
  }

  if (!IS_NUMERIC_TYPE(pPkCol->info.type) && (pPkCol->info.type != TSDB_DATA_TYPE_VARCHAR)) {
    return 0;
  }

  void* pFirstRow = colDataGetData(pPkCol, 0);
  void* pLastRow = colDataGetData(pPkCol, (pInfo->rows - 1));

  // pick which row feeds pks[0] and which feeds pks[1] according to the scan order
  void* pKey0 = asc ? pFirstRow : pLastRow;
  void* pKey1 = asc ? pLastRow : pFirstRow;

  if (IS_NUMERIC_TYPE(pPkCol->info.type)) {
    int64_t val = 0;
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pKey0, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[0], val);
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pKey1, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[1], val);
  } else {  // todo refactor
    memcpy(pInfo->pks[0].pData, varDataVal(pKey0), varDataLen(pKey0));
    pInfo->pks[0].nData = varDataLen(pKey0);

    memcpy(pInfo->pks[1].pData, varDataVal(pKey1), varDataLen(pKey1));
    pInfo->pks[1].nData = varDataLen(pKey1);
  }

  return 0;
}
759

760
// Appends all rows of pSrc to pDest, column by column, using colDataMergeCol().
// Columns are paired by index; the number of columns is taken from pDest.
// On success pDest's capacity and row count are updated and 0 is returned. If a column cannot
// be fetched terrno is returned; if a column merge fails its negative code is returned. In both
// cases the columns merged so far stay modified while info.rows is left unchanged.
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  int32_t newCapacity = pDest->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, c);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, c);
    if (pSrcCol == NULL || pDstCol == NULL) {
      return terrno;
    }

    // every column starts from the block's current capacity
    newCapacity = pDest->info.capacity;
    int32_t rc = colDataMergeCol(pDstCol, pDest->info.rows, &newCapacity, pSrcCol, pSrc->info.rows);
    if (rc < 0) {  // error occurs
      return rc;
    }
  }

  pDest->info.capacity = newCapacity;
  pDest->info.rows += pSrc->info.rows;
  return 0;
}
783

784
// Appends numOfRows rows of pSrc, starting at srcIdx, to pDest without growing it.
// Returns TSDB_CODE_INVALID_PARA when the rows do not fit into pDest's capacity, terrno when a
// column cannot be fetched, the colDataAssignNRows() error if one occurs, and 0 on success.
// info.rows of pDest is advanced only after every column was copied.
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t rc = 0;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, c);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, c);
    if (pDstCol == NULL || pSrcCol == NULL) {
      return terrno;
    }

    rc = colDataAssignNRows(pDstCol, pDest->info.rows, pSrcCol, srcIdx, numOfRows);
    if (rc != 0) {
      return rc;
    }
  }

  pDest->info.rows += numOfRows;
  return rc;
}
807

808
// Drop the last numOfRows rows of pBlock.
//  - numOfRows <= 0: nothing to do.
//  - numOfRows >= current number of rows: the whole block is reset through blockDataCleanup.
//  - otherwise the meta data of the removed rows (var-data offsets / null bitmap bits) is zeroed
//    and info.rows is decreased. The payload buffers themselves are not touched.
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
  if (numOfRows <= 0) {  // a negative count would index beyond the meta data arrays below
    return;
  }

  if (numOfRows >= pBlock->info.rows) {
    blockDataCleanup(pBlock);
    return;
  }

  int32_t firstRemoved = pBlock->info.rows - numOfRows;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      // The used payload length becomes the offset of the first removed row that owns payload.
      // An offset of -1 marks a row without payload (NULL) and must not be taken as a length;
      // if every removed row is NULL the length is left unchanged.
      // NOTE(review): like the previous code, this assumes offsets grow with the row index.
      for (int32_t row = firstRemoved; row < pBlock->info.rows; ++row) {
        if (pCol->varmeta.offset[row] != -1) {
          pCol->varmeta.length = pCol->varmeta.offset[row];
          break;
        }
      }

      memset(pCol->varmeta.offset + firstRemoved, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
    } else {
      int32_t row = firstRemoved;

      // clear bit by bit until the next byte boundary of the bitmap is reached
      for (; row < pBlock->info.rows && BitPos(row) != 0; ++row) {
        colDataClearNull_f(pCol->nullbitmap, row);
      }

      // clear all complete bytes in one go
      int32_t bytes = (pBlock->info.rows - row) / 8;
      memset(&BMCharPos(pCol->nullbitmap, row), 0, bytes);
      row += bytes * 8;

      // clear the remaining bits of the last, partially used byte
      for (; row < pBlock->info.rows; ++row) {
        colDataClearNull_f(pCol->nullbitmap, row);
      }
    }
  }

  pBlock->info.rows -= numOfRows;
}
850

851
size_t blockDataGetSize(const SSDataBlock* pBlock) {
334,763,763✔
852
  size_t total = 0;
334,763,763✔
853
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
334,763,763✔
854
  for (int32_t i = 0; i < numOfCols; ++i) {
1,601,802,947✔
855
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
1,270,481,939✔
856
    if (pColInfoData == NULL) {
1,266,484,892!
857
      continue;
×
858
    }
859

860
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
1,266,484,892✔
861
  }
862

863
  return total;
331,321,008✔
864
}
865

866
// the number of tuples can be fit in one page.
867
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
868
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
3,685,398✔
869
                           int32_t pageSize) {
870
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
3,685,398✔
871
  int32_t numOfRows = pBlock->info.rows;
3,685,389✔
872

873
  int32_t bitmapChar = 1;
3,685,389✔
874

875
  size_t headerSize = sizeof(int32_t);
3,685,389✔
876
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
3,685,389✔
877

878
  // TODO speedup by checking if the whole page can fit in firstly.
879
  if (!hasVarCol) {
3,685,389✔
880
    size_t  rowSize = blockDataGetRowSize(pBlock);
3,510,506✔
881
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
3,510,506✔
882
    if (capacity <= 0) {
3,510,517!
883
      return terrno;
×
884
    }
885

886
    *stopIndex = startIndex + capacity - 1;
3,510,517✔
887
    if (*stopIndex >= numOfRows) {
3,510,517✔
888
      *stopIndex = numOfRows - 1;
797✔
889
    }
890

891
    return TSDB_CODE_SUCCESS;
3,510,517✔
892
  }
893
  // iterate the rows that can be fit in this buffer page
894
  int32_t size = (headerSize + colHeaderSize);
174,883✔
895
  for (int32_t j = startIndex; j < numOfRows; ++j) {
9,578,303✔
896
    for (int32_t i = 0; i < numOfCols; ++i) {
101,676,769✔
897
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
92,098,678✔
898
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
92,098,678!
899
        if (pColInfoData->varmeta.offset[j] != -1) {
21,342,583!
900
          char* p = colDataGetData(pColInfoData, j);
21,342,583!
901
          size += varDataTLen(p);
21,342,583✔
902
        }
903

904
        size += sizeof(pColInfoData->varmeta.offset[0]);
21,342,583✔
905
      } else {
906
        size += pColInfoData->info.bytes;
70,756,095✔
907

908
        if (((j - startIndex) & 0x07) == 0) {
70,756,095✔
909
          size += 1;  // the space for null bitmap
9,099,865✔
910
        }
911
      }
912
    }
913

914
    if (size > pageSize) {  // pageSize must be able to hold one row
9,578,091✔
915
      *stopIndex = j - 1;
174,671✔
916
      if (*stopIndex < startIndex) {
174,671!
917
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
918
      }
919

920
      return TSDB_CODE_SUCCESS;
174,671✔
921
    }
922
  }
923

924
  // all fit in
925
  *stopIndex = numOfRows - 1;
212✔
926
  return TSDB_CODE_SUCCESS;
212✔
927
}
928

929
// Create a new data block that contains the rows [startIndex, startIndex + rowCount) of pBlock.
//
// pBlock     source block
// startIndex first row to copy, must be >= 0
// rowCount   number of rows to copy, must be >= 0 and startIndex + rowCount must not exceed the
//            number of rows in pBlock
// pResBlock  receives the new block on success; the caller owns it afterwards
//
// Returns 0 on success. On failure the partially built block is destroyed here, *pResBlock is not
// assigned by this function, and an error code is returned (TSDB_CODE_INVALID_PARA for invalid
// arguments, otherwise the code reported by the failing helper).
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
  int32_t code = 0;
  QRY_PARAM_CHECK(pResBlock);

  if (pBlock == NULL || startIndex < 0 || rowCount < 0 || (int64_t)startIndex + rowCount > pBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDst = NULL;
  code = createOneDataBlock(pBlock, false, &pDst);
  if (code) {
    return code;
  }

  code = blockDataEnsureCapacity(pDst, rowCount);
  if (code) {
    blockDataDestroy(pDst);
    return code;
  }

  // The rows are copied one by one instead of using colDataAssignNRows on the whole range,
  // since the var-length data may be stored out of order (TODO).
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    if (pColData == NULL || pDstCol == NULL) {
      continue;
    }

    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pColData, j);
      } else {
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
      }

      if (isNull) {
        colDataSetNULL(pDstCol, j - startIndex);
        continue;
      }

      char* p = colDataGetData(pColData, j);
      code = colDataSetVal(pDstCol, j - startIndex, p, false);
      if (code) {
        // Stop immediately: continuing with the next column would overwrite the error code
        // and hand out a partially filled block as if the extraction had succeeded.
        blockDataDestroy(pDst);
        return code;
      }
    }
  }

  pDst->info.rows = rowCount;
  *pResBlock = pDst;
  return code;
}
990

991
/**
992
 *
993
 * +------------------+---------------------------------------------+
994
 * |the number of rows|                    column #1                |
995
 * |    (4 bytes)     |------------+-----------------------+--------+
996
 * |                  | null bitmap| column length(4bytes) | values |
997
 * +------------------+------------+-----------------------+--------+
998
 * @param buf
999
 * @param pBlock
1000
 * @return
1001
 */
1002
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
6,061,021✔
1003
  // write the number of rows
1004
  *(uint32_t*)buf = pBlock->info.rows;
6,061,021✔
1005

1006
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
6,061,021✔
1007
  int32_t numOfRows = pBlock->info.rows;
6,060,869✔
1008

1009
  char* pStart = buf + sizeof(uint32_t);
6,060,869✔
1010

1011
  for (int32_t i = 0; i < numOfCols; ++i) {
29,752,025✔
1012
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
23,693,156✔
1013
    if (pCol == NULL) {
23,689,241!
1014
      continue;
×
1015
    }
1016

1017
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
23,689,241!
1018
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
1,702,441✔
1019
      pStart += numOfRows * sizeof(int32_t);
1,702,441✔
1020
    } else {
1021
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
21,986,800✔
1022
      pStart += BitmapLen(pBlock->info.rows);
21,986,800✔
1023
    }
1024

1025
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
23,689,241✔
1026

1027
    *(int32_t*)pStart = dataSize;
23,691,156✔
1028
    pStart += sizeof(int32_t);
23,691,156✔
1029

1030
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
23,691,156!
1031
      for (int32_t row = 0; row < numOfRows; ++row) {
×
1032
        char*   pColData = pCol->pData + pCol->varmeta.offset[row];
×
1033
        int32_t colSize = 0;
×
1034
        if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
×
1035
          colSize = getJsonValueLen(pColData);
×
1036
        } else {
1037
          colSize = varDataTLen(pColData);
×
1038
        }
1039
        memcpy(pStart, pColData, colSize);
×
1040
        pStart += colSize;
×
1041
      }
1042
    } else {
1043
      if (dataSize != 0) {
23,691,156✔
1044
        // ubsan reports error if pCol->pData==NULL && dataSize==0
1045
        memcpy(pStart, pCol->pData, dataSize);
23,664,563✔
1046
      }
1047
      pStart += dataSize;
23,691,156✔
1048
    }
1049
  }
1050

1051
  return 0;
6,058,869✔
1052
}
1053

1054
// Load a block from a buffer that uses the layout written by blockDataToBuf:
// row count (4 bytes), then for every column the null bitmap (fixed-size types) or the offset array
// (var-length types), a 4-byte column length and the values.
// The capacity of pBlock is extended to the row count found in the buffer; the payload buffer of a
// var-length column is grown when it is smaller than the stored column length.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if the buffer holds a non-positive row count or a
// negative column length, terrno if memory cannot be allocated, and TSDB_CODE_SUCCESS otherwise.
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  // header fields are read with memcpy, they are not guaranteed to be aligned inside the buffer
  int32_t numOfRows = 0;
  memcpy(&numOfRows, buf, sizeof(numOfRows));
  if (numOfRows <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  pBlock->info.rows = numOfRows;
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      size_t metaSize = pBlock->info.rows * sizeof(int32_t);
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows));
      pStart += BitmapLen(pBlock->info.rows);
    }

    int32_t colLength = 0;
    memcpy(&colLength, pStart, sizeof(colLength));
    pStart += sizeof(int32_t);

    if (colLength < 0) {
      // a negative length can only originate from a corrupted buffer; it would be turned into a
      // huge size by the memcpy below
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return terrno;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }

      pCol->varmeta.length = colLength;
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {  // defensive check
        return TSDB_CODE_FAILED;
      }
    }

    if (colLength != 0) {
      // ubsan reports error if colLength==0 && pCol->pData == 0
      memcpy(pCol->pData, pStart, colLength);
    }
    pStart += colLength;
  }

  return TSDB_CODE_SUCCESS;
}
1111

1112
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
7,140,432✔
1113
  if (!pColumnInfoData->hasNull) {
7,140,432!
1114
    return false;
×
1115
  }
1116

1117
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
7,140,432!
1118
    for (int32_t i = startIndex; i < nRows; ++i) {
5,611,362✔
1119
      if (!colDataIsNull_var(pColumnInfoData, i)) {
5,571,351✔
1120
        return false;
2,151,009✔
1121
      }
1122
    }
1123
  } else {
1124
    if (pColumnInfoData->nullbitmap == NULL) {
4,949,412!
1125
      return false;
×
1126
    }
1127

1128
    for (int32_t i = startIndex; i < nRows; ++i) {
10,598,972✔
1129
      if (!colDataIsNull_f(pColumnInfoData->nullbitmap, i)) {
10,343,697✔
1130
        return false;
4,694,137✔
1131
      }
1132
    }
1133
  }
1134

1135
  return true;
295,286✔
1136
}
1137

1138
// todo remove this
1139
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
1,246,510✔
1140
  pBlock->info.rows = *(int32_t*)buf;
1,246,510✔
1141
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
1,246,510✔
1142

1143
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,246,510✔
1144

1145
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
1,246,391✔
1146

1147
  for (int32_t i = 0; i < numOfCols; ++i) {
8,389,388✔
1148
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
7,144,574✔
1149
    if (pCol == NULL) {
7,141,288!
1150
      continue;
×
1151
    }
1152

1153
    pCol->hasNull = true;
7,141,288✔
1154

1155
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
7,141,288!
1156
      size_t metaSize = capacity * sizeof(int32_t);
2,193,000✔
1157
      memcpy(pCol->varmeta.offset, pStart, metaSize);
2,193,000✔
1158
      pStart += metaSize;
2,193,000✔
1159
    } else {
1160
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
4,948,288✔
1161
      pStart += BitmapLen(capacity);
4,948,288✔
1162
    }
1163

1164
    int32_t colLength = *(int32_t*)pStart;
7,141,288✔
1165
    pStart += sizeof(int32_t);
7,141,288✔
1166

1167
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
7,141,288!
1168
      if (pCol->varmeta.allocLen < colLength) {
2,191,585✔
1169
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
14,220!
1170
        if (tmp == NULL) {
14,221!
1171
          return terrno;
×
1172
        }
1173

1174
        pCol->pData = tmp;
14,221✔
1175
        pCol->varmeta.allocLen = colLength;
14,221✔
1176
      }
1177

1178
      pCol->varmeta.length = colLength;
2,191,586✔
1179
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
2,191,586!
1180
        return TSDB_CODE_FAILED;
×
1181
      }
1182
    }
1183

1184
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
7,141,289✔
1185
      memcpy(pCol->pData, pStart, colLength);
6,830,587✔
1186
    }
1187

1188
    pStart += pCol->info.bytes * capacity;
7,142,997✔
1189
  }
1190

1191
  return TSDB_CODE_SUCCESS;
1,244,814✔
1192
}
1193

1194
// Return the row size of the block, i.e. the sum of info.bytes over all columns.
// The value is computed lazily: it is calculated and stored in pBlock->info.rowSize the first time
// this function is called while that field is 0; afterwards the cached value is returned.
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t nCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t sum = 0;

  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      sum += pCol->info.bytes;
    }
  }

  pBlock->info.rowSize = sum;
  return pBlock->info.rowSize;
}
1213

1214
/**
 * Size in bytes of the meta data that precedes the column payload of a serialized block:
 *
 * | version | total length | total rows | blankFull | total columns | flag seg | block group id |
 * | column schema (type + bytes per column) | each column length |
 *
 * @param numOfCols number of columns in the block
 * @return the size of the meta data in bytes
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  // part that does not depend on the number of columns
  const size_t fixedPart = sizeof(int32_t)     // version
                           + sizeof(int32_t)   // total length
                           + sizeof(int32_t)   // total rows
                           + sizeof(bool)      // blankFull
                           + sizeof(int32_t)   // total columns
                           + sizeof(int32_t)   // flag seg
                           + sizeof(uint64_t); // block group id

  // column schema entry (type + bytes) plus the length field of the column
  const size_t perColumn = (sizeof(int8_t) + sizeof(int32_t)) + sizeof(int32_t);

  return fixedPart + numOfCols * perColumn;
}
1225

1226
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
1,249,389✔
1227
  double rowSize = 0;
1,249,389✔
1228

1229
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,249,389✔
1230
  for (int32_t i = 0; i < numOfCols; ++i) {
6,475,262✔
1231
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
5,225,997✔
1232
    if (pColInfo == NULL) {
5,225,889!
1233
      continue;
×
1234
    }
1235

1236
    rowSize += pColInfo->info.bytes;
5,225,889✔
1237
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
5,225,889!
1238
      rowSize += sizeof(int32_t);
248,485✔
1239
    } else {
1240
      rowSize += 1 / 8.0;  // one bit for each record
4,977,404✔
1241
    }
1242
  }
1243

1244
  return rowSize;
1,249,265✔
1245
}
1246

1247
typedef struct SSDataBlockSortHelper {
1248
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
1249
  SSDataBlock* pDataBlock;
1250
} SSDataBlockSortHelper;
1251

1252
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
2,147,483,647✔
1253
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
2,147,483,647✔
1254

1255
  SSDataBlock* pDataBlock = pHelper->pDataBlock;
2,147,483,647✔
1256

1257
  int32_t left = *(int32_t*)p1;
2,147,483,647✔
1258
  int32_t right = *(int32_t*)p2;
2,147,483,647✔
1259

1260
  SArray* pInfo = pHelper->orderInfo;
2,147,483,647✔
1261

1262
  for (int32_t i = 0; i < pInfo->size; ++i) {
2,147,483,647✔
1263
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
2,147,483,647✔
1264
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
2,147,483,647✔
1265

1266
    if (pColInfoData->hasNull) {
2,147,483,647✔
1267
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
2,147,483,647!
1268
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
2,147,483,647!
1269
      if (leftNull && rightNull) {
2,147,483,647✔
1270
        continue;  // continue to next slot
2,147,483,647✔
1271
      }
1272

1273
      if (rightNull) {
2,147,483,647✔
1274
        return pOrder->nullFirst ? 1 : -1;
25,176,575✔
1275
      }
1276

1277
      if (leftNull) {
2,147,483,647✔
1278
        return pOrder->nullFirst ? -1 : 1;
24,395,631✔
1279
      }
1280
    }
1281

1282
    void* left1 = colDataGetData(pColInfoData, left);
2,147,483,647!
1283
    void* right1 = colDataGetData(pColInfoData, right);
2,147,483,647!
1284
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
1285
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
1,056!
1286
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
37,997,465✔
1287
        return 0;
×
1288
      }
1289
    }
1290

1291
    __compar_fn_t fn;
1292
    if (pOrder->compFn) {
2,147,483,647!
1293
      fn = pOrder->compFn;
2,147,483,647✔
1294
    } else {
1295
      fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1296
    }
1297

1298
    int ret = fn(left1, right1);
2,147,483,647✔
1299
    if (ret == 0) {
2,147,483,647✔
1300
      continue;
2,147,483,647✔
1301
    } else {
1302
      return ret;
1,943,313,706✔
1303
    }
1304
  }
1305

1306
  return 0;
2,147,483,647✔
1307
}
1308

1309
// Fill the helper columns pCols with the rows of pDataBlock rearranged by index:
// row j of the result is row index[j] of the source block.
// For var-length columns the complete payload is copied as it is and only the offsets are
// rearranged; for fixed-size columns the values are copied one by one and NULL rows are flagged in
// the destination.
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pDst = &pCols[colIdx];
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pSrc == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pSrc->info.type)) {
      if (pSrc->varmeta.length != 0) {
        memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
      }
      pDst->varmeta.length = pSrc->varmeta.length;

      for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
        pDst->varmeta.offset[row] = pSrc->varmeta.offset[index[row]];
      }

      continue;
    }

    for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
      if (colDataIsNull_f(pSrc->nullbitmap, index[row])) {
        colDataSetNull_f_s(pDst, row);
      } else {
        memcpy(pDst->pData + row * pDst->info.bytes, pSrc->pData + index[row] * pDst->info.bytes, pDst->info.bytes);
      }
    }
  }
}
600,679✔
1338

1339
// Allocate an array of helper columns that mirrors the columns of pDataBlock.
// Meta data (offset array / null bitmap) and fixed-size payload are sized for the capacity of the
// block; the payload of a var-length column is sized by the used length of the source column.
// All buffers are zero-initialized.
// On success the array is returned through ppCols and 0 is returned. On failure everything
// allocated so far is released, *ppCols stays NULL and terrno is returned.
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
  int32_t capacity = pDataBlock->info.capacity;
  size_t  nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t code = 0;
  int32_t col = 0;  // declared here since the cleanup path needs the index of the failed column

  *ppCols = NULL;

  SColumnInfoData* pTmpCols = taosMemoryCalloc(nCols, sizeof(SColumnInfoData));
  if (pTmpCols == NULL) {
    return terrno;
  }

  for (col = 0; col < nCols; ++col) {
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, col);
    if (pSrc == NULL) {
      continue;
    }

    SColumnInfoData* pTmp = &pTmpCols[col];
    pTmp->info = pSrc->info;

    if (IS_VAR_DATA_TYPE(pTmp->info.type)) {
      pTmp->varmeta.offset = taosMemoryCalloc(capacity, sizeof(int32_t));
      pTmp->pData = taosMemoryCalloc(1, pSrc->varmeta.length);
      if (pTmp->varmeta.offset == NULL || pTmp->pData == NULL) {
        code = terrno;
        taosMemoryFree(pTmp->varmeta.offset);
        taosMemoryFree(pTmp->pData);
        goto _error;
      }

      pTmp->varmeta.length = pSrc->varmeta.length;
      pTmp->varmeta.allocLen = pTmp->varmeta.length;
    } else {
      pTmp->nullbitmap = taosMemoryCalloc(1, BitmapLen(capacity));
      pTmp->pData = taosMemoryCalloc(capacity, pTmp->info.bytes);
      if (pTmp->nullbitmap == NULL || pTmp->pData == NULL) {
        code = terrno;
        taosMemoryFree(pTmp->nullbitmap);
        taosMemoryFree(pTmp->pData);
        goto _error;
      }
    }
  }

  *ppCols = pTmpCols;
  return code;

_error:
  // the buffers of the failed column have been released above, destroy the preceding ones
  for (int32_t done = 0; done < col; ++done) {
    colDataDestroy(&pTmpCols[done]);
  }

  taosMemoryFree(pTmpCols);
  return code;
}
1394

1395
// Move the buffers of the helper columns into the columns of pDataBlock.
// The previous meta data and payload buffers of every column are freed and replaced by the ones
// of the matching helper column. The helper array itself is freed at the end, so pCols must not be
// used by the caller afterwards.
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pTarget = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pTarget == NULL) {
      continue;
    }

    SColumnInfoData* pSorted = &pCols[colIdx];
    pTarget->info = pSorted->info;

    if (IS_VAR_DATA_TYPE(pTarget->info.type)) {
      taosMemoryFreeClear(pTarget->varmeta.offset);
      pTarget->varmeta = pSorted->varmeta;
    } else {
      taosMemoryFreeClear(pTarget->nullbitmap);
      pTarget->nullbitmap = pSorted->nullbitmap;
    }

    taosMemoryFreeClear(pTarget->pData);
    pTarget->pData = pSorted->pData;
  }

  taosMemoryFreeClear(pCols);
}
600,793✔
1419

1420
// Allocate an array of `rows` row indexes initialized with the identity mapping 0, 1, 2, ...
// Returns NULL if the allocation fails. The array is released with destroyTupleIndex.
static int32_t* createTupleIndex(size_t rows) {
  int32_t* pIndex = taosMemoryCalloc(rows, sizeof(int32_t));

  if (pIndex != NULL) {
    for (int32_t row = 0; row < rows; ++row) {
      pIndex[row] = row;
    }
  }

  return pIndex;
}
1432

1433
// Release a row index array created by createTupleIndex.
static void destroyTupleIndex(int32_t* index) {
  taosMemoryFreeClear(index);
}
600,747!
1434

1435
// Sort the rows of pDataBlock in place by the keys given in pOrderInfo (SArray<SBlockOrderInfo>).
//
// A block that consists of one fixed-size column without NULL values and is sorted by exactly one
// key is sorted directly on its payload. In all other cases an array of row indexes is sorted with
// dataBlockCompar, the rows are copied into helper columns in the new order and the helper buffers
// replace the buffers of the block.
// As a side effect pColData and compFn of every order info entry are (re)assigned.
//
// Returns TSDB_CODE_SUCCESS on success, otherwise terrno as set by the failing step (allocation
// failure, unresolvable sort column, error reported by the comparator).
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t rows = pDataBlock->info.rows;

  bool sortColumnHasNull = false;
  bool varTypeSort = false;

  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  // fast path: the values of a single fixed-size column without NULLs can be sorted directly
  if (taosArrayGetSize(pOrderInfo) == 1 && (!sortColumnHasNull) && numOfCols == 1 && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
    if (pColInfoData == NULL || pOrder == NULL) {
      return terrno;
    }

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pInfo->pColData == NULL) {
      // dataBlockCompar dereferences pColData of every order info entry without a check, so
      // sorting must not be started with an unresolved sort column
      int32_t err = (terrno != 0) ? terrno : TSDB_CODE_INVALID_PARA;
      destroyTupleIndex(index);
      return err;
    }
    pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
  }

  // the comparator has no other way to report an error than terrno
  terrno = 0;
  taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  if (terrno) {
    int32_t err = terrno;  // save the code before the cleanup, then release the index array
    destroyTupleIndex(index);
    return err;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = NULL;
  int32_t          code = createHelpColInfoData(pDataBlock, &pCols);
  if (code != 0) {
    destroyTupleIndex(index);
    return code;
  }

  int64_t p2 = taosGetTimestampUs();
  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();
  copyBackToBlock(pDataBlock, pCols);  // takes over the buffers and frees the helper array

  int64_t p4 = taosGetTimestampUs();
  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%d\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);

  destroyTupleIndex(index);
  return TSDB_CODE_SUCCESS;
}
1540

1541
// Empty the block (see blockDataEmpty) and additionally reset its uid and group id.
// The identifiers are reset even if blockDataEmpty returns early for a block without capacity.
void blockDataCleanup(SSDataBlock* pDataBlock) {
  blockDataEmpty(pDataBlock);

  pDataBlock->info.id.uid = 0;
  pDataBlock->info.id.groupId = 0;
}
132,647,796✔
1547

1548
// Remove all rows from the block while keeping the allocated buffers.
// The block aggregation info is freed, the meta data of every column is reset for the full
// capacity, and the row count, dataLoad flag and time window are set to 0.
// Nothing is done for a block whose capacity is 0.
void blockDataEmpty(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  if (pInfo->capacity == 0) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pBlockAgg);

  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      colInfoDataCleanup(pCol, pInfo->capacity);
    }
  }

  pInfo->rows = 0;
  pInfo->dataLoad = 0;
  pInfo->window.ekey = 0;
  pInfo->window.skey = 0;
}
1571

1572
// Reset the block to the empty state without touching the null bitmaps and offset arrays:
// only the hasNull/reassigned flags and the used length of var-length columns are cleared,
// followed by the row count, dataLoad flag, time window, uid and group id of the block.
// Nothing is done for a block whose capacity is 0.
void blockDataReset(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  if (pInfo->capacity == 0) {
    return;
  }

  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      continue;
    }

    pCol->hasNull = false;
    pCol->reassigned = false;
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = 0;
    }
  }

  pInfo->rows = 0;
  pInfo->dataLoad = 0;
  pInfo->window.ekey = 0;
  pInfo->window.skey = 0;
  pInfo->id.uid = 0;
  pInfo->id.groupId = 0;
}
1599

1600
/*
1601
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
1602
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
1603
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
1604
 */
1605
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
375,114,025✔
1606
                         bool clearPayload) {
1607
  if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) {
375,114,025!
1608
    return TSDB_CODE_SUCCESS;
×
1609
  }
1610

1611
  int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
375,114,025!
1612

1613
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
375,114,025!
1614
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
75,904,960!
1615
    if (tmp == NULL) {
77,700,775!
1616
      return terrno;
×
1617
    }
1618

1619
    pColumn->varmeta.offset = (int32_t*)tmp;
77,700,775✔
1620
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
77,700,775✔
1621
  } else {
1622
    // prepare for the null bitmap
1623
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
299,209,065✔
1624
    if (tmp == NULL) {
300,910,958!
1625
      return terrno;
×
1626
    }
1627

1628
    int32_t oldLen = BitmapLen(existedRows);
300,910,958✔
1629
    pColumn->nullbitmap = tmp;
300,910,958✔
1630
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
300,910,958✔
1631
    if (pColumn->info.bytes == 0) {
300,910,958!
1632
      return TSDB_CODE_INVALID_PARA;
×
1633
    }
1634

1635
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
1636
    // to MALLOC_ALIGN_BYTES
1637
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
300,910,958✔
1638
    if (tmp == NULL) {
301,449,291!
1639
      return terrno;
×
1640
    }
1641
    // memset(tmp, 0, numOfRows * pColumn->info.bytes);
1642

1643
    // copy back the existed data
1644
    if (pColumn->pData != NULL) {
301,449,291✔
1645
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
42,576,258✔
1646
      taosMemoryFreeClear(pColumn->pData);
42,576,258!
1647
    }
1648

1649
    pColumn->pData = tmp;
301,452,401✔
1650

1651
    // check if the allocated memory is aligned to the requried bytes.
1652
#if defined LINUX
1653
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
301,452,401!
1654
      return TSDB_CODE_FAILED;
×
1655
    }
1656
#endif
1657

1658
    if (clearPayload) {
301,452,401✔
1659
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
66,015,730✔
1660
    }
1661
  }
1662

1663
  return TSDB_CODE_SUCCESS;
379,153,176✔
1664
}
1665

1666
// Reset the per-row metadata of a column so that its buffers can be reused.
//
// The hasNull flag is cleared. For a variable-length column the accumulated payload length is set to zero and the
// first numOfRows offset entries are zeroed; for any other column the null bitmap covering numOfRows rows is zeroed.
// A metadata buffer that has not been allocated yet is left untouched.
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  pColumn->hasNull = false;

  if (!IS_VAR_DATA_TYPE(pColumn->info.type)) {
    if (pColumn->nullbitmap != NULL) {
      memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
    }
    return;
  }

  pColumn->varmeta.length = 0;
  if (pColumn->varmeta.offset != NULL) {
    memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
  }
}
935,815,335✔
1680

1681
// Grow the buffers of a stand-alone column so that it can hold numOfRows rows.
//
// The column is resized against a zero-initialized block info, i.e. one that reports 0 rows and 0 capacity.
// Returns the result of doEnsureCapacity().
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
  SDataBlockInfo emptyInfo = {0};
  int32_t        code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows, clearPayload);
  return code;
}
1685

1686
// Make sure every column of pDataBlock can hold at least numOfRows rows.
//
// Nothing happens when numOfRows is 0 or does not exceed the recorded capacity. Otherwise each column is grown via
// doEnsureCapacity() (payload is not cleared) and, once all columns succeeded, info.capacity is set to numOfRows.
// On the first failure the error code is returned and info.capacity is left unchanged.
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  const bool needGrow = (numOfRows != 0) && (numOfRows > pDataBlock->info.capacity);
  if (!needGrow) {
    return TSDB_CODE_SUCCESS;
  }

  const size_t totalCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t idx = 0; idx < totalCols; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, idx);
    if (pCol == NULL) {
      return terrno;
    }

    int32_t ret = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows, false);
    if (ret != 0) {
      return ret;
    }
  }

  pDataBlock->info.capacity = numOfRows;
  return TSDB_CODE_SUCCESS;
}
1708

1709
// Release everything owned by a data block except the SSDataBlock struct itself.
//
// Every column is destroyed, the column array and the pBlockAgg buffer are freed, and the block info is zeroed.
// A NULL block is accepted and ignored.
void blockDataFreeRes(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    int32_t colCount = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t idx = 0; idx < colCount; ++idx) {
      SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, idx);
      if (pCol != NULL) {
        colDataDestroy(pCol);
      }
    }

    taosArrayDestroy(pBlock->pDataBlock);
    pBlock->pDataBlock = NULL;

    taosMemoryFreeClear(pBlock->pBlockAgg);
    memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
  }
}
1730

1731
// Destroy a data block completely: primary-key buffers, columns, and finally the block struct itself.
//
// The two pk buffers are released only when the first primary key has a variable-length type. NULL is ignored.
void blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    const bool hasVarPk = IS_VAR_DATA_TYPE(pBlock->info.pks[0].type);
    if (hasVarPk) {
      taosMemoryFreeClear(pBlock->info.pks[0].pData);
      taosMemoryFreeClear(pBlock->info.pks[1].pData);
    }

    blockDataFreeRes(pBlock);
    taosMemoryFreeClear(pBlock);
  }
}
1744

1745
// todo remove it
1746
// Append the columns of src to dst and copy the row data of src into them.
//
// dst->info is taken over from src, except that the pk pData pointers are reset to NULL (the pk payload is not
// duplicated) and the capacity reflects what was actually allocated for dst. Columns are appended to dst, so dst is
// expected to carry no columns on entry -- NOTE(review): confirm against callers.
// Returns TSDB_CODE_SUCCESS, or the first error reported by a helper.
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  // start from the source metadata, but with no rows, no capacity and no borrowed pk buffers
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.rows = 0;
  dst->info.capacity = 0;

  const size_t colCount = taosArrayGetSize(src->pDataBlock);

  // pass 1: append one empty column per source column
  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pSrcCol = taosArrayGet(src->pDataBlock, c);
    if (pSrcCol == NULL) {
      return terrno;
    }

    SColumnInfoData emptyCol = {.hasNull = true, .info = pSrcCol->info};
    int32_t         rc = blockDataAppendColInfo(dst, &emptyCol);
    if (rc) {
      return rc;
    }
  }

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // pass 2: copy the payload column by column
  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pDstCol = taosArrayGet(dst->pDataBlock, c);
    SColumnInfoData* pSrcCol = taosArrayGet(src->pDataBlock, c);
    if (pSrcCol == NULL || pDstCol == NULL) {
      continue;
    }

    // a fixed-length source column without a data buffer has nothing to copy
    if (pSrcCol->pData == NULL && !IS_VAR_DATA_TYPE(pSrcCol->info.type)) {
      continue;
    }

    int32_t ret = colDataAssign(pDstCol, pSrcCol, src->info.rows, &src->info);
    if (ret < 0) {
      return ret;
    }
  }

  // refresh the metadata from the source while keeping the capacity allocated above
  uint32_t allocated = dst->info.capacity;
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.capacity = allocated;
  uTrace("%s,parName:%s, groupId:%"PRIu64, __FUNCTION__, dst->info.parTbName, dst->info.id.groupId);
  return code;
}
1795

1796
// Overwrite the contents of pDst with the rows and metadata of pSrc.
//
// pDst is cleaned up, grown to hold pSrc->info.rows rows, and every column that exists in both blocks is copied with
// colDataAssign(). The block info is then taken over from pSrc; variable-length primary-key values are duplicated
// through copyPkVal(), and the capacity allocated for pDst is preserved.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered. After the info has been taken over, a failure still
// leaves pDst with its own capacity and without pk pointers that belong to pSrc.
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
  blockDataCleanup(pDst);

  int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
    if (pDstCol == NULL || pSrcCol == NULL) {
      continue;
    }

    int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
    if (ret < 0) {
      return ret;
    }
  }

  uint32_t cap = pDst->info.capacity;

  // release the pk buffers currently owned by the destination before its info is overwritten
  if (IS_VAR_DATA_TYPE(pDst->info.pks[0].type)) {
    taosMemoryFreeClear(pDst->info.pks[0].pData);
  }

  if (IS_VAR_DATA_TYPE(pDst->info.pks[1].type)) {
    taosMemoryFreeClear(pDst->info.pks[1].pData);
  }

  pDst->info = pSrc->info;

  // The struct copy above also copied the source capacity; restore the real one right away so that an error return
  // below cannot leave pDst claiming more room than it has.
  pDst->info.capacity = cap;

  // The struct copy also copied the source's pk buffer pointers. Drop them before copyPkVal() allocates private
  // copies, otherwise a failed allocation would leave pDst pointing at (and later freeing) memory owned by pSrc.
  if (IS_VAR_DATA_TYPE(pSrc->info.pks[0].type)) {
    pDst->info.pks[0].pData = NULL;
    pDst->info.pks[1].pData = NULL;
  }

  code = copyPkVal(&pDst->info, &pSrc->info);
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  return code;
}
1839

1840
// Allocate a block of the given stream type with a fixed column layout:
// window start ts, window end ts, uid, group id, calculate start ts, calculate end ts, table name.
//
// On success *pBlock receives the new block (0 rows, watermark INT64_MIN) and 0 is returned. On failure everything
// allocated here is released and an error code is returned.
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
  QRY_PARAM_CHECK(pBlock);

  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->info.hasVarCol = false;
  pNew->info.id.groupId = 0;
  pNew->info.rows = 0;
  pNew->info.type = type;
  pNew->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
                       sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  pNew->info.watermark = INT64_MIN;

  pNew->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
  if (pNew->pDataBlock == NULL) {
    taosMemoryFree(pNew);
    return terrno;
  }

  // column layout, in push order
  const struct {
    int32_t type;
    int32_t bytes;
  } layout[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, (int32_t)sizeof(TSKEY)},                          // window start ts
      {TSDB_DATA_TYPE_TIMESTAMP, (int32_t)sizeof(TSKEY)},                          // window end ts
      {TSDB_DATA_TYPE_UBIGINT, (int32_t)sizeof(uint64_t)},                         // uid
      {TSDB_DATA_TYPE_UBIGINT, (int32_t)sizeof(uint64_t)},                         // group id
      {TSDB_DATA_TYPE_TIMESTAMP, (int32_t)sizeof(TSKEY)},                          // calculate start ts
      {TSDB_DATA_TYPE_TIMESTAMP, (int32_t)sizeof(TSKEY)},                          // calculate end ts
      {TSDB_DATA_TYPE_VARCHAR, (int32_t)(VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN)},  // table name
  };

  SColumnInfoData infoData = {0};
  for (size_t k = 0; k < sizeof(layout) / sizeof(layout[0]); ++k) {
    infoData.info.type = layout[k].type;
    infoData.info.bytes = layout[k].bytes;

    if (taosArrayPush(pNew->pDataBlock, &infoData) == NULL) {
      int32_t code = terrno;
      taosArrayDestroy(pNew->pDataBlock);
      taosMemoryFree(pNew);
      return code;
    }
  }

  *pBlock = pNew;
  return 0;
}
1932

1933
// Build a new single-row block that holds a copy of row rowIdx of pDataBlock.
//
// The new block has the same column layout and block info as the source (pk pData pointers are reset to NULL),
// a capacity of one row and info.rows == 1. On success it is stored in *pResBlock and the caller owns it.
//
// Returns TSDB_CODE_INVALID_PARA when pDataBlock is NULL or rowIdx is outside [0, pDataBlock->info.rows);
// otherwise TSDB_CODE_SUCCESS or the first error reported by a helper. Nothing is leaked on failure.
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  // reject rows that do not exist instead of reading past the column buffers
  if (pDataBlock == NULL || rowIdx < 0 || rowIdx >= pDataBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pBlock = NULL;
  int32_t      code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info = pDataBlock->info;
  pBlock->info.pks[0].pData = NULL;
  pBlock->info.pks[1].pData = NULL;
  pBlock->info.rows = 0;
  pBlock->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      code = terrno;  // read terrno before any cleanup call gets a chance to touch it
      goto _err;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code) {
      goto _err;
    }
  }

  code = blockDataEnsureCapacity(pBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
    if (pDst == NULL || pSrc == NULL) {
      code = terrno;
      goto _err;
    }

    bool  isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
    void* pData = NULL;
    if (!isNull) {
      pData = colDataGetData(pSrc, rowIdx);
    }

    code = colDataSetVal(pDst, 0, pData, isNull);
    if (code) {
      goto _err;
    }
  }

  pBlock->info.rows = 1;

  *pResBlock = pBlock;
  return code;

_err:
  blockDataDestroy(pBlock);
  return code;
}
2000

2001
// Give pDst its own copies of the two primary-key values of pSrc.
//
// Nothing is done (and success is returned) when the first primary key of pSrc is not a variable-length type.
// Otherwise, for each of the two pk slots, the type and nData are copied and a freshly allocated buffer of nData
// bytes receives the payload. On allocation failure the error is logged and returned.
int32_t copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
    return code;
  }

  // prepare the pk buffers, slot 0 first, then slot 1
  for (int32_t k = 0; k < 2; ++k) {
    const SValue* pFrom = &pSrc->pks[k];
    SValue*       pTo = &pDst->pks[k];

    pTo->type = pFrom->type;
    pTo->pData = taosMemoryCalloc(1, pFrom->nData);
    QUERY_CHECK_NULL(pTo->pData, code, lino, _end, terrno);

    pTo->nData = pFrom->nData;
    memcpy(pTo->pData, pFrom->pData, pTo->nData);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2032

2033
// Create a new block with the same column layout and block info as pDataBlock.
//
// The new block gets its own copies of variable-length primary-key values (see copyPkVal()). Its rowSize is rebuilt
// while the columns are appended. When copyData is true the row data is copied as well and rows/capacity are set to
// pDataBlock->info.rows; otherwise the new block has 0 rows and 0 capacity.
//
// On success the block is stored in *pResBlock (owned by the caller) and TSDB_CODE_SUCCESS is returned.
// On failure the partially built block is destroyed and the error code is returned.
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDstBlock = NULL;
  int32_t      code = createDataBlock(&pDstBlock);
  if (code) {
    return code;
  }

  // the struct copy carries id, blankFill and all other metadata; only the fields below are reset
  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;
  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      code = terrno;  // read terrno before cleanup runs
      goto _err;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    if (code) {
      goto _err;
    }
  }

  code = copyPkVal(&pDstBlock->info, &pDataBlock->info);
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    goto _err;
  }

  if (copyData) {
    code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
      if (pDst == NULL || pSrc == NULL) {
        // log the actual failure reason rather than the (still successful) previous status
        code = terrno;
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        goto _err;
      }

      int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      if (ret < 0) {
        code = ret;
        goto _err;
      }
    }

    pDstBlock->info.rows = pDataBlock->info.rows;
    pDstBlock->info.capacity = pDataBlock->info.rows;
  }

  *pResBlock = pDstBlock;
  return code;

_err:
  blockDataDestroy(pDstBlock);
  return code;
}
2115

2116
// Create a new block whose columns are the columns of pDataBlock selected by pColArray.
//
// For every SColIdPair in pColArray (in order), the first column of pDataBlock whose colId equals the pair's
// vtbColId is appended to the new block; pairs without a matching column are skipped. The new block starts from a
// copy of pDataBlock->info with rows, capacity and pk pData reset, and its rowSize is rebuilt while columns are
// appended.
//
// NOTE(review): the whole SColumnInfoData entry is appended, i.e. including its buffer pointers, so the new block
// appears to share column buffers with pDataBlock -- confirm the intended ownership with the callers.
//
// Returns TSDB_CODE_SUCCESS with the block in *pResBlock, or an error code after destroying the new block.
int32_t createOneDataBlockWithColArray(const SSDataBlock* pDataBlock, SArray* pColArray, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // store every status in `code` before checking it, so that the value returned at _return is the real error
  code = createDataBlock(&pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;

  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;
  pDstBlock->info.id = pDataBlock->info.id;
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;

  // neither array is modified below, so their sizes can be read once
  int32_t numOfPairs = (int32_t)taosArrayGetSize(pColArray);
  int32_t numOfSrcCols = (int32_t)taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t i = 0; i < numOfPairs; ++i) {
    SColIdPair* pColPair = taosArrayGet(pColArray, i);
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);

    for (int32_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
      if (p == NULL) {
        continue;
      }

      if (p->info.colId == pColPair->vtbColId) {
        code = blockDataAppendColInfo(pDstBlock, p);
        QUERY_CHECK_CODE(code, lino, _return);
        break;
      }
    }
  }

  *pResBlock = pDstBlock;
  return code;
_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2160

2161
// Create a block with the column layout of pOrgBlock and the row data of pDataBlock.
//
// For each column of pOrgBlock, the column of pDataBlock with the same slotId is copied into the new block; when no
// such column exists, the first pDataBlock->info.rows rows of the new column are set to NULL. rows, capacity and
// window of the new block are taken from pDataBlock.
//
// Returns TSDB_CODE_SUCCESS with the block in *pResBlock, or an error code after destroying the new block.
int32_t createOneDataBlockWithTwoBlock(const SSDataBlock* pDataBlock, const SSDataBlock* pOrgBlock, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(pOrgBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // store every status in `code` before checking it, so that the value returned at _return is the real error
  code = createOneDataBlock(pOrgBlock, false, &pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _return);

  // the source arrays are not modified below, so their sizes can be read once
  int32_t numOfOrgCols = (int32_t)taosArrayGetSize(pOrgBlock->pDataBlock);
  int32_t numOfDataCols = (int32_t)taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t i = 0; i < numOfOrgCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pOrgBlock->pDataBlock, i);

    QUERY_CHECK_NULL(pDst, code, lino, _return, terrno);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno);

    bool found = false;
    for (int32_t j = 0; j < numOfDataCols; j++) {
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
      if (p == NULL) {
        continue;
      }

      if (p->info.slotId == pSrc->info.slotId) {
        code = colDataAssign(pDst, p, (int32_t)pDataBlock->info.rows, &pDataBlock->info);
        QUERY_CHECK_CODE(code, lino, _return);
        found = true;
        break;
      }
    }

    if (!found) {
      // no data for this slot: mark every row of the column as NULL
      colDataSetNNULL(pDst, 0, pDataBlock->info.rows);
    }
  }

  pDstBlock->info.rows = pDataBlock->info.rows;
  pDstBlock->info.capacity = pDataBlock->info.rows;
  pDstBlock->info.window = pDataBlock->info.window;

  *pResBlock = pDstBlock;
  return code;
_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2206

2207
// Allocate a zero-initialized data block with an empty column array.
//
// On success the block is stored in *pResBlock and 0 is returned; the caller owns the block.
// On allocation failure the error code taken from terrno is returned and nothing is leaked.
int32_t createDataBlock(SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  SArray* pCols = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pCols == NULL) {
    int32_t code = terrno;  // keep the error code across the free below
    taosMemoryFree(pNew);
    return code;
  }

  pNew->pDataBlock = pCols;
  *pResBlock = pNew;
  return 0;
}
2224

2225
// Append a copy of *pColInfoData to the column array of pBlock.
//
// The column array is created on demand. After a successful push, info.hasVarCol is raised when the column has a
// variable-length type and info.rowSize grows by the column's byte width.
// Returns TSDB_CODE_SUCCESS, or the error code from terrno when the array cannot be created or extended.
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    return terrno;
  }

  // todo: the "column type != 0" assertion is disabled temporarily
  const bool isVarCol = IS_VAR_DATA_TYPE(pColInfoData->info.type);
  if (isVarCol) {
    pBlock->info.hasVarCol = true;
  }

  pBlock->info.rowSize += pColInfoData->info.bytes;
  return TSDB_CODE_SUCCESS;
}
2247

2248
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
110,783,153✔
2249
  SColumnInfoData col = {.hasNull = true};
110,783,153✔
2250
  col.info.colId = colId;
110,783,153✔
2251
  col.info.type = type;
110,783,153✔
2252
  col.info.bytes = bytes;
110,783,153✔
2253

2254
  return col;
110,783,153✔
2255
}
2256

2257
// Look up the column at position `index` of pBlock and store its address in *pColInfoData.
//
// Returns TSDB_CODE_INVALID_PARA when index is not below the number of columns, the code from terrno when the
// array lookup yields NULL, and 0 otherwise.
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
  QRY_PARAM_CHECK(pColInfoData);

  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
    return TSDB_CODE_INVALID_PARA;
  }

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, index);
  *pColInfoData = pCol;

  return (pCol == NULL) ? terrno : 0;
}
2272

2273
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
7,384,136✔
2274
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
7,384,136✔
2275

2276
  int32_t payloadSize = pageSize - extraSize;
7,383,806✔
2277
  int32_t rowSize = pBlock->info.rowSize;
7,383,806✔
2278
  int32_t nRows = payloadSize / rowSize;
7,383,806✔
2279
  if (nRows < 1) {
7,383,806!
UNCOV
2280
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
×
UNCOV
2281
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2282
    return -1;
×
2283
  }
2284

2285
  int32_t numVarCols = 0;
7,383,839✔
2286
  int32_t numFixCols = 0;
7,383,839✔
2287
  for (int32_t i = 0; i < numOfCols; ++i) {
34,758,542✔
2288
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
27,375,648✔
2289
    if (pCol == NULL) {
27,374,703!
2290
      return -1;
×
2291
    }
2292

2293
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
27,374,703!
2294
      ++numVarCols;
1,947,526✔
2295
    } else {
2296
      ++numFixCols;
25,427,177✔
2297
    }
2298
  }
2299

2300
  // find the data payload whose size is greater than payloadSize
2301
  int result = -1;
7,382,894✔
2302
  int start = 1;
7,382,894✔
2303
  int end = nRows;
7,382,894✔
2304
  while (start <= end) {
68,731,121✔
2305
    int mid = start + (end - start) / 2;
61,348,227✔
2306
    // data size + var data type columns offset + fixed data type columns bitmap len
2307
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
61,348,227✔
2308
    if (midSize > payloadSize) {
61,348,227✔
2309
      result = mid;
16,278,503✔
2310
      end = mid - 1;
16,278,503✔
2311
    } else {
2312
      start = mid + 1;
45,069,724✔
2313
    }
2314
  }
2315

2316
  int32_t newRows = (result != -1) ? result - 1 : nRows;
7,382,894✔
2317
  // the true value must be less than the value of nRows
2318
  if (newRows > nRows || newRows < 1) {
7,382,894!
2319
    uError("invalid newRows:%d, nRows:%d", newRows, nRows);
×
2320
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2321
    return -1;
×
2322
  }
2323

2324
  return newRows;
7,384,626✔
2325
}
2326

2327
// Free the buffers held by a column: the offset array for a variable-length column or the null bitmap otherwise,
// followed by the data buffer. The SColumnInfoData struct itself is not freed. NULL is ignored.
void colDataDestroy(SColumnInfoData* pColData) {
  if (pColData == NULL) {
    return;
  }

  const bool isVarCol = IS_VAR_DATA_TYPE(pColData->info.type);
  if (isVarCol) {
    taosMemoryFreeClear(pColData->varmeta.offset);
  } else {
    taosMemoryFreeClear(pColData->nullbitmap);
  }

  taosMemoryFreeClear(pColData->pData);
}
2340

2341
// Shift a null bitmap that covers `total` rows towards its start by `n` bit positions, in place.
//
// A shift by a whole number of bytes is a plain memmove. Otherwise every output byte is assembled from a source
// byte shifted left by (n % 8) bits plus the top bits of the byte that follows it.
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t oldBytes = BitmapLen(total);
  int32_t keptBytes = BitmapLen(total - n);

  if (n % 8 == 0) {
    (void)memmove(nullBitmap, nullBitmap + n / 8, keptBytes);
    return;
  }

  int32_t  bitShift = n % 8;
  uint8_t* bits = (uint8_t*)nullBitmap;

  if (n < 8) {
    // less than one byte: shift the whole bitmap in place, byte by byte
    for (int32_t pos = 0; pos < oldBytes; ++pos) {
      uint8_t cur = bits[pos];  // source bitmap value
      bits[pos] = (cur << bitShift);

      if (pos < oldBytes - 1) {
        uint8_t following = bits[pos + 1];
        bits[pos] |= (following >> (8 - bitShift));
      }
    }
  } else if (n > 8) {
    // more than one byte: read from `skip` bytes ahead while shifting by the remaining bits
    int32_t extra = (total % 8 != 0 && total % 8 <= bitShift) ? 1 : 0;
    int32_t skip = oldBytes - keptBytes - extra;

    for (int32_t pos = 0; pos < keptBytes; ++pos) {
      uint8_t cur = bits[pos + skip];
      bits[pos] = (cur << bitShift);

      if (pos < keptBytes - 1 + extra) {
        uint8_t following = bits[pos + skip + 1];
        bits[pos] |= (following >> (8 - bitShift));
      }
    }
  }
}
727,410✔
2381

2382
// Compact the variable-length payload of rows [start, end) to the front of the column.
//
// Rows whose offset is -1 are skipped. When start != 0 the offsets of the remaining rows are rewritten to their new
// positions; the payload starting at the first used offset is then moved to the beginning of pData and the offset
// entries of [start, end) are moved to the beginning of the offset array.
// Returns the total payload length of the kept rows.
//
// The only call sites visible in this part of the file are commented out.
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t firstUsedOffset = -1;
  int32_t packedLen = 0;

  for (int32_t row = start; row < end; ++row) {
    int32_t offset = pColInfoData->varmeta.offset[row];
    if (offset == -1) {
      continue;
    }

    if (start != 0) {
      pColInfoData->varmeta.offset[row] = packedLen;
    }

    char* data = pColInfoData->pData + offset;
    if (firstUsedOffset == -1) {
      firstUsedOffset = offset;  // mark the begin of data
    }

    int32_t type = pColInfoData->info.type;
    if (type == TSDB_DATA_TYPE_JSON) {
      packedLen += getJsonValueLen(data);
    } else {
      packedLen += varDataTLen(data);
    }
  }

  if (firstUsedOffset > 0) {
    (void)memmove(pColInfoData->pData, pColInfoData->pData + firstUsedOffset, packedLen);
  }

  (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
  return packedLen;
}
2413

2414
// Drop the first n of `total` rows from a column, moving the remaining rows to the front.
//
// Variable-length column: only the offset entries are moved; the payload in pData is not compacted and
// varmeta.length is left unchanged, so the moved offsets keep pointing at the original payload positions. The n
// offset entries that become unused at the end are zeroed.
// Other columns: the fixed-width values are moved and the null bitmap is shifted by n bits.
//
// The caller must guarantee n < total.
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
    (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));

    // clear the offset value of the unused entries; the size is in bytes, so scale by the entry width
    memset(&pColInfoData->varmeta.offset[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    (void)memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}
890,655✔
2427

2428
// Remove the first n rows of a block.
//
// n == 0 is a no-op. When the block has no more than n rows it is emptied via blockDataEmpty(); otherwise every
// column is trimmed and info.rows is reduced by n.
// Returns TSDB_CODE_SUCCESS, or the code from terrno when a column cannot be fetched.
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  const size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
    if (pCol == NULL) {
      return terrno;
    }

    colDataTrimFirstNRows(pCol, n, pBlock->info.rows);
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}
2450

2451
// Truncate a column from `total` rows to its first n rows.
//
// Only variable-length columns need work: varmeta.length is reduced to the payload length used by the first n rows
// and the offset entries of the dropped rows [n, total) are zeroed. Nothing is done when n == 0 or n >= total, or
// for other column types.
//
// The new length is the offset of row n. If that offset is -1 (presumably a NULL row -- TODO confirm), the end of
// the closest preceding row with a real offset is used instead; if no such row exists the problem is logged and
// varmeta.length is left unchanged.
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (n >= total || n == 0) return;
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    if (pColInfoData->varmeta.length != 0) {
      int32_t newLen = pColInfoData->varmeta.offset[n];
      if (-1 == newLen) {
        // walk backwards to the last kept row that owns payload and take the end of that payload
        for (int i = n - 1; i >= 0; --i) {
          newLen = pColInfoData->varmeta.offset[i];
          if (newLen != -1) {
            if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
              newLen += getJsonValueLen(pColInfoData->pData + newLen);
            } else {
              newLen += varDataTLen(pColInfoData->pData + newLen);
            }
            break;
          }
        }
      }
      if (newLen <= -1) {
        uFatal("colDataKeepFirstNRows: newLen:%d  old:%d", newLen, pColInfoData->varmeta.length);
      } else {
        pColInfoData->varmeta.length = newLen;
      }
    }
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);

    // clear the offsets of the dropped rows; the size is in bytes, so scale by the entry width
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  }
}
2479

2480
// Truncate a block to its first `n` rows.
//   n == 0                 -> the block is emptied via blockDataEmpty()
//   n >= current row count -> the block is left untouched
//   otherwise              -> every column is trimmed and info.rows becomes n
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataEmpty(pBlock);
    return;
  }

  // Already short enough, nothing to cut.
  if (pBlock->info.rows <= n) {
    return;
  }

  size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
    if (pCol != NULL) {  // a column that cannot be fetched is skipped
      colDataKeepFirstNRows(pCol, n, pBlock->info.rows);
    }
  }

  pBlock->info.rows = n;
}
2502

2503
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
103,059✔
2504
  int64_t tbUid = pBlock->info.id.uid;
103,059✔
2505
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
103,059✔
2506
  int16_t hasVarCol = pBlock->info.hasVarCol;
103,235✔
2507
  int64_t rows = pBlock->info.rows;
103,235✔
2508
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
103,235✔
2509

2510
  int32_t tlen = 0;
103,439✔
2511
  tlen += taosEncodeFixedI64(buf, tbUid);
103,439✔
2512
  tlen += taosEncodeFixedI16(buf, numOfCols);
103,439✔
2513
  tlen += taosEncodeFixedI16(buf, hasVarCol);
206,878✔
2514
  tlen += taosEncodeFixedI64(buf, rows);
103,439✔
2515
  tlen += taosEncodeFixedI32(buf, sz);
103,439✔
2516
  for (int32_t i = 0; i < sz; i++) {
207,007✔
2517
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
103,104✔
2518
    if (pColData == NULL) {
103,182!
2519
      return terrno;
×
2520
    }
2521

2522
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
103,239✔
2523
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
103,239✔
2524
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
103,239✔
2525
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
103,239✔
2526

2527
    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
103,239!
2528
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
×
2529
    } else {
2530
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
206,906✔
2531
    }
2532

2533
    int32_t len = colDataGetLength(pColData, rows);
103,239✔
2534
    tlen += taosEncodeFixedI32(buf, len);
103,568✔
2535

2536
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
103,568!
2537
      for (int32_t row = 0; row < rows; ++row) {
×
2538
        char*   pData = pColData->pData + pColData->varmeta.offset[row];
×
2539
        int32_t colSize = 0;
×
2540
        if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
×
2541
          colSize = getJsonValueLen(pData);
×
2542
        } else {
2543
          colSize = varDataTLen(pData);
×
2544
        }
2545
        tlen += taosEncodeBinary(buf, pData, colSize);
×
2546
      }
2547
    } else {
2548
      tlen += taosEncodeBinary(buf, pColData->pData, len);
207,136✔
2549
    }
2550
  }
2551
  return tlen;
103,903✔
2552
}
2553

2554
// Decode a data block from `buf`, in the field order produced by tEncodeDataBlock: uid, column
// count (int16), hasVarCol, rows, column count (int32), then per column its header, the offset
// array or null bitmap, the payload length and the payload.
//
//   buf     start of the encoded block
//   pBlock  receives the decoded header fields and a freshly created pDataBlock column array
// Returns the position right after the consumed bytes, or NULL on failure. On a failure inside
// the column loop, every column already appended to pBlock->pDataBlock is released with
// colDataDestroy(), and so are the buffers decoded so far for the column being built.
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI8(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    buf = taosDecodeFixedBool(buf, &data.hasNull);

    if (IS_VAR_DATA_TYPE(data.info.type)) {
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
    }
    if (buf == NULL) {
      uError("failed to decode null bitmap/offset, type:%d", data.info.type);
      // `data` is not in the array yet, so the cleanup loop below cannot reach it.
      // NOTE(review): assumes colDataDestroy() tolerates members that are still NULL - confirm.
      colDataDestroy(&data);
      goto _error;
    }

    int32_t len = 0;
    buf = taosDecodeFixedI32(buf, &len);
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);
    if (buf == NULL) {
      uError("failed to decode data, type:%d", data.info.type);
      // release the offset array / null bitmap decoded just above
      colDataDestroy(&data);
      goto _error;
    }
    if (IS_VAR_DATA_TYPE(data.info.type)) {
      data.varmeta.length = len;
      data.varmeta.allocLen = len;
    }

    void* px = taosArrayPush(pBlock->pDataBlock, &data);
    if (px == NULL) {
      // the array did not take the column: free it here, then release the earlier columns too
      colDataDestroy(&data);
      goto _error;
    }
  }

  return (void*)buf;

_error:
  // taosArrayGet() yields NULL once the index passes the columns actually pushed, ending the loop.
  for (int32_t i = 0; i < sz; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      break;
    }
    colDataDestroy(pColInfoData);
  }
  return NULL;
}
2615

2616
static int32_t formatTimestamp(char* buf, size_t cap, int64_t val, int precision) {
31,134,888✔
2617
  time_t  tt;
2618
  int32_t ms = 0;
31,134,888✔
2619
  int32_t code = TSDB_CODE_SUCCESS;
31,134,888✔
2620
  int32_t lino = 0;
31,134,888✔
2621
  if (precision == TSDB_TIME_PRECISION_NANO) {
31,134,888!
2622
    tt = (time_t)(val / 1000000000);
×
2623
    ms = val % 1000000000;
×
2624
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
31,134,888✔
2625
    tt = (time_t)(val / 1000000);
178✔
2626
    ms = val % 1000000;
178✔
2627
  } else {
2628
    tt = (time_t)(val / 1000);
31,134,710✔
2629
    ms = val % 1000;
31,134,710✔
2630
  }
2631

2632
  if (tt <= 0 && ms < 0) {
31,134,888!
2633
    tt--;
×
2634
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
2635
      ms += 1000000000;
×
2636
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2637
      ms += 1000000;
×
2638
    } else {
2639
      ms += 1000;
×
2640
    }
2641
  }
2642
  struct tm ptm = {0};
31,134,888✔
2643
  if (taosLocalTime(&tt, &ptm, buf, cap, NULL) == NULL) {
31,134,888!
2644
    code =  TSDB_CODE_INTERNAL_ERROR;
×
2645
    TSDB_CHECK_CODE(code, lino, _end);
×
2646
  }
2647

2648
  size_t pos = taosStrfTime(buf, cap, "%Y-%m-%d %H:%M:%S", &ptm);
31,135,329✔
2649
  if (pos == 0) {
31,135,210!
2650
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2651
    TSDB_CHECK_CODE(code, lino, _end);
×
2652
  }
2653
  int32_t nwritten = 0;
31,135,210✔
2654
  if (precision == TSDB_TIME_PRECISION_NANO) {
31,135,210!
2655
    nwritten = snprintf(buf + pos, cap - pos, ".%09d", ms);
×
2656
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
31,135,210✔
2657
    nwritten = snprintf(buf + pos, cap - pos, ".%06d", ms);
178✔
2658
  } else {
2659
    nwritten = snprintf(buf + pos, cap - pos, ".%03d", ms);
31,135,032✔
2660
  }
2661

2662
  if (nwritten >= cap - pos) {
31,135,210!
2663
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2664
    TSDB_CHECK_CODE(code, lino, _end);
×
2665
  }
2666

2667
_end:
31,135,210✔
2668
  if (code != TSDB_CODE_SUCCESS) {
31,135,210!
2669
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2670
  }
2671
  return code;
31,135,171✔
2672
}
2673

2674
// for debug
2675
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
93,200✔
2676
  int32_t lino = 0;
93,200✔
2677
  int32_t size = 2048 * 1024;
93,200✔
2678
  int32_t code = 0;
93,200✔
2679
  char*   dumpBuf = NULL;
93,200✔
2680
  char    pBuf[TD_TIME_STR_LEN] = {0};
93,200✔
2681
  int32_t rows = pDataBlock->info.rows;
93,200✔
2682
  int32_t len = 0;
93,200✔
2683

2684
  dumpBuf = taosMemoryCalloc(size, 1);
93,200!
2685
  if (dumpBuf == NULL) {
93,200!
2686
    return terrno;
×
2687
  }
2688

2689
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
93,200✔
2690
  len += tsnprintf(dumpBuf + len, size - len,
186,403✔
2691
                  "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
2692
                  "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
2693
                  taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
93,200✔
2694
                  pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
2695
                  pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
93,200✔
2696
  if (len >= size - 1) {
93,203!
2697
    goto _exit;
×
2698
  }
2699

2700
  for (int32_t j = 0; j < rows; j++) {
25,407,050✔
2701
    len += tsnprintf(dumpBuf + len, size - len, "%s|", flag);
25,313,844✔
2702
    if (len >= size - 1) {
25,313,714!
2703
      goto _exit;
×
2704
    }
2705

2706
    for (int32_t k = 0; k < colNum; k++) {
305,181,205✔
2707
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
279,867,238✔
2708
      if (pColInfoData == NULL) {
279,867,102!
2709
        code = terrno;
×
2710
        lino = __LINE__;
×
2711
        goto _exit;
×
2712
      }
2713

2714
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
559,734,702✔
2715
        len += tsnprintf(dumpBuf + len, size - len, " %15s |", "NULL");
19,012,945✔
2716
        if (len >= size - 1) goto _exit;
19,012,777!
2717
        continue;
19,012,777✔
2718
      }
2719

2720
      void* var = colDataGetData(pColInfoData, j);
260,854,406!
2721
      switch (pColInfoData->info.type) {
260,854,406✔
2722
        case TSDB_DATA_TYPE_TIMESTAMP:
31,134,915✔
2723
          memset(pBuf, 0, sizeof(pBuf));
31,134,915✔
2724
          code = formatTimestamp(pBuf, sizeof(pBuf), *(uint64_t*)var, pColInfoData->info.precision);
31,134,915✔
2725
          if (code != TSDB_CODE_SUCCESS) {
31,135,163!
2726
            TAOS_UNUSED(tsnprintf(pBuf, sizeof(pBuf), "NaN"));
×
2727
          }
2728
          len += tsnprintf(dumpBuf + len, size - len, " %25s |", pBuf);
31,135,163✔
2729
          if (len >= size - 1) goto _exit;
31,135,216!
2730
          break;
31,135,216✔
2731
        case TSDB_DATA_TYPE_TINYINT:
51,606✔
2732
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
51,606✔
2733
          if (len >= size - 1) goto _exit;
51,608!
2734
          break;
51,608✔
2735
        case TSDB_DATA_TYPE_UTINYINT:
40,567✔
2736
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
40,567✔
2737
          if (len >= size - 1) goto _exit;
40,563!
2738
          break;
40,563✔
2739
        case TSDB_DATA_TYPE_SMALLINT:
19,051,321✔
2740
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
19,051,321✔
2741
          if (len >= size - 1) goto _exit;
19,051,323!
2742
          break;
19,051,323✔
2743
        case TSDB_DATA_TYPE_USMALLINT:
27,677✔
2744
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
27,677✔
2745
          if (len >= size - 1) goto _exit;
27,680!
2746
          break;
27,680✔
2747
        case TSDB_DATA_TYPE_INT:
57,085,211✔
2748
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
57,085,211✔
2749
          if (len >= size - 1) goto _exit;
57,085,194!
2750
          break;
57,085,194✔
2751
        case TSDB_DATA_TYPE_UINT:
27,817✔
2752
          len += tsnprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
27,817✔
2753
          if (len >= size - 1) goto _exit;
27,817!
2754
          break;
27,817✔
2755
        case TSDB_DATA_TYPE_BIGINT:
56,538,216✔
2756
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
56,538,216✔
2757
          if (len >= size - 1) goto _exit;
56,538,235!
2758
          break;
56,538,235✔
2759
        case TSDB_DATA_TYPE_UBIGINT:
54,554✔
2760
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
54,554✔
2761
          if (len >= size - 1) goto _exit;
54,553!
2762
          break;
54,553✔
2763
        case TSDB_DATA_TYPE_FLOAT:
3,756✔
2764
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
3,756✔
2765
          if (len >= size - 1) goto _exit;
3,756!
2766
          break;
3,756✔
2767
        case TSDB_DATA_TYPE_DOUBLE:
19,147,556✔
2768
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
19,147,556✔
2769
          if (len >= size - 1) goto _exit;
19,147,561!
2770
          break;
19,147,561✔
2771
        case TSDB_DATA_TYPE_BOOL:
2,286✔
2772
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
2,286✔
2773
          if (len >= size - 1) goto _exit;
2,286!
2774
          break;
2,286✔
2775
        case TSDB_DATA_TYPE_VARCHAR:
58,085,672✔
2776
        case TSDB_DATA_TYPE_VARBINARY:
2777
        case TSDB_DATA_TYPE_GEOMETRY: {
2778
          memset(pBuf, 0, sizeof(pBuf));
58,085,672✔
2779
          char*   pData = colDataGetVarData(pColInfoData, j);
58,085,672✔
2780
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
58,085,672✔
2781
          dataSize = TMIN(dataSize, 50);
58,085,672✔
2782
          memcpy(pBuf, varDataVal(pData), dataSize);
58,085,672✔
2783
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
58,085,672✔
2784
          if (len >= size - 1) goto _exit;
58,085,670!
2785
        } break;
58,085,670✔
2786
        case TSDB_DATA_TYPE_NCHAR: {
19,009,914✔
2787
          char*   pData = colDataGetVarData(pColInfoData, j);
19,009,914✔
2788
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
19,009,914✔
2789
          memset(pBuf, 0, sizeof(pBuf));
19,009,914✔
2790
          code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf, NULL);
19,009,914✔
2791
          if (code < 0) {
19,009,914!
2792
            uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
×
2793
            lino = __LINE__;
×
2794
            goto _exit;
×
2795
          } else { // reset the length value
2796
            code = TSDB_CODE_SUCCESS;
19,009,914✔
2797
          }
2798
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
19,009,914✔
2799
          if (len >= size - 1) goto _exit;
19,009,914!
2800
        } break;
19,009,914✔
2801
      }
2802
    }
2803
    len += tsnprintf(dumpBuf + len, size - len, "%d\n", j);
25,313,967✔
2804
    if (len >= size - 1) goto _exit;
25,313,847!
2805
  }
2806
  len += tsnprintf(dumpBuf + len, size - len, "%s |end\n", flag);
93,206✔
2807

2808
_exit:
93,201✔
2809
  if (code == TSDB_CODE_SUCCESS) {
93,201!
2810
    *pDataBuf = dumpBuf;
93,201✔
2811
    dumpBuf = NULL;
93,201✔
2812
  } else {
2813
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2814
    if (dumpBuf) {
×
2815
      taosMemoryFree(dumpBuf);
×
2816
    }
2817
  }
2818
  return code;
93,199✔
2819
}
2820

2821
// Convert the rows of a data block into a submit request holding one SSubmitTbData entry.
//
//   ppReq       in: an existing request to append to, or a pointer to NULL to have one allocated;
//               out: the request on success, NULL on failure
//   pDataBlock  source rows; its column count must equal pTSchema->numOfCols
//   pTSchema    target schema, used for column ids/types and to build each row
//   uid, suid   table / super-table uid stored in the table entry
//   vgId        not used by this function
// Returns 0 on success or an error code. Every failure leaves through _end, where the value
// scratch array, a table entry not yet handed over to the request, and the request itself
// (including one passed in by the caller) are released. A block with at most one column is
// skipped and reported as success.
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
  SSubmitReq2*  pReq = *ppReq;
  SArray*       pVals = NULL;
  SSubmitTbData tbData = {0};
  bool          tbDataPending = false;  // true while tbData.aRowP is owned here and not yet by pReq
  int32_t       sz = 1;
  int32_t       code = 0;
  *ppReq = NULL;
  terrno = 0;

  if (NULL == pReq) {
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
      code = terrno;
      goto _end;
    }

    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
      code = terrno;
      goto _end;
    }
  }

  for (int32_t i = 0; i < sz; ++i) {
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
    int32_t rows = pDataBlock->info.rows;

    if (colNum <= 1) {  // invalid if only with TS col
      continue;
    }

    // the rsma result should has the same column number with schema.
    if (colNum != pTSchema->numOfCols) {
      uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }

    tbData = (SSubmitTbData){0};
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
      code = terrno;
      goto _end;
    }
    tbDataPending = true;

    tbData.suid = suid;
    tbData.uid = uid;
    tbData.sver = pTSchema->version;

    if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
      code = terrno;
      goto _end;
    }

    for (int32_t j = 0; j < rows; ++j) {  // iterate by row
      taosArrayClear(pVals);

      bool    isStartKey = false;
      int32_t offset = 0;
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (pColInfoData == NULL) {
          code = terrno;
          goto _end;
        }

        const STColumn* pCol = &pTSchema->columns[k];
        void*           var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);

        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (pColInfoData->info.type != pCol->type) {
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            if (!isStartKey) {
              // the first timestamp column of a row must be the primary key
              isStartKey = true;
              if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
                uError("the first timestamp colId %d is not primary colId", pCol->colId);
                terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
                code = terrno;
                goto _end;
              }
              SValue val = {.type = pCol->type};
              VALUE_SET_TRIVIAL_DATUM(&val, *(TSKEY*)var);
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            } else if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            } else {
              SValue val = {.type = pCol->type};
              VALUE_SET_TRIVIAL_DATUM(&val, *(int64_t*)var);
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            }
            break;
          case TSDB_DATA_TYPE_NCHAR:
          case TSDB_DATA_TYPE_VARBINARY:
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
            if (pColInfoData->info.type != pCol->type) {
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                // without an error code this failure would be reported as success
                code = terrno;
                goto _end;
              }
            } else {
              void*  data = colDataGetVarData(pColInfoData, j);
              SValue sv = (SValue){.type = pCol->type,
                                   .nData = varDataLen(data),
                                   .pData = (uint8_t*)varDataVal(data)};  // address copy, no value
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            }
            break;
          }
          case TSDB_DATA_TYPE_DECIMAL:
          case TSDB_DATA_TYPE_BLOB:
          case TSDB_DATA_TYPE_JSON:
          case TSDB_DATA_TYPE_MEDIUMBLOB:
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
            code = terrno;
            goto _end;
          default:
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
              if (colDataIsNull_s(pColInfoData, j)) {
                SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
                void*   px = taosArrayPush(pVals, &cv);
                if (px == NULL) {
                  // without an error code this failure would be reported as success
                  code = terrno;
                  goto _end;
                }
              } else {
                SValue sv = {.type = pCol->type};
                if (pCol->type == pColInfoData->info.type) {
                  valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
                } else {
                  /**
                   *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
                   *  2. below conversion may lead to overflow or loss, the app should select the right data type.
                   */
                  char tv[DATUM_MAX_SIZE] = {0};
                  if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
                    float v = 0;
                    GET_TYPED_DATA(v, float, pColInfoData->info.type, var,
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
                    double v = 0;
                    GET_TYPED_DATA(v, double, pColInfoData->info.type, var,
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
                    int64_t v = 0;
                    GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var,
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else {
                    uint64_t v = 0;
                    GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var,
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  }
                  valueSetDatum(&sv, sv.type, tv, tDataTypes[pCol->type].bytes);
                }
                SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
                void*   px = taosArrayPush(pVals, &cv);
                if (px == NULL) {
                  code = terrno;
                  goto _end;
                }
              }
            } else {
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            break;
        }
      }

      SRow* pRow = NULL;
      if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
        goto _end;
      }

      void* px = taosArrayPush(tbData.aRowP, &pRow);
      if (px == NULL) {
        // NOTE(review): pRow is not part of tbData.aRowP at this point and is not released here - confirm
        // whether it needs an explicit destroy.
        code = terrno;
        goto _end;
      }
    }

    void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
    if (px == NULL) {
      code = terrno;
      goto _end;
    }
    tbDataPending = false;  // the request now holds the entry
  }

_end:
  taosArrayDestroy(pVals);
  if (code != 0) {
    if (tbDataPending) {
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
    }
    if (pReq) {
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pReq);
    }
  } else {
    *ppReq = pReq;
  }

  return code;
}
3047

3048
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
3049
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
4,463✔
3050
  int32_t   code = TSDB_CODE_SUCCESS;
4,463✔
3051
  int32_t   lino = 0;
4,463✔
3052
  char      tmp[TSDB_TABLE_NAME_LEN] = {0};
4,463✔
3053

3054
  if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
4,463✔
3055
    code = TSDB_CODE_INTERNAL_ERROR;
5✔
3056
    TSDB_CHECK_CODE(code, lino, _end);
5✔
3057
  }
3058

3059
  if (stbName == NULL) {
4,461✔
3060
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
2✔
3061
  } else {
3062
    int32_t i = strlen(stbName) - 1;
4,459✔
3063
    for (; i >= 0; i--) {
128,110✔
3064
      if (stbName[i] == '.') {
128,106✔
3065
        break;
4,455✔
3066
      }
3067
    }
3068
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId);
4,459✔
3069
  }
3070

3071
  ctbName[cap - strlen(tmp) - 1] = 0;  // put stbname + groupId to the end
4,461✔
3072
  size_t prefixLen = strlen(ctbName);
4,461✔
3073
  ctbName = strncat(ctbName, tmp, cap - prefixLen - 1);
4,461✔
3074

3075
  for (char* p = ctbName; *p; ++p) {
481,377✔
3076
    if (*p == '.') *p = '_';
476,916✔
3077
  }
3078

3079
_end:
4,461✔
3080
  if (code != TSDB_CODE_SUCCESS) {
4,463✔
3081
    uError("%s failed at line %d since %s, ctbName:%s", __func__, lino, tstrerror(code), ctbName);
2!
3082
  }
3083
  return code;
4,464✔
3084
}
3085

3086
// An auto-generated stream subtable name is "t_" followed by the first segment of the MD5 digest
// of the group values; such a name is always exactly 34 bytes long.
// Returns true when `ctbName` has that length and that prefix.
bool isAutoTableName(char* ctbName) {
  if (strlen(ctbName) != 34) {
    return false;
  }
  return ctbName[0] == 't' && ctbName[1] == '_';
}
27,412!
3089

3090
// Check whether `ctbName` already ends with the decimal text of `groupId`.
// The id is printed as an unsigned 64-bit number, so a negative groupId is matched by its
// two's-complement unsigned value.
//   ctbName  NUL-terminated table name
//   groupId  group id to look for at the end of the name
// Returns true when the name ends with the id text, false otherwise (including when the name is
// shorter than the id text).
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
  char tmp[64] = {0};
  // PRIu64 expects a uint64_t argument; convert explicitly instead of passing the int64_t as is
  snprintf(tmp, sizeof(tmp), "%" PRIu64, (uint64_t)groupId);

  size_t nameLen = strlen(ctbName);
  size_t idLen = strlen(tmp);
  if (nameLen < idLen) {
    return false;
  }
  return memcmp(ctbName + nameLen - idLen, tmp, idLen) == 0;
}
3098

3099
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
943✔
3100
  QRY_PARAM_CHECK(pName);
943!
3101

3102
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
943!
3103
  if (!pBuf) {
943!
3104
    return terrno;
×
3105
  }
3106

3107
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
943✔
3108
  if (code != TSDB_CODE_SUCCESS) {
944!
3109
    taosMemoryFree(pBuf);
×
3110
  } else {
3111
    *pName = pBuf;
944✔
3112
  }
3113

3114
  return code;
944✔
3115
}
3116

3117
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
15,964✔
3118
  if (stbFullName[0] == 0) {
15,964!
3119
    return TSDB_CODE_INVALID_PARA;
×
3120
  }
3121

3122
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
15,964✔
3123
  if (tags == NULL) {
15,964!
3124
    return terrno;
×
3125
  }
3126

3127
  if (cname == NULL) {
15,964!
3128
    taosArrayDestroy(tags);
×
3129
    return TSDB_CODE_INVALID_PARA;
×
3130
  }
3131

3132
  int8_t      type = TSDB_DATA_TYPE_UBIGINT;
15,964✔
3133
  const char* name = "group_id";
15,964✔
3134
  int32_t     len = strlen(name);
15,964✔
3135

3136
  SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
15,964✔
3137
  void*  px = taosArrayPush(tags, &pTag);
15,964✔
3138
  if (px == NULL) {
15,964!
3139
    return terrno;
×
3140
  }
3141

3142
  RandTableName rname = {
15,964✔
3143
      .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
15,964✔
3144

3145
  int32_t code = buildChildTableName(&rname);
15,964✔
3146
  if (code != TSDB_CODE_SUCCESS) {
15,964!
3147
    return code;
×
3148
  }
3149

3150
  taosArrayDestroy(tags);
15,964✔
3151
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
15,965!
3152
    return TSDB_CODE_INVALID_PARA;
×
3153
  }
3154

3155
  return code;
15,965✔
3156
}
3157

3158
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
×
3159
                               char** dstTableName) {
3160
  int32_t code = TSDB_CODE_SUCCESS;
×
3161
  int32_t lino = 0;
×
3162

3163
  if (parTbName[0]) {
×
3164
    if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
×
3165
        stbFullName) {
3166
      *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
×
3167
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3168

3169
      tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
×
3170
      code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
×
3171
      TSDB_CHECK_CODE(code, lino, _end);
×
3172
    } else {
3173
      *dstTableName = taosStrdup(parTbName);
×
3174
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3175
    }
3176
  } else {
3177
    code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
×
3178
    TSDB_CHECK_CODE(code, lino, _end);
×
3179
  }
3180

3181
_end:
×
3182
  return code;
×
3183
}
3184

3185
// return length of encoded data, return -1 if failed
3186
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
21,969,912✔
3187
  int32_t code = blockDataCheck(pBlock);
21,969,912✔
3188
  if (code != TSDB_CODE_SUCCESS) {
21,934,291!
3189
    terrno = code;
×
3190
    return -1;
×
3191
  }
3192

3193
  int32_t dataLen = 0;
21,942,864✔
3194

3195
  // todo extract method
3196
  int32_t* version = (int32_t*)data;
21,942,864✔
3197
  *version = BLOCK_VERSION_1;
21,942,864✔
3198
  data += sizeof(int32_t);
21,942,864✔
3199

3200
  int32_t* actualLen = (int32_t*)data;
21,942,864✔
3201
  data += sizeof(int32_t);
21,942,864✔
3202

3203
  int32_t* rows = (int32_t*)data;
21,942,864✔
3204
  *rows = pBlock->info.rows;
21,942,864✔
3205
  data += sizeof(int32_t);
21,942,864✔
3206
  if (*rows <= 0) {
21,942,864!
3207
    uError("Invalid rows %d in block", *rows);
×
3208
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3209
    return -1;
×
3210
  }
3211

3212
  int32_t* cols = (int32_t*)data;
21,942,864✔
3213
  *cols = numOfCols;
21,942,864✔
3214
  data += sizeof(int32_t);
21,942,864✔
3215

3216
  // flag segment.
3217
  // the inital bit is for column info
3218
  int32_t* flagSegment = (int32_t*)data;
21,942,864✔
3219
  *flagSegment = (1 << 31);
21,942,864✔
3220

3221
  data += sizeof(int32_t);
21,942,864✔
3222

3223
  uint64_t* groupId = (uint64_t*)data;
21,942,864✔
3224
  data += sizeof(uint64_t);
21,942,864✔
3225

3226
  for (int32_t i = 0; i < numOfCols; ++i) {
94,774,135✔
3227
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
72,845,202✔
3228
    if (pColInfoData == NULL) {
72,828,196!
3229
      return -1;
×
3230
    }
3231

3232
    *((int8_t*)data) = pColInfoData->info.type;
72,828,196✔
3233
    data += sizeof(int8_t);
72,828,196✔
3234

3235
    int32_t bytes = pColInfoData->info.bytes;
72,828,196✔
3236
    *((int32_t*)data) = bytes;
72,828,196✔
3237
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
72,828,196✔
3238
      fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
364,481✔
3239
                              pColInfoData->info.scale);
364,481✔
3240
    }
3241
    data += sizeof(int32_t);
72,831,271✔
3242
  }
3243

3244
  int32_t* colSizes = (int32_t*)data;
21,928,933✔
3245
  data += numOfCols * sizeof(int32_t);
21,928,933✔
3246

3247
  dataLen = blockDataGetSerialMetaSize(numOfCols);
21,928,933✔
3248

3249
  int32_t numOfRows = pBlock->info.rows;
21,928,752✔
3250
  for (int32_t col = 0; col < numOfCols; ++col) {
94,617,350✔
3251
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
72,689,349✔
3252
    if (pColRes == NULL) {
72,640,074!
3253
      return -1;
×
3254
    }
3255

3256
    // copy the null bitmap
3257
    size_t metaSize = 0;
72,640,074✔
3258
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
72,640,074!
3259
      metaSize = numOfRows * sizeof(int32_t);
16,119,027✔
3260
      if(dataLen + metaSize > dataBuflen) goto _exit;
16,119,027!
3261
      memcpy(data, pColRes->varmeta.offset, metaSize);
16,119,027✔
3262
    } else {
3263
      metaSize = BitmapLen(numOfRows);
56,521,047✔
3264
      if(dataLen + metaSize > dataBuflen) goto _exit;
56,521,047!
3265
      memcpy(data, pColRes->nullbitmap, metaSize);
56,521,047✔
3266
    }
3267

3268
    data += metaSize;
72,640,074✔
3269
    dataLen += metaSize;
72,640,074✔
3270

3271
    if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
72,640,074!
3272
      colSizes[col] = 0;
×
3273
      for (int32_t row = 0; row < numOfRows; ++row) {
×
3274
        char*   pColData = pColRes->pData + pColRes->varmeta.offset[row];
×
3275
        int32_t colSize = 0;
×
3276
        if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
×
3277
          colSize = getJsonValueLen(pColData);
×
3278
        } else {
3279
          colSize = varDataTLen(pColData);
×
3280
        }
3281
        colSizes[col] += colSize;
×
3282
        dataLen += colSize;
×
3283
        if(dataLen > dataBuflen) goto _exit;
×
3284
        (void) memmove(data, pColData, colSize);
×
3285
        data += colSize;
×
3286
      }
3287
    } else {
3288
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
72,640,074✔
3289
      dataLen += colSizes[col];
72,688,598✔
3290
      if(dataLen > dataBuflen) goto _exit;
72,688,598!
3291
      if (pColRes->pData != NULL) {
72,688,598✔
3292
        (void) memmove(data, pColRes->pData, colSizes[col]);
71,203,417✔
3293
      }
3294
      data += colSizes[col];
72,688,598✔
3295
    }
3296

3297
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
74,236,875!
3298
      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
×
3299
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3300
      return -1;
×
3301
    }
3302
    
3303
    colSizes[col] = htonl(colSizes[col]);
72,688,598✔
3304
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
3305
    //    htonl(colSizes[col]), colSizes[col]);
3306
  }
3307

3308
  bool* blankFill = (bool*)data;
21,928,001✔
3309
  *blankFill = pBlock->info.blankFill;
21,928,001✔
3310
  data += sizeof(bool);
21,928,001✔
3311

3312
  *actualLen = dataLen;
21,928,001✔
3313
#ifndef NO_UNALIGNED_ACCESS
3314
  *groupId = pBlock->info.id.groupId;
21,928,001✔
3315
#else
3316
  taosSetPUInt64Aligned(groupId, &pBlock->info.id.groupId);
3317
#endif
3318
  if (dataLen > dataBuflen) goto _exit;
21,928,001!
3319

3320
  return dataLen;
21,928,001✔
3321

3322
_exit:
×
3323
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
3324
  terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3325
  return -1;
×
3326
}
3327

3328
// Decodes a serialized block starting at pData into pBlock.
//
// Bytes are consumed in this order:
//   version | total length | rows | cols | flag segment | group id
//   | one (type:int8, bytes:int32) pair per column
//   | one payload length per column (converted with htonl)
//   | per column: offset array (var-length types) or null bitmap, followed by the column payload
//   | blankFill
//
// pBlock->pDataBlock is created when it is NULL. The input buffer is only read, never modified.
// *pEndPos receives the position just past the decoded bytes once the consumed length has been
// found equal to the encoded total length.
// Returns TSDB_CODE_SUCCESS, or an error code on failure.
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
  const char* pStart = pData;

  // version sizeof(int32_t): present in the stream, not interpreted here
  pStart += sizeof(int32_t);

  // total length sizeof(int32_t)
  int32_t dataLen = *(int32_t*)pStart;
  pStart += sizeof(int32_t);

  // total rows sizeof(int32_t)
  int32_t numOfRows = *(int32_t*)pStart;
  pStart += sizeof(int32_t);
  if (numOfRows <= 0) {
    uError("block decode numOfRows:%d error", numOfRows);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  // total columns sizeof(int32_t)
  int32_t numOfCols = *(int32_t*)pStart;
  pStart += sizeof(int32_t);

  // flag segment sizeof(int32_t): skipped, its value is not used by the decoder
  pStart += sizeof(int32_t);

  // group id sizeof(uint64_t)
#ifndef NO_UNALIGNED_ACCESS
  pBlock->info.id.groupId = *(uint64_t*)pStart;
#else
  taosSetPUInt64Aligned(&pBlock->info.id.groupId, (uint64_t*)pStart);
#endif
  pStart += sizeof(uint64_t);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  // column schema: type and bytes of every column
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    pColInfoData->info.type = *(int8_t*)pStart;
    pStart += sizeof(int8_t);

    pColInfoData->info.bytes = *(int32_t*)pStart;
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
      extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
                                      &pColInfoData->info.scale);
    }
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  // table of per-column payload lengths
  const char* pColLens = pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    // Read the length into a local copy: converting it in place would write through the const
    // input (a second decode of the same buffer would then see a double-swapped value), and the
    // table is not guaranteed to be 4-byte aligned.
    int32_t colLen = 0;
    memcpy(&colLen, pColLens + sizeof(int32_t) * i, sizeof(int32_t));
    colLen = htonl(colLen);
    if (colLen < 0) {
      uError("block decode colLen:%d error, colIdx:%d", colLen, i);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      // grow the payload buffer when the incoming payload is larger than what is allocated
      if (colLen > 0 && pColInfoData->varmeta.allocLen < colLen) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen);
        if (tmp == NULL) {
          return terrno;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = colLen;
      }

      pColInfoData->varmeta.length = colLen;
    } else {
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;

    if (colLen > 0) {
      memcpy(pColInfoData->pData, pStart, colLen);
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
      // an empty payload is only accepted when the first row is NULL or the column type is NULL
      uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen, i, pColInfoData->info.type);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    pStart += colLen;
  }

  bool blankFill = *(bool*)pStart;
  pStart += sizeof(bool);

  pBlock->info.dataLoad = 1;
  pBlock->info.rows = numOfRows;
  pBlock->info.blankFill = blankFill;
  // the number of bytes consumed must equal the total length recorded in the header
  if (pStart - pData != dataLen) {
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  *pEndPos = pStart;

  code = blockDataCheck(pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
3471

3472
// Moves, inside one fixed-width column, every row in [0, nTotal) whose pKeep flag is non-zero to the
// front of the column. pOldBitmap is the null bitmap as it was before trimming; rows that were NULL
// there are marked NULL at their new position in pCol->nullbitmap, the others are copied as `ctype`.
// nKept must be an int32_t variable; it is incremented once for every kept row.
#define TRIM_KEEP_FIXED_ROWS(ctype, pCol, pOldBitmap, pKeep, nTotal, nKept)         \
  do {                                                                              \
    for (int32_t trimRow_ = 0; trimRow_ < (nTotal); ++trimRow_) {                   \
      if ((pKeep)[trimRow_] == 0) {                                                 \
        continue;                                                                   \
      }                                                                             \
      if (colDataIsNull_f((pOldBitmap), trimRow_)) {                                \
        colDataSetNull_f((pCol)->nullbitmap, (nKept));                              \
      } else {                                                                      \
        ((ctype*)(pCol)->pData)[(nKept)] = ((ctype*)(pCol)->pData)[trimRow_];       \
      }                                                                             \
      (nKept) += 1;                                                                 \
    }                                                                               \
  } while (0)

// Keeps, in place, only the rows of pBlock whose entry in pBoolList is non-zero.
//
// pBlock    : block whose columns are compacted
// totalRows : number of rows currently held by the columns (also the length of pBoolList)
// pBoolList : one keep/drop flag per row; when NULL no row is kept: for every column that has data
//             the var-length total is reset or the null bitmap is cleared, and the function returns
//
// Columns without data (pData == NULL), and var-length columns whose total length is 0, are skipped.
// On success pBlock->info.rows is set to the largest number of rows kept in any processed column.
// Returns 0 on success, or an error code when an allocation or colDataSetVal() fails; in that case
// pBlock->info.rows is left untouched.
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
  int32_t code = 0;
  int32_t bmLen = BitmapLen(totalRows);
  char*   pBitmap = NULL;  // scratch copy of a column's null bitmap, shared by all fixed-width columns
  int32_t maxRows = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  if (!pBoolList) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      // it is a reserved column for scalar function, and no data in this column yet.
      if (pDst->pData == NULL) {
        continue;
      }

      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
        pDst->varmeta.length = 0;
      } else {
        memset(pDst->nullbitmap, 0, bmLen);
      }
    }
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
      continue;
    }

    int32_t numOfRows = 0;  // rows kept so far in this column
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      pDst->varmeta.length = 0;

      for (int32_t j = 0; j < totalRows; ++j) {
        if (pBoolList[j] == 0) {
          continue;
        }

        if (colDataIsNull_var(pDst, j)) {
          colDataSetNull_var(pDst, numOfRows);
        } else {
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
          // copy it to p2
          char*   p1 = colDataGetVarData(pDst, j);
          int32_t len = 0;
          if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
            len = getJsonValueLen(p1);
          } else {
            len = varDataTLen(p1);
          }

          char* p2 = taosMemoryMalloc(len);
          if (p2 == NULL) {
            code = terrno;
            goto _end;  // pBitmap may already be allocated by an earlier column
          }

          memcpy(p2, p1, len);
          code = colDataSetVal(pDst, numOfRows, p2, false);
          taosMemoryFree(p2);
          if (code) {
            goto _end;
          }
        }
        numOfRows += 1;
      }
    } else {
      if (pBitmap == NULL) {
        pBitmap = taosMemoryCalloc(1, bmLen);
        if (pBitmap == NULL) {
          return terrno;
        }
      }

      // keep the old null flags aside, then rebuild the column's bitmap for the kept rows
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
      memset(pDst->nullbitmap, 0, bmLen);

      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_UBIGINT:
        case TSDB_DATA_TYPE_DOUBLE:
        case TSDB_DATA_TYPE_TIMESTAMP:
          TRIM_KEEP_FIXED_ROWS(int64_t, pDst, pBitmap, pBoolList, totalRows, numOfRows);
          break;
        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_UINT:
          TRIM_KEEP_FIXED_ROWS(int32_t, pDst, pBitmap, pBoolList, totalRows, numOfRows);
          break;
        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_USMALLINT:
          TRIM_KEEP_FIXED_ROWS(int16_t, pDst, pBitmap, pBoolList, totalRows, numOfRows);
          break;
        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_UTINYINT:
          TRIM_KEEP_FIXED_ROWS(int8_t, pDst, pBitmap, pBoolList, totalRows, numOfRows);
          break;
        case TSDB_DATA_TYPE_DECIMAL64:
        case TSDB_DATA_TYPE_DECIMAL:
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              // memmove: source and destination are the same bytes while no row has been dropped yet
              memmove(pDst->pData + numOfRows * pDst->info.bytes, pDst->pData + j * pDst->info.bytes,
                      pDst->info.bytes);
            }
            numOfRows += 1;
          }
          break;
        default:
          // other types are left as they are; no row is counted for them
          break;
      }
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  pBlock->info.rows = maxRows;

_end:
  if (pBitmap != NULL) {
    taosMemoryFree(pBitmap);
  }

  return code;
}

#undef TRIM_KEEP_FIXED_ROWS
3662

3663
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
22,024,465✔
3664
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
22,024,465✔
3665
}
3666

3667
// Returns the index of the first row that compares lower than its predecessor under pOrderInfo,
// or the index at which the scan ran out of rows when no such row exists.
// Returns 0 when either argument is NULL.
// Side effect: pColData and compFn of every order entry are bound to the matching column of
// pDataBlock; entries that are NULL, or whose slot has no column, are left as they are.
// NOTE(review): a block with zero rows yields 1, because the next-row index starts at 1 - confirm
// that callers expect this.
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock == NULL || pOrderInfo == NULL) {
    return 0;
  }

  for (int32_t k = 0; k < taosArrayGetSize(pOrderInfo); ++k) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, k);
    if (pInfo != NULL) {
      pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
      if (pInfo->pColData != NULL) {
        pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
      }
    }
  }

  SSDataBlockSortHelper helper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};

  // walk adjacent row pairs (prev, next) until a pair is out of order or the rows run out
  int32_t prev = 0;
  int32_t next = 1;
  while (prev < pDataBlock->info.rows && next < pDataBlock->info.rows) {
    if (dataBlockCompar(&next, &prev, &helper) < 0) {
      break;
    }
    ++prev;
    ++next;
  }

  return next;
}
3694

3695
// Logs the failing line and makes the enclosing function return TSDB_CODE_INTERNAL_ERROR when the
// condition `o` is false. Wrapped in do { } while (0) so that it behaves as a single statement,
// including when it is the body of an if/else without braces.
#define BLOCK_DATA_CHECK_TRESSA(o)                        \
  do {                                                    \
    if (!(o)) {                                           \
      uError("blockDataCheck failed! line:%d", __LINE__); \
      return TSDB_CODE_INTERNAL_ERROR;                    \
    }                                                     \
  } while (0)
3700
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
125,707,550✔
3701
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
125,707,550!
3702
    return TSDB_CODE_SUCCESS;
21,258,446✔
3703
  }
3704

3705
  BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
104,449,104!
3706
  if (!pDataBlock->info.dataLoad) {
104,449,104✔
3707
    return TSDB_CODE_SUCCESS;
7,972,471✔
3708
  }
3709

3710
  bool isVarType = false;
96,476,633✔
3711
  int32_t colLen = 0;
96,476,633✔
3712
  int32_t nextPos = 0;
96,476,633✔
3713
  int64_t checkRows = 0;
96,476,633✔
3714
  int64_t typeValue = 0;
96,476,633✔
3715
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
96,476,633✔
3716
  for (int32_t i = 0; i < colNum; ++i) {
417,575,275✔
3717
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
324,648,613✔
3718
    BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
324,397,662!
3719
    isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
325,315,226!
3720
    checkRows = pDataBlock->info.rows;
325,315,226✔
3721
    if (pCol->info.noData == true) continue;
325,315,226✔
3722

3723
    if (isVarType) {
321,989,943✔
3724
      BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
68,650,203!
3725
    } else {
3726
      BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
253,339,740!
3727
    }
3728

3729
    nextPos = -1;
321,989,943✔
3730
    for (int64_t r = 0; r < checkRows; ++r) {
2,147,483,647✔
3731
      if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
2,147,483,647✔
3732
      if (!colDataIsNull_s(pCol, r)) {
2,147,483,647✔
3733
        BLOCK_DATA_CHECK_TRESSA(pCol->pData);
2,147,483,647!
3734
        BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
2,147,483,647!
3735

3736
        if (isVarType) {
2,147,483,647✔
3737
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
2,147,483,647!
3738
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
2,147,483,647!
3739
          if (pCol->reassigned) {
2,147,483,647!
3740
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
×
3741
          } else if (0 == r || nextPos == -1) {
2,147,483,647✔
3742
            nextPos = pCol->varmeta.offset[r];
24,171,152✔
3743
          } else {
3744
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
2,147,483,647!
3745
          }
3746

3747
          char*   pColData = pCol->pData + pCol->varmeta.offset[r];
2,147,483,647✔
3748
          int32_t colSize = 0;
2,147,483,647✔
3749
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3750
            colLen = getJsonValueLen(pColData);
8,281✔
3751
          } else {
3752
            colLen = varDataTLen(pColData);
2,147,483,647✔
3753
          }
3754

3755
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3756
            BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
8,280!
3757
          } else {
3758
            BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
2,147,483,647!
3759
          }
3760
          BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
2,147,483,647!
3761

3762
          if (pCol->reassigned) {
2,147,483,647!
3763
            BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
×
3764
          } else {
3765
            nextPos += colLen;
2,147,483,647✔
3766
            BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
2,147,483,647!
3767
          }
3768

3769
          typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
2,147,483,647✔
3770
        } else {
3771
          if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
2,147,483,647✔
3772
            float v = 0;
291,382,288✔
3773
            GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
291,382,288!
3774
          } else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
2,147,483,647✔
3775
            double v = 0;
385,989,990✔
3776
            GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
385,989,990!
3777
          } else if (IS_DECIMAL_TYPE(pCol->info.type)) {
2,147,483,647!
3778
            // SKIP for decimal types
3779
          } else {
3780
            GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
2,147,483,647!
3781
          }
3782
        }
3783
      }
3784
    }
3785
  }
3786

3787
  return TSDB_CODE_SUCCESS;
92,926,662✔
3788
}
3789

3790

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc