• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.16
/source/common/src/tdatablock.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tdatablock.h"
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21
#include "tglobal.h"
22

23
#define MALLOC_ALIGN_BYTES 32
24

25
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
1,079,357✔
26
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,079,357✔
27
    if (pColumnInfoData->reassigned) {
203,832✔
28
      int32_t totalSize = 0;
×
29
      for (int32_t row = 0; row < numOfRows; ++row) {
×
30
        char*   pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
×
31
        int32_t colSize = 0;
×
32
        if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
×
33
          colSize = getJsonValueLen(pColData);
×
34
        } else {
35
          colSize = varDataTLen(pColData);
×
36
        }
37
        totalSize += colSize;
×
38
      }
39
      return totalSize;
×
40
    }
41
    return pColumnInfoData->varmeta.length;
203,832✔
42
  } else {
43
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
875,525✔
44
      return 0;
×
45
    } else {
46
      return pColumnInfoData->info.bytes * numOfRows;
875,525✔
47
    }
48
  }
49
}
50

51
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
×
52
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
×
53
    return 0;
×
54
  }
55

56
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
×
57
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
×
58
    return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
×
59
  else
60
    return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
×
61
}
62

63
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
1,074,848✔
64
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,074,848✔
65
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
203,363✔
66
  } else {
67
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
871,485✔
68
           BitmapLen(numOfRows);
871,485✔
69
  }
70
}
71

72
int32_t getJsonValueLen(const char* data) {
665✔
73
  int32_t dataLen = 0;
665✔
74
  if (*data == TSDB_DATA_TYPE_NULL) {
665✔
75
    dataLen = CHAR_BYTES;
35✔
76
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
630✔
77
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
70✔
78
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
560✔
79
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
140✔
80
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
420✔
81
    dataLen = CHAR_BYTES + CHAR_BYTES;
70✔
82
  } else if (tTagIsJson(data)) {  // json string
350✔
83
    dataLen = ((STag*)(data))->len;
350✔
84
  } else {
85
    uError("Invalid data type:%d in Json", *data);
×
86
  }
87
  return dataLen;
665✔
88
}
89

90
static int32_t getDataLen(int32_t type, const char* pData) {
5,740,096✔
91
  int32_t dataLen = 0;
5,740,096✔
92
  if (type == TSDB_DATA_TYPE_JSON) {
5,740,096✔
93
    dataLen = getJsonValueLen(pData);
665✔
94
  } else {
95
    dataLen = varDataTLen(pData);
5,739,431✔
96
  }
97
  return dataLen;
5,740,672✔
98
}
99

100
/**
 * Shared implementation of colDataSetVal / colDataSetValOrCover.
 *
 * Writes `pData` into row `rowIndex`. If `isNull` is set or `pData` is NULL, the row is marked NULL
 * instead: var types get offset -1, fixed types get their null-bitmap bit set.
 *
 * Variable-length values are appended at `varmeta.length`. If the row already has a positive
 * offset (an earlier value being overwritten), `varmeta.length` is first rewound to that offset.
 * The data buffer grows by a factor of 1.5 (minimum 8 bytes) as needed.
 *
 * @return 0 on success, terrno when reallocation fails, TSDB_CODE_OUT_OF_MEMORY when the required
 *         buffer size cannot be addressed by the column's int32_t offsets.
 */
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  if (isNull || pData == NULL) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t type = pColumnInfoData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    int32_t dataLen = getDataLen(type, pData);
    if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
      // The row already holds a value: rewind so the new value replaces it.
      pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
    }

    SVarColAttr* pAttr = &pColumnInfoData->varmeta;
    // 64-bit integer arithmetic so neither the sum nor the 1.5x growth can wrap around.
    int64_t required = (int64_t)pAttr->length + dataLen;
    if (pAttr->allocLen < required) {
      if (required > INT32_MAX) {
        // Offsets are int32_t (negative means NULL), so larger data cannot be addressed.
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int64_t newSize = pAttr->allocLen;
      if (newSize <= 1) {
        newSize = 8;
      }

      while (newSize < required) {
        newSize = newSize * 3 / 2;
      }

      if (newSize > INT32_MAX) {
        newSize = INT32_MAX;  // cap the geometric growth; `required` still fits
      }

      char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
      if (buf == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = buf;
      pAttr->allocLen = (int32_t)newSize;
    }

    uint32_t len = pColumnInfoData->varmeta.length;
    pColumnInfoData->varmeta.offset[rowIndex] = len;

    // memmove: tolerate `pData` pointing into this column's own buffer.
    (void)memmove(pColumnInfoData->pData + len, pData, dataLen);
    pColumnInfoData->varmeta.length += dataLen;
  } else {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
  }

  return 0;
}
155

156
/**
 * Sets row `rowIndex` to `pData`, or to NULL when `isNull` is set or `pData` is NULL.
 * For variable-length columns the row's previous offset is discarded first. The value is therefore
 * always appended at the end of the data buffer (contrast with colDataSetValOrCover).
 */
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  bool isVarType = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);
  if (isVarType) {
    // Forget any earlier value so colDataSetValHelp does not rewind varmeta.length.
    pColumnInfoData->varmeta.offset[rowIndex] = -1;
  }
  return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
163

164
/**
 * Like colDataSetVal, but keeps the row's existing offset. When the row already holds a value at a
 * positive offset, the data length is rewound to that offset before the new value is written.
 */
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
167

168
/**
 * Stores a raw variable-length value (`varDataLen` payload bytes, no header) into row `rowIndex`,
 * prefixing it with a VARSTR header.
 *
 * A NULL `pVarData` or `isNull` marks the row NULL (offset -1). If the row already has a positive
 * offset, `varmeta.length` is rewound to it first so the new value replaces the old one. The data
 * buffer grows by a factor of 1.5 (minimum 8 bytes).
 *
 * @return TSDB_CODE_SUCCESS on success.
 *         TSDB_CODE_INVALID_PARA for fixed-length columns or a negative `varDataLen`.
 *         TSDB_CODE_OUT_OF_MEMORY when the data would exceed what int32_t offsets can address.
 *         terrno when reallocation fails.
 */
int32_t varColSetVarData(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pVarData, int32_t varDataLen,
                         bool isNull) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (isNull || pVarData == NULL) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    pColumnInfoData->hasNull = true;
    return TSDB_CODE_SUCCESS;
  }

  if (varDataLen < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t dataLen = (int64_t)VARSTR_HEADER_SIZE + varDataLen;
  if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
    // The row already holds a value: rewind so the new value replaces it.
    pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
  }

  SVarColAttr* pAttr = &pColumnInfoData->varmeta;
  // 64-bit integer arithmetic so neither the sum nor the 1.5x growth can wrap around.
  int64_t required = (int64_t)pAttr->length + dataLen;
  if (pAttr->allocLen < required) {
    if (required > INT32_MAX) {
      // Offsets are int32_t (negative means NULL), so larger data cannot be addressed.
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int64_t newSize = pAttr->allocLen;
    if (newSize <= 1) {
      newSize = 8;
    }

    while (newSize < required) {
      newSize = newSize * 3 / 2;
    }

    if (newSize > INT32_MAX) {
      newSize = INT32_MAX;  // cap the geometric growth; `required` still fits
    }

    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
    if (buf == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = buf;
    pAttr->allocLen = (int32_t)newSize;
  }

  uint32_t len = pColumnInfoData->varmeta.length;
  pColumnInfoData->varmeta.offset[rowIndex] = len;

  (void)memmove(varDataVal(pColumnInfoData->pData + len), pVarData, varDataLen);
  varDataSetLen(pColumnInfoData->pData + len, varDataLen);
  pColumnInfoData->varmeta.length += dataLen;
  return TSDB_CODE_SUCCESS;
}
216

217
/**
 * Gives row `dstRowIdx` the same value as another row.
 * Var-length: the destination reuses the offset of `srcRowIdx`, so no bytes are copied, and the
 * column is flagged `reassigned`; `pData` is not used.
 * Fixed-length: `info.bytes` bytes from `pData` are copied into the destination slot and its null
 * bit is cleared; `srcRowIdx` is not used.
 */
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
                           const char* pData) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    int32_t bytes = pColumnInfoData->info.bytes;
    memcpy(pColumnInfoData->pData + bytes * dstRowIdx, pData, bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
    return 0;
  }

  int32_t* pOffset = pColumnInfoData->varmeta.offset;
  pOffset[dstRowIdx] = pOffset[srcRowIdx];
  pColumnInfoData->reassigned = true;
  return 0;
}
230

231
// Grows the data buffer of a variable-length column to at least `newSize` bytes.
// No-op for fixed-length columns or when enough space is already allocated.
static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
  bool needGrow = IS_VAR_DATA_TYPE(pColumnInfoData->info.type) && pColumnInfoData->varmeta.allocLen < newSize;
  if (!needGrow) {
    return TSDB_CODE_SUCCESS;
  }

  char* pNewBuf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
  if (NULL == pNewBuf) {
    return terrno;
  }

  pColumnInfoData->pData = pNewBuf;
  pColumnInfoData->varmeta.allocLen = newSize;
  return TSDB_CODE_SUCCESS;
}
252

253
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
14,348,142✔
254
                            int32_t itemLen, int32_t numOfRows, bool trimValue) {
255
  if (pColumnInfoData->info.bytes < itemLen) {
14,348,142✔
256
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen,
×
257
          pColumnInfoData->info.bytes, trimValue);
258
    if (trimValue) {
×
259
      itemLen = pColumnInfoData->info.bytes;
×
260
    } else {
261
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
262
    }
263
  }
264

265
  size_t   start = 1;
14,348,142✔
266
  int32_t  t = 0;
14,348,142✔
267
  int32_t  count = log(numOfRows) / log(2);
14,348,142✔
268
  uint32_t startOffset =
14,348,142✔
269
      (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
14,348,142✔
270

271
  // the first item
272
  memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
14,348,142✔
273

274
  while (t < count) {
14,462,603✔
275
    int32_t xlen = 1 << t;
114,461✔
276
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
114,461✔
277
           xlen * itemLen);
114,461✔
278
    t += 1;
114,461✔
279
    start += xlen;
114,461✔
280
  }
281

282
  // the tail part
283
  if (numOfRows > start) {
14,348,142✔
284
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
9,180✔
285
           (numOfRows - start) * itemLen);
9,180✔
286
  }
287

288
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
14,348,142✔
289
    for (int32_t i = 0; i < numOfRows; ++i) {
35,044,027✔
290
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
30,840,910✔
291
    }
292

293
    pColumnInfoData->varmeta.length += numOfRows * itemLen;
4,203,117✔
294
  }
295

296
  return TSDB_CODE_SUCCESS;
14,348,142✔
297
}
298

299
/**
 * Sets rows [currentRow, currentRow + numOfRows) to the same value `pData`.
 *
 * For variable-length columns, the item size is the encoded length of `pData`. The data buffer is
 * grown first so every copy fits. See doCopyNItems for the meaning of `trimValue`.
 *
 * @return TSDB_CODE_SUCCESS on success.
 *         TSDB_CODE_OUT_OF_MEMORY when the var-length data would exceed what int32_t offsets can
 *         address.
 *         Otherwise the error from colDataReserve / doCopyNItems.
 */
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                         bool trimValue) {
  int32_t len = pColumnInfoData->info.bytes;
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      len = getJsonValueLen(pData);
    } else {
      len = varDataTLen(pData);
    }

    // Compute in 64 bits: a wrapped 32-bit product would under-reserve, and doCopyNItems would
    // then write past the end of the buffer.
    int64_t required = (int64_t)numOfRows * len + pColumnInfoData->varmeta.length;
    if (required > INT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (pColumnInfoData->varmeta.allocLen < required) {
      int32_t code = colDataReserve(pColumnInfoData, (size_t)required);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
}
318

319
/**
 * Marks rows [currentRow, currentRow + numOfRows) as NULL and sets `hasNull`.
 *
 * Var-length columns: every offset becomes -1 (memset with 0xFF bytes).
 * Fixed-length columns: the null-bitmap bits are set. For 16 or more rows, the bits up to the next
 * byte boundary are set one at a time, whole bytes with memset, and the remainder bit by bit.
 */
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
  pColumnInfoData->hasNull = true;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
    return;
  }

  int32_t idx = 0;
  if (numOfRows >= 16) {
    // Leading bits until the row index reaches a byte boundary.
    while (idx < numOfRows && BitPos(currentRow + idx) != 0) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + idx);
      ++idx;
    }

    // Whole bitmap bytes in a single call.
    int32_t nBytes = (numOfRows - idx) / 8;
    memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + idx), 0xFF, nBytes);
    idx += nBytes * 8;
  }

  // Trailing bits (or every bit, for short runs).
  for (; idx < numOfRows; ++idx) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + idx);
  }
}
349

350
/**
 * Writes `pData` into row `currentRow` of a variable-length column. The following
 * `numOfRows - 1` rows then share its bytes: they copy its offset, and no data is duplicated.
 * The column is flagged `reassigned` when more than one row shares the value.
 *
 * @return 0 on success, otherwise the error from colDataSetVal.
 */
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
                               uint32_t numOfRows) {
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
  if (code) {
    return code;
  }

  if (numOfRows > 1) {
    int32_t* pOffset = pColumnInfoData->varmeta.offset;
    int32_t  offset = pOffset[currentRow];
    // Assign element by element: memset would replicate only the low byte of `offset` into every
    // byte, which is wrong for any offset other than 0 or -1.
    for (uint32_t i = 1; i < numOfRows; ++i) {
      pOffset[currentRow + i] = offset;
    }
    pColumnInfoData->reassigned = true;
  }

  return code;
}
365

366
/**
 * Fills rows [currentRow, currentRow + numOfRows) with the same value, or with NULL when `isNull`.
 *
 * Var-length columns store the value once and share its offset (colDataCopyAndReassign).
 * Fixed-length columns copy the `info.bytes`-wide value into each row by repeatedly copying the
 * filled prefix. On the non-NULL fixed-length path the null bitmap is not modified.
 * Zero rows is a no-op.
 */
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                          bool isNull) {
  if (numOfRows == 0) {
    // Nothing to fill; avoid writing a seed value into a row the caller did not ask for.
    return TSDB_CODE_SUCCESS;
  }

  if (isNull) {
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
    pColumnInfoData->hasNull = true;
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
  }

  // size_t byte offsets: currentRow * bytes can exceed the int32_t range.
  size_t colBytes = (size_t)pColumnInfoData->info.bytes;
  char*  pStart = pColumnInfoData->pData + (size_t)currentRow * colBytes;

  memcpy(pStart, pData, colBytes);
  uint32_t num = 1;
  while (num < numOfRows) {
    uint32_t remaining = numOfRows - num;
    uint32_t tnum = (remaining < num) ? remaining : num;
    memcpy(pStart + (size_t)num * colBytes, pStart, (size_t)tnum * colBytes);
    num += tnum;
  }

  return TSDB_CODE_SUCCESS;
}
398

399
/*
 * Appends the null bits of the first `numOfRow2` source rows after the first `numOfRow1` rows of
 * the destination null bitmap. The shifts below assume rows fill each byte starting from its most
 * significant bit. The destination bitmap must already be large enough for
 * BitmapLen(numOfRow1 + numOfRow2) bytes.
 */
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) {
    return;
  }

  uint32_t total = numOfRow1 + numOfRow2;
  uint32_t usedBits = BitPos(numOfRow1);  // bits already occupied in the destination's last byte
  int32_t  dstBytes = BitmapLen(numOfRow1);

  if (usedBits == 0) {
    // The destination ends on a byte boundary: append the source bytes verbatim.
    memcpy(pColumnInfoData->nullbitmap + dstBytes, pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint32_t       freeBits = 8 - usedBits;
  const uint8_t* pSrcBits = (const uint8_t*)pSource->nullbitmap;
  uint8_t*       pLastByte = (uint8_t*)&pColumnInfoData->nullbitmap[dstBytes - 1];

  // Keep the destination's own high bits, then fill the free low bits from the first source byte.
  *pLastByte &= (uint8_t)(0xFF << freeBits);
  *pLastByte |= (uint8_t)(pSrcBits[0] >> usedBits);

  if (BitmapLen(total) == dstBytes) {
    return;  // every source bit fit into the destination's last byte
  }

  int32_t  srcBytes = BitmapLen(numOfRow2);
  int32_t  extraBytes = BitmapLen(total) - dstBytes;  // bytes beyond the destination's last byte
  uint8_t* pTail = (uint8_t*)&pColumnInfoData->nullbitmap[dstBytes];
  memset(pTail, 0, extraBytes);

  for (int32_t i = 0; i < srcBytes; ++i) {
    if (i > 0) {
      // The high part of source byte i completes the previous tail byte.
      pTail[i - 1] |= (uint8_t)(pSrcBits[i] >> usedBits);
    }

    if (i >= extraBytes) {
      break;  // no room left in the destination bitmap
    }

    // The low part of source byte i begins tail byte i.
    pTail[i] |= (uint8_t)(pSrcBits[i] << freeBits);
  }
}
440

441
/**
 * Appends the first `numOfRow2` rows of `pSource` after the first `numOfRow1` rows of
 * `pColumnInfoData`. Buffers are grown when the merged row count exceeds `*capacity`, which is
 * then updated. Both columns must have the same type.
 *
 * Var-length: source offsets are shifted by the destination's current data length (NULL offsets
 * stay -1), and the source payload is appended.
 * Fixed-length: the null bitmaps are merged bit by bit (doBitmapMerge), and
 * `info.bytes * numOfRow2` payload bytes are appended.
 *
 * @return the merged row count (numOfRow1 + numOfRow2) on success, or `numOfRow1` when
 *         `numOfRow2` is 0. On failure, TSDB_CODE_INVALID_PARA or terrno.
 */
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    // Grow the offset array to hold the merged row count.
    if (finalNumOfRows > (*capacity)) {
      char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * finalNumOfRows);
      if (p == NULL) {
        return terrno;
      }

      *capacity = finalNumOfRows;
      pColumnInfoData->varmeta.offset = (int32_t*)p;
    }

    // Shift the source offsets past the destination's existing data; NULL rows stay -1.
    for (int32_t i = 0; i < numOfRow2; ++i) {
      if (pSource->varmeta.offset[i] == -1) {
        pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
      } else {
        pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length;
      }
    }

    // Append the source payload.
    uint32_t len = pSource->varmeta.length;
    uint32_t oldLen = pColumnInfoData->varmeta.length;
    if (pColumnInfoData->varmeta.allocLen < len + oldLen) {
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, len + oldLen);
      if (tmp == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = tmp;
      pColumnInfoData->varmeta.allocLen = len + oldLen;
    }

    if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
      memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
    }
    pColumnInfoData->varmeta.length = len + oldLen;
  } else {
    if (finalNumOfRows > (*capacity)) {
      // All data may be NULL: TSDB_DATA_TYPE_NULL columns have info.bytes == 0, so there is no
      // payload buffer to grow. realloc(ptr, 0) is implementation-defined: it may free `ptr` and
      // return NULL. That would be reported as a failure and leave pData dangling, so skip it.
      size_t dataSize = (size_t)finalNumOfRows * pColumnInfoData->info.bytes;
      if (dataSize > 0) {
        char* tmp = taosMemoryRealloc(pColumnInfoData->pData, dataSize);
        if (tmp == NULL) {
          return terrno;
        }

        pColumnInfoData->pData = tmp;
      }

      if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
        char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
        if (btmp == NULL) {
          return terrno;
        }
        // Zero the newly added bitmap bytes so doBitmapMerge can OR bits into them.
        uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
        memset(btmp + BitmapLen(numOfRow1), 0, extend);
        pColumnInfoData->nullbitmap = btmp;
      }

      *capacity = finalNumOfRows;
    }

    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

    if (pSource->pData) {
      size_t offset = (size_t)pColumnInfoData->info.bytes * numOfRow1;
      memcpy(pColumnInfoData->pData + offset, pSource->pData, (size_t)pSource->info.bytes * numOfRow2);
    }
  }

  return numOfRow1 + numOfRow2;
}
525

526
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
1,777,648✔
527
                      const SDataBlockInfo* pBlockInfo) {
528
  if (pColumnInfoData->info.type != pSource->info.type || (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows)) {
1,777,648✔
529
    return TSDB_CODE_INVALID_PARA;
×
530
  }
531

532
  if (numOfRows <= 0) {
1,777,648✔
533
    return numOfRows;
21✔
534
  }
535

536
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,983,233✔
537
    int32_t newLen = pSource->varmeta.length;
205,606✔
538
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
205,606✔
539
    if (pColumnInfoData->varmeta.allocLen < newLen) {
205,606✔
540
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
4,053✔
541
      if (tmp == NULL) {
4,053✔
542
        return terrno;
×
543
      }
544

545
      pColumnInfoData->pData = tmp;
4,053✔
546
      pColumnInfoData->varmeta.allocLen = newLen;
4,053✔
547
    }
548

549
    pColumnInfoData->varmeta.length = newLen;
205,606✔
550
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
205,606✔
551
      memcpy(pColumnInfoData->pData, pSource->pData, newLen);
204,826✔
552
    }
553
  } else {
554
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
1,572,021✔
555
    if (pSource->pData != NULL) {
1,572,021✔
556
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
1,572,023✔
557
    }
558
  }
559

560
  pColumnInfoData->hasNull = pSource->hasNull;
1,777,627✔
561
  pColumnInfoData->info = pSource->info;
1,777,627✔
562
  return 0;
1,777,627✔
563
}
564

565
/**
 * Copies `numOfRows` rows of `pSrc`, starting at `srcIdx`, into `pDst` starting at `dstIdx`.
 *
 * Both columns must share type and width, and `pSrc` must not be reassigned.
 * Var-length rows are appended after pDst's existing data; their payload is copied as one
 * contiguous range.
 * Fixed-length payload is copied in place. When the source has NULLs, null bits are copied too,
 * whole bytes at a time when both indexes start on a byte boundary.
 *
 * @return 0 on success.
 *         TSDB_CODE_INVALID_PARA on mismatch.
 *         terrno when reallocation fails.
 *         `numOfRows` itself when it is not positive.
 */
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
                           int32_t numOfRows) {
  if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRows <= 0) {
    return numOfRows;
  }

  if (IS_VAR_DATA_TYPE(pDst->info.type)) {
    int32_t totalLen = 0;
    void*   pFirstData = NULL;  // start of the first non-NULL value (tracked when NULLs are possible)

    for (int32_t i = 0; i < numOfRows; ++i) {
      // NULL checks are only performed when the source reports NULLs.
      if (pSrc->hasNull && colDataIsNull_var(pSrc, srcIdx + i)) {
        pDst->varmeta.offset[dstIdx + i] = -1;
        pDst->hasNull = true;
        continue;
      }

      char* pValue = colDataGetVarData(pSrc, srcIdx + i);
      if (pSrc->hasNull && NULL == pFirstData) {
        pFirstData = pValue;
      }

      int32_t valueLen = 0;
      if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
        valueLen = getJsonValueLen(pValue);
      } else {
        valueLen = varDataTLen(pValue);
      }

      pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + totalLen;
      totalLen += valueLen;
    }

    if (totalLen > 0) {
      if (pDst->varmeta.allocLen < pDst->varmeta.length + totalLen) {
        char* pNewBuf = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + totalLen);
        if (pNewBuf == NULL) {
          return terrno;
        }

        pDst->pData = pNewBuf;
        pDst->varmeta.allocLen = pDst->varmeta.length + totalLen;
      }

      // The source values are copied as one contiguous range beginning at the first non-NULL value.
      void* pCopyFrom = pSrc->hasNull ? pFirstData : (void*)colDataGetVarData(pSrc, srcIdx);
      memcpy(pDst->pData + pDst->varmeta.length, pCopyFrom, totalLen);
      pDst->varmeta.length += totalLen;
    }
  } else {
    if (pSrc->hasNull) {
      // Whole bitmap bytes can be copied only when both indexes start on a byte boundary.
      bool    byteCopy = (BitPos(dstIdx) == BitPos(srcIdx)) && (0 == BitPos(dstIdx));
      int32_t i = 0;
      while (i < numOfRows) {
        if (byteCopy && (i + (1 << NBIT) <= numOfRows)) {
          BMCharPos(pDst->nullbitmap, dstIdx + i) = BMCharPos(pSrc->nullbitmap, srcIdx + i);
          if (BMCharPos(pDst->nullbitmap, dstIdx + i)) {
            pDst->hasNull = true;
          }
          i += (1 << NBIT);
        } else {
          if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + i)) {
            colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
            pDst->hasNull = true;
          } else {
            colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
          }
          ++i;
        }
      }
    }

    if (pSrc->pData != NULL) {
      memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
             pDst->info.bytes * numOfRows);
    }
  }

  return 0;
}
670

671
// Number of column entries stored in the block's pDataBlock array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  return numOfCols;
}
672

673
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
3,847✔
674

675
/**
 * Recomputes `info.window` from the first and last values of the timestamp column. The column is
 * `tsColumnIndex`, or column 0 when it is -1.
 * Returns 0 without changes for a NULL, empty or unloaded block, or when the column is missing or
 * not TIMESTAMP. Returns -1 when the block has no columns.
 */
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  bool skip = (pDataBlock == NULL) || (pDataBlock->info.rows <= 0) || (pDataBlock->info.dataLoad == 0);
  if (skip) {
    return 0;
  }

  if (taosArrayGetSize(pDataBlock->pDataBlock) == 0) {
    return -1;
  }

  int32_t          colIdx = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
  if (pTsCol == NULL || pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  TSKEY firstKey = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastKey = *(TSKEY*)colDataGetData(pTsCol, (pDataBlock->info.rows - 1));

  pDataBlock->info.window.skey = TMIN(firstKey, lastKey);
  pDataBlock->info.window.ekey = TMAX(firstKey, lastKey);
  return 0;
}
704

705
/**
 * Records the primary-key values of the block's first and last rows in `info.pks[0]` and
 * `info.pks[1]`. For ascending order pks[0]/pks[1] take the first/last row; otherwise the
 * last/first row.
 * Numeric keys are stored as int64 datums. VARCHAR keys are copied into the preallocated
 * `pks[].pData` buffers; their capacity is not checked here.
 * Other key types, and NULL/empty/unloaded blocks or `pkColumnIndex == -1`, are ignored.
 * Returns -1 when the block has no columns, or terrno when the key column is missing.
 */
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
    return 0;
  }

  if (taosArrayGetSize(pDataBlock->pDataBlock) == 0) {
    return -1;
  }

  SDataBlockInfo*  pInfo = &pDataBlock->info;
  SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
  if (pPkCol == NULL) {
    return terrno;
  }

  bool isNumeric = IS_NUMERIC_TYPE(pPkCol->info.type);
  if (!isNumeric && (pPkCol->info.type != TSDB_DATA_TYPE_VARCHAR)) {
    return 0;
  }

  void* pFirst = colDataGetData(pPkCol, 0);
  void* pLast = colDataGetData(pPkCol, (pInfo->rows - 1));
  void* pLowKey = asc ? pFirst : pLast;   // goes to pks[0]
  void* pHighKey = asc ? pLast : pFirst;  // goes to pks[1]

  if (isNumeric) {
    int64_t val = 0;
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pLowKey, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[0], val);
    GET_TYPED_DATA(val, int64_t, pPkCol->info.type, pHighKey, typeGetTypeModFromColInfo(&pPkCol->info));
    VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[1], val);
  } else {  // todo refactor
    memcpy(pInfo->pks[0].pData, varDataVal(pLowKey), varDataLen(pLowKey));
    pInfo->pks[0].nData = varDataLen(pLowKey);

    memcpy(pInfo->pks[1].pData, varDataVal(pHighKey), varDataLen(pHighKey));
    pInfo->pks[1].nData = varDataLen(pHighKey);
  }

  return 0;
}
759

760
/**
 * Appends all rows of `pSrc` to `pDest` column by column (colDataMergeCol), then updates the
 * destination's capacity and row count.
 * Returns terrno if a column is missing in either block, the negative colDataMergeCol result on
 * failure, and 0 on success.
 */
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  int32_t capacity = pDest->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pSrcCol == NULL || pDstCol == NULL) {
      return terrno;
    }

    // Each column starts from the block's recorded capacity; colDataMergeCol raises it if it grows.
    capacity = pDest->info.capacity;
    int32_t merged = colDataMergeCol(pDstCol, pDest->info.rows, &capacity, pSrcCol, pSrc->info.rows);
    if (merged < 0) {
      return merged;  // error code
    }
  }

  pDest->info.capacity = capacity;
  pDest->info.rows += pSrc->info.rows;
  return 0;
}
783

784
/**
 * Appends `numOfRows` rows of `pSrc`, starting at `srcIdx`, to `pDest` column by column
 * (colDataAssignNRows). No buffers are grown here.
 * Returns TSDB_CODE_INVALID_PARA when the rows exceed pDest's capacity, terrno when a column is
 * missing, the first non-zero colDataAssignNRows result, or 0 on success.
 */
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
    return TSDB_CODE_INVALID_PARA;
  }

  size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pDstCol == NULL || pSrcCol == NULL) {
      return terrno;
    }

    int32_t ret = colDataAssignNRows(pDstCol, pDest->info.rows, pSrcCol, srcIdx, numOfRows);
    if (ret != 0) {
      return ret;
    }
  }

  pDest->info.rows += numOfRows;
  return 0;
}
807

808
/**
 * Drop the last numOfRows rows of pBlock.
 *
 * Removing as many rows as the block holds (or more) is delegated to blockDataCleanup().
 * Otherwise:
 *  - var-length columns: varmeta.length is rolled back to the offset of the first removed
 *    non-NULL value, and the offsets of the removed rows are zeroed;
 *  - fixed-length columns: the null bits of the removed rows are cleared.
 * Payload bytes are left as they are.
 *
 * @param pBlock    block to shrink in place
 * @param numOfRows number of trailing rows to remove; values <= 0 are a no-op
 */
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
  // a negative count would index past the rows and pass a negative size to memset
  if (numOfRows <= 0) {
    return;
  }

  if (numOfRows >= pBlock->info.rows) {
    blockDataCleanup(pBlock);
    return;
  }

  int32_t totalRows = pBlock->info.rows;
  int32_t keptRows = totalRows - numOfRows;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, c);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      // NULL rows store offset -1, which must never become the data length. The retained data
      // ends where the first removed non-NULL value starts. If every removed row is NULL, the
      // length is left unchanged.
      for (int32_t r = keptRows; r < totalRows; ++r) {
        if (pCol->varmeta.offset[r] >= 0) {
          pCol->varmeta.length = pCol->varmeta.offset[r];
          break;
        }
      }
      memset(pCol->varmeta.offset + keptRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
    } else {
      int32_t r = keptRows;

      // clear bit by bit up to the next byte boundary of the bitmap
      for (; r < totalRows && BitPos(r) != 0; ++r) {
        colDataClearNull_f(pCol->nullbitmap, r);
      }

      // then clear whole bytes at once
      int32_t bytes = (totalRows - r) / 8;
      memset(&BMCharPos(pCol->nullbitmap, r), 0, bytes);
      r += bytes * 8;

      // and finally the trailing bits
      for (; r < totalRows; ++r) {
        colDataClearNull_f(pCol->nullbitmap, r);
      }
    }
  }

  pBlock->info.rows -= numOfRows;
}
850

851
size_t blockDataGetSize(const SSDataBlock* pBlock) {
215,077✔
852
  size_t total = 0;
215,077✔
853
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
215,077✔
854
  for (int32_t i = 0; i < numOfCols; ++i) {
1,290,223✔
855
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
1,075,130✔
856
    if (pColInfoData == NULL) {
1,075,131✔
857
      continue;
×
858
    }
859

860
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
1,075,131✔
861
  }
862

863
  return total;
215,093✔
864
}
865

866
// the number of tuples can be fit in one page.
867
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
868
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
520✔
869
                           int32_t pageSize) {
870
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
520✔
871
  int32_t numOfRows = pBlock->info.rows;
520✔
872

873
  int32_t bitmapChar = 1;
520✔
874

875
  size_t headerSize = sizeof(int32_t);
520✔
876
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
520✔
877

878
  // TODO speedup by checking if the whole page can fit in firstly.
879
  if (!hasVarCol) {
520✔
880
    size_t  rowSize = blockDataGetRowSize(pBlock);
×
881
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
×
882
    if (capacity <= 0) {
×
883
      return terrno;
×
884
    }
885

886
    *stopIndex = startIndex + capacity - 1;
×
887
    if (*stopIndex >= numOfRows) {
×
888
      *stopIndex = numOfRows - 1;
×
889
    }
890

891
    return TSDB_CODE_SUCCESS;
×
892
  }
893
  // iterate the rows that can be fit in this buffer page
894
  int32_t size = (headerSize + colHeaderSize);
520✔
895
  for (int32_t j = startIndex; j < numOfRows; ++j) {
1,000,520✔
896
    for (int32_t i = 0; i < numOfCols; ++i) {
3,001,557✔
897
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
2,001,038✔
898
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
2,001,038✔
899
        if (pColInfoData->varmeta.offset[j] != -1) {
1,000,519✔
900
          char* p = colDataGetData(pColInfoData, j);
1,000,519✔
901
          size += varDataTLen(p);
1,000,519✔
902
        }
903

904
        size += sizeof(pColInfoData->varmeta.offset[0]);
1,000,519✔
905
      } else {
906
        size += pColInfoData->info.bytes;
1,000,519✔
907

908
        if (((j - startIndex) & 0x07) == 0) {
1,000,519✔
909
          size += 1;  // the space for null bitmap
125,505✔
910
        }
911
      }
912
    }
913

914
    if (size > pageSize) {  // pageSize must be able to hold one row
1,000,519✔
915
      *stopIndex = j - 1;
519✔
916
      if (*stopIndex < startIndex) {
519✔
917
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
918
      }
919

920
      return TSDB_CODE_SUCCESS;
519✔
921
    }
922
  }
923

924
  // all fit in
925
  *stopIndex = numOfRows - 1;
1✔
926
  return TSDB_CODE_SUCCESS;
1✔
927
}
928

929
/**
 * Copy rows [startIndex, startIndex + rowCount) of pBlock into a newly created block.
 *
 * The new block has pBlock's schema (createOneDataBlock without data). Values are copied one
 * by one with colDataSetVal, and NULLs are reproduced with colDataSetNULL.
 *
 * @param pBlock     source block
 * @param startIndex first row to copy (>= 0)
 * @param rowCount   number of rows to copy (>= 0)
 * @param pResBlock  out: the new block on success; left NULL on any failure
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an out-of-range request, or the
 *         error of the failing allocation/copy (the partially built block is destroyed)
 */
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
  int32_t code = 0;
  QRY_PARAM_CHECK(pResBlock);

  // 64-bit sum: startIndex + rowCount must not overflow int32_t before it is range-checked
  if (pBlock == NULL || startIndex < 0 || rowCount < 0 || rowCount > pBlock->info.rows ||
      (int64_t)startIndex + rowCount > pBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDst = NULL;
  code = createOneDataBlock(pBlock, false, &pDst);
  if (code) {
    return code;
  }

  code = blockDataEnsureCapacity(pDst, rowCount);
  if (code) {
    blockDataDestroy(pDst);
    return code;
  }

  // Copied value by value instead of with colDataAssignNRows(): var-length payloads of the
  // source may not be stored in row order (see the former TODO here).
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    if (pColData == NULL || pDstCol == NULL) {
      continue;
    }

    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pColData, j);
      } else {
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
      }

      if (isNull) {
        colDataSetNULL(pDstCol, j - startIndex);
        continue;
      }

      char* p = colDataGetData(pColData, j);
      code = colDataSetVal(pDstCol, j - startIndex, p, false);
      if (code != TSDB_CODE_SUCCESS) {
        // Previously only the inner loop was left: the error could be overwritten by later
        // columns, and a half-filled block was handed out. Fail cleanly instead.
        blockDataDestroy(pDst);
        return code;
      }
    }
  }

  pDst->info.rows = rowCount;
  *pResBlock = pDst;
  return TSDB_CODE_SUCCESS;
}
990

991
/**
992
 *
993
 * +------------------+---------------------------------------------+
994
 * |the number of rows|                    column #1                |
995
 * |    (4 bytes)     |------------+-----------------------+--------+
996
 * |                  | null bitmap| column length(4bytes) | values |
997
 * +------------------+------------+-----------------------+--------+
998
 * @param buf
999
 * @param pBlock
1000
 * @return
1001
 */
1002
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
44✔
1003
  // write the number of rows
1004
  *(uint32_t*)buf = pBlock->info.rows;
44✔
1005

1006
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
44✔
1007
  int32_t numOfRows = pBlock->info.rows;
44✔
1008

1009
  char* pStart = buf + sizeof(uint32_t);
44✔
1010

1011
  for (int32_t i = 0; i < numOfCols; ++i) {
189✔
1012
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
145✔
1013
    if (pCol == NULL) {
145✔
1014
      continue;
×
1015
    }
1016

1017
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
145✔
1018
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
15✔
1019
      pStart += numOfRows * sizeof(int32_t);
15✔
1020
    } else {
1021
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
130✔
1022
      pStart += BitmapLen(pBlock->info.rows);
130✔
1023
    }
1024

1025
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
145✔
1026

1027
    *(int32_t*)pStart = dataSize;
145✔
1028
    pStart += sizeof(int32_t);
145✔
1029

1030
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
145✔
1031
      for (int32_t row = 0; row < numOfRows; ++row) {
×
1032
        char*   pColData = pCol->pData + pCol->varmeta.offset[row];
×
1033
        int32_t colSize = 0;
×
1034
        if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
×
1035
          colSize = getJsonValueLen(pColData);
×
1036
        } else {
1037
          colSize = varDataTLen(pColData);
×
1038
        }
1039
        memcpy(pStart, pColData, colSize);
×
1040
        pStart += colSize;
×
1041
      }
1042
    } else {
1043
      if (dataSize != 0) {
145✔
1044
        // ubsan reports error if pCol->pData==NULL && dataSize==0
1045
        memcpy(pStart, pCol->pData, dataSize);
145✔
1046
      }
1047
      pStart += dataSize;
145✔
1048
    }
1049
  }
1050

1051
  return 0;
44✔
1052
}
1053

1054
/**
 * Deserialize a buffer produced by blockDataToBuf() into pBlock.
 *
 * pBlock must already describe the same columns in the same order. Its capacity is grown to
 * the serialized row count, and var-length payload buffers are grown as needed.
 *
 * @param pBlock destination block (schema already set)
 * @param buf    serialized block
 * @return TSDB_CODE_SUCCESS;
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a non-positive row count;
 *         TSDB_CODE_FAILED for a negative or oversized column length;
 *         or an allocation error
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  int32_t numOfRows = *(int32_t*)buf;
  // a negative count would be turned into a huge unsigned capacity request below
  if (numOfRows <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  pBlock->info.rows = numOfRows;
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    bool isVar = IS_VAR_DATA_TYPE(pCol->info.type);
    if (isVar) {
      size_t metaSize = numOfRows * sizeof(int32_t);
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    int32_t colLength = *(int32_t*)pStart;
    pStart += sizeof(int32_t);
    if (colLength < 0) {
      return TSDB_CODE_FAILED;
    }

    if (isVar) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return terrno;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }

      pCol->varmeta.length = colLength;
    } else if ((int64_t)colLength > (int64_t)pCol->info.bytes * pBlock->info.capacity) {
      // the fixed-length payload holds at most capacity * bytes; never write past it
      return TSDB_CODE_FAILED;
    }

    if (colLength != 0) {
      // ubsan reports error if colLength==0 && pCol->pData == 0
      memcpy(pCol->pData, pStart, colLength);
    }
    pStart += colLength;
  }

  return TSDB_CODE_SUCCESS;
}
1111

1112
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
×
1113
  if (!pColumnInfoData->hasNull) {
×
1114
    return false;
×
1115
  }
1116

1117
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
×
1118
    for (int32_t i = startIndex; i < nRows; ++i) {
×
1119
      if (!colDataIsNull_var(pColumnInfoData, i)) {
×
1120
        return false;
×
1121
      }
1122
    }
1123
  } else {
1124
    if (pColumnInfoData->nullbitmap == NULL) {
×
1125
      return false;
×
1126
    }
1127

1128
    for (int32_t i = startIndex; i < nRows; ++i) {
×
1129
      if (!colDataIsNull_f(pColumnInfoData->nullbitmap, i)) {
×
1130
        return false;
×
1131
      }
1132
    }
1133
  }
1134

1135
  return true;
×
1136
}
1137

1138
// todo remove this
1139
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
×
1140
  pBlock->info.rows = *(int32_t*)buf;
×
1141
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
×
1142

1143
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
1144

1145
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
×
1146

1147
  for (int32_t i = 0; i < numOfCols; ++i) {
×
1148
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
×
1149
    if (pCol == NULL) {
×
1150
      continue;
×
1151
    }
1152

1153
    pCol->hasNull = true;
×
1154

1155
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1156
      size_t metaSize = capacity * sizeof(int32_t);
×
1157
      memcpy(pCol->varmeta.offset, pStart, metaSize);
×
1158
      pStart += metaSize;
×
1159
    } else {
1160
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
×
1161
      pStart += BitmapLen(capacity);
×
1162
    }
1163

1164
    int32_t colLength = *(int32_t*)pStart;
×
1165
    pStart += sizeof(int32_t);
×
1166

1167
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1168
      if (pCol->varmeta.allocLen < colLength) {
×
1169
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
×
1170
        if (tmp == NULL) {
×
1171
          return terrno;
×
1172
        }
1173

1174
        pCol->pData = tmp;
×
1175
        pCol->varmeta.allocLen = colLength;
×
1176
      }
1177

1178
      pCol->varmeta.length = colLength;
×
1179
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
×
1180
        return TSDB_CODE_FAILED;
×
1181
      }
1182
    }
1183

1184
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
×
1185
      memcpy(pCol->pData, pStart, colLength);
×
1186
    }
1187

1188
    pStart += pCol->info.bytes * capacity;
×
1189
  }
1190

1191
  return TSDB_CODE_SUCCESS;
×
1192
}
1193

1194
/**
 * Sum of info.bytes over all columns of pBlock.
 *
 * The value is computed on first use and cached in pBlock->info.rowSize. A cached value of 0
 * is treated as "not computed yet".
 */
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t sum = 0;
  size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
    if (pCol != NULL) {
      sum += pCol->info.bytes;
    }
  }

  pBlock->info.rowSize = sum;
  return pBlock->info.rowSize;
}
1213

1214
/**
 * Size in bytes of the meta section of a serialized block with numOfCols columns.
 *
 * Fixed part:
 *   | version | total length | total rows | blankFull | total columns | flag seg | block group id |
 *   int32 each, except blankFull (bool) and group id (uint64).
 * Per column:
 *   column schema (int8 + int32) followed by the column length (int32).
 *
 * @param numOfCols number of columns in the block
 * @return the meta size in bytes
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  const size_t fixedPart = 3 * sizeof(int32_t) + sizeof(bool) + 2 * sizeof(int32_t) + sizeof(uint64_t);
  const size_t perColumn = sizeof(int8_t) + sizeof(int32_t) + sizeof(int32_t);
  return fixedPart + numOfCols * perColumn;
}
1225

1226
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
40✔
1227
  double rowSize = 0;
40✔
1228

1229
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
40✔
1230
  for (int32_t i = 0; i < numOfCols; ++i) {
174✔
1231
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
134✔
1232
    if (pColInfo == NULL) {
134✔
1233
      continue;
×
1234
    }
1235

1236
    rowSize += pColInfo->info.bytes;
134✔
1237
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
134✔
1238
      rowSize += sizeof(int32_t);
15✔
1239
    } else {
1240
      rowSize += 1 / 8.0;  // one bit for each record
119✔
1241
    }
1242
  }
1243

1244
  return rowSize;
40✔
1245
}
1246

1247
typedef struct SSDataBlockSortHelper {
1248
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
1249
  SSDataBlock* pDataBlock;
1250
} SSDataBlockSortHelper;
1251

1252
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
17,076✔
1253
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
17,076✔
1254

1255
  SSDataBlock* pDataBlock = pHelper->pDataBlock;
17,076✔
1256

1257
  int32_t left = *(int32_t*)p1;
17,076✔
1258
  int32_t right = *(int32_t*)p2;
17,076✔
1259

1260
  SArray* pInfo = pHelper->orderInfo;
17,076✔
1261

1262
  for (int32_t i = 0; i < pInfo->size; ++i) {
22,468✔
1263
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
20,580✔
1264
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
20,580✔
1265

1266
    if (pColInfoData->hasNull) {
20,580✔
1267
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
20,580✔
1268
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
20,580✔
1269
      if (leftNull && rightNull) {
20,580✔
1270
        continue;  // continue to next slot
88✔
1271
      }
1272

1273
      if (rightNull) {
20,492✔
1274
        return pOrder->nullFirst ? 1 : -1;
48✔
1275
      }
1276

1277
      if (leftNull) {
20,444✔
1278
        return pOrder->nullFirst ? -1 : 1;
57✔
1279
      }
1280
    }
1281

1282
    void* left1 = colDataGetData(pColInfoData, left);
20,387✔
1283
    void* right1 = colDataGetData(pColInfoData, right);
20,387✔
1284
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
20,387✔
1285
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
×
1286
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
×
1287
        return 0;
×
1288
      }
1289
    }
1290

1291
    __compar_fn_t fn;
1292
    if (pOrder->compFn) {
20,387✔
1293
      fn = pOrder->compFn;
20,387✔
1294
    } else {
1295
      fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1296
    }
1297

1298
    int ret = fn(left1, right1);
20,387✔
1299
    if (ret == 0) {
20,387✔
1300
      continue;
5,304✔
1301
    } else {
1302
      return ret;
15,083✔
1303
    }
1304
  }
1305

1306
  return 0;
1,888✔
1307
}
1308

1309
/**
 * Gather the rows of pDataBlock into the helper columns pCols following a permutation:
 * destination row j receives source row index[j].
 *
 * - Var-length columns: the whole payload is copied once, and only the offsets are permuted.
 * - Fixed-length columns: values are copied row by row, and NULL rows are flagged via the bitmap.
 */
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
  size_t  colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  int64_t rowCount = pDataBlock->info.rows;

  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pSrc == NULL) {
      continue;
    }
    SColumnInfoData* pDst = pCols + c;

    if (!IS_VAR_DATA_TYPE(pSrc->info.type)) {
      for (int32_t j = 0; j < rowCount; ++j) {
        int32_t from = index[j];
        if (colDataIsNull_f(pSrc->nullbitmap, from)) {
          colDataSetNull_f_s(pDst, j);
        } else {
          memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + from * pDst->info.bytes, pDst->info.bytes);
        }
      }
      continue;
    }

    if (pSrc->varmeta.length != 0) {
      memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
    }
    pDst->varmeta.length = pSrc->varmeta.length;

    for (int32_t j = 0; j < rowCount; ++j) {
      pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]];
    }
  }
}
48✔
1338

1339
/**
 * Allocate one scratch column per column of pDataBlock, sized for the block's capacity, to
 * receive the sorted rows.
 *
 * - Var-length columns: a zeroed offset array (capacity entries) and a zeroed payload buffer of
 *   the source's current varmeta.length bytes.
 * - Fixed-length columns: a zeroed null bitmap and a zeroed payload of capacity * bytes.
 *
 * @param pDataBlock source block
 * @param ppCols     out: array of numOfCols columns on success (copyBackToBlock takes it over);
 *                   NULL on failure
 * @return 0 on success, otherwise the allocation error (everything allocated so far is released)
 */
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
  int32_t code = 0;
  int32_t rows = pDataBlock->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t i = 0;

  *ppCols = NULL;

  SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
  if (pCols == NULL) {
    return terrno;
  }

  for (i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      continue;
    }

    pCols[i].info = pColInfoData->info;
    if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
      int32_t dataLen = pColInfoData->varmeta.length;
      pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
      pCols[i].pData = taosMemoryCalloc(1, dataLen);
      // A zero-byte allocation may legally return NULL. That is not a failure for an empty
      // payload; the copy into it is skipped when the length is 0.
      if (pCols[i].varmeta.offset == NULL || (pCols[i].pData == NULL && dataLen > 0)) {
        code = terrno;
        taosMemoryFree(pCols[i].varmeta.offset);
        taosMemoryFree(pCols[i].pData);
        goto _error;
      }

      pCols[i].varmeta.length = dataLen;
      pCols[i].varmeta.allocLen = pCols[i].varmeta.length;
    } else {
      pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
      pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
      if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) {
        code = terrno;
        taosMemoryFree(pCols[i].nullbitmap);
        taosMemoryFree(pCols[i].pData);
        goto _error;
      }
    }
  }

  *ppCols = pCols;
  return code;

_error:
  // column i was released above; release the fully built columns before it
  for (int32_t j = 0; j < i; ++j) {
    colDataDestroy(&pCols[j]);
  }

  taosMemoryFree(pCols);
  return code;
}
1394

1395
/**
 * Move the helper columns pCols into pDataBlock.
 *
 * For each column, the block's old offsets or null bitmap and its old payload are freed, then
 * replaced by the helper's buffers. Finally the pCols array itself is freed; the buffers it
 * pointed to now belong to the block.
 */
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t c = 0; c < colCount; ++c) {
    SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pDst == NULL) {
      continue;
    }

    SColumnInfoData* pHelp = &pCols[c];
    pDst->info = pHelp->info;

    if (!IS_VAR_DATA_TYPE(pDst->info.type)) {
      taosMemoryFreeClear(pDst->nullbitmap);
      pDst->nullbitmap = pHelp->nullbitmap;
    } else {
      taosMemoryFreeClear(pDst->varmeta.offset);
      pDst->varmeta = pHelp->varmeta;
    }

    taosMemoryFreeClear(pDst->pData);
    pDst->pData = pHelp->pData;
  }

  taosMemoryFreeClear(pCols);
}
48✔
1419

1420
/**
 * Allocate the identity permutation [0, 1, ..., rows - 1].
 * Returns NULL when the allocation fails.
 */
static int32_t* createTupleIndex(size_t rows) {
  int32_t* index = taosMemoryCalloc(rows, sizeof(int32_t));
  if (index != NULL) {
    for (size_t k = 0; k < rows; ++k) {
      index[k] = (int32_t)k;
    }
  }
  return index;
}
1432

1433
/* Release an index array obtained from createTupleIndex(). */
static void destroyTupleIndex(int32_t* index) {
  taosMemoryFreeClear(index);
}
48✔
1434

1435
/**
 * Sort the rows of pDataBlock in place according to pOrderInfo (SArray<SBlockOrderInfo>).
 *
 * Fast path: one sort key, a single fixed-length column, and no NULLs in the sort columns.
 * The raw payload is then sorted directly with taosSort().
 *
 * General path:
 *  1. Fill pColData and compFn of every SBlockOrderInfo.
 *  2. Sort a row-index permutation with dataBlockCompar().
 *  3. Gather the rows into helper columns.
 *  4. Swap those helper columns back into the block.
 *
 * @return TSDB_CODE_SUCCESS, the comparator's error reported through terrno, or an
 *         allocation error
 */
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the additional buffer.
  uint32_t rows = pDataBlock->info.rows;

  bool sortColumnHasNull = false;
  bool varTypeSort = false;

  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  // fast path: the only column is the only key, it is fixed-length and holds no NULLs
  if (taosArrayGetSize(pOrderInfo) == 1 && !sortColumnHasNull && numOfCols == 1 && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
    if (pColInfoData == NULL || pOrder == NULL) {
      return terrno;
    }

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pInfo->pColData == NULL) {
      continue;
    }
    pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
  }

  // dataBlockCompar reports failures only through terrno, so clear it first
  terrno = 0;
  taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  if (terrno != 0) {
    int32_t sortCode = terrno;  // capture before the cleanup call can touch terrno
    destroyTupleIndex(index);   // previously leaked on this path
    return sortCode;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = NULL;
  int32_t          code = createHelpColInfoData(pDataBlock, &pCols);
  if (code != 0) {
    destroyTupleIndex(index);
    return code;
  }

  int64_t p2 = taosGetTimestampUs();
  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();
  copyBackToBlock(pDataBlock, pCols);

  int64_t p4 = taosGetTimestampUs();
  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%d\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);

  destroyTupleIndex(index);
  return TSDB_CODE_SUCCESS;
}
1540

1541
/* Empty the block (see blockDataEmpty) and also reset its uid and group id. */
void blockDataCleanup(SSDataBlock* pDataBlock) {
  blockDataEmpty(pDataBlock);

  pDataBlock->info.id.uid = 0;
  pDataBlock->info.id.groupId = 0;
}
1,093,358✔
1547

1548
/**
 * Drop all rows of pDataBlock while keeping its allocated buffers.
 *
 * Frees pBlockAgg, runs colInfoDataCleanup() on every column over the full capacity, and resets
 * rows, dataLoad and the time window. A block with zero capacity is left untouched.
 */
void blockDataEmpty(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  if (pInfo->capacity == 0) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pBlockAgg);

  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, idx);
    if (pCol != NULL) {
      colInfoDataCleanup(pCol, pInfo->capacity);
    }
  }

  pInfo->rows = 0;
  pInfo->dataLoad = 0;
  pInfo->window.skey = 0;
  pInfo->window.ekey = 0;
}
1571

1572
/**
 * Lightweight reset of pDataBlock: nothing is freed, and bitmaps and offsets are not cleared.
 *
 * Only these are reset:
 *  - per column: the hasNull and reassigned flags, and varmeta.length for var-length columns;
 *  - the row count, dataLoad, time window, uid and group id.
 * A block with zero capacity is left untouched.
 */
void blockDataReset(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  if (pInfo->capacity == 0) {
    return;
  }

  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, idx);
    if (pCol == NULL) {
      continue;
    }

    pCol->hasNull = false;
    pCol->reassigned = false;
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = 0;
    }
  }

  pInfo->rows = 0;
  pInfo->dataLoad = 0;
  pInfo->window.skey = 0;
  pInfo->window.ekey = 0;
  pInfo->id.uid = 0;
  pInfo->id.groupId = 0;
}
1599

1600
/*
1601
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
1602
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
1603
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
1604
 */
1605
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
252,686✔
1606
                         bool clearPayload) {
1607
  if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) {
252,686✔
1608
    return TSDB_CODE_SUCCESS;
×
1609
  }
1610

1611
  int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
252,686✔
1612

1613
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
252,686✔
1614
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
126,193✔
1615
    if (tmp == NULL) {
126,251✔
1616
      return terrno;
×
1617
    }
1618

1619
    pColumn->varmeta.offset = (int32_t*)tmp;
126,251✔
1620
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
126,251✔
1621
  } else {
1622
    // prepare for the null bitmap
1623
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
126,493✔
1624
    if (tmp == NULL) {
126,494✔
1625
      return terrno;
×
1626
    }
1627

1628
    int32_t oldLen = BitmapLen(existedRows);
126,494✔
1629
    pColumn->nullbitmap = tmp;
126,494✔
1630
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
126,494✔
1631
    if (pColumn->info.bytes == 0) {
126,494✔
1632
      return TSDB_CODE_INVALID_PARA;
×
1633
    }
1634

1635
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
1636
    // to MALLOC_ALIGN_BYTES
1637
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
126,494✔
1638
    if (tmp == NULL) {
126,491✔
1639
      return terrno;
×
1640
    }
1641
    // memset(tmp, 0, numOfRows * pColumn->info.bytes);
1642

1643
    // copy back the existed data
1644
    if (pColumn->pData != NULL) {
126,491✔
1645
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
625✔
1646
      taosMemoryFreeClear(pColumn->pData);
625✔
1647
    }
1648

1649
    pColumn->pData = tmp;
126,492✔
1650

1651
    // check if the allocated memory is aligned to the requried bytes.
1652
#if defined LINUX
1653
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
126,492✔
1654
      return TSDB_CODE_FAILED;
×
1655
    }
1656
#endif
1657

1658
    if (clearPayload) {
126,492✔
1659
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
89,845✔
1660
    }
1661
  }
1662

1663
  return TSDB_CODE_SUCCESS;
252,743✔
1664
}
1665

1666
/*
 * Resets the per-row bookkeeping of a column so it can be refilled from row 0.
 *
 * Var-length columns get their payload length reset and the first numOfRows offset entries zeroed;
 * fixed-width columns get the null bitmap covering numOfRows rows zeroed. The data buffers themselves
 * are kept.
 */
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  pColumn->hasNull = false;

  if (!IS_VAR_DATA_TYPE(pColumn->info.type)) {
    // fixed-width column: only the null bitmap needs clearing
    if (pColumn->nullbitmap != NULL) {
      memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
    }
    return;
  }

  // var-length column: drop the payload length and reset the offset array
  pColumn->varmeta.length = 0;
  if (pColumn->varmeta.offset != NULL) {
    memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
  }
}
3,790,785✔
1680

1681
/*
 * Grows a standalone column so it can hold numOfRows rows.
 *
 * The column is not attached to a block here, so an all-zero block info is handed to
 * doEnsureCapacity. clearPayload is forwarded unchanged. Returns doEnsureCapacity's result code.
 */
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
  SDataBlockInfo emptyInfo = {0};
  int32_t        code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows, clearPayload);
  return code;
}
1685

1686
/*
 * Makes every column of the block able to hold numOfRows rows.
 *
 * Requests for 0 rows, or for no more than the current capacity, are no-ops. Existing payload is
 * preserved (doEnsureCapacity is called with clearPayload = false). The block's capacity is only
 * updated after every column has been grown successfully.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a column lookup fails, or the doEnsureCapacity error.
 */
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, i);
    if (pCol == NULL) {
      return terrno;
    }

    int32_t code = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows, false);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  pDataBlock->info.capacity = numOfRows;
  return TSDB_CODE_SUCCESS;
}
1708

1709
/*
 * Releases everything a block owns except the block struct itself: every column's buffers, the
 * column array, and the aggregate buffer. The block info is zeroed afterwards. NULL is accepted.
 */
void blockDataFreeRes(SSDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    if (pCol != NULL) {
      colDataDestroy(pCol);
    }
  }

  taosArrayDestroy(pBlock->pDataBlock);
  pBlock->pDataBlock = NULL;

  taosMemoryFreeClear(pBlock->pBlockAgg);
  memset(&pBlock->info, 0, sizeof(pBlock->info));
}
1730

1731
/*
 * Frees a heap-allocated block together with everything it owns. NULL is accepted.
 *
 * When the primary key type is var-length, pks[0].pData and pks[1].pData are heap buffers (as
 * allocated by copyPkVal) and are released here before the rest of the block.
 */
void blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
      taosMemoryFreeClear(pBlock->info.pks[0].pData);
      taosMemoryFreeClear(pBlock->info.pks[1].pData);
    }

    blockDataFreeRes(pBlock);
    taosMemoryFreeClear(pBlock);
  }
}
1744

1745
// todo remove it
1746
/*
 * Turns dst into a copy of src: dst gets src's column schema, enough capacity for src's rows, and
 * the column payloads.
 *
 * Columns are appended to whatever dst->pDataBlock already holds. Fixed-width source columns
 * without a data buffer are skipped. On success dst->info equals src->info except that dst keeps
 * its own capacity and its pk data pointers are NULL (pk buffers are not copied).
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. dst may be partially modified on error.
 */
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  // Start with src's info, but empty, with no capacity and no owned pk buffers.
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.rows = 0;
  dst->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(src->pDataBlock);

  // Replicate the column schema (no data yet).
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrcCol = taosArrayGet(src->pDataBlock, i);
    if (pSrcCol == NULL) {
      return terrno;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = pSrcCol->info};
    int32_t         appendCode = blockDataAppendColInfo(dst, &colInfo);
    if (appendCode != TSDB_CODE_SUCCESS) {
      return appendCode;
    }
  }

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // Copy payloads column by column.
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDstCol = taosArrayGet(dst->pDataBlock, i);
    SColumnInfoData* pSrcCol = taosArrayGet(src->pDataBlock, i);
    if (pSrcCol == NULL || pDstCol == NULL) {
      continue;
    }
    if (pSrcCol->pData == NULL && !IS_VAR_DATA_TYPE(pSrcCol->info.type)) {
      continue;  // fixed-width column with nothing to copy
    }

    int32_t ret = colDataAssign(pDstCol, pSrcCol, src->info.rows, &src->info);
    if (ret < 0) {
      return ret;
    }
  }

  // Take src's final info but keep dst's own capacity and do not alias src's pk buffers.
  uint32_t cap = dst->info.capacity;
  dst->info = src->info;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  dst->info.capacity = cap;
  uTrace("%s,parName:%s, groupId:%"PRIu64, __FUNCTION__, dst->info.parTbName, dst->info.id.groupId);
  return code;
}
1795

1796
/*
 * Deep-copies pSrc into pDst.
 *
 * pDst is cleaned, grown to hold pSrc's rows, and every column payload is copied. Then pDst's block
 * info is replaced by pSrc's, with two differences: pDst keeps its own capacity, and it gets its own
 * copies of var-length primary key values (via copyPkVal). Any pk buffers pDst owned before are
 * released.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
  blockDataCleanup(pDst);

  int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
    if (pDstCol == NULL || pSrcCol == NULL) {
      continue;
    }

    int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
    if (ret < 0) {
      code = ret;
      return code;
    }
  }

  uint32_t cap = pDst->info.capacity;

  // release pk buffers owned by pDst before its block info is overwritten
  if (IS_VAR_DATA_TYPE(pDst->info.pks[0].type)) {
    taosMemoryFreeClear(pDst->info.pks[0].pData);
  }

  if (IS_VAR_DATA_TYPE(pDst->info.pks[1].type)) {
    taosMemoryFreeClear(pDst->info.pks[1].pData);
  }

  pDst->info = pSrc->info;
  // The struct copy makes pDst point at pSrc's var-length pk buffers. Detach them before copyPkVal
  // allocates pDst's own copies. Otherwise a failure part-way through copyPkVal leaves pDst aliasing
  // memory owned by pSrc, which would then be freed twice. Only var-length pks hold a pointer here.
  if (IS_VAR_DATA_TYPE(pSrc->info.pks[0].type)) {
    pDst->info.pks[0].pData = NULL;
    pDst->info.pks[1].pData = NULL;
  }
  // restore pDst's real capacity before anything can fail
  pDst->info.capacity = cap;

  code = copyPkVal(&pDst->info, &pSrc->info);
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  return code;
}
1839

1840
/*
 * Allocates an empty block of the given stream type with a fixed 7-column layout:
 *   window start ts, window end ts, uid, group id, calculate start ts, calculate end ts, table name.
 *
 * No column buffers are allocated. On success *pBlock receives the block and TSDB_CODE_SUCCESS is
 * returned; on failure nothing is leaked and an error code is returned.
 */
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
  QRY_PARAM_CHECK(pBlock);

  static const struct {
    int16_t type;
    int32_t bytes;
  } kColumns[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // window start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // window end ts
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                         // uid
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                         // group id
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // calculate start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // calculate end ts
      {TSDB_DATA_TYPE_VARCHAR, VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN}, // table name
  };

  SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (p == NULL) {
    return terrno;
  }

  p->info.hasVarCol = false;
  p->info.id.groupId = 0;
  p->info.rows = 0;
  p->info.type = type;
  p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
                    sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  p->info.watermark = INT64_MIN;

  p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
  if (p->pDataBlock == NULL) {
    taosMemoryFree(p);
    return terrno;
  }

  for (size_t i = 0; i < sizeof(kColumns) / sizeof(kColumns[0]); ++i) {
    SColumnInfoData infoData = {0};
    infoData.info.type = kColumns[i].type;
    infoData.info.bytes = kColumns[i].bytes;

    if (taosArrayPush(p->pDataBlock, &infoData) == NULL) {
      int32_t code = terrno;
      taosArrayDestroy(p->pDataBlock);
      taosMemoryFree(p);
      return code;
    }
  }

  *pBlock = p;
  return TSDB_CODE_SUCCESS;
}
1932

1933
/*
 * Builds a new one-row block holding row rowIdx of pDataBlock.
 *
 * The new block has pDataBlock's schema and block info. Pk data pointers are not copied, and rows
 * and capacity are reset before the row is written. On success *pResBlock receives the block; on
 * failure the partial block is destroyed and an error code is returned.
 */
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pNew = NULL;
  int32_t      code = createDataBlock(&pNew);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pNew->info = pDataBlock->info;
  pNew->info.pks[0].pData = NULL;
  pNew->info.pks[1].pData = NULL;
  pNew->info.rows = 0;
  pNew->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  // step 1: replicate the column schema
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pSrcCol = taosArrayGet(pDataBlock->pDataBlock, col);
    if (pSrcCol == NULL) {
      blockDataDestroy(pNew);
      return terrno;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = pSrcCol->info};
    code = blockDataAppendColInfo(pNew, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pNew);
      return code;
    }
  }

  // step 2: room for exactly one row
  code = blockDataEnsureCapacity(pNew, 1);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pNew);
    return code;
  }

  // step 3: copy row rowIdx of each source column into row 0
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDstCol = taosArrayGet(pNew->pDataBlock, col);
    SColumnInfoData* pSrcCol = taosArrayGet(pDataBlock->pDataBlock, col);
    if (pDstCol == NULL || pSrcCol == NULL) {
      blockDataDestroy(pNew);
      return terrno;
    }

    bool  isNull = colDataIsNull(pSrcCol, pDataBlock->info.rows, rowIdx, NULL);
    void* pVal = NULL;
    if (!isNull) {
      pVal = colDataGetData(pSrcCol, rowIdx);
    }

    code = colDataSetVal(pDstCol, 0, pVal, isNull);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pNew);
      return code;
    }
  }

  pNew->info.rows = 1;
  *pResBlock = pNew;
  return code;
}
2000

2001
/*
 * Deep-copies the two primary key values (pks[0], pks[1]) from pSrc into pDst.
 *
 * Only var-length primary keys are handled. For any other pk type (judged by pSrc->pks[0].type)
 * this is a no-op. For var-length keys each destination slot gets a freshly allocated buffer;
 * whatever pDst->pks[i].pData held before is overwritten, not freed.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if an allocation fails (an error is logged).
 */
int32_t copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
    return code;
  }

  for (int32_t i = 0; i < 2; ++i) {
    SValue*       pTo = &pDst->pks[i];
    const SValue* pFrom = &pSrc->pks[i];

    pTo->type = pFrom->type;
    pTo->pData = taosMemoryCalloc(1, pFrom->nData);
    QUERY_CHECK_NULL(pTo->pData, code, lino, _end, terrno);

    pTo->nData = pFrom->nData;
    memcpy(pTo->pData, pFrom->pData, pTo->nData);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2032

2033
/*
 * Creates a new block with the same schema and block info as pDataBlock.
 *
 * pDataBlock: template block; must not be NULL.
 * copyData:   when true, also allocates room for pDataBlock's rows and deep-copies every column
 *             payload. Otherwise the new block is empty with capacity 0.
 * pResBlock:  receives the new block on success.
 *
 * Var-length primary key values are deep-copied via copyPkVal. On any failure the partially built
 * block is destroyed and an error code is returned.
 */
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDstBlock = NULL;
  int32_t      code = createDataBlock(&pDstBlock);
  if (code) {
    return code;
  }

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;

  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;  // re-accumulated by blockDataAppendColInfo below
  pDstBlock->info.id = pDataBlock->info.id;
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      code = terrno;  // capture before cleanup can touch terrno
      blockDataDestroy(pDstBlock);
      return code;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    if (code) {
      blockDataDestroy(pDstBlock);
      return code;
    }
  }

  code = copyPkVal(&pDstBlock->info, &pDataBlock->info);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDstBlock);
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (copyData) {
    code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pDstBlock);
      return code;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
      if (pDst == NULL || pSrc == NULL) {
        // `code` is still TSDB_CODE_SUCCESS here; report and return the lookup error instead
        code = terrno;
        blockDataDestroy(pDstBlock);
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      if (ret < 0) {
        code = ret;
        blockDataDestroy(pDstBlock);
        return code;
      }
    }

    pDstBlock->info.rows = pDataBlock->info.rows;
    pDstBlock->info.capacity = pDataBlock->info.rows;
  }

  *pResBlock = pDstBlock;
  return code;
}
2115

2116
/*
 * Creates an empty block (rows = 0, capacity = 0) whose columns are a subset of pDataBlock's.
 *
 * For every SColIdPair in pColArray, in order, the first column of pDataBlock whose colId equals
 * the pair's vtbColId is appended. Pairs with no match are skipped. Only the column schema is
 * copied; the new block does not share any data buffers with pDataBlock.
 *
 * Returns TSDB_CODE_SUCCESS and sets *pResBlock, or returns an error code with *pResBlock NULL.
 */
int32_t createOneDataBlockWithColArray(const SSDataBlock* pDataBlock, SArray* pColArray, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // QUERY_CHECK_CODE only tests its argument; store the result in `code` so failures are returned.
  code = createDataBlock(&pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;

  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;
  pDstBlock->info.id = pDataBlock->info.id;
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;

  size_t numOfPairs = taosArrayGetSize(pColArray);
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfPairs; ++i) {
    SColIdPair* pColPair = taosArrayGet(pColArray, i);
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);

    for (int32_t j = 0; j < numOfCols; ++j) {
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
      if (p == NULL || p->info.colId != pColPair->vtbColId) {
        continue;
      }

      // Copy the schema only. Pushing *p itself would make both blocks own the same data / bitmap /
      // offset buffers, so growing or destroying one block would corrupt the other.
      SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
      code = blockDataAppendColInfo(pDstBlock, &colInfo);
      QUERY_CHECK_CODE(code, lino, _return);
      break;
    }
  }

  *pResBlock = pDstBlock;
  return code;

_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2160

2161
/*
 * Creates a block shaped like pOrgBlock and filled with pDataBlock's rows.
 *
 * Each column of the new block takes its data from the column of pDataBlock with the same slotId.
 * Columns with no matching slot are filled with NULL. rows, capacity and window are taken from
 * pDataBlock.
 *
 * Returns TSDB_CODE_SUCCESS and sets *pResBlock, or returns an error code (partial block destroyed).
 */
int32_t createOneDataBlockWithTwoBlock(const SSDataBlock* pDataBlock, const SSDataBlock* pOrgBlock, SSDataBlock** pResBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock *pDstBlock = NULL;

  QRY_PARAM_CHECK(pResBlock);
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(pOrgBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);

  // QUERY_CHECK_CODE only tests its argument; store each result in `code` so failures are returned.
  code = createOneDataBlock(pOrgBlock, false, &pDstBlock);
  QUERY_CHECK_CODE(code, lino, _return);
  code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _return);

  size_t numOfOrgCols = taosArrayGetSize(pOrgBlock->pDataBlock);
  size_t numOfDataCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfOrgCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pOrgBlock->pDataBlock, i);

    QUERY_CHECK_NULL(pDst, code, lino, _return, terrno);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno);

    bool found = false;
    for (int32_t j = 0; j < numOfDataCols; j++) {
      SColumnInfoData *p = taosArrayGet(pDataBlock->pDataBlock, j);
      if (p == NULL || p->info.slotId != pSrc->info.slotId) {
        continue;
      }

      code = colDataAssign(pDst, p, (int32_t)pDataBlock->info.rows, &pDataBlock->info);
      QUERY_CHECK_CODE(code, lino, _return);
      found = true;
      break;
    }

    if (!found) {
      colDataSetNNULL(pDst, 0, pDataBlock->info.rows);
    }
  }

  pDstBlock->info.rows = pDataBlock->info.rows;
  pDstBlock->info.capacity = pDataBlock->info.rows;
  pDstBlock->info.window = pDataBlock->info.window;

  *pResBlock = pDstBlock;
  return code;

_return:
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pDstBlock);
  return code;
}
2206

2207
/*
 * Allocates a zero-initialised block with an empty column array (initial room for 4 columns).
 * On success *pResBlock receives it; on failure nothing is leaked and terrno is returned.
 */
int32_t createDataBlock(SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return terrno;
  }

  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock != NULL) {
    *pResBlock = pBlock;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = terrno;
  taosMemoryFree(pBlock);
  return code;
}
2224

2225
/*
 * Appends a copy of *pColInfoData (the struct, pointers included) to the block's column array.
 *
 * The column array is created on demand. The block's hasVarCol flag is raised for var-length
 * columns, and rowSize grows by the column's byte width. The column type itself is not validated.
 *
 * Returns TSDB_CODE_SUCCESS or terrno.
 */
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  if (pBlock->pDataBlock == NULL) {
    SArray* pCols = taosArrayInit(4, sizeof(SColumnInfoData));
    if (pCols == NULL) {
      return terrno;
    }
    pBlock->pDataBlock = pCols;
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    return terrno;
  }

  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pBlock->info.hasVarCol = true;
  }
  pBlock->info.rowSize += pColInfoData->info.bytes;

  return TSDB_CODE_SUCCESS;
}
2247

2248
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
32,234✔
2249
  SColumnInfoData col = {.hasNull = true};
32,234✔
2250
  col.info.colId = colId;
32,234✔
2251
  col.info.type = type;
32,234✔
2252
  col.info.bytes = bytes;
32,234✔
2253

2254
  return col;
32,234✔
2255
}
2256

2257
/*
 * Looks up column `index` of the block.
 *
 * Returns TSDB_CODE_INVALID_PARA when index is out of range (a negative index compares as a huge
 * unsigned value and is rejected too). Otherwise *pColInfoData receives the column pointer (NULL on
 * lookup failure) and TSDB_CODE_SUCCESS or terrno is returned.
 */
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
  QRY_PARAM_CHECK(pColInfoData);

  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
    return TSDB_CODE_INVALID_PARA;
  }

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, index);
  *pColInfoData = pCol;
  return (pCol != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
2272

2273
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
155✔
2274
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
155✔
2275

2276
  int32_t payloadSize = pageSize - extraSize;
155✔
2277
  int32_t rowSize = pBlock->info.rowSize;
155✔
2278
  int32_t nRows = payloadSize / rowSize;
155✔
2279
  if (nRows < 1) {
155✔
2280
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
×
2281
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2282
    return -1;
×
2283
  }
2284

2285
  int32_t numVarCols = 0;
155✔
2286
  int32_t numFixCols = 0;
155✔
2287
  for (int32_t i = 0; i < numOfCols; ++i) {
698✔
2288
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
543✔
2289
    if (pCol == NULL) {
543✔
2290
      return -1;
×
2291
    }
2292

2293
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
543✔
2294
      ++numVarCols;
74✔
2295
    } else {
2296
      ++numFixCols;
469✔
2297
    }
2298
  }
2299

2300
  // find the data payload whose size is greater than payloadSize
2301
  int result = -1;
155✔
2302
  int start = 1;
155✔
2303
  int end = nRows;
155✔
2304
  while (start <= end) {
1,250✔
2305
    int mid = start + (end - start) / 2;
1,095✔
2306
    // data size + var data type columns offset + fixed data type columns bitmap len
2307
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
1,095✔
2308
    if (midSize > payloadSize) {
1,095✔
2309
      result = mid;
186✔
2310
      end = mid - 1;
186✔
2311
    } else {
2312
      start = mid + 1;
909✔
2313
    }
2314
  }
2315

2316
  int32_t newRows = (result != -1) ? result - 1 : nRows;
155✔
2317
  // the true value must be less than the value of nRows
2318
  if (newRows > nRows || newRows < 1) {
155✔
2319
    uError("invalid newRows:%d, nRows:%d", newRows, nRows);
×
2320
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2321
    return -1;
×
2322
  }
2323

2324
  return newRows;
155✔
2325
}
2326

2327
/*
 * Frees the buffers owned by a column and clears the pointers; the struct itself is not freed.
 * Var-length columns release their offset array, fixed-width columns their null bitmap, and both
 * release pData. NULL is accepted.
 */
void colDataDestroy(SColumnInfoData* pColData) {
  if (pColData == NULL) {
    return;
  }

  if (!IS_VAR_DATA_TYPE(pColData->info.type)) {
    taosMemoryFreeClear(pColData->nullbitmap);
  } else {
    taosMemoryFreeClear(pColData->varmeta.offset);
  }

  taosMemoryFreeClear(pColData->pData);
}
2340

2341
/*
 * Shifts a null bitmap in place so that the bit of row n becomes the bit of row 0, dropping the
 * first n rows of a bitmap describing `total` rows.
 *
 * Bits are consumed from the high end of each byte: moving a row toward the front is a left shift
 * within a byte, plus the high bits carried in from the following byte. Bytes beyond the rewritten
 * prefix are left untouched.
 */
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t len = BitmapLen(total);
  int32_t newLen = BitmapLen(total - n);

  // Dropping a whole number of bytes is a plain byte move.
  if (n % 8 == 0) {
    (void) memmove(nullBitmap, nullBitmap + n / 8, newLen);
    return;
  }

  uint8_t* p = (uint8_t*)nullBitmap;
  int32_t  skipBytes = n / 8;  // whole bytes dropped from the front
  int32_t  bitShift = n % 8;   // remaining sub-byte shift
  // with no whole byte skipped every byte is shifted; otherwise only the surviving prefix
  int32_t  count = (skipBytes == 0) ? len : newLen;

  for (int32_t i = 0; i < count; ++i) {
    // Source bytes sit at index >= i, so they are read before being overwritten.
    uint8_t merged = (uint8_t)(p[i + skipBytes] << bitShift);
    if (i + skipBytes + 1 < len) {
      merged |= (uint8_t)(p[i + skipBytes + 1] >> (8 - bitShift));
    }
    p[i] = merged;
  }
}
1✔
2381

2382
/*
 * Moves the var-length payload of rows [start, end) to the front of pData, and their offset entries
 * to the front of the offset array.
 *
 * NULL rows (offset -1) contribute no payload. When start != 0, each non-NULL row's offset is
 * rewritten to its position in the compacted payload.
 * NOTE(review): the single memmove of the payload assumes the non-NULL values of these rows are
 * stored contiguously and in row order, starting at the first non-NULL row's offset.
 *
 * Returns the total payload length of the moved rows.
 */
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t firstOffset = -1;
  int32_t totalLen = 0;

  for (int32_t row = start; row < end; ++row) {
    int32_t offset = pColInfoData->varmeta.offset[row];
    if (offset == -1) {
      continue;  // NULL value, no payload
    }

    if (start != 0) {
      pColInfoData->varmeta.offset[row] = totalLen;
    }

    char* pVal = pColInfoData->pData + offset;
    if (firstOffset == -1) {
      firstOffset = offset;  // payload of the moved rows begins here
    }

    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      totalLen += getJsonValueLen(pVal);
    } else {
      totalLen += varDataTLen(pVal);
    }
  }

  if (firstOffset > 0) {
    (void) memmove(pColInfoData->pData, pColInfoData->pData + firstOffset, totalLen);
  }

  (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
  return totalLen;
}
2413

2414
/*
 * Drops the first n of `total` rows of a column in place.
 *
 * Var-length columns shift only their offset array. The payload bytes stay where they are, so the
 * surviving offsets still point at valid data. The n trailing offset entries are then zeroed.
 * Fixed-width columns shift both their data and their null bitmap.
 *
 * Callers must guarantee 0 < n < total.
 */
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));

    // Clear the n now-unused trailing entries. memset counts bytes, so scale by the entry size;
    // a bare `n` would clear only a quarter of them.
    memset(&pColInfoData->varmeta.offset[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    (void) memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}
1✔
2427

2428
/*
 * Removes the first n rows from every column of the block.
 *
 * n == 0 is a no-op. Removing at least as many rows as the block holds empties it.
 * Returns TSDB_CODE_SUCCESS, or terrno if a column lookup fails.
 */
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (n >= pBlock->info.rows) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      return terrno;
    }
    colDataTrimFirstNRows(pCol, n, pBlock->info.rows);
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}
2450

2451
/*
 * Truncates a column to its first n of `total` rows. Only var-length columns need work.
 *
 * The payload length is cut to where row n's value starts. If row n is NULL (offset -1), the cut is
 * at the end of the last non-NULL value before it. The offsets of the dropped rows are then zeroed.
 * NOTE(review): this assumes values are stored in pData in row order.
 *
 * No-op when n == 0 or n >= total.
 */
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (n >= total || n == 0) return;
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    if (pColInfoData->varmeta.length != 0) {
      int32_t newLen = pColInfoData->varmeta.offset[n];
      if (-1 == newLen) {
        for (int i = n - 1; i >= 0; --i) {
          newLen = pColInfoData->varmeta.offset[i];
          if (newLen != -1) {
            if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
              newLen += getJsonValueLen(pColInfoData->pData + newLen);
            } else {
              newLen += varDataTLen(pColInfoData->pData + newLen);
            }
            break;
          }
        }
      }
      if (newLen <= -1) {
        uFatal("colDataKeepFirstNRows: newLen:%d  old:%d", newLen, pColInfoData->varmeta.length);
      } else {
        pColInfoData->varmeta.length = newLen;
      }
    }

    // Reset all (total - n) dropped offset entries. memset counts bytes, so scale by the entry size.
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  }
}
2479

2480
/*
 * Truncates the block to its first n rows.
 *
 * n == 0 empties the block. When the block already has n rows or fewer, nothing changes.
 */
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataEmpty(pBlock);
    return;
  }

  if (pBlock->info.rows <= n) {
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol != NULL) {
      colDataKeepFirstNRows(pCol, n, pBlock->info.rows);
    }
  }

  pBlock->info.rows = n;
}
2502

2503
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
4,794✔
2504
  int64_t tbUid = pBlock->info.id.uid;
4,794✔
2505
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
4,794✔
2506
  int16_t hasVarCol = pBlock->info.hasVarCol;
4,794✔
2507
  int64_t rows = pBlock->info.rows;
4,794✔
2508
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
4,794✔
2509

2510
  int32_t tlen = 0;
4,794✔
2511
  tlen += taosEncodeFixedI64(buf, tbUid);
4,794✔
2512
  tlen += taosEncodeFixedI16(buf, numOfCols);
4,794✔
2513
  tlen += taosEncodeFixedI16(buf, hasVarCol);
9,588✔
2514
  tlen += taosEncodeFixedI64(buf, rows);
4,794✔
2515
  tlen += taosEncodeFixedI32(buf, sz);
4,794✔
2516
  for (int32_t i = 0; i < sz; i++) {
10,720✔
2517
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
5,925✔
2518
    if (pColData == NULL) {
5,917✔
2519
      return terrno;
×
2520
    }
2521

2522
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
5,917✔
2523
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
5,917✔
2524
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
5,917✔
2525
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
5,917✔
2526

2527
    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
5,917✔
2528
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
996✔
2529
    } else {
2530
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
10,838✔
2531
    }
2532

2533
    int32_t len = colDataGetLength(pColData, rows);
5,917✔
2534
    tlen += taosEncodeFixedI32(buf, len);
5,926✔
2535

2536
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
5,926✔
2537
      for (int32_t row = 0; row < rows; ++row) {
×
2538
        char*   pData = pColData->pData + pColData->varmeta.offset[row];
×
2539
        int32_t colSize = 0;
×
2540
        if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
×
2541
          colSize = getJsonValueLen(pData);
×
2542
        } else {
2543
          colSize = varDataTLen(pData);
×
2544
        }
2545
        tlen += taosEncodeBinary(buf, pData, colSize);
×
2546
      }
2547
    } else {
2548
      tlen += taosEncodeBinary(buf, pColData->pData, len);
11,852✔
2549
    }
2550
  }
2551
  return tlen;
4,795✔
2552
}
2553

2554
/*
 * Deserialises a block previously written by tEncodeDataBlock.
 *
 * The block info fields (uid, hasVarCol, rows) are filled in, and pBlock->pDataBlock is replaced by
 * a new array of decoded columns. Returns the position just past the decoded data, or NULL on
 * failure.
 *
 * On failure every column buffer decoded so far (including the partially decoded current column)
 * is released, so nothing leaks. pBlock->pDataBlock stays valid for the caller to free.
 */
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI8(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    buf = taosDecodeFixedBool(buf, &data.hasNull);

    if (IS_VAR_DATA_TYPE(data.info.type)) {
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
    }
    if(buf == NULL) {
      uError("failed to decode null bitmap/offset, type:%d", data.info.type);
      colDataDestroy(&data);
      goto _error;
    }

    int32_t len = 0;
    buf = taosDecodeFixedI32(buf, &len);
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);
    if (buf == NULL) {
      uError("failed to decode data, type:%d", data.info.type);
      // the bitmap / offset array of this column was already decoded; release it
      colDataDestroy(&data);
      goto _error;
    }
    if (IS_VAR_DATA_TYPE(data.info.type)) {
      data.varmeta.length = len;
      data.varmeta.allocLen = len;
    }

    void* px = taosArrayPush(pBlock->pDataBlock, &data);
    if (px == NULL) {
      // not yet owned by the array: free it here, then release the columns decoded before it
      colDataDestroy(&data);
      goto _error;
    }
  }

  return (void*)buf;
_error:
  for (int32_t i = 0; i < sz; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      break;
    }
    colDataDestroy(pColInfoData);
  }
  return NULL;
}
2615

2616
static int32_t formatTimestamp(char* buf, size_t cap, int64_t val, int precision) {
×
2617
  time_t  tt;
2618
  int32_t ms = 0;
×
2619
  int32_t code = TSDB_CODE_SUCCESS;
×
2620
  int32_t lino = 0;
×
2621
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2622
    tt = (time_t)(val / 1000000000);
×
2623
    ms = val % 1000000000;
×
2624
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2625
    tt = (time_t)(val / 1000000);
×
2626
    ms = val % 1000000;
×
2627
  } else {
2628
    tt = (time_t)(val / 1000);
×
2629
    ms = val % 1000;
×
2630
  }
2631

2632
  if (tt <= 0 && ms < 0) {
×
2633
    tt--;
×
2634
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
2635
      ms += 1000000000;
×
2636
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2637
      ms += 1000000;
×
2638
    } else {
2639
      ms += 1000;
×
2640
    }
2641
  }
2642
  struct tm ptm = {0};
×
2643
  if (taosLocalTime(&tt, &ptm, buf, cap, NULL) == NULL) {
×
2644
    code =  TSDB_CODE_INTERNAL_ERROR;
×
2645
    TSDB_CHECK_CODE(code, lino, _end);
×
2646
  }
2647

2648
  size_t pos = taosStrfTime(buf, cap, "%Y-%m-%d %H:%M:%S", &ptm);
×
2649
  if (pos == 0) {
×
2650
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2651
    TSDB_CHECK_CODE(code, lino, _end);
×
2652
  }
2653
  int32_t nwritten = 0;
×
2654
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2655
    nwritten = snprintf(buf + pos, cap - pos, ".%09d", ms);
×
2656
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2657
    nwritten = snprintf(buf + pos, cap - pos, ".%06d", ms);
×
2658
  } else {
2659
    nwritten = snprintf(buf + pos, cap - pos, ".%03d", ms);
×
2660
  }
2661

2662
  if (nwritten >= cap - pos) {
×
2663
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2664
    TSDB_CHECK_CODE(code, lino, _end);
×
2665
  }
2666

2667
_end:
×
2668
  if (code != TSDB_CODE_SUCCESS) {
×
2669
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2670
  }
2671
  return code;
×
2672
}
2673

2674
// for debug
2675
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
×
2676
  int32_t lino = 0;
×
2677
  int32_t size = 2048 * 1024;
×
2678
  int32_t code = 0;
×
2679
  char*   dumpBuf = NULL;
×
2680
  char    pBuf[TD_TIME_STR_LEN] = {0};
×
2681
  int32_t rows = pDataBlock->info.rows;
×
2682
  int32_t len = 0;
×
2683

2684
  dumpBuf = taosMemoryCalloc(size, 1);
×
2685
  if (dumpBuf == NULL) {
×
2686
    return terrno;
×
2687
  }
2688

2689
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
×
2690
  len += tsnprintf(dumpBuf + len, size - len,
×
2691
                  "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
2692
                  "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
2693
                  taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
×
2694
                  pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
2695
                  pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
×
2696
  goto _exit;
×
2697
  if (len >= size - 1) {
2698
    goto _exit;
2699
  }
2700

2701
  for (int32_t j = 0; j < rows; j++) {
2702
    len += tsnprintf(dumpBuf + len, size - len, "%s|", flag);
2703
    if (len >= size - 1) {
2704
      goto _exit;
2705
    }
2706

2707
    for (int32_t k = 0; k < colNum; k++) {
2708
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
2709
      if (pColInfoData == NULL) {
2710
        code = terrno;
2711
        lino = __LINE__;
2712
        goto _exit;
2713
      }
2714

2715
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
2716
        len += tsnprintf(dumpBuf + len, size - len, " %15s |", "NULL");
2717
        if (len >= size - 1) goto _exit;
2718
        continue;
2719
      }
2720

2721
      void* var = colDataGetData(pColInfoData, j);
2722
      switch (pColInfoData->info.type) {
2723
        case TSDB_DATA_TYPE_TIMESTAMP:
2724
          memset(pBuf, 0, sizeof(pBuf));
2725
          code = formatTimestamp(pBuf, sizeof(pBuf), *(uint64_t*)var, pColInfoData->info.precision);
2726
          if (code != TSDB_CODE_SUCCESS) {
2727
            TAOS_UNUSED(tsnprintf(pBuf, sizeof(pBuf), "NaN"));
2728
          }
2729
          len += tsnprintf(dumpBuf + len, size - len, " %25s |", pBuf);
2730
          if (len >= size - 1) goto _exit;
2731
          break;
2732
        case TSDB_DATA_TYPE_TINYINT:
2733
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
2734
          if (len >= size - 1) goto _exit;
2735
          break;
2736
        case TSDB_DATA_TYPE_UTINYINT:
2737
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
2738
          if (len >= size - 1) goto _exit;
2739
          break;
2740
        case TSDB_DATA_TYPE_SMALLINT:
2741
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
2742
          if (len >= size - 1) goto _exit;
2743
          break;
2744
        case TSDB_DATA_TYPE_USMALLINT:
2745
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
2746
          if (len >= size - 1) goto _exit;
2747
          break;
2748
        case TSDB_DATA_TYPE_INT:
2749
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
2750
          if (len >= size - 1) goto _exit;
2751
          break;
2752
        case TSDB_DATA_TYPE_UINT:
2753
          len += tsnprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
2754
          if (len >= size - 1) goto _exit;
2755
          break;
2756
        case TSDB_DATA_TYPE_BIGINT:
2757
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
2758
          if (len >= size - 1) goto _exit;
2759
          break;
2760
        case TSDB_DATA_TYPE_UBIGINT:
2761
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
2762
          if (len >= size - 1) goto _exit;
2763
          break;
2764
        case TSDB_DATA_TYPE_FLOAT:
2765
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
2766
          if (len >= size - 1) goto _exit;
2767
          break;
2768
        case TSDB_DATA_TYPE_DOUBLE:
2769
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
2770
          if (len >= size - 1) goto _exit;
2771
          break;
2772
        case TSDB_DATA_TYPE_BOOL:
2773
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
2774
          if (len >= size - 1) goto _exit;
2775
          break;
2776
        case TSDB_DATA_TYPE_VARCHAR:
2777
        case TSDB_DATA_TYPE_VARBINARY:
2778
        case TSDB_DATA_TYPE_GEOMETRY: {
2779
          memset(pBuf, 0, sizeof(pBuf));
2780
          char*   pData = colDataGetVarData(pColInfoData, j);
2781
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
2782
          dataSize = TMIN(dataSize, 50);
2783
          memcpy(pBuf, varDataVal(pData), dataSize);
2784
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
2785
          if (len >= size - 1) goto _exit;
2786
        } break;
2787
        case TSDB_DATA_TYPE_NCHAR: {
2788
          char*   pData = colDataGetVarData(pColInfoData, j);
2789
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
2790
          memset(pBuf, 0, sizeof(pBuf));
2791
          code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf, NULL);
2792
          if (code < 0) {
2793
            uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
2794
            lino = __LINE__;
2795
            goto _exit;
2796
          } else { // reset the length value
2797
            code = TSDB_CODE_SUCCESS;
2798
          }
2799
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
2800
          if (len >= size - 1) goto _exit;
2801
        } break;
2802
      }
2803
    }
2804
    len += tsnprintf(dumpBuf + len, size - len, "%d\n", j);
2805
    if (len >= size - 1) goto _exit;
2806
  }
2807
  len += tsnprintf(dumpBuf + len, size - len, "%s |end\n", flag);
2808

2809
_exit:
×
2810
  if (code == TSDB_CODE_SUCCESS) {
×
2811
    *pDataBuf = dumpBuf;
×
2812
    dumpBuf = NULL;
×
2813
  } else {
2814
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2815
    if (dumpBuf) {
×
2816
      taosMemoryFree(dumpBuf);
×
2817
    }
2818
  }
2819
  return code;
×
2820
}
2821

2822
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
×
2823
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
2824
  SSubmitReq2* pReq = *ppReq;
×
2825
  SArray*      pVals = NULL;
×
2826
  int32_t      sz = 1;
×
2827
  int32_t      code = 0;
×
2828
  *ppReq = NULL;
×
2829
  terrno = 0;
×
2830

2831
  if (NULL == pReq) {
×
2832
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
×
2833
      code = terrno;
×
2834
      goto _end;
×
2835
    }
2836

2837
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
×
2838
      code = terrno;
×
2839
      goto _end;
×
2840
    }
2841
  }
2842

2843
  for (int32_t i = 0; i < sz; ++i) {
×
2844
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
×
2845
    int32_t rows = pDataBlock->info.rows;
×
2846

2847
    if (colNum <= 1) {  // invalid if only with TS col
×
2848
      continue;
×
2849
    }
2850

2851
    // the rsma result should has the same column number with schema.
2852
    if (colNum != pTSchema->numOfCols) {
×
2853
      uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
×
2854
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2855
      goto _end;
×
2856
    }
2857

2858
    SSubmitTbData tbData = {0};
×
2859

2860
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
×
2861
      code = terrno;
×
2862
      goto _end;
×
2863
    }
2864

2865
    tbData.suid = suid;
×
2866
    tbData.uid = uid;
×
2867
    tbData.sver = pTSchema->version;
×
2868

2869
    if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
×
2870
      code = terrno;
×
2871
      taosArrayDestroy(tbData.aRowP);
×
2872
      goto _end;
×
2873
    }
2874

2875
    for (int32_t j = 0; j < rows; ++j) {  // iterate by row
×
2876

2877
      taosArrayClear(pVals);
×
2878

2879
      bool    isStartKey = false;
×
2880
      int32_t offset = 0;
×
2881
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
×
2882
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
×
2883
        if (pColInfoData == NULL) {
×
2884
          return terrno;
×
2885
        }
2886

2887
        const STColumn*  pCol = &pTSchema->columns[k];
×
2888
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
×
2889

2890
        switch (pColInfoData->info.type) {
×
2891
          case TSDB_DATA_TYPE_TIMESTAMP:
×
2892
            if (pColInfoData->info.type != pCol->type) {
×
2893
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2894
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2895
              return terrno;
×
2896
            }
2897
            if (!isStartKey) {
×
2898
              isStartKey = true;
×
2899
              if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
×
2900
                uError("the first timestamp colId %d is not primary colId", pCol->colId);
×
2901
                terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2902
                return terrno;
×
2903
              }
2904
              SValue val = {.type = pCol->type};
×
2905
              VALUE_SET_TRIVIAL_DATUM(&val, *(TSKEY*)var);
×
2906
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2907
              void*   px = taosArrayPush(pVals, &cv);
×
2908
              if (px == NULL) {
×
2909
                return terrno;
×
2910
              }
2911

2912
            } else if (colDataIsNull_s(pColInfoData, j)) {
×
2913
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
2914
              void*   px = taosArrayPush(pVals, &cv);
×
2915
              if (px == NULL) {
×
2916
                return terrno;
×
2917
              }
2918
            } else {
2919
              SValue val = {.type = pCol->type};
×
2920
              VALUE_SET_TRIVIAL_DATUM(&val, *(int64_t*)var);
×
2921
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2922
              void*   px = taosArrayPush(pVals, &cv);
×
2923
              if (px == NULL) {
×
2924
                return terrno;
×
2925
              }
2926
            }
2927
            break;
×
2928
          case TSDB_DATA_TYPE_NCHAR:
×
2929
          case TSDB_DATA_TYPE_VARBINARY:
2930
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
2931
            if (pColInfoData->info.type != pCol->type) {
×
2932
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2933
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2934
              return terrno;
×
2935
            }
2936
            if (colDataIsNull_s(pColInfoData, j)) {
×
2937
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
2938
              void* px = taosArrayPush(pVals, &cv);
×
2939
              if (px == NULL) {
×
2940
                goto _end;
×
2941
              }
2942
            } else {
2943
              void*  data = colDataGetVarData(pColInfoData, j);
×
2944
              SValue sv = (SValue){
×
2945
                  .type = pCol->type, .nData = varDataLen(data), .pData = (uint8_t*) varDataVal(data)};  // address copy, no value
×
2946
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
2947
              void* px = taosArrayPush(pVals, &cv);
×
2948
              if (px == NULL) {
×
2949
                code = terrno;
×
2950
                goto _end;
×
2951
              }
2952
            }
2953
            break;
×
2954
          }
2955
          case TSDB_DATA_TYPE_DECIMAL:
×
2956
          case TSDB_DATA_TYPE_BLOB:
2957
          case TSDB_DATA_TYPE_JSON:
2958
          case TSDB_DATA_TYPE_MEDIUMBLOB:
2959
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
2960
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2961
            return terrno;
×
2962
            break;
2963
          default:
×
2964
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
2965
              if (colDataIsNull_s(pColInfoData, j)) {
×
2966
                SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
×
2967
                void* px = taosArrayPush(pVals, &cv);
×
2968
                if (px == NULL) {
×
2969
                  goto _end;
×
2970
                }
2971
              } else {
2972
                SValue sv = {.type = pCol->type};
×
2973
                if (pCol->type == pColInfoData->info.type) {
×
2974
                  valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
×
2975
                } else {
2976
                  /**
2977
                   *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
2978
                   *  2. below conversion may lead to overflow or loss, the app should select the right data type.
2979
                   */
2980
                  char tv[DATUM_MAX_SIZE] = {0};
×
2981
                  if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
×
2982
                    float v = 0;
×
2983
                    GET_TYPED_DATA(v, float, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2984
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2985
                  } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
×
2986
                    double v = 0;
×
2987
                    GET_TYPED_DATA(v, double, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2988
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2989
                  } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
×
2990
                    int64_t v = 0;
×
2991
                    GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2992
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2993
                  } else {
2994
                    uint64_t v = 0;
×
2995
                    GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var, typeGetTypeModFromColInfo(&pColInfoData->info));
×
2996
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
2997
                  }
2998
                  valueSetDatum(&sv, sv.type, tv, tDataTypes[pCol->type].bytes);
×
2999
                }
3000
                SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
3001
                void* px = taosArrayPush(pVals, &cv);
×
3002
                if (px == NULL) {
×
3003
                  code = terrno;
×
3004
                  goto _end;
×
3005
                }
3006
              }
3007
            } else {
3008
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
3009
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3010
              return terrno;
×
3011
            }
3012
            break;
×
3013
        }
3014
      }
3015
      SRow* pRow = NULL;
×
3016
      if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
×
3017
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
3018
        goto _end;
×
3019
      }
3020

3021
      void* px = taosArrayPush(tbData.aRowP, &pRow);
×
3022
      if (px == NULL) {
×
3023
        code = terrno;
×
3024
        goto _end;
×
3025
      }
3026
    }
3027

3028
    void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
×
3029
    if (px == NULL) {
×
3030
      code = terrno;
×
3031
      goto _end;
×
3032
    }
3033
  }
3034

3035
_end:
×
3036
  taosArrayDestroy(pVals);
×
3037
  if (code != 0) {
×
3038
    if (pReq) {
×
3039
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
3040
      taosMemoryFreeClear(pReq);
×
3041
    }
3042
  } else {
3043
    *ppReq = pReq;
×
3044
  }
3045

3046
  return code;
×
3047
}
3048

3049
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
3050
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
19✔
3051
  int32_t   code = TSDB_CODE_SUCCESS;
19✔
3052
  int32_t   lino = 0;
19✔
3053
  char      tmp[TSDB_TABLE_NAME_LEN] = {0};
19✔
3054

3055
  if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
19✔
3056
    code = TSDB_CODE_INTERNAL_ERROR;
2✔
3057
    TSDB_CHECK_CODE(code, lino, _end);
2✔
3058
  }
3059

3060
  if (stbName == NULL) {
17✔
3061
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
2✔
3062
  } else {
3063
    int32_t i = strlen(stbName) - 1;
15✔
3064
    for (; i >= 0; i--) {
228✔
3065
      if (stbName[i] == '.') {
224✔
3066
        break;
11✔
3067
      }
3068
    }
3069
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId);
15✔
3070
  }
3071

3072
  ctbName[cap - strlen(tmp) - 1] = 0;  // put stbname + groupId to the end
17✔
3073
  size_t prefixLen = strlen(ctbName);
17✔
3074
  ctbName = strncat(ctbName, tmp, cap - prefixLen - 1);
17✔
3075

3076
  for (char* p = ctbName; *p; ++p) {
637✔
3077
    if (*p == '.') *p = '_';
620✔
3078
  }
3079

3080
_end:
17✔
3081
  if (code != TSDB_CODE_SUCCESS) {
19✔
3082
    uError("%s failed at line %d since %s, ctbName:%s", __func__, lino, tstrerror(code), ctbName);
2✔
3083
  }
3084
  return code;
19✔
3085
}
3086

3087
// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals.
// the total length is fixed to be 34 bytes.
// Returns true when `ctbName` has exactly that length and begins with "t_".
bool isAutoTableName(char* ctbName) {
  static const size_t kAutoTableNameLen = 34;

  if (strlen(ctbName) != kAutoTableNameLen) {
    return false;
  }
  return strncmp(ctbName, "t_", 2) == 0;
}
32✔
3090

3091
// Return true when the decimal text of `groupId` (formatted as unsigned) is a suffix of `ctbName`.
// This is a plain textual suffix test: no '_' separator in front of the digits is required.
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
  char digits[64] = {0};
  (void)snprintf(digits, sizeof(digits), "%" PRIu64, (uint64_t)groupId);

  size_t nDigits = strlen(digits);
  size_t nameLen = strlen(ctbName);
  if (nameLen < nDigits) {
    return false;
  }

  const char* tail = ctbName + (nameLen - nDigits);
  return strcmp(tail, digits) == 0;
}
3099

3100
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
×
3101
  QRY_PARAM_CHECK(pName);
×
3102

3103
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
×
3104
  if (!pBuf) {
×
3105
    return terrno;
×
3106
  }
3107

3108
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
×
3109
  if (code != TSDB_CODE_SUCCESS) {
×
3110
    taosMemoryFree(pBuf);
×
3111
  } else {
3112
    *pName = pBuf;
×
3113
  }
3114

3115
  return code;
×
3116
}
3117

3118
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
12✔
3119
  if (stbFullName[0] == 0) {
12✔
3120
    return TSDB_CODE_INVALID_PARA;
×
3121
  }
3122

3123
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
12✔
3124
  if (tags == NULL) {
12✔
3125
    return terrno;
×
3126
  }
3127

3128
  if (cname == NULL) {
12✔
3129
    taosArrayDestroy(tags);
×
3130
    return TSDB_CODE_INVALID_PARA;
×
3131
  }
3132

3133
  int8_t      type = TSDB_DATA_TYPE_UBIGINT;
12✔
3134
  const char* name = "group_id";
12✔
3135
  int32_t     len = strlen(name);
12✔
3136

3137
  SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
12✔
3138
  void*  px = taosArrayPush(tags, &pTag);
12✔
3139
  if (px == NULL) {
12✔
3140
    return terrno;
×
3141
  }
3142

3143
  RandTableName rname = {
12✔
3144
      .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
12✔
3145

3146
  int32_t code = buildChildTableName(&rname);
12✔
3147
  if (code != TSDB_CODE_SUCCESS) {
12✔
3148
    return code;
×
3149
  }
3150

3151
  taosArrayDestroy(tags);
12✔
3152
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
12✔
3153
    return TSDB_CODE_INVALID_PARA;
×
3154
  }
3155

3156
  return code;
12✔
3157
}
3158

3159
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
×
3160
                               char** dstTableName) {
3161
  int32_t code = TSDB_CODE_SUCCESS;
×
3162
  int32_t lino = 0;
×
3163

3164
  if (parTbName[0]) {
×
3165
    if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
×
3166
        stbFullName) {
3167
      *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
×
3168
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3169

3170
      tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
×
3171
      code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
×
3172
      TSDB_CHECK_CODE(code, lino, _end);
×
3173
    } else {
3174
      *dstTableName = taosStrdup(parTbName);
×
3175
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3176
    }
3177
  } else {
3178
    code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
×
3179
    TSDB_CHECK_CODE(code, lino, _end);
×
3180
  }
3181

3182
_end:
×
3183
  return code;
×
3184
}
3185

3186
// return length of encoded data, return -1 if failed
3187
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
213,826✔
3188
  int32_t code = blockDataCheck(pBlock);
213,826✔
3189
  if (code != TSDB_CODE_SUCCESS) {
213,830✔
3190
    terrno = code;
1✔
3191
    return -1;
×
3192
  }
3193

3194
  int32_t dataLen = 0;
213,829✔
3195

3196
  // todo extract method
3197
  int32_t* version = (int32_t*)data;
213,829✔
3198
  *version = BLOCK_VERSION_1;
213,829✔
3199
  data += sizeof(int32_t);
213,829✔
3200

3201
  int32_t* actualLen = (int32_t*)data;
213,829✔
3202
  data += sizeof(int32_t);
213,829✔
3203

3204
  int32_t* rows = (int32_t*)data;
213,829✔
3205
  *rows = pBlock->info.rows;
213,829✔
3206
  data += sizeof(int32_t);
213,829✔
3207
  if (*rows <= 0) {
213,829✔
3208
    uError("Invalid rows %d in block", *rows);
×
3209
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3210
    return -1;
×
3211
  }
3212

3213
  int32_t* cols = (int32_t*)data;
213,829✔
3214
  *cols = numOfCols;
213,829✔
3215
  data += sizeof(int32_t);
213,829✔
3216

3217
  // flag segment.
3218
  // the inital bit is for column info
3219
  int32_t* flagSegment = (int32_t*)data;
213,829✔
3220
  *flagSegment = (1 << 31);
213,829✔
3221

3222
  data += sizeof(int32_t);
213,829✔
3223

3224
  uint64_t* groupId = (uint64_t*)data;
213,829✔
3225
  data += sizeof(uint64_t);
213,829✔
3226

3227
  for (int32_t i = 0; i < numOfCols; ++i) {
1,284,811✔
3228
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
1,070,982✔
3229
    if (pColInfoData == NULL) {
1,070,982✔
3230
      return -1;
×
3231
    }
3232

3233
    *((int8_t*)data) = pColInfoData->info.type;
1,070,982✔
3234
    data += sizeof(int8_t);
1,070,982✔
3235

3236
    int32_t bytes = pColInfoData->info.bytes;
1,070,982✔
3237
    *((int32_t*)data) = bytes;
1,070,982✔
3238
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
1,070,982✔
3239
      fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
×
3240
                              pColInfoData->info.scale);
×
3241
    }
3242
    data += sizeof(int32_t);
1,070,982✔
3243
  }
3244

3245
  int32_t* colSizes = (int32_t*)data;
213,829✔
3246
  data += numOfCols * sizeof(int32_t);
213,829✔
3247

3248
  dataLen = blockDataGetSerialMetaSize(numOfCols);
213,829✔
3249

3250
  int32_t numOfRows = pBlock->info.rows;
213,826✔
3251
  for (int32_t col = 0; col < numOfCols; ++col) {
1,284,791✔
3252
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
1,070,945✔
3253
    if (pColRes == NULL) {
1,070,959✔
3254
      return -1;
×
3255
    }
3256

3257
    // copy the null bitmap
3258
    size_t metaSize = 0;
1,070,959✔
3259
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
1,070,959✔
3260
      metaSize = numOfRows * sizeof(int32_t);
203,228✔
3261
      if(dataLen + metaSize > dataBuflen) goto _exit;
203,228✔
3262
      memcpy(data, pColRes->varmeta.offset, metaSize);
203,228✔
3263
    } else {
3264
      metaSize = BitmapLen(numOfRows);
867,731✔
3265
      if(dataLen + metaSize > dataBuflen) goto _exit;
867,731✔
3266
      memcpy(data, pColRes->nullbitmap, metaSize);
867,731✔
3267
    }
3268

3269
    data += metaSize;
1,070,959✔
3270
    dataLen += metaSize;
1,070,959✔
3271

3272
    if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
1,070,959✔
3273
      colSizes[col] = 0;
×
3274
      for (int32_t row = 0; row < numOfRows; ++row) {
×
3275
        char*   pColData = pColRes->pData + pColRes->varmeta.offset[row];
×
3276
        int32_t colSize = 0;
×
3277
        if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
×
3278
          colSize = getJsonValueLen(pColData);
×
3279
        } else {
3280
          colSize = varDataTLen(pColData);
×
3281
        }
3282
        colSizes[col] += colSize;
×
3283
        dataLen += colSize;
×
3284
        if(dataLen > dataBuflen) goto _exit;
×
3285
        (void) memmove(data, pColData, colSize);
×
3286
        data += colSize;
×
3287
      }
3288
    } else {
3289
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
1,070,959✔
3290
      dataLen += colSizes[col];
1,070,965✔
3291
      if(dataLen > dataBuflen) goto _exit;
1,070,965✔
3292
      if (pColRes->pData != NULL) {
1,070,965✔
3293
        (void) memmove(data, pColRes->pData, colSizes[col]);
1,070,109✔
3294
      }
3295
      data += colSizes[col];
1,070,965✔
3296
    }
3297

3298
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
1,071,824✔
3299
      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
×
3300
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3301
      return -1;
×
3302
    }
3303
    
3304
    colSizes[col] = htonl(colSizes[col]);
1,070,965✔
3305
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
3306
    //    htonl(colSizes[col]), colSizes[col]);
3307
  }
3308

3309
  bool* blankFill = (bool*)data;
213,846✔
3310
  *blankFill = pBlock->info.blankFill;
213,846✔
3311
  data += sizeof(bool);
213,846✔
3312

3313
  *actualLen = dataLen;
213,846✔
3314
#ifndef NO_UNALIGNED_ACCESS
3315
  *groupId = pBlock->info.id.groupId;
213,846✔
3316
#else
3317
  taosSetPUInt64Aligned(groupId, &pBlock->info.id.groupId);
3318
#endif
3319
  if (dataLen > dataBuflen) goto _exit;
213,846✔
3320

3321
  return dataLen;
213,846✔
3322

3323
_exit:
×
3324
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
3325
  terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3326
  return -1;
×
3327
}
3328

3329
/*
 * Reads a native-endian int32_t from a position inside a serialized block.
 * Such positions are not guaranteed to be 4-byte aligned (e.g. the per-column "bytes" field follows a
 * 1-byte type), so a pointer-cast dereference would be undefined behavior; memcpy is valid for any alignment.
 */
static int32_t blockDecodeGetInt32(const char* p) {
  int32_t v = 0;
  memcpy(&v, p, sizeof(v));
  return v;
}

/*
 * Deserializes a data block from pData into pBlock.
 *
 * Layout consumed, in order:
 *   int32 version (not interpreted), int32 total length, int32 rows, int32 columns, int32 flag segment
 *   (not interpreted), uint64 group id, per column {int8 type, int32 bytes}, int32[columns] payload lengths
 *   (network byte order), per column {offsets (var-length types) or null bitmap (fixed-width types), payload},
 *   bool blankFill.
 *
 * @param pBlock   destination block; if pBlock->pDataBlock is NULL a column array is created, otherwise it must
 *                 already hold at least as many columns as the encoded block.
 * @param pData    serialized block; it is only read, never modified.
 * @param pEndPos  on success, receives the address of the first byte after the decoded block.
 * @return TSDB_CODE_SUCCESS, or an error code (allocation failures, malformed input, blockDataCheck failure).
 */
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
  const char* pStart = pData;

  // version: part of the header but not interpreted by the decoder
  pStart += sizeof(int32_t);

  // total length; compared with the number of bytes actually consumed at the end
  int32_t dataLen = blockDecodeGetInt32(pStart);
  pStart += sizeof(int32_t);

  // total rows
  int32_t numOfRows = blockDecodeGetInt32(pStart);
  pStart += sizeof(int32_t);
  if (numOfRows <= 0) {
    uError("block decode numOfRows:%d error", numOfRows);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  // total columns
  int32_t numOfCols = blockDecodeGetInt32(pStart);
  pStart += sizeof(int32_t);

  // flag segment (top bit flags a column-info segment): not interpreted by the decoder
  pStart += sizeof(int32_t);

  // group id; memcpy handles the unaligned case on every platform
  memcpy(&pBlock->info.id.groupId, pStart, sizeof(uint64_t));
  pStart += sizeof(uint64_t);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  // column schema: {int8 type, int32 bytes} per column
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    pColInfoData->info.type = *(const int8_t*)pStart;
    pStart += sizeof(int8_t);

    pColInfoData->info.bytes = blockDecodeGetInt32(pStart);
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
      // for decimal types the encoded "bytes" also carries precision and scale
      extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
                                      &pColInfoData->info.scale);
    }
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  // per-column payload lengths, stored in network byte order
  const char* pColLens = pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    // Decode into a local rather than swapping in place: the input buffer is const and must stay decodable.
    // htonl performs the same conversion as ntohl.
    int32_t colLen = (int32_t)htonl((uint32_t)blockDecodeGetInt32(pColLens + sizeof(int32_t) * i));
    if (colLen < 0) {
      uError("block decode colLen:%d error, colIdx:%d", colLen, i);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // var-length columns carry one int32 offset per row
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      if (colLen > 0 && pColInfoData->varmeta.allocLen < colLen) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen);
        if (tmp == NULL) {
          return terrno;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = colLen;
      }

      pColInfoData->varmeta.length = colLen;
    } else {
      // fixed-width columns carry a null bitmap
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;

    if (colLen > 0) {
      memcpy(pColInfoData->pData, pStart, colLen);
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
      // an empty payload is only valid when the column holds NULLs (or is of the NULL type)
      uError("block decode colLen:%d error, colIdx:%d, type:%d", colLen, i, pColInfoData->info.type);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    pStart += colLen;
  }

  bool blankFill = *(const bool*)pStart;
  pStart += sizeof(bool);

  pBlock->info.dataLoad = 1;
  pBlock->info.rows = numOfRows;
  pBlock->info.blankFill = blankFill;
  if (pStart - pData != dataLen) {
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  *pEndPos = pStart;

  code = blockDataCheck(pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
3472

3473
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
6,578✔
3474
  //  int32_t totalRows = pBlock->info.rows;
3475
  int32_t code = 0;
6,578✔
3476
  int32_t bmLen = BitmapLen(totalRows);
6,578✔
3477
  char*   pBitmap = NULL;
6,578✔
3478
  int32_t maxRows = 0;
6,578✔
3479

3480
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
6,578✔
3481
  if (!pBoolList) {
6,578✔
3482
    for (int32_t i = 0; i < numOfCols; ++i) {
38,580✔
3483
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
32,150✔
3484
      // it is a reserved column for scalar function, and no data in this column yet.
3485
      if (pDst->pData == NULL) {
32,152✔
3486
        continue;
1✔
3487
      }
3488

3489
      int32_t numOfRows = 0;
32,151✔
3490
      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
32,151✔
3491
        pDst->varmeta.length = 0;
6,426✔
3492
      } else {
3493
        memset(pDst->nullbitmap, 0, bmLen);
25,725✔
3494
      }
3495
    }
3496
    return code;
6,430✔
3497
  }
3498

3499
  for (int32_t i = 0; i < numOfCols; ++i) {
737✔
3500
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
587✔
3501
    // it is a reserved column for scalar function, and no data in this column yet.
3502
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
587✔
3503
      continue;
×
3504
    }
3505

3506
    int32_t numOfRows = 0;
587✔
3507
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
678✔
3508
      int32_t j = 0;
91✔
3509
      pDst->varmeta.length = 0;
91✔
3510

3511
      while (j < totalRows) {
158,360✔
3512
        if (pBoolList[j] == 0) {
158,269✔
3513
          j += 1;
158,122✔
3514
          continue;
158,122✔
3515
        }
3516

3517
        if (colDataIsNull_var(pDst, j)) {
147✔
3518
          colDataSetNull_var(pDst, numOfRows);
×
3519
        } else {
3520
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
3521
          // copy it to p2
3522
          char*   p1 = colDataGetVarData(pDst, j);
147✔
3523
          int32_t len = 0;
147✔
3524
          if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
147✔
3525
            len = getJsonValueLen(p1);
×
3526
          } else {
3527
            len = varDataTLen(p1);
147✔
3528
          }
3529

3530
          char* p2 = taosMemoryMalloc(len);
147✔
3531
          if (p2 == NULL) {
147✔
3532
            return terrno;
×
3533
          }
3534

3535
          memcpy(p2, p1, len);
147✔
3536
          code = colDataSetVal(pDst, numOfRows, p2, false);
147✔
3537
          taosMemoryFree(p2);
147✔
3538
          if (code) {
147✔
3539
            return code;
×
3540
          }
3541
        }
3542
        numOfRows += 1;
147✔
3543
        j += 1;
147✔
3544
      }
3545

3546
      if (maxRows < numOfRows) {
91✔
3547
        maxRows = numOfRows;
11✔
3548
      }
3549
    } else {
3550
      if (pBitmap == NULL) {
496✔
3551
        pBitmap = taosMemoryCalloc(1, bmLen);
144✔
3552
        if (pBitmap == NULL) {
144✔
3553
          return terrno;
×
3554
        }
3555
      }
3556

3557
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
496✔
3558
      memset(pDst->nullbitmap, 0, bmLen);
496✔
3559

3560
      int32_t j = 0;
496✔
3561

3562
      switch (pDst->info.type) {
496✔
3563
        case TSDB_DATA_TYPE_BIGINT:
208✔
3564
        case TSDB_DATA_TYPE_UBIGINT:
3565
        case TSDB_DATA_TYPE_DOUBLE:
3566
        case TSDB_DATA_TYPE_TIMESTAMP:
3567
          while (j < totalRows) {
166,143✔
3568
            if (pBoolList[j] == 0) {
165,935✔
3569
              j += 1;
161,605✔
3570
              continue;
161,605✔
3571
            }
3572

3573
            if (colDataIsNull_f(pBitmap, j)) {
4,330✔
3574
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
3✔
3575
            } else {
3576
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
4,327✔
3577
            }
3578
            numOfRows += 1;
4,330✔
3579
            j += 1;
4,330✔
3580
          }
3581
          break;
208✔
3582
        case TSDB_DATA_TYPE_FLOAT:
266✔
3583
        case TSDB_DATA_TYPE_INT:
3584
        case TSDB_DATA_TYPE_UINT:
3585
          while (j < totalRows) {
478,523✔
3586
            if (pBoolList[j] == 0) {
478,257✔
3587
              j += 1;
475,798✔
3588
              continue;
475,798✔
3589
            }
3590
            if (colDataIsNull_f(pBitmap, j)) {
2,459✔
3591
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
6✔
3592
            } else {
3593
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
2,453✔
3594
            }
3595
            numOfRows += 1;
2,459✔
3596
            j += 1;
2,459✔
3597
          }
3598
          break;
266✔
3599
        case TSDB_DATA_TYPE_SMALLINT:
5✔
3600
        case TSDB_DATA_TYPE_USMALLINT:
3601
          while (j < totalRows) {
32✔
3602
            if (pBoolList[j] == 0) {
27✔
3603
              j += 1;
12✔
3604
              continue;
12✔
3605
            }
3606
            if (colDataIsNull_f(pBitmap, j)) {
15✔
3607
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
3608
            } else {
3609
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
15✔
3610
            }
3611
            numOfRows += 1;
15✔
3612
            j += 1;
15✔
3613
          }
3614
          break;
5✔
3615
        case TSDB_DATA_TYPE_BOOL:
17✔
3616
        case TSDB_DATA_TYPE_TINYINT:
3617
        case TSDB_DATA_TYPE_UTINYINT:
3618
          while (j < totalRows) {
98✔
3619
            if (pBoolList[j] == 0) {
81✔
3620
              j += 1;
39✔
3621
              continue;
39✔
3622
            }
3623
            if (colDataIsNull_f(pBitmap, j)) {
42✔
3624
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
3625
            } else {
3626
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
42✔
3627
            }
3628
            numOfRows += 1;
42✔
3629
            j += 1;
42✔
3630
          }
3631
          break;
17✔
3632
        case TSDB_DATA_TYPE_DECIMAL64:
×
3633
        case TSDB_DATA_TYPE_DECIMAL:
3634
          while (j < totalRows) {
×
3635
            if (pBoolList[j] == 0) {
×
3636
              j += 1;
×
3637
              continue;
×
3638
            }
3639
            if (colDataIsNull_f(pBitmap, j)) {
×
3640
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
3641
            } else {
3642
              memcpy(pDst->pData + numOfRows * pDst->info.bytes, pDst->pData + j * pDst->info.bytes, pDst->info.bytes);
×
3643
            }
3644
            numOfRows += 1;
×
3645
            j += 1;
×
3646
          }
3647
          break;
×
3648
      }
3649
    }
587✔
3650

3651
    if (maxRows < numOfRows) {
587✔
3652
      maxRows = numOfRows;
139✔
3653
    }
3654
  }
3655

3656
  pBlock->info.rows = maxRows;
150✔
3657
  if (pBitmap != NULL) {
150✔
3658
    taosMemoryFree(pBitmap);
144✔
3659
  }
3660

3661
  return code;
150✔
3662
}
3663

3664
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
213,848✔
3665
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
213,848✔
3666
}
3667

3668
/*
 * Returns the length of the longest prefix of pDataBlock's rows that is already ordered according to pOrderInfo
 * (the index of the first row that compares lower than its predecessor, or the row count if all are ordered).
 *
 * Side effect: for every entry of pOrderInfo, pColData is bound to the column at slotId and compFn is set from
 * that column's type and the requested order.
 *
 * Returns 0 when either argument is NULL or the block has no rows.
 */
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (!pDataBlock || !pOrderInfo) return 0;

  size_t numOfOrders = taosArrayGetSize(pOrderInfo);
  for (int32_t i = 0; i < numOfOrders; ++i) {
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
    if (pOrder == NULL) {
      continue;
    }

    pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
    if (pOrder->pColData == NULL) {
      continue;
    }

    pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
  }

  // An empty block has no sorted rows; without this guard the scan below would report 1.
  if (pDataBlock->info.rows <= 0) {
    return 0;
  }

  SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};

  // rowIdx is always nextRowIdx - 1, so bounding nextRowIdx also bounds rowIdx
  int32_t rowIdx = 0, nextRowIdx = 1;
  for (; nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
    if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
      break;
    }
  }

  return nextRowIdx;
}
3695

3696
/*
 * Validation helper for blockDataCheck: when condition o is false, logs the failing source line and makes the
 * enclosing function return TSDB_CODE_INTERNAL_ERROR. Wrapped in do/while (0) so that it expands to a single
 * statement and is safe inside an unbraced if/else.
 */
#define BLOCK_DATA_CHECK_TRESSA(o)                        \
  do {                                                    \
    if (!(o)) {                                           \
      uError("blockDataCheck failed! line:%d", __LINE__); \
      return TSDB_CODE_INTERNAL_ERROR;                    \
    }                                                     \
  } while (0)
3701
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
682,165✔
3702
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
682,165✔
3703
    return TSDB_CODE_SUCCESS;
31,138✔
3704
  }
3705

3706
  BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
651,027✔
3707
  if (!pDataBlock->info.dataLoad) {
651,027✔
3708
    return TSDB_CODE_SUCCESS;
3,699✔
3709
  }
3710

3711
  bool isVarType = false;
647,328✔
3712
  int32_t colLen = 0;
647,328✔
3713
  int32_t nextPos = 0;
647,328✔
3714
  int64_t checkRows = 0;
647,328✔
3715
  int64_t typeValue = 0;
647,328✔
3716
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
647,328✔
3717
  for (int32_t i = 0; i < colNum; ++i) {
3,970,271✔
3718
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
3,258,401✔
3719
    BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
3,258,444✔
3720
    isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
3,259,048✔
3721
    checkRows = pDataBlock->info.rows;
3,259,048✔
3722
    if (pCol->info.noData == true) continue;
3,259,048✔
3723

3724
    if (isVarType) {
3,255,634✔
3725
      BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
658,804✔
3726
    } else {
3727
      BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
2,596,830✔
3728
    }
3729

3730
    nextPos = -1;
3,255,634✔
3731
    for (int64_t r = 0; r < checkRows; ++r) {
127,364,086✔
3732
      if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
124,053,093✔
3733
      if (!colDataIsNull_s(pCol, r)) {
248,089,034✔
3734
        BLOCK_DATA_CHECK_TRESSA(pCol->pData);
120,260,651✔
3735
        BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
120,260,651✔
3736

3737
        if (isVarType) {
120,260,651✔
3738
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
17,687,551✔
3739
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
17,687,551✔
3740
          if (pCol->reassigned) {
17,687,551✔
3741
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
×
3742
          } else if (0 == r || nextPos == -1) {
17,687,551✔
3743
            nextPos = pCol->varmeta.offset[r];
654,288✔
3744
          } else {
3745
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
17,033,263✔
3746
          }
3747

3748
          char*   pColData = pCol->pData + pCol->varmeta.offset[r];
17,687,551✔
3749
          int32_t colSize = 0;
17,687,551✔
3750
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
17,687,551✔
3751
            colLen = getJsonValueLen(pColData);
×
3752
          } else {
3753
            colLen = varDataTLen(pColData);
17,687,551✔
3754
          }
3755

3756
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
17,684,970✔
3757
            BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
×
3758
          } else {
3759
            BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
17,684,970✔
3760
          }
3761
          BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
17,684,970✔
3762

3763
          if (pCol->reassigned) {
17,684,970✔
3764
            BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
×
3765
          } else {
3766
            nextPos += colLen;
17,684,970✔
3767
            BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
17,684,970✔
3768
          }
3769

3770
          typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
17,684,970✔
3771
        } else {
3772
          if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
102,573,100✔
3773
            float v = 0;
18,832,927✔
3774
            GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
18,832,927✔
3775
          } else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
83,740,173✔
3776
            double v = 0;
2,013,613✔
3777
            GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
2,013,613✔
3778
          } else if (IS_DECIMAL_TYPE(pCol->info.type)) {
81,726,560✔
3779
            // SKIP for decimal types
3780
          } else {
3781
            GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r), typeGetTypeModFromColInfo(&pCol->info));
85,528,023✔
3782
          }
3783
        }
3784
      }
3785
    }
3786
  }
3787

3788
  return TSDB_CODE_SUCCESS;
711,870✔
3789
}
3790

3791

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc