• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.9
/source/common/src/tdatablock.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tdatablock.h"
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21
#include "tglobal.h"
22

23
#define MALLOC_ALIGN_BYTES 32
24

25
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
904,013✔
26
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
904,013!
27
    if (pColumnInfoData->reassigned) {
171,329!
28
      int32_t totalSize = 0;
×
29
      for (int32_t row = 0; row < numOfRows; ++row) {
×
30
        char*   pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
×
31
        int32_t colSize = 0;
×
32
        if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
×
33
          colSize = getJsonValueLen(pColData);
×
34
        } else {
35
          colSize = varDataTLen(pColData);
×
36
        }
37
        totalSize += colSize;
×
38
      }
39
      return totalSize;
×
40
    }
41
    return pColumnInfoData->varmeta.length;
171,329✔
42
  } else {
43
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
732,684!
44
      return 0;
×
45
    } else {
46
      return pColumnInfoData->info.bytes * numOfRows;
732,684✔
47
    }
48
  }
49
}
50

51
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
×
52
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
×
53
    return 0;
×
54
  }
55

56
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
×
57
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
×
58
    return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
×
59
  else
60
    return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
×
61
}
62

63
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
899,862✔
64
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
899,862!
65
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
171,099✔
66
  } else {
67
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
728,763!
68
           BitmapLen(numOfRows);
728,763✔
69
  }
70
}
71

72
int32_t getJsonValueLen(const char* data) {
665✔
73
  int32_t dataLen = 0;
665✔
74
  if (*data == TSDB_DATA_TYPE_NULL) {
665✔
75
    dataLen = CHAR_BYTES;
35✔
76
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
630✔
77
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
70✔
78
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
560✔
79
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
140✔
80
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
420✔
81
    dataLen = CHAR_BYTES + CHAR_BYTES;
70✔
82
  } else if (tTagIsJson(data)) {  // json string
350!
83
    dataLen = ((STag*)(data))->len;
350✔
84
  } else {
85
    uError("Invalid data type:%d in Json", *data);
×
86
  }
87
  return dataLen;
665✔
88
}
89

90
static int32_t getDataLen(int32_t type, const char* pData) {
5,714,147✔
91
  int32_t dataLen = 0;
5,714,147✔
92
  if (type == TSDB_DATA_TYPE_JSON) {
5,714,147✔
93
    dataLen = getJsonValueLen(pData);
665✔
94
  } else {
95
    dataLen = varDataTLen(pData);
5,713,482✔
96
  }
97
  return dataLen;
5,717,027✔
98
}
99

100
// Write one value into row rowIndex.
//  - isNull or pData == NULL: var-length rows get offset -1, fixed-length rows get their null bit set, and
//    hasNull is raised.
//  - var-length: the encoded value (getDataLen bytes) is appended to pData, and the buffer grows by 1.5x with a
//    minimum of 8 bytes. If the row already holds a positive offset, varmeta.length is first reset to that offset,
//    so the value replaces the row's old payload (and anything after it).
//  - fixed-length: info.bytes bytes are copied into the row slot and its null bit is cleared.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the buffer would need more than UINT32_MAX bytes, or terrno if
// the reallocation fails.
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  if (isNull || pData == NULL) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t type = pColumnInfoData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    int32_t dataLen = getDataLen(type, pData);
    if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
      // the row already owns a payload: truncate back to it so the new value overwrites it
      pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
    }

    SVarColAttr* pAttr = &pColumnInfoData->varmeta;
    // Use 64-bit arithmetic. In 32 bits, both the required size and the growth step could wrap silently (a
    // "newSize > UINT32_MAX" test on a uint32_t can never fire), leaving the buffer too small for the memmove below.
    uint64_t required = (uint64_t)pAttr->length + (uint64_t)dataLen;
    if (required > UINT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (pAttr->allocLen < required) {
      uint64_t newSize = pAttr->allocLen;
      if (newSize <= 1) {
        newSize = 8;
      }

      while (newSize < required) {
        newSize += newSize / 2;  // grow by 1.5x (same truncation as "newSize * 1.5" assigned to an integer)
      }

      if (newSize > UINT32_MAX) {
        newSize = UINT32_MAX;  // the last step overshot; the clamped size still covers 'required'
      }

      char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
      if (buf == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = buf;
      pAttr->allocLen = newSize;
    }

    uint32_t len = pColumnInfoData->varmeta.length;
    pColumnInfoData->varmeta.offset[rowIndex] = len;

    (void)memmove(pColumnInfoData->pData + len, pData, dataLen);
    pColumnInfoData->varmeta.length += dataLen;
  } else {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
  }

  return 0;
}
155

156
// Set row rowIndex to pData (or to NULL). For var-length columns the row's previous offset is discarded (set to -1)
// first, so the value is appended at the end of the data buffer rather than overwriting an earlier payload.
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  bool isVarType = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);
  if (isVarType) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;
  }

  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
163

164
// Like colDataSetVal, but keeps the row's existing var-length offset. If that offset is positive, the data buffer is
// truncated back to it and the new value overwrites ("covers") the old payload.
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
167

168
// Store a raw (header-less) var-length payload of varDataLen bytes as row rowIndex; the VARSTR header is written
// here. Returns TSDB_CODE_INVALID_PARA for non var-length columns or a negative varDataLen. isNull or a NULL
// pVarData marks the row NULL (offset -1). If the row already owns a positive offset, the buffer is truncated back
// to it first, so the row is overwritten. Returns TSDB_CODE_OUT_OF_MEMORY when the buffer would exceed UINT32_MAX
// bytes, or terrno if the reallocation fails.
int32_t varColSetVarData(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pVarData, int32_t varDataLen,
                         bool isNull) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (isNull || pVarData == NULL) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    pColumnInfoData->hasNull = true;
    return TSDB_CODE_SUCCESS;
  }

  if (varDataLen < 0) {
    // a negative length would become a huge size_t in the memmove below
    return TSDB_CODE_INVALID_PARA;
  }

  uint64_t dataLen = (uint64_t)VARSTR_HEADER_SIZE + (uint64_t)varDataLen;
  if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
    pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
  }

  SVarColAttr* pAttr = &pColumnInfoData->varmeta;
  // 64-bit arithmetic so neither the required size nor the 1.5x growth step can wrap (the former uint32_t
  // "newSize > UINT32_MAX" test could never fire).
  uint64_t required = (uint64_t)pAttr->length + dataLen;
  if (required > UINT32_MAX) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (pAttr->allocLen < required) {
    uint64_t newSize = pAttr->allocLen;
    if (newSize <= 1) {
      newSize = 8;
    }

    while (newSize < required) {
      newSize += newSize / 2;  // grow by 1.5x
    }

    if (newSize > UINT32_MAX) {
      newSize = UINT32_MAX;  // still >= required, which was checked above
    }

    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
    if (buf == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = buf;
    pAttr->allocLen = newSize;
  }

  uint32_t len = pColumnInfoData->varmeta.length;
  pColumnInfoData->varmeta.offset[rowIndex] = len;

  (void)memmove(varDataVal(pColumnInfoData->pData + len), pVarData, varDataLen);
  varDataSetLen(pColumnInfoData->pData + len, varDataLen);
  pColumnInfoData->varmeta.length += dataLen;
  return TSDB_CODE_SUCCESS;
}
216

217
// Make row dstRowIdx hold the same value as srcRowIdx. Var-length rows share the source payload by copying its
// offset, and the column is flagged as reassigned. Fixed-length rows copy info.bytes bytes from pData and clear the
// destination's null bit.
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
                           const char* pData) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    int32_t bytes = pColumnInfoData->info.bytes;
    memcpy(pColumnInfoData->pData + bytes * dstRowIdx, pData, bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
    return 0;
  }

  int32_t* pOffset = pColumnInfoData->varmeta.offset;
  pOffset[dstRowIdx] = pOffset[srcRowIdx];
  pColumnInfoData->reassigned = true;
  return 0;
}
230

231
// Ensure a var-length column's data buffer holds at least newSize bytes. Does nothing for fixed-length columns or
// when the buffer is already large enough. On reallocation failure terrno is returned and pData is left unchanged.
static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
  bool mustGrow = IS_VAR_DATA_TYPE(pColumnInfoData->info.type) && (pColumnInfoData->varmeta.allocLen < newSize);
  if (!mustGrow) {
    return TSDB_CODE_SUCCESS;
  }

  char* grown = taosMemoryRealloc(pColumnInfoData->pData, newSize);
  if (grown == NULL) {
    return terrno;
  }

  pColumnInfoData->pData = grown;
  pColumnInfoData->varmeta.allocLen = newSize;
  return TSDB_CODE_SUCCESS;
}
252

253
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
14,288,025✔
254
                            int32_t itemLen, int32_t numOfRows, bool trimValue) {
255
  if (pColumnInfoData->info.bytes < itemLen) {
14,288,025!
256
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen,
×
257
          pColumnInfoData->info.bytes, trimValue);
258
    if (trimValue) {
×
259
      itemLen = pColumnInfoData->info.bytes;
×
260
    } else {
261
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
262
    }
263
  }
264

265
  size_t   start = 1;
14,288,025✔
266
  int32_t  t = 0;
14,288,025✔
267
  int32_t  count = log(numOfRows) / log(2);
14,288,025✔
268
  uint32_t startOffset =
14,288,025✔
269
      (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
14,288,025!
270

271
  // the first item
272
  memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
14,288,025✔
273

274
  while (t < count) {
14,380,892✔
275
    int32_t xlen = 1 << t;
92,867✔
276
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
92,867✔
277
           xlen * itemLen);
92,867✔
278
    t += 1;
92,867✔
279
    start += xlen;
92,867✔
280
  }
281

282
  // the tail part
283
  if (numOfRows > start) {
14,288,025✔
284
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
1,908✔
285
           (numOfRows - start) * itemLen);
1,908✔
286
  }
287

288
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
14,288,025!
289
    for (int32_t i = 0; i < numOfRows; ++i) {
35,008,612✔
290
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
30,835,729✔
291
    }
292

293
    pColumnInfoData->varmeta.length += numOfRows * itemLen;
4,172,883✔
294
  }
295

296
  return TSDB_CODE_SUCCESS;
14,288,025✔
297
}
298

299
// Fill numOfRows rows starting at currentRow with copies of the value at pData. For var-length columns the data
// buffer is first reserved for numOfRows copies of the encoded value. Returns TSDB_CODE_OUT_OF_MEMORY if that
// exceeds UINT32_MAX bytes, otherwise whatever colDataReserve / doCopyNItems return.
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                         bool trimValue) {
  int32_t len = pColumnInfoData->info.bytes;
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      len = getJsonValueLen(pData);
    } else {
      len = varDataTLen(pData);
    }

    // Widen before multiplying: numOfRows * len in 32 bits could wrap and reserve far too little.
    uint64_t required = (uint64_t)numOfRows * (uint64_t)len + pColumnInfoData->varmeta.length;
    if (required > UINT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (pColumnInfoData->varmeta.allocLen < required) {
      int32_t code = colDataReserve(pColumnInfoData, (size_t)required);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
}
318

319
// Mark numOfRows rows starting at currentRow as NULL and raise hasNull. Var-length rows get offset -1 (memset with
// -1 sets every byte to 0xFF). Fixed-length rows get their null bits set. For runs of 16 or more rows, bits are set
// one at a time up to the next byte boundary, then whole bytes with memset, then one at a time for the rest.
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
  pColumnInfoData->hasNull = true;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
    return;
  }

  int32_t idx = 0;

  if (numOfRows >= 16) {
    // leading bits until currentRow + idx sits on a byte boundary
    while (idx < numOfRows && BitPos(currentRow + idx) != 0) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + idx);
      ++idx;
    }

    // whole bytes at once
    int32_t fullBytes = (numOfRows - idx) / 8;
    memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + idx), 0xFF, fullBytes);
    idx += fullBytes * 8;
  }

  // the remaining bits (or every bit for short runs)
  while (idx < numOfRows) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + idx);
    ++idx;
  }
}
349

350
// Store pData once at currentRow (via colDataSetVal). The following numOfRows - 1 rows then reference the same
// payload by receiving the same offset, and the column is flagged as reassigned. Because it writes
// varmeta.offset, this is only meaningful for var-length columns. Returns colDataSetVal's error code on failure,
// 0 otherwise.
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
                               uint32_t numOfRows) {
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
  if (code) {
    return code;
  }

  if (numOfRows > 1) {
    int32_t* pOffset = pColumnInfoData->varmeta.offset;
    int32_t  sharedOffset = pOffset[currentRow];
    // memset() would copy only the low byte of the offset into every byte, producing garbage for any offset other
    // than 0 or -1, so assign each int32_t element.
    for (uint32_t i = 1; i < numOfRows; ++i) {
      pOffset[currentRow + i] = sharedOffset;
    }
    pColumnInfoData->reassigned = true;
  }

  return code;
}
365

366
// Fill numOfRows rows starting at currentRow with the value at pData, or mark them NULL when isNull is set.
// Var-length columns store the payload once and share it via colDataCopyAndReassign. Fixed-length columns copy the
// first item and then double the filled region until all rows are written. A zero row count writes nothing.
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                          bool isNull) {
  if (isNull) {
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);  // also raises hasNull
    return 0;
  }

  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;  // nothing to copy; previously row currentRow was still written
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
  }

  // size_t arithmetic: currentRow * bytes in int32_t could overflow for large blocks
  size_t colBytes = (size_t)pColumnInfoData->info.bytes;
  char*  pStart = pColumnInfoData->pData + (size_t)currentRow * colBytes;
  memcpy(pStart, pData, colBytes);

  // Double the filled prefix each round, always copying from the start of the run; the last round copies only what
  // is still missing.
  uint32_t num = 1;
  while (num < numOfRows) {
    uint32_t remaining = numOfRows - num;
    uint32_t tnum = (remaining < num) ? remaining : num;
    memcpy(pStart + num * colBytes, pStart, tnum * colBytes);
    num += tnum;
  }

  return TSDB_CODE_SUCCESS;
}
398

399
// Append the first numOfRow2 null bits of pSource's bitmap after the first numOfRow1 bits of pColumnInfoData's.
// Within a byte, earlier rows occupy the higher-order bits (the kept destination bits are the top ones). When
// numOfRow1 is a multiple of 8 the source bytes are copied as-is; otherwise each source byte is split across two
// destination bytes. NOTE(review): the destination bitmap is assumed to be sized for numOfRow1 + numOfRow2 rows
// already (colDataMergeCol grows it before calling).
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) {
    return;
  }

  uint32_t usedBits = BitPos(numOfRow1);  // bits already occupied in the destination's last byte
  if (usedBits == 0) {                    // byte aligned: no need to shift bits of bitmap
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint32_t       totalRows = numOfRow1 + numOfRow2;
  uint32_t       freeBits = 8 - usedBits;
  const uint8_t* pSrcBits = (const uint8_t*)pSource->nullbitmap;
  char*          pLastByte = &pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1];

  // keep the destination's used (high) bits, then fill its free low bits with the source's leading bits
  *pLastByte &= (0xFF << freeBits);
  *pLastByte |= (pSrcBits[0] >> usedBits);

  if (BitmapLen(numOfRow1) == BitmapLen(totalRows)) {
    return;  // everything fit into the partially used byte
  }

  uint8_t* pTail = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
  int32_t  tailBytes = BitmapLen(totalRows) - BitmapLen(numOfRow1);
  int32_t  srcBytes = BitmapLen(numOfRow2);

  memset(pTail, 0, tailBytes);
  for (int32_t k = 0; k < srcBytes; ++k) {
    if (k > 0) {
      pTail[k - 1] |= (pSrcBits[k] >> usedBits);  // high part of source byte k completes the previous tail byte
    }

    if (k >= tailBytes) {
      break;  // size limit of the destination bitmap
    }

    pTail[k] |= (pSrcBits[k] << freeBits);  // low part of source byte k starts tail byte k
  }
}
440

441
// Append numOfRow2 rows of pSource after the first numOfRow1 rows of pColumnInfoData. *capacity is the
// destination's row capacity. When the merged count exceeds it, the offset array (var-length) or the data buffer
// and null bitmap (fixed-length) are grown and *capacity is updated. For var-length columns the source payload is
// appended and the source offsets are rebased by the destination's old length (NULL offsets stay -1).
// Returns the merged row count, TSDB_CODE_INVALID_PARA on a type mismatch, or terrno on allocation failure.
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
  bool     needGrow = finalNumOfRows > (*capacity);

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    // grow the offset array so it can index every merged row
    if (needGrow) {
      int32_t* pNewOffset =
          taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
      if (pNewOffset == NULL) {
        return terrno;
      }

      *capacity = finalNumOfRows;
      pColumnInfoData->varmeta.offset = pNewOffset;
    }

    uint32_t       oldLen = pColumnInfoData->varmeta.length;
    uint32_t       srcLen = pSource->varmeta.length;
    const int32_t* pSrcOffset = pSource->varmeta.offset;
    int32_t*       pDstOffset = pColumnInfoData->varmeta.offset + numOfRow1;

    // rebase source offsets past the existing payload; NULL rows keep -1
    for (int32_t i = 0; i < numOfRow2; ++i) {
      pDstOffset[i] = (pSrcOffset[i] == -1) ? -1 : (int32_t)(pSrcOffset[i] + oldLen);
    }

    // copy data
    if (pColumnInfoData->varmeta.allocLen < srcLen + oldLen) {
      char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, srcLen + oldLen);
      if (pNewData == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = pNewData;
      pColumnInfoData->varmeta.allocLen = srcLen + oldLen;
    }

    if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
      memcpy(pColumnInfoData->pData + oldLen, pSource->pData, srcLen);
    }
    pColumnInfoData->varmeta.length = srcLen + oldLen;
    return numOfRow1 + numOfRow2;
  }

  if (needGrow) {
    // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
    char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
    if (pNewData == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = pNewData;
    if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
      char* pNewBitmap = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
      if (pNewBitmap == NULL) {
        return terrno;
      }

      // zero the newly added bitmap bytes so the merged rows start out non-NULL
      uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
      memset(pNewBitmap + BitmapLen(numOfRow1), 0, extend);
      pColumnInfoData->nullbitmap = pNewBitmap;
    }

    *capacity = finalNumOfRows;
  }

  doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

  if (pSource->pData) {
    int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
    memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
  }

  return numOfRow1 + numOfRow2;
}
525

526
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
1,485,659✔
527
                      const SDataBlockInfo* pBlockInfo) {
528
  if (pColumnInfoData->info.type != pSource->info.type || (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows)) {
1,485,659!
529
    return TSDB_CODE_INVALID_PARA;
×
530
  }
531

532
  if (numOfRows <= 0) {
1,485,659✔
533
    return numOfRows;
21✔
534
  }
535

536
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,655,341!
537
    int32_t newLen = pSource->varmeta.length;
169,702✔
538
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
169,702✔
539
    if (pColumnInfoData->varmeta.allocLen < newLen) {
169,702✔
540
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
4,172!
541
      if (tmp == NULL) {
4,173!
542
        return terrno;
×
543
      }
544

545
      pColumnInfoData->pData = tmp;
4,173✔
546
      pColumnInfoData->varmeta.allocLen = newLen;
4,173✔
547
    }
548

549
    pColumnInfoData->varmeta.length = newLen;
169,703✔
550
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
169,703!
551
      memcpy(pColumnInfoData->pData, pSource->pData, newLen);
168,892✔
552
    }
553
  } else {
554
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
1,315,936✔
555
    if (pSource->pData != NULL) {
1,315,936!
556
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
1,315,948✔
557
    }
558
  }
559

560
  pColumnInfoData->hasNull = pSource->hasNull;
1,485,639✔
561
  pColumnInfoData->info = pSource->info;
1,485,639✔
562
  return 0;
1,485,639✔
563
}
564

565
// Copy numOfRows rows of pSrc, starting at srcIdx, into pDst starting at dstIdx. Both columns must have the same
// type and width, and pSrc must not be reassigned (TSDB_CODE_INVALID_PARA otherwise). Returns numOfRows when it is
// <= 0, terrno if growing pDst's payload buffer fails, and 0 on success.
//
// Var-length columns: destination offsets are derived from each row's encoded length, and the payload is copied
// with a single memcpy starting at the first non-NULL source row. NOTE(review): this assumes the source payloads
// are stored contiguously in row order (true for append-built columns); confirm for other producers.
// Fixed-length columns: null bits are copied only when pSrc->hasNull, then the data is copied in one memcpy.
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
                           int32_t numOfRows) {
  if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRows <= 0) {
    return numOfRows;
  }

  if (IS_VAR_DATA_TYPE(pDst->info.type)) {
    int32_t allLen = 0;
    void*   srcAddr = NULL;

    // A single loop for both cases; NULL checks only apply when the source reports NULLs.
    for (int32_t i = 0; i < numOfRows; ++i) {
      if (pSrc->hasNull && colDataIsNull_var(pSrc, srcIdx + i)) {
        pDst->varmeta.offset[dstIdx + i] = -1;
        pDst->hasNull = true;
        continue;
      }

      char* pData = colDataGetVarData(pSrc, srcIdx + i);
      if (NULL == srcAddr) {
        srcAddr = pData;  // the payload copy starts at the first non-NULL row
      }

      int32_t dataLen = 0;
      if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
        dataLen = getJsonValueLen(pData);
      } else {
        dataLen = varDataTLen(pData);
      }
      pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen;
      allLen += dataLen;
    }

    if (allLen > 0) {
      // copy data
      if (pDst->varmeta.allocLen < pDst->varmeta.length + allLen) {
        char* tmp = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + allLen);
        if (tmp == NULL) {
          return terrno;
        }

        pDst->pData = tmp;
        pDst->varmeta.allocLen = pDst->varmeta.length + allLen;
      }

      memcpy(pDst->pData + pDst->varmeta.length, srcAddr, allLen);
      pDst->varmeta.length = pDst->varmeta.length + allLen;
    }

    return 0;
  }

  if (pSrc->hasNull) {
    if (BitPos(dstIdx) == BitPos(srcIdx)) {
      // Same bit phase in both bitmaps: copy bit by bit until dstIdx + i (and therefore srcIdx + i) reaches a byte
      // boundary, then whole bytes while at least 8 rows remain, then bit by bit for the tail.
      for (int32_t i = 0; i < numOfRows; ++i) {
        if (0 == BitPos(dstIdx + i) && (i + (1 << NBIT) <= numOfRows)) {
          BMCharPos(pDst->nullbitmap, dstIdx + i) = BMCharPos(pSrc->nullbitmap, srcIdx + i);
          if (BMCharPos(pDst->nullbitmap, dstIdx + i)) {
            pDst->hasNull = true;
          }
          i += (1 << NBIT) - 1;
        } else if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
          colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
          pDst->hasNull = true;
        } else {
          colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
        }
      }
    } else {
      for (int32_t i = 0; i < numOfRows; ++i) {
        if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
          colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
          pDst->hasNull = true;
        } else {
          colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
        }
      }
    }
  }

  if (pSrc->pData != NULL) {
    memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
           pDst->info.bytes * numOfRows);
  }

  return 0;
}
670

671
// Number of column entries in the block's pDataBlock array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  return numOfCols;
}
672

673
// Current row count of the block (info.rows).
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
  size_t numOfRows = pBlock->info.rows;
  return numOfRows;
}
674

675
// Recompute info.window from the first and last values of the timestamp column. Column 0 is used when
// tsColumnIndex is -1. Returns 0 without changing anything for a NULL, empty or not-loaded block, a missing column,
// or a non-timestamp column. Returns -1 when the block has no columns.
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  bool nothingToDo = (pDataBlock == NULL) || (pDataBlock->info.rows <= 0) || (pDataBlock->info.dataLoad == 0);
  if (nothingToDo) {
    return 0;
  }

  if (taosArrayGetSize(pDataBlock->pDataBlock) <= 0) {
    return -1;
  }

  int32_t          colIdx = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
  if (pTsCol == NULL || pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  TSKEY firstTs = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastTs = *(TSKEY*)colDataGetData(pTsCol, (pDataBlock->info.rows - 1));

  pDataBlock->info.window.skey = TMIN(firstTs, lastTs);
  pDataBlock->info.window.ekey = TMAX(firstTs, lastTs);
  return 0;
}
704

705
// Fill info.pks[0] / info.pks[1] from the first and last rows of the primary-key column at pkColumnIndex.
// pks[0] is taken from the first row and pks[1] from the last row when asc is true; the rows are swapped otherwise.
// Numeric keys are stored as int64 datums; VARCHAR keys are copied into pks[].pData with nData set.
// NOTE(review): pks[].pData is assumed large enough for the key; that is not checked here.
// Returns 0 when there is nothing to do or the type is unsupported, -1 when the block has no columns, and terrno
// when the column cannot be fetched.
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
    return 0;
  }

  if (taosArrayGetSize(pDataBlock->pDataBlock) <= 0) {
    return -1;
  }

  SDataBlockInfo*  pInfo = &pDataBlock->info;
  SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
  if (pPkCol == NULL) {
    return terrno;
  }

  bool isNumeric = IS_NUMERIC_TYPE(pPkCol->info.type);
  if (!isNumeric && (pPkCol->info.type != TSDB_DATA_TYPE_VARCHAR)) {
    return 0;
  }

  void* skey = colDataGetData(pPkCol, 0);
  void* ekey = colDataGetData(pPkCol, (pInfo->rows - 1));

  void* pLow = asc ? skey : ekey;   // source of pks[0]
  void* pHigh = asc ? ekey : skey;  // source of pks[1]

  if (isNumeric) {
    int64_t val = 0;
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pLow, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[0], val);
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pHigh, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[1], val);
  } else {  // todo refactor
    memcpy(pInfo->pks[0].pData, varDataVal(pLow), varDataLen(pLow));
    pInfo->pks[0].nData = varDataLen(pLow);

    memcpy(pInfo->pks[1].pData, varDataVal(pHigh), varDataLen(pHigh));
    pInfo->pks[1].nData = varDataLen(pHigh);
  }

  return 0;
}
759

760
// Append every row of pSrc to pDest, merging columns pairwise by position via colDataMergeCol. On success pDest's
// row count grows by pSrc->info.rows and its capacity becomes the value reported by the last merged column.
// Returns terrno if a column is missing, or the first negative error from colDataMergeCol.
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  int32_t capacity = pDest->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pSrcCol == NULL || pDstCol == NULL) {
      return terrno;
    }

    capacity = pDest->info.capacity;
    int32_t ret = colDataMergeCol(pDstCol, pDest->info.rows, &capacity, pSrcCol, pSrc->info.rows);
    if (ret < 0) {  // error occurs
      return ret;
    }
  }

  pDest->info.capacity = capacity;
  pDest->info.rows += pSrc->info.rows;
  return 0;
}
783

784
// Append numOfRows rows of pSrc, starting at srcIdx, to the end of pDest. pDest must already have room for them
// (TSDB_CODE_INVALID_PARA otherwise). The row count is advanced only after every column has been copied.
// Returns terrno for a missing column, or the first error from colDataAssignNRows.
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
    return TSDB_CODE_INVALID_PARA;
  }

  size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pDstCol == NULL || pSrcCol == NULL) {
      return terrno;
    }

    int32_t code = colDataAssignNRows(pDstCol, pDest->info.rows, pSrcCol, srcIdx, numOfRows);
    if (code) {
      return code;
    }
  }

  pDest->info.rows += numOfRows;
  return TSDB_CODE_SUCCESS;
}
807

808
/*
 * Drop the last numOfRows rows of pBlock.
 *
 * A non-positive numOfRows is a no-op; removing at least as many rows as the block holds
 * empties it through blockDataCleanup().
 *
 * Variable-length columns: the removed offsets are zeroed and the data length is rolled back
 * to the offset of the first removed row that carries data. NULL rows (offset -1) own no
 * payload, so they are skipped; if every removed row is NULL the length is left unchanged.
 * NOTE(review): this assumes payloads were appended in row order; for reassigned columns the
 * rolled-back length may not be meaningful (same limitation as before).
 *
 * Fixed-length columns: the null bits of the removed rows are cleared.
 */
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
  if (numOfRows <= 0) {
    return;
  }

  if (numOfRows >= pBlock->info.rows) {
    blockDataCleanup(pBlock);
    return;
  }

  int32_t totalRows = pBlock->info.rows;
  int32_t firstRemoved = totalRows - numOfRows;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, c);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      // Using the offset of a NULL row (-1) here would set the length to -1.
      for (int32_t r = firstRemoved; r < totalRows; ++r) {
        if (pCol->varmeta.offset[r] != -1) {
          pCol->varmeta.length = pCol->varmeta.offset[r];
          break;
        }
      }
      memset(pCol->varmeta.offset + firstRemoved, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
    } else {
      int32_t r = firstRemoved;

      // clear single bits up to the next byte boundary
      for (; r < totalRows && BitPos(r) != 0; ++r) {
        colDataClearNull_f(pCol->nullbitmap, r);
      }

      // clear whole bitmap bytes
      int32_t bytes = (totalRows - r) / 8;
      memset(&BMCharPos(pCol->nullbitmap, r), 0, bytes);
      r += bytes * 8;

      // clear the trailing bits
      for (; r < totalRows; ++r) {
        colDataClearNull_f(pCol->nullbitmap, r);
      }
    }
  }

  pBlock->info.rows -= numOfRows;
}
850

851
size_t blockDataGetSize(const SSDataBlock* pBlock) {
186,232✔
852
  size_t total = 0;
186,232✔
853
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
186,232✔
854
  for (int32_t i = 0; i < numOfCols; ++i) {
1,086,167✔
855
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
899,900✔
856
    if (pColInfoData == NULL) {
899,900!
857
      continue;
×
858
    }
859

860
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
899,900✔
861
  }
862

863
  return total;
186,267✔
864
}
865

866
// the number of tuples can be fit in one page.
867
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
868
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
520✔
869
                           int32_t pageSize) {
870
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
520✔
871
  int32_t numOfRows = pBlock->info.rows;
520✔
872

873
  int32_t bitmapChar = 1;
520✔
874

875
  size_t headerSize = sizeof(int32_t);
520✔
876
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
520✔
877

878
  // TODO speedup by checking if the whole page can fit in firstly.
879
  if (!hasVarCol) {
520!
880
    size_t  rowSize = blockDataGetRowSize(pBlock);
×
881
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
×
882
    if (capacity <= 0) {
×
883
      return terrno;
×
884
    }
885

886
    *stopIndex = startIndex + capacity - 1;
×
887
    if (*stopIndex >= numOfRows) {
×
888
      *stopIndex = numOfRows - 1;
×
889
    }
890

891
    return TSDB_CODE_SUCCESS;
×
892
  }
893
  // iterate the rows that can be fit in this buffer page
894
  int32_t size = (headerSize + colHeaderSize);
520✔
895
  for (int32_t j = startIndex; j < numOfRows; ++j) {
1,000,520✔
896
    for (int32_t i = 0; i < numOfCols; ++i) {
3,001,557✔
897
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
2,001,038✔
898
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
2,001,038!
899
        if (pColInfoData->varmeta.offset[j] != -1) {
1,000,519!
900
          char* p = colDataGetData(pColInfoData, j);
1,000,519!
901
          size += varDataTLen(p);
1,000,519✔
902
        }
903

904
        size += sizeof(pColInfoData->varmeta.offset[0]);
1,000,519✔
905
      } else {
906
        size += pColInfoData->info.bytes;
1,000,519✔
907

908
        if (((j - startIndex) & 0x07) == 0) {
1,000,519✔
909
          size += 1;  // the space for null bitmap
125,505✔
910
        }
911
      }
912
    }
913

914
    if (size > pageSize) {  // pageSize must be able to hold one row
1,000,519✔
915
      *stopIndex = j - 1;
519✔
916
      if (*stopIndex < startIndex) {
519!
917
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
918
      }
919

920
      return TSDB_CODE_SUCCESS;
519✔
921
    }
922
  }
923

924
  // all fit in
925
  *stopIndex = numOfRows - 1;
1✔
926
  return TSDB_CODE_SUCCESS;
1✔
927
}
928

929
/*
 * Copy rows [startIndex, startIndex + rowCount) of pBlock into a newly created block.
 *
 * The new block is created from pBlock without copying data (createOneDataBlock(..., false)),
 * sized for rowCount rows, and ends with info.rows == rowCount. NULL cells stay NULL; when
 * pBlock carries pre-computed aggregates (pBlockAgg) they are consulted for the null check.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL block, a negative start/count or a range beyond
 * pBlock->info.rows. On success *pResBlock owns the new block. On any failure the partially
 * built block is destroyed and is not handed out through *pResBlock.
 */
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
  int32_t code = 0;
  QRY_PARAM_CHECK(pResBlock);

  // 64-bit sum so that a huge rowCount cannot wrap around the range check
  if (pBlock == NULL || startIndex < 0 || rowCount < 0 || (int64_t)startIndex + rowCount > pBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDst = NULL;
  code = createOneDataBlock(pBlock, false, &pDst);
  if (code) {
    return code;
  }

  code = blockDataEnsureCapacity(pDst, rowCount);
  if (code) {
    blockDataDestroy(pDst);
    return code;
  }

  // Values are copied cell by cell rather than with colDataAssignNRows(), since variable-length
  // data may be stored out of row order.
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    if (pColData == NULL || pDstCol == NULL) {
      continue;
    }

    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pColData, j);
      } else {
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
      }

      if (isNull) {
        colDataSetNULL(pDstCol, j - startIndex);
        continue;
      }

      char* p = colDataGetData(pColData, j);
      code = colDataSetVal(pDstCol, j - startIndex, p, false);
      if (code) {
        // Leaving only the inner loop would let the next column overwrite this error.
        blockDataDestroy(pDst);
        return code;
      }
    }
  }

  pDst->info.rows = rowCount;
  *pResBlock = pDst;
  return TSDB_CODE_SUCCESS;
}
990

991
/**
992
 *
993
 * +------------------+---------------------------------------------+
994
 * |the number of rows|                    column #1                |
995
 * |    (4 bytes)     |------------+-----------------------+--------+
996
 * |                  | null bitmap| column length(4bytes) | values |
997
 * +------------------+------------+-----------------------+--------+
998
 * @param buf
999
 * @param pBlock
1000
 * @return
1001
 */
1002
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
44✔
1003
  // write the number of rows
1004
  *(uint32_t*)buf = pBlock->info.rows;
44✔
1005

1006
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
44✔
1007
  int32_t numOfRows = pBlock->info.rows;
44✔
1008

1009
  char* pStart = buf + sizeof(uint32_t);
44✔
1010

1011
  for (int32_t i = 0; i < numOfCols; ++i) {
190✔
1012
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
146✔
1013
    if (pCol == NULL) {
146!
1014
      continue;
×
1015
    }
1016

1017
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
146!
1018
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
15✔
1019
      pStart += numOfRows * sizeof(int32_t);
15✔
1020
    } else {
1021
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
131✔
1022
      pStart += BitmapLen(pBlock->info.rows);
131✔
1023
    }
1024

1025
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
146✔
1026

1027
    *(int32_t*)pStart = dataSize;
146✔
1028
    pStart += sizeof(int32_t);
146✔
1029

1030
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
146!
1031
      for (int32_t row = 0; row < numOfRows; ++row) {
×
1032
        char*   pColData = pCol->pData + pCol->varmeta.offset[row];
×
1033
        int32_t colSize = 0;
×
1034
        if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
×
1035
          colSize = getJsonValueLen(pColData);
×
1036
        } else {
1037
          colSize = varDataTLen(pColData);
×
1038
        }
1039
        memcpy(pStart, pColData, colSize);
×
1040
        pStart += colSize;
×
1041
      }
1042
    } else {
1043
      if (dataSize != 0) {
146!
1044
        // ubsan reports error if pCol->pData==NULL && dataSize==0
1045
        memcpy(pStart, pCol->pData, dataSize);
146✔
1046
      }
1047
      pStart += dataSize;
146✔
1048
    }
1049
  }
1050

1051
  return 0;
44✔
1052
}
1053

1054
/*
 * Deserialize a buffer produced by blockDataToBuf() into pBlock.
 *
 * pBlock must already have the serialized column layout. Its capacity is grown to the stored
 * row count and info.rows is set to that count. Positions inside the stream are not aligned,
 * so integers are read with memcpy().
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the stored row count is not positive,
 * TSDB_CODE_FAILED when a column's stored data length is negative or larger than a fixed-length
 * column's buffer, or the allocation error code.
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  int32_t numOfRows = 0;
  memcpy(&numOfRows, buf, sizeof(numOfRows));
  if (numOfRows <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  pBlock->info.rows = numOfRows;
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    bool isVar = IS_VAR_DATA_TYPE(pCol->info.type);
    if (isVar) {
      size_t metaSize = (size_t)numOfRows * sizeof(int32_t);
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    int32_t colLength = 0;
    memcpy(&colLength, pStart, sizeof(colLength));
    pStart += sizeof(colLength);
    if (colLength < 0) {
      return TSDB_CODE_FAILED;
    }

    if (isVar) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return terrno;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }
      pCol->varmeta.length = colLength;
    } else if ((int64_t)colLength > (int64_t)pCol->info.bytes * pBlock->info.capacity) {
      // the fixed-length buffer holds capacity * bytes; never copy past it
      return TSDB_CODE_FAILED;
    }

    if (colLength != 0) {
      // ubsan reports error if colLength==0 && pCol->pData == 0
      memcpy(pCol->pData, pStart, colLength);
    }
    pStart += colLength;
  }

  return TSDB_CODE_SUCCESS;
}
1111

1112
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
×
1113
  if (!pColumnInfoData->hasNull) {
×
1114
    return false;
×
1115
  }
1116

1117
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
×
1118
    for (int32_t i = startIndex; i < nRows; ++i) {
×
1119
      if (!colDataIsNull_var(pColumnInfoData, i)) {
×
1120
        return false;
×
1121
      }
1122
    }
1123
  } else {
1124
    if (pColumnInfoData->nullbitmap == NULL) {
×
1125
      return false;
×
1126
    }
1127

1128
    for (int32_t i = startIndex; i < nRows; ++i) {
×
1129
      if (!colDataIsNull_f(pColumnInfoData->nullbitmap, i)) {
×
1130
        return false;
×
1131
      }
1132
    }
1133
  }
1134

1135
  return true;
×
1136
}
1137

1138
// todo remove this
1139
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
×
1140
  pBlock->info.rows = *(int32_t*)buf;
×
1141
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
×
1142

1143
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
1144

1145
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
×
1146

1147
  for (int32_t i = 0; i < numOfCols; ++i) {
×
1148
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
×
1149
    if (pCol == NULL) {
×
1150
      continue;
×
1151
    }
1152

1153
    pCol->hasNull = true;
×
1154

1155
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1156
      size_t metaSize = capacity * sizeof(int32_t);
×
1157
      memcpy(pCol->varmeta.offset, pStart, metaSize);
×
1158
      pStart += metaSize;
×
1159
    } else {
1160
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
×
1161
      pStart += BitmapLen(capacity);
×
1162
    }
1163

1164
    int32_t colLength = *(int32_t*)pStart;
×
1165
    pStart += sizeof(int32_t);
×
1166

1167
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1168
      if (pCol->varmeta.allocLen < colLength) {
×
1169
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
×
1170
        if (tmp == NULL) {
×
1171
          return terrno;
×
1172
        }
1173

1174
        pCol->pData = tmp;
×
1175
        pCol->varmeta.allocLen = colLength;
×
1176
      }
1177

1178
      pCol->varmeta.length = colLength;
×
1179
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
×
1180
        return TSDB_CODE_FAILED;
×
1181
      }
1182
    }
1183

1184
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
×
1185
      memcpy(pCol->pData, pStart, colLength);
×
1186
    }
1187

1188
    pStart += pCol->info.bytes * capacity;
×
1189
  }
1190

1191
  return TSDB_CODE_SUCCESS;
×
1192
}
1193

1194
/*
 * Width in bytes of one row: the sum of info.bytes over all columns.
 * Computed on first use and cached in pBlock->info.rowSize; a cached value of 0 is treated as
 * "not computed yet" and triggers a recomputation.
 */
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t width = 0;
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
    if (pCol != NULL) {
      width += pCol->info.bytes;
    }
  }

  pBlock->info.rowSize = width;
  return pBlock->info.rowSize;
}
1213

1214
/**
 * Size in bytes of the fixed header plus the per-column metadata of a serialized block with
 * numOfCols columns:
 *
 *   | version | total length | total rows | blankFull | total columns | flag seg | block group id |
 *   | column schema (int8 + int32 per column, presumably type and bytes) | column length (int32 per column) |
 *
 * @param numOfCols number of columns in the block
 * @return metadata size in bytes (row payload not included)
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  const size_t fixedHeader = sizeof(int32_t)    /* version */
                             + sizeof(int32_t)  /* total length */
                             + sizeof(int32_t)  /* total rows */
                             + sizeof(bool)     /* blankFull */
                             + sizeof(int32_t)  /* total columns */
                             + sizeof(int32_t)  /* flag segment */
                             + sizeof(uint64_t); /* block group id */

  const size_t perColumn = (sizeof(int8_t) + sizeof(int32_t)) /* column schema */
                           + sizeof(int32_t);                 /* column length */

  return fixedHeader + numOfCols * perColumn;
}
1225

1226
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
40✔
1227
  double rowSize = 0;
40✔
1228

1229
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
40✔
1230
  for (int32_t i = 0; i < numOfCols; ++i) {
174✔
1231
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
134✔
1232
    if (pColInfo == NULL) {
134!
1233
      continue;
×
1234
    }
1235

1236
    rowSize += pColInfo->info.bytes;
134✔
1237
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
134!
1238
      rowSize += sizeof(int32_t);
15✔
1239
    } else {
1240
      rowSize += 1 / 8.0;  // one bit for each record
119✔
1241
    }
1242
  }
1243

1244
  return rowSize;
40✔
1245
}
1246

1247
typedef struct SSDataBlockSortHelper {
1248
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
1249
  SSDataBlock* pDataBlock;
1250
} SSDataBlockSortHelper;
1251

1252
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
18,508✔
1253
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
18,508✔
1254

1255
  SSDataBlock* pDataBlock = pHelper->pDataBlock;
18,508✔
1256

1257
  int32_t left = *(int32_t*)p1;
18,508✔
1258
  int32_t right = *(int32_t*)p2;
18,508✔
1259

1260
  SArray* pInfo = pHelper->orderInfo;
18,508✔
1261

1262
  for (int32_t i = 0; i < pInfo->size; ++i) {
24,314✔
1263
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
22,437✔
1264
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
22,437✔
1265

1266
    if (pColInfoData->hasNull) {
22,437!
1267
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
22,437!
1268
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
22,437!
1269
      if (leftNull && rightNull) {
22,437✔
1270
        continue;  // continue to next slot
88✔
1271
      }
1272

1273
      if (rightNull) {
22,349✔
1274
        return pOrder->nullFirst ? 1 : -1;
48!
1275
      }
1276

1277
      if (leftNull) {
22,301✔
1278
        return pOrder->nullFirst ? -1 : 1;
57!
1279
      }
1280
    }
1281

1282
    void* left1 = colDataGetData(pColInfoData, left);
22,244!
1283
    void* right1 = colDataGetData(pColInfoData, right);
22,244!
1284
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
22,244!
1285
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
×
1286
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
×
1287
        return 0;
×
1288
      }
1289
    }
1290

1291
    __compar_fn_t fn;
1292
    if (pOrder->compFn) {
22,244!
1293
      fn = pOrder->compFn;
22,244✔
1294
    } else {
1295
      fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1296
    }
1297

1298
    int ret = fn(left1, right1);
22,244✔
1299
    if (ret == 0) {
22,244✔
1300
      continue;
5,718✔
1301
    } else {
1302
      return ret;
16,526✔
1303
    }
1304
  }
1305

1306
  return 0;
1,877✔
1307
}
1308

1309
/*
 * Fill the scratch columns pCols with the rows of pDataBlock permuted by index: destination
 * row r receives source row index[r].
 *
 * Variable-length columns: the whole payload buffer is copied as-is and only the offsets are
 * permuted, so every destination offset still points into the copied payload.
 * Fixed-length columns: values are copied row by row; a NULL source row is marked NULL in the
 * destination via colDataSetNull_f_s() and no value is copied.
 * pCols must already be sized for the block (as done by createHelpColInfoData()).
 */
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pSrc == NULL) {
      continue;
    }

    SColumnInfoData* pDst = pCols + c;

    if (!IS_VAR_DATA_TYPE(pSrc->info.type)) {
      int32_t width = pDst->info.bytes;
      for (int32_t r = 0; r < pDataBlock->info.rows; ++r) {
        int32_t srcRow = index[r];
        if (colDataIsNull_f(pSrc->nullbitmap, srcRow)) {
          colDataSetNull_f_s(pDst, r);
        } else {
          memcpy(pDst->pData + r * width, pSrc->pData + srcRow * width, width);
        }
      }
      continue;
    }

    if (pSrc->varmeta.length != 0) {
      memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
    }
    pDst->varmeta.length = pSrc->varmeta.length;

    for (int32_t r = 0; r < pDataBlock->info.rows; ++r) {
      pDst->varmeta.offset[r] = pSrc->varmeta.offset[index[r]];
    }
  }
}
47✔
1338

1339
/*
 * Allocate scratch columns that mirror pDataBlock's columns and are sized for its capacity;
 * they receive the permuted rows during a sort.
 *
 * Variable-length columns get a zeroed offset array of `capacity` entries and a payload buffer
 * as large as the source column's current data length. Fixed-length columns get a zeroed null
 * bitmap and a zeroed value buffer of `capacity` rows. Columns that cannot be fetched from the
 * block are left zero-initialized.
 *
 * On success *ppCols owns an array of numOfCols columns. On failure everything allocated so far
 * is released, *ppCols stays NULL and terrno's code is returned.
 */
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
  int32_t capacity = pDataBlock->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t code = 0;
  int32_t built = 0;

  *ppCols = NULL;

  SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
  if (pCols == NULL) {
    return terrno;
  }

  for (built = 0; built < numOfCols; ++built) {
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, built);
    if (pSrc == NULL) {
      continue;
    }

    SColumnInfoData* pCol = &pCols[built];
    pCol->info = pSrc->info;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.offset = taosMemoryCalloc(capacity, sizeof(int32_t));
      pCol->pData = taosMemoryCalloc(1, pSrc->varmeta.length);
      if (pCol->varmeta.offset == NULL || pCol->pData == NULL) {
        code = terrno;
        taosMemoryFree(pCol->varmeta.offset);
        taosMemoryFree(pCol->pData);
        goto _error;
      }

      pCol->varmeta.length = pSrc->varmeta.length;
      pCol->varmeta.allocLen = pCol->varmeta.length;
    } else {
      pCol->nullbitmap = taosMemoryCalloc(1, BitmapLen(capacity));
      pCol->pData = taosMemoryCalloc(capacity, pCol->info.bytes);
      if (pCol->nullbitmap == NULL || pCol->pData == NULL) {
        code = terrno;
        taosMemoryFree(pCol->nullbitmap);
        taosMemoryFree(pCol->pData);
        goto _error;
      }
    }
  }

  *ppCols = pCols;
  return code;

_error:
  // column `built` was already released above; release the columns completed before it
  for (int32_t j = 0; j < built; ++j) {
    colDataDestroy(&pCols[j]);
  }

  taosMemoryFree(pCols);
  return code;
}
1394

1395
/*
 * Hand the buffers of the scratch columns pCols over to pDataBlock. Each block column's
 * previous payload and its offset array or null bitmap are freed and replaced. Column info is
 * copied, and for variable-length columns the whole varmeta is taken over. Frees the pCols
 * array itself (but not the buffers, which now belong to the block).
 */
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pDst == NULL) {
      continue;
    }

    SColumnInfoData* pScratch = &pCols[c];
    pDst->info = pScratch->info;

    if (!IS_VAR_DATA_TYPE(pDst->info.type)) {
      taosMemoryFreeClear(pDst->nullbitmap);
      pDst->nullbitmap = pScratch->nullbitmap;
    } else {
      taosMemoryFreeClear(pDst->varmeta.offset);
      pDst->varmeta = pScratch->varmeta;
    }

    taosMemoryFreeClear(pDst->pData);
    pDst->pData = pScratch->pData;
  }

  taosMemoryFreeClear(pCols);
}
47✔
1419

1420
/*
 * Allocate the identity permutation [0, 1, ..., rows - 1], used as the array of row indexes
 * that gets sorted. Returns NULL if the allocation fails.
 */
static int32_t* createTupleIndex(size_t rows) {
  int32_t* pIndex = taosMemoryCalloc(rows, sizeof(int32_t));
  if (pIndex != NULL) {
    for (int32_t pos = 0; pos < rows; ++pos) {
      pIndex[pos] = pos;
    }
  }

  return pIndex;
}
1432

1433
// Release an index array obtained from createTupleIndex().
static void destroyTupleIndex(int32_t* index) {
  // `index` is a by-value copy: the NULL-ing done by taosMemoryFreeClear is not seen by the caller.
  taosMemoryFreeClear(index);
}
47!
1434

1435
/*
 * Sort the rows of pDataBlock in place according to pOrderInfo (SArray<SBlockOrderInfo>).
 *
 * Fast path: a single-column block sorted by one fixed-length key without NULLs is sorted
 * directly with taosSort(). Otherwise an index permutation is sorted with dataBlockCompar()
 * and every column is rebuilt in that order through scratch columns. As a side effect each
 * order item's pColData and compFn are resolved against this block.
 *
 * Returns TSDB_CODE_SUCCESS, terrno for a failed allocation/lookup, or the error the
 * comparator reported through terrno (e.g. JSON values that cannot be compared).
 */
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t rows = pDataBlock->info.rows;

  // Inspect the sort keys to decide whether the fast path applies.
  bool sortColumnHasNull = false;
  bool varTypeSort = false;
  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  if (taosArrayGetSize(pOrderInfo) == 1 && !sortColumnHasNull && numOfCols == 1 && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
    if (pColInfoData == NULL || pOrder == NULL) {
      return terrno;
    }

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pInfo->pColData == NULL) {
      continue;
    }
    pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
  }

  // dataBlockCompar() can only report errors through terrno: clear it first, check it afterwards.
  terrno = 0;
  taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  if (terrno) {
    code = terrno;
    destroyTupleIndex(index);  // previously leaked on this path
    return code;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = NULL;
  code = createHelpColInfoData(pDataBlock, &pCols);
  if (code != 0) {
    destroyTupleIndex(index);
    return code;
  }

  int64_t p2 = taosGetTimestampUs();
  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();
  copyBackToBlock(pDataBlock, pCols);

  int64_t p4 = taosGetTimestampUs();
  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%u\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);

  destroyTupleIndex(index);
  return TSDB_CODE_SUCCESS;
}
1540

1541
/*
 * Empty the block via blockDataEmpty() and additionally reset its id (uid and groupId).
 */
void blockDataCleanup(SSDataBlock* pDataBlock) {
  blockDataEmpty(pDataBlock);

  pDataBlock->info.id.groupId = 0;
  pDataBlock->info.id.uid = 0;
}
949,055✔
1547

1548
/*
 * Drop all rows from the block while keeping its column buffers and capacity.
 *
 * Frees the attached aggregate array (pBlockAgg), resets every column over the full capacity
 * with colInfoDataCleanup(), and zeroes rows, dataLoad and the time window. The block id is
 * kept. A block whose capacity is 0 is left untouched.
 */
void blockDataEmpty(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  if (pInfo->capacity != 0) {
    taosMemoryFreeClear(pDataBlock->pBlockAgg);

    size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
    for (int32_t c = 0; c < numOfCols; ++c) {
      SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, c);
      if (pCol != NULL) {
        colInfoDataCleanup(pCol, pInfo->capacity);
      }
    }

    pInfo->rows = 0;
    pInfo->dataLoad = 0;
    pInfo->window.skey = 0;
    pInfo->window.ekey = 0;
  }
}
1571

1572
/*
 * Lightweight reset: mark the block as holding no rows without clearing column buffers.
 *
 * Per column, only the hasNull and reassigned flags are cleared and variable-length columns get
 * a zero data length; null bitmaps, offsets and pBlockAgg are left as they are (unlike
 * blockDataCleanup()). The block's rows, dataLoad, time window, uid and groupId are zeroed.
 * A block whose capacity is 0 is left untouched.
 */
void blockDataReset(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  if (pInfo->capacity == 0) {
    return;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pCol == NULL) {
      continue;
    }

    pCol->reassigned = false;
    pCol->hasNull = false;
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = 0;
    }
  }

  pInfo->rows = 0;
  pInfo->dataLoad = 0;
  pInfo->window.skey = 0;
  pInfo->window.ekey = 0;
  pInfo->id.uid = 0;
  pInfo->id.groupId = 0;
}
1599

1600
/*
1601
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
1602
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
1603
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
1604
 */
1605
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
248,979✔
1606
                         bool clearPayload) {
1607
  if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) {
248,979!
1608
    return TSDB_CODE_SUCCESS;
×
1609
  }
1610

1611
  int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
248,979!
1612

1613
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
248,979!
1614
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
123,317!
1615
    if (tmp == NULL) {
123,432!
1616
      return terrno;
×
1617
    }
1618

1619
    pColumn->varmeta.offset = (int32_t*)tmp;
123,432✔
1620
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
123,432✔
1621
  } else {
1622
    // prepare for the null bitmap
1623
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
125,662✔
1624
    if (tmp == NULL) {
125,654!
1625
      return terrno;
×
1626
    }
1627

1628
    int32_t oldLen = BitmapLen(existedRows);
125,654✔
1629
    pColumn->nullbitmap = tmp;
125,654✔
1630
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
125,654✔
1631
    if (pColumn->info.bytes == 0) {
125,654!
1632
      return TSDB_CODE_INVALID_PARA;
×
1633
    }
1634

1635
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
1636
    // to MALLOC_ALIGN_BYTES
1637
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
125,654!
1638
    if (tmp == NULL) {
125,671!
1639
      return terrno;
×
1640
    }
1641
    // memset(tmp, 0, numOfRows * pColumn->info.bytes);
1642

1643
    // copy back the existed data
1644
    if (pColumn->pData != NULL) {
125,671✔
1645
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
701✔
1646
      taosMemoryFreeClear(pColumn->pData);
701!
1647
    }
1648

1649
    pColumn->pData = tmp;
125,671✔
1650

1651
    // check if the allocated memory is aligned to the requried bytes.
1652
#if defined LINUX
1653
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
125,671!
1654
      return TSDB_CODE_FAILED;
×
1655
    }
1656
#endif
1657

1658
    if (clearPayload) {
125,671✔
1659
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
89,611✔
1660
    }
1661
  }
1662

1663
  return TSDB_CODE_SUCCESS;
249,103✔
1664
}
1665

1666
/*
 * Reset a column so it can be refilled.
 *
 * Clears hasNull and zeroes the per-row metadata for the first numOfRows rows:
 * the offset array (and varmeta.length) for var-length types, the null bitmap
 * otherwise. The payload buffer pData is not touched.
 */
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  pColumn->hasNull = false;

  if (!IS_VAR_DATA_TYPE(pColumn->info.type)) {
    if (pColumn->nullbitmap == NULL) {
      return;
    }
    memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
    return;
  }

  pColumn->varmeta.length = 0;
  if (pColumn->varmeta.offset == NULL) {
    return;
  }
  memset(pColumn->varmeta.offset, 0, numOfRows * sizeof(int32_t));
}
3,199,170✔
1680

1681
/*
 * Grow a single column to hold numOfRows rows.
 *
 * The column is not tied to a block here, so doEnsureCapacity receives a
 * zero-initialized SDataBlockInfo. clearPayload is forwarded unchanged.
 * Returns the result of doEnsureCapacity.
 */
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
  SDataBlockInfo emptyInfo = {0};
  int32_t        code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows, clearPayload);
  return code;
}
1685

1686
/*
 * Make sure every column of the block can hold numOfRows rows.
 *
 * This is a no-op when numOfRows is 0 or does not exceed the current capacity.
 * Columns are grown one after another. On the first failure the error is
 * returned and info.capacity is left unchanged; columns grown before the
 * failure keep their larger buffers. On success info.capacity becomes
 * numOfRows.
 */
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  for (size_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, col);
    if (pCol == NULL) {
      return terrno;
    }

    int32_t ret = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows, false);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  pDataBlock->info.capacity = numOfRows;
  return TSDB_CODE_SUCCESS;
}
1708

1709
/*
 * Release the resources held by a block, but not the SSDataBlock itself.
 *
 * Destroys each column's buffers, destroys the column array, frees the
 * aggregate buffer (pBlockAgg) and zeroes the block info. A NULL block is
 * ignored.
 */
void blockDataFreeRes(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    int32_t colCount = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t idx = 0; idx < colCount; ++idx) {
      SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, idx);
      if (pCol != NULL) {
        colDataDestroy(pCol);
      }
    }

    taosArrayDestroy(pBlock->pDataBlock);
    pBlock->pDataBlock = NULL;

    taosMemoryFreeClear(pBlock->pBlockAgg);
    memset(&pBlock->info, 0, sizeof(pBlock->info));
  }
}
1730

1731
/*
 * Free a heap-allocated block and everything it owns.
 *
 * When the primary key is a var-length type, both pk value buffers are freed
 * first. blockDataFreeRes then releases the columns, after which the block
 * struct itself is freed. A NULL block is ignored.
 */
void blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  // only var-length primary keys hold heap buffers in pks[].pData
  if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
    for (int32_t k = 0; k < 2; ++k) {
      taosMemoryFreeClear(pBlock->info.pks[k].pData);
    }
  }

  blockDataFreeRes(pBlock);
  taosMemoryFreeClear(pBlock);
}
1744

1745
// todo remove it
1746
/*
 * Make dst a data copy of src, building dst's columns from src's schema.
 *
 * dst is expected to have no columns yet: one schema-only column is appended
 * per src column. dst is then sized for src->info.rows and each column's
 * payload is copied with colDataAssign. Columns that are NULL, and fixed-length
 * source columns with no pData, are skipped.
 *
 * Afterwards dst->info mirrors src->info, except that dst keeps its own
 * capacity and does not share src's pk buffers (pks[].pData is NULL).
 * Returns the first error encountered, or TSDB_CODE_SUCCESS.
 */
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.rows = 0;
  dst->info.capacity = 0;

  size_t nCols = taosArrayGetSize(src->pDataBlock);

  // pass 1: replicate the column schema (no data) into dst
  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pFrom = taosArrayGet(src->pDataBlock, c);
    if (pFrom == NULL) {
      return terrno;
    }

    SColumnInfoData schemaOnly = {.hasNull = true, .info = pFrom->info};
    int32_t         appendRet = blockDataAppendColInfo(dst, &schemaOnly);
    if (appendRet) {
      return appendRet;
    }
  }

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // pass 2: copy the column payloads
  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pTo = taosArrayGet(dst->pDataBlock, c);
    SColumnInfoData* pFrom = taosArrayGet(src->pDataBlock, c);

    bool skip = (pFrom == NULL) || (pTo == NULL) ||
                (pFrom->pData == NULL && !IS_VAR_DATA_TYPE(pFrom->info.type));
    if (skip) {
      continue;
    }

    int32_t assignRet = colDataAssign(pTo, pFrom, src->info.rows, &src->info);
    if (assignRet < 0) {
      return assignRet;
    }
  }

  // adopt src's info wholesale, keeping dst's capacity and not its pk pointers
  uint32_t keptCap = dst->info.capacity;
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.capacity = keptCap;
  return code;
}
1794

1795
/*
 * Copy the rows of pSrc into the existing block pDst.
 *
 * pDst is cleaned up and grown to pSrc->info.rows. Each column pair is then
 * copied with colDataAssign; column pairs where either side is NULL are
 * skipped. pDst->info is replaced by pSrc->info (keeping pDst's capacity) and
 * var-length pk values are duplicated with copyPkVal.
 *
 * NOTE(review): the assignment `pDst->info = pSrc->info` overwrites any
 * pks[].pData pointers pDst held before this call without freeing them.
 * Confirm that blockDataCleanup (or the caller) releases them.
 */
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
  blockDataCleanup(pDst);

  int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  size_t nCols = taosArrayGetSize(pSrc->pDataBlock);
  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pTo = taosArrayGet(pDst->pDataBlock, c);
    SColumnInfoData* pFrom = taosArrayGet(pSrc->pDataBlock, c);
    if (pTo != NULL && pFrom != NULL) {
      int32_t assignRet = colDataAssign(pTo, pFrom, pSrc->info.rows, &pSrc->info);
      if (assignRet < 0) {
        return assignRet;
      }
    }
  }

  uint32_t savedCap = pDst->info.capacity;

  pDst->info = pSrc->info;
  // never alias pSrc's pk buffers; copyPkVal makes private copies when needed
  pDst->info.pks[0].pData = NULL;
  pDst->info.pks[1].pData = NULL;

  code = copyPkVal(&pDst->info, &pSrc->info);
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  pDst->info.capacity = savedCap;
  return code;
}
1832

1833
/*
 * Allocate an empty block of stream type `type` with seven fixed columns:
 * window start/end ts, uid, group id, calculate start/end ts, and table name.
 *
 * info.rowSize is preset to the sum of those column widths and info.watermark
 * to INT64_MIN. On success *pBlock owns the new block. On failure nothing is
 * leaked and the error code is returned.
 */
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
  QRY_PARAM_CHECK(pBlock);

  static const struct {
    int32_t type;
    int32_t bytes;
  } kColumns[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // window start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // window end ts
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                          // uid
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                          // group id
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // calculate start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // calculate end ts
      {TSDB_DATA_TYPE_VARCHAR, VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN},  // table name
  };

  int32_t      code = 0;
  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->info.hasVarCol = false;
  pNew->info.id.groupId = 0;
  pNew->info.rows = 0;
  pNew->info.type = type;
  pNew->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
                       sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  pNew->info.watermark = INT64_MIN;

  pNew->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
  if (pNew->pDataBlock == NULL) {
    taosMemoryFree(pNew);
    return terrno;
  }

  // one template reused for every column; only type/bytes change between pushes
  SColumnInfoData colTemplate = {0};
  for (size_t k = 0; k < sizeof(kColumns) / sizeof(kColumns[0]); ++k) {
    colTemplate.info.type = kColumns[k].type;
    colTemplate.info.bytes = kColumns[k].bytes;
    if (taosArrayPush(pNew->pDataBlock, &colTemplate) == NULL) {
      code = terrno;
      goto _err;
    }
  }

  *pBlock = pNew;
  return code;

_err:
  taosArrayDestroy(pNew->pDataBlock);
  taosMemoryFree(pNew);
  return code;
}
1925

1926
/*
 * Create a new one-row block holding a copy of row `rowIdx` of pDataBlock.
 *
 * The new block has the same column schema and info as pDataBlock, but does
 * not share pk buffers with it. NULL values are preserved.
 *
 * Returns TSDB_CODE_INVALID_PARA when pDataBlock is NULL or rowIdx lies
 * outside [0, pDataBlock->info.rows). On success *pResBlock owns the new
 * block; on failure it stays NULL and the partial block is destroyed.
 */
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // reject an out-of-range row before any column data is read at that index
  if (rowIdx < 0 || rowIdx >= pDataBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pBlock = NULL;
  int32_t      code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info = pDataBlock->info;
  pBlock->info.pks[0].pData = NULL;
  pBlock->info.pks[1].pData = NULL;
  pBlock->info.rows = 0;
  pBlock->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      blockDataDestroy(pBlock);
      return terrno;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code) {
      blockDataDestroy(pBlock);
      return code;
    }
  }

  code = blockDataEnsureCapacity(pBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
    if (pDst == NULL || pSrc == NULL) {
      blockDataDestroy(pBlock);
      return terrno;
    }

    bool  isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
    void* pData = isNull ? NULL : colDataGetData(pSrc, rowIdx);

    code = colDataSetVal(pDst, 0, pData, isNull);
    if (code) {
      blockDataDestroy(pBlock);
      return code;
    }
  }

  pBlock->info.rows = 1;

  *pResBlock = pBlock;
  return code;
}
1993

1994
/*
 * Duplicate the var-length primary-key values of pSrc into pDst.
 *
 * This does nothing when pks[0] is not a var-length type. Otherwise, for both
 * pks[0] and pks[1], a new buffer of nData bytes is allocated in pDst and the
 * bytes are copied. Any pData pointer already in pDst is overwritten, so
 * callers clear it first.
 *
 * Returns terrno if an allocation fails. In that case a pks[0] buffer that was
 * already allocated stays attached to pDst.
 */
int32_t copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
    return code;
  }

  for (int32_t k = 0; k < 2; ++k) {
    SValue*       pTo = &pDst->pks[k];
    const SValue* pFrom = &pSrc->pks[k];

    pTo->type = pFrom->type;
    pTo->pData = taosMemoryCalloc(1, pFrom->nData);
    QUERY_CHECK_NULL(pTo->pData, code, lino, _end, terrno);

    pTo->nData = pFrom->nData;
    memcpy(pTo->pData, pFrom->pData, pTo->nData);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2025

2026
/*
 * Create a new block with the same column schema as pDataBlock.
 *
 * The new block inherits pDataBlock's info with rows, capacity and rowSize
 * reset; rowSize is re-accumulated as columns are appended. Var-length pk
 * values are duplicated via copyPkVal. When copyData is true, all rows are
 * copied too, and rows/capacity are set to pDataBlock->info.rows.
 *
 * On success *pResBlock owns the new block. On failure it stays NULL, the
 * partially built block is destroyed, and the error code is returned.
 */
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDstBlock = NULL;
  int32_t      code = createDataBlock(&pDstBlock);
  if (code) {
    return code;
  }

  pDstBlock->info = pDataBlock->info;
  // never alias the source's pk buffers; copyPkVal below makes private copies
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;

  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;  // re-accumulated by blockDataAppendColInfo
  pDstBlock->info.id = pDataBlock->info.id;
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      blockDataDestroy(pDstBlock);
      return terrno;
    }

    // schema only: the new column must not share p's buffers
    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    if (code) {
      blockDataDestroy(pDstBlock);
      return code;
    }
  }

  code = copyPkVal(&pDstBlock->info, &pDataBlock->info);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDstBlock);
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (copyData) {
    code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pDstBlock);
      return code;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
      if (pDst == NULL || pSrc == NULL) {
        // capture terrno first so the log and the return value report the real cause
        // (`code` is TSDB_CODE_SUCCESS at this point)
        code = terrno;
        blockDataDestroy(pDstBlock);
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      if (ret < 0) {
        blockDataDestroy(pDstBlock);
        return ret;
      }
    }

    pDstBlock->info.rows = pDataBlock->info.rows;
    pDstBlock->info.capacity = pDataBlock->info.rows;
  }

  *pResBlock = pDstBlock;
  return code;
}
2108

2109
/*
 * Create an empty block whose columns are the subset of pDataBlock's columns
 * selected by pColArray.
 *
 * For each SColIdPair in pColArray, the first column of pDataBlock whose
 * info.colId equals the pair's vtbColId is appended, in pColArray order. Only
 * its schema is appended, never its data. The block info is copied from
 * pDataBlock with rows, capacity and rowSize reset and no pk buffers.
 *
 * On success *pResBlock owns the new block. On failure the partial block is
 * destroyed and the error code is returned.
 */
int32_t createOneDataBlockWithColArray(const SSDataBlock* pDataBlock, SArray* pColArray, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // store the result in `code` before checking so _return logs and returns the real failure
  code = createDataBlock(&pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;

  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;
  pDstBlock->info.id = pDataBlock->info.id;
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;

  size_t numOfPairs = taosArrayGetSize(pColArray);
  size_t numOfSrcCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SColIdPair* pColPair = taosArrayGet(pColArray, i);
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);

    for (int32_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
      if (p == NULL) {
        continue;
      }

      if (p->info.colId == pColPair->vtbColId) {
        // Append a schema-only column. Appending *p would copy p's pData/offset/bitmap
        // pointers into the new block, which would later realloc or free them as its own.
        SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
        code = blockDataAppendColInfo(pDstBlock, &colInfo);
        QUERY_CHECK_CODE(code, lino, _return);
        break;
      }
    }
  }

  *pResBlock = pDstBlock;
  return code;
_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2153

2154
/*
 * Build a block with pOrgBlock's schema, filled with pDataBlock's rows.
 *
 * For every column of pOrgBlock, the column of pDataBlock with the same
 * info.slotId is copied in. When no such column exists, the destination column
 * is filled with pDataBlock->info.rows NULLs. rows, capacity and window are
 * taken from pDataBlock.
 *
 * On success *pResBlock owns the new block. On failure the partial block is
 * destroyed and the error code is returned.
 */
int32_t createOneDataBlockWithTwoBlock(const SSDataBlock* pDataBlock, const SSDataBlock* pOrgBlock, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock *pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(pOrgBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // store each result in `code` before checking so _return reports the real failure
  code = createOneDataBlock(pOrgBlock, false, &pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);
  code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _return);

  size_t numOfOrgCols = taosArrayGetSize(pOrgBlock->pDataBlock);
  size_t numOfDataCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfOrgCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pOrgBlock->pDataBlock, i);

    QUERY_CHECK_NULL(pDst, code, lino, _return, terrno);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno);

    bool found = false;
    for (int32_t j = 0; j < numOfDataCols; j++) {
      SColumnInfoData *p = taosArrayGet(pDataBlock->pDataBlock, j);
      QUERY_CHECK_NULL(p, code, lino, _return, terrno);

      if (p->info.slotId == pSrc->info.slotId) {
        code = colDataAssign(pDst, p, (int32_t)pDataBlock->info.rows, &pDataBlock->info);
        QUERY_CHECK_CODE(code, lino, _return);
        found = true;
        break;
      }
    }

    if (!found) {
      colDataSetNNULL(pDst, 0, pDataBlock->info.rows);
    }
  }

  pDstBlock->info.rows = pDataBlock->info.rows;
  pDstBlock->info.capacity = pDataBlock->info.rows;
  pDstBlock->info.window = pDataBlock->info.window;

  *pResBlock = pDstBlock;
  return code;
_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2199

2200
/*
 * Allocate an empty block with a zeroed header and an empty column array
 * (initial room for 4 columns).
 *
 * On success *pResBlock owns the block and 0 is returned. On allocation
 * failure nothing is leaked and terrno is returned.
 */
int32_t createDataBlock(SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pNew->pDataBlock != NULL) {
    *pResBlock = pNew;
    return 0;
  }

  // save the error before freeing, in case the free touches terrno
  int32_t err = terrno;
  taosMemoryFree(pNew);
  return err;
}
2217

2218
/*
 * Append a copy of *pColInfoData (by value) to the block's column array.
 *
 * The column array is created if it does not exist yet. info.rowSize grows by
 * the column's bytes, and info.hasVarCol is set when the column is var-length.
 * Note that a column type of 0 is not rejected here.
 * Returns terrno on allocation failure.
 */
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  }
  if (pBlock->pDataBlock == NULL) {
    return terrno;
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    return terrno;
  }

  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pBlock->info.hasVarCol = true;
  }

  pBlock->info.rowSize += pColInfoData->info.bytes;
  return TSDB_CODE_SUCCESS;
}
2240

2241
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
32,492✔
2242
  SColumnInfoData col = {.hasNull = true};
32,492✔
2243
  col.info.colId = colId;
32,492✔
2244
  col.info.type = type;
32,492✔
2245
  col.info.bytes = bytes;
32,492✔
2246

2247
  return col;
32,492✔
2248
}
2249

2250
/*
 * Look up column `index` of the block and store a pointer to it (not a copy)
 * in *pColInfoData.
 *
 * Returns TSDB_CODE_INVALID_PARA when index is past the last column. Returns
 * terrno when the lookup yields NULL, and 0 otherwise.
 */
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
  QRY_PARAM_CHECK(pColInfoData);

  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
    return TSDB_CODE_INVALID_PARA;
  }

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, index);
  *pColInfoData = pCol;
  return (pCol != NULL) ? 0 : terrno;
}
2265

2266
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
147✔
2267
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
147✔
2268

2269
  int32_t payloadSize = pageSize - extraSize;
147✔
2270
  int32_t rowSize = pBlock->info.rowSize;
147✔
2271
  int32_t nRows = payloadSize / rowSize;
147✔
2272
  if (nRows < 1) {
147!
2273
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
×
2274
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2275
    return -1;
×
2276
  }
2277

2278
  int32_t numVarCols = 0;
147✔
2279
  int32_t numFixCols = 0;
147✔
2280
  for (int32_t i = 0; i < numOfCols; ++i) {
669✔
2281
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
522✔
2282
    if (pCol == NULL) {
522!
2283
      return -1;
×
2284
    }
2285

2286
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
522!
2287
      ++numVarCols;
75✔
2288
    } else {
2289
      ++numFixCols;
447✔
2290
    }
2291
  }
2292

2293
  // find the data payload whose size is greater than payloadSize
2294
  int result = -1;
147✔
2295
  int start = 1;
147✔
2296
  int end = nRows;
147✔
2297
  while (start <= end) {
1,179✔
2298
    int mid = start + (end - start) / 2;
1,032✔
2299
    // data size + var data type columns offset + fixed data type columns bitmap len
2300
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
1,032✔
2301
    if (midSize > payloadSize) {
1,032✔
2302
      result = mid;
174✔
2303
      end = mid - 1;
174✔
2304
    } else {
2305
      start = mid + 1;
858✔
2306
    }
2307
  }
2308

2309
  int32_t newRows = (result != -1) ? result - 1 : nRows;
147✔
2310
  // the true value must be less than the value of nRows
2311
  if (newRows > nRows || newRows < 1) {
147!
2312
    uError("invalid newRows:%d, nRows:%d", newRows, nRows);
×
2313
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2314
    return -1;
×
2315
  }
2316

2317
  return newRows;
147✔
2318
}
2319

2320
/*
 * Free the buffers owned by a column, but not the SColumnInfoData itself.
 *
 * Var-length columns free their offset array; fixed-length columns free their
 * null bitmap. pData is freed in both cases. A NULL argument is ignored.
 */
void colDataDestroy(SColumnInfoData* pColData) {
  if (pColData != NULL) {
    if (!IS_VAR_DATA_TYPE(pColData->info.type)) {
      taosMemoryFreeClear(pColData->nullbitmap);
    } else {
      taosMemoryFreeClear(pColData->varmeta.offset);
    }

    taosMemoryFreeClear(pColData->pData);
  }
}
2333

2334
/*
 * Drop the first n entries of a null bitmap that covers `total` rows, shifting
 * the remaining total - n bits to the front.
 *
 * Bits are consumed from the high end of each byte toward the low end (bytes
 * are shifted left). Bytes past the new bitmap length are not cleared.
 */
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t  oldBytes = BitmapLen(total);
  int32_t  newBytes = BitmapLen(total - n);
  uint8_t* bits = (uint8_t*)nullBitmap;

  // whole-byte shift: a plain move is enough
  if (n % 8 == 0) {
    (void)memmove(nullBitmap, nullBitmap + n / 8, newBytes);
    return;
  }

  int32_t shift = n % 8;

  if (n < 8) {
    // sub-byte shift in place: each byte takes its own low bits plus the next byte's high bits
    for (int32_t i = 0; i < oldBytes; ++i) {
      uint8_t merged = (uint8_t)(bits[i] << shift);
      if (i + 1 < oldBytes) {
        merged |= (uint8_t)(bits[i + 1] >> (8 - shift));
      }
      bits[i] = merged;
    }
    return;
  }

  // n > 8 and not a byte multiple: skip whole bytes, then apply the sub-byte shift.
  // `carry` is 1 when the last source byte still feeds bits into the last output byte.
  int32_t carry = (total % 8 != 0 && total % 8 <= shift) ? 1 : 0;
  int32_t skip = oldBytes - newBytes - carry;
  for (int32_t i = 0; i < newBytes; ++i) {
    uint8_t merged = (uint8_t)(bits[i + skip] << shift);
    if (i < newBytes - 1 + carry) {
      merged |= (uint8_t)(bits[i + skip + 1] >> (8 - shift));
    }
    bits[i] = merged;
  }
}
1✔
2374

2375
/*
 * Compact the var-length payload of rows [start, end) to the front of pData,
 * and move their offsets to the front of the offset array.
 *
 * NULL rows (offset -1) contribute no bytes. When start != 0, the non-NULL
 * offsets in range are rewritten to the running payload length before moving.
 * The payload is moved starting from the first non-NULL row's offset, and only
 * when that offset is positive.
 * Returns the total payload length of the rows in range.
 *
 * NOTE(review): the payload move assumes the rows' data is contiguous in
 * pData, starting at the first non-NULL offset — confirm for reassigned
 * columns. Both call sites in this file are currently commented out.
 */
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t firstOffset = -1;
  int32_t movedLen = 0;

  for (int32_t row = start; row < end; ++row) {
    int32_t offset = pColInfoData->varmeta.offset[row];
    if (offset == -1) {
      continue;  // NULL value: no payload
    }

    if (start != 0) {
      pColInfoData->varmeta.offset[row] = movedLen;
    }

    if (firstOffset == -1) {
      firstOffset = offset;  // beginning of the payload to move
    }

    char* data = pColInfoData->pData + offset;
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      movedLen += getJsonValueLen(data);
    } else {
      movedLen += varDataTLen(data);
    }
  }

  if (firstOffset > 0) {
    (void)memmove(pColInfoData->pData, pColInfoData->pData + firstOffset, movedLen);
  }

  (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
  return movedLen;
}
2406

2407
/*
 * Remove the first n of `total` rows from a column.
 *
 * Var-length: the offset array is shifted left by n entries, and the n
 * now-unused trailing entries are zeroed. pData is not moved, so the surviving
 * offsets still point at their original payload.
 *
 * Fixed-length: the payload is shifted left by n * bytes, and the null bitmap
 * is shifted by n bits.
 *
 * The caller guarantees 0 < n < total.
 */
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
    (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));

    // clear the n unused trailing offset entries; memset takes bytes, not entries
    memset(&pColInfoData->varmeta.offset[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    (void)memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}
1✔
2420

2421
/*
 * Remove the first n rows from every column of the block.
 *
 * n == 0 is a no-op. When n covers all rows, the block is emptied instead.
 * Returns terrno if a column lookup fails, which leaves info.rows unchanged.
 */
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, c);
    if (pCol == NULL) {
      return terrno;
    }

    colDataTrimFirstNRows(pCol, n, pBlock->info.rows);
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}
2443

2444
/*
 * Truncate a column to its first n of `total` rows.
 *
 * Only var-length columns need work. varmeta.length is cut to the end of the
 * kept payload: that is offset[n] when row n is non-NULL, otherwise the end of
 * the last non-NULL kept row, or 0 when every kept row is NULL. The offset
 * entries of the dropped rows are then zeroed. Fixed-length columns are left
 * as is. Does nothing when n == 0 or n >= total.
 *
 * NOTE(review): this assumes payload offsets grow with the row index.
 */
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (n >= total || n == 0) return;
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    if (pColInfoData->varmeta.length != 0) {
      int32_t newLen = pColInfoData->varmeta.offset[n];
      if (-1 == newLen) {
        // Row n is NULL. The kept payload ends after the last non-NULL kept row,
        // and is empty if all kept rows are NULL.
        newLen = 0;
        for (int i = n - 1; i >= 0; --i) {
          int32_t off = pColInfoData->varmeta.offset[i];
          if (off != -1) {
            if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
              newLen = off + getJsonValueLen(pColInfoData->pData + off);
            } else {
              newLen = off + varDataTLen(pColInfoData->pData + off);
            }
            break;
          }
        }
      }
      if (newLen <= -1) {
        uFatal("colDataKeepFirstNRows: newLen:%d  old:%d", newLen, pColInfoData->varmeta.length);
      } else {
        pColInfoData->varmeta.length = newLen;
      }
    }
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
    // zero the (total - n) dropped offset entries; memset takes bytes, not entries
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  }
}
2472

2473
/*
 * Keep only the first n rows of the block.
 *
 * n == 0 empties the block. A block that already has at most n rows is left
 * untouched. NULL column entries are skipped.
 */
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataEmpty(pBlock);
    return;
  }

  if (pBlock->info.rows <= n) {
    return;
  }

  size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, c);
    if (pCol != NULL) {
      colDataKeepFirstNRows(pCol, n, pBlock->info.rows);
    }
  }

  pBlock->info.rows = n;
}
2495

2496
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
4,794✔
2497
  int64_t tbUid = pBlock->info.id.uid;
4,794✔
2498
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
4,794✔
2499
  int16_t hasVarCol = pBlock->info.hasVarCol;
4,794✔
2500
  int64_t rows = pBlock->info.rows;
4,794✔
2501
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
4,794✔
2502

2503
  int32_t tlen = 0;
4,794✔
2504
  tlen += taosEncodeFixedI64(buf, tbUid);
4,794✔
2505
  tlen += taosEncodeFixedI16(buf, numOfCols);
4,794✔
2506
  tlen += taosEncodeFixedI16(buf, hasVarCol);
9,588✔
2507
  tlen += taosEncodeFixedI64(buf, rows);
4,794✔
2508
  tlen += taosEncodeFixedI32(buf, sz);
4,794✔
2509
  for (int32_t i = 0; i < sz; i++) {
10,721✔
2510
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
5,928✔
2511
    if (pColData == NULL) {
5,928!
2512
      return terrno;
×
2513
    }
2514

2515
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
5,928✔
2516
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
5,928✔
2517
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
5,928✔
2518
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
5,928✔
2519

2520
    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
5,928!
2521
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
1,040✔
2522
    } else {
2523
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
10,816✔
2524
    }
2525

2526
    int32_t len = colDataGetLength(pColData, rows);
5,928✔
2527
    tlen += taosEncodeFixedI32(buf, len);
5,927✔
2528

2529
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
5,927!
2530
      for (int32_t row = 0; row < rows; ++row) {
×
2531
        char*   pData = pColData->pData + pColData->varmeta.offset[row];
×
2532
        int32_t colSize = 0;
×
2533
        if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
×
2534
          colSize = getJsonValueLen(pData);
×
2535
        } else {
2536
          colSize = varDataTLen(pData);
×
2537
        }
2538
        tlen += taosEncodeBinary(buf, pData, colSize);
×
2539
      }
2540
    } else {
2541
      tlen += taosEncodeBinary(buf, pColData->pData, len);
11,854✔
2542
    }
2543
  }
2544
  return tlen;
4,793✔
2545
}
2546

2547
/*
 * Deserialize a block written by tEncodeDataBlock into pBlock.
 *
 * pBlock->pDataBlock is replaced by a freshly created column array. Each
 * decoded column owns its offset array or null bitmap, plus its payload.
 * Returns the position just past the decoded data, or NULL on failure.
 *
 * On failure, the buffers of the column being decoded are freed, and so are
 * those of all columns already pushed. The column array itself is left for
 * the caller to destroy.
 */
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);  // redundant with sz below; read to advance buf
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI8(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    buf = taosDecodeFixedBool(buf, &data.hasNull);

    if (IS_VAR_DATA_TYPE(data.info.type)) {
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
    }
    if (buf == NULL) {
      uError("failed to decode null bitmap/offset, type:%d", data.info.type);
      colDataDestroy(&data);
      goto _error;
    }

    int32_t len = 0;
    buf = taosDecodeFixedI32(buf, &len);
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);
    if (buf == NULL) {
      uError("failed to decode data, type:%d", data.info.type);
      colDataDestroy(&data);  // releases the offset/bitmap decoded above
      goto _error;
    }
    if (IS_VAR_DATA_TYPE(data.info.type)) {
      data.varmeta.length = len;
      data.varmeta.allocLen = len;
    }

    void* px = taosArrayPush(pBlock->pDataBlock, &data);
    if (px == NULL) {
      colDataDestroy(&data);  // not owned by the array, so free it here
      goto _error;
    }
  }

  return (void*)buf;
_error:
  // release the buffers of the columns already pushed into the array
  for (int32_t i = 0; i < sz; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      break;
    }
    colDataDestroy(pColInfoData);
  }
  return NULL;
}
2608

2609
static int32_t formatTimestamp(char* buf, size_t cap, int64_t val, int precision) {
×
2610
  time_t  tt;
2611
  int32_t ms = 0;
×
2612
  int32_t code = TSDB_CODE_SUCCESS;
×
2613
  int32_t lino = 0;
×
2614
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2615
    tt = (time_t)(val / 1000000000);
×
2616
    ms = val % 1000000000;
×
2617
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2618
    tt = (time_t)(val / 1000000);
×
2619
    ms = val % 1000000;
×
2620
  } else {
2621
    tt = (time_t)(val / 1000);
×
2622
    ms = val % 1000;
×
2623
  }
2624

2625
  if (tt <= 0 && ms < 0) {
×
2626
    tt--;
×
2627
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
2628
      ms += 1000000000;
×
2629
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2630
      ms += 1000000;
×
2631
    } else {
2632
      ms += 1000;
×
2633
    }
2634
  }
2635
  struct tm ptm = {0};
×
2636
  if (taosLocalTime(&tt, &ptm, buf, cap, NULL) == NULL) {
×
2637
    code =  TSDB_CODE_INTERNAL_ERROR;
×
2638
    TSDB_CHECK_CODE(code, lino, _end);
×
2639
  }
2640

2641
  size_t pos = taosStrfTime(buf, cap, "%Y-%m-%d %H:%M:%S", &ptm);
×
2642
  if (pos == 0) {
×
2643
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2644
    TSDB_CHECK_CODE(code, lino, _end);
×
2645
  }
2646
  int32_t nwritten = 0;
×
2647
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2648
    nwritten = snprintf(buf + pos, cap - pos, ".%09d", ms);
×
2649
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2650
    nwritten = snprintf(buf + pos, cap - pos, ".%06d", ms);
×
2651
  } else {
2652
    nwritten = snprintf(buf + pos, cap - pos, ".%03d", ms);
×
2653
  }
2654

2655
  if (nwritten >= cap - pos) {
×
2656
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2657
    TSDB_CHECK_CODE(code, lino, _end);
×
2658
  }
2659

2660
_end:
×
2661
  if (code != TSDB_CODE_SUCCESS) {
×
2662
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2663
  }
2664
  return code;
×
2665
}
2666

2667
// for debug
2668
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
3,036✔
2669
  int32_t lino = 0;
3,036✔
2670
  int32_t size = 2048 * 1024;
3,036✔
2671
  int32_t code = 0;
3,036✔
2672
  char*   dumpBuf = NULL;
3,036✔
2673
  char    pBuf[TD_TIME_STR_LEN] = {0};
3,036✔
2674
  int32_t rows = pDataBlock->info.rows;
3,036✔
2675
  int32_t len = 0;
3,036✔
2676

2677
  dumpBuf = taosMemoryCalloc(size, 1);
3,036!
2678
  if (dumpBuf == NULL) {
3,036!
2679
    return terrno;
×
2680
  }
2681

2682
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
3,036✔
2683
  len += tsnprintf(dumpBuf + len, size - len,
6,072✔
2684
                  "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
2685
                  "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
2686
                  taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
3,036✔
2687
                  pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
2688
                  pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
3,036✔
2689
  goto _exit;
3,036✔
2690
  if (len >= size - 1) {
2691
    goto _exit;
2692
  }
2693

2694
  for (int32_t j = 0; j < rows; j++) {
2695
    len += tsnprintf(dumpBuf + len, size - len, "%s|", flag);
2696
    if (len >= size - 1) {
2697
      goto _exit;
2698
    }
2699

2700
    for (int32_t k = 0; k < colNum; k++) {
2701
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
2702
      if (pColInfoData == NULL) {
2703
        code = terrno;
2704
        lino = __LINE__;
2705
        goto _exit;
2706
      }
2707

2708
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
2709
        len += tsnprintf(dumpBuf + len, size - len, " %15s |", "NULL");
2710
        if (len >= size - 1) goto _exit;
2711
        continue;
2712
      }
2713

2714
      void* var = colDataGetData(pColInfoData, j);
2715
      switch (pColInfoData->info.type) {
2716
        case TSDB_DATA_TYPE_TIMESTAMP:
2717
          memset(pBuf, 0, sizeof(pBuf));
2718
          code = formatTimestamp(pBuf, sizeof(pBuf), *(uint64_t*)var, pColInfoData->info.precision);
2719
          if (code != TSDB_CODE_SUCCESS) {
2720
            TAOS_UNUSED(tsnprintf(pBuf, sizeof(pBuf), "NaN"));
2721
          }
2722
          len += tsnprintf(dumpBuf + len, size - len, " %25s |", pBuf);
2723
          if (len >= size - 1) goto _exit;
2724
          break;
2725
        case TSDB_DATA_TYPE_TINYINT:
2726
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
2727
          if (len >= size - 1) goto _exit;
2728
          break;
2729
        case TSDB_DATA_TYPE_UTINYINT:
2730
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
2731
          if (len >= size - 1) goto _exit;
2732
          break;
2733
        case TSDB_DATA_TYPE_SMALLINT:
2734
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
2735
          if (len >= size - 1) goto _exit;
2736
          break;
2737
        case TSDB_DATA_TYPE_USMALLINT:
2738
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
2739
          if (len >= size - 1) goto _exit;
2740
          break;
2741
        case TSDB_DATA_TYPE_INT:
2742
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
2743
          if (len >= size - 1) goto _exit;
2744
          break;
2745
        case TSDB_DATA_TYPE_UINT:
2746
          len += tsnprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
2747
          if (len >= size - 1) goto _exit;
2748
          break;
2749
        case TSDB_DATA_TYPE_BIGINT:
2750
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
2751
          if (len >= size - 1) goto _exit;
2752
          break;
2753
        case TSDB_DATA_TYPE_UBIGINT:
2754
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
2755
          if (len >= size - 1) goto _exit;
2756
          break;
2757
        case TSDB_DATA_TYPE_FLOAT:
2758
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
2759
          if (len >= size - 1) goto _exit;
2760
          break;
2761
        case TSDB_DATA_TYPE_DOUBLE:
2762
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
2763
          if (len >= size - 1) goto _exit;
2764
          break;
2765
        case TSDB_DATA_TYPE_BOOL:
2766
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
2767
          if (len >= size - 1) goto _exit;
2768
          break;
2769
        case TSDB_DATA_TYPE_VARCHAR:
2770
        case TSDB_DATA_TYPE_VARBINARY:
2771
        case TSDB_DATA_TYPE_GEOMETRY: {
2772
          memset(pBuf, 0, sizeof(pBuf));
2773
          char*   pData = colDataGetVarData(pColInfoData, j);
2774
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
2775
          dataSize = TMIN(dataSize, 50);
2776
          memcpy(pBuf, varDataVal(pData), dataSize);
2777
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
2778
          if (len >= size - 1) goto _exit;
2779
        } break;
2780
        case TSDB_DATA_TYPE_NCHAR: {
2781
          char*   pData = colDataGetVarData(pColInfoData, j);
2782
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
2783
          memset(pBuf, 0, sizeof(pBuf));
2784
          code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf, NULL);
2785
          if (code < 0) {
2786
            uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
2787
            lino = __LINE__;
2788
            goto _exit;
2789
          } else { // reset the length value
2790
            code = TSDB_CODE_SUCCESS;
2791
          }
2792
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
2793
          if (len >= size - 1) goto _exit;
2794
        } break;
2795
      }
2796
    }
2797
    len += tsnprintf(dumpBuf + len, size - len, "%d\n", j);
2798
    if (len >= size - 1) goto _exit;
2799
  }
2800
  len += tsnprintf(dumpBuf + len, size - len, "%s |end\n", flag);
2801

2802
_exit:
3,036✔
2803
  if (code == TSDB_CODE_SUCCESS) {
3,036!
2804
    *pDataBuf = dumpBuf;
3,036✔
2805
    dumpBuf = NULL;
3,036✔
2806
  } else {
2807
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2808
    if (dumpBuf) {
×
2809
      taosMemoryFree(dumpBuf);
×
2810
    }
2811
  }
2812
  return code;
3,036✔
2813
}
2814

2815
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
×
2816
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
2817
  SSubmitReq2* pReq = *ppReq;
×
2818
  SArray*      pVals = NULL;
×
2819
  int32_t      sz = 1;
×
2820
  int32_t      code = 0;
×
2821
  *ppReq = NULL;
×
2822
  terrno = 0;
×
2823

2824
  if (NULL == pReq) {
×
2825
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
×
2826
      code = terrno;
×
2827
      goto _end;
×
2828
    }
2829

2830
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
×
2831
      code = terrno;
×
2832
      goto _end;
×
2833
    }
2834
  }
2835

2836
  for (int32_t i = 0; i < sz; ++i) {
×
2837
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
×
2838
    int32_t rows = pDataBlock->info.rows;
×
2839

2840
    if (colNum <= 1) {  // invalid if only with TS col
×
2841
      continue;
×
2842
    }
2843

2844
    // the rsma result should has the same column number with schema.
2845
    if (colNum != pTSchema->numOfCols) {
×
2846
      uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
×
2847
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2848
      goto _end;
×
2849
    }
2850

2851
    SSubmitTbData tbData = {0};
×
2852

2853
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
×
2854
      code = terrno;
×
2855
      goto _end;
×
2856
    }
2857

2858
    tbData.suid = suid;
×
2859
    tbData.uid = uid;
×
2860
    tbData.sver = pTSchema->version;
×
2861

2862
    if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
×
2863
      code = terrno;
×
2864
      taosArrayDestroy(tbData.aRowP);
×
2865
      goto _end;
×
2866
    }
2867

2868
    for (int32_t j = 0; j < rows; ++j) {  // iterate by row
×
2869

2870
      taosArrayClear(pVals);
×
2871

2872
      bool    isStartKey = false;
×
2873
      int32_t offset = 0;
×
2874
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
×
2875
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
×
2876
        if (pColInfoData == NULL) {
×
2877
          return terrno;
×
2878
        }
2879

2880
        const STColumn*  pCol = &pTSchema->columns[k];
×
2881
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
×
2882

2883
        switch (pColInfoData->info.type) {
×
2884
          case TSDB_DATA_TYPE_TIMESTAMP:
×
2885
            if (pColInfoData->info.type != pCol->type) {
×
2886
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2887
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2888
              return terrno;
×
2889
            }
2890
            if (!isStartKey) {
×
2891
              isStartKey = true;
×
2892
              if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
×
2893
                uError("the first timestamp colId %d is not primary colId", pCol->colId);
×
2894
                terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2895
                return terrno;
×
2896
              }
2897
              SValue val = {.type = pCol->type};
×
2898
              VALUE_SET_TRIVIAL_DATUM(&val, *(TSKEY*)var);
×
2899
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2900
              void*   px = taosArrayPush(pVals, &cv);
×
2901
              if (px == NULL) {
×
2902
                return terrno;
×
2903
              }
2904

2905
            } else if (colDataIsNull_s(pColInfoData, j)) {
×
2906
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
2907
              void*   px = taosArrayPush(pVals, &cv);
×
2908
              if (px == NULL) {
×
2909
                return terrno;
×
2910
              }
2911
            } else {
2912
              SValue val = {.type = pCol->type};
×
2913
              VALUE_SET_TRIVIAL_DATUM(&val, *(int64_t*)var);
×
2914
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2915
              void*   px = taosArrayPush(pVals, &cv);
×
2916
              if (px == NULL) {
×
2917
                return terrno;
×
2918
              }
2919
            }
2920
            break;
×
2921
          case TSDB_DATA_TYPE_NCHAR:
×
2922
          case TSDB_DATA_TYPE_VARBINARY:
2923
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
2924
            if (pColInfoData->info.type != pCol->type) {
×
2925
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2926
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2927
              return terrno;
×
2928
            }
2929
            if (colDataIsNull_s(pColInfoData, j)) {
×
2930
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
2931
              void* px = taosArrayPush(pVals, &cv);
×
2932
              if (px == NULL) {
×
2933
                goto _end;
×
2934
              }
2935
            } else {
2936
              void*  data = colDataGetVarData(pColInfoData, j);
×
2937
              SValue sv = (SValue){
×
2938
                  .type = pCol->type, .nData = varDataLen(data), .pData = (uint8_t*) varDataVal(data)};  // address copy, no value
×
2939
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
2940
              void* px = taosArrayPush(pVals, &cv);
×
2941
              if (px == NULL) {
×
2942
                code = terrno;
×
2943
                goto _end;
×
2944
              }
2945
            }
2946
            break;
×
2947
          }
2948
          case TSDB_DATA_TYPE_DECIMAL:
×
2949
          case TSDB_DATA_TYPE_BLOB:
2950
          case TSDB_DATA_TYPE_JSON:
2951
          case TSDB_DATA_TYPE_MEDIUMBLOB:
2952
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
2953
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2954
            return terrno;
×
2955
            break;
2956
          default:
×
2957
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
2958
              if (colDataIsNull_s(pColInfoData, j)) {
×
2959
                SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
×
2960
                void* px = taosArrayPush(pVals, &cv);
×
2961
                if (px == NULL) {
×
2962
                  goto _end;
×
2963
                }
2964
              } else {
2965
                SValue sv = {.type = pCol->type};
×
2966
                if (pCol->type == pColInfoData->info.type) {
×
2967
                  valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
×
2968
                } else {
2969
                  /**
2970
                   *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
2971
                   *  2. below conversion may lead to overflow or loss, the app should select the right data type.
2972
                   */
2973
                  char tv[DATUM_MAX_SIZE] = {0};
×
2974
                  if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
×
2975
                    float v = 0;
×
2976
                    GET_TYPED_DATA(v, float, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2977
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2978
                  } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
×
2979
                    double v = 0;
×
2980
                    GET_TYPED_DATA(v, double, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2981
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2982
                  } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
×
2983
                    int64_t v = 0;
×
2984
                    GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2985
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2986
                  } else {
2987
                    uint64_t v = 0;
×
2988
                    GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2989
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2990
                  }
2991
                  valueSetDatum(&sv, sv.type, tv, tDataTypes[pCol->type].bytes);
×
2992
                }
2993
                SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
2994
                void* px = taosArrayPush(pVals, &cv);
×
2995
                if (px == NULL) {
×
2996
                  code = terrno;
×
2997
                  goto _end;
×
2998
                }
2999
              }
3000
            } else {
3001
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
3002
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3003
              return terrno;
×
3004
            }
3005
            break;
×
3006
        }
3007
      }
3008
      SRow* pRow = NULL;
×
3009
      if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
×
3010
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
3011
        goto _end;
×
3012
      }
3013

3014
      void* px = taosArrayPush(tbData.aRowP, &pRow);
×
3015
      if (px == NULL) {
×
3016
        code = terrno;
×
3017
        goto _end;
×
3018
      }
3019
    }
3020

3021
    void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
×
3022
    if (px == NULL) {
×
3023
      code = terrno;
×
3024
      goto _end;
×
3025
    }
3026
  }
3027

3028
_end:
×
3029
  taosArrayDestroy(pVals);
×
3030
  if (code != 0) {
×
3031
    if (pReq) {
×
3032
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
3033
      taosMemoryFreeClear(pReq);
×
3034
    }
3035
  } else {
3036
    *ppReq = pReq;
×
3037
  }
3038

3039
  return code;
×
3040
}
3041

3042
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
3043
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
19✔
3044
  int32_t   code = TSDB_CODE_SUCCESS;
19✔
3045
  int32_t   lino = 0;
19✔
3046
  char      tmp[TSDB_TABLE_NAME_LEN] = {0};
19✔
3047

3048
  if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
19✔
3049
    code = TSDB_CODE_INTERNAL_ERROR;
2✔
3050
    TSDB_CHECK_CODE(code, lino, _end);
2!
3051
  }
3052

3053
  if (stbName == NULL) {
17✔
3054
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
2✔
3055
  } else {
3056
    int32_t i = strlen(stbName) - 1;
15✔
3057
    for (; i >= 0; i--) {
228✔
3058
      if (stbName[i] == '.') {
224✔
3059
        break;
11✔
3060
      }
3061
    }
3062
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId);
15✔
3063
  }
3064

3065
  ctbName[cap - strlen(tmp) - 1] = 0;  // put stbname + groupId to the end
17✔
3066
  size_t prefixLen = strlen(ctbName);
17✔
3067
  ctbName = strncat(ctbName, tmp, cap - prefixLen - 1);
17✔
3068

3069
  for (char* p = ctbName; *p; ++p) {
637✔
3070
    if (*p == '.') *p = '_';
620!
3071
  }
3072

3073
_end:
17✔
3074
  if (code != TSDB_CODE_SUCCESS) {
19✔
3075
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
2!
3076
  }
3077
  return code;
19✔
3078
}
3079

3080
// An auto-generated stream sub-table name starts with "t_", followed by the first segment of the MD5 digest of the
// group values; its total length is fixed at 34 characters. Returns true when `ctbName` has that shape.
bool isAutoTableName(char* ctbName) {
  const size_t autoNameLen = 34;
  // ctbName[1] is safe to read once ctbName[0] is 't': the string is at least "t" plus its terminator.
  if (ctbName[0] != 't' || ctbName[1] != '_') {
    return false;
  }
  return strlen(ctbName) == autoNameLen;
}
3083

3084
// Return true when `ctbName` already ends with the decimal representation of `groupId`, printed as an unsigned
// 64-bit value. This detects names that already carry the group id. Only the trailing digits are compared; no
// separator in front of them is required.
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
  char tmp[64] = {0};
  // PRIu64 expects a uint64_t argument; cast explicitly instead of passing the signed value.
  snprintf(tmp, sizeof(tmp), "%" PRIu64, (uint64_t)groupId);
  size_t nameLen = strlen(ctbName);
  size_t idLen = strlen(tmp);
  if (nameLen < idLen) {
    return false;
  }
  return memcmp(ctbName + nameLen - idLen, tmp, idLen) == 0;
}
3092

3093
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
×
3094
  QRY_PARAM_CHECK(pName);
×
3095

3096
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
×
3097
  if (!pBuf) {
×
3098
    return terrno;
×
3099
  }
3100

3101
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
×
3102
  if (code != TSDB_CODE_SUCCESS) {
×
3103
    taosMemoryFree(pBuf);
×
3104
  } else {
3105
    *pName = pBuf;
×
3106
  }
3107

3108
  return code;
×
3109
}
3110

3111
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
15✔
3112
  if (stbFullName[0] == 0) {
15!
3113
    return TSDB_CODE_INVALID_PARA;
×
3114
  }
3115

3116
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
15✔
3117
  if (tags == NULL) {
15!
3118
    return terrno;
×
3119
  }
3120

3121
  if (cname == NULL) {
15!
3122
    taosArrayDestroy(tags);
×
3123
    return TSDB_CODE_INVALID_PARA;
×
3124
  }
3125

3126
  int8_t      type = TSDB_DATA_TYPE_UBIGINT;
15✔
3127
  const char* name = "group_id";
15✔
3128
  int32_t     len = strlen(name);
15✔
3129

3130
  SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
15✔
3131
  void*  px = taosArrayPush(tags, &pTag);
15✔
3132
  if (px == NULL) {
15!
3133
    return terrno;
×
3134
  }
3135

3136
  RandTableName rname = {
15✔
3137
      .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
15✔
3138

3139
  int32_t code = buildChildTableName(&rname);
15✔
3140
  if (code != TSDB_CODE_SUCCESS) {
15!
3141
    return code;
×
3142
  }
3143

3144
  taosArrayDestroy(tags);
15✔
3145
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
15!
3146
    return TSDB_CODE_INVALID_PARA;
×
3147
  }
3148

3149
  return code;
15✔
3150
}
3151

3152
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
×
3153
                               char** dstTableName) {
3154
  int32_t code = TSDB_CODE_SUCCESS;
×
3155
  int32_t lino = 0;
×
3156

3157
  if (parTbName[0]) {
×
3158
    if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
×
3159
        stbFullName) {
3160
      *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
×
3161
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3162

3163
      tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
×
3164
      code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
×
3165
      TSDB_CHECK_CODE(code, lino, _end);
×
3166
    } else {
3167
      *dstTableName = taosStrdup(parTbName);
×
3168
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3169
    }
3170
  } else {
3171
    code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
×
3172
    TSDB_CHECK_CODE(code, lino, _end);
×
3173
  }
3174

3175
_end:
×
3176
  return code;
×
3177
}
3178

3179
// return length of encoded data, return -1 if failed
3180
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
184,843✔
3181
  int32_t code = blockDataCheck(pBlock);
184,843✔
3182
  if (code != TSDB_CODE_SUCCESS) {
184,963✔
3183
    terrno = code;
12✔
3184
    return -1;
×
3185
  }
3186

3187
  int32_t dataLen = 0;
184,951✔
3188

3189
  // todo extract method
3190
  int32_t* version = (int32_t*)data;
184,951✔
3191
  *version = BLOCK_VERSION_1;
184,951✔
3192
  data += sizeof(int32_t);
184,951✔
3193

3194
  int32_t* actualLen = (int32_t*)data;
184,951✔
3195
  data += sizeof(int32_t);
184,951✔
3196

3197
  int32_t* rows = (int32_t*)data;
184,951✔
3198
  *rows = pBlock->info.rows;
184,951✔
3199
  data += sizeof(int32_t);
184,951✔
3200
  if (*rows <= 0) {
184,951!
3201
    uError("Invalid rows %d in block", *rows);
×
3202
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3203
    return -1;
×
3204
  }
3205

3206
  int32_t* cols = (int32_t*)data;
184,951✔
3207
  *cols = numOfCols;
184,951✔
3208
  data += sizeof(int32_t);
184,951✔
3209

3210
  // flag segment.
3211
  // the inital bit is for column info
3212
  int32_t* flagSegment = (int32_t*)data;
184,951✔
3213
  *flagSegment = (1 << 31);
184,951✔
3214

3215
  data += sizeof(int32_t);
184,951✔
3216

3217
  uint64_t* groupId = (uint64_t*)data;
184,951✔
3218
  data += sizeof(uint64_t);
184,951✔
3219

3220
  for (int32_t i = 0; i < numOfCols; ++i) {
1,080,538✔
3221
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
895,523✔
3222
    if (pColInfoData == NULL) {
895,585!
3223
      return -1;
×
3224
    }
3225

3226
    *((int8_t*)data) = pColInfoData->info.type;
895,585✔
3227
    data += sizeof(int8_t);
895,585✔
3228

3229
    int32_t bytes = pColInfoData->info.bytes;
895,585✔
3230
    *((int32_t*)data) = bytes;
895,585✔
3231
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
895,585!
3232
      fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
20✔
3233
                              pColInfoData->info.scale);
20✔
3234
    }
3235
    data += sizeof(int32_t);
895,587✔
3236
  }
3237

3238
  int32_t* colSizes = (int32_t*)data;
185,015✔
3239
  data += numOfCols * sizeof(int32_t);
185,015✔
3240

3241
  dataLen = blockDataGetSerialMetaSize(numOfCols);
185,015✔
3242

3243
  int32_t numOfRows = pBlock->info.rows;
184,962✔
3244
  for (int32_t col = 0; col < numOfCols; ++col) {
1,080,549✔
3245
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
895,465✔
3246
    if (pColRes == NULL) {
895,569!
3247
      return -1;
×
3248
    }
3249

3250
    // copy the null bitmap
3251
    size_t metaSize = 0;
895,569✔
3252
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
895,569!
3253
      metaSize = numOfRows * sizeof(int32_t);
170,654✔
3254
      if(dataLen + metaSize > dataBuflen) goto _exit;
170,654!
3255
      memcpy(data, pColRes->varmeta.offset, metaSize);
170,654✔
3256
    } else {
3257
      metaSize = BitmapLen(numOfRows);
724,915✔
3258
      if(dataLen + metaSize > dataBuflen) goto _exit;
724,915!
3259
      memcpy(data, pColRes->nullbitmap, metaSize);
724,915✔
3260
    }
3261

3262
    data += metaSize;
895,569✔
3263
    dataLen += metaSize;
895,569✔
3264

3265
    if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
895,569!
3266
      colSizes[col] = 0;
×
3267
      for (int32_t row = 0; row < numOfRows; ++row) {
×
3268
        char*   pColData = pColRes->pData + pColRes->varmeta.offset[row];
×
3269
        int32_t colSize = 0;
×
3270
        if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
×
3271
          colSize = getJsonValueLen(pColData);
×
3272
        } else {
3273
          colSize = varDataTLen(pColData);
×
3274
        }
3275
        colSizes[col] += colSize;
×
3276
        dataLen += colSize;
×
3277
        if(dataLen > dataBuflen) goto _exit;
×
3278
        (void) memmove(data, pColData, colSize);
×
3279
        data += colSize;
×
3280
      }
3281
    } else {
3282
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
895,569✔
3283
      dataLen += colSizes[col];
895,587✔
3284
      if(dataLen > dataBuflen) goto _exit;
895,587!
3285
      if (pColRes->pData != NULL) {
895,587✔
3286
        (void) memmove(data, pColRes->pData, colSizes[col]);
894,414✔
3287
      }
3288
      data += colSizes[col];
895,587✔
3289
    }
3290

3291
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
896,495!
3292
      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
×
3293
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3294
      return -1;
×
3295
    }
3296
    
3297
    colSizes[col] = htonl(colSizes[col]);
895,587✔
3298
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
3299
    //    htonl(colSizes[col]), colSizes[col]);
3300
  }
3301

3302
  bool* blankFill = (bool*)data;
185,084✔
3303
  *blankFill = pBlock->info.blankFill;
185,084✔
3304
  data += sizeof(bool);
185,084✔
3305

3306
  *actualLen = dataLen;
185,084✔
3307
#ifndef NO_UNALIGNED_ACCESS
3308
  *groupId = pBlock->info.id.groupId;
185,084✔
3309
#else
3310
  taosSetPUInt64Aligned(groupId, &pBlock->info.id.groupId);
3311
#endif
3312
  if (dataLen > dataBuflen) goto _exit;
185,084!
3313

3314
  return dataLen;
185,084✔
3315

3316
_exit:
×
3317
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
3318
  terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3319
  return -1;
×
3320
}
3321

3322
/*
 * Rebuild pBlock from an encoded buffer (the layout read here is the one
 * written by blockEncode()).
 *
 * Layout, in read order:
 *   int32 version (not interpreted) | int32 total length | int32 rows |
 *   int32 columns | int32 flag segment (not interpreted) | uint64 group id |
 *   per column: int8 type, int32 bytes |
 *   per column: int32 payload length, network byte order |
 *   per column: var-length offsets (rows x int32) or null bitmap, then payload |
 *   bool blankFill
 *
 * @param pBlock   destination. pBlock->pDataBlock is created with `columns`
 *                 entries when NULL; otherwise it must already hold them.
 * @param pData    encoded buffer. It is only read: column lengths are
 *                 byte-swapped into a local, so the same buffer can be decoded
 *                 again.
 * @param pEndPos  set to the first byte after the block once the total-length
 *                 check has passed.
 * @return TSDB_CODE_SUCCESS or an error code. Format violations set terrno to
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR and return it.
 */
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
  const char* pStart = pData;

  // version sizeof(int32_t): not interpreted by this decoder
  pStart += sizeof(int32_t);

  // total length sizeof(int32_t), compared with the bytes consumed at the end
  int32_t dataLen = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);

  // total rows sizeof(int32_t)
  int32_t numOfRows = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);
  if (numOfRows <= 0) {
    uError("block decode numOfRows:%d error", numOfRows);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  // total columns sizeof(int32_t); a negative count would move pStart backwards below
  int32_t numOfCols = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);
  if (numOfCols < 0) {
    uError("block decode numOfCols:%d error", numOfCols);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  // flag segment sizeof(int32_t): its top bit flags a column info segment, unused here
  pStart += sizeof(int32_t);

  // group id sizeof(uint64_t)
#ifndef NO_UNALIGNED_ACCESS
  pBlock->info.id.groupId = *(uint64_t*)pStart;
#else
  taosSetPUInt64Aligned(&pBlock->info.id.groupId, (uint64_t*)pStart);
#endif
  pStart += sizeof(uint64_t);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  // column schema: type and bytes of every column
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    pColInfoData->info.type = *(const int8_t*)pStart;
    pStart += sizeof(int8_t);

    pColInfoData->info.bytes = *(const int32_t*)pStart;
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
      // for decimals the encoded bytes field also carries precision and scale
      extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
                                      &pColInfoData->info.scale);
    }
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  // Per-column payload lengths in network byte order. They are converted into a
  // local instead of in place so the caller's buffer is never modified.
  const int32_t* pColLens = (const int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (int32_t)htonl(pColLens[i]);
    if (colLen < 0) {
      uError("block decode colLen:%d error, colIdx:%d", colLen, i);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      // grow the var-length payload buffer when the encoded data does not fit
      if (colLen > 0 && pColInfoData->varmeta.allocLen < colLen) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen);
        if (tmp == NULL) {
          return terrno;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = colLen;
      }

      pColInfoData->varmeta.length = colLen;
    } else {
      // Fixed-width payloads are copied into the pData prepared by
      // blockDataEnsureCapacity() above, expected to hold numOfRows values of
      // info.bytes each. Reject longer payloads so a corrupt message cannot
      // write past that buffer.
      if ((int64_t)colLen > (int64_t)pColInfoData->info.bytes * numOfRows) {
        uError("block decode colLen:%d exceeds rows:%d * bytes:%d, colIdx:%d", colLen, numOfRows,
               pColInfoData->info.bytes, i);
        terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        return terrno;
      }

      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;

    if (colLen > 0) {
      memcpy(pColInfoData->pData, pStart, colLen);
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
      // an empty payload is only acceptable when the first value is NULL or the column is of NULL type
      uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen, i, pColInfoData->info.type);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    pStart += colLen;
  }

  bool blankFill = *(const bool*)pStart;
  pStart += sizeof(bool);

  pBlock->info.dataLoad = 1;
  pBlock->info.rows = numOfRows;
  pBlock->info.blankFill = blankFill;
  if (pStart - pData != dataLen) {
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  *pEndPos = pStart;

  code = blockDataCheck(pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
3465

3466
/*
 * Compact the columns of pBlock in place according to pBoolList.
 *
 * @param pBlock     block whose columns are rewritten.
 * @param totalRows  number of rows currently stored in each column, and the
 *                   number of entries read from pBoolList.
 * @param pBoolList  per-row flags; rows whose entry is non-zero are moved to the
 *                   front of every column, in their original order.
 *                   When NULL, every var-length column gets length 0 and every
 *                   fixed-width null bitmap is zeroed; pBlock->info.rows is not
 *                   touched in that case.
 * @return 0 on success, otherwise the allocation / colDataSetVal() error code.
 *         On error pBlock->info.rows is left unchanged and some columns may
 *         already have been compacted.
 *
 * Columns whose pData is NULL are skipped, as are var-length columns with
 * length 0. On success pBlock->info.rows becomes the largest number of rows
 * kept by any processed column.
 */
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
  int32_t code = 0;
  int32_t bmLen = BitmapLen(totalRows);
  char*   pBitmap = NULL;  // scratch copy of a fixed-width null bitmap, reused for every such column
  int32_t maxRows = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  if (!pBoolList) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      // it is a reserved column for scalar function, and no data in this column yet.
      if (pDst->pData == NULL) {
        continue;
      }

      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
        pDst->varmeta.length = 0;
      } else {
        memset(pDst->nullbitmap, 0, bmLen);
      }
    }
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    // reserved column for scalar function without data yet, or an empty var-length column
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
      continue;
    }

    int32_t numOfRows = 0;
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      // rebuild the payload from the start; kept values are appended by colDataSetVal()
      pDst->varmeta.length = 0;

      for (int32_t j = 0; j < totalRows; ++j) {
        if (pBoolList[j] == 0) {
          continue;
        }

        if (colDataIsNull_var(pDst, j)) {
          colDataSetNull_var(pDst, numOfRows);
        } else {
          // fix address sanitizer error: p1 may point to memory that changes during the
          // realloc inside colDataSetVal(), so copy the value into p2 first.
          char*   p1 = colDataGetVarData(pDst, j);
          int32_t len = 0;
          if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
            len = getJsonValueLen(p1);
          } else {
            len = varDataTLen(p1);
          }

          char* p2 = taosMemoryMalloc(len);
          if (p2 == NULL) {
            code = terrno;
            goto _exit;  // release pBitmap, which an earlier fixed-width column may own
          }

          memcpy(p2, p1, len);
          code = colDataSetVal(pDst, numOfRows, p2, false);
          taosMemoryFree(p2);
          if (code) {
            goto _exit;
          }
        }
        numOfRows += 1;
      }
    } else {
      if (pBitmap == NULL) {
        pBitmap = taosMemoryCalloc(1, bmLen);
        if (pBitmap == NULL) {
          code = terrno;
          goto _exit;
        }
      }

      // keep the original null flags in pBitmap and rebuild the column's bitmap for the kept rows
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
      memset(pDst->nullbitmap, 0, bmLen);

      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_UBIGINT:
        case TSDB_DATA_TYPE_DOUBLE:
        case TSDB_DATA_TYPE_TIMESTAMP:
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
            }
            numOfRows += 1;
          }
          break;
        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_UINT:
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
            }
            numOfRows += 1;
          }
          break;
        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_USMALLINT:
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
            }
            numOfRows += 1;
          }
          break;
        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_UTINYINT:
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
            }
            numOfRows += 1;
          }
          break;
        case TSDB_DATA_TYPE_DECIMAL64:
        case TSDB_DATA_TYPE_DECIMAL:
        default:
          // Generic fixed-width path: move info.bytes per kept row. memmove because
          // source and destination coincide until the first row has been dropped.
          for (int32_t j = 0; j < totalRows; ++j) {
            if (pBoolList[j] == 0) {
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              memmove(pDst->pData + (size_t)numOfRows * pDst->info.bytes,
                      pDst->pData + (size_t)j * pDst->info.bytes, pDst->info.bytes);
            }
            numOfRows += 1;
          }
          break;
      }
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  pBlock->info.rows = maxRows;

_exit:
  if (pBitmap != NULL) {
    taosMemoryFree(pBitmap);
  }

  return code;
}
3656

3657
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
184,986✔
3658
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
184,986✔
3659
}
3660

3661
/*
 * Length of the leading run of rows in pDataBlock that is already ordered
 * according to pOrderInfo.
 *
 * Side effect: every SBlockOrderInfo in pOrderInfo gets pColData (looked up by
 * slotId in pDataBlock) and compFn (from the column type and order) filled in.
 * This happens even when the block has no rows.
 *
 * @return 0 for NULL arguments or a block with no rows. Otherwise it returns
 *         the index of the first row for which dataBlockCompar(row, previous
 *         row) is negative, or info.rows when no such row exists.
 */
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (!pDataBlock || !pOrderInfo) return 0;

  int32_t numOfOrders = (int32_t)taosArrayGetSize(pOrderInfo);
  for (int32_t i = 0; i < numOfOrders; ++i) {
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
    if (pOrder == NULL) {
      continue;
    }

    pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
    if (pOrder->pColData == NULL) {
      // NOTE(review): the entry is left with a NULL pColData; confirm dataBlockCompar tolerates it.
      continue;
    }

    pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
  }

  // Without this guard the loop below is skipped and 1 would be reported for an empty block.
  if (pDataBlock->info.rows <= 0) {
    return 0;
  }

  SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};

  int32_t rowIdx = 0, nextRowIdx = 1;
  for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
    if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
      break;
    }
  }

  return nextRowIdx;
}
3688

3689
/*
 * Assertion helper for blockDataCheck(). When condition `o` is false, it logs
 * the failing source line and returns TSDB_CODE_INTERNAL_ERROR from the
 * *calling* function. It is wrapped in do { } while (0) so that it expands to
 * exactly one statement and is safe inside an unbraced if/else.
 */
#define BLOCK_DATA_CHECK_TRESSA(o)                        \
  do {                                                    \
    if (!(o)) {                                           \
      uError("blockDataCheck failed! line:%d", __LINE__); \
      return TSDB_CODE_INTERNAL_ERROR;                    \
    }                                                     \
  } while (0)
3694
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
594,924✔
3695
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
594,924!
3696
    return TSDB_CODE_SUCCESS;
30,687✔
3697
  }
3698

3699
  BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
564,237!
3700
  if (!pDataBlock->info.dataLoad) {
564,237✔
3701
    return TSDB_CODE_SUCCESS;
3,752✔
3702
  }
3703

3704
  bool isVarType = false;
560,485✔
3705
  int32_t colLen = 0;
560,485✔
3706
  int32_t nextPos = 0;
560,485✔
3707
  int64_t checkRows = 0;
560,485✔
3708
  int64_t typeValue = 0;
560,485✔
3709
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
560,485✔
3710
  for (int32_t i = 0; i < colNum; ++i) {
3,345,646✔
3711
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
2,728,762✔
3712
    BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
2,728,743!
3713
    isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
2,729,250!
3714
    checkRows = pDataBlock->info.rows;
2,729,250✔
3715
    if (pCol->info.noData == true) continue;
2,729,250✔
3716

3717
    if (isVarType) {
2,725,486✔
3718
      BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
557,969!
3719
    } else {
3720
      BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
2,167,517!
3721
    }
3722

3723
    nextPos = -1;
2,725,486✔
3724
    for (int64_t r = 0; r < checkRows; ++r) {
122,633,411✔
3725
      if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
119,860,662✔
3726
      if (!colDataIsNull_s(pCol, r)) {
239,704,100✔
3727
        BLOCK_DATA_CHECK_TRESSA(pCol->pData);
116,294,543!
3728
        BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
116,294,543!
3729

3730
        if (isVarType) {
116,294,543✔
3731
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
17,434,013!
3732
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
17,434,013!
3733
          if (pCol->reassigned) {
17,434,013!
3734
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
×
3735
          } else if (0 == r || nextPos == -1) {
17,434,013✔
3736
            nextPos = pCol->varmeta.offset[r];
553,463✔
3737
          } else {
3738
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
16,880,550!
3739
          }
3740

3741
          char*   pColData = pCol->pData + pCol->varmeta.offset[r];
17,434,013✔
3742
          int32_t colSize = 0;
17,434,013✔
3743
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
17,434,013!
3744
            colLen = getJsonValueLen(pColData);
×
3745
          } else {
3746
            colLen = varDataTLen(pColData);
17,434,013✔
3747
          }
3748

3749
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
17,433,635!
3750
            BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
×
3751
          } else {
3752
            BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
17,433,635!
3753
          }
3754
          BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
17,433,635!
3755

3756
          if (pCol->reassigned) {
17,433,635!
3757
            BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
×
3758
          } else {
3759
            nextPos += colLen;
17,433,635✔
3760
            BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
17,433,635!
3761
          }
3762

3763
          typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
17,433,635✔
3764
        } else {
3765
          if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
98,860,530✔
3766
            float v = 0;
18,693,691✔
3767
            GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
18,693,691!
3768
          } else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
80,166,839✔
3769
            double v = 0;
1,967,692✔
3770
            GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
1,967,692!
3771
          } else if (IS_DECIMAL_TYPE(pCol->info.type)) {
78,199,147!
3772
            // SKIP for decimal types
3773
          } else {
3774
            GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
81,757,501!
3775
          }
3776
        }
3777
      }
3778
    }
3779
  }
3780

3781
  return TSDB_CODE_SUCCESS;
616,884✔
3782
}
3783

3784

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc