• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.65
/source/common/src/tdatablock.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tdatablock.h"
18
#include "tcompare.h"
19
#include "tglobal.h"
20
#include "tlog.h"
21
#include "tname.h"
22

23
#define MALLOC_ALIGN_BYTES 32
24

25
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
21,637,560✔
26
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
21,637,560!
27
    if (pColumnInfoData->reassigned) {
5,323,891!
28
      int32_t totalSize = 0;
×
29
      for (int32_t row = 0; row < numOfRows; ++row) {
×
30
        char*   pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
×
31
        int32_t colSize = calcStrBytesByType(pColumnInfoData->info.type, pColData);
×
32
        totalSize += colSize;
×
33
      }
34
      return totalSize;
×
35
    }
36
    return pColumnInfoData->varmeta.length;
5,323,891✔
37
  } else {
38
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
16,313,669!
39
      return 0;
×
40
    } else {
41
      return pColumnInfoData->info.bytes * numOfRows;
16,313,669✔
42
    }
43
  }
44
}
45

46
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
1,914,697✔
47
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
3,829,394!
48
    return 0;
×
49
  }
50

51
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,914,697!
52
    return pColumnInfoData->info.bytes;
1,399,314✔
53
  } else {
54
    return calcStrBytesByType(pColumnInfoData->info.type, colDataGetData(pColumnInfoData, rowIdx));
515,383!
55
  }
56
  // if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
57
  //   return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
58
  // } else if (IS_STR_DATA_BLOB(pColumnInfoData->info.type)) {
59
  //   return blobDataTLen(colDataGetData(pColumnInfoData, rowIdx));
60
  // } else {
61
  //   return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
62
  // }
63
}
64

65
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
93,646,031✔
66
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
93,646,031!
67
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
8,026,449✔
68
  } else {
69
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
85,619,582!
70
           BitmapLen(numOfRows);
85,619,582✔
71
  }
72
}
73

74
int32_t getJsonValueLen(const char* data) {
35,887✔
75
  int32_t dataLen = 0;
35,887✔
76
  if (*data == TSDB_DATA_TYPE_NULL) {
35,887✔
77
    dataLen = CHAR_BYTES;
1,433✔
78
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
34,454✔
79
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
15,654✔
80
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
18,800✔
81
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
3,072✔
82
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
15,728✔
83
    dataLen = CHAR_BYTES + CHAR_BYTES;
1,010✔
84
  } else if (tTagIsJson(data)) {  // json string
14,718!
85
    dataLen = ((STag*)(data))->len;
14,721✔
86
  } else {
87
    uError("Invalid data type:%d in Json", *data);
×
88
  }
89
  return dataLen;
35,888✔
90
}
91

92
static int32_t getDataLen(int32_t type, const char* pData) {
660,217,415✔
93
  int32_t dataLen = 0;
660,217,415✔
94
  if (type == TSDB_DATA_TYPE_JSON) {
660,217,415✔
95
    dataLen = getJsonValueLen(pData);
33,765✔
96
  } else if (IS_STR_DATA_BLOB(type)) {
660,183,650!
97
    dataLen = blobDataTLen(pData);
×
98
  } else {
99
    dataLen = varDataTLen(pData);
660,395,194✔
100
  }
101
  return dataLen;
660,587,828✔
102
}
103

104
int32_t calcStrBytesByType(int8_t type, char* data) { return getDataLen(type, data); }
91,810,839✔
105

106
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
2,147,483,647✔
107
  if (isNull || pData == NULL) {
2,147,483,647!
108
    // There is a placehold for each NULL value of binary or nchar type.
109
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
112,974,695!
110
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
×
111
    } else {
112
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
119,917,818✔
113
    }
114

115
    pColumnInfoData->hasNull = true;
112,974,695✔
116
    return 0;
112,974,695✔
117
  }
118

119
  int32_t type = pColumnInfoData->info.type;
2,147,483,647✔
120
  if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647!
121
    int32_t dataLen = getDataLen(type, pData);
526,285,690✔
122
    if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
569,320,862✔
123
      pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
240✔
124
    }
125

126
    SVarColAttr* pAttr = &pColumnInfoData->varmeta;
569,320,862✔
127
    if (pAttr->allocLen < pAttr->length + dataLen) {
569,320,862✔
128
      uint32_t newSize = pAttr->allocLen;
24,796,161✔
129
      if (newSize <= 1) {
24,796,161✔
130
        newSize = 8;
7,416,625✔
131
      }
132

133
      while (newSize < pAttr->length + dataLen) {
54,847,047✔
134
        newSize = newSize * 1.5;
30,050,886✔
135
        if (newSize > UINT32_MAX) {
136
          return TSDB_CODE_OUT_OF_MEMORY;
137
        }
138
      }
139

140
      char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
24,796,161!
141
      if (buf == NULL) {
24,803,019!
142
        return terrno;
×
143
      }
144

145
      pColumnInfoData->pData = buf;
24,803,019✔
146
      pAttr->allocLen = newSize;
24,803,019✔
147
    }
148

149
    uint32_t len = pColumnInfoData->varmeta.length;
569,327,720✔
150
    pColumnInfoData->varmeta.offset[rowIndex] = len;
569,327,720✔
151

152
    (void)memmove(pColumnInfoData->pData + len, pData, dataLen);
569,327,720✔
153
    pColumnInfoData->varmeta.length += dataLen;
569,327,720✔
154
  } else {
155
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
2,147,483,647✔
156
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
2,147,483,647✔
157
  }
158

159
  return 0;
2,147,483,647✔
160
}
161

162
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
2,147,483,647✔
163
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
2,147,483,647!
164
    pColumnInfoData->varmeta.offset[rowIndex] = -1;
569,577,879✔
165
  }
166

167
  return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
2,147,483,647✔
168
}
169

170
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
71,020,877✔
171
  return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
71,020,877✔
172
}
173

174
int32_t varColSetVarData(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pVarData, int32_t varDataLen,
1,415✔
175
                         bool isNull) {
176
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,415!
177
    return TSDB_CODE_INVALID_PARA;
×
178
  }
179

180
  if (isNull || pVarData == NULL) {
1,415!
181
    pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
×
182
    pColumnInfoData->hasNull = true;
×
183
    return TSDB_CODE_SUCCESS;
×
184
  }
185

186
  int32_t dataLen = VARSTR_HEADER_SIZE + varDataLen;
1,415✔
187
  if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
1,415!
188
    pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
×
189
  }
190

191
  SVarColAttr* pAttr = &pColumnInfoData->varmeta;
1,415✔
192
  if (pAttr->allocLen < pAttr->length + dataLen) {
1,415!
193
    uint32_t newSize = pAttr->allocLen;
1,415✔
194
    if (newSize <= 1) {
1,415!
195
      newSize = 8;
1,415✔
196
    }
197

198
    while (newSize < pAttr->length + dataLen) {
1,432✔
199
      newSize = newSize * 1.5;
17✔
200
      if (newSize > UINT32_MAX) {
201
        return TSDB_CODE_OUT_OF_MEMORY;
202
      }
203
    }
204

205
    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
1,415!
206
    if (buf == NULL) {
1,416!
207
      return terrno;
×
208
    }
209

210
    pColumnInfoData->pData = buf;
1,416✔
211
    pAttr->allocLen = newSize;
1,416✔
212
  }
213

214
  uint32_t len = pColumnInfoData->varmeta.length;
1,416✔
215
  pColumnInfoData->varmeta.offset[rowIndex] = len;
1,416✔
216

217
  if (IS_STR_DATA_BLOB(pColumnInfoData->info.type)) {
1,416!
218
    (void)memmove(blobDataVal(pColumnInfoData->pData + len), pVarData, varDataLen);
×
219
    blobDataSetLen(pColumnInfoData->pData + len, varDataLen);
×
220

221
  } else {
222
    (void)memmove(varDataVal(pColumnInfoData->pData + len), pVarData, varDataLen);
1,416✔
223
    varDataSetLen(pColumnInfoData->pData + len, varDataLen);
1,416✔
224
  }
225
  pColumnInfoData->varmeta.length += dataLen;
1,416✔
226
  return TSDB_CODE_SUCCESS;
1,416✔
227
}
228

229
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
×
230
                           const char* pData) {
231
  int32_t type = pColumnInfoData->info.type;
×
232
  if (IS_VAR_DATA_TYPE(type)) {
×
233
    pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
×
234
    pColumnInfoData->reassigned = true;
×
235
  } else {
236
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
×
237
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
×
238
  }
239

240
  return 0;
×
241
}
242

243
static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
1,522,133✔
244
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,522,133!
245
    return TSDB_CODE_SUCCESS;
×
246
  }
247

248
  if (pColumnInfoData->varmeta.allocLen >= newSize) {
1,522,133!
249
    return TSDB_CODE_SUCCESS;
×
250
  }
251

252
  if (pColumnInfoData->varmeta.allocLen < newSize) {
1,522,133✔
253
    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
1,522,124!
254
    if (buf == NULL) {
1,522,129!
255
      return terrno;
×
256
    }
257

258
    pColumnInfoData->pData = buf;
1,522,129✔
259
    pColumnInfoData->varmeta.allocLen = newSize;
1,522,129✔
260
  }
261

262
  return TSDB_CODE_SUCCESS;
1,522,138✔
263
}
264

265
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
16,765,825✔
266
                            int32_t itemLen, int32_t numOfRows, bool trimValue) {
267
  if (pColumnInfoData->info.bytes < itemLen) {
16,765,825!
268
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen,
×
269
          pColumnInfoData->info.bytes, trimValue);
270
    if (trimValue) {
×
271
      itemLen = pColumnInfoData->info.bytes;
×
272
    } else {
273
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
274
    }
275
  }
276

277
  size_t   start = 1;
16,765,825✔
278
  int32_t  t = 0;
16,765,825✔
279
  int32_t  count = log(numOfRows) / log(2);
16,765,825✔
280
  uint32_t startOffset =
16,765,825✔
281
      (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
16,765,825!
282

283
  // the first item
284
  memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
16,765,825✔
285

286
  while (t < count) {
41,956,650✔
287
    int32_t xlen = 1 << t;
25,190,825✔
288
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
25,190,825✔
289
           xlen * itemLen);
25,190,825✔
290
    t += 1;
25,190,825✔
291
    start += xlen;
25,190,825✔
292
  }
293

294
  // the tail part
295
  if (numOfRows > start) {
16,765,825✔
296
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
4,198,257✔
297
           (numOfRows - start) * itemLen);
4,198,257✔
298
  }
299

300
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
16,765,825!
301
    for (int32_t i = 0; i < numOfRows; ++i) {
674,380,642✔
302
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
671,405,831✔
303
    }
304

305
    pColumnInfoData->varmeta.length += numOfRows * itemLen;
2,974,811✔
306
  }
307

308
  return TSDB_CODE_SUCCESS;
16,765,825✔
309
}
310

311
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
16,766,600✔
312
                         bool trimValue) {
313
  int32_t len = pColumnInfoData->info.bytes;
16,766,600✔
314
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
16,766,600!
315
    len = calcStrBytesByType(pColumnInfoData->info.type, (char*)pData);
2,974,410✔
316
    // if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
317
    //   len = getJsonValueLen(pData);
318
    // } else if (IS_STR_DATA_BLOB(pColumnInfoData->info.type)) {
319
    //   len = blobDataTLen(pData);
320
    // } else {
321
    //   len = varDataTLen(pData);
322
    // }
323
    if (pColumnInfoData->varmeta.allocLen < (numOfRows * len + pColumnInfoData->varmeta.length)) {
2,974,814✔
324
      int32_t code = colDataReserve(pColumnInfoData, (numOfRows * len + pColumnInfoData->varmeta.length));
1,522,138✔
325
      if (code != TSDB_CODE_SUCCESS) {
1,522,127!
326
        return code;
×
327
      }
328
    }
329
  }
330

331
  return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
16,766,993✔
332
}
333

334
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
487,489✔
335
  pColumnInfoData->hasNull = true;
487,489✔
336

337
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
487,489!
338
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
103,304✔
339
  } else {
340
    if (numOfRows < 16) {
384,185!
341
      for (int32_t i = 0; i < numOfRows; ++i) {
775,718✔
342
        colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
391,533✔
343
      }
344
    } else {
345
      int32_t i = 0;
×
346
      for (; i < numOfRows; ++i) {
×
347
        if (BitPos(currentRow + i)) {
×
348
          colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
×
349
        } else {
350
          break;
×
351
        }
352
      }
353

354
      int32_t bytes = (numOfRows - i) / 8;
×
355
      memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, bytes);
×
356
      i += bytes * 8;
×
357

358
      for (; i < numOfRows; ++i) {
×
359
        colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
×
360
      }
361
    }
362
  }
363
}
487,489✔
364

365
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
4,777✔
366
                               uint32_t numOfRows) {
367
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
4,777✔
368
  if (code) {
4,777!
369
    return code;
×
370
  }
371

372
  if (numOfRows > 1) {
4,777✔
373
    int32_t* pOffset = pColumnInfoData->varmeta.offset;
2,747✔
374
    memset(&pOffset[currentRow + 1], pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1));
2,747✔
375
    pColumnInfoData->reassigned = true;
2,747✔
376
  }
377

378
  return code;
4,777✔
379
}
380

381
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
8,934✔
382
                          bool isNull) {
383
  int32_t len = pColumnInfoData->info.bytes;
8,934✔
384
  if (isNull) {
8,934!
385
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
×
386
    pColumnInfoData->hasNull = true;
×
387
    return 0;
×
388
  }
389

390
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
8,934!
391
    return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
4,776✔
392
  } else {
393
    int32_t  colBytes = pColumnInfoData->info.bytes;
4,158✔
394
    int32_t  colOffset = currentRow * colBytes;
4,158✔
395
    uint32_t num = 1;
4,158✔
396

397
    void* pStart = pColumnInfoData->pData + colOffset;
4,158✔
398
    memcpy(pStart, pData, colBytes);
4,158✔
399
    colOffset += num * colBytes;
4,158✔
400

401
    while (num < numOfRows) {
13,170✔
402
      int32_t maxNum = num << 1;
9,012✔
403
      int32_t tnum = maxNum > numOfRows ? (numOfRows - num) : num;
9,012✔
404

405
      memcpy(pColumnInfoData->pData + colOffset, pStart, tnum * colBytes);
9,012✔
406
      colOffset += tnum * colBytes;
9,012✔
407
      num += tnum;
9,012✔
408
    }
409
  }
410

411
  return TSDB_CODE_SUCCESS;
4,158✔
412
}
413

414
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
4,096,949✔
415
                          int32_t numOfRow2) {
416
  if (numOfRow2 <= 0) return;
4,096,949!
417

418
  uint32_t total = numOfRow1 + numOfRow2;
4,096,949✔
419

420
  uint32_t remindBits = BitPos(numOfRow1);
4,096,949✔
421
  uint32_t shiftBits = 8 - remindBits;
4,096,949✔
422

423
  if (remindBits == 0) {  // no need to shift bits of bitmap
4,096,949✔
424
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
2,461,410✔
425
    return;
2,461,410✔
426
  }
427

428
  uint8_t* p = (uint8_t*)pSource->nullbitmap;
1,635,539✔
429
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] &= (0B11111111 << shiftBits);  // clear remind bits
1,635,539✔
430
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (p[0] >> remindBits);       // copy remind bits
1,635,539✔
431

432
  if (BitmapLen(numOfRow1) == BitmapLen(total)) {
1,635,539✔
433
    return;
732,040✔
434
  }
435

436
  int32_t len = BitmapLen(numOfRow2);
903,499✔
437
  int32_t i = 0;
903,499✔
438

439
  uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
903,499✔
440
  int32_t  overCount = BitmapLen(total) - BitmapLen(numOfRow1);
903,499✔
441
  memset(start, 0, overCount);
903,499✔
442
  while (i < len) {  // size limit of pSource->nullbitmap
17,484,945✔
443
    if (i >= 1) {
16,818,392✔
444
      start[i - 1] |= (p[i] >> remindBits);  // copy remind bits
15,918,914✔
445
    }
446

447
    if (i >= overCount) {  // size limit of pColumnInfoData->nullbitmap
16,818,392✔
448
      return;
236,946✔
449
    }
450

451
    start[i] |= (p[i] << shiftBits);  // copy shift bits
16,581,446✔
452
    i += 1;
16,581,446✔
453
  }
454
}
455

456
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
7,209,381✔
457
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
458
  if (pColumnInfoData->info.type != pSource->info.type) {
7,209,381!
459
    return TSDB_CODE_INVALID_PARA;
×
460
  }
461

462
  if (numOfRow2 == 0) {
7,209,381✔
463
    return numOfRow1;
30✔
464
  }
465

466
  if (pSource->hasNull) {
7,209,351✔
467
    pColumnInfoData->hasNull = pSource->hasNull;
3,506,840✔
468
  }
469

470
  uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
7,209,351✔
471
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
7,209,351!
472
    // Handle the bitmap
473
    if (finalNumOfRows > (*capacity)) {
3,114,108✔
474
      char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
1,649,887!
475
      if (p == NULL) {
1,650,315!
476
        return terrno;
×
477
      }
478

479
      *capacity = finalNumOfRows;
1,650,315✔
480
      pColumnInfoData->varmeta.offset = (int32_t*)p;
1,650,315✔
481
    }
482

483
    for (int32_t i = 0; i < numOfRow2; ++i) {
195,944,090✔
484
      if (pSource->varmeta.offset[i] == -1) {
192,829,554✔
485
        pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
13,035,137✔
486
      } else {
487
        pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length;
179,794,417✔
488
      }
489
    }
490

491
    // copy data
492
    uint32_t len = pSource->varmeta.length;
3,114,536✔
493
    uint32_t oldLen = pColumnInfoData->varmeta.length;
3,114,536✔
494
    if (pColumnInfoData->varmeta.allocLen < len + oldLen) {
3,114,536✔
495
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, len + oldLen);
2,058,104!
496
      if (tmp == NULL) {
2,058,182!
497
        return terrno;
×
498
      }
499

500
      pColumnInfoData->pData = tmp;
2,058,182✔
501
      pColumnInfoData->varmeta.allocLen = len + oldLen;
2,058,182✔
502
    }
503

504
    if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
3,114,614✔
505
      memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
2,863,271✔
506
    }
507
    pColumnInfoData->varmeta.length = len + oldLen;
3,114,614✔
508
  } else {
509
    if (finalNumOfRows > (*capacity)) {
4,095,243✔
510
      // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
511
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
2,721,070!
512
      if (tmp == NULL) {
2,723,270!
513
        return terrno;
×
514
      }
515

516
      pColumnInfoData->pData = tmp;
2,723,270✔
517
      if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
2,723,270✔
518
        char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
2,025,845!
519
        if (btmp == NULL) {
2,025,994!
520
          return terrno;
×
521
        }
522
        uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
2,025,994✔
523
        memset(btmp + BitmapLen(numOfRow1), 0, extend);
2,025,994✔
524
        pColumnInfoData->nullbitmap = btmp;
2,025,994✔
525
      }
526

527
      *capacity = finalNumOfRows;
2,723,419✔
528
    }
529

530
    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);
4,097,592✔
531

532
    if (pSource->pData) {
4,096,910✔
533
      int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
4,096,419✔
534
      memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
4,096,419✔
535
    }
536
  }
537

538
  return numOfRow1 + numOfRow2;
7,211,524✔
539
}
540

541
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
26,599,427✔
542
                      const SDataBlockInfo* pBlockInfo) {
543
  if (pColumnInfoData->info.type != pSource->info.type || (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows)) {
26,599,427!
544
    return TSDB_CODE_INVALID_PARA;
×
545
  }
546

547
  if (numOfRows <= 0) {
26,599,427✔
548
    return numOfRows;
7,869✔
549
  }
550

551
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
33,385,896!
552
    int32_t newLen = pSource->varmeta.length;
6,790,839✔
553
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
6,790,839✔
554
    if (pColumnInfoData->varmeta.allocLen < newLen) {
6,790,839✔
555
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
4,082,979!
556
      if (tmp == NULL) {
4,086,478!
557
        return terrno;
×
558
      }
559

560
      pColumnInfoData->pData = tmp;
4,086,478✔
561
      pColumnInfoData->varmeta.allocLen = newLen;
4,086,478✔
562
    }
563

564
    pColumnInfoData->varmeta.length = newLen;
6,794,338✔
565
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
6,794,338✔
566
      memcpy(pColumnInfoData->pData, pSource->pData, newLen);
6,323,824✔
567
    }
568
  } else {
569
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
19,800,719✔
570
    if (pSource->pData != NULL) {
19,800,719!
571
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
19,804,893✔
572
    }
573
  }
574

575
  pColumnInfoData->hasNull = pSource->hasNull;
26,595,057✔
576
  pColumnInfoData->info = pSource->info;
26,595,057✔
577
  return 0;
26,595,057✔
578
}
579

580
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
5,323,341✔
581
                           int32_t numOfRows) {
582
  if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
5,323,341!
583
    return TSDB_CODE_INVALID_PARA;
×
584
  }
585

586
  if (numOfRows <= 0) {
5,323,582!
587
    return numOfRows;
×
588
  }
589

590
  if (IS_VAR_DATA_TYPE(pDst->info.type)) {
6,276,580!
591
    int32_t allLen = 0;
952,998✔
592
    void*   srcAddr = NULL;
952,998✔
593
    if (pSrc->hasNull) {
952,998✔
594
      for (int32_t i = 0; i < numOfRows; ++i) {
1,706,471✔
595
        if (colDataIsNull_var(pSrc, srcIdx + i)) {
870,790!
596
          pDst->varmeta.offset[dstIdx + i] = -1;
102,656✔
597
          pDst->hasNull = true;
102,656✔
598
          continue;
102,656✔
599
        }
600

601
        char* pData = colDataGetVarData(pSrc, srcIdx + i);
768,134✔
602
        if (NULL == srcAddr) {
768,134✔
603
          srcAddr = pData;
733,064✔
604
        }
605
        int32_t dataLen = calcStrBytesByType(pSrc->info.type, pData);
768,134✔
606
        // if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
607
        //   dataLen = getJsonValueLen(pData);
608
        // } else if (IS_STR_DATA_BLOB(pSrc->info.type)) {
609
        //   dataLen = blobDataTLen(pData);
610
        // } else {
611
        //   dataLen = varDataTLen(pData);
612
        // }
613
        pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen;
768,134✔
614
        allLen += dataLen;
768,134✔
615
      }
616
    } else {
617
      for (int32_t i = 0; i < numOfRows; ++i) {
600,467✔
618
        char*   pData = colDataGetVarData(pSrc, srcIdx + i);
483,150✔
619
        int32_t dataLen = 0;
483,150✔
620
        if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
483,150✔
621
          dataLen = getJsonValueLen(pData);
28✔
622
        } else if (IS_STR_DATA_BLOB(pSrc->info.type)) {
483,122!
623
          dataLen = blobDataTLen(pData);
×
624
        } else {
625
          dataLen = varDataTLen(pData);
483,122✔
626
        }
627
        pDst->varmeta.offset[dstIdx + i] = pDst->varmeta.length + allLen;
483,150✔
628
        allLen += dataLen;
483,150✔
629
      }
630
    }
631

632
    if (allLen > 0) {
952,998✔
633
      // copy data
634
      if (pDst->varmeta.allocLen < pDst->varmeta.length + allLen) {
850,345✔
635
        char* tmp = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + allLen);
848,310!
636
        if (tmp == NULL) {
848,310!
637
          return terrno;
×
638
        }
639

640
        pDst->pData = tmp;
848,310✔
641
        pDst->varmeta.allocLen = pDst->varmeta.length + allLen;
848,310✔
642
      }
643
      if (pSrc->hasNull) {
850,345✔
644
        memcpy(pDst->pData + pDst->varmeta.length, srcAddr, allLen);
733,064✔
645
      } else {
646
        memcpy(pDst->pData + pDst->varmeta.length, colDataGetVarData(pSrc, srcIdx), allLen);
117,281✔
647
      }
648
      pDst->varmeta.length = pDst->varmeta.length + allLen;
850,345✔
649
    }
650
  } else {
651
    if (pSrc->hasNull) {
4,370,584✔
652
      if (BitPos(dstIdx) == BitPos(srcIdx)) {
3,209,392✔
653
        for (int32_t i = 0; i < numOfRows; ++i) {
2,848,004✔
654
          if (0 == BitPos(dstIdx) && (i + (1 << NBIT) <= numOfRows)) {
2,249,182✔
655
            BMCharPos(pDst->nullbitmap, dstIdx + i) = BMCharPos(pSrc->nullbitmap, srcIdx + i);
426,427✔
656
            if (BMCharPos(pDst->nullbitmap, dstIdx + i)) {
426,427✔
657
              pDst->hasNull = true;
564✔
658
            }
659
            i += (1 << NBIT) - 1;
426,427✔
660
          } else {
661
            if (colDataIsNull_f(pSrc, srcIdx + i)) {
1,822,755✔
662
              colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
42,687✔
663
              pDst->hasNull = true;
42,687✔
664
            } else {
665
              colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
1,780,068✔
666
            }
667
          }
668
        }
669
      } else {
670
        for (int32_t i = 0; i < numOfRows; ++i) {
8,080,620✔
671
          if (colDataIsNull_f(pSrc, srcIdx + i)) {
5,470,050!
672
            colDataSetNull_f(pDst->nullbitmap, dstIdx + i);
265,569✔
673
            pDst->hasNull = true;
265,569✔
674
          } else {
675
            colDataClearNull_f(pDst->nullbitmap, dstIdx + i);
5,204,481✔
676
          }
677
        }
678
      }
679
    }
680

681
    if (pSrc->pData != NULL) {
4,370,584!
682
      memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
4,370,665✔
683
             pDst->info.bytes * numOfRows);
4,370,665✔
684
    }
685
  }
686

687
  return 0;
5,323,582✔
688
}
689

690
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
166,187,173✔
691

692
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
3,031,375✔
693

694
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
9,981,972✔
695
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
9,981,972!
696
    return 0;
333,775✔
697
  }
698

699
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
9,648,197✔
700
  if (numOfCols <= 0) {
9,647,705!
701
    return -1;
×
702
  }
703

704
  int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
9,647,705!
705

706
  SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
9,647,705✔
707
  if (pColInfoData == NULL) {
9,647,339!
708
    return 0;
×
709
  }
710

711
  if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
9,647,339✔
712
    return 0;
4,293,522✔
713
  }
714

715
  TSKEY skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
5,353,817!
716
  TSKEY ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
5,353,817!
717

718
  pDataBlock->info.window.skey = TMIN(skey, ekey);
5,353,817✔
719
  pDataBlock->info.window.ekey = TMAX(skey, ekey);
5,353,817✔
720

721
  return 0;
5,353,817✔
722
}
723

724
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
3,235,381✔
725
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
3,235,381!
726
    return 0;
3,026,312✔
727
  }
728

729
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
209,069✔
730
  if (numOfCols <= 0) {
209,026!
731
    return -1;
×
732
  }
733

734
  SDataBlockInfo*  pInfo = &pDataBlock->info;
209,026✔
735
  SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
209,026✔
736
  if (pColInfoData == NULL) {
208,993✔
737
    return terrno;
14✔
738
  }
739

740
  if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
208,979!
741
    return 0;
×
742
  }
743

744
  void* skey = colDataGetData(pColInfoData, 0);
208,979!
745
  void* ekey = colDataGetData(pColInfoData, (pInfo->rows - 1));
208,979!
746

747
  int64_t val = 0;
208,979✔
748
  if (asc) {
208,979✔
749
    if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
178,194!
750
      GET_TYPED_DATA(val, int64_t, pColInfoData->info.type, skey, typeGetTypeModFromColInfo(&pColInfoData->info));
109,877!
751
      VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[0], val);
109,816✔
752
      GET_TYPED_DATA(val, int64_t, pColInfoData->info.type, ekey, typeGetTypeModFromColInfo(&pColInfoData->info));
109,816!
753
      VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[1], val);
109,821✔
754
    } else {  // todo refactor
755
      memcpy(pInfo->pks[0].pData, varDataVal(skey), varDataLen(skey));
68,317✔
756
      pInfo->pks[0].nData = varDataLen(skey);
68,317✔
757

758
      memcpy(pInfo->pks[1].pData, varDataVal(ekey), varDataLen(ekey));
68,317✔
759
      pInfo->pks[1].nData = varDataLen(ekey);
68,317✔
760
    }
761
  } else {
762
    if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
30,785!
763
      GET_TYPED_DATA(val, int64_t, pColInfoData->info.type, ekey, typeGetTypeModFromColInfo(&pColInfoData->info));
19,442!
764
      VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[0], val);
19,437✔
765
      GET_TYPED_DATA(val, int64_t, pColInfoData->info.type, skey, typeGetTypeModFromColInfo(&pColInfoData->info));
19,437!
766
      VALUE_SET_TRIVIAL_DATUM(&pInfo->pks[1], val);
19,437✔
767
    } else {  // todo refactor
768
      memcpy(pInfo->pks[0].pData, varDataVal(ekey), varDataLen(ekey));
11,343✔
769
      pInfo->pks[0].nData = varDataLen(ekey);
11,343✔
770

771
      memcpy(pInfo->pks[1].pData, varDataVal(skey), varDataLen(skey));
11,343✔
772
      pInfo->pks[1].nData = varDataLen(skey);
11,343✔
773
    }
774
  }
775

776
  return 0;
208,918✔
777
}
778

779
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
1,454,911✔
780
  int32_t code = 0;
1,454,911✔
781
  int32_t capacity = pDest->info.capacity;
1,454,911✔
782
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);
1,454,911✔
783
  for (int32_t i = 0; i < numOfCols; ++i) {
6,448,085✔
784
    SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
4,991,866✔
785
    SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
4,991,483✔
786
    if (pCol1 == NULL || pCol2 == NULL) {
4,990,217!
787
      return terrno;
×
788
    }
789

790
    capacity = pDest->info.capacity;
4,990,416✔
791
    int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
4,990,416✔
792
    if (ret < 0) {  // error occurs
4,992,712!
793
      code = ret;
×
794
      return code;
×
795
    }
796
  }
797

798
  pDest->info.capacity = capacity;
1,456,219✔
799
  pDest->info.rows += pSrc->info.rows;
1,456,219✔
800
  return code;
1,456,219✔
801
}
802

803
void blockDataTransform(SSDataBlock* pDest, const SSDataBlock* pSrc) {
31,358✔
804
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);
31,358✔
805
  size_t  numOfColsSrc = taosArrayGetSize(pSrc->pDataBlock);
31,357✔
806
  for (int32_t i = 0; i < numOfCols; ++i) {
93,438✔
807
    SColumnInfoData* pCol1 = taosArrayGet(pDest->pDataBlock, i);
62,074✔
808
    for (int32_t j = 0; j < numOfColsSrc; ++j) {
121,193!
809
      SColumnInfoData* pCol2 = taosArrayGet(pSrc->pDataBlock, j);
121,213✔
810
      if (pCol1->info.colId == pCol2->info.colId) {
121,206✔
811
        TSWAP(*pCol1, *pCol2);
62,085✔
812
        break;
62,085✔
813
      }
814
    }
815
  }
816

817
  pDest->info.rows = pSrc->info.rows;
31,364✔
818
  pDest->info.capacity = pSrc->info.rows;
31,364✔
819
}
31,364✔
820

821
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
2,535✔
822
  int32_t code = 0, lino = 0;
2,535✔
823
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
2,535!
824
    uError("block capacity %d not enough to merge %d rows, currRows:%" PRId64, pDest->info.capacity, numOfRows, pDest->info.rows);
×
825
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
×
826
  }
827

828
  size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
2,535✔
829
  for (int32_t i = 0; i < numOfCols; ++i) {
8,504✔
830
    SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
5,969✔
831
    TSDB_CHECK_NULL(pCol2, code, lino, _exit, terrno);
5,969!
832
    SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
5,969✔
833
    TSDB_CHECK_NULL(pCol1, code, lino, _exit, terrno);
5,969!
834

835
    TAOS_CHECK_EXIT(colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows));
5,969!
836
  }
837

838
  pDest->info.rows += numOfRows;
2,535✔
839

840
_exit:
2,535✔
841

842
  if (code) {
2,535!
843
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
844
  }
845
  
846
  return code;
2,535✔
847
}
848

849
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
×
850
  if (numOfRows == 0) {
×
851
    return;
×
852
  }
853

854
  if (numOfRows >= pBlock->info.rows) {
×
855
    blockDataCleanup(pBlock);
×
856
    return;
×
857
  }
858

859
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
860
  for (int32_t i = 0; i < numOfCols; ++i) {
×
861
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
×
862
    if (pCol == NULL) {
×
863
      continue;
×
864
    }
865

866
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
×
867
      pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows];
×
868
      memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
×
869
    } else {
870
      int32_t i = pBlock->info.rows - numOfRows;
×
871
      for (; i < pBlock->info.rows; ++i) {
×
872
        if (BitPos(i)) {
×
873
          colDataClearNull_f(pCol->nullbitmap, i);
×
874
        } else {
875
          break;
×
876
        }
877
      }
878

879
      int32_t bytes = (pBlock->info.rows - i) / 8;
×
880
      memset(&BMCharPos(pCol->nullbitmap, i), 0, bytes);
×
881
      i += bytes * 8;
×
882

883
      for (; i < pBlock->info.rows; ++i) {
×
884
        colDataClearNull_f(pCol->nullbitmap, i);
×
885
      }
886
    }
887
  }
888

889
  pBlock->info.rows -= numOfRows;
×
890
}
891

892
size_t blockDataGetSize(const SSDataBlock* pBlock) {
42,219,386✔
893
  size_t total = 0;
42,219,386✔
894
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
42,219,386✔
895
  for (int32_t i = 0; i < numOfCols; ++i) {
133,847,085✔
896
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
93,890,783✔
897
    if (pColInfoData == NULL) {
93,661,776!
898
      continue;
×
899
    }
900

901
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
93,661,776✔
902
  }
903

904
  return total;
39,956,302✔
905
}
906

907
// the number of tuples can be fit in one page.
908
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
909
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
148,630✔
910
                           int32_t pageSize) {
911
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
148,630✔
912
  int32_t numOfRows = pBlock->info.rows;
148,630✔
913

914
  int32_t bitmapChar = 1;
148,630✔
915

916
  size_t headerSize = sizeof(int32_t);
148,630✔
917
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
148,630✔
918

919
  // TODO speedup by checking if the whole page can fit in firstly.
920
  if (!hasVarCol) {
148,630✔
921
    size_t  rowSize = blockDataGetRowSize(pBlock);
3,987✔
922
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
3,987✔
923
    if (capacity <= 0) {
3,987!
924
      return terrno;
×
925
    }
926

927
    *stopIndex = startIndex + capacity - 1;
3,987✔
928
    if (*stopIndex >= numOfRows) {
3,987✔
929
      *stopIndex = numOfRows - 1;
4✔
930
    }
931

932
    return TSDB_CODE_SUCCESS;
3,987✔
933
  }
934
  // iterate the rows that can be fit in this buffer page
935
  int32_t size = (headerSize + colHeaderSize);
144,643✔
936
  for (int32_t j = startIndex; j < numOfRows; ++j) {
9,283,083✔
937
    for (int32_t i = 0; i < numOfCols; ++i) {
89,501,941✔
938
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
80,219,022✔
939
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
80,219,022!
940
        if (pColInfoData->varmeta.offset[j] != -1) {
20,437,233!
941
          char* p = colDataGetData(pColInfoData, j);
20,437,233!
942
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
20,437,233!
943
            size += blobDataTLen(p);
×
944
          } else {
945
            size += varDataTLen(p);
20,437,233✔
946
          }
947
        }
948

949
        size += sizeof(pColInfoData->varmeta.offset[0]);
20,437,233✔
950
      } else {
951
        size += pColInfoData->info.bytes;
59,781,789✔
952

953
        if (((j - startIndex) & 0x07) == 0) {
59,781,789✔
954
          size += 1;  // the space for null bitmap
7,639,383✔
955
        }
956
      }
957
    }
958

959
    if (size > pageSize) {  // pageSize must be able to hold one row
9,282,919✔
960
      *stopIndex = j - 1;
144,479✔
961
      if (*stopIndex < startIndex) {
144,479!
962
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
963
      }
964

965
      return TSDB_CODE_SUCCESS;
144,479✔
966
    }
967
  }
968

969
  // all fit in
970
  *stopIndex = numOfRows - 1;
164✔
971
  return TSDB_CODE_SUCCESS;
164✔
972
}
973

974
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
188,377✔
975
  int32_t code = 0;
188,377✔
976
  QRY_PARAM_CHECK(pResBlock);
188,377!
977

978
  if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
188,377!
979
    return TSDB_CODE_INVALID_PARA;
×
980
  }
981

982
  SSDataBlock* pDst = NULL;
188,377✔
983
  code = createOneDataBlock(pBlock, false, &pDst);
188,377✔
984
  if (code) {
188,372!
985
    return code;
×
986
  }
987

988
  code = blockDataEnsureCapacity(pDst, rowCount);
188,372✔
989
  if (code) {
188,375!
990
    blockDataDestroy(pDst);
×
991
    return code;
×
992
  }
993

994
  /* may have disorder varchar data, TODO
995
    for (int32_t i = 0; i < numOfCols; ++i) {
996
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
997
      SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
998

999
      colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount);
1000
    }
1001
  */
1002

1003
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
188,375✔
1004
  for (int32_t i = 0; i < numOfCols; ++i) {
2,199,128✔
1005
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
2,009,455✔
1006
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
2,009,426✔
1007
    if (pColData == NULL || pDstCol == NULL) {
2,009,421!
1008
      continue;
×
1009
    }
1010

1011
    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
78,714,232✔
1012
      bool isNull = false;
76,703,476✔
1013
      if (pBlock->pBlockAgg == NULL) {
76,703,476!
1014
        isNull = colDataIsNull_s(pColData, j);
153,406,952✔
1015
      } else {
1016
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
×
1017
      }
1018

1019
      if (isNull) {
76,703,476✔
1020
        colDataSetNULL(pDstCol, j - startIndex);
6,413,333✔
1021
      } else {
1022
        char* p = colDataGetData(pColData, j);
70,290,143!
1023
        code = colDataSetVal(pDstCol, j - startIndex, p, false);
70,290,143✔
1024
        if (code) {
70,291,478!
1025
          break;
×
1026
        }
1027
      }
1028
    }
1029
  }
1030

1031
  pDst->info.rows = rowCount;
189,673✔
1032
  *pResBlock = pDst;
189,673✔
1033
  return code;
189,673✔
1034
}
1035

1036
/**
1037
 *
1038
 * +------------------+---------------------------------------------+
1039
 * |the number of rows|                    column #1                |
1040
 * |    (4 bytes)     |------------+-----------------------+--------+
1041
 * |                  | null bitmap| column length(4bytes) | values |
1042
 * +------------------+------------+-----------------------+--------+
1043
 * @param buf
1044
 * @param pBlock
1045
 * @return
1046
 */
1047
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
327,080✔
1048
  // write the number of rows
1049
  *(uint32_t*)buf = pBlock->info.rows;
327,080✔
1050

1051
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
327,080✔
1052
  int32_t numOfRows = pBlock->info.rows;
327,015✔
1053

1054
  char* pStart = buf + sizeof(uint32_t);
327,015✔
1055

1056
  for (int32_t i = 0; i < numOfCols; ++i) {
2,762,466✔
1057
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
2,435,496✔
1058
    if (pCol == NULL) {
2,435,325!
1059
      continue;
×
1060
    }
1061

1062
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
2,435,325!
1063
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
505,242✔
1064
      pStart += numOfRows * sizeof(int32_t);
505,242✔
1065
    } else {
1066
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
1,930,083✔
1067
      pStart += BitmapLen(pBlock->info.rows);
1,930,083✔
1068
    }
1069

1070
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
2,435,325✔
1071

1072
    *(int32_t*)pStart = dataSize;
2,435,451✔
1073
    pStart += sizeof(int32_t);
2,435,451✔
1074

1075
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
2,435,451!
1076
      for (int32_t row = 0; row < numOfRows; ++row) {
×
1077
        char*   pColData = pCol->pData + pCol->varmeta.offset[row];
×
1078
        int32_t colSize = calcStrBytesByType(pCol->info.type, pColData);
×
1079
        // if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
1080
        //   colSize = getJsonValueLen(pColData);
1081
        // } else if (IS_STR_DATA_BLOB(pCol->info.type)) {
1082
        //   colSize = blobDataTLen(pColData);
1083
        // } else {
1084
        //   colSize = varDataTLen(pColData);
1085
        // }
1086
        memcpy(pStart, pColData, colSize);
×
1087
        pStart += colSize;
×
1088
      }
1089
    } else {
1090
      if (dataSize != 0) {
2,435,451✔
1091
        // ubsan reports error if pCol->pData==NULL && dataSize==0
1092
        memcpy(pStart, pCol->pData, dataSize);
2,431,588✔
1093
      }
1094
      pStart += dataSize;
2,435,451✔
1095
    }
1096
  }
1097

1098
  return 0;
326,970✔
1099
}
1100

1101
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
343,183✔
1102
  int32_t numOfRows = *(int32_t*)buf;
343,183✔
1103
  if (numOfRows == 0) {
343,183!
1104
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1105
  }
1106
  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
343,183✔
1107
  if (code) {
343,180!
1108
    return code;
×
1109
  }
1110

1111
  pBlock->info.rows = numOfRows;
343,180✔
1112
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
343,180✔
1113
  const char* pStart = buf + sizeof(uint32_t);
343,158✔
1114

1115
  for (int32_t i = 0; i < numOfCols; ++i) {
2,347,320✔
1116
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
2,004,249✔
1117
    if (pCol == NULL) {
2,004,160!
1118
      continue;
×
1119
    }
1120

1121
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
2,004,160!
1122
      size_t metaSize = pBlock->info.rows * sizeof(int32_t);
361,674✔
1123
      memcpy(pCol->varmeta.offset, pStart, metaSize);
361,674✔
1124
      pStart += metaSize;
361,674✔
1125
    } else {
1126
      memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows));
1,642,486✔
1127
      pStart += BitmapLen(pBlock->info.rows);
1,642,486✔
1128
    }
1129

1130
    int32_t colLength = *(int32_t*)pStart;
2,004,160✔
1131
    pStart += sizeof(int32_t);
2,004,160✔
1132

1133
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
2,004,160!
1134
      if (pCol->varmeta.allocLen < colLength) {
361,648✔
1135
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
11,714!
1136
        if (tmp == NULL) {
11,716!
1137
          return terrno;
×
1138
        }
1139

1140
        pCol->pData = tmp;
11,716✔
1141
        pCol->varmeta.allocLen = colLength;
11,716✔
1142
      }
1143

1144
      pCol->varmeta.length = colLength;
361,650✔
1145
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
361,650!
1146
        return TSDB_CODE_FAILED;
×
1147
      }
1148
    }
1149
    if (colLength != 0) {
2,004,162✔
1150
      // ubsan reports error if colLength==0 && pCol->pData == 0
1151
      memcpy(pCol->pData, pStart, colLength);
2,000,334✔
1152
    }
1153
    pStart += colLength;
2,004,162✔
1154
  }
1155

1156
  return TSDB_CODE_SUCCESS;
343,071✔
1157
}
1158

1159
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
6,234,345✔
1160
  if (!pColumnInfoData->hasNull) {
6,234,345!
1161
    return false;
×
1162
  }
1163

1164
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
6,234,345!
1165
    for (int32_t i = startIndex; i < nRows; ++i) {
4,778,582✔
1166
      if (!colDataIsNull_var(pColumnInfoData, i)) {
4,767,450!
1167
        return false;
2,013,524✔
1168
      }
1169
    }
1170
  } else {
1171
    if (pColumnInfoData->nullbitmap == NULL) {
4,209,689!
1172
      return false;
×
1173
    }
1174

1175
    for (int32_t i = startIndex; i < nRows; ++i) {
8,969,892✔
1176
      if (!colDataIsNull_f(pColumnInfoData, i)) {
8,780,412✔
1177
        return false;
4,020,209✔
1178
      }
1179
    }
1180
  }
1181

1182
  return true;
200,612✔
1183
}
1184

1185
// todo remove this
1186
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
1,093,284✔
1187
  pBlock->info.rows = *(int32_t*)buf;
1,093,284✔
1188
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
1,093,284✔
1189

1190
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,093,284✔
1191

1192
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
1,093,121✔
1193

1194
  for (int32_t i = 0; i < numOfCols; ++i) {
7,329,255✔
1195
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
6,247,755✔
1196
    if (pCol == NULL) {
6,235,657!
1197
      continue;
×
1198
    }
1199

1200
    pCol->hasNull = true;
6,235,657✔
1201

1202
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
6,235,657!
1203
      size_t metaSize = capacity * sizeof(int32_t);
2,029,681✔
1204
      memcpy(pCol->varmeta.offset, pStart, metaSize);
2,029,681✔
1205
      pStart += metaSize;
2,029,681✔
1206
    } else {
1207
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
4,205,976✔
1208
      pStart += BitmapLen(capacity);
4,205,976✔
1209
    }
1210

1211
    int32_t colLength = *(int32_t*)pStart;
6,235,657✔
1212
    pStart += sizeof(int32_t);
6,235,657✔
1213

1214
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
6,235,657!
1215
      if (pCol->varmeta.allocLen < colLength) {
2,027,667✔
1216
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
2,872!
1217
        if (tmp == NULL) {
2,872!
1218
          return terrno;
×
1219
        }
1220

1221
        pCol->pData = tmp;
2,872✔
1222
        pCol->varmeta.allocLen = colLength;
2,872✔
1223
      }
1224

1225
      pCol->varmeta.length = colLength;
2,027,667✔
1226
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
2,027,667!
1227
        return TSDB_CODE_FAILED;
×
1228
      }
1229
    }
1230

1231
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
6,235,657✔
1232
      memcpy(pCol->pData, pStart, colLength);
6,016,678✔
1233
    }
1234

1235
    pStart += pCol->info.bytes * capacity;
6,236,134✔
1236
  }
1237

1238
  return TSDB_CODE_SUCCESS;
1,081,500✔
1239
}
1240

1241
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
33,897,029✔
1242
  if (pBlock->info.rowSize == 0) {
33,897,029!
1243
    size_t rowSize = 0;
×
1244

1245
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
×
1246
    for (int32_t i = 0; i < numOfCols; ++i) {
×
1247
      SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
×
1248
      if (pColInfo == NULL) {
×
1249
        continue;
×
1250
      }
1251

1252
      rowSize += pColInfo->info.bytes;
×
1253
    }
1254

1255
    pBlock->info.rowSize = rowSize;
×
1256
  }
1257

1258
  return pBlock->info.rowSize;
33,897,029✔
1259
}
1260

1261
/**
1262
 * @refitem blockDataToBuf for the meta size
1263
 * @param pBlock
1264
 * @return
1265
 */
1266
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
8,381,384✔
1267
  // | version | total length | total rows | blankFull | total columns | flag seg| block group id | column schema
1268
  // | each column length
1269
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(bool) + sizeof(int32_t) + sizeof(int32_t) +
1270
         sizeof(uint64_t) + numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
8,381,384✔
1271
}
1272

1273
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
57,997✔
1274
  double rowSize = 0;
57,997✔
1275

1276
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
57,997✔
1277
  for (int32_t i = 0; i < numOfCols; ++i) {
225,590✔
1278
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
167,592✔
1279
    if (pColInfo == NULL) {
167,589!
1280
      continue;
×
1281
    }
1282

1283
    rowSize += pColInfo->info.bytes;
167,589✔
1284
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
167,589!
1285
      rowSize += sizeof(int32_t);
11,740✔
1286
    } else {
1287
      rowSize += 1 / 8.0;  // one bit for each record
155,849✔
1288
    }
1289
  }
1290

1291
  return rowSize;
57,998✔
1292
}
1293

1294
typedef struct SSDataBlockSortHelper {
1295
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
1296
  SSDataBlock* pDataBlock;
1297
} SSDataBlockSortHelper;
1298

1299
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
757,436,984✔
1300
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
757,436,984✔
1301

1302
  SSDataBlock* pDataBlock = pHelper->pDataBlock;
757,436,984✔
1303

1304
  int32_t left = *(int32_t*)p1;
757,436,984✔
1305
  int32_t right = *(int32_t*)p2;
757,436,984✔
1306

1307
  SArray* pInfo = pHelper->orderInfo;
757,436,984✔
1308

1309
  for (int32_t i = 0; i < pInfo->size; ++i) {
1,567,889,770✔
1310
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
1,419,975,092✔
1311
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
1,419,975,092✔
1312

1313
    if (pColInfoData->hasNull) {
1,419,975,092✔
1314
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
1,334,049,651!
1315
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
1,334,049,651!
1316
      if (leftNull && rightNull) {
1,334,049,651✔
1317
        continue;  // continue to next slot
25,668,763✔
1318
      }
1319

1320
      if (rightNull) {
1,308,380,888✔
1321
        return pOrder->nullFirst ? 1 : -1;
1,777,558✔
1322
      }
1323

1324
      if (leftNull) {
1,306,603,330✔
1325
        return pOrder->nullFirst ? -1 : 1;
3,397,366✔
1326
      }
1327
    }
1328

1329
    void* left1 = colDataGetData(pColInfoData, left);
1,389,131,405!
1330
    void* right1 = colDataGetData(pColInfoData, right);
1,389,131,405!
1331
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
1,389,131,405✔
1332
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
1,056!
1333
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
33,845,560✔
1334
        return 0;
×
1335
      }
1336
    }
1337

1338
    __compar_fn_t fn;
1339
    if (pOrder->compFn) {
1,355,285,845!
1340
      fn = pOrder->compFn;
1,355,285,845✔
1341
    } else {
1342
      fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1343
    }
1344

1345
    int ret = fn(left1, right1);
1,355,285,845✔
1346
    if (ret == 0) {
1,339,923,137✔
1347
      continue;
784,784,023✔
1348
    } else {
1349
      return ret;
555,139,114✔
1350
    }
1351
  }
1352

1353
  return 0;
147,914,678✔
1354
}
1355

1356
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
104,140✔
1357
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
104,140✔
1358
  for (int32_t i = 0; i < numOfCols; ++i) {
402,222✔
1359
    SColumnInfoData* pDst = &pCols[i];
298,076✔
1360
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
298,076✔
1361
    if (pSrc == NULL) {
298,078!
1362
      continue;
×
1363
    }
1364

1365
    if (IS_VAR_DATA_TYPE(pSrc->info.type)) {
298,078!
1366
      if (pSrc->varmeta.length != 0) {
110,409✔
1367
        memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
106,810✔
1368
      }
1369
      pDst->varmeta.length = pSrc->varmeta.length;
110,409✔
1370

1371
      for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
53,746,892✔
1372
        pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]];
53,636,483✔
1373
      }
1374
    } else {
1375
      for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
217,856,968✔
1376
        if (colDataIsNull_f(pSrc, index[j])) {
217,669,299✔
1377
          colDataSetNull_f_s(pDst, j);
46,892,988✔
1378
          continue;
46,892,988✔
1379
        }
1380
        memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes);
170,776,311✔
1381
      }
1382
    }
1383
  }
1384
}
104,146✔
1385

1386
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
104,142✔
1387
  int32_t code = 0;
104,142✔
1388
  int32_t rows = pDataBlock->info.capacity;
104,142✔
1389
  size_t  numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
104,142✔
1390
  int32_t i = 0;
104,143✔
1391

1392
  *ppCols = NULL;
104,143✔
1393

1394
  SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
104,143!
1395
  if (pCols == NULL) {
104,143!
1396
    return terrno;
×
1397
  }
1398

1399
  for (i = 0; i < numOfCols; ++i) {
402,215✔
1400
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
298,070✔
1401
    if (pColInfoData == NULL) {
298,072!
1402
      continue;
×
1403
    }
1404

1405
    pCols[i].info = pColInfoData->info;
298,072✔
1406
    if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
298,072!
1407
      pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
110,405!
1408
      pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
110,411!
1409
      if (pCols[i].varmeta.offset == NULL || pCols[i].pData == NULL) {
110,412!
1410
        code = terrno;
1✔
1411
        taosMemoryFree(pCols[i].varmeta.offset);
×
1412
        taosMemoryFree(pCols[i].pData);
×
1413
        goto _error;
×
1414
      }
1415

1416
      pCols[i].varmeta.length = pColInfoData->varmeta.length;
110,411✔
1417
      pCols[i].varmeta.allocLen = pCols[i].varmeta.length;
110,411✔
1418
    } else {
1419
      pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
187,667✔
1420
      pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
187,669!
1421
      if (pCols[i].nullbitmap == NULL || pCols[i].pData == NULL) {
187,666✔
1422
        code = terrno;
5✔
1423
        taosMemoryFree(pCols[i].nullbitmap);
×
1424
        taosMemoryFree(pCols[i].pData);
×
1425
        goto _error;
×
1426
      }
1427
    }
1428
  }
1429

1430
  *ppCols = pCols;
104,145✔
1431
  return code;
104,145✔
1432

1433
_error:
×
1434
  for (int32_t j = 0; j < i; ++j) {
×
1435
    colDataDestroy(&pCols[j]);
×
1436
  }
1437

1438
  taosMemoryFree(pCols);
×
1439
  return code;
×
1440
}
1441

1442
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
104,146✔
1443
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
104,146✔
1444

1445
  for (int32_t i = 0; i < numOfCols; ++i) {
402,216✔
1446
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
298,072✔
1447
    if (pColInfoData == NULL) {
298,070!
1448
      continue;
×
1449
    }
1450

1451
    pColInfoData->info = pCols[i].info;
298,070✔
1452
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
298,070✔
1453
      taosMemoryFreeClear(pColInfoData->varmeta.offset);
110,407!
1454
      pColInfoData->varmeta = pCols[i].varmeta;
110,407✔
1455
    } else {
1456
      taosMemoryFreeClear(pColInfoData->nullbitmap);
187,663!
1457
      pColInfoData->nullbitmap = pCols[i].nullbitmap;
187,660✔
1458
    }
1459

1460
    taosMemoryFreeClear(pColInfoData->pData);
298,067✔
1461
    pColInfoData->pData = pCols[i].pData;
298,070✔
1462
  }
1463

1464
  taosMemoryFreeClear(pCols);
104,144!
1465
}
104,145✔
1466

1467
static int32_t* createTupleIndex(size_t rows) {
104,140✔
1468
  int32_t* index = taosMemoryCalloc(rows, sizeof(int32_t));
104,140!
1469
  if (index == NULL) {
104,143!
1470
    return NULL;
×
1471
  }
1472

1473
  for (int32_t i = 0; i < rows; ++i) {
49,099,947✔
1474
    index[i] = i;
48,995,804✔
1475
  }
1476

1477
  return index;
104,143✔
1478
}
1479

1480
static void destroyTupleIndex(int32_t* index) { taosMemoryFreeClear(index); }
104,143!
1481

1482
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
108,492✔
1483
  if (pDataBlock->info.rows <= 1) {
108,492✔
1484
    return TSDB_CODE_SUCCESS;
4,347✔
1485
  }
1486

1487
  // Allocate the additional buffer.
1488
  uint32_t rows = pDataBlock->info.rows;
104,145✔
1489

1490
  bool sortColumnHasNull = false;
104,145✔
1491
  bool varTypeSort = false;
104,145✔
1492

1493
  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
245,561✔
1494
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
141,416✔
1495
    if (pInfo == NULL) {
141,416!
1496
      continue;
×
1497
    }
1498

1499
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
141,416✔
1500
    if (pColInfoData == NULL) {
141,416!
1501
      continue;
×
1502
    }
1503

1504
    if (pColInfoData->hasNull) {
141,416✔
1505
      sortColumnHasNull = true;
141,352✔
1506
    }
1507

1508
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
141,416!
1509
      varTypeSort = true;
95,587✔
1510
    }
1511
  }
1512

1513
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
104,143✔
1514

1515
  if (taosArrayGetSize(pOrderInfo) == 1 && (!sortColumnHasNull)) {
104,145✔
1516
    if (numOfCols == 1) {
45!
1517
      if (!varTypeSort) {
×
1518
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
×
1519
        SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
×
1520
        if (pColInfoData == NULL || pOrder == NULL) {
×
1521
          return terrno;
×
1522
        }
1523

1524
        int64_t p0 = taosGetTimestampUs();
×
1525

1526
        __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1527
        taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
×
1528

1529
        int64_t p1 = taosGetTimestampUs();
×
1530
        uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);
×
1531

1532
        return TSDB_CODE_SUCCESS;
×
1533
      } else {  // var data type
1534
      }
1535
    } else if (numOfCols == 2) {
1536
    }
1537
  }
1538

1539
  int32_t* index = createTupleIndex(rows);
104,145✔
1540
  if (index == NULL) {
104,143!
1541
    return terrno;
×
1542
  }
1543

1544
  int64_t p0 = taosGetTimestampUs();
104,144✔
1545

1546
  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
104,144✔
1547
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
245,555✔
1548
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
141,414✔
1549
    if (pInfo == NULL) {
141,414!
1550
      continue;
×
1551
    }
1552

1553
    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
141,414✔
1554
    if (pInfo->pColData == NULL) {
141,415!
1555
      continue;
×
1556
    }
1557
    pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
141,415✔
1558
  }
1559

1560
  terrno = 0;
104,142✔
1561
  taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
104,143✔
1562
  if (terrno) return terrno;
104,145!
1563

1564
  int64_t p1 = taosGetTimestampUs();
104,146✔
1565

1566
  SColumnInfoData* pCols = NULL;
104,146✔
1567
  int32_t          code = createHelpColInfoData(pDataBlock, &pCols);
104,146✔
1568
  if (code != 0) {
104,143!
1569
    destroyTupleIndex(index);
×
1570
    return code;
×
1571
  }
1572

1573
  int64_t p2 = taosGetTimestampUs();
104,140✔
1574
  blockDataAssign(pCols, pDataBlock, index);
104,140✔
1575

1576
  int64_t p3 = taosGetTimestampUs();
104,146✔
1577
  copyBackToBlock(pDataBlock, pCols);
104,146✔
1578

1579
  int64_t p4 = taosGetTimestampUs();
104,145✔
1580
  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
104,145!
1581
         ", rows:%d\n",
1582
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
1583

1584
  destroyTupleIndex(index);
104,145✔
1585
  return TSDB_CODE_SUCCESS;
104,145✔
1586
}
1587

1588
void blockDataCleanup(SSDataBlock* pDataBlock) {
19,662,081✔
1589
  if(pDataBlock == NULL) {
19,662,081!
1590
    return;
×
1591
  }
1592
  blockDataEmpty(pDataBlock);
19,662,081✔
1593
  SDataBlockInfo* pInfo = &pDataBlock->info;
19,662,095✔
1594
  pInfo->id.uid = 0;
19,662,095✔
1595
  pInfo->id.groupId = 0;
19,662,095✔
1596
}
1597

1598
void blockDataEmpty(SSDataBlock* pDataBlock) {
20,159,411✔
1599
  SDataBlockInfo* pInfo = &pDataBlock->info;
20,159,411✔
1600
  if (pInfo->capacity == 0) {
20,159,411✔
1601
    return;
3,067,841✔
1602
  }
1603

1604
  taosMemoryFreeClear(pDataBlock->pBlockAgg);
17,091,570!
1605

1606
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
17,091,570✔
1607
  for (int32_t i = 0; i < numOfCols; ++i) {
88,238,636✔
1608
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
71,153,588✔
1609
    if (p == NULL) {
71,131,722!
1610
      continue;
×
1611
    }
1612

1613
    colInfoDataCleanup(p, pInfo->capacity);
71,131,722✔
1614
  }
1615

1616
  pInfo->rows = 0;
17,085,048✔
1617
  pInfo->dataLoad = 0;
17,085,048✔
1618
  pInfo->window.ekey = 0;
17,085,048✔
1619
  pInfo->window.skey = 0;
17,085,048✔
1620
}
1621

1622
void blockDataReset(SSDataBlock* pDataBlock) {
×
1623
  SDataBlockInfo* pInfo = &pDataBlock->info;
×
1624
  if (pInfo->capacity == 0) {
×
1625
    return;
×
1626
  }
1627

1628
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
×
1629
  for (int32_t i = 0; i < numOfCols; ++i) {
×
1630
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
×
1631
    if (p == NULL) {
×
1632
      continue;
×
1633
    }
1634

1635
    p->hasNull = false;
×
1636
    p->reassigned = false;
×
1637
    if (IS_VAR_DATA_TYPE(p->info.type)) {
×
1638
      p->varmeta.length = 0;
×
1639
    }
1640
  }
1641

1642
  pInfo->rows = 0;
×
1643
  pInfo->dataLoad = 0;
×
1644
  pInfo->window.ekey = 0;
×
1645
  pInfo->window.skey = 0;
×
1646
  pInfo->id.uid = 0;
×
1647
  pInfo->id.groupId = 0;
×
1648
}
1649

1650
/*
1651
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
1652
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
1653
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
1654
 */
1655
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
68,173,293✔
1656
                         bool clearPayload) {
1657
  if ((numOfRows <= 0) || (pBlockInfo && numOfRows <= pBlockInfo->capacity)) {
68,173,293!
1658
    return TSDB_CODE_SUCCESS;
×
1659
  }
1660

1661
  int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
68,173,293!
1662

1663
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
68,173,293!
1664
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
16,444,303!
1665
    if (tmp == NULL) {
16,550,834!
1666
      return terrno;
×
1667
    }
1668

1669
    pColumn->varmeta.offset = (int32_t*)tmp;
16,550,834✔
1670
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
16,550,834✔
1671
  } else {
1672
    // prepare for the null bitmap
1673
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
51,728,990✔
1674
    if (tmp == NULL) {
51,559,452!
1675
      return terrno;
×
1676
    }
1677

1678
    int32_t oldLen = BitmapLen(existedRows);
51,559,452✔
1679
    pColumn->nullbitmap = tmp;
51,559,452✔
1680
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
51,559,452✔
1681
    if (pColumn->info.bytes == 0) {
51,559,452!
1682
      return TSDB_CODE_INVALID_PARA;
×
1683
    }
1684

1685
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
1686
    // to MALLOC_ALIGN_BYTES
1687
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
51,559,452!
1688
    if (tmp == NULL) {
51,649,639!
1689
      return terrno;
×
1690
    }
1691
    // memset(tmp, 0, numOfRows * pColumn->info.bytes);
1692

1693
    // copy back the existed data
1694
    if (pColumn->pData != NULL) {
51,649,639✔
1695
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
5,300,431✔
1696
      taosMemoryFreeClear(pColumn->pData);
5,300,431✔
1697
    }
1698

1699
    pColumn->pData = tmp;
51,593,545✔
1700

1701
    // check if the allocated memory is aligned to the requried bytes.
1702
#if defined LINUX
1703
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
51,593,545!
1704
      return TSDB_CODE_FAILED;
×
1705
    }
1706
#endif
1707

1708
    if (clearPayload) {
51,593,545✔
1709
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
23,419,826✔
1710
    }
1711
  }
1712

1713
  return TSDB_CODE_SUCCESS;
68,144,379✔
1714
}
1715

1716
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
78,877,017✔
1717
  pColumn->hasNull = false;
78,877,017✔
1718

1719
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
78,877,017!
1720
    pColumn->varmeta.length = 0;
20,132,247✔
1721
    if (pColumn->varmeta.offset != NULL) {
20,132,247!
1722
      memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
20,333,319✔
1723
    }
1724
  } else {
1725
    if (pColumn->nullbitmap != NULL) {
58,744,770✔
1726
      memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
58,687,701✔
1727
    }
1728
  }
1729
}
78,877,017✔
1730

1731
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
29,887,467✔
1732
  SDataBlockInfo info = {0};
29,887,467✔
1733
  return doEnsureCapacity(pColumn, &info, numOfRows, clearPayload);
29,887,467✔
1734
}
1735

1736
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
55,250,204✔
1737
  int32_t code = 0;
55,250,204✔
1738
  if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
55,250,204!
1739
    return TSDB_CODE_SUCCESS;
45,839,514✔
1740
  }
1741

1742
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
9,410,690✔
1743
  for (int32_t i = 0; i < numOfCols; ++i) {
47,469,183✔
1744
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
38,050,754✔
1745
    if (p == NULL) {
37,944,700!
1746
      return terrno;
×
1747
    }
1748

1749
    code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
37,944,700✔
1750
    if (code) {
38,059,032!
1751
      return code;
×
1752
    }
1753
  }
1754

1755
  pDataBlock->info.capacity = numOfRows;
9,418,429✔
1756
  return TSDB_CODE_SUCCESS;
9,418,429✔
1757
}
1758

1759
void blockDataFreeRes(SSDataBlock* pBlock) {
10,748,948✔
1760
  if (pBlock == NULL) {
10,748,948✔
1761
    return;
130,306✔
1762
  }
1763

1764
  int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
10,618,642✔
1765
  for (int32_t i = 0; i < numOfOutput; ++i) {
54,901,620✔
1766
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
44,268,067✔
1767
    if (pColInfoData == NULL) {
44,219,883!
1768
      continue;
×
1769
    }
1770

1771
    colDataDestroy(pColInfoData);
44,219,883✔
1772
  }
1773

1774
  taosArrayDestroy(pBlock->pDataBlock);
10,633,553✔
1775
  pBlock->pDataBlock = NULL;
10,620,196✔
1776

1777
  taosMemoryFreeClear(pBlock->pBlockAgg);
10,620,196!
1778
  memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
10,619,398✔
1779
}
1780

1781
void blockDataDestroy(SSDataBlock* pBlock) {
11,627,631✔
1782
  if (pBlock == NULL) {
11,627,631✔
1783
    return;
1,140,728✔
1784
  }
1785

1786
  if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
10,486,903!
1787
    taosMemoryFreeClear(pBlock->info.pks[0].pData);
60,450!
1788
    taosMemoryFreeClear(pBlock->info.pks[1].pData);
60,453!
1789
  }
1790

1791
  blockDataFreeRes(pBlock);
10,486,904✔
1792
  taosMemoryFreeClear(pBlock);
10,487,417!
1793
}
1794

1795
// todo remove it
1796
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
×
1797
  int32_t code = 0;
×
1798

1799
  dst->info = src->info;
×
1800
  dst->info.pks[0].pData = NULL;
×
1801
  dst->info.pks[1].pData = NULL;
×
1802
  dst->info.rows = 0;
×
1803
  dst->info.capacity = 0;
×
1804

1805
  size_t numOfCols = taosArrayGetSize(src->pDataBlock);
×
1806
  for (int32_t i = 0; i < numOfCols; ++i) {
×
1807
    SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
×
1808
    if (p == NULL) {
×
1809
      return terrno;
×
1810
    }
1811

1812
    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
×
1813
    code = blockDataAppendColInfo(dst, &colInfo);
×
1814
    if (code) {
×
1815
      return code;
×
1816
    }
1817
  }
1818

1819
  code = blockDataEnsureCapacity(dst, src->info.rows);
×
1820
  if (code != TSDB_CODE_SUCCESS) {
×
1821
    return code;
×
1822
  }
1823

1824
  for (int32_t i = 0; i < numOfCols; ++i) {
×
1825
    SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
×
1826
    SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
×
1827
    if (pSrc == NULL || pDst == NULL || (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type)))) {
×
1828
      continue;
×
1829
    }
1830

1831
    int32_t ret = colDataAssign(pDst, pSrc, src->info.rows, &src->info);
×
1832
    if (ret < 0) {
×
1833
      return ret;
×
1834
    }
1835
  }
1836

1837
  uint32_t cap = dst->info.capacity;
×
1838
  dst->info = src->info;
×
1839
  dst->info.pks[0].pData = NULL;
×
1840
  dst->info.pks[1].pData = NULL;
×
1841
  dst->info.capacity = cap;
×
1842
  uTrace("%s,parName:%s, groupId:%"PRIu64, __FUNCTION__, dst->info.parTbName, dst->info.id.groupId)
×
1843
  return code;
×
1844
}
1845

1846
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
548,818✔
1847
  blockDataCleanup(pDst);
548,818✔
1848

1849
  int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
548,762✔
1850
  if (code != TSDB_CODE_SUCCESS) {
548,787!
1851
    return code;
×
1852
  }
1853

1854
  size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock);
548,787✔
1855
  for (int32_t i = 0; i < numOfCols; ++i) {
4,681,508✔
1856
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
4,139,462✔
1857
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
4,130,377✔
1858
    if (pDstCol == NULL || pSrcCol == NULL) {
4,123,593!
1859
      continue;
×
1860
    }
1861

1862
    int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
4,123,593✔
1863
    if (ret < 0) {
4,132,691!
1864
      code = ret;
×
1865
      return code;
×
1866
    }
1867
  }
1868

1869
  uint32_t cap = pDst->info.capacity;
542,046✔
1870

1871
  if (IS_VAR_DATA_TYPE(pDst->info.pks[0].type)) {
542,046!
1872
    taosMemoryFreeClear(pDst->info.pks[0].pData);
×
1873
  }
1874

1875
  if (IS_VAR_DATA_TYPE(pDst->info.pks[1].type)) {
542,046!
1876
    taosMemoryFreeClear(pDst->info.pks[1].pData);
×
1877
  }
1878

1879
  pDst->info = pSrc->info;
542,046✔
1880
  code = copyPkVal(&pDst->info, &pSrc->info);
542,046✔
1881
  if (code != TSDB_CODE_SUCCESS) {
550,121✔
1882
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
12!
1883
    return code;
×
1884
  }
1885

1886
  pDst->info.capacity = cap;
550,109✔
1887
  return code;
550,109✔
1888
}
1889

1890
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
4,139✔
1891
  QRY_PARAM_CHECK(pBlock);
4,139!
1892

1893
  int32_t      code = 0;
4,139✔
1894
  SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
4,139!
1895
  if (p == NULL) {
4,140!
1896
    return terrno;
×
1897
  }
1898

1899
  p->info.hasVarCol = false;
4,140✔
1900
  p->info.id.groupId = 0;
4,140✔
1901
  p->info.rows = 0;
4,140✔
1902
  p->info.type = type;
4,140✔
1903
  p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
4,140✔
1904
                    sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
1905
  p->info.watermark = INT64_MIN;
4,140✔
1906

1907
  p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
4,140✔
1908
  if (p->pDataBlock == NULL) {
4,140!
1909
    taosMemoryFree(p);
×
1910
    return terrno;
×
1911
  }
1912

1913
  SColumnInfoData infoData = {0};
4,140✔
1914
  infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
4,140✔
1915
  infoData.info.bytes = sizeof(TSKEY);
4,140✔
1916

1917
  // window start ts
1918
  void* px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1919
  if (px == NULL) {
4,140!
1920
    code = terrno;
×
1921
    goto _err;
×
1922
  }
1923

1924
  // window end ts
1925
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1926
  if (px == NULL) {
4,140!
1927
    code = terrno;
×
1928
    goto _err;
×
1929
  }
1930

1931
  infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
4,140✔
1932
  infoData.info.bytes = sizeof(uint64_t);
4,140✔
1933

1934
  // uid
1935
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1936
  if (px == NULL) {
4,140!
1937
    code = terrno;
×
1938
    goto _err;
×
1939
  }
1940

1941
  // group id
1942
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1943
  if (px == NULL) {
4,140!
1944
    code = terrno;
×
1945
    goto _err;
×
1946
  }
1947

1948
  infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
4,140✔
1949
  infoData.info.bytes = sizeof(TSKEY);
4,140✔
1950

1951
  // calculate start ts
1952
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1953
  if (px == NULL) {
4,140!
1954
    code = terrno;
×
1955
    goto _err;
×
1956
  }
1957

1958
  // calculate end ts
1959
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1960
  if (px == NULL) {
4,140!
1961
    code = terrno;
×
1962
    goto _err;
×
1963
  }
1964

1965
  // table name
1966
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
4,140✔
1967
  infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
4,140✔
1968
  px = taosArrayPush(p->pDataBlock, &infoData);
4,140✔
1969
  if (px == NULL) {
4,140!
1970
    code = terrno;
×
1971
    goto _err;
×
1972
  }
1973

1974
  *pBlock = p;
4,140✔
1975
  return code;
4,140✔
1976

1977
_err:
×
1978
  taosArrayDestroy(p->pDataBlock);
×
1979
  taosMemoryFree(p);
×
1980
  return code;
×
1981
}
1982

1983
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
×
1984
  QRY_PARAM_CHECK(pResBlock);
×
1985

1986
  if (pDataBlock == NULL) {
×
1987
    return TSDB_CODE_INVALID_PARA;
×
1988
  }
1989

1990
  SSDataBlock* pBlock = NULL;
×
1991
  int32_t      code = createDataBlock(&pBlock);
×
1992
  if (code) {
×
1993
    return code;
×
1994
  }
1995

1996
  pBlock->info = pDataBlock->info;
×
1997
  pBlock->info.pks[0].pData = NULL;
×
1998
  pBlock->info.pks[1].pData = NULL;
×
1999
  pBlock->info.rows = 0;
×
2000
  pBlock->info.capacity = 0;
×
2001

2002
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
×
2003
  for (int32_t i = 0; i < numOfCols; ++i) {
×
2004
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
×
2005
    if (p == NULL) {
×
2006
      blockDataDestroy(pBlock);
×
2007
      return terrno;
×
2008
    }
2009

2010
    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
×
2011
    code = blockDataAppendColInfo(pBlock, &colInfo);
×
2012
    if (code) {
×
2013
      blockDataDestroy(pBlock);
×
2014
      return code;
×
2015
    }
2016
  }
2017

2018
  code = blockDataEnsureCapacity(pBlock, 1);
×
2019
  if (code != TSDB_CODE_SUCCESS) {
×
2020
    blockDataDestroy(pBlock);
×
2021
    return code;
×
2022
  }
2023

2024
  for (int32_t i = 0; i < numOfCols; ++i) {
×
2025
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
×
2026
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
×
2027
    if (pDst == NULL || pSrc == NULL) {
×
2028
      blockDataDestroy(pBlock);
×
2029
      return terrno;
×
2030
    }
2031

2032
    bool  isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
×
2033
    void* pData = NULL;
×
2034
    if (!isNull) {
×
2035
      pData = colDataGetData(pSrc, rowIdx);
×
2036
    }
2037

2038
    code = colDataSetVal(pDst, 0, pData, isNull);
×
2039
    if (code) {
×
2040
      blockDataDestroy(pBlock);
×
2041
      return code;
×
2042
    }
2043
  }
2044

2045
  pBlock->info.rows = 1;
×
2046

2047
  *pResBlock = pBlock;
×
2048
  return code;
×
2049
}
2050

2051
int32_t copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
6,959,982✔
2052
  int32_t code = TSDB_CODE_SUCCESS;
6,959,982✔
2053
  int32_t lino = 0;
6,959,982✔
2054
  if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
6,959,982!
2055
    return code;
6,916,719✔
2056
  }
2057

2058
  // prepare the pk buffer if needed
2059
  SValue* p = &pDst->pks[0];
43,263✔
2060

2061
  p->type = pSrc->pks[0].type;
43,263✔
2062
  p->pData = taosMemoryCalloc(1, pSrc->pks[0].nData);
43,263!
2063
  QUERY_CHECK_NULL(p->pData, code, lino, _end, terrno);
43,694!
2064

2065
  p->nData = pSrc->pks[0].nData;
43,694✔
2066
  memcpy(p->pData, pSrc->pks[0].pData, p->nData);
43,694✔
2067

2068
  p = &pDst->pks[1];
43,694✔
2069
  p->type = pSrc->pks[1].type;
43,694✔
2070
  p->pData = taosMemoryCalloc(1, pSrc->pks[1].nData);
43,694!
2071
  QUERY_CHECK_NULL(p->pData, code, lino, _end, terrno);
43,697!
2072

2073
  p->nData = pSrc->pks[1].nData;
43,697✔
2074
  memcpy(p->pData, pSrc->pks[1].pData, p->nData);
43,697✔
2075

2076
_end:
43,697✔
2077
  if (code != TSDB_CODE_SUCCESS) {
43,697!
2078
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2079
  }
2080
  return code;
43,695✔
2081
}
2082

2083
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
6,409,398✔
2084
  int32_t code = 0, lino = 0;
6,409,398✔
2085
  QRY_PARAM_CHECK(pResBlock);
6,409,398!
2086
  TSDB_CHECK_NULL(pDataBlock, code, lino, _exit, TSDB_CODE_INVALID_PARA);
6,409,398!
2087

2088
  SSDataBlock* pDstBlock = NULL;
6,409,398✔
2089
  TAOS_CHECK_EXIT(createDataBlock(&pDstBlock));
6,409,398!
2090

2091
  pDstBlock->info = pDataBlock->info;
6,409,974✔
2092
  pDstBlock->info.pks[0].pData = NULL;
6,409,974✔
2093
  pDstBlock->info.pks[1].pData = NULL;
6,409,974✔
2094

2095
  pDstBlock->info.rows = 0;
6,409,974✔
2096
  pDstBlock->info.capacity = 0;
6,409,974✔
2097
  pDstBlock->info.rowSize = 0;
6,409,974✔
2098
  pDstBlock->info.id = pDataBlock->info.id;
6,409,974✔
2099
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;
6,409,974✔
2100

2101
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
6,409,974✔
2102
  for (int32_t i = 0; i < numOfCols; ++i) {
35,302,469✔
2103
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
28,890,167✔
2104
    if (p == NULL) {
28,820,811!
2105
      blockDataDestroy(pDstBlock);
×
2106
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
×
2107
    }
2108

2109
    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
28,820,811✔
2110
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
28,820,811✔
2111
    if (code) {
28,893,862✔
2112
      blockDataDestroy(pDstBlock);
2,403✔
2113
      TAOS_CHECK_EXIT(code);
×
2114
    }
2115
  }
2116

2117
  code = copyPkVal(&pDstBlock->info, &pDataBlock->info);
6,412,302✔
2118
  if (code != TSDB_CODE_SUCCESS) {
6,410,298!
2119
    blockDataDestroy(pDstBlock);
×
2120
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2121
    TAOS_CHECK_EXIT(code);
9,643!
2122
  }
2123

2124
  if (copyData) {
6,419,941✔
2125
    code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
3,108,837✔
2126
    if (code != TSDB_CODE_SUCCESS) {
3,109,034!
2127
      blockDataDestroy(pDstBlock);
×
2128
      TAOS_CHECK_EXIT(code);
×
2129
    }
2130

2131
    for (int32_t i = 0; i < numOfCols; ++i) {
17,334,539✔
2132
      SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
14,236,080✔
2133
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
14,215,490✔
2134
      if (pDst == NULL) {
14,208,025!
2135
        blockDataDestroy(pDstBlock);
×
2136
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2137
        TSDB_CHECK_NULL(pDst, code, lino, _exit, terrno);
×
2138
      }
2139

2140
      if (pSrc == NULL) {
14,208,025!
2141
        blockDataDestroy(pDstBlock);
×
2142
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2143
        TSDB_CHECK_NULL(pSrc, code, lino, _exit, terrno);
×
2144
      }
2145

2146
      int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
14,208,025✔
2147
      if (ret < 0) {
14,224,523!
2148
        code = ret;
×
2149
        blockDataDestroy(pDstBlock);
×
2150
        TAOS_CHECK_EXIT(code);
×
2151
      }
2152
    }
2153

2154
    pDstBlock->info.rows = pDataBlock->info.rows;
3,098,459✔
2155
    pDstBlock->info.capacity = pDataBlock->info.rows;
3,098,459✔
2156
  }
2157

2158
  *pResBlock = pDstBlock;
6,409,563✔
2159

2160
_exit:
6,409,563✔
2161
  
2162
  return code;
6,409,563✔
2163
}
2164

2165
int32_t createOneDataBlockWithColArray(const SSDataBlock* pDataBlock, SArray* pColArray, SSDataBlock** pResBlock) {
260✔
2166
  int32_t      code = TSDB_CODE_SUCCESS;
260✔
2167
  int32_t      lino = 0;
260✔
2168
  SSDataBlock* pDstBlock = NULL;
260✔
2169

2170
  QRY_PARAM_CHECK(pResBlock);
260!
2171
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
260!
2172

2173
  QUERY_CHECK_CODE(createDataBlock(&pDstBlock), lino, _return);
260!
2174

2175
  pDstBlock->info = pDataBlock->info;
260✔
2176
  pDstBlock->info.pks[0].pData = NULL;
260✔
2177
  pDstBlock->info.pks[1].pData = NULL;
260✔
2178

2179
  pDstBlock->info.rows = 0;
260✔
2180
  pDstBlock->info.capacity = 0;
260✔
2181
  pDstBlock->info.rowSize = 0;
260✔
2182
  pDstBlock->info.id = pDataBlock->info.id;
260✔
2183
  pDstBlock->info.blankFill = pDataBlock->info.blankFill;
260✔
2184

2185
  for (int32_t i = 0; i < taosArrayGetSize(pColArray); ++i) {
1,508✔
2186
    SColIdPair* pColPair = taosArrayGet(pColArray, i);
1,248✔
2187
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
1,248!
2188

2189
    for (int32_t j = 0; j < taosArrayGetSize(pDataBlock->pDataBlock); ++j) {
8,784!
2190
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
8,784✔
2191
      if (p == NULL) {
8,784!
2192
        continue;
×
2193
      }
2194

2195
      if (p->info.colId == pColPair->vtbColId) {
8,784✔
2196
        QUERY_CHECK_CODE(blockDataAppendColInfo(pDstBlock, p), lino, _return);
1,248!
2197
        break;
1,248✔
2198
      }
2199
    }
2200
  }
2201

2202
  *pResBlock = pDstBlock;
260✔
2203
  return code;
260✔
2204
_return:
×
2205
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2206
  blockDataDestroy(pDstBlock);
×
2207
  return code;
×
2208
}
2209

2210
int32_t createOneDataBlockWithTwoBlock(const SSDataBlock* pDataBlock, const SSDataBlock* pOrgBlock,
187✔
2211
                                       SSDataBlock** pResBlock) {
2212
  int32_t      code = TSDB_CODE_SUCCESS;
187✔
2213
  int32_t      lino = 0;
187✔
2214
  SSDataBlock* pDstBlock = NULL;
187✔
2215

2216
  QRY_PARAM_CHECK(pResBlock);
187!
2217
  QUERY_CHECK_NULL(pDataBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
187!
2218
  QUERY_CHECK_NULL(pOrgBlock, code, lino, _return, TSDB_CODE_INVALID_PARA);
187!
2219

2220
  QUERY_CHECK_CODE(createOneDataBlock(pOrgBlock, false, &pDstBlock), lino, _return);
187!
2221
  QUERY_CHECK_CODE(blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows), lino, _return);
187!
2222

2223
  for (int32_t i = 0; i < taosArrayGetSize(pOrgBlock->pDataBlock); ++i) {
4,046✔
2224
    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
3,859✔
2225
    SColumnInfoData* pSrc = taosArrayGet(pOrgBlock->pDataBlock, i);
3,859✔
2226

2227
    QUERY_CHECK_NULL(pDst, code, lino, _return, terrno);
3,859!
2228
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno);
3,859!
2229

2230
    bool found = false;
3,859✔
2231
    for (int32_t j = 0; j < taosArrayGetSize(pDataBlock->pDataBlock); j++) {
20,401✔
2232
      SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, j);
17,454✔
2233
      if (p->info.slotId == pSrc->info.slotId) {
17,454✔
2234
        QUERY_CHECK_CODE(colDataAssign(pDst, p, (int32_t)pDataBlock->info.rows, &pDataBlock->info), lino, _return);
912!
2235
        found = true;
912✔
2236
        break;
912✔
2237
      }
2238
    }
2239
    if (!found) {
3,859✔
2240
      colDataSetNNULL(pDst, 0, pDataBlock->info.rows);
2,947✔
2241
    }
2242
  }
2243

2244
  pDstBlock->info.rows = pDataBlock->info.rows;
187✔
2245
  pDstBlock->info.capacity = pDataBlock->info.rows;
187✔
2246
  pDstBlock->info.window = pDataBlock->info.window;
187✔
2247

2248
  *pResBlock = pDstBlock;
187✔
2249
  return code;
187✔
2250
_return:
×
2251
  uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2252
  blockDataDestroy(pDstBlock);
×
2253
  return code;
×
2254
}
2255

2256
int32_t createDataBlock(SSDataBlock** pResBlock) {
10,066,831✔
2257
  QRY_PARAM_CHECK(pResBlock);
10,066,831!
2258
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
10,066,831!
2259
  if (pBlock == NULL) {
10,067,446!
2260
    return terrno;
×
2261
  }
2262

2263
  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
10,067,446✔
2264
  if (pBlock->pDataBlock == NULL) {
10,067,854✔
2265
    int32_t code = terrno;
188✔
2266
    taosMemoryFree(pBlock);
×
2267
    return code;
×
2268
  }
2269

2270
  *pResBlock = pBlock;
10,067,666✔
2271
  return 0;
10,067,666✔
2272
}
2273

2274
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
43,357,496✔
2275
  if (pBlock->pDataBlock == NULL) {
43,357,496✔
2276
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
131,152✔
2277
    if (pBlock->pDataBlock == NULL) {
131,170!
2278
      return terrno;
×
2279
    }
2280
  }
2281

2282
  void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData);
43,357,514✔
2283
  if (p == NULL) {
43,437,153!
2284
    return terrno;
×
2285
  }
2286

2287
  // todo disable it temporarily
2288
  //  A S S E R T(pColInfoData->info.type != 0);
2289
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
43,437,153!
2290
    pBlock->info.hasVarCol = true;
10,983,611✔
2291
  }
2292
  pBlock->info.rowSize += pColInfoData->info.bytes;
43,437,153✔
2293

2294
  return TSDB_CODE_SUCCESS;
43,437,153✔
2295
}
2296

2297
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
14,884,389✔
2298
  SColumnInfoData col = {.hasNull = true};
14,884,389✔
2299
  col.info.colId = colId;
14,884,389✔
2300
  col.info.type = type;
14,884,389✔
2301
  col.info.bytes = bytes;
14,884,389✔
2302
  // if (type == TSDB_DATA_TYPE_BLOB || type == TSDB_DATA_TYPE_MEDIUMBLOB) {
2303
  //   col.info.bytes = TSDB_MAX_BLOB_LEN;
2304
  // }
2305
  return col;
14,884,389✔
2306
}
2307

2308
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
4,146,186✔
2309
  int32_t code = 0;
4,146,186✔
2310
  QRY_PARAM_CHECK(pColInfoData);
4,146,186!
2311

2312
  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
4,146,186!
2313
    return TSDB_CODE_INVALID_PARA;
×
2314
  }
2315

2316
  *pColInfoData = taosArrayGet(pBlock->pDataBlock, index);
4,145,880✔
2317
  if (*pColInfoData == NULL) {
4,145,853!
2318
    code = terrno;
×
2319
  }
2320

2321
  return code;
4,145,892✔
2322
}
2323

2324
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
184,647✔
2325
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
184,647✔
2326

2327
  int32_t payloadSize = pageSize - extraSize;
184,648✔
2328
  int32_t rowSize = pBlock->info.rowSize;
184,648✔
2329
  int32_t nRows = payloadSize / rowSize;
184,648✔
2330
  if (nRows < 1) {
184,648!
2331
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
×
2332
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2333
    return -1;
×
2334
  }
2335

2336
  int32_t numVarCols = 0;
184,650✔
2337
  int32_t numFixCols = 0;
184,650✔
2338
  for (int32_t i = 0; i < numOfCols; ++i) {
767,991✔
2339
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
583,340✔
2340
    if (pCol == NULL) {
583,341!
2341
      return -1;
×
2342
    }
2343

2344
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
583,341!
2345
      ++numVarCols;
75,298✔
2346
    } else {
2347
      ++numFixCols;
508,043✔
2348
    }
2349
  }
2350

2351
  // find the data payload whose size is greater than payloadSize
2352
  int result = -1;
184,651✔
2353
  int start = 1;
184,651✔
2354
  int end = nRows;
184,651✔
2355
  while (start <= end) {
1,755,246✔
2356
    int mid = start + (end - start) / 2;
1,570,595✔
2357
    // data size + var data type columns offset + fixed data type columns bitmap len
2358
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
1,570,595✔
2359
    if (midSize > payloadSize) {
1,570,595✔
2360
      result = mid;
340,291✔
2361
      end = mid - 1;
340,291✔
2362
    } else {
2363
      start = mid + 1;
1,230,304✔
2364
    }
2365
  }
2366

2367
  int32_t newRows = (result != -1) ? result - 1 : nRows;
184,651✔
2368
  // the true value must be less than the value of nRows
2369
  if (newRows > nRows || newRows < 1) {
184,651!
2370
    uError("invalid newRows:%d, nRows:%d", newRows, nRows);
×
2371
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2372
    return -1;
×
2373
  }
2374

2375
  return newRows;
184,656✔
2376
}
2377

2378
void colDataDestroy(SColumnInfoData* pColData) {
70,923,807✔
2379
  if (!pColData) {
70,923,807✔
2380
    return;
830,143✔
2381
  }
2382

2383
  if (IS_VAR_DATA_TYPE(pColData->info.type)) {
70,093,664!
2384
    taosMemoryFreeClear(pColData->varmeta.offset);
17,791,500!
2385
  } else {
2386
    taosMemoryFreeClear(pColData->nullbitmap);
52,302,164!
2387
  }
2388

2389
  taosMemoryFreeClear(pColData->pData);
70,235,068!
2390
}
2391

2392
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
368,730✔
2393
  int32_t len = BitmapLen(total);
368,730✔
2394

2395
  int32_t newLen = BitmapLen(total - n);
368,730✔
2396
  if (n % 8 == 0) {
368,730✔
2397
    (void)memmove(nullBitmap, nullBitmap + n / 8, newLen);
3,822✔
2398
  } else {
2399
    int32_t  tail = n % 8;
364,908✔
2400
    int32_t  i = 0;
364,908✔
2401
    uint8_t* p = (uint8_t*)nullBitmap;
364,908✔
2402

2403
    if (n < 8) {
364,908✔
2404
      while (i < len) {
527,867✔
2405
        uint8_t v = p[i];  // source bitmap value
491,175✔
2406
        p[i] = (v << tail);
491,175✔
2407

2408
        if (i < len - 1) {
491,175✔
2409
          uint8_t next = p[i + 1];
454,483✔
2410
          p[i] |= (next >> (8 - tail));
454,483✔
2411
        }
2412

2413
        i += 1;
491,175✔
2414
      }
2415
    } else if (n > 8) {
328,216!
2416
      int32_t remain = (total % 8 != 0 && total % 8 <= tail) ? 1 : 0;
328,220✔
2417
      int32_t gap = len - newLen - remain;
328,220✔
2418
      while (i < newLen) {
6,137,972✔
2419
        uint8_t v = p[i + gap];
5,809,752✔
2420
        p[i] = (v << tail);
5,809,752✔
2421

2422
        if (i < newLen - 1 + remain) {
5,809,752✔
2423
          uint8_t next = p[i + gap + 1];
5,707,149✔
2424
          p[i] |= (next >> (8 - tail));
5,707,149✔
2425
        }
2426

2427
        i += 1;
5,809,752✔
2428
      }
2429
    }
2430
  }
2431
}
368,730✔
2432

2433
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
×
2434
  int32_t dataOffset = -1;
×
2435
  int32_t dataLen = 0;
×
2436
  int32_t beigin = start;
×
2437
  while (beigin < end) {
×
2438
    int32_t offset = pColInfoData->varmeta.offset[beigin];
×
2439
    if (offset == -1) {
×
2440
      beigin++;
×
2441
      continue;
×
2442
    }
2443
    if (start != 0) {
×
2444
      pColInfoData->varmeta.offset[beigin] = dataLen;
×
2445
    }
2446
    char* data = pColInfoData->pData + offset;
×
2447
    if (dataOffset == -1) dataOffset = offset;  // mark the begin of data
×
2448
    int32_t type = pColInfoData->info.type;
×
2449
    if (type == TSDB_DATA_TYPE_JSON) {
×
2450
      dataLen += getJsonValueLen(data);
×
2451
    } else {
2452
      dataLen += varDataTLen(data);
×
2453
    }
2454
    beigin++;
×
2455
  }
2456

2457
  if (dataOffset > 0) {
×
2458
    (void)memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
×
2459
  }
2460

2461
  (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
×
2462
  return dataLen;
×
2463
}
2464

2465
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
455,719✔
2466
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
455,719!
2467
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
2468
    (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
87,040✔
2469

2470
    // clear the offset value of the unused entries.
2471
    memset(&pColInfoData->varmeta.offset[total - n], 0, n);
87,040✔
2472
  } else {
2473
    int32_t bytes = pColInfoData->info.bytes;
368,679✔
2474
    (void)memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
368,679✔
2475
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
368,679✔
2476
  }
2477
}
455,759✔
2478

2479
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
592,301✔
2480
  if (n == 0) {
592,301✔
2481
    return TSDB_CODE_SUCCESS;
380,471✔
2482
  }
2483

2484
  if (pBlock->info.rows <= n) {
211,830✔
2485
    blockDataEmpty(pBlock);
1,640✔
2486
  } else {
2487
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
210,190✔
2488
    for (int32_t i = 0; i < numOfCols; ++i) {
665,972✔
2489
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
455,730✔
2490
      if (pColInfoData == NULL) {
455,720!
2491
        return terrno;
×
2492
      }
2493

2494
      colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows);
455,720✔
2495
    }
2496

2497
    pBlock->info.rows -= n;
210,242✔
2498
  }
2499
  return TSDB_CODE_SUCCESS;
211,867✔
2500
}
2501

2502
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
106,946✔
2503
  if (n >= total || n == 0) return;
106,946!
2504
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
106,956!
2505
    if (pColInfoData->varmeta.length != 0) {
23,116✔
2506
      int32_t newLen = pColInfoData->varmeta.offset[n];
20,107✔
2507
      if (-1 == newLen) {
20,107✔
2508
        for (int i = n - 1; i >= 0; --i) {
1,920✔
2509
          newLen = pColInfoData->varmeta.offset[i];
1,910✔
2510
          if (newLen != -1) {
1,910✔
2511
            newLen += calcStrBytesByType(pColInfoData->info.type, pColInfoData->pData + newLen);
255✔
2512
            // if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
2513
            //   newLen += getJsonValueLen(pColInfoData->pData + newLen);
2514
            // } else if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
2515
            //   newLen += blobDataTLen(pColInfoData->pData + newLen);
2516
            // } else {
2517
            //   newLen += varDataTLen(pColInfoData->pData + newLen);
2518
            // }
2519
            break;
255✔
2520
          }
2521
        }
2522
      }
2523
      if (newLen <= -1) {
20,107✔
2524
        uFatal("colDataKeepFirstNRows: newLen:%d  old:%d", newLen, pColInfoData->varmeta.length);
10!
2525
      } else {
2526
        pColInfoData->varmeta.length = newLen;
20,097✔
2527
      }
2528
    }
2529
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
2530
    memset(&pColInfoData->varmeta.offset[n], 0, total - n);
23,111✔
2531
  }
2532
}
2533

2534
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
236,121✔
2535
  if (n == 0) {
236,121✔
2536
    blockDataEmpty(pBlock);
27,722✔
2537
    return;
27,724✔
2538
  }
2539

2540
  if (pBlock->info.rows <= n) {
208,399✔
2541
    return;
166,188✔
2542
  } else {
2543
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
42,211✔
2544
    for (int32_t i = 0; i < numOfCols; ++i) {
149,172✔
2545
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
106,948✔
2546
      if (pColInfoData == NULL) {
106,952!
2547
        continue;
×
2548
      }
2549

2550
      colDataKeepFirstNRows(pColInfoData, n, pBlock->info.rows);
106,952✔
2551
    }
2552

2553
    pBlock->info.rows = n;
42,224✔
2554
  }
2555
}
2556

2557
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
2,596✔
2558
  int64_t tbUid = pBlock->info.id.uid;
2,596✔
2559
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2,596✔
2560
  int16_t hasVarCol = pBlock->info.hasVarCol;
2,596✔
2561
  int64_t rows = pBlock->info.rows;
2,596✔
2562
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
2,596✔
2563

2564
  int32_t tlen = 0;
2,596✔
2565
  tlen += taosEncodeFixedI64(buf, tbUid);
2,596✔
2566
  tlen += taosEncodeFixedI16(buf, numOfCols);
2,596✔
2567
  tlen += taosEncodeFixedI16(buf, hasVarCol);
5,192✔
2568
  tlen += taosEncodeFixedI64(buf, rows);
2,596✔
2569
  tlen += taosEncodeFixedI32(buf, sz);
2,596✔
2570
  for (int32_t i = 0; i < sz; i++) {
5,292✔
2571
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
2,696✔
2572
    if (pColData == NULL) {
2,696!
2573
      return terrno;
×
2574
    }
2575

2576
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
2,696✔
2577
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
2,696✔
2578
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
2,696✔
2579
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
2,696✔
2580

2581
    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
2,696!
2582
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
4✔
2583
    } else {
2584
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
5,388✔
2585
    }
2586

2587
    int32_t len = colDataGetLength(pColData, rows);
2,696✔
2588
    tlen += taosEncodeFixedI32(buf, len);
2,696✔
2589

2590
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
2,696!
2591
      for (int32_t row = 0; row < rows; ++row) {
×
2592
        char*   pData = pColData->pData + pColData->varmeta.offset[row];
×
2593
        int32_t colSize = calcStrBytesByType(pColData->info.type, pData);
×
2594
        // if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
2595
        //   colSize = getJsonValueLen(pData);
2596
        // } else if (IS_STR_DATA_BLOB(pColData->info.type)) {
2597
        //   colSize = blobDataTLen(pData);
2598
        // } else {
2599
        //   colSize = varDataTLen(pData);
2600
        // }
2601
        tlen += taosEncodeBinary(buf, pData, colSize);
×
2602
      }
2603
    } else {
2604
      tlen += taosEncodeBinary(buf, pColData->pData, len);
5,392✔
2605
    }
2606
  }
2607
  return tlen;
2,596✔
2608
}
2609

2610
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
1,298✔
2611
  int32_t sz = 0;
1,298✔
2612
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,298✔
2613

2614
  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
2,596!
2615
  buf = taosDecodeFixedI16(buf, &numOfCols);
1,298✔
2616
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
1,298!
2617
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
2,596!
2618
  buf = taosDecodeFixedI32(buf, &sz);
1,298✔
2619

2620
  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
1,298✔
2621
  if (pBlock->pDataBlock == NULL) {
1,298!
2622
    return NULL;
×
2623
  }
2624

2625
  for (int32_t i = 0; i < sz; i++) {
2,646✔
2626
    SColumnInfoData data = {0};
1,348!
2627
    buf = taosDecodeFixedI16(buf, &data.info.colId);
1,348✔
2628
    buf = taosDecodeFixedI8(buf, &data.info.type);
1,348!
2629
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
1,348✔
2630
    buf = taosDecodeFixedBool(buf, &data.hasNull);
1,348✔
2631

2632
    if (IS_VAR_DATA_TYPE(data.info.type)) {
1,348!
2633
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
2!
2634
    } else {
2635
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
2,694!
2636
    }
2637
    if (buf == NULL) {
1,348!
2638
      uError("failed to decode null bitmap/offset, type:%d", data.info.type);
×
2639
      goto _error;
×
2640
    }
2641

2642
    int32_t len = 0;
1,348!
2643
    buf = taosDecodeFixedI32(buf, &len);
1,348✔
2644
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);
1,348!
2645
    if (buf == NULL) {
1,348!
2646
      uError("failed to decode data, type:%d", data.info.type);
×
2647
      goto _error;
×
2648
    }
2649
    if (IS_VAR_DATA_TYPE(data.info.type)) {
1,348!
2650
      data.varmeta.length = len;
1✔
2651
      data.varmeta.allocLen = len;
1✔
2652
    }
2653

2654
    void* px = taosArrayPush(pBlock->pDataBlock, &data);
1,348✔
2655
    if (px == NULL) {
1,348!
2656
      return NULL;
×
2657
    }
2658
  }
2659

2660
  return (void*)buf;
1,298✔
2661
_error:
×
2662
  for (int32_t i = 0; i < sz; ++i) {
×
2663
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
×
2664
    if (pColInfoData == NULL) {
×
2665
      break;
×
2666
    }
2667
    colDataDestroy(pColInfoData);
×
2668
  }
2669
  return NULL;
×
2670
}
2671

2672
static int32_t formatTimestamp(char* buf, size_t cap, int64_t val, int precision) {
×
2673
  time_t  tt;
2674
  int32_t ms = 0;
×
2675
  int32_t code = TSDB_CODE_SUCCESS;
×
2676
  int32_t lino = 0;
×
2677
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2678
    tt = (time_t)(val / 1000000000);
×
2679
    ms = val % 1000000000;
×
2680
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2681
    tt = (time_t)(val / 1000000);
×
2682
    ms = val % 1000000;
×
2683
  } else {
2684
    tt = (time_t)(val / 1000);
×
2685
    ms = val % 1000;
×
2686
  }
2687

2688
  if (tt <= 0 && ms < 0) {
×
2689
    tt--;
×
2690
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
2691
      ms += 1000000000;
×
2692
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2693
      ms += 1000000;
×
2694
    } else {
2695
      ms += 1000;
×
2696
    }
2697
  }
2698
  struct tm ptm = {0};
×
2699
  if (taosLocalTime(&tt, &ptm, buf, cap, NULL) == NULL) {
×
2700
    code = TSDB_CODE_INTERNAL_ERROR;
×
2701
    TSDB_CHECK_CODE(code, lino, _end);
×
2702
  }
2703

2704
  size_t pos = taosStrfTime(buf, cap, "%Y-%m-%d %H:%M:%S", &ptm);
×
2705
  if (pos == 0) {
×
2706
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2707
    TSDB_CHECK_CODE(code, lino, _end);
×
2708
  }
2709
  int32_t nwritten = 0;
×
2710
  if (precision == TSDB_TIME_PRECISION_NANO) {
×
2711
    nwritten = snprintf(buf + pos, cap - pos, ".%09d", ms);
×
2712
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2713
    nwritten = snprintf(buf + pos, cap - pos, ".%06d", ms);
×
2714
  } else {
2715
    nwritten = snprintf(buf + pos, cap - pos, ".%03d", ms);
×
2716
  }
2717

2718
  if (nwritten >= cap - pos) {
×
2719
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2720
    TSDB_CHECK_CODE(code, lino, _end);
×
2721
  }
2722

2723
_end:
×
2724
  if (code != TSDB_CODE_SUCCESS) {
×
2725
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2726
  }
2727
  return code;
×
2728
}
2729

2730
// for debug
2731
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
×
2732
  int32_t lino = 0;
×
2733
  int32_t size = 2048 * 1024;
×
2734
  int32_t code = 0;
×
2735
  char*   dumpBuf = NULL;
×
2736
  char    pBuf[TD_TIME_STR_LEN] = {0};
×
2737
  int32_t rows = pDataBlock->info.rows;
×
2738
  int32_t len = 0;
×
2739

2740
  dumpBuf = taosMemoryCalloc(size, 1);
×
2741
  if (dumpBuf == NULL) {
×
2742
    return terrno;
×
2743
  }
2744

2745
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
×
2746
  len += tsnprintf(dumpBuf + len, size - len,
×
2747
                  "%s===stream===%s|block type %d|child id %d|group id:%" PRIx64 "|uid:%" PRId64 "|rows:%" PRId64
2748
                  "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
2749
                  taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
×
2750
                  pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
2751
                  pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
×
2752
  if (len >= size - 1) {
×
2753
    goto _exit;
×
2754
  }
2755

2756
  for (int32_t j = 0; j < rows; j++) {
×
2757
    len += tsnprintf(dumpBuf + len, size - len, "%s|", flag);
×
2758
    if (len >= size - 1) {
×
2759
      goto _exit;
×
2760
    }
2761

2762
    for (int32_t k = 0; k < colNum; k++) {
×
2763
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
×
2764
      if (pColInfoData == NULL) {
×
2765
        code = terrno;
×
2766
        lino = __LINE__;
×
2767
        goto _exit;
×
2768
      }
2769

2770
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
×
2771
        len += tsnprintf(dumpBuf + len, size - len, " %15s |", "NULL");
×
2772
        if (len >= size - 1) goto _exit;
×
2773
        continue;
×
2774
      }
2775

2776
      void* var = colDataGetData(pColInfoData, j);
×
2777
      switch (pColInfoData->info.type) {
×
2778
        case TSDB_DATA_TYPE_TIMESTAMP:
×
2779
          memset(pBuf, 0, sizeof(pBuf));
×
2780
          code = formatTimestamp(pBuf, sizeof(pBuf), *(uint64_t*)var, pColInfoData->info.precision);
×
2781
          if (code != TSDB_CODE_SUCCESS) {
×
2782
            TAOS_UNUSED(tsnprintf(pBuf, sizeof(pBuf), "NaN"));
×
2783
          }
2784
          len += tsnprintf(dumpBuf + len, size - len, " %25s |", pBuf);
×
2785
          if (len >= size - 1) goto _exit;
×
2786
          break;
×
2787
        case TSDB_DATA_TYPE_TINYINT:
×
2788
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
×
2789
          if (len >= size - 1) goto _exit;
×
2790
          break;
×
2791
        case TSDB_DATA_TYPE_UTINYINT:
×
2792
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
×
2793
          if (len >= size - 1) goto _exit;
×
2794
          break;
×
2795
        case TSDB_DATA_TYPE_SMALLINT:
×
2796
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
×
2797
          if (len >= size - 1) goto _exit;
×
2798
          break;
×
2799
        case TSDB_DATA_TYPE_USMALLINT:
×
2800
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
×
2801
          if (len >= size - 1) goto _exit;
×
2802
          break;
×
2803
        case TSDB_DATA_TYPE_INT:
×
2804
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
×
2805
          if (len >= size - 1) goto _exit;
×
2806
          break;
×
2807
        case TSDB_DATA_TYPE_UINT:
×
2808
          len += tsnprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
×
2809
          if (len >= size - 1) goto _exit;
×
2810
          break;
×
2811
        case TSDB_DATA_TYPE_BIGINT:
×
2812
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
×
2813
          if (len >= size - 1) goto _exit;
×
2814
          break;
×
2815
        case TSDB_DATA_TYPE_UBIGINT:
×
2816
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
×
2817
          if (len >= size - 1) goto _exit;
×
2818
          break;
×
2819
        case TSDB_DATA_TYPE_FLOAT:
×
2820
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
×
2821
          if (len >= size - 1) goto _exit;
×
2822
          break;
×
2823
        case TSDB_DATA_TYPE_DOUBLE:
×
2824
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
×
2825
          if (len >= size - 1) goto _exit;
×
2826
          break;
×
2827
        case TSDB_DATA_TYPE_BOOL:
×
2828
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
×
2829
          if (len >= size - 1) goto _exit;
×
2830
          break;
×
2831
        case TSDB_DATA_TYPE_VARCHAR:
×
2832
        case TSDB_DATA_TYPE_VARBINARY:
2833
        case TSDB_DATA_TYPE_GEOMETRY: {
2834
          memset(pBuf, 0, sizeof(pBuf));
×
2835
          char*   pData = colDataGetVarData(pColInfoData, j);
×
2836
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
×
2837
          dataSize = TMIN(dataSize, 50);
×
2838
          memcpy(pBuf, varDataVal(pData), dataSize);
×
2839
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
×
2840
          if (len >= size - 1) goto _exit;
×
2841
        } break;
×
2842
        case TSDB_DATA_TYPE_NCHAR: {
×
2843
          char*   pData = colDataGetVarData(pColInfoData, j);
×
2844
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
×
2845
          memset(pBuf, 0, sizeof(pBuf));
×
2846
          code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf, NULL);
×
2847
          if (code < 0) {
×
2848
            uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
×
2849
            lino = __LINE__;
×
2850
            goto _exit;
×
2851
          } else {  // reset the length value
2852
            code = TSDB_CODE_SUCCESS;
×
2853
          }
2854
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
×
2855
          if (len >= size - 1) goto _exit;
×
2856
        } break;
×
2857
        case TSDB_DATA_TYPE_MEDIUMBLOB:
×
2858
        case TSDB_DATA_TYPE_BLOB: {
2859
          memset(pBuf, 0, sizeof(pBuf));
×
2860
          char*   pData = colDataGetVarData(pColInfoData, j);
×
2861
          int32_t dataSize = TMIN(sizeof(pBuf), blobDataLen(pData));
×
2862
          dataSize = TMIN(dataSize, 50);
×
2863
          memcpy(pBuf, blobDataVal(pData), dataSize);
×
2864
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
×
2865
          if (len >= size - 1) goto _exit;
×
2866
          break;
×
2867
        }
2868
      }
2869
    }
2870
    len += tsnprintf(dumpBuf + len, size - len, "%d\n", j);
×
2871
    if (len >= size - 1) goto _exit;
×
2872
  }
2873
  len += tsnprintf(dumpBuf + len, size - len, "%s |end\n", flag);
×
2874

2875
_exit:
×
2876
  if (code == TSDB_CODE_SUCCESS) {
×
2877
    *pDataBuf = dumpBuf;
×
2878
    dumpBuf = NULL;
×
2879
  } else {
2880
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2881
    if (dumpBuf) {
×
2882
      taosMemoryFree(dumpBuf);
×
2883
    }
2884
  }
2885
  return code;
×
2886
}
2887

2888
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
×
2889
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
2890
  SSubmitReq2* pReq = *ppReq;
×
2891
  SArray*      pVals = NULL;
×
2892
  int32_t      sz = 1;
×
2893
  int32_t      code = 0;
×
2894
  *ppReq = NULL;
×
2895
  terrno = 0;
×
2896

2897
  if (NULL == pReq) {
×
2898
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
×
2899
      code = terrno;
×
2900
      goto _end;
×
2901
    }
2902

2903
    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
×
2904
      code = terrno;
×
2905
      goto _end;
×
2906
    }
2907
  }
2908

2909
  for (int32_t i = 0; i < sz; ++i) {
×
2910
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
×
2911
    int32_t rows = pDataBlock->info.rows;
×
2912

2913
    if (colNum <= 1) {  // invalid if only with TS col
×
2914
      continue;
×
2915
    }
2916

2917
    // the rsma result should has the same column number with schema.
2918
    if (colNum != pTSchema->numOfCols) {
×
2919
      uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
×
2920
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2921
      goto _end;
×
2922
    }
2923

2924
    SSubmitTbData tbData = {0};
×
2925

2926
    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
×
2927
      code = terrno;
×
2928
      goto _end;
×
2929
    }
2930

2931
    tbData.suid = suid;
×
2932
    tbData.uid = uid;
×
2933
    tbData.sver = pTSchema->version;
×
2934

2935
    if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
×
2936
      code = terrno;
×
2937
      taosArrayDestroy(tbData.aRowP);
×
2938
      goto _end;
×
2939
    }
2940

2941
    for (int32_t j = 0; j < rows; ++j) {  // iterate by row
×
2942

2943
      taosArrayClear(pVals);
×
2944

2945
      bool    isStartKey = false;
×
2946
      int32_t offset = 0;
×
2947
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
×
2948
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
×
2949
        if (pColInfoData == NULL) {
×
2950
          return terrno;
×
2951
        }
2952

2953
        const STColumn* pCol = &pTSchema->columns[k];
×
2954
        void*           var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
×
2955

2956
        switch (pColInfoData->info.type) {
×
2957
          case TSDB_DATA_TYPE_TIMESTAMP:
×
2958
            if (pColInfoData->info.type != pCol->type) {
×
2959
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2960
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2961
              return terrno;
×
2962
            }
2963
            if (!isStartKey) {
×
2964
              isStartKey = true;
×
2965
              if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
×
2966
                uError("the first timestamp colId %d is not primary colId", pCol->colId);
×
2967
                terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2968
                return terrno;
×
2969
              }
2970
              SValue val = {.type = pCol->type};
×
2971
              VALUE_SET_TRIVIAL_DATUM(&val, *(TSKEY*)var);
×
2972
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2973
              void*   px = taosArrayPush(pVals, &cv);
×
2974
              if (px == NULL) {
×
2975
                return terrno;
×
2976
              }
2977

2978
            } else if (colDataIsNull_s(pColInfoData, j)) {
×
2979
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
2980
              void*   px = taosArrayPush(pVals, &cv);
×
2981
              if (px == NULL) {
×
2982
                return terrno;
×
2983
              }
2984
            } else {
2985
              SValue val = {.type = pCol->type};
×
2986
              VALUE_SET_TRIVIAL_DATUM(&val, *(int64_t*)var);
×
2987
              SColVal cv = COL_VAL_VALUE(pCol->colId, val);
×
2988
              void*   px = taosArrayPush(pVals, &cv);
×
2989
              if (px == NULL) {
×
2990
                return terrno;
×
2991
              }
2992
            }
2993
            break;
×
2994
          case TSDB_DATA_TYPE_NCHAR:
×
2995
          case TSDB_DATA_TYPE_VARBINARY:
2996
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
2997
            if (pColInfoData->info.type != pCol->type) {
×
2998
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
×
2999
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3000
              return terrno;
×
3001
            }
3002
            if (colDataIsNull_s(pColInfoData, j)) {
×
3003
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
×
3004
              void*   px = taosArrayPush(pVals, &cv);
×
3005
              if (px == NULL) {
×
3006
                goto _end;
×
3007
              }
3008
            } else {
3009
              void*   data = colDataGetVarData(pColInfoData, j);
×
3010
              SValue  sv = (SValue){.type = pCol->type,
×
3011
                                    .nData = varDataLen(data),
×
3012
                                    .pData = (uint8_t*)varDataVal(data)};  // address copy, no value
×
3013
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
3014
              void*   px = taosArrayPush(pVals, &cv);
×
3015
              if (px == NULL) {
×
3016
                code = terrno;
×
3017
                goto _end;
×
3018
              }
3019
            }
3020
            break;
×
3021
          }
3022
          case TSDB_DATA_TYPE_DECIMAL:
×
3023
          case TSDB_DATA_TYPE_MEDIUMBLOB:
3024
          case TSDB_DATA_TYPE_BLOB:
3025
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
3026
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3027
            return terrno;
×
3028
            break;
3029
          case TSDB_DATA_TYPE_JSON:
×
3030
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
×
3031
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3032
            return terrno;
×
3033
            break;
3034
          default:
×
3035
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
×
3036
              if (colDataIsNull_s(pColInfoData, j)) {
×
3037
                SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
×
3038
                void*   px = taosArrayPush(pVals, &cv);
×
3039
                if (px == NULL) {
×
3040
                  goto _end;
×
3041
                }
3042
              } else {
3043
                SValue sv = {.type = pCol->type};
×
3044
                if (pCol->type == pColInfoData->info.type) {
×
3045
                  valueSetDatum(&sv, sv.type, var, tDataTypes[pCol->type].bytes);
×
3046
                } else {
3047
                  /**
3048
                   *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
3049
                   *  2. below conversion may lead to overflow or loss, the app should select the right data type.
3050
                   */
3051
                  char tv[DATUM_MAX_SIZE] = {0};
×
3052
                  if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
×
3053
                    float v = 0;
×
3054
                    GET_TYPED_DATA(v, float, pColInfoData->info.type, var,
×
3055
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
3056
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
3057
                  } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
×
3058
                    double v = 0;
×
3059
                    GET_TYPED_DATA(v, double, pColInfoData->info.type, var,
×
3060
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
3061
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
3062
                  } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
×
3063
                    int64_t v = 0;
×
3064
                    GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var,
×
3065
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
3066
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
3067
                  } else {
3068
                    uint64_t v = 0;
×
3069
                    GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var,
×
3070
                                   typeGetTypeModFromColInfo(&pColInfoData->info));
3071
                    SET_TYPED_DATA(&tv, pCol->type, v);
×
3072
                  }
3073
                  valueSetDatum(&sv, sv.type, tv, tDataTypes[pCol->type].bytes);
×
3074
                }
3075
                SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
×
3076
                void*   px = taosArrayPush(pVals, &cv);
×
3077
                if (px == NULL) {
×
3078
                  code = terrno;
×
3079
                  goto _end;
×
3080
                }
3081
              }
3082
            } else {
3083
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
×
3084
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3085
              return terrno;
×
3086
            }
3087
            break;
×
3088
        }
3089
      }
3090
      SRow*             pRow = NULL;
×
3091
      SRowBuildScanInfo sinfo = {0};
×
3092
      if ((code = tRowBuild(pVals, pTSchema, &pRow, &sinfo)) < 0) {
×
3093
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
×
3094
        goto _end;
×
3095
      }
3096

3097
      void* px = taosArrayPush(tbData.aRowP, &pRow);
×
3098
      if (px == NULL) {
×
3099
        code = terrno;
×
3100
        goto _end;
×
3101
      }
3102
    }
3103

3104
    void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
×
3105
    if (px == NULL) {
×
3106
      code = terrno;
×
3107
      goto _end;
×
3108
    }
3109
  }
3110

3111
_end:
×
3112
  taosArrayDestroy(pVals);
×
3113
  if (code != 0) {
×
3114
    if (pReq) {
×
3115
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
×
3116
      taosMemoryFreeClear(pReq);
×
3117
    }
3118
  } else {
3119
    *ppReq = pReq;
×
3120
  }
3121

3122
  return code;
×
3123
}
3124

3125
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
3126
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
27✔
3127
  int32_t code = TSDB_CODE_SUCCESS;
27✔
3128
  int32_t lino = 0;
27✔
3129
  char    tmp[TSDB_TABLE_NAME_LEN] = {0};
27✔
3130

3131
  if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
27✔
3132
    code = TSDB_CODE_INTERNAL_ERROR;
6✔
3133
    TSDB_CHECK_CODE(code, lino, _end);
6!
3134
  }
3135

3136
  if (stbName == NULL) {
21✔
3137
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId);
6✔
3138
  } else {
3139
    int32_t i = strlen(stbName) - 1;
15✔
3140
    for (; i >= 0; i--) {
564✔
3141
      if (stbName[i] == '.') {
552✔
3142
        break;
3✔
3143
      }
3144
    }
3145
    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName + i + 1, groupId);
15✔
3146
  }
3147

3148
  ctbName[cap - strlen(tmp) - 1] = 0;  // put stbname + groupId to the end
21✔
3149
  size_t prefixLen = strlen(ctbName);
21✔
3150
  ctbName = strncat(ctbName, tmp, cap - prefixLen - 1);
21✔
3151

3152
  for (char* p = ctbName; *p; ++p) {
981✔
3153
    if (*p == '.') *p = '_';
960!
3154
  }
3155

3156
_end:
21✔
3157
  if (code != TSDB_CODE_SUCCESS) {
27✔
3158
    uError("%s failed at line %d since %s, ctbName:%s", __func__, lino, tstrerror(code), ctbName);
6!
3159
  }
3160
  return code;
27✔
3161
}
3162

3163
// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals.
3164
// the total length is fixed to be 34 bytes.
3165
bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); }
×
3166

3167
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
12✔
3168
  char tmp[64] = {0};
12✔
3169
  snprintf(tmp, sizeof(tmp), "%" PRIu64, groupId);
12✔
3170
  size_t len1 = strlen(ctbName);
12✔
3171
  size_t len2 = strlen(tmp);
12✔
3172
  if (len1 < len2) return false;
12✔
3173
  return memcmp(ctbName + len1 - len2, tmp, len2) == 0;
9✔
3174
}
3175

3176
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
×
3177
  QRY_PARAM_CHECK(pName);
×
3178

3179
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
×
3180
  if (!pBuf) {
×
3181
    return terrno;
×
3182
  }
3183

3184
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
×
3185
  if (code != TSDB_CODE_SUCCESS) {
×
3186
    taosMemoryFree(pBuf);
×
3187
  } else {
3188
    *pName = pBuf;
×
3189
  }
3190

3191
  return code;
×
3192
}
3193

3194
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
×
3195
  if (stbFullName[0] == 0) {
×
3196
    return TSDB_CODE_INVALID_PARA;
×
3197
  }
3198

3199
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
×
3200
  if (tags == NULL) {
×
3201
    return terrno;
×
3202
  }
3203

3204
  if (cname == NULL) {
×
3205
    taosArrayDestroy(tags);
×
3206
    return TSDB_CODE_INVALID_PARA;
×
3207
  }
3208

3209
  int8_t      type = TSDB_DATA_TYPE_UBIGINT;
×
3210
  const char* name = "group_id";
×
3211
  int32_t     len = strlen(name);
×
3212

3213
  SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
×
3214
  void*  px = taosArrayPush(tags, &pTag);
×
3215
  if (px == NULL) {
×
3216
    return terrno;
×
3217
  }
3218

3219
  RandTableName rname = {
×
3220
      .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
×
3221

3222
  int32_t code = buildChildTableName(&rname);
×
3223
  if (code != TSDB_CODE_SUCCESS) {
×
3224
    return code;
×
3225
  }
3226

3227
  taosArrayDestroy(tags);
×
3228
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
×
3229
    return TSDB_CODE_INVALID_PARA;
×
3230
  }
3231

3232
  return code;
×
3233
}
3234

3235
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
×
3236
                               char** dstTableName) {
3237
  int32_t code = TSDB_CODE_SUCCESS;
×
3238
  int32_t lino = 0;
×
3239

3240
  if (parTbName[0]) {
×
3241
    if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
×
3242
        stbFullName) {
3243
      *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
×
3244
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3245

3246
      tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
×
3247
      code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
×
3248
      TSDB_CHECK_CODE(code, lino, _end);
×
3249
    } else {
3250
      *dstTableName = taosStrdup(parTbName);
×
3251
      TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
×
3252
    }
3253
  } else {
3254
    code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
×
3255
    TSDB_CHECK_CODE(code, lino, _end);
×
3256
  }
3257

3258
_end:
×
3259
  return code;
×
3260
}
3261

3262
// return length of encoded data, return -1 if failed
3263
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
3,572,287✔
3264
  int32_t code = blockDataCheck(pBlock);
3,572,287✔
3265
  if (code != TSDB_CODE_SUCCESS) {
3,571,808!
3266
    terrno = code;
×
3267
    return -1;
×
3268
  }
3269

3270
  int32_t dataLen = 0;
3,571,996✔
3271

3272
  // todo extract method
3273
  int32_t* version = (int32_t*)data;
3,571,996✔
3274
  *version = BLOCK_VERSION_1;
3,571,996✔
3275
  data += sizeof(int32_t);
3,571,996✔
3276

3277
  int32_t* actualLen = (int32_t*)data;
3,571,996✔
3278
  data += sizeof(int32_t);
3,571,996✔
3279

3280
  int32_t* rows = (int32_t*)data;
3,571,996✔
3281
  *rows = pBlock->info.rows;
3,571,996✔
3282
  data += sizeof(int32_t);
3,571,996✔
3283
  if (*rows <= 0) {
3,571,996!
3284
    uError("Invalid rows %d in block", *rows);
×
3285
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3286
    return -1;
×
3287
  }
3288

3289
  int32_t* cols = (int32_t*)data;
3,571,996✔
3290
  *cols = numOfCols;
3,571,996✔
3291
  data += sizeof(int32_t);
3,571,996✔
3292

3293
  // flag segment.
3294
  // the inital bit is for column info
3295
  int32_t* flagSegment = (int32_t*)data;
3,571,996✔
3296
  *flagSegment = (1 << 31);
3,571,996✔
3297

3298
  data += sizeof(int32_t);
3,571,996✔
3299

3300
  uint64_t* groupId = (uint64_t*)data;
3,571,996✔
3301
  data += sizeof(uint64_t);
3,571,996✔
3302

3303
  for (int32_t i = 0; i < numOfCols; ++i) {
22,851,800✔
3304
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
19,280,471✔
3305
    if (pColInfoData == NULL) {
19,279,263!
3306
      return -1;
×
3307
    }
3308

3309
    *((int8_t*)data) = pColInfoData->info.type;
19,279,263✔
3310
    data += sizeof(int8_t);
19,279,263✔
3311

3312
    int32_t bytes = pColInfoData->info.bytes;
19,279,263✔
3313
    *((int32_t*)data) = bytes;
19,279,263✔
3314
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
19,279,263✔
3315
      fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
513✔
3316
                              pColInfoData->info.scale);
513✔
3317
    }
3318
    data += sizeof(int32_t);
19,279,804✔
3319
  }
3320

3321
  int32_t* colSizes = (int32_t*)data;
3,571,329✔
3322
  data += numOfCols * sizeof(int32_t);
3,571,329✔
3323

3324
  dataLen = blockDataGetSerialMetaSize(numOfCols);
3,571,329✔
3325

3326
  int32_t numOfRows = pBlock->info.rows;
3,572,248✔
3327
  for (int32_t col = 0; col < numOfCols; ++col) {
22,774,579✔
3328
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
19,205,150✔
3329
    if (pColRes == NULL) {
19,196,852!
3330
      return -1;
×
3331
    }
3332

3333
    // copy the null bitmap
3334
    size_t metaSize = 0;
19,196,852✔
3335
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
19,196,852!
3336
      if (IS_STR_DATA_BLOB(pColRes->info.type)) {
4,818,892!
3337
        metaSize = numOfRows * sizeof(int32_t);
×
3338
        if (dataLen + metaSize > dataBuflen) goto _exit;
×
3339
        memcpy(data, pColRes->varmeta.offset, metaSize);
×
3340
      } else {
3341
        metaSize = numOfRows * sizeof(int32_t);
4,862,180✔
3342
        if (dataLen + metaSize > dataBuflen) goto _exit;
4,862,180!
3343
        memcpy(data, pColRes->varmeta.offset, metaSize);
4,862,180✔
3344
      }
3345
    } else {
3346
      metaSize = BitmapLen(numOfRows);
14,377,960✔
3347
      if (dataLen + metaSize > dataBuflen) goto _exit;
14,377,960!
3348
      memcpy(data, pColRes->nullbitmap, metaSize);
14,377,960✔
3349
    }
3350

3351
    data += metaSize;
19,196,852✔
3352
    dataLen += metaSize;
19,196,852✔
3353

3354
    if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
19,196,852!
3355
      colSizes[col] = 0;
×
3356
      for (int32_t row = 0; row < numOfRows; ++row) {
×
3357
        char*   pColData = pColRes->pData + pColRes->varmeta.offset[row];
×
3358
        int32_t colSize = calcStrBytesByType(pColRes->info.type, pColData);
×
3359

3360
        colSizes[col] += colSize;
×
3361
        dataLen += colSize;
×
3362
        if (dataLen > dataBuflen) goto _exit;
×
3363
        (void)memmove(data, pColData, colSize);
×
3364
        data += colSize;
×
3365
      }
3366
    } else {
3367
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
19,196,852✔
3368
      dataLen += colSizes[col];
19,201,330✔
3369
      if (dataLen > dataBuflen) goto _exit;
19,201,330!
3370
      if (pColRes->pData != NULL) {
19,201,330✔
3371
        (void)memmove(data, pColRes->pData, colSizes[col]);
18,738,345✔
3372
      }
3373
      data += colSizes[col];
19,201,330✔
3374
    }
3375

3376
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
19,682,262!
3377
      uWarn("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
2!
3378
      //terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
3379
      //return -1;
3380
    }
3381

3382
    colSizes[col] = htonl(colSizes[col]);
19,202,331✔
3383
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
3384
    //    htonl(colSizes[col]), colSizes[col]);
3385
  }
3386

3387
  bool* blankFill = (bool*)data;
3,569,429✔
3388
  *blankFill = pBlock->info.blankFill;
3,569,429✔
3389
  data += sizeof(bool);
3,569,429✔
3390

3391
  *actualLen = dataLen;
3,569,429✔
3392
#ifndef NO_UNALIGNED_ACCESS
3393
  *groupId = pBlock->info.id.groupId;
3,569,429✔
3394
#else
3395
  taosSetPUInt64Aligned(groupId, &pBlock->info.id.groupId);
3396
#endif
3397
  if (dataLen > dataBuflen) goto _exit;
3,569,429!
3398

3399
  return dataLen;
3,569,429✔
3400

3401
_exit:
×
3402
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
3403
  terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3404
  return -1;
×
3405
}
3406

3407
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
2,144,116✔
3408
  const char* pStart = pData;
2,144,116✔
3409

3410
  int32_t version = *(int32_t*)pStart;
2,144,116✔
3411
  pStart += sizeof(int32_t);
2,144,116✔
3412

3413
  // total length sizeof(int32_t)
3414
  int32_t dataLen = *(int32_t*)pStart;
2,144,116✔
3415
  pStart += sizeof(int32_t);
2,144,116✔
3416

3417
  // total rows sizeof(int32_t)
3418
  int32_t numOfRows = *(int32_t*)pStart;
2,144,116✔
3419
  pStart += sizeof(int32_t);
2,144,116✔
3420
  if (numOfRows <= 0) {
2,144,116!
3421
    uError("block decode numOfRows:%d error", numOfRows);
×
3422
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3423
    return terrno;
×
3424
  }
3425

3426
  // total columns sizeof(int32_t)
3427
  int32_t numOfCols = *(int32_t*)pStart;
2,144,116✔
3428
  pStart += sizeof(int32_t);
2,144,116✔
3429

3430
  // has column info segment
3431
  int32_t flagSeg = *(int32_t*)pStart;
2,144,116✔
3432
  int32_t hasColumnInfo = (flagSeg >> 31);
2,144,116✔
3433
  pStart += sizeof(int32_t);
2,144,116✔
3434

3435
  // group id sizeof(uint64_t)
3436
#ifndef NO_UNALIGNED_ACCESS
3437
  pBlock->info.id.groupId = *(uint64_t*)pStart;
2,144,116✔
3438
#else
3439
  taosSetPUInt64Aligned(&pBlock->info.id.groupId, (uint64_t*)pStart);
3440
#endif
3441
  pStart += sizeof(uint64_t);
2,144,116✔
3442

3443
  if (pBlock->pDataBlock == NULL) {
2,144,116✔
3444
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
12,228✔
3445
    if (pBlock->pDataBlock == NULL) {
12,226!
3446
      return terrno;
×
3447
    }
3448
  }
3449

3450
  for (int32_t i = 0; i < numOfCols; ++i) {
12,956,783✔
3451
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
10,812,708✔
3452
    if (pColInfoData == NULL) {
10,812,677!
3453
      return terrno;
×
3454
    }
3455

3456
    pColInfoData->info.type = *(int8_t*)pStart;
10,812,677✔
3457
    pStart += sizeof(int8_t);
10,812,677✔
3458

3459
    pColInfoData->info.bytes = *(int32_t*)pStart;
10,812,677✔
3460
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
10,812,677✔
3461
      extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
13✔
3462
                                      &pColInfoData->info.scale);
3463
    }
3464
    pStart += sizeof(int32_t);
10,812,660✔
3465

3466
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
10,812,660!
3467
      pBlock->info.hasVarCol = true;
2,747,776✔
3468
    }
3469
  }
3470

3471
  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
2,144,075✔
3472
  if (code) {
2,144,142!
3473
    return code;
×
3474
  }
3475

3476
  int32_t* colLen = (int32_t*)pStart;
2,144,142✔
3477
  pStart += sizeof(int32_t) * numOfCols;
2,144,142✔
3478

3479
  for (int32_t i = 0; i < numOfCols; ++i) {
12,956,015✔
3480
    int oneColsLen = htonl(colLen[i]);
10,811,200✔
3481
    if (oneColsLen < 0) {
10,811,200!
3482
      uError("block decode colLen:%d error, colIdx:%d", oneColsLen, i);
×
3483
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3484
      return terrno;
×
3485
    }
3486

3487
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
10,811,200✔
3488
    if (pColInfoData == NULL) {
10,810,883!
3489
      return terrno;
×
3490
    }
3491

3492
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
10,811,787!
3493
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
2,747,726✔
3494
      pStart += sizeof(int32_t) * numOfRows;
2,747,726✔
3495

3496
      if (oneColsLen > 0 && pColInfoData->varmeta.allocLen < oneColsLen) {
2,747,726✔
3497
        char* tmp = taosMemoryRealloc(pColInfoData->pData, oneColsLen);
1,470,945!
3498
        if (tmp == NULL) {
1,471,031!
3499
          return terrno;
×
3500
        }
3501

3502
        pColInfoData->pData = tmp;
1,471,031✔
3503
        pColInfoData->varmeta.allocLen = oneColsLen;
1,471,031✔
3504
      }
3505

3506
      pColInfoData->varmeta.length = oneColsLen;
2,747,812✔
3507
    } else {
3508
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
8,064,061✔
3509
      pStart += BitmapLen(numOfRows);
8,064,061✔
3510
    }
3511

3512
    // TODO
3513
    // setting this flag to true temporarily so aggregate function on stable will
3514
    // examine NULL value for non-primary key column
3515
    pColInfoData->hasNull = true;
10,811,873✔
3516

3517
    if (oneColsLen > 0) {
10,811,873✔
3518
      memcpy(pColInfoData->pData, pStart, oneColsLen);
10,493,330✔
3519
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
318,543!
3520
      uError("block decode colLen:%d error, colIdx:%d, type:%d", oneColsLen, i, pColInfoData->info.type);
×
3521
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3522
      return terrno;
×
3523
    }
3524

3525
    pStart += oneColsLen;
10,811,873✔
3526
  }
3527

3528
  bool blankFill = *(bool*)pStart;
2,144,815✔
3529
  pStart += sizeof(bool);
2,144,815✔
3530

3531
  pBlock->info.dataLoad = 1;
2,144,815✔
3532
  pBlock->info.rows = numOfRows;
2,144,815✔
3533
  pBlock->info.blankFill = blankFill;
2,144,815✔
3534
  if (pStart - pData != dataLen) {
2,144,815!
3535
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
×
3536
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3537
    return terrno;
×
3538
  }
3539

3540
  if (pEndPos != NULL) {
2,144,815✔
3541
    *pEndPos = pStart;
2,126,636✔
3542
  }
3543

3544
  code = blockDataCheck(pBlock);
2,144,815✔
3545
  if (code != TSDB_CODE_SUCCESS) {
2,143,983!
3546
    terrno = code;
×
3547
    return code;
×
3548
  }
3549

3550
  return TSDB_CODE_SUCCESS;
2,143,984✔
3551
}
3552

3553
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
444,641✔
3554
  //  int32_t totalRows = pBlock->info.rows;
3555
  int32_t code = 0;
444,641✔
3556
  int32_t bmLen = BitmapLen(totalRows);
444,641✔
3557
  char*   pBitmap = NULL;
444,641✔
3558
  int32_t maxRows = 0;
444,641✔
3559

3560
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
444,641✔
3561
  if (!pBoolList) {
444,713✔
3562
    for (int32_t i = 0; i < numOfCols; ++i) {
1,202,231✔
3563
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,069,059✔
3564
      // it is a reserved column for scalar function, and no data in this column yet.
3565
      if (pDst->pData == NULL) {
1,069,102✔
3566
        continue;
61,491✔
3567
      }
3568

3569
      int32_t numOfRows = 0;
1,007,611✔
3570
      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
1,007,611!
3571
        pDst->varmeta.length = 0;
161,007✔
3572
      } else {
3573
        memset(pDst->nullbitmap, 0, bmLen);
846,604✔
3574
      }
3575
    }
3576
    return code;
133,172✔
3577
  }
3578

3579
  for (int32_t i = 0; i < numOfCols; ++i) {
1,489,408✔
3580
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,177,767✔
3581
    // it is a reserved column for scalar function, and no data in this column yet.
3582
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
1,177,726!
3583
      continue;
41,127✔
3584
    }
3585

3586
    int32_t numOfRows = 0;
1,136,599✔
3587
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
1,362,361!
3588
      int32_t j = 0;
226,054✔
3589
      pDst->varmeta.length = 0;
226,054✔
3590

3591
      while (j < totalRows) {
49,287,666✔
3592
        if (pBoolList[j] == 0) {
49,061,904✔
3593
          j += 1;
38,044,386✔
3594
          continue;
38,044,386✔
3595
        }
3596

3597
        if (colDataIsNull_var(pDst, j)) {
11,017,518!
3598
          colDataSetNull_var(pDst, numOfRows);
124,019✔
3599
        } else {
3600
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal,
3601
          // first copy it to p2
3602
          char*   p1 = colDataGetVarData(pDst, j);
10,893,499✔
3603
          int32_t len = calcStrBytesByType(pDst->info.type, p1);
10,893,499✔
3604
          // if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
3605
          //   len = getJsonValueLen(p1);
3606
          // } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
3607
          //   len = blobDataTLen(p1);
3608
          // } else {
3609
          //   len = varDataTLen(p1);
3610
          // }
3611

3612
          char* p2 = taosMemoryMalloc(len);
10,892,913!
3613
          if (p2 == NULL) {
10,892,951!
3614
            return terrno;
×
3615
          }
3616

3617
          memcpy(p2, p1, len);
10,892,951✔
3618
          code = colDataSetVal(pDst, numOfRows, p2, false);
10,892,951✔
3619
          taosMemoryFree(p2);
10,892,780!
3620
          if (code) {
10,893,207!
3621
            return code;
×
3622
          }
3623
        }
3624
        numOfRows += 1;
11,017,226✔
3625
        j += 1;
11,017,226✔
3626
      }
3627

3628
      if (maxRows < numOfRows) {
225,762✔
3629
        maxRows = numOfRows;
30,741✔
3630
      }
3631
    } else {
3632
      if (pBitmap == NULL) {
910,545✔
3633
        pBitmap = taosMemoryCalloc(1, bmLen);
309,072!
3634
        if (pBitmap == NULL) {
309,462!
3635
          return terrno;
×
3636
        }
3637
      }
3638

3639
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
910,935✔
3640
      memset(pDst->nullbitmap, 0, bmLen);
910,935✔
3641

3642
      int32_t j = 0;
910,935✔
3643

3644
      switch (pDst->info.type) {
910,935!
3645
        case TSDB_DATA_TYPE_BIGINT:
536,573✔
3646
        case TSDB_DATA_TYPE_UBIGINT:
3647
        case TSDB_DATA_TYPE_DOUBLE:
3648
        case TSDB_DATA_TYPE_TIMESTAMP:
3649
          while (j < totalRows) {
273,319,483✔
3650
            if (pBoolList[j] == 0) {
272,782,910✔
3651
              j += 1;
167,458,709✔
3652
              continue;
167,458,709✔
3653
            }
3654

3655
            if (BMIsNull(pBitmap, j)) {
105,324,201✔
3656
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
1,771,756✔
3657
            } else {
3658
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
103,552,445✔
3659
            }
3660
            numOfRows += 1;
105,324,201✔
3661
            j += 1;
105,324,201✔
3662
          }
3663
          break;
536,573✔
3664
        case TSDB_DATA_TYPE_FLOAT:
265,599✔
3665
        case TSDB_DATA_TYPE_INT:
3666
        case TSDB_DATA_TYPE_UINT:
3667
          while (j < totalRows) {
48,655,252✔
3668
            if (pBoolList[j] == 0) {
48,389,653✔
3669
              j += 1;
33,456,984✔
3670
              continue;
33,456,984✔
3671
            }
3672
            if (BMIsNull(pBitmap, j)) {
14,932,669✔
3673
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
1,188,047✔
3674
            } else {
3675
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
13,744,622✔
3676
            }
3677
            numOfRows += 1;
14,932,669✔
3678
            j += 1;
14,932,669✔
3679
          }
3680
          break;
265,599✔
3681
        case TSDB_DATA_TYPE_SMALLINT:
45,296✔
3682
        case TSDB_DATA_TYPE_USMALLINT:
3683
          while (j < totalRows) {
3,508,178✔
3684
            if (pBoolList[j] == 0) {
3,462,882✔
3685
              j += 1;
2,704,398✔
3686
              continue;
2,704,398✔
3687
            }
3688
            if (BMIsNull(pBitmap, j)) {
758,484✔
3689
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
108,805✔
3690
            } else {
3691
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
649,679✔
3692
            }
3693
            numOfRows += 1;
758,484✔
3694
            j += 1;
758,484✔
3695
          }
3696
          break;
45,296✔
3697
        case TSDB_DATA_TYPE_BOOL:
63,252✔
3698
        case TSDB_DATA_TYPE_TINYINT:
3699
        case TSDB_DATA_TYPE_UTINYINT:
3700
          while (j < totalRows) {
4,315,301✔
3701
            if (pBoolList[j] == 0) {
4,252,049✔
3702
              j += 1;
3,264,300✔
3703
              continue;
3,264,300✔
3704
            }
3705
            if (BMIsNull(pBitmap, j)) {
987,749✔
3706
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
150,441✔
3707
            } else {
3708
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
837,308✔
3709
            }
3710
            numOfRows += 1;
987,749✔
3711
            j += 1;
987,749✔
3712
          }
3713
          break;
63,252✔
3714
        case TSDB_DATA_TYPE_DECIMAL64:
×
3715
        case TSDB_DATA_TYPE_DECIMAL:
3716
          while (j < totalRows) {
×
3717
            if (pBoolList[j] == 0) {
×
3718
              j += 1;
×
3719
              continue;
×
3720
            }
3721
            if (BMIsNull(pBitmap, j)) {
×
3722
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
3723
            } else {
3724
              memcpy(pDst->pData + numOfRows * pDst->info.bytes, pDst->pData + j * pDst->info.bytes, pDst->info.bytes);
×
3725
            }
3726
            numOfRows += 1;
×
3727
            j += 1;
×
3728
          }
3729
          break;
×
3730
        case TSDB_DATA_TYPE_BLOB:
×
3731
        case TSDB_DATA_TYPE_MEDIUMBLOB: {
3732
          // impl later
3733
          break;
×
3734
        }
3735
      }
3736
    }
3737

3738
    if (maxRows < numOfRows) {
1,136,697✔
3739
      maxRows = numOfRows;
280,884✔
3740
    }
3741
  }
3742

3743
  pBlock->info.rows = maxRows;
311,641✔
3744
  if (pBitmap != NULL) {
311,641✔
3745
    taosMemoryFree(pBitmap);
309,181✔
3746
  }
3747

3748
  return code;
311,664✔
3749
}
3750

3751
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
3,581,842✔
3752
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
3,581,842✔
3753
}
3754

3755
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
575,225✔
3756
  if (!pDataBlock || !pOrderInfo) return 0;
575,225!
3757
  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
1,150,240✔
3758
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
574,684✔
3759
    if (pOrder == NULL) {
575,107!
3760
      continue;
×
3761
    }
3762

3763
    pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
575,107✔
3764
    if (pOrder->pColData == NULL) {
574,987!
3765
      continue;
×
3766
    }
3767

3768
    pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
574,987✔
3769
  }
3770

3771
  SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
574,957✔
3772

3773
  int32_t rowIdx = 0, nextRowIdx = 1;
574,957✔
3774
  for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
12,825,795✔
3775
    if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
12,443,584✔
3776
      break;
194,786✔
3777
    }
3778
  }
3779

3780
  return nextRowIdx;
576,997✔
3781
}
3782

3783
#define BLOCK_DATA_CHECK_TRESSA(o)                      \
3784
  if (!(o)) {                                           \
3785
    uError("blockDataCheck failed! line:%d", __LINE__); \
3786
    return TSDB_CODE_INTERNAL_ERROR;                    \
3787
  }
3788
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
23,724,203✔
3789
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
23,724,203!
3790
    return TSDB_CODE_SUCCESS;
3,408,070✔
3791
  }
3792

3793
  BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
20,316,133!
3794
  if (!pDataBlock->info.dataLoad) {
20,316,133✔
3795
    return TSDB_CODE_SUCCESS;
1,524,872✔
3796
  }
3797

3798
  bool    isVarType = false;
18,791,261✔
3799
  int32_t colLen = 0;
18,791,261✔
3800
  int32_t nextPos = 0;
18,791,261✔
3801
  int64_t checkRows = 0;
18,791,261✔
3802
  int64_t typeValue = 0;
18,791,261✔
3803
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
18,791,261✔
3804
  for (int32_t i = 0; i < colNum; ++i) {
98,019,522✔
3805
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
79,165,786✔
3806
    BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
79,157,731!
3807
    isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
79,228,855!
3808
    checkRows = pDataBlock->info.rows;
79,228,855✔
3809
    if (pCol->info.noData == true) continue;
79,228,855✔
3810

3811
    if (isVarType) {
77,930,388✔
3812
      BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
19,920,899!
3813
    } else {
3814
      BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
58,009,489!
3815
    }
3816

3817
    nextPos = -1;
77,930,388✔
3818
    for (int64_t r = 0; r < checkRows; ++r) {
78,554,088✔
3819
      if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
78,480,792✔
3820
      if (!colDataIsNull_s(pCol, r)) {
1,247,400!
3821
        BLOCK_DATA_CHECK_TRESSA(pCol->pData);
623,700!
3822
        BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
623,700!
3823

3824
        if (isVarType) {
623,700✔
3825
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
519,750!
3826
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
519,750!
3827
          if (pCol->reassigned) {
519,750!
3828
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
×
3829
          } else if (0 == r || nextPos == -1) {
519,750!
3830
            nextPos = pCol->varmeta.offset[r];
2,250✔
3831
          } else {
3832
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
517,500!
3833
          }
3834

3835
          char*   pColData = pCol->pData + pCol->varmeta.offset[r];
519,750✔
3836
          int32_t colSize = 0;
519,750✔
3837
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
519,750!
3838
            colLen = getJsonValueLen(pColData);
×
3839
          } else if (IS_STR_DATA_BLOB(pCol->info.type)) {
519,750!
3840
            colLen = blobDataTLen(pColData);
×
3841
          } else {
3842
            colLen = varDataTLen(pColData);
519,750✔
3843
          }
3844

3845
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
519,750!
3846
            BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
×
3847
          } else if (IS_STR_DATA_BLOB(pCol->info.type)) {
519,750!
3848
            // check or not
3849
          } else {
3850
            BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
519,750!
3851
          }
3852
          if (pCol->reassigned) {
519,750!
3853
            BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
×
3854
          } else {
3855
            nextPos += colLen;
519,750✔
3856
            BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
519,750!
3857
          }
3858
          typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
519,750✔
3859
        } else {
3860
          if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
103,950!
3861
            float v = 0;
×
3862
            GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r),
×
3863
                           typeGetTypeModFromColInfo(&pCol->info));
3864
          } else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
103,950!
3865
            double v = 0;
×
3866
            GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r),
×
3867
                           typeGetTypeModFromColInfo(&pCol->info));
3868
          } else if (IS_DECIMAL_TYPE(pCol->info.type)) {
103,950!
3869
            // SKIP for decimal types
3870
          } else {
3871
            GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r),
103,950!
3872
                           typeGetTypeModFromColInfo(&pCol->info));
3873
          }
3874
        }
3875
      }
3876
    }
3877
  }
3878
  return TSDB_CODE_SUCCESS;
18,853,736✔
3879
}
3880

3881
int32_t getFirstNotSmallerThanTSRowNum(const char* pts, int32_t startRow, int32_t numOfRows, TSKEY ts) {
364,573✔
3882
  int32_t rowNum = -1;
364,573✔
3883
  if (numOfRows < 7) {
364,573✔
3884
    for (int32_t i = startRow; i < numOfRows; ++i) {
3,490!
3885
      if (ts <= *(TSKEY*)(pts + i * sizeof(TSKEY))) {
3,491!
3886
        rowNum = i;
3,491✔
3887
        break;
3,491✔
3888
      }
3889
    }
3890
    return rowNum;
3,490✔
3891
  }
3892

3893
  int32_t left = startRow;
361,083✔
3894
  int32_t right = numOfRows - 1;
361,083✔
3895
  rowNum = -1;
361,083✔
3896

3897
  while (left <= right) {
2,525,144✔
3898
    int32_t mid = left + (right - left) / 2;
2,164,061✔
3899
    TSKEY midValue = *(TSKEY*)(pts + mid * sizeof(TSKEY));
2,164,061✔
3900

3901
    if (midValue >= ts) {
2,164,061✔
3902
      rowNum = mid;
2,163,995✔
3903
      right = mid - 1;
2,163,995✔
3904
    } else {
3905
      left = mid + 1;
66✔
3906
    }
3907
  }
3908

3909
  return rowNum;
361,083✔
3910
}
3911

3912
int32_t getFirstBiggerThanTSRowNum(const char* pts, int32_t startRow, int32_t numOfRows, TSKEY ts) {
364,755✔
3913
  if (ts == INT64_MAX) return -1;
364,755✔
3914
  int32_t rowNum = -1;
359,985✔
3915
  if (numOfRows < 7) {
359,985✔
3916
    for (int32_t i = 0; i < numOfRows; ++i) {
176✔
3917
      if (ts < *(TSKEY*)(pts + i * sizeof(TSKEY))) {
136!
3918
        rowNum = i;
×
3919
        break;
×
3920
      }
3921
    }
3922
    return rowNum;
40✔
3923
  }
3924
  int32_t left = startRow;
359,945✔
3925
  int32_t right = numOfRows - 1;
359,945✔
3926
  rowNum = -1;
359,945✔
3927
  while (left <= right) {
2,879,172✔
3928
    int32_t mid = left + (right - left) / 2;
2,519,227✔
3929
    TSKEY midValue = *(TSKEY*)(pts + mid * sizeof(TSKEY));
2,519,227✔
3930

3931
    if (midValue > ts) {
2,519,227✔
3932
      rowNum = mid;
45✔
3933
      right = mid - 1;
45✔
3934
    } else {
3935
      left = mid + 1;
2,519,182✔
3936
    }
3937
  }
3938
  return rowNum;
359,945✔
3939
}
3940

3941
int32_t getFirstNotBiggerThanTSRowNum(const char* pts, int32_t numOfRows, TSKEY ts) {
×
3942
  int32_t rowNum = -1;
×
3943
  for (int32_t i = numOfRows - 1; i >= 0; --i) {
×
3944
    if (ts >= *(TSKEY*)(pts + i * sizeof(TSKEY))) {
×
3945
      rowNum = i;
×
3946
      break;
×
3947
    }
3948
  }
3949
  return rowNum;
×
3950
}
3951

3952
static int32_t resetVarDataOffset(int32_t* pOffset, int32_t numOfRows) {
719,783✔
3953
  int32_t offset = 0;
719,783✔
3954
  for (int32_t i = 0; i < numOfRows; ++i) {
1,439,644!
3955
    if (pOffset[i] < 0) {
1,439,873✔
3956
      continue;
719,861✔
3957
    } else {
3958
      offset = pOffset[i];
720,012✔
3959
      break;
720,012✔
3960
    }
3961
  }
3962
  if (offset > 0) {
719,783✔
3963
    for (int32_t i = 0; i < numOfRows; ++i) {
2,310✔
3964
      if (pOffset[i] < 0) {
2,250✔
3965
        continue;
1,125✔
3966
      }
3967
      pOffset[i] -= offset;
1,125✔
3968
    }
3969
  }
3970
  return offset;
719,783✔
3971
}
3972

3973
void colDataGetOffsetAndLen(const SColumnInfoData* pColumnInfoData, int32_t numOfRows, int32_t startIndex,  int32_t endIndex, int32_t* pOffset, int32_t* pLen) {
1,459,680✔
3974
  *pOffset = 0;
1,459,680✔
3975
  *pLen = 0;
1,459,680✔
3976
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
1,459,680!
3977
    for (int32_t row = startIndex; row <= endIndex; ++row) {
1,440,323!
3978
      if (pColumnInfoData->varmeta.offset[row] >= 0) {
1,440,324✔
3979
        *pOffset = pColumnInfoData->varmeta.offset[row];
720,162✔
3980
        break;
720,162✔
3981
      }
3982
    }
3983
    if (endIndex + 1 < numOfRows) {
720,161✔
3984
      for (int32_t row = endIndex + 1; row < numOfRows; ++row) {
156!
3985
        if (pColumnInfoData->varmeta.offset[row] >= 0) {
156✔
3986
          *pLen = pColumnInfoData->varmeta.offset[row] - *pOffset;
78✔
3987
          return;
78✔
3988
        }
3989
      }
3990
    }
3991
    *pLen = pColumnInfoData->varmeta.length - *pOffset;
720,083✔
3992
  } else {
3993
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
739,519!
3994
      return;
×
3995
    } else {
3996
      *pOffset = pColumnInfoData->info.bytes * startIndex;
739,519✔
3997
      *pLen = pColumnInfoData->info.bytes * (endIndex - startIndex + 1);
739,519✔
3998
    }
3999
  }
4000
}
4001

4002
size_t blockDataGetSizeOfRows(const SSDataBlock* pBlock, int32_t startIndex, int32_t endIndex) {
364,864✔
4003
  size_t total = 0;
364,864✔
4004
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
364,864✔
4005
  for (int32_t i = 0; i < numOfCols; ++i) {
1,094,707✔
4006
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
729,844✔
4007
    if (pColInfoData == NULL) {
729,844!
4008
      continue;
×
4009
    }
4010

4011
    int32_t colSize = 0;
729,844✔
4012
    int32_t offset = 0;
729,844✔
4013
    int32_t numOfRows = endIndex - startIndex + 1;
729,844✔
4014

4015
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
729,844!
4016
      total += sizeof(int32_t) * numOfRows;
360,081✔
4017
    } else {
4018
      if (pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
369,763!
4019
        total += (pColInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows);
369,763✔
4020
      }
4021
    }
4022

4023
    colDataGetOffsetAndLen(pColInfoData, pBlock->info.rows, startIndex, endIndex, &offset, &colSize);
729,844✔
4024
    total += colSize;
729,843✔
4025
  }
4026

4027
  return total;
364,863✔
4028
}
4029

4030
int32_t blockGetEncodeSizeOfRows(const SSDataBlock* pBlock, int32_t startIndex, int32_t endIndex){
364,863✔
4031
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSizeOfRows(pBlock, startIndex, endIndex);
364,863✔
4032
}
4033

4034
// return error code if failed
4035
// pLen: return length of encoded data
4036
int32_t blockEncodeAsRows(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols, int32_t startIndex, 
364,862✔
4037
                          int32_t endIndex, int32_t* pLen) {
4038
  *pLen = 0;
364,862✔
4039
  int32_t code = blockDataCheck(pBlock);
364,862✔
4040
  if (code != TSDB_CODE_SUCCESS) {
364,860!
4041
    return code;
×
4042
  }
4043

4044
  int32_t dataLen = 0;
364,860✔
4045

4046
  // todo extract method
4047
  int32_t* version = (int32_t*)data;
364,860✔
4048
  *version = BLOCK_VERSION_1;
364,860✔
4049
  data += sizeof(int32_t);
364,860✔
4050

4051
  int32_t* actualLen = (int32_t*)data;
364,860✔
4052
  data += sizeof(int32_t);
364,860✔
4053

4054
  int32_t* rows = (int32_t*)data;
364,860✔
4055
  *rows = pBlock->info.rows;
364,860✔
4056
  data += sizeof(int32_t);
364,860✔
4057
  if (*rows <= 0) {
364,860!
4058
    uError("Invalid rows %d in block", *rows);
×
4059
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4060
  }
4061
  if(*rows <= endIndex) {
364,860!
4062
    uError("Invalid endIdex %d, there is %d rows in block", endIndex, *rows);
×
4063
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4064
  }
4065
  int32_t realRows = endIndex - startIndex + 1;
364,860✔
4066
  *rows = realRows;
364,860✔
4067

4068
  int32_t* cols = (int32_t*)data;
364,860✔
4069
  *cols = numOfCols;
364,860✔
4070
  data += sizeof(int32_t);
364,860✔
4071

4072
  // flag segment.
4073
  // the inital bit is for column info
4074
  int32_t* flagSegment = (int32_t*)data;
364,860✔
4075
  *flagSegment = (1 << 31);
364,860✔
4076

4077
  data += sizeof(int32_t);
364,860✔
4078

4079
  uint64_t* groupId = (uint64_t*)data;
364,860✔
4080
  data += sizeof(uint64_t);
364,860✔
4081

4082
  for (int32_t i = 0; i < numOfCols; ++i) {
1,094,703✔
4083
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
729,841✔
4084
    if (pColInfoData == NULL) {
729,841!
4085
      return terrno;
×
4086
    }
4087

4088
    *((int8_t*)data) = pColInfoData->info.type;
729,841✔
4089
    data += sizeof(int8_t);
729,841✔
4090

4091
    int32_t bytes = pColInfoData->info.bytes;
729,841✔
4092
    *((int32_t*)data) = bytes;
729,841✔
4093
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
729,841!
4094
      fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
×
4095
                              pColInfoData->info.scale);
×
4096
    }
4097
    data += sizeof(int32_t);
729,843✔
4098
  }
4099

4100
  int32_t* colSizes = (int32_t*)data;
364,862✔
4101
  data += numOfCols * sizeof(int32_t);
364,862✔
4102

4103
  dataLen = blockDataGetSerialMetaSize(numOfCols);
364,862✔
4104

4105
  int32_t numOfRows = pBlock->info.rows;
364,863✔
4106
  for (int32_t col = 0; col < numOfCols; ++col) {
1,094,707✔
4107
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
729,843✔
4108
    if (pColRes == NULL) {
729,842✔
4109
      return terrno;
1✔
4110
    }
4111

4112
    // copy the null bitmap
4113
    size_t metaSize = 0;
729,841✔
4114
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
729,841!
4115
      metaSize = realRows * sizeof(int32_t);
360,081✔
4116
      if(dataLen + metaSize > dataBuflen) goto _exit;
360,081!
4117
      TAOS_UNUSED(memcpy(data, (char*)pColRes->varmeta.offset + (startIndex * sizeof(int32_t)), metaSize));
360,081✔
4118
      TAOS_UNUSED(resetVarDataOffset((int32_t*)data, realRows));
360,081✔
4119
    } else {
4120
      metaSize = BitmapLen(realRows);
369,760✔
4121
      if(dataLen + metaSize > dataBuflen) goto _exit;
369,760!
4122
      for (int32_t j = 0; j < realRows; ++j) {
36,418,314✔
4123
        if (colDataIsNull_f(pColRes, j + startIndex)) {
36,048,554!
4124
          colDataSetNull_f(data, j);
594✔
4125
        }
4126
      }
4127
    }
4128

4129
    data += metaSize;
729,841✔
4130
    dataLen += metaSize;
729,841✔
4131

4132
    int32_t skipOffset;
4133
    colDataGetOffsetAndLen(pColRes, numOfRows, startIndex, endIndex, &skipOffset, &colSizes[col]);
729,841✔
4134
    dataLen += colSizes[col];
729,844✔
4135
    if (dataLen > dataBuflen) goto _exit;
729,844!
4136
    if (pColRes->pData != NULL) {
729,844!
4137
      (void)memmove(data, pColRes->pData + skipOffset, colSizes[col]);
729,844✔
4138
    }
4139
    data += colSizes[col];
729,844✔
4140

4141
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
729,844!
4142
      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
×
4143
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4144
    }
4145
    
4146
    colSizes[col] = htonl(colSizes[col]);
729,844✔
4147
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
4148
    //    htonl(colSizes[col]), colSizes[col]);
4149
  }
4150

4151
  bool* blankFill = (bool*)data;
364,864✔
4152
  *blankFill = pBlock->info.blankFill;
364,864✔
4153
  data += sizeof(bool);
364,864✔
4154

4155
  *actualLen = dataLen;
364,864✔
4156
#ifndef NO_UNALIGNED_ACCESS
4157
  *groupId = pBlock->info.id.groupId;
364,864✔
4158
#else
4159
  taosSetPUInt64Aligned(groupId, &pBlock->info.id.groupId);
4160
#endif
4161
  if (dataLen > dataBuflen) goto _exit;
364,864!
4162

4163
  *pLen = dataLen;
364,864✔
4164
  return TSDB_CODE_SUCCESS;
364,864✔
4165
_exit:
×
4166
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
4167
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4168
}
4169

4170
int32_t getTsColDataOffset(SSDataBlock* pBlock, int32_t* colLen, int32_t numOfRows, int32_t tsColSlotId, int32_t* pOffset) {
364,623✔
4171
  *pOffset = 0;
364,623✔
4172
  for (int32_t i = 0; i <= tsColSlotId; ++i) {
730,115✔
4173
    int oneColsLen = htonl(colLen[i]);
365,351✔
4174
    if (oneColsLen < 0) {
365,351!
4175
      uError("block decode colLen:%d error, colIdx:%d", oneColsLen, i);
×
4176
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4177
      return terrno;
×
4178
    }
4179

4180
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
365,351✔
4181
    if (pColInfoData == NULL) {
365,367!
4182
      return terrno;
×
4183
    }
4184

4185
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
365,492!
4186
      *pOffset += sizeof(int32_t) * numOfRows;
×
4187
    } else {
4188
      *pOffset += BitmapLen(numOfRows);
365,492✔
4189
    }
4190
    if (i != tsColSlotId) {
365,492✔
4191
      *pOffset += oneColsLen;
725✔
4192
    }
4193
  }
4194
  return TSDB_CODE_SUCCESS;
364,764✔
4195
}
4196

4197
int32_t blockSpecialDecodeLaterPart(SSDataBlock* pBlock, const char* pData, int32_t tsColSlotId, TSKEY start, TSKEY end) {
364,757✔
4198
  int32_t code = TSDB_CODE_SUCCESS;
364,757✔
4199

4200
  const char* pStart = pData;
364,757✔
4201

4202
  int32_t version = *(int32_t*)pStart;
364,757✔
4203
  pStart += sizeof(int32_t);
364,757✔
4204

4205
  // total length sizeof(int32_t)
4206
  int32_t dataLen = *(int32_t*)pStart;
364,757✔
4207
  pStart += sizeof(int32_t);
364,757✔
4208

4209
  // total rows sizeof(int32_t)
4210
  int32_t numOfRows = *(int32_t*)pStart;
364,757✔
4211
  pStart += sizeof(int32_t);
364,757✔
4212
  if (numOfRows <= 0) {
364,757!
4213
    uError("block decode numOfRows:%d error", numOfRows);
×
4214
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4215
    return terrno;
×
4216
  }
4217

4218
  // total columns sizeof(int32_t)
4219
  int32_t numOfCols = *(int32_t*)pStart;
364,757✔
4220
  pStart += sizeof(int32_t);
364,757✔
4221

4222
  // has column info segment
4223
  int32_t flagSeg = *(int32_t*)pStart;
364,757✔
4224
  int32_t hasColumnInfo = (flagSeg >> 31);
364,757✔
4225
  pStart += sizeof(int32_t);
364,757✔
4226

4227
  // group id sizeof(uint64_t)
4228
#ifndef NO_UNALIGNED_ACCESS
4229
  pBlock->info.id.groupId = *(uint64_t*)pStart;
364,757✔
4230
#else
4231
  taosSetPUInt64Aligned(&pBlock->info.id.groupId, (uint64_t*)pStart);
4232
#endif
4233
  pStart += sizeof(uint64_t);
364,757✔
4234

4235
  if (pBlock->pDataBlock == NULL) {
364,757!
4236
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
364,765✔
4237
    if (pBlock->pDataBlock == NULL) {
364,801!
4238
      return terrno;
×
4239
    }
4240
  }
4241

4242
  for (int32_t i = 0; i < numOfCols; ++i) {
1,094,436✔
4243
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
729,548✔
4244
    if (pColInfoData == NULL) {
729,551!
4245
      return terrno;
×
4246
    }
4247

4248
    pColInfoData->info.type = *(int8_t*)pStart;
729,551✔
4249
    pStart += sizeof(int8_t);
729,551✔
4250

4251
    pColInfoData->info.bytes = *(int32_t*)pStart;
729,551✔
4252
    if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
729,551!
4253
      extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
×
4254
                                      &pColInfoData->info.scale);
4255
    }
4256
    pStart += sizeof(int32_t);
729,634✔
4257

4258
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
729,634!
4259
      pBlock->info.hasVarCol = true;
359,987✔
4260
    }
4261
  }
4262

4263
  int32_t* colLen = (int32_t*)pStart;
364,888✔
4264
  pStart += sizeof(int32_t) * numOfCols;
364,888✔
4265

4266
  int32_t tsLen = htonl(colLen[tsColSlotId]);
364,888✔
4267
  int32_t tsColOffset;
4268
  code = getTsColDataOffset(pBlock, colLen, numOfRows, tsColSlotId, &tsColOffset);
364,888✔
4269
  if (code) {
364,765!
4270
    return code;
×
4271
  }
4272
  const char* pts = pStart + tsColOffset;
364,765✔
4273
  int32_t     firstRowNum = getFirstNotSmallerThanTSRowNum(pts, 0, numOfRows, start);
364,765✔
4274
  if (firstRowNum < 0) {
364,804!
4275
    pBlock->info.rows = 0;
×
4276
    return TSDB_CODE_SUCCESS;
×
4277
  }
4278
  int32_t lastRowNumNext = getFirstBiggerThanTSRowNum(pts, firstRowNum, numOfRows, end);
364,804✔
4279
  if (lastRowNumNext < 0) {
364,822✔
4280
    lastRowNumNext = numOfRows;
364,820✔
4281
  } else if (lastRowNumNext == 0) {
2!
4282
    pBlock->info.rows = 0;
×
4283
    return TSDB_CODE_SUCCESS;
×
4284
  }
4285

4286
  int32_t realRows = lastRowNumNext - firstRowNum;
364,822✔
4287
  int32_t leftRows = numOfRows - lastRowNumNext;
364,822✔
4288

4289
  code = blockDataEnsureCapacity(pBlock, realRows);
364,822✔
4290
  if (code) {
364,676!
4291
    return code;
×
4292
  }
4293

4294
  for (int32_t i = 0; i < numOfCols; ++i) {
1,094,274✔
4295
    int32_t oneColsLen = htonl(colLen[i]);
729,200✔
4296
    int32_t offset = 0;
729,200✔
4297
    int32_t leftlen = 0;
729,200✔
4298
    if (oneColsLen < 0) {
729,200!
4299
      uError("block decode colLen:%d error, colIdx:%d", oneColsLen, i);
×
4300
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4301
      return terrno;
×
4302
    }
4303

4304
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
729,200✔
4305
    if (pColInfoData == NULL) {
729,217!
4306
      return terrno;
×
4307
    }
4308

4309
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
729,217!
4310
      pStart += sizeof(int32_t) * firstRowNum;
359,685✔
4311
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * realRows);
359,685✔
4312
      pStart += sizeof(int32_t) * realRows;
359,685✔
4313

4314
      offset = resetVarDataOffset(pColInfoData->varmeta.offset, realRows);
359,685✔
4315
      if (leftRows > 0) {
360,005✔
4316
        for (int32_t j = 0; j < leftRows; ++j) {
453✔
4317
          if (*((int32_t*)pStart + j) != -1) {
444✔
4318
            leftlen = oneColsLen - *((int32_t*)pStart + j);
225✔
4319
          }
4320
        }
4321
      }
4322
      oneColsLen -= (offset + leftlen);
360,005✔
4323
      pStart += sizeof(int32_t) * leftRows;
360,005✔
4324

4325
      if (oneColsLen > 0 && pColInfoData->varmeta.allocLen < oneColsLen) {
360,005!
4326
        char* tmp = taosMemoryRealloc(pColInfoData->pData, oneColsLen);
359,973!
4327
        if (tmp == NULL) {
360,034!
4328
          return terrno;
×
4329
        }
4330

4331
        pColInfoData->pData = tmp;
360,034✔
4332
        pColInfoData->varmeta.allocLen = oneColsLen;
360,034✔
4333
      }
4334

4335
      pColInfoData->varmeta.length = oneColsLen;
360,066✔
4336
    } else {
4337
      offset = oneColsLen / numOfRows * firstRowNum;
369,532✔
4338
      if (leftRows > 0) {
369,532✔
4339
        leftlen = oneColsLen / numOfRows * leftRows;
9✔
4340
      }
4341
      oneColsLen -= (offset + leftlen);
369,532✔
4342

4343
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(realRows));
369,532✔
4344
      for (int32_t j = 0; j < realRows; ++j) {
36,362,937✔
4345
        if (BMIsNull(pStart, j + firstRowNum)) {
35,993,405✔
4346
          colDataSetNull_f(pColInfoData->nullbitmap, j);
594✔
4347
        }
4348
      }
4349
      pStart += BitmapLen(numOfRows);
369,532✔
4350
    }
4351

4352
    pColInfoData->hasNull = true;
729,598✔
4353

4354
    if (oneColsLen > 0) {
729,598!
4355
      pStart += offset;
729,743✔
4356
      memcpy(pColInfoData->pData, pStart, oneColsLen);
729,743✔
4357
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
×
4358
      uError("block decode colLen:%d error, colIdx:%d, type:%d", oneColsLen, i, pColInfoData->info.type);
×
4359
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4360
      return terrno;
×
4361
    }
4362

4363
    pStart += oneColsLen;
729,598✔
4364
    pStart += leftlen;
729,598✔
4365
  }
4366

4367
  bool blankFill = *(bool*)pStart;
365,074✔
4368
  pStart += sizeof(bool);
365,074✔
4369

4370
  pBlock->info.dataLoad = 1;
365,074✔
4371
  pBlock->info.rows = realRows;
365,074✔
4372
  pBlock->info.blankFill = blankFill;
365,074✔
4373
  if (pStart - pData != dataLen) {
365,074!
4374
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
×
4375
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4376
    return terrno;
×
4377
  }
4378

4379
  code = blockDataCheck(pBlock);
365,074✔
4380
  if (code != TSDB_CODE_SUCCESS) {
364,699✔
4381
    terrno = code;
109✔
4382
    return code;
×
4383
  }
4384

4385
  return TSDB_CODE_SUCCESS;
364,590✔
4386
}
4387

4388
int32_t getStreamBlockTS(SSDataBlock* pBlock, int32_t tsColSlotId, int32_t row, TSKEY* ts) {
720,188✔
4389
  if (pBlock == NULL || ts == NULL) {
720,188!
4390
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4391
  }
4392

4393
  if (pBlock->info.rows <= 0) {
720,188!
4394
    return TSDB_CODE_SUCCESS;
×
4395
  }
4396
  int32_t cols = taosArrayGetSize(pBlock->pDataBlock);
720,188✔
4397
  if (tsColSlotId < 0 || tsColSlotId >= cols) {
720,188!
4398
    uError("Invalid tsColSlotId %d, block has %d columns", tsColSlotId, cols);
×
4399
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4400
  }
4401

4402
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, tsColSlotId);
720,188✔
4403
  if (pColInfoData == NULL) {
720,188!
4404
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4405
  }
4406

4407
  *ts = *(TSKEY*)(pColInfoData->pData + row * sizeof(TSKEY));
720,188✔
4408
  return TSDB_CODE_SUCCESS;
720,188✔
4409
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc