• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.43
/source/common/src/tdatablock.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tdatablock.h"
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21
#include "tglobal.h"
22

23
#define MALLOC_ALIGN_BYTES 32
24

25
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
32,135,139✔
26
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
32,135,139!
27
    if (pColumnInfoData->reassigned) {
7,147,440!
28
      int32_t totalSize = 0;
×
29
      for (int32_t row = 0; row < numOfRows; ++row) {
×
30
        char*   pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
×
31
        int32_t colSize = 0;
×
32
        if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
×
33
          colSize = getJsonValueLen(pColData);
×
34
        } else {
35
          colSize = varDataTLen(pColData);
×
36
        }
37
        totalSize += colSize;
×
38
      }
39
      return totalSize;
×
40
    }
41
    return pColumnInfoData->varmeta.length;
7,147,440✔
42
  } else {
43
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
24,987,699✔
44
      return 0;
13,990✔
45
    } else {
46
      return pColumnInfoData->info.bytes * numOfRows;
24,973,709✔
47
    }
48
  }
49
}
50

51
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
4,809,633✔
52
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
9,619,266!
53
    return 0;
×
54
  }
55

56
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
4,809,633!
57
  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
1,066,145!
58
    return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
×
59
  else
60
    return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
1,066,145!
61
}
62

63
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
115,176,867✔
64
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
115,176,867!
65
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
12,289,513✔
66
  } else {
67
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
102,887,354!
68
           BitmapLen(numOfRows);
102,887,354✔
69
  }
70
}
71

72
int32_t getJsonValueLen(const char* data) {
26,128✔
73
  int32_t dataLen = 0;
26,128✔
74
  if (*data == TSDB_DATA_TYPE_NULL) {
26,128✔
75
    dataLen = CHAR_BYTES;
1,544✔
76
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
24,584✔
77
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
3,744✔
78
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
20,840✔
79
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
3,298✔
80
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
17,542✔
81
    dataLen = CHAR_BYTES + CHAR_BYTES;
1,032✔
82
  } else if (tTagIsJson(data)) {  // json string
16,510!
83
    dataLen = ((STag*)(data))->len;
16,510✔
84
  } else {
85
    uError("Invalid data type:%d in Json", *data);
×
86
  }
87
  return dataLen;
26,128✔
88
}
89

90
static int32_t getDataLen(int32_t type, const char* pData) {
627,200,330✔
91
  int32_t dataLen = 0;
627,200,330✔
92
  if (type == TSDB_DATA_TYPE_JSON) {
627,200,330✔
93
    dataLen = getJsonValueLen(pData);
16,176✔
94
  } else {
95
    dataLen = varDataTLen(pData);
627,184,154✔
96
  }
97
  return dataLen;
627,723,007✔
98
}
99

100
/*
 * Store one value (or a NULL) into row rowIndex of a column.
 *
 * NULL (isNull is true or pData is NULL):
 *   - var-length column: the row offset is set to -1
 *   - fixed-length column: the null bit is set through colDataSetNull_f_s
 *   and hasNull is raised.
 *
 * Non-NULL:
 *   - var-length column: the value is appended at varmeta.length. If the row
 *     already has a positive offset, varmeta.length is first rewound to that
 *     offset so the value is written over the old one. The data buffer grows
 *     by a factor of 1.5 (starting from 8 bytes) until the value fits.
 *   - fixed-length column: info.bytes bytes are copied into the row slot and
 *     the null bit is cleared.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the grown buffer size would
 * not fit in 32 bits, or terrno if the reallocation fails.
 */
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  if (isNull || pData == NULL) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t type = pColumnInfoData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    int32_t dataLen = getDataLen(type, pData);
    if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
      pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
    }

    SVarColAttr* pAttr = &pColumnInfoData->varmeta;
    if (pAttr->allocLen < pAttr->length + dataLen) {
      // Grow in 64-bit arithmetic: the previous 32-bit variable could never
      // exceed UINT32_MAX, so the overflow check below was dead code and the
      // float-to-uint32 conversion was undefined once the size overflowed.
      uint64_t newSize = pAttr->allocLen;
      if (newSize <= 1) {
        newSize = 8;
      }

      while (newSize < pAttr->length + dataLen) {
        newSize += newSize >> 1;  // same as truncating newSize * 1.5
        if (newSize > UINT32_MAX) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
      if (buf == NULL) {
        return terrno;
      }

      pColumnInfoData->pData = buf;
      pAttr->allocLen = (uint32_t)newSize;
    }

    uint32_t len = pColumnInfoData->varmeta.length;
    pColumnInfoData->varmeta.offset[rowIndex] = len;

    (void)memmove(pColumnInfoData->pData + len, pData, dataLen);
    pColumnInfoData->varmeta.length += dataLen;
  } else {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
  }

  return 0;
}
155

156
/*
 * Set the value of one row. For var-length columns the row offset is reset to
 * -1 before delegating to colDataSetValHelp(), so the helper never sees a
 * positive previous offset for this row and always appends the value.
 */
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  const bool isVarCol = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);
  if (isVarCol) {
    pColumnInfoData->varmeta.offset[rowIndex] = -1;
  }

  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
163

164
/*
 * Set the value of one row without resetting its offset first; the existing
 * offset of a var-length row is left for colDataSetValHelp() to interpret.
 */
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  int32_t code = colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
  return code;
}
167

168
/*
 * Make row dstRowIdx hold the same value as row srcRowIdx.
 *  - var-length column: the destination offset is pointed at the source
 *    offset (no bytes are copied) and the column is flagged as reassigned.
 *  - fixed-length column: info.bytes bytes are copied from pData into the
 *    destination slot and its null bit is cleared.
 * Always returns 0.
 */
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
                           const char* pData) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    char* pSlot = pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx;
    memcpy(pSlot, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
    return 0;
  }

  pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
  pColumnInfoData->reassigned = true;
  return 0;
}
181

182
/*
 * Ensure the data buffer of a var-length column can hold at least newSize
 * bytes. Fixed-length columns and already large enough buffers are left
 * untouched. Returns TSDB_CODE_SUCCESS, or terrno if reallocation fails.
 */
static int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
  const bool isVarCol = IS_VAR_DATA_TYPE(pColumnInfoData->info.type);
  if (!isVarCol || pColumnInfoData->varmeta.allocLen >= newSize) {
    return TSDB_CODE_SUCCESS;
  }

  char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, newSize);
  if (pGrown == NULL) {
    return terrno;
  }

  pColumnInfoData->pData = pGrown;
  pColumnInfoData->varmeta.allocLen = newSize;
  return TSDB_CODE_SUCCESS;
}
203

204
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
15,941,333✔
205
                            int32_t itemLen, int32_t numOfRows, bool trimValue) {
206
  if (pColumnInfoData->info.bytes < itemLen) {
15,941,333!
207
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen,
×
208
          pColumnInfoData->info.bytes, trimValue);
209
    if (trimValue) {
×
210
      itemLen = pColumnInfoData->info.bytes;
×
211
    } else {
212
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
213
    }
214
  }
215

216
  size_t   start = 1;
15,941,333✔
217
  int32_t  t = 0;
15,941,333✔
218
  int32_t  count = log(numOfRows) / log(2);
15,941,333✔
219
  uint32_t startOffset =
15,941,333✔
220
      (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
15,941,333!
221

222
  // the first item
223
  memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
15,941,333✔
224

225
  while (t < count) {
54,104,064✔
226
    int32_t xlen = 1 << t;
38,162,731✔
227
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
38,162,731✔
228
           xlen * itemLen);
38,162,731✔
229
    t += 1;
38,162,731✔
230
    start += xlen;
38,162,731✔
231
  }
232

233
  // the tail part
234
  if (numOfRows > start) {
15,941,333✔
235
    memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
6,291,128✔
236
           (numOfRows - start) * itemLen);
6,291,128✔
237
  }
238

239
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
15,941,333✔
240
    for (int32_t i = 0; i < numOfRows; ++i) {
828,798,103✔
241
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
824,650,692✔
242
    }
243

244
    pColumnInfoData->varmeta.length += numOfRows * itemLen;
4,147,411✔
245
  }
246

247
  return TSDB_CODE_SUCCESS;
15,941,333✔
248
}
249

250
/*
 * Fill numOfRows consecutive rows, starting at currentRow, with copies of the
 * value at pData. For var-length columns the item length is taken from the
 * value itself and the data buffer is grown first when it cannot hold all the
 * copies. The copying itself is done by doCopyNItems(); trimValue is passed
 * through to it.
 *
 * Returns the result of doCopyNItems(), or the error from colDataReserve().
 */
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                         bool trimValue) {
  int32_t itemLen = pColumnInfoData->info.bytes;

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return doCopyNItems(pColumnInfoData, currentRow, pData, itemLen, numOfRows, trimValue);
  }

  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
    itemLen = getJsonValueLen(pData);
  } else {
    itemLen = varDataTLen(pData);
  }

  // Grow the buffer up front so every copy fits behind the current data.
  if (pColumnInfoData->varmeta.allocLen < (numOfRows * itemLen + pColumnInfoData->varmeta.length)) {
    int32_t code = colDataReserve(pColumnInfoData, (numOfRows * itemLen + pColumnInfoData->varmeta.length));
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return doCopyNItems(pColumnInfoData, currentRow, pData, itemLen, numOfRows, trimValue);
}
269

270
/*
 * Mark numOfRows consecutive rows, starting at currentRow, as NULL and raise
 * hasNull.
 *  - var-length column: the offsets are filled with -1 (all bytes 0xFF).
 *  - fixed-length column: the null bits are set. For 16 or more rows the bits
 *    up to the next byte boundary are set one by one, the whole bytes in the
 *    middle are filled with 0xFF in one memset, and the leftover bits are set
 *    one by one again.
 */
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
  pColumnInfoData->hasNull = true;

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
    return;
  }

  int32_t done = 0;

  if (!(numOfRows < 16)) {
    // leading bits, until the row index reaches a byte boundary of the bitmap
    while (done < numOfRows && BitPos(currentRow + done)) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + done);
      ++done;
    }

    // whole bytes in the middle
    int32_t wholeBytes = (numOfRows - done) / 8;
    memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + done), 0xFF, wholeBytes);
    done += wholeBytes * 8;
  }

  // remaining bits (all of them when fewer than 16 rows were requested)
  for (; done < numOfRows; ++done) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + done);
  }
}
300

301
/*
 * Store the value at pData once in row currentRow, then make the following
 * numOfRows - 1 rows share it by pointing their offsets at the same position.
 * The column is flagged as reassigned when more than one row shares the value.
 *
 * This function writes to varmeta.offset directly; the only caller visible
 * here, colDataCopyNItems(), uses it for var-length columns only.
 *
 * Returns 0 on success or the error code from colDataSetVal().
 */
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
                               uint32_t numOfRows) {
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
  if (code) {
    return code;
  }

  if (numOfRows > 1) {
    int32_t*      pOffset = pColumnInfoData->varmeta.offset;
    const int32_t sharedOffset = pOffset[currentRow];

    // memset() fills bytes, not int32_t elements, so it cannot replicate an
    // arbitrary offset; assign element by element instead.
    for (uint32_t i = 1; i < numOfRows; ++i) {
      pOffset[currentRow + i] = sharedOffset;
    }
    pColumnInfoData->reassigned = true;
  }

  return code;
}
316

317
/*
 * Fill numOfRows consecutive rows, starting at currentRow, with one value.
 *  - isNull: the rows are marked NULL through colDataSetNItemsNull().
 *  - var-length column: handled by colDataCopyAndReassign().
 *  - fixed-length column: the value is copied once, then the filled region is
 *    doubled in place until numOfRows rows are written.
 *
 * Null bits of a fixed-length column are not touched on the non-NULL path.
 */
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
                          bool isNull) {
  if (isNull) {
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
    pColumnInfoData->hasNull = true;
    return 0;
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
  }

  int32_t  itemBytes = pColumnInfoData->info.bytes;
  int32_t  writePos = currentRow * itemBytes;
  uint32_t filled = 1;

  // seed: the first row
  void* pSeed = pColumnInfoData->pData + writePos;
  memcpy(pSeed, pData, itemBytes);
  writePos += filled * itemBytes;

  // replicate the already filled rows, at most doubling them per step
  while (filled < numOfRows) {
    int32_t doubled = filled << 1;
    int32_t batch = doubled > numOfRows ? (numOfRows - filled) : filled;

    memcpy(pColumnInfoData->pData + writePos, pSeed, batch * itemBytes);
    writePos += batch * itemBytes;
    filled += batch;
  }

  return TSDB_CODE_SUCCESS;
}
349

350
/*
 * Append the null bitmap of pSource (numOfRow2 rows) behind the null bitmap
 * of pColumnInfoData (numOfRow1 rows).
 *
 * When numOfRow1 is a multiple of 8 the source bytes are copied as they are.
 * Otherwise every source byte is split: its high bits complete the last
 * partially used destination byte and its low bits start the next one.
 * The destination bitmap must already be large enough for the merged rows.
 */
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) return;

  uint32_t mergedRows = numOfRow1 + numOfRow2;

  uint32_t usedBits = BitPos(numOfRow1);  // bits already occupied in the last destination byte
  uint32_t freeBits = 8 - usedBits;

  if (usedBits == 0) {  // byte aligned, plain copy
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint8_t* pSrcBits = (uint8_t*)pSource->nullbitmap;

  // complete the last, partially used destination byte
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] &= (0xFF << freeBits);         // keep only the used bits
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (pSrcBits[0] >> usedBits);  // fill in the free bits

  if (BitmapLen(numOfRow1) == BitmapLen(mergedRows)) {
    return;
  }

  int32_t srcBytes = BitmapLen(numOfRow2);
  int32_t idx = 0;

  uint8_t* pTail = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
  int32_t  extraBytes = BitmapLen(mergedRows) - BitmapLen(numOfRow1);
  memset(pTail, 0, extraBytes);

  while (idx < srcBytes) {  // bounded by the size of the source bitmap
    if (idx >= 1) {
      pTail[idx - 1] |= (pSrcBits[idx] >> usedBits);  // high part goes to the previous byte
    }

    if (idx >= extraBytes) {  // bounded by the size of the destination bitmap
      return;
    }

    pTail[idx] |= (pSrcBits[idx] << freeBits);  // low part starts the current byte
    idx += 1;
  }
}
391

392
/*
 * Append the numOfRow2 rows of pSource behind the numOfRow1 rows already in
 * pColumnInfoData. Both columns must have the same type.
 *
 * *capacity is the number of rows the destination can currently hold; when
 * the merged row count exceeds it, the per-row arrays are reallocated and
 * *capacity is updated.
 *
 * Returns the merged row count on success (numOfRow1 when numOfRow2 is 0),
 * TSDB_CODE_INVALID_PARA on type mismatch, or terrno when a reallocation
 * returns NULL.
 */
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t mergedRows = numOfRow1 + numOfRow2;

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (mergedRows > (*capacity)) {
      // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
      char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, mergedRows * pColumnInfoData->info.bytes);
      if (pNewData == NULL) {
        return terrno;
      }
      pColumnInfoData->pData = pNewData;

      if (BitmapLen(numOfRow1) < BitmapLen(mergedRows)) {
        char* pNewBitmap = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(mergedRows));
        if (pNewBitmap == NULL) {
          return terrno;
        }

        // zero the freshly added bitmap bytes
        uint32_t addedBytes = BitmapLen(mergedRows) - BitmapLen(numOfRow1);
        memset(pNewBitmap + BitmapLen(numOfRow1), 0, addedBytes);
        pColumnInfoData->nullbitmap = pNewBitmap;
      }

      *capacity = mergedRows;
    }

    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

    if (pSource->pData) {
      int32_t dstPos = pColumnInfoData->info.bytes * numOfRow1;
      memcpy(pColumnInfoData->pData + dstPos, pSource->pData, pSource->info.bytes * numOfRow2);
    }

    return numOfRow1 + numOfRow2;
  }

  // var-length column: grow the offset array first
  if (mergedRows > (*capacity)) {
    char* pNewOffsets = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
    if (pNewOffsets == NULL) {
      return terrno;
    }

    *capacity = mergedRows;
    pColumnInfoData->varmeta.offset = (int32_t*)pNewOffsets;
  }

  // shift the source offsets behind the existing data, NULL rows stay at -1
  for (int32_t row = 0; row < numOfRow2; ++row) {
    if (pSource->varmeta.offset[row] == -1) {
      pColumnInfoData->varmeta.offset[row + numOfRow1] = -1;
    } else {
      pColumnInfoData->varmeta.offset[row + numOfRow1] = pSource->varmeta.offset[row] + pColumnInfoData->varmeta.length;
    }
  }

  // append the source bytes
  uint32_t srcLen = pSource->varmeta.length;
  uint32_t dstLen = pColumnInfoData->varmeta.length;
  if (pColumnInfoData->varmeta.allocLen < srcLen + dstLen) {
    char* pNewData = taosMemoryRealloc(pColumnInfoData->pData, srcLen + dstLen);
    if (pNewData == NULL) {
      return terrno;
    }

    pColumnInfoData->pData = pNewData;
    pColumnInfoData->varmeta.allocLen = srcLen + dstLen;
  }

  if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
    memcpy(pColumnInfoData->pData + dstLen, pSource->pData, srcLen);
  }
  pColumnInfoData->varmeta.length = srcLen + dstLen;

  return numOfRow1 + numOfRow2;
}
476

477
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
42,183,961✔
478
                      const SDataBlockInfo* pBlockInfo) {
479
  if (pColumnInfoData->info.type != pSource->info.type || (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows)) {
42,183,961!
480
    return TSDB_CODE_INVALID_PARA;
×
481
  }
482

483
  if (numOfRows <= 0) {
42,183,961✔
484
    return numOfRows;
8,512✔
485
  }
486

487
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
51,103,796!
488
    int32_t newLen = pSource->varmeta.length;
8,918,441✔
489
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
8,918,441✔
490
    if (pColumnInfoData->varmeta.allocLen < newLen) {
8,918,441✔
491
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, newLen);
5,224,360✔
492
      if (tmp == NULL) {
5,234,077!
493
        return terrno;
×
494
      }
495

496
      pColumnInfoData->pData = tmp;
5,234,266✔
497
      pColumnInfoData->varmeta.allocLen = newLen;
5,234,266✔
498
    }
499

500
    pColumnInfoData->varmeta.length = newLen;
8,928,347✔
501
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
8,928,347!
502
      memcpy(pColumnInfoData->pData, pSource->pData, newLen);
8,346,104✔
503
    }
504
  } else {
505
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
33,257,008✔
506
    if (pSource->pData != NULL) {
33,257,008!
507
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
33,278,028✔
508
    }
509
  }
510

511
  pColumnInfoData->hasNull = pSource->hasNull;
42,185,355✔
512
  pColumnInfoData->info = pSource->info;
42,185,355✔
513
  return 0;
42,185,355✔
514
}
515

516
/*
 * Copy numOfRows rows of pSrc, starting at srcIdx, into pDst starting at
 * dstIdx. Both columns must agree in type and byte width, and pSrc must not
 * be a "reassigned" column.
 *
 *  - var-length column: the values are appended behind varmeta.length of the
 *    destination with one memcpy that starts at the first non-NULL source
 *    value, so the copied source values are taken to be contiguous.
 *  - fixed-length column: null bits are transferred (whole bytes when both
 *    indexes are byte aligned, bit by bit otherwise) and the row data is
 *    copied with one memcpy. Null bits are only touched when pSrc->hasNull.
 *
 * Returns 0 on success, numOfRows itself when numOfRows <= 0,
 * TSDB_CODE_INVALID_PARA on mismatch, or terrno when reallocation fails.
 */
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
                           int32_t numOfRows) {
  if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (numOfRows <= 0) {
    return numOfRows;
  }

  if (IS_VAR_DATA_TYPE(pDst->info.type)) {
    const bool srcHasNull = pSrc->hasNull;
    int32_t    copyLen = 0;
    void*      pCopyFrom = NULL;  // first non-NULL source value

    for (int32_t row = 0; row < numOfRows; ++row) {
      if (srcHasNull && colDataIsNull_var(pSrc, srcIdx + row)) {
        pDst->varmeta.offset[dstIdx + row] = -1;
        pDst->hasNull = true;
        continue;
      }

      char* pVal = colDataGetVarData(pSrc, srcIdx + row);
      if (NULL == pCopyFrom) {
        pCopyFrom = pVal;
      }

      int32_t valLen = 0;
      if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
        valLen = getJsonValueLen(pVal);
      } else {
        valLen = varDataTLen(pVal);
      }

      pDst->varmeta.offset[dstIdx + row] = pDst->varmeta.length + copyLen;
      copyLen += valLen;
    }

    if (copyLen > 0) {
      // grow the destination buffer when the new bytes do not fit
      if (pDst->varmeta.allocLen < pDst->varmeta.length + copyLen) {
        char* pGrown = taosMemoryRealloc(pDst->pData, pDst->varmeta.length + copyLen);
        if (pGrown == NULL) {
          return terrno;
        }

        pDst->pData = pGrown;
        pDst->varmeta.allocLen = pDst->varmeta.length + copyLen;
      }

      memcpy(pDst->pData + pDst->varmeta.length, pCopyFrom, copyLen);
      pDst->varmeta.length = pDst->varmeta.length + copyLen;
    }

    return 0;
  }

  if (pSrc->hasNull) {
    if (BitPos(dstIdx) != BitPos(srcIdx)) {
      // different bit alignment: transfer bit by bit
      for (int32_t row = 0; row < numOfRows; ++row) {
        if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + row)) {
          colDataSetNull_f(pDst->nullbitmap, dstIdx + row);
          pDst->hasNull = true;
        } else {
          colDataClearNull_f(pDst->nullbitmap, dstIdx + row);
        }
      }
    } else {
      for (int32_t row = 0; row < numOfRows; ++row) {
        if (0 == BitPos(dstIdx) && (row + (1 << NBIT) <= numOfRows)) {
          // byte aligned with a whole byte left: transfer the byte at once
          BMCharPos(pDst->nullbitmap, dstIdx + row) = BMCharPos(pSrc->nullbitmap, srcIdx + row);
          if (BMCharPos(pDst->nullbitmap, dstIdx + row)) {
            pDst->hasNull = true;
          }
          row += (1 << NBIT) - 1;
        } else if (colDataIsNull_f(pSrc->nullbitmap, srcIdx + row)) {
          colDataSetNull_f(pDst->nullbitmap, dstIdx + row);
          pDst->hasNull = true;
        } else {
          colDataClearNull_f(pDst->nullbitmap, dstIdx + row);
        }
      }
    }
  }

  if (pSrc->pData != NULL) {
    memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
           pDst->info.bytes * numOfRows);
  }

  return 0;
}
621

622
// Number of columns in the block, i.e. the size of its column array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  return numOfCols;
}
623

624
// Number of rows recorded in the block info.
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
  return pBlock->info.rows;
}
625

626
/*
 * Refresh info.window of a block from its timestamp column.
 * tsColumnIndex selects the column (-1 means column 0). The window becomes
 * [min, max] of the first and the last row of that column, so the column is
 * expected to be sorted in either direction.
 *
 * Returns 0 when the window was updated or when there was nothing to do
 * (NULL block, no rows, data not loaded, column missing or not a timestamp),
 * and -1 when the block has no columns.
 */
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols <= 0) {
    return -1;
  }

  int32_t colIdx = tsColumnIndex;
  if (tsColumnIndex == -1) {
    colIdx = 0;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
  if (pTsCol == NULL || pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  TSKEY firstTs = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastTs = *(TSKEY*)colDataGetData(pTsCol, (pDataBlock->info.rows - 1));

  pDataBlock->info.window.skey = TMIN(firstTs, lastTs);
  pDataBlock->info.window.ekey = TMAX(firstTs, lastTs);

  return 0;
}
655

656
/*
 * Refresh info.pks[0] / info.pks[1] of a block from its primary-key column.
 * With asc set, pks[0] is taken from the first row and pks[1] from the last
 * row; otherwise the two are swapped. Numeric keys are stored as int64_t in
 * .val, VARCHAR keys are copied into .pData with their length in .nData.
 *
 * Returns 0 when the range was updated or when there was nothing to do
 * (NULL block, no rows, data not loaded, pkColumnIndex == -1, unsupported key
 * type), -1 when the block has no columns, and terrno when the column cannot
 * be fetched.
 */
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols <= 0) {
    return -1;
  }

  SDataBlockInfo*  pInfo = &pDataBlock->info;
  SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
  if (pPkCol == NULL) {
    return terrno;
  }

  const bool isNumericPk = IS_NUMERIC_TYPE(pPkCol->info.type);
  if (!isNumericPk && (pPkCol->info.type != TSDB_DATA_TYPE_VARCHAR)) {
    return 0;
  }

  void* pFirstRow = colDataGetData(pPkCol, 0);
  void* pLastRow = colDataGetData(pPkCol, (pInfo->rows - 1));

  // pks[0] receives pKey0 and pks[1] receives pKey1; the scan order decides which row is which
  void* pKey0 = asc ? pFirstRow : pLastRow;
  void* pKey1 = asc ? pLastRow : pFirstRow;

  if (isNumericPk) {
    GET_TYPED_DATA(pInfo->pks[0].val, int64_t, pPkCol->info.type, pKey0);
    GET_TYPED_DATA(pInfo->pks[1].val, int64_t, pPkCol->info.type, pKey1);
  } else {  // todo refactor
    memcpy(pInfo->pks[0].pData, varDataVal(pKey0), varDataLen(pKey0));
    pInfo->pks[0].nData = varDataLen(pKey0);

    memcpy(pInfo->pks[1].pData, varDataVal(pKey1), varDataLen(pKey1));
    pInfo->pks[1].nData = varDataLen(pKey1);
  }

  return 0;
}
705

706
/*
 * Append all rows of pSrc to pDest, column by column, through
 * colDataMergeCol(). Every column starts from the current capacity of pDest;
 * the capacity left by the last merged column is stored back at the end.
 *
 * Returns 0 on success, terrno when a column cannot be fetched from either
 * block, or the negative code reported by colDataMergeCol(). On failure the
 * row count and capacity of pDest are left unchanged, although columns merged
 * before the failure have already been modified.
 */
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  int32_t capacity = pDest->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pSrcCol == NULL || pDstCol == NULL) {
      return terrno;
    }

    capacity = pDest->info.capacity;
    int32_t merged = colDataMergeCol(pDstCol, pDest->info.rows, &capacity, pSrcCol, pSrc->info.rows);
    if (merged < 0) {  // error occurs
      return merged;
    }
  }

  pDest->info.capacity = capacity;
  pDest->info.rows += pSrc->info.rows;
  return 0;
}
729

730
/*
 * Append numOfRows rows of pSrc, starting at srcIdx, to pDest. The
 * destination is never grown: the call fails with TSDB_CODE_INVALID_PARA when
 * the rows do not fit into its capacity.
 *
 * Returns 0 on success, terrno when a column cannot be fetched from either
 * block, or the non-zero code reported by colDataAssignNRows().
 */
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pDstCol == NULL || pSrcCol == NULL) {
      return terrno;
    }

    code = colDataAssignNRows(pDstCol, pDest->info.rows, pSrcCol, srcIdx, numOfRows);
    if (code) {
      return code;
    }
  }

  pDest->info.rows += numOfRows;
  return code;
}
753

754
/**
 * Drop the last numOfRows rows from the block.
 *
 * A non-positive numOfRows is a no-op. Removing at least as many rows as the block holds
 * resets the whole block through blockDataCleanup(). Otherwise the per-row metadata of the
 * removed rows (var-length offsets or null-bitmap bits) is cleared and the row count reduced.
 *
 * @param pBlock    block to shrink
 * @param numOfRows number of trailing rows to remove
 */
void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
  // a negative count would otherwise index beyond the end of the offset/bitmap arrays
  if (numOfRows <= 0) {
    return;
  }

  if (numOfRows >= pBlock->info.rows) {
    blockDataCleanup(pBlock);
    return;
  }

  int32_t totalRows = pBlock->info.rows;
  int32_t firstRemoved = totalRows - numOfRows;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      // The payload of the kept rows ends where the first non-NULL removed row begins.
      // NULL rows carry offset -1 and must not be used as the new length; when every
      // removed row is NULL the payload length is left unchanged.
      // NOTE(review): assumes payloads are stored in row order — confirm for reassigned columns.
      for (int32_t row = firstRemoved; row < totalRows; ++row) {
        if (pCol->varmeta.offset[row] != -1) {
          pCol->varmeta.length = pCol->varmeta.offset[row];
          break;
        }
      }

      memset(pCol->varmeta.offset + firstRemoved, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
    } else {
      int32_t row = firstRemoved;

      // clear bit by bit until the next byte boundary of the bitmap
      for (; row < totalRows; ++row) {
        if (BitPos(row)) {
          colDataClearNull_f(pCol->nullbitmap, row);
        } else {
          break;
        }
      }

      // clear the whole bytes in one shot
      int32_t bytes = (totalRows - row) / 8;
      memset(&BMCharPos(pCol->nullbitmap, row), 0, bytes);
      row += bytes * 8;

      // clear the remaining bits of the last, partially used byte
      for (; row < totalRows; ++row) {
        colDataClearNull_f(pCol->nullbitmap, row);
      }
    }
  }

  pBlock->info.rows -= numOfRows;
}
796

797
size_t blockDataGetSize(const SSDataBlock* pBlock) {
52,341,974✔
798
  size_t total = 0;
52,341,974✔
799
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
52,341,974✔
800
  for (int32_t i = 0; i < numOfCols; ++i) {
165,791,753✔
801
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
115,340,025✔
802
    if (pColInfoData == NULL) {
115,187,819!
803
      continue;
×
804
    }
805

806
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
115,187,819✔
807
  }
808

809
  return total;
50,451,728✔
810
}
811

812
// the number of tuples can be fit in one page.
813
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
814
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
142,112✔
815
                           int32_t pageSize) {
816
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
142,112✔
817
  int32_t numOfRows = pBlock->info.rows;
142,112✔
818

819
  int32_t bitmapChar = 1;
142,112✔
820

821
  size_t headerSize = sizeof(int32_t);
142,112✔
822
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
142,112✔
823

824
  // TODO speedup by checking if the whole page can fit in firstly.
825
  if (!hasVarCol) {
142,112✔
826
    size_t  rowSize = blockDataGetRowSize(pBlock);
3,987✔
827
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
3,987✔
828
    if (capacity <= 0) {
3,987!
829
      return terrno;
×
830
    }
831

832
    *stopIndex = startIndex + capacity - 1;
3,987✔
833
    if (*stopIndex >= numOfRows) {
3,987✔
834
      *stopIndex = numOfRows - 1;
4✔
835
    }
836

837
    return TSDB_CODE_SUCCESS;
3,987✔
838
  }
839
  // iterate the rows that can be fit in this buffer page
840
  int32_t size = (headerSize + colHeaderSize);
138,125✔
841
  for (int32_t j = startIndex; j < numOfRows; ++j) {
5,818,125✔
842
    for (int32_t i = 0; i < numOfCols; ++i) {
76,594,158✔
843
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
70,776,184✔
844
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
70,776,184!
845
        if (pColInfoData->varmeta.offset[j] != -1) {
17,377,614!
846
          char* p = colDataGetData(pColInfoData, j);
17,377,614!
847
          size += varDataTLen(p);
17,377,614✔
848
        }
849

850
        size += sizeof(pColInfoData->varmeta.offset[0]);
17,377,614✔
851
      } else {
852
        size += pColInfoData->info.bytes;
53,398,570✔
853

854
        if (((j - startIndex) & 0x07) == 0) {
53,398,570✔
855
          size += 1;  // the space for null bitmap
6,861,896✔
856
        }
857
      }
858
    }
859

860
    if (size > pageSize) {  // pageSize must be able to hold one row
5,817,974✔
861
      *stopIndex = j - 1;
137,974✔
862
      if (*stopIndex < startIndex) {
137,974!
863
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
864
      }
865

866
      return TSDB_CODE_SUCCESS;
137,974✔
867
    }
868
  }
869

870
  // all fit in
871
  *stopIndex = numOfRows - 1;
151✔
872
  return TSDB_CODE_SUCCESS;
151✔
873
}
874

875
/**
 * Create a new block holding rows [startIndex, startIndex + rowCount) of pBlock.
 *
 * The values are copied cell by cell; the commented-out colDataAssignNRows() variant of the
 * original code was not used because varchar payloads may be stored out of row order.
 *
 * @param pBlock     source block
 * @param startIndex first row to copy, must be >= 0
 * @param rowCount   number of rows to copy, must be >= 0 and stay within the source rows
 * @param pResBlock  [out] the newly created block; only set when the function succeeds
 * @return 0 on success, TSDB_CODE_INVALID_PARA for an invalid range, or the code of the
 *         failing helper. On failure the partially built block is destroyed.
 */
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
  int32_t code = 0;
  QRY_PARAM_CHECK(pResBlock);

  if (pBlock == NULL || startIndex < 0 || rowCount < 0 || rowCount > pBlock->info.rows ||
      rowCount + startIndex > pBlock->info.rows) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDst = NULL;
  code = createOneDataBlock(pBlock, false, &pDst);
  if (code) {
    return code;
  }

  code = blockDataEnsureCapacity(pDst, rowCount);
  if (code) {
    blockDataDestroy(pDst);
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
    if (pColData == NULL || pDstCol == NULL) {
      continue;
    }

    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pColData, j);
      } else {
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
      }

      if (isNull) {
        colDataSetNULL(pDstCol, j - startIndex);
        continue;
      }

      char* p = colDataGetData(pColData, j);
      code = colDataSetVal(pDstCol, j - startIndex, p, false);
      if (code) {
        // stop immediately: continuing with the next column could overwrite the error
        // with a later success and hand an incomplete block to the caller
        blockDataDestroy(pDst);
        return code;
      }
    }
  }

  pDst->info.rows = rowCount;
  *pResBlock = pDst;
  return code;
}
936

937
/**
938
 *
939
 * +------------------+---------------------------------------------+
940
 * |the number of rows|                    column #1                |
941
 * |    (4 bytes)     |------------+-----------------------+--------+
942
 * |                  | null bitmap| column length(4bytes) | values |
943
 * +------------------+------------+-----------------------+--------+
944
 * @param buf
945
 * @param pBlock
946
 * @return
947
 */
948
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
342,919✔
949
  // write the number of rows
950
  *(uint32_t*)buf = pBlock->info.rows;
342,919✔
951

952
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
342,919✔
953
  int32_t numOfRows = pBlock->info.rows;
342,888✔
954

955
  char* pStart = buf + sizeof(uint32_t);
342,888✔
956

957
  for (int32_t i = 0; i < numOfCols; ++i) {
2,833,010✔
958
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
2,490,081✔
959
    if (pCol == NULL) {
2,490,036!
960
      continue;
×
961
    }
962

963
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
2,490,036!
964
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
525,142✔
965
      pStart += numOfRows * sizeof(int32_t);
525,142✔
966
    } else {
967
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
1,964,894✔
968
      pStart += BitmapLen(pBlock->info.rows);
1,964,894✔
969
    }
970

971
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
2,490,036✔
972

973
    *(int32_t*)pStart = dataSize;
2,490,122✔
974
    pStart += sizeof(int32_t);
2,490,122✔
975

976
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
2,490,122!
977
      for (int32_t row = 0; row < numOfRows; ++row) {
×
978
        char*   pColData = pCol->pData + pCol->varmeta.offset[row];
×
979
        int32_t colSize = 0;
×
980
        if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
×
981
          colSize = getJsonValueLen(pColData);
×
982
        } else {
983
          colSize = varDataTLen(pColData);
×
984
        }
985
        memcpy(pStart, pColData, colSize);
×
986
        pStart += colSize;
×
987
      }
988
    } else {
989
      if (dataSize != 0) {
2,490,122✔
990
        // ubsan reports error if pCol->pData==NULL && dataSize==0
991
        memcpy(pStart, pCol->pData, dataSize);
2,483,719✔
992
      }
993
      pStart += dataSize;
2,490,122✔
994
    }
995
  }
996

997
  return 0;
342,929✔
998
}
999

1000
/**
 * Rebuild the content of pBlock from a buffer written by blockDataToBuf().
 *
 * The column definitions of pBlock must already be in place; only the row count, the
 * per-row metadata and the payload of every column are read from the buffer.
 *
 * Integer fields are read with memcpy() because they are not necessarily aligned inside
 * the buffer.
 *
 * @param pBlock block to fill
 * @param buf    serialized block
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the buffer holds no
 *         (or a negative number of) rows, TSDB_CODE_FAILED for an invalid column length, or
 *         the code of a failed allocation
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  int32_t numOfRows = 0;
  memcpy(&numOfRows, buf, sizeof(numOfRows));
  if (numOfRows <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  pBlock->info.rows = numOfRows;
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (pCol == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      size_t metaSize = pBlock->info.rows * sizeof(int32_t);
      // the offset array allocated previously may be too small
      // NOTE(review): this may also shrink the array below info.capacity entries — confirm
      // that later cleanup code never touches more than info.rows offsets.
      char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize);
      if (tmp == NULL) {
        return terrno;
      }

      pCol->varmeta.offset = (int32_t*)tmp;
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows));
      pStart += BitmapLen(pBlock->info.rows);
    }

    int32_t colLength = 0;
    memcpy(&colLength, pStart, sizeof(colLength));
    pStart += sizeof(int32_t);

    // a negative length would turn into a huge size_t in the copies below
    if (colLength < 0) {
      return TSDB_CODE_FAILED;
    }

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return terrno;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }

      pCol->varmeta.length = colLength;
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
        return TSDB_CODE_FAILED;
      }
    }

    if (colLength != 0) {
      // ubsan reports error if colLength==0 && pCol->pData == 0
      memcpy(pCol->pData, pStart, colLength);
    }
    pStart += colLength;
  }

  return TSDB_CODE_SUCCESS;
}
1063

1064
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
6,570,903✔
1065
  if (!pColumnInfoData->hasNull) {
6,570,903!
1066
    return false;
×
1067
  }
1068

1069
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
6,570,903!
1070
    for (int32_t i = startIndex; i < nRows; ++i) {
5,533,575✔
1071
      if (!colDataIsNull_var(pColumnInfoData, i)) {
5,501,645✔
1072
        return false;
2,095,090✔
1073
      }
1074
    }
1075
  } else {
1076
    if (pColumnInfoData->nullbitmap == NULL) {
4,443,883!
1077
      return false;
×
1078
    }
1079

1080
    for (int32_t i = startIndex; i < nRows; ++i) {
9,930,669✔
1081
      if (!colDataIsNull_f(pColumnInfoData->nullbitmap, i)) {
9,725,282✔
1082
        return false;
4,238,496✔
1083
      }
1084
    }
1085
  }
1086

1087
  return true;
237,317✔
1088
}
1089

1090
// todo remove this
1091
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
1,203,111✔
1092
  pBlock->info.rows = *(int32_t*)buf;
1,203,111✔
1093
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
1,203,111✔
1094

1095
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,203,111✔
1096

1097
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
1,202,948✔
1098

1099
  for (int32_t i = 0; i < numOfCols; ++i) {
7,776,467✔
1100
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
6,575,285✔
1101
    if (pCol == NULL) {
6,571,722!
1102
      continue;
×
1103
    }
1104

1105
    pCol->hasNull = true;
6,571,722✔
1106

1107
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
6,571,722!
1108
      size_t metaSize = capacity * sizeof(int32_t);
2,129,272✔
1109
      memcpy(pCol->varmeta.offset, pStart, metaSize);
2,129,272✔
1110
      pStart += metaSize;
2,129,272✔
1111
    } else {
1112
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
4,442,450✔
1113
      pStart += BitmapLen(capacity);
4,442,450✔
1114
    }
1115

1116
    int32_t colLength = *(int32_t*)pStart;
6,571,722✔
1117
    pStart += sizeof(int32_t);
6,571,722✔
1118

1119
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
6,571,722!
1120
      if (pCol->varmeta.allocLen < colLength) {
2,127,824✔
1121
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
5,435✔
1122
        if (tmp == NULL) {
5,435!
1123
          return terrno;
×
1124
        }
1125

1126
        pCol->pData = tmp;
5,435✔
1127
        pCol->varmeta.allocLen = colLength;
5,435✔
1128
      }
1129

1130
      pCol->varmeta.length = colLength;
2,127,824✔
1131
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
2,127,824!
1132
        return TSDB_CODE_FAILED;
×
1133
      }
1134
    }
1135

1136
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
6,571,722✔
1137
      memcpy(pCol->pData, pStart, colLength);
6,316,151✔
1138
    }
1139

1140
    pStart += pCol->info.bytes * capacity;
6,573,519✔
1141
  }
1142

1143
  return TSDB_CODE_SUCCESS;
1,201,182✔
1144
}
1145

1146
/**
 * Return the size of one row: the sum of info.bytes over all columns.
 *
 * The value is computed on the first call (while info.rowSize is still 0) and cached in
 * pBlock->info.rowSize afterwards.
 */
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t bytesPerRow = 0;
  size_t nCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      bytesPerRow += pCol->info.bytes;
    }
  }

  pBlock->info.rowSize = bytesPerRow;
  return pBlock->info.rowSize;
}
1165

1166
/**
 * Size in bytes of the metadata section of a serialized data block.
 *
 * Layout being measured:
 * | version | total length | total rows | blankFull | total columns | flag seg | block group id |
 * | column schema | each column length |
 *
 * @param numOfCols number of columns in the block
 * @return size of the fixed header plus the per-column schema and length entries
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  const size_t fixedHeader = sizeof(int32_t)     // version
                             + sizeof(int32_t)   // total length
                             + sizeof(int32_t)   // total rows
                             + sizeof(bool)      // blankFull
                             + sizeof(int32_t)   // total columns
                             + sizeof(int32_t)   // flag seg
                             + sizeof(uint64_t); // block group id

  const size_t schemaPerCol = sizeof(int8_t) + sizeof(int32_t);  // column schema entry
  const size_t lengthPerCol = sizeof(int32_t);                   // column length entry

  return fixedHeader + numOfCols * (schemaPerCol + lengthPerCol);
}
1177

1178
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
65,117✔
1179
  double rowSize = 0;
65,117✔
1180

1181
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
65,117✔
1182
  for (int32_t i = 0; i < numOfCols; ++i) {
287,796✔
1183
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
222,691✔
1184
    if (pColInfo == NULL) {
222,673!
1185
      continue;
×
1186
    }
1187

1188
    rowSize += pColInfo->info.bytes;
222,673✔
1189
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
222,673!
1190
      rowSize += sizeof(int32_t);
18,531✔
1191
    } else {
1192
      rowSize += 1 / 8.0;  // one bit for each record
204,142✔
1193
    }
1194
  }
1195

1196
  return rowSize;
65,105✔
1197
}
1198

1199
typedef struct SSDataBlockSortHelper {
1200
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
1201
  SSDataBlock* pDataBlock;
1202
} SSDataBlockSortHelper;
1203

1204
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
862,465,902✔
1205
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
862,465,902✔
1206

1207
  SSDataBlock* pDataBlock = pHelper->pDataBlock;
862,465,902✔
1208

1209
  int32_t left = *(int32_t*)p1;
862,465,902✔
1210
  int32_t right = *(int32_t*)p2;
862,465,902✔
1211

1212
  SArray* pInfo = pHelper->orderInfo;
862,465,902✔
1213

1214
  for (int32_t i = 0; i < pInfo->size; ++i) {
1,737,182,731✔
1215
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
1,546,306,642✔
1216
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
1,546,306,642✔
1217

1218
    if (pColInfoData->hasNull) {
1,546,306,642✔
1219
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
1,468,328,226!
1220
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
1,468,328,226!
1221
      if (leftNull && rightNull) {
1,468,328,226✔
1222
        continue;  // continue to next slot
32,923,427✔
1223
      }
1224

1225
      if (rightNull) {
1,435,404,799✔
1226
        return pOrder->nullFirst ? 1 : -1;
2,478,490✔
1227
      }
1228

1229
      if (leftNull) {
1,432,926,309✔
1230
        return pOrder->nullFirst ? -1 : 1;
4,484,894✔
1231
      }
1232
    }
1233

1234
    void* left1 = colDataGetData(pColInfoData, left);
1,506,419,831!
1235
    void* right1 = colDataGetData(pColInfoData, right);
1,506,419,831!
1236
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
1,506,419,831✔
1237
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
1,056!
1238
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
32,142,316✔
1239
        return 0;
×
1240
      }
1241
    }
1242

1243
    __compar_fn_t fn;
1244
    if (pOrder->compFn) {
1,474,277,515!
1245
      fn = pOrder->compFn;
1,474,277,515✔
1246
    } else {
1247
      fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
×
1248
    }
1249

1250
    int ret = fn(left1, right1);
1,474,277,515✔
1251
    if (ret == 0) {
1,466,191,105✔
1252
      continue;
841,793,402✔
1253
    } else {
1254
      return ret;
624,397,703✔
1255
    }
1256
  }
1257

1258
  return 0;
190,876,089✔
1259
}
1260

1261
/**
 * Fill the helper columns pCols with the rows of pDataBlock in the order given by index.
 *
 * Row j of the result is row index[j] of the source. For var-length columns the payload is
 * copied as a whole and only the offsets are permuted; for fixed-length columns the values
 * are copied one by one and NULL rows are marked in the destination bitmap.
 */
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pTarget = &pCols[c];
    SColumnInfoData* pOrigin = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pOrigin == NULL) {
      continue;
    }

    if (IS_VAR_DATA_TYPE(pOrigin->info.type)) {
      if (pOrigin->varmeta.length != 0) {
        memcpy(pTarget->pData, pOrigin->pData, pOrigin->varmeta.length);
      }
      pTarget->varmeta.length = pOrigin->varmeta.length;

      for (int32_t r = 0; r < pDataBlock->info.rows; ++r) {
        pTarget->varmeta.offset[r] = pOrigin->varmeta.offset[index[r]];
      }
      continue;
    }

    for (int32_t r = 0; r < pDataBlock->info.rows; ++r) {
      if (colDataIsNull_f(pOrigin->nullbitmap, index[r])) {
        colDataSetNull_f_s(pTarget, r);
      } else {
        memcpy(pTarget->pData + r * pTarget->info.bytes, pOrigin->pData + index[r] * pTarget->info.bytes,
               pTarget->info.bytes);
      }
    }
  }
}
146,838✔
1290

1291
/**
 * Allocate a scratch array of columns mirroring the columns of pDataBlock.
 *
 * Each helper column copies the column info of its counterpart and gets zeroed buffers:
 * var-length columns an offset array for info.capacity rows and a payload buffer of the
 * source's current payload length; fixed-length columns a null bitmap and a value buffer
 * for info.capacity rows.
 *
 * @param pDataBlock block to mirror
 * @param ppCols     [out] the allocated array, NULL on failure
 * @return 0 on success, terrno of the failed allocation otherwise
 */
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
  int32_t code = 0;
  int32_t capacity = pDataBlock->info.capacity;
  size_t  nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t colIdx = 0;

  *ppCols = NULL;

  SColumnInfoData* pHelp = taosMemoryCalloc(nCols, sizeof(SColumnInfoData));
  if (pHelp == NULL) {
    return terrno;
  }

  for (colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pOrigin = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pOrigin == NULL) {
      continue;
    }

    SColumnInfoData* pNew = &pHelp[colIdx];
    pNew->info = pOrigin->info;

    if (IS_VAR_DATA_TYPE(pNew->info.type)) {
      pNew->varmeta.offset = taosMemoryCalloc(capacity, sizeof(int32_t));
      pNew->pData = taosMemoryCalloc(1, pOrigin->varmeta.length);
      if (pNew->varmeta.offset == NULL || pNew->pData == NULL) {
        code = terrno;
        // the current column is released here, the completed ones below
        taosMemoryFree(pNew->varmeta.offset);
        taosMemoryFree(pNew->pData);
        goto _error;
      }

      pNew->varmeta.length = pOrigin->varmeta.length;
      pNew->varmeta.allocLen = pNew->varmeta.length;
    } else {
      pNew->nullbitmap = taosMemoryCalloc(1, BitmapLen(capacity));
      pNew->pData = taosMemoryCalloc(capacity, pNew->info.bytes);
      if (pNew->nullbitmap == NULL || pNew->pData == NULL) {
        code = terrno;
        taosMemoryFree(pNew->nullbitmap);
        taosMemoryFree(pNew->pData);
        goto _error;
      }
    }
  }

  *ppCols = pHelp;
  return code;

_error:
  for (int32_t done = 0; done < colIdx; ++done) {
    colDataDestroy(&pHelp[done]);
  }

  taosMemoryFree(pHelp);
  return code;
}
1346

1347
/**
 * Move the buffers of the helper columns pCols into the columns of pDataBlock.
 *
 * The old buffers of every block column are freed and replaced by the helper's buffers
 * (ownership is transferred, nothing is copied). The helper array itself is freed at the end.
 */
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pTarget = taosArrayGet(pDataBlock->pDataBlock, c);
    if (pTarget == NULL) {
      continue;
    }

    SColumnInfoData* pSorted = &pCols[c];

    pTarget->info = pSorted->info;
    if (IS_VAR_DATA_TYPE(pTarget->info.type)) {
      taosMemoryFreeClear(pTarget->varmeta.offset);
      pTarget->varmeta = pSorted->varmeta;
    } else {
      taosMemoryFreeClear(pTarget->nullbitmap);
      pTarget->nullbitmap = pSorted->nullbitmap;
    }

    taosMemoryFreeClear(pTarget->pData);
    pTarget->pData = pSorted->pData;
  }

  taosMemoryFreeClear(pCols);
}
146,844✔
1371

1372
/**
 * Allocate an array of `rows` row indexes initialized to the identity order 0, 1, 2, ...
 *
 * @return the array, or NULL when the allocation fails
 */
static int32_t* createTupleIndex(size_t rows) {
  int32_t* pIndex = taosMemoryCalloc(rows, sizeof(int32_t));
  if (pIndex != NULL) {
    for (int32_t pos = 0; pos < rows; ++pos) {
      pIndex[pos] = pos;
    }
  }

  return pIndex;
}
1384

1385
// Release an index array obtained from createTupleIndex().
static void destroyTupleIndex(int32_t* index) {
  taosMemoryFreeClear(index);
}
146,848!
1386

1387
/**
 * Sort the rows of a data block in place according to the given sort keys.
 *
 * Blocks with at most one row are returned untouched. A block consisting of a single
 * fixed-length column without NULLs that is sorted by one key is sorted directly on its
 * value buffer. In all other cases an array of row indexes is sorted with dataBlockCompar(),
 * the rows are gathered into freshly allocated column buffers in the new order, and those
 * buffers replace the buffers of the block.
 *
 * As a side effect, pColData and compFn of every entry of pOrderInfo are (re)initialized.
 *
 * @param pDataBlock block to sort
 * @param pOrderInfo SArray<SBlockOrderInfo> describing the sort keys
 * @return TSDB_CODE_SUCCESS, or an error code when a sort column cannot be resolved, an
 *         allocation fails, or the comparator reports an error through terrno
 */
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  // Allocate the additional buffer.
  uint32_t rows = pDataBlock->info.rows;

  bool sortColumnHasNull = false;
  bool varTypeSort = false;

  for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  // fast path: one fixed-length column without NULLs can be sorted directly on its values
  if (taosArrayGetSize(pOrderInfo) == 1 && (!sortColumnHasNull) && numOfCols == 1 && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
    if (pColInfoData == NULL || pOrder == NULL) {
      return terrno;  // the original returned the C library's errno here by mistake
    }

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    if (pInfo == NULL) {
      continue;
    }

    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pInfo->pColData == NULL) {
      // dataBlockCompar() dereferences pColData of every key, so sorting cannot proceed
      int32_t lookupCode = terrno;
      destroyTupleIndex(index);
      return lookupCode;
    }
    pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
  }

  // the comparator cannot return an error, it reports problems through terrno
  terrno = 0;
  taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  int32_t sortCode = terrno;
  if (sortCode) {
    destroyTupleIndex(index);
    return sortCode;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = NULL;
  int32_t          code = createHelpColInfoData(pDataBlock, &pCols);
  if (code != 0) {
    destroyTupleIndex(index);
    return code;
  }

  int64_t p2 = taosGetTimestampUs();
  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();
  copyBackToBlock(pDataBlock, pCols);

  int64_t p4 = taosGetTimestampUs();
  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%d\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);

  destroyTupleIndex(index);
  return TSDB_CODE_SUCCESS;
}
1492

1493
/**
 * Empty the block (see blockDataEmpty) and additionally reset its uid and group id to 0.
 */
void blockDataCleanup(SSDataBlock* pDataBlock) {
  blockDataEmpty(pDataBlock);

  pDataBlock->info.id.uid = 0;
  pDataBlock->info.id.groupId = 0;
}
25,307,080✔
1499

1500
/**
 * Remove all rows from the block while keeping its allocated capacity.
 *
 * Nothing happens for a block with capacity 0. Otherwise the block aggregation info is
 * freed, the per-row metadata of every column is cleared for the whole capacity, and the
 * row count, dataLoad flag and time window are reset to 0.
 */
void blockDataEmpty(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
  if (pBlockInfo->capacity == 0) {
    return;
  }

  taosMemoryFreeClear(pDataBlock->pBlockAgg);

  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      colInfoDataCleanup(pCol, pBlockInfo->capacity);
    }
  }

  pBlockInfo->rows = 0;
  pBlockInfo->dataLoad = 0;
  pBlockInfo->window.ekey = 0;
  pBlockInfo->window.skey = 0;
}
1523

1524
/**
 * Reset the bookkeeping of the block without touching the column buffers.
 *
 * Nothing happens for a block with capacity 0. Otherwise every column gets its hasNull and
 * reassigned flags cleared (var-length columns also their payload length), and the row
 * count, dataLoad flag, time window, uid and group id of the block are set to 0.
 */
void blockDataReset(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
  if (pBlockInfo->capacity == 0) {
    return;
  }

  size_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < nCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      continue;
    }

    pCol->hasNull = false;
    pCol->reassigned = false;
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = 0;
    }
  }

  pBlockInfo->rows = 0;
  pBlockInfo->dataLoad = 0;
  pBlockInfo->window.ekey = 0;
  pBlockInfo->window.skey = 0;
  pBlockInfo->id.uid = 0;
  pBlockInfo->id.groupId = 0;
}
1551

1552
/*
1553
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
1554
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
1555
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
1556
 */
1557
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
234,931,065✔
1558
                         bool clearPayload) {
1559
  if ((numOfRows <= 0)|| (pBlockInfo && numOfRows <= pBlockInfo->capacity)) {
234,931,065!
1560
    return TSDB_CODE_SUCCESS;
×
1561
  }
1562

1563
  int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
234,931,065!
1564

1565
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
234,931,065!
1566
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
21,098,619✔
1567
    if (tmp == NULL) {
20,316,251✔
1568
      return terrno;
6,302✔
1569
    }
1570

1571
    pColumn->varmeta.offset = (int32_t*)tmp;
20,309,949✔
1572
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
20,309,949✔
1573
  } else {
1574
    // prepare for the null bitmap
1575
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
213,832,446✔
1576
    if (tmp == NULL) {
211,047,657✔
1577
      return terrno;
399,271✔
1578
    }
1579

1580
    int32_t oldLen = BitmapLen(existedRows);
210,648,386✔
1581
    pColumn->nullbitmap = tmp;
210,648,386✔
1582
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
210,648,386✔
1583
    if (pColumn->info.bytes == 0) {
210,648,386!
1584
      return TSDB_CODE_INVALID_PARA;
×
1585
    }
1586

1587
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
1588
    // to MALLOC_ALIGN_BYTES
1589
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
210,648,386✔
1590
    if (tmp == NULL) {
211,960,716!
UNCOV
1591
      return terrno;
×
1592
    }
1593
    // memset(tmp, 0, numOfRows * pColumn->info.bytes);
1594

1595
    // copy back the existed data
1596
    if (pColumn->pData != NULL) {
211,986,053✔
1597
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
78,602,390✔
1598
      taosMemoryFreeClear(pColumn->pData);
78,602,390!
1599
    }
1600

1601
    pColumn->pData = tmp;
211,599,716✔
1602

1603
    // check if the allocated memory is aligned to the requried bytes.
1604
#if defined LINUX
1605
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
211,599,716!
1606
      return TSDB_CODE_FAILED;
×
1607
    }
1608
#endif
1609

1610
    if (clearPayload) {
211,599,716✔
1611
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
168,568,692✔
1612
    }
1613
  }
1614

1615
  return TSDB_CODE_SUCCESS;
231,909,665✔
1616
}
1617

1618
// Reset the per-row metadata of a column covering the first numOfRows rows.
// The hasNull flag is cleared. For variable-length columns the recorded data
// length and the offset entries are zeroed; for fixed-length columns the null
// bitmap is zeroed. The payload buffer (pData) is not touched.
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  pColumn->hasNull = false;

  if (!IS_VAR_DATA_TYPE(pColumn->info.type)) {
    // fixed-length column: only the null bitmap carries per-row state
    if (pColumn->nullbitmap != NULL) {
      memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
    }
    return;
  }

  // variable-length column: forget the used length and wipe the offset table
  pColumn->varmeta.length = 0;
  if (pColumn->varmeta.offset != NULL) {
    memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
  }
}
186,361,388✔
1632

1633
// Grow a stand-alone column so it can hold numOfRows rows.
// A zero-initialized block info is handed to doEnsureCapacity, i.e. the column
// is treated as having no existing rows and no existing capacity.
// Returns whatever doEnsureCapacity returns.
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
  SDataBlockInfo emptyInfo = {0};
  int32_t        code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows, clearPayload);
  return code;
}
1637

1638
// Make sure every column of the block can hold at least numOfRows rows.
// Nothing is done when numOfRows is 0 or already within the recorded capacity.
// On success the block capacity is updated to numOfRows; on failure the error
// of the failing step is returned and the recorded capacity is left unchanged.
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      return terrno;
    }

    // payload of the newly added rows is not cleared here
    int32_t code = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows, false);
    if (code != 0) {
      return code;
    }
  }

  pDataBlock->info.capacity = numOfRows;
  return TSDB_CODE_SUCCESS;
}
1660

1661
// Release everything owned by the block except the SSDataBlock struct itself:
// each column's buffers, the column array, and the block aggregation buffer.
// The block info is zeroed afterwards. A NULL block is ignored.
void blockDataFreeRes(SSDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfOutput; ++colIdx) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      colDataDestroy(pCol);
    }
  }

  taosArrayDestroy(pBlock->pDataBlock);
  pBlock->pDataBlock = NULL;

  taosMemoryFreeClear(pBlock->pBlockAgg);
  memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}
1682

1683
// Destroy a data block completely, including the SSDataBlock struct itself.
// When the first primary-key slot has a variable-length type, the buffers of
// both primary-key slots are released before the rest of the block is freed.
// A NULL block is ignored.
void blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
      taosMemoryFreeClear(pBlock->info.pks[0].pData);
      taosMemoryFreeClear(pBlock->info.pks[1].pData);
    }

    blockDataFreeRes(pBlock);
    taosMemoryFreeClear(pBlock);
  }
}
1696

1697
// todo remove it
1698
// Rebuild dst as a copy of src: the column definitions of src are appended to
// dst, capacity for src->info.rows rows is reserved, and each column's data is
// assigned. The block info is copied from src, except that the primary-key
// buffers are not shared (set to NULL) and dst keeps its own capacity.
// Returns 0 on success or the error code of the first failing step.
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  int32_t code = 0;

  // start from the source info, but with no rows/capacity and no shared pk buffers
  dst->info = src->info;
  dst->info.rows = 0;
  dst->info.capacity = 0;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;

  size_t numOfCols = taosArrayGetSize(src->pDataBlock);

  // step 1: replicate the column schema
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pSrcCol = taosArrayGet(src->pDataBlock, colIdx);
    if (pSrcCol == NULL) {
      return terrno;
    }

    SColumnInfoData newCol = {.hasNull = true, .info = pSrcCol->info};
    code = blockDataAppendColInfo(dst, &newCol);
    if (code != 0) {
      return code;
    }
  }

  // step 2: reserve room for all source rows
  code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // step 3: copy the column payloads
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, colIdx);
    SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, colIdx);
    if (pSrc == NULL || pDst == NULL) {
      continue;
    }

    // a fixed-length column without a payload buffer has nothing to copy
    if (pSrc->pData == NULL && !IS_VAR_DATA_TYPE(pSrc->info.type)) {
      continue;
    }

    int32_t ret = colDataAssign(pDst, pSrc, src->info.rows, &src->info);
    if (ret < 0) {
      return ret;
    }
  }

  // step 4: take over the source info while preserving the destination capacity
  uint32_t keptCapacity = dst->info.capacity;
  dst->info = src->info;
  dst->info.capacity = keptCapacity;
  dst->info.pks[0].pData = NULL;
  dst->info.pks[1].pData = NULL;
  return code;
}
1746

1747
// Copy the content of pSrc into the already-structured block pDst.
// pDst is cleaned up first, grown to hold pSrc->info.rows rows, and then each
// column is assigned from the column at the same index in pSrc (column pairs
// where either side is missing are skipped). Finally the block info is copied,
// the primary-key values are duplicated via copyPkVal, and pDst keeps its own
// capacity. Returns 0 on success or the first error encountered.
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
  blockDataCleanup(pDst);

  int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pTo = taosArrayGet(pDst->pDataBlock, colIdx);
    SColumnInfoData* pFrom = taosArrayGet(pSrc->pDataBlock, colIdx);
    if (pTo != NULL && pFrom != NULL) {
      int32_t ret = colDataAssign(pTo, pFrom, pSrc->info.rows, &pSrc->info);
      if (ret < 0) {
        return ret;
      }
    }
  }

  // remember the capacity: the struct assignment below would overwrite it
  uint32_t keptCapacity = pDst->info.capacity;

  pDst->info = pSrc->info;
  // never share the primary-key buffers with the source block
  pDst->info.pks[0].pData = NULL;
  pDst->info.pks[1].pData = NULL;

  code = copyPkVal(&pDst->info, &pSrc->info);
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  pDst->info.capacity = keptCapacity;
  return code;
}
1784

1785
// Create an empty block of the given stream type with the fixed column layout:
//   window start ts, window end ts, uid, group id, calculate start ts,
//   calculate end ts (timestamps / unsigned bigints) and a table-name varchar.
// On success the new block is stored in *pBlock and 0 is returned. On failure
// the partially built block is released and the error code is returned.
//
// Failures of the array helpers are reported through terrno (as the other
// functions in this file do), not through the C library errno; errno may well
// be 0 at that point, which would make the function return "success" without
// setting *pBlock.
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
  QRY_PARAM_CHECK(pBlock);

  // column layout, in the order the columns are appended
  const struct {
    int32_t type;
    int32_t bytes;
  } cols[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // window start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // window end ts
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                         // uid
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                         // group id
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // calculate start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                          // calculate end ts
      {TSDB_DATA_TYPE_VARCHAR, VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN}, // table name
  };
  const int32_t numOfCols = (int32_t)(sizeof(cols) / sizeof(cols[0]));

  int32_t      code = 0;
  SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (p == NULL) {
    return terrno;
  }

  p->info.hasVarCol = false;
  p->info.id.groupId = 0;
  p->info.rows = 0;
  p->info.type = type;
  p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
                    sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  p->info.watermark = INT64_MIN;

  p->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (p->pDataBlock == NULL) {
    // capture the error before calling anything else that might touch terrno
    code = terrno;
    taosMemoryFree(p);
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData infoData = {0};
    infoData.info.type = cols[i].type;
    infoData.info.bytes = cols[i].bytes;

    if (taosArrayPush(p->pDataBlock, &infoData) == NULL) {
      code = terrno;
      goto _err;
    }
  }

  *pBlock = p;
  return code;

_err:
  taosArrayDestroy(p->pDataBlock);
  taosMemoryFree(p);
  return code;
}
1877

1878
// Build a new single-row block holding a copy of row rowIdx of pDataBlock.
// The new block gets the same column schema and block info as the source
// (primary-key buffers are not shared). On success the block is returned via
// *pResBlock with info.rows == 1; on any failure the partially built block is
// destroyed and an error code is returned.
// NOTE(review): rowIdx is not range-checked here — callers presumably pass a
// valid row index; confirm.
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pNew = NULL;
  int32_t      code = createDataBlock(&pNew);
  if (code != 0) {
    return code;
  }

  pNew->info = pDataBlock->info;
  pNew->info.rows = 0;
  pNew->info.capacity = 0;
  pNew->info.pks[0].pData = NULL;
  pNew->info.pks[1].pData = NULL;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  // replicate the column schema
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pSrcCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pSrcCol == NULL) {
      blockDataDestroy(pNew);
      return terrno;
    }

    SColumnInfoData newCol = {.hasNull = true, .info = pSrcCol->info};
    code = blockDataAppendColInfo(pNew, &newCol);
    if (code != 0) {
      blockDataDestroy(pNew);
      return code;
    }
  }

  // room for exactly one row
  code = blockDataEnsureCapacity(pNew, 1);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pNew);
    return code;
  }

  // copy the requested row, column by column, into row 0 of the new block
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pDst = taosArrayGet(pNew->pDataBlock, colIdx);
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, colIdx);
    if (pDst == NULL || pSrc == NULL) {
      blockDataDestroy(pNew);
      return terrno;
    }

    bool  isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
    void* pVal = NULL;
    if (!isNull) {
      pVal = colDataGetData(pSrc, rowIdx);
    }

    code = colDataSetVal(pDst, 0, pVal, isNull);
    if (code != 0) {
      blockDataDestroy(pNew);
      return code;
    }
  }

  pNew->info.rows = 1;

  *pResBlock = pNew;
  return code;
}
1945

1946
// Deep-copy the two primary-key values from pSrc into pDst.
// Nothing is copied unless the first primary-key slot of pSrc has a
// variable-length type; in that case a buffer of nData bytes is allocated for
// each of the two slots and the source bytes are copied into it.
// Returns 0 on success, or the allocation error (after logging it).
int32_t copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) {
    return code;
  }

  // prepare the pk buffers and copy both slots, first slot first
  for (int32_t k = 0; k < 2; ++k) {
    const SValue* pFrom = &pSrc->pks[k];
    SValue*       pTo = &pDst->pks[k];

    pTo->type = pFrom->type;
    pTo->pData = taosMemoryCalloc(1, pFrom->nData);
    QUERY_CHECK_NULL(pTo->pData, code, lino, _end, terrno);

    pTo->nData = pFrom->nData;
    memcpy(pTo->pData, pFrom->pData, pTo->nData);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1977

1978
// Create a new block with the same column schema and block info as pDataBlock.
// When copyData is true the row data is copied as well and the new block's
// rows/capacity are set to the source row count; otherwise the new block is
// empty (rows == 0, capacity == 0). The primary-key values are deep-copied via
// copyPkVal. On success the block is returned through *pResBlock; on failure
// the partially built block is destroyed and an error code is returned.
//
// Error paths capture terrno *before* destroying the block or logging, so the
// logged message and the returned code both describe the real failure.
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pDstBlock = NULL;
  int32_t      code = createDataBlock(&pDstBlock);
  if (code) {
    return code;
  }

  // copy the whole info (this includes id and blankFill), then reset the parts
  // that must not be inherited: pk buffers, row count, capacity and row size
  // (row size is re-accumulated by blockDataAppendColInfo below)
  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.pks[0].pData = NULL;
  pDstBlock->info.pks[1].pData = NULL;
  pDstBlock->info.rows = 0;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    if (p == NULL) {
      code = terrno;
      blockDataDestroy(pDstBlock);
      return code;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    if (code) {
      blockDataDestroy(pDstBlock);
      return code;
    }
  }

  code = copyPkVal(&pDstBlock->info, &pDataBlock->info);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pDstBlock);
    uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (copyData) {
    code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pDstBlock);
      return code;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
      if (pDst == NULL || pSrc == NULL) {
        code = terrno;
        uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        blockDataDestroy(pDstBlock);
        return code;
      }

      int32_t ret = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      if (ret < 0) {
        blockDataDestroy(pDstBlock);
        return ret;
      }
    }

    pDstBlock->info.rows = pDataBlock->info.rows;
    pDstBlock->info.capacity = pDataBlock->info.rows;
  }

  *pResBlock = pDstBlock;
  return code;
}
2060

2061
// Allocate a zero-initialized data block with an empty column array.
// On success the block is stored in *pResBlock and 0 is returned; on an
// allocation failure the error from terrno is returned and nothing is leaked.
int32_t createDataBlock(SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew == NULL) {
    return terrno;
  }

  pNew->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pNew->pDataBlock != NULL) {
    *pResBlock = pNew;
    return 0;
  }

  // save the error before releasing the block
  int32_t code = terrno;
  taosMemoryFree(pNew);
  return code;
}
2078

2079
// Append a column definition to the block, creating the column array on first
// use. The block's hasVarCol flag is raised for variable-length columns and
// the column's byte width is added to the block's row size.
// Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created/grown.
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    return terrno;
  }

  // todo: a check that pColInfoData->info.type != 0 is disabled temporarily
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pBlock->info.hasVarCol = true;
  }

  pBlock->info.rowSize += pColInfoData->info.bytes;
  return TSDB_CODE_SUCCESS;
}
2101

2102
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
20,010,334✔
2103
  SColumnInfoData col = {.hasNull = true};
20,010,334✔
2104
  col.info.colId = colId;
20,010,334✔
2105
  col.info.type = type;
20,010,334✔
2106
  col.info.bytes = bytes;
20,010,334✔
2107

2108
  return col;
20,010,334✔
2109
}
2110

2111
// Look up the column at position index in the block and return it through
// *pColInfoData. Returns TSDB_CODE_INVALID_PARA when index is not below the
// number of columns, terrno when the array lookup yields NULL, and 0 otherwise.
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
  QRY_PARAM_CHECK(pColInfoData);

  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pColInfoData = taosArrayGet(pBlock->pDataBlock, index);
  return (*pColInfoData == NULL) ? terrno : 0;
}
2126

2127
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
239,315✔
2128
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
239,315✔
2129

2130
  int32_t payloadSize = pageSize - extraSize;
239,317✔
2131
  int32_t rowSize = pBlock->info.rowSize;
239,317✔
2132
  int32_t nRows = payloadSize / rowSize;
239,317✔
2133
  if (nRows < 1) {
239,317✔
2134
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
5!
2135
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
5✔
2136
    return -1;
×
2137
  }
2138

2139
  int32_t numVarCols = 0;
239,312✔
2140
  int32_t numFixCols = 0;
239,312✔
2141
  for (int32_t i = 0; i < numOfCols; ++i) {
1,158,717✔
2142
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
919,406✔
2143
    if (pCol == NULL) {
919,405!
2144
      return -1;
×
2145
    }
2146

2147
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
919,405!
2148
      ++numVarCols;
135,710✔
2149
    } else {
2150
      ++numFixCols;
783,695✔
2151
    }
2152
  }
2153

2154
  // find the data payload whose size is greater than payloadSize
2155
  int result = -1;
239,311✔
2156
  int start = 1;
239,311✔
2157
  int end = nRows;
239,311✔
2158
  while (start <= end) {
2,182,506✔
2159
    int mid = start + (end - start) / 2;
1,943,195✔
2160
    // data size + var data type columns offset + fixed data type columns bitmap len
2161
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
1,943,195✔
2162
    if (midSize > payloadSize) {
1,943,195✔
2163
      result = mid;
428,548✔
2164
      end = mid - 1;
428,548✔
2165
    } else {
2166
      start = mid + 1;
1,514,647✔
2167
    }
2168
  }
2169

2170
  int32_t newRows = (result != -1) ? result - 1 : nRows;
239,311✔
2171
  // the true value must be less than the value of nRows
2172
  if (newRows > nRows || newRows < 1) {
239,311!
UNCOV
2173
    uError("invalid newRows:%d, nRows:%d", newRows, nRows);
×
UNCOV
2174
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2175
    return -1;
×
2176
  }
2177

2178
  return newRows;
239,361✔
2179
}
2180

2181
// Free the buffers owned by a column: the offset table (variable-length
// columns) or the null bitmap (fixed-length columns), and the payload buffer.
// The column struct itself is not freed. A NULL column is ignored.
void colDataDestroy(SColumnInfoData* pColData) {
  if (pColData == NULL) {
    return;
  }

  if (!IS_VAR_DATA_TYPE(pColData->info.type)) {
    taosMemoryFreeClear(pColData->nullbitmap);
  } else {
    taosMemoryFreeClear(pColData->varmeta.offset);
  }

  taosMemoryFreeClear(pColData->pData);
}
2194

2195
// Shift a null bitmap covering `total` rows towards the front by `n` rows, so
// that the bit of row n ends up at the position of row 0.
// A byte-aligned shift (n % 8 == 0) is a plain memmove; otherwise every output
// byte is assembled from the high part of one source byte and the low part of
// the following source byte.
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t len = BitmapLen(total);
  int32_t newLen = BitmapLen(total - n);

  if (n % 8 == 0) {
    (void)memmove(nullBitmap, nullBitmap + n / 8, newLen);
    return;
  }

  int32_t  tail = n % 8;  // number of bits to shift within a byte
  uint8_t* bits = (uint8_t*)nullBitmap;

  if (n < 8) {
    // sub-byte shift: source and destination bytes share the same index
    for (int32_t i = 0; i < len; ++i) {
      uint8_t cur = bits[i];  // source bitmap value
      bits[i] = (cur << tail);

      if (i < len - 1) {
        uint8_t next = bits[i + 1];
        bits[i] |= (next >> (8 - tail));
      }
    }
  } else if (n > 8) {
    // shift by whole bytes (gap) plus `tail` bits
    int32_t remain = (total % 8 != 0 && total % 8 <= tail) ? 1 : 0;
    int32_t gap = len - newLen - remain;

    for (int32_t i = 0; i < newLen; ++i) {
      uint8_t cur = bits[i + gap];
      bits[i] = (cur << tail);

      if (i < newLen - 1 + remain) {
        uint8_t next = bits[i + gap + 1];
        bits[i] |= (next >> (8 - tail));
      }
    }
  }
}
397,772✔
2235

2236
// Compact the variable-length payload of rows [start, end) to the beginning of
// the column buffer and move their offset entries to the front of the offset
// table. Rows whose offset is -1 are skipped. When start != 0 the offsets of
// the kept rows are rewritten relative to the new buffer start.
// Returns the total number of payload bytes of the kept rows.
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t firstOffset = -1;  // offset of the first kept payload, -1 if none yet
  int32_t totalLen = 0;

  for (int32_t row = start; row < end; row++) {
    int32_t offset = pColInfoData->varmeta.offset[row];
    if (offset == -1) {
      continue;
    }

    if (start != 0) {
      pColInfoData->varmeta.offset[row] = totalLen;
    }

    char* data = pColInfoData->pData + offset;
    if (firstOffset == -1) {
      firstOffset = offset;  // mark the begin of data
    }

    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      totalLen += getJsonValueLen(data);
    } else {
      totalLen += varDataTLen(data);
    }
  }

  if (firstOffset > 0) {
    (void)memmove(pColInfoData->pData, pColInfoData->pData + firstOffset, totalLen);
  }

  (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
  return totalLen;
}
2267

2268
// Drop the first n rows of a column that currently holds `total` rows.
// Variable-length columns: only the offset table is shifted (the payload is
// left in place) and the n entries freed at the tail are zeroed.
// Fixed-length columns: the payload is moved to the front and the null bitmap
// is shifted accordingly.
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
    (void)memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));

    // clear the offset value of the unused entries; the size is in bytes, so
    // n entries take n * sizeof(int32_t) bytes
    memset(&pColInfoData->varmeta.offset[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    (void)memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}
488,572✔
2281

2282
// Remove the first n rows from every column of the block.
// n == 0 is a no-op; when n covers all rows the block is simply emptied.
// Returns TSDB_CODE_SUCCESS, or terrno if a column cannot be fetched.
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      return terrno;
    }

    colDataTrimFirstNRows(pCol, n, pBlock->info.rows);
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}
2304

2305
// Truncate a column that holds `total` rows to its first n rows.
// Only variable-length columns need work: the recorded payload length is cut
// back to the end of the last kept row, and the offset entries of the dropped
// rows are zeroed. Nothing is done when n == 0 or n >= total.
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (n >= total || n == 0) return;
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    if (pColInfoData->varmeta.length != 0) {
      // the offset of the first dropped row is where the kept payload ends
      int32_t newLen = pColInfoData->varmeta.offset[n];
      if (-1 == newLen) {
        // row n has no payload (offset -1): walk back to the last kept row
        // that has one and take the end of its value instead
        for (int i = n - 1; i >= 0; --i) {
          newLen = pColInfoData->varmeta.offset[i];
          if (newLen != -1) {
            if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
              newLen += getJsonValueLen(pColInfoData->pData + newLen);
            } else {
              newLen += varDataTLen(pColInfoData->pData + newLen);
            }
            break;
          }
        }
      }
      if (newLen <= -1) {
        uFatal("colDataKeepFirstNRows: newLen:%d  old:%d", newLen, pColInfoData->varmeta.length);
      } else {
        pColInfoData->varmeta.length = newLen;
      }
    }
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
    // clear the dropped entries; the size is in bytes, so (total - n) entries
    // take (total - n) * sizeof(int32_t) bytes
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  }
}
2333

2334
// Keep only the first n rows of the block.
// n == 0 empties the block; if the block already has no more than n rows it
// is left untouched. Columns that cannot be fetched are skipped.
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataEmpty(pBlock);
    return;
  }

  if (pBlock->info.rows <= n) {
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol != NULL) {
      colDataKeepFirstNRows(pCol, n, pBlock->info.rows);
    }
  }

  pBlock->info.rows = n;
}
2356

2357
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
117,107✔
2358
  int64_t tbUid = pBlock->info.id.uid;
117,107✔
2359
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
117,107✔
2360
  int16_t hasVarCol = pBlock->info.hasVarCol;
117,156✔
2361
  int64_t rows = pBlock->info.rows;
117,156✔
2362
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
117,156✔
2363

2364
  int32_t tlen = 0;
117,160✔
2365
  tlen += taosEncodeFixedI64(buf, tbUid);
117,160✔
2366
  tlen += taosEncodeFixedI16(buf, numOfCols);
117,160✔
2367
  tlen += taosEncodeFixedI16(buf, hasVarCol);
234,320✔
2368
  tlen += taosEncodeFixedI64(buf, rows);
117,160✔
2369
  tlen += taosEncodeFixedI32(buf, sz);
117,160✔
2370
  for (int32_t i = 0; i < sz; i++) {
235,963✔
2371
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
118,522✔
2372
    if (pColData == NULL) {
118,571!
2373
      return terrno;
×
2374
    }
2375

2376
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
118,598✔
2377
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
118,598✔
2378
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
118,598✔
2379
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
118,598✔
2380

2381
    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
118,598!
2382
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
740✔
2383
    } else {
2384
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
236,456✔
2385
    }
2386

2387
    int32_t len = colDataGetLength(pColData, rows);
118,598✔
2388
    tlen += taosEncodeFixedI32(buf, len);
118,803✔
2389

2390
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
118,803!
2391
      for (int32_t row = 0; row < rows; ++row) {
×
2392
        char*   pData = pColData->pData + pColData->varmeta.offset[row];
×
2393
        int32_t colSize = 0;
×
2394
        if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
×
2395
          colSize = getJsonValueLen(pData);
×
2396
        } else {
2397
          colSize = varDataTLen(pData);
×
2398
        }
2399
        tlen += taosEncodeBinary(buf, pData, colSize);
×
2400
      }
2401
    } else {
2402
      tlen += taosEncodeBinary(buf, pColData->pData, len);
237,606✔
2403
    }
2404
  }
2405
  return tlen;
117,441✔
2406
}
2407

2408
// Deserialize a data block from buf (the layout written by tEncodeDataBlock).
// A new column array is created in pBlock->pDataBlock and one column is pushed
// per encoded column; each column owns freshly allocated copies of its
// offset table / null bitmap and payload.
// Returns the position in buf right after the decoded block, or NULL on error.
//
// On error, the buffers of the column being decoded as well as those of the
// columns already pushed are released, so nothing decoded so far is leaked.
// The (now buffer-less) column array itself stays attached to pBlock.
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;
  int16_t numOfCols = 0;

  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI8(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    buf = taosDecodeFixedBool(buf, &data.hasNull);

    if (IS_VAR_DATA_TYPE(data.info.type)) {
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
    }
    if (buf == NULL) {
      uError("failed to decode null bitmap/offset, type:%d", data.info.type);
      colDataDestroy(&data);  // not yet in the array, release it here
      goto _error;
    }

    int32_t len = 0;
    buf = taosDecodeFixedI32(buf, &len);
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);
    if (buf == NULL) {
      uError("failed to decode data, type:%d", data.info.type);
      colDataDestroy(&data);  // frees the bitmap/offset table decoded above
      goto _error;
    }
    if (IS_VAR_DATA_TYPE(data.info.type)) {
      data.varmeta.length = len;
      data.varmeta.allocLen = len;
    }

    void* px = taosArrayPush(pBlock->pDataBlock, &data);
    if (px == NULL) {
      colDataDestroy(&data);  // the array did not take ownership
      goto _error;
    }
  }

  return (void*)buf;

_error: {
  // release the buffers of every column that made it into the array
  int32_t numOfDecoded = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfDecoded; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      break;
    }
    colDataDestroy(pColInfoData);
  }
  return NULL;
}
}
2469

2470
static int32_t formatTimestamp(char* buf, size_t cap, int64_t val, int precision) {
41,488,087✔
2471
  time_t  tt;
2472
  int32_t ms = 0;
41,488,087✔
2473
  int32_t code = TSDB_CODE_SUCCESS;
41,488,087✔
2474
  int32_t lino = 0;
41,488,087✔
2475
  if (precision == TSDB_TIME_PRECISION_NANO) {
41,488,087!
2476
    tt = (time_t)(val / 1000000000);
×
2477
    ms = val % 1000000000;
×
2478
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
41,488,087!
2479
    tt = (time_t)(val / 1000000);
×
2480
    ms = val % 1000000;
×
2481
  } else {
2482
    tt = (time_t)(val / 1000);
41,488,087✔
2483
    ms = val % 1000;
41,488,087✔
2484
  }
2485

2486
  if (tt <= 0 && ms < 0) {
41,488,087!
2487
    tt--;
×
2488
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
2489
      ms += 1000000000;
×
2490
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
2491
      ms += 1000000;
×
2492
    } else {
2493
      ms += 1000;
×
2494
    }
2495
  }
2496
  struct tm ptm = {0};
41,488,087✔
2497
  if (taosLocalTime(&tt, &ptm, buf, cap) == NULL) {
41,488,087!
2498
    code =  TSDB_CODE_INTERNAL_ERROR;
×
2499
    TSDB_CHECK_CODE(code, lino, _end);
×
2500
  }
2501

2502
  size_t pos = strftime(buf, cap, "%Y-%m-%d %H:%M:%S", &ptm);
41,646,278✔
2503
  if (pos == 0) {
41,646,278!
2504
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2505
    TSDB_CHECK_CODE(code, lino, _end);
×
2506
  }
2507
  int32_t nwritten = 0;
41,646,278✔
2508
  if (precision == TSDB_TIME_PRECISION_NANO) {
41,646,278!
2509
    nwritten = snprintf(buf + pos, cap - pos, ".%09d", ms);
×
2510
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
41,646,278!
2511
    nwritten = snprintf(buf + pos, cap - pos, ".%06d", ms);
×
2512
  } else {
2513
    nwritten = snprintf(buf + pos, cap - pos, ".%03d", ms);
41,646,278✔
2514
  }
2515

2516
  if (nwritten >= cap - pos) {
41,646,278!
2517
    code = TSDB_CODE_OUT_OF_BUFFER;
×
2518
    TSDB_CHECK_CODE(code, lino, _end);
×
2519
  }
2520

2521
_end:
41,646,278✔
2522
  if (code != TSDB_CODE_SUCCESS) {
41,646,278!
2523
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2524
  }
2525
  return code;
41,637,028✔
2526
}
2527

2528
// for debug
2529
// Render a data block as text: one header line, one line per row (each cell separated by '|'),
// and a trailing "<flag> |end" line.
//
// pDataBlock : block to dump
// flag       : tag printed in the header and at the start of every row line
// pDataBuf   : on success receives the heap buffer holding the text; it is not touched on failure
//              (the buffer is handed to the caller, which presumably releases it - confirm at call sites)
// taskIdStr  : prefix printed at the very start of the header line
//
// The output buffer is fixed at 2 MiB. Once it is (nearly) full the dump stops early and whatever
// was written so far is still returned with a success code.
// Returns 0 on success, or an error code when allocation, column lookup or NCHAR conversion fails.
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
  int32_t lino = 0;
  int32_t size = 2048 * 1024;
  int32_t code = 0;
  char*   dumpBuf = NULL;
  char    pBuf[TD_TIME_STR_LEN] = {0};
  int32_t rows = pDataBlock->info.rows;
  int32_t len = 0;

  dumpBuf = taosMemoryCalloc(size, 1);
  if (dumpBuf == NULL) {
    return terrno;
  }

  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  len += tsnprintf(dumpBuf + len, size - len,
                   "%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
                   "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
                   taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
                   pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
                   pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
  if (len >= size - 1) {
    goto _exit;
  }

  for (int32_t j = 0; j < rows; j++) {
    len += tsnprintf(dumpBuf + len, size - len, "%s|", flag);
    if (len >= size - 1) {
      goto _exit;
    }

    for (int32_t k = 0; k < colNum; k++) {
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
      if (pColInfoData == NULL) {
        code = terrno;
        lino = __LINE__;
        goto _exit;
      }

      // NULL cells and columns without a data buffer are both printed as "NULL".
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
        len += tsnprintf(dumpBuf + len, size - len, " %15s |", "NULL");
        if (len >= size - 1) goto _exit;
        continue;
      }

      void* var = colDataGetData(pColInfoData, j);
      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_TIMESTAMP:
          memset(pBuf, 0, sizeof(pBuf));
          code = formatTimestamp(pBuf, sizeof(pBuf), *(int64_t*)var, pColInfoData->info.precision);
          if (code != TSDB_CODE_SUCCESS) {
            // A timestamp that cannot be formatted is shown as "NaN"; the dump keeps going.
            TAOS_UNUSED(tsnprintf(pBuf, sizeof(pBuf), "NaN"));
          }
          len += tsnprintf(dumpBuf + len, size - len, " %25s |", pBuf);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_TINYINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_UTINYINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_USMALLINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_INT:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_UINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_BIGINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_UBIGINT:
          len += tsnprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_FLOAT:
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_DOUBLE:
          len += tsnprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_BOOL:
          len += tsnprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
          if (len >= size - 1) goto _exit;
          break;
        case TSDB_DATA_TYPE_VARCHAR:
        case TSDB_DATA_TYPE_VARBINARY:
        case TSDB_DATA_TYPE_GEOMETRY: {
          memset(pBuf, 0, sizeof(pBuf));
          char* pData = colDataGetVarData(pColInfoData, j);
          // Copy at most sizeof(pBuf) - 1 bytes so the zeroed last byte always terminates the
          // string handed to "%s" below; long values are truncated.
          int32_t dataSize = TMIN(sizeof(pBuf) - 1, varDataLen(pData));
          dataSize = TMIN(dataSize, 50);
          memcpy(pBuf, varDataVal(pData), dataSize);
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
          if (len >= size - 1) goto _exit;
        } break;
        case TSDB_DATA_TYPE_NCHAR: {
          char* pData = colDataGetVarData(pColInfoData, j);
          // Limit the input to a whole number of TdUcs4 units that is strictly smaller than pBuf.
          // NOTE(review): this assumes taosUcs4ToMbs never emits more bytes than the input length
          // it is given, so the zeroed tail of pBuf keeps the result NUL-terminated - confirm.
          int32_t maxInput = (int32_t)((sizeof(pBuf) - 1) / sizeof(TdUcs4) * sizeof(TdUcs4));
          int32_t dataSize = TMIN(maxInput, varDataLen(pData));
          memset(pBuf, 0, sizeof(pBuf));
          code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
          if (code < 0) {
            uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
            lino = __LINE__;
            goto _exit;
          } else {  // a non-negative result is not an error: reset the code
            code = TSDB_CODE_SUCCESS;
          }
          len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
          if (len >= size - 1) goto _exit;
        } break;
        default:
          // Other column types are not rendered; nothing is written for the cell.
          break;
      }
    }
    // Each row line ends with its row index.
    len += tsnprintf(dumpBuf + len, size - len, "%d\n", j);
    if (len >= size - 1) goto _exit;
  }
  len += tsnprintf(dumpBuf + len, size - len, "%s |end\n", flag);

_exit:
  if (code == TSDB_CODE_SUCCESS) {
    *pDataBuf = dumpBuf;
    dumpBuf = NULL;
  } else {
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (dumpBuf) {
      taosMemoryFree(dumpBuf);
    }
  }
  return code;
}
2674

2675
// Convert the rows of `pDataBlock` into a submit request, one SRow per block row.
//
// ppReq      : in/out. When *ppReq is NULL a new request is allocated here, otherwise the given
//              request is extended. *ppReq is set to NULL on entry and only set to the request
//              again when the function succeeds.
// pDataBlock : source rows; its column count must equal pTSchema->numOfCols
// pTSchema   : target schema; column k of the block is matched with pTSchema->columns[k]
// uid, suid  : table / super-table uid stored in the submitted table data
// vgId       : not used by this function
//
// Returns 0 on success or an error code. On every failure path the request (whether allocated
// here or passed in) is destroyed and freed, and all temporary arrays are released.
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
  SSubmitReq2*  pReq = *ppReq;
  SArray*       pVals = NULL;
  SSubmitTbData tbData = {0};  // owned by this function until it has been pushed into pReq
  int32_t       sz = 1;
  int32_t       code = 0;
  *ppReq = NULL;
  terrno = 0;

  if (NULL == pReq) {
    if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
      code = terrno;
      goto _end;
    }

    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
      code = terrno;
      goto _end;
    }
  }

  for (int32_t i = 0; i < sz; ++i) {
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
    int32_t rows = pDataBlock->info.rows;

    if (colNum <= 1) {  // invalid if only with TS col
      continue;
    }

    // the rsma result should has the same column number with schema.
    if (colNum != pTSchema->numOfCols) {
      uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      goto _end;
    }

    tbData = (SSubmitTbData){0};

    if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
      code = terrno;
      goto _end;
    }

    tbData.suid = suid;
    tbData.uid = uid;
    tbData.sver = pTSchema->version;

    if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
      code = terrno;
      goto _end;  // tbData.aRowP is released at _end
    }

    for (int32_t j = 0; j < rows; ++j) {  // iterate by row
      taosArrayClear(pVals);

      bool isStartKey = false;
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (pColInfoData == NULL) {
          code = terrno;
          goto _end;
        }

        const STColumn* pCol = &pTSchema->columns[k];
        void*           var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);

        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (pColInfoData->info.type != pCol->type) {
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            if (!isStartKey) {
              // The first timestamp column of a row must be the primary key column.
              isStartKey = true;
              if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
                uError("the first timestamp colId %d is not primary colId", pCol->colId);
                terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
                code = terrno;
                goto _end;
              }
              SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var}));
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }

            } else if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            } else {
              SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var}));
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            }
            break;
          case TSDB_DATA_TYPE_NCHAR:
          case TSDB_DATA_TYPE_VARBINARY:
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
            if (pColInfoData->info.type != pCol->type) {
              uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;  // must be set, otherwise the failure is reported as success
                goto _end;
              }
            } else {
              void*  data = colDataGetVarData(pColInfoData, j);
              SValue sv = (SValue){.type = pCol->type,
                                   .nData = varDataLen(data),
                                   .pData = (uint8_t*)varDataVal(data)};  // address copy, no value
              SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
              void*   px = taosArrayPush(pVals, &cv);
              if (px == NULL) {
                code = terrno;
                goto _end;
              }
            }
            break;
          }
          case TSDB_DATA_TYPE_DECIMAL:
          case TSDB_DATA_TYPE_BLOB:
          case TSDB_DATA_TYPE_JSON:
          case TSDB_DATA_TYPE_MEDIUMBLOB:
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
            terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
            code = terrno;
            goto _end;
          default:
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
              if (colDataIsNull_s(pColInfoData, j)) {
                SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
                void*   px = taosArrayPush(pVals, &cv);
                if (px == NULL) {
                  code = terrno;  // must be set, otherwise the failure is reported as success
                  goto _end;
                }
              } else {
                SValue sv = {.type = pCol->type};
                if (pCol->type == pColInfoData->info.type) {
                  memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
                } else {
                  /**
                   *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
                   *  2. below conversion may lead to overflow or loss, the app should select the right data type.
                   */
                  char tv[8] = {0};
                  if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
                    float v = 0;
                    GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
                    double v = 0;
                    GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
                    int64_t v = 0;
                    GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  } else {
                    uint64_t v = 0;
                    GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
                    SET_TYPED_DATA(&tv, pCol->type, v);
                  }
                  memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes);
                }
                SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
                void*   px = taosArrayPush(pVals, &cv);
                if (px == NULL) {
                  code = terrno;
                  goto _end;
                }
              }
            } else {
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
              terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
              code = terrno;
              goto _end;
            }
            break;
        }
      }

      SRow* pRow = NULL;
      if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
        goto _end;  // rows built so far are released together with tbData at _end
      }

      void* px = taosArrayPush(tbData.aRowP, &pRow);
      if (px == NULL) {
        // NOTE(review): pRow is not yet stored in aRowP here, so the cleanup at _end does not
        // cover it; releasing it needs the row destructor, which this block does not reference.
        code = terrno;
        goto _end;
      }
    }

    void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
    if (px == NULL) {
      code = terrno;
      goto _end;
    }
    // The request now holds a copy of tbData (including aRowP); drop the local reference so the
    // cleanup below does not destroy it.
    tbData = (SSubmitTbData){0};
  }

_end:
  // A non-NULL aRowP here means tbData was never handed over to the request.
  if (tbData.aRowP != NULL) {
    tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
  }
  taosArrayDestroy(pVals);
  if (code != 0) {
    if (pReq) {
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pReq);
    }
  } else {
    *ppReq = pReq;
  }

  return code;
}
2897

2898
// Construct the child table name in the form of <ctbName>_<stbName>_<groupId> and store it in `ctbName`.
2899
// If the name length exceeds TSDB_TABLE_NAME_LEN, first convert <stbName>_<groupId> to an MD5 value and then
2900
// concatenate. If the length is still too long, convert <ctbName> to an MD5 value as well.
2901
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap) {
1,367✔
2902
  int32_t   code = TSDB_CODE_SUCCESS;
1,367✔
2903
  int32_t   lino = 0;
1,367✔
2904
  char      tmp[TSDB_TABLE_NAME_LEN] = {0};
1,367✔
2905
  char*     suffix = tmp;
1,367✔
2906
  size_t    suffixCap = sizeof(tmp);
1,367✔
2907
  size_t    suffixLen = 0;
1,367✔
2908
  size_t    prefixLen = 0;
1,367✔
2909
  T_MD5_CTX context;
2910

2911
  if (ctbName == NULL || cap < TSDB_TABLE_NAME_LEN) {
1,367!
2912
    code = TSDB_CODE_INTERNAL_ERROR;
×
2913
    TSDB_CHECK_CODE(code, lino, _end);
×
2914
  }
2915

2916
  prefixLen = strlen(ctbName);
1,367✔
2917

2918
  if (stbName == NULL) {
1,367!
2919
    suffixLen = snprintf(suffix, suffixCap, "%" PRIu64, groupId);
×
2920
    if (suffixLen >= suffixCap) {
×
2921
      code = TSDB_CODE_INTERNAL_ERROR;
×
2922
      TSDB_CHECK_CODE(code, lino, _end);
×
2923
    }
2924
  } else {
2925
    int32_t i = strlen(stbName) - 1;
1,367✔
2926
    for (; i >= 0; i--) {
38,898!
2927
      if (stbName[i] == '.') {
38,898✔
2928
        break;
1,367✔
2929
      }
2930
    }
2931
    suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId);
1,367✔
2932
    if (suffixLen >= suffixCap) {
1,367!
2933
      suffixCap = suffixLen + 1;
×
2934
      suffix = taosMemoryMalloc(suffixCap);
×
2935
      TSDB_CHECK_NULL(suffix, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
2936
      suffixLen = snprintf(suffix, suffixCap, "%s_%" PRIu64, stbName + i + 1, groupId);
×
2937
      if (suffixLen >= suffixCap) {
×
2938
        code = TSDB_CODE_INTERNAL_ERROR;
×
2939
        TSDB_CHECK_CODE(code, lino, _end);
×
2940
      }
2941
    }
2942
  }
2943

2944
  if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
1,367!
2945
    // If the name length exceeeds the limit, convert the suffix to MD5 value.
2946
    tMD5Init(&context);
×
2947
    tMD5Update(&context, (uint8_t*)suffix, suffixLen);
×
2948
    tMD5Final(&context);
×
2949
    suffixLen = snprintf(suffix, suffixCap, "%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest,
×
2950
                         *(uint64_t*)(context.digest + 8));
×
2951
    if (suffixLen >= suffixCap) {
×
2952
      code = TSDB_CODE_INTERNAL_ERROR;
×
2953
      TSDB_CHECK_CODE(code, lino, _end);
×
2954
    }
2955
  }
2956

2957
  if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
1,367!
2958
    // If the name is still too long, convert the ctbName to MD5 value.
2959
    tMD5Init(&context);
×
2960
    tMD5Update(&context, (uint8_t*)ctbName, prefixLen);
×
2961
    tMD5Final(&context);
×
2962
    prefixLen = snprintf(ctbName, cap, "t_%016" PRIx64 "%016" PRIx64, *(uint64_t*)context.digest,
×
2963
                         *(uint64_t*)(context.digest + 8));
×
2964
    if (prefixLen >= cap) {
×
2965
      code = TSDB_CODE_INTERNAL_ERROR;
×
2966
      TSDB_CHECK_CODE(code, lino, _end);
×
2967
    }
2968
  }
2969

2970
  if (prefixLen + suffixLen + 1 >= TSDB_TABLE_NAME_LEN) {
1,367!
2971
    code = TSDB_CODE_INTERNAL_ERROR;
×
2972
    TSDB_CHECK_CODE(code, lino, _end);
×
2973
  }
2974

2975
  ctbName[prefixLen] = '_';
1,367✔
2976
  tstrncpy(&ctbName[prefixLen + 1], suffix, cap - prefixLen - 1);
1,367✔
2977

2978
  for (char* p = ctbName; *p; ++p) {
128,238✔
2979
    if (*p == '.') *p = '_';
126,871✔
2980
  }
2981

2982
_end:
1,367✔
2983
  if (code != TSDB_CODE_SUCCESS) {
1,367!
2984
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2985
  }
2986
  if (suffix != tmp) {
1,367!
2987
    taosMemoryFree(suffix);
×
2988
  }
2989
  return code;
1,367✔
2990
}
2991

2992
// auto stream subtable name starts with 't_', followed by the first segment of MD5 digest for group vals.
2993
// the total length is fixed to be 34 bytes.
2994
// Returns true when `ctbName` starts with "t_" and is exactly 34 characters long.
bool isAutoTableName(char* ctbName) {
  // The second character is only read when the first one is 't', so short strings are never
  // read past their terminator.
  if (ctbName[0] != 't' || ctbName[1] != '_') {
    return false;
  }
  return strlen(ctbName) == 34;
}
3,616!
2995

2996
// Returns true when `ctbName` ends with the decimal text of `groupId`.
// The group id is printed as an unsigned 64-bit value, so a negative `groupId` is matched by
// its two's-complement unsigned representation.
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
  char tmp[64] = {0};
  // Cast explicitly so the argument type matches the PRIu64 conversion.
  snprintf(tmp, sizeof(tmp), "%" PRIu64, (uint64_t)groupId);

  size_t len1 = strlen(ctbName);
  size_t len2 = strlen(tmp);
  if (len1 < len2) {
    return false;  // the name is shorter than the id text, it cannot end with it
  }
  return memcmp(ctbName + len1 - len2, tmp, len2) == 0;
}
3004

3005
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
488✔
3006
  QRY_PARAM_CHECK(pName);
488!
3007

3008
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
488✔
3009
  if (!pBuf) {
488!
3010
    return terrno;
×
3011
  }
3012

3013
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
488✔
3014
  if (code != TSDB_CODE_SUCCESS) {
488!
3015
    taosMemoryFree(pBuf);
×
3016
  } else {
3017
    *pName = pBuf;
488✔
3018
  }
3019

3020
  return code;
488✔
3021
}
3022

3023
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
1,189✔
3024
  if (stbFullName[0] == 0) {
1,189!
3025
    return TSDB_CODE_INVALID_PARA;
×
3026
  }
3027

3028
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
1,189✔
3029
  if (tags == NULL) {
1,189!
3030
    return terrno;
×
3031
  }
3032

3033
  if (cname == NULL) {
1,189!
3034
    taosArrayDestroy(tags);
×
3035
    return TSDB_CODE_INVALID_PARA;
×
3036
  }
3037

3038
  int8_t      type = TSDB_DATA_TYPE_UBIGINT;
1,189✔
3039
  const char* name = "group_id";
1,189✔
3040
  int32_t     len = strlen(name);
1,189✔
3041

3042
  SSmlKv pTag = {.key = name, .keyLen = len, .type = type, .u = groupId, .length = sizeof(uint64_t)};
1,189✔
3043
  void*  px = taosArrayPush(tags, &pTag);
1,189✔
3044
  if (px == NULL) {
1,189!
3045
    return terrno;
×
3046
  }
3047

3048
  RandTableName rname = {
1,189✔
3049
      .tags = tags, .stbFullName = stbFullName, .stbFullNameLen = strlen(stbFullName), .ctbShortName = cname};
1,189✔
3050

3051
  int32_t code = buildChildTableName(&rname);
1,189✔
3052
  if (code != TSDB_CODE_SUCCESS) {
1,189!
3053
    return code;
×
3054
  }
3055

3056
  taosArrayDestroy(tags);
1,189✔
3057
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
1,189!
3058
    return TSDB_CODE_INVALID_PARA;
×
3059
  }
3060

3061
  return code;
1,189✔
3062
}
3063

3064
// return length of encoded data, return -1 if failed
3065
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
5,890,291✔
3066
  int32_t code = blockDataCheck(pBlock);
5,890,291✔
3067
  if (code != TSDB_CODE_SUCCESS) {
5,848,125!
3068
    terrno = code;
×
3069
    return -1;
×
3070
  }
3071

3072
  int32_t dataLen = 0;
5,852,929✔
3073

3074
  // todo extract method
3075
  int32_t* version = (int32_t*)data;
5,852,929✔
3076
  *version = BLOCK_VERSION_1;
5,852,929✔
3077
  data += sizeof(int32_t);
5,852,929✔
3078

3079
  int32_t* actualLen = (int32_t*)data;
5,852,929✔
3080
  data += sizeof(int32_t);
5,852,929✔
3081

3082
  int32_t* rows = (int32_t*)data;
5,852,929✔
3083
  *rows = pBlock->info.rows;
5,852,929✔
3084
  data += sizeof(int32_t);
5,852,929✔
3085
  if (*rows <= 0) {
5,852,929!
3086
    uError("Invalid rows %d in block", *rows);
×
3087
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3088
    return -1;
×
3089
  }
3090

3091
  int32_t* cols = (int32_t*)data;
5,852,929✔
3092
  *cols = numOfCols;
5,852,929✔
3093
  data += sizeof(int32_t);
5,852,929✔
3094

3095
  // flag segment.
3096
  // the inital bit is for column info
3097
  int32_t* flagSegment = (int32_t*)data;
5,852,929✔
3098
  *flagSegment = (1 << 31);
5,852,929✔
3099

3100
  data += sizeof(int32_t);
5,852,929✔
3101

3102
  uint64_t* groupId = (uint64_t*)data;
5,852,929✔
3103
  data += sizeof(uint64_t);
5,852,929✔
3104

3105
  for (int32_t i = 0; i < numOfCols; ++i) {
35,600,987✔
3106
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
29,749,577✔
3107
    if (pColInfoData == NULL) {
29,748,058!
3108
      return -1;
×
3109
    }
3110

3111
    *((int8_t*)data) = pColInfoData->info.type;
29,748,058✔
3112
    data += sizeof(int8_t);
29,748,058✔
3113

3114
    *((int32_t*)data) = pColInfoData->info.bytes;
29,748,058✔
3115
    data += sizeof(int32_t);
29,748,058✔
3116
  }
3117

3118
  int32_t* colSizes = (int32_t*)data;
5,851,410✔
3119
  data += numOfCols * sizeof(int32_t);
5,851,410✔
3120

3121
  dataLen = blockDataGetSerialMetaSize(numOfCols);
5,851,410✔
3122

3123
  int32_t numOfRows = pBlock->info.rows;
5,843,551✔
3124
  for (int32_t col = 0; col < numOfCols; ++col) {
35,362,609✔
3125
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
29,511,025✔
3126
    if (pColRes == NULL) {
29,470,495!
3127
      return -1;
×
3128
    }
3129

3130
    // copy the null bitmap
3131
    size_t metaSize = 0;
29,470,495✔
3132
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
29,470,495!
3133
      metaSize = numOfRows * sizeof(int32_t);
6,630,907✔
3134
      if(dataLen + metaSize > dataBuflen) goto _exit;
6,630,907!
3135
      memcpy(data, pColRes->varmeta.offset, metaSize);
6,630,907✔
3136
    } else {
3137
      metaSize = BitmapLen(numOfRows);
22,839,588✔
3138
      if(dataLen + metaSize > dataBuflen) goto _exit;
22,839,588!
3139
      memcpy(data, pColRes->nullbitmap, metaSize);
22,839,588✔
3140
    }
3141

3142
    data += metaSize;
29,470,495✔
3143
    dataLen += metaSize;
29,470,495✔
3144

3145
    if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
29,470,495!
3146
      colSizes[col] = 0;
×
3147
      for (int32_t row = 0; row < numOfRows; ++row) {
×
3148
        char*   pColData = pColRes->pData + pColRes->varmeta.offset[row];
×
3149
        int32_t colSize = 0;
×
3150
        if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
×
3151
          colSize = getJsonValueLen(pColData);
×
3152
        } else {
3153
          colSize = varDataTLen(pColData);
×
3154
        }
3155
        colSizes[col] += colSize;
×
3156
        dataLen += colSize;
×
3157
        if(dataLen > dataBuflen) goto _exit;
×
3158
        (void) memmove(data, pColData, colSize);
×
3159
        data += colSize;
×
3160
      }
3161
    } else {
3162
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
29,470,495✔
3163
      dataLen += colSizes[col];
29,519,058✔
3164
      if(dataLen > dataBuflen) goto _exit;
29,519,058!
3165
      if (pColRes->pData != NULL) {
29,519,058✔
3166
        (void) memmove(data, pColRes->pData, colSizes[col]);
28,913,883✔
3167
      }
3168
      data += colSizes[col];
29,519,058✔
3169
    }
3170

3171
    if (colSizes[col] <= 0 && !colDataIsNull_s(pColRes, 0) && pColRes->info.type != TSDB_DATA_TYPE_NULL) {
30,178,755!
3172
      uError("Invalid colSize:%d colIdx:%d colType:%d while encoding block", colSizes[col], col, pColRes->info.type);
×
3173
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3174
      return -1;
×
3175
    }
3176
    
3177
    colSizes[col] = htonl(colSizes[col]);
29,519,058✔
3178
    //    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
3179
    //    htonl(colSizes[col]), colSizes[col]);
3180
  }
3181

3182
  bool* blankFill = (bool*)data;
5,851,584✔
3183
  *blankFill = pBlock->info.blankFill;
5,851,584✔
3184
  data += sizeof(bool);
5,851,584✔
3185

3186
  *actualLen = dataLen;
5,851,584✔
3187
  *groupId = pBlock->info.id.groupId;
5,851,584✔
3188
  if (dataLen > dataBuflen) goto _exit;
5,851,584!
3189

3190
  return dataLen;
5,851,584✔
3191

3192
_exit:
×
3193
  uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
×
3194
  terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3195
  return -1;
×
3196
}
3197

3198
/*
 * Decode a buffer produced by blockEncode() into pBlock.
 *
 * Reads the header (version, total length, rows, columns, flag segment, group id), the column
 * schema and the per-column length table, then copies every column's offset array or null bitmap
 * and its payload into pBlock. pBlock->pDataBlock is created when it is NULL; blockDataEnsureCapacity()
 * is called for the decoded row count before any column data is copied.
 *
 * pData is only read: the length table is converted into a local variable instead of being
 * byte-swapped in place, so the same buffer can be decoded again.
 *
 * On success *pEndPos (when pEndPos is not NULL) is set to the first byte after the consumed
 * data and TSDB_CODE_SUCCESS is returned. On failure an error code is returned; for malformed
 * input terrno is set to TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * NOTE(review): payload lengths of fixed-length columns are copied without being compared against
 * the column capacity - presumably the input is trusted; confirm for data arriving from the network.
 */
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
  const char* pStart = pData;

  // version sizeof(int32_t), currently not inspected
  pStart += sizeof(int32_t);

  // total length sizeof(int32_t)
  int32_t dataLen = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);

  // total rows sizeof(int32_t)
  int32_t numOfRows = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);
  if (numOfRows <= 0) {
    uError("block decode numOfRows:%d error", numOfRows);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  // total columns sizeof(int32_t)
  int32_t numOfCols = *(const int32_t*)pStart;
  pStart += sizeof(int32_t);

  // flag segment sizeof(int32_t), currently not inspected
  pStart += sizeof(int32_t);

  // group id sizeof(uint64_t)
  pBlock->info.id.groupId = *(const uint64_t*)pStart;
  pStart += sizeof(uint64_t);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
    if (pBlock->pDataBlock == NULL) {
      return terrno;
    }
  }

  // column schema: type and declared width of every column
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    pColInfoData->info.type = *(const int8_t*)pStart;
    pStart += sizeof(int8_t);

    pColInfoData->info.bytes = *(const int32_t*)pStart;
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, numOfRows);
  if (code) {
    return code;
  }

  const int32_t* colLen = (const int32_t*)pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    // lengths were stored through htonl by the encoder; convert into a local, leaving pData untouched
    int32_t len = htonl(colLen[i]);
    if (len < 0) {
      uError("block decode colLen:%d error, colIdx:%d", len, i);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (pColInfoData == NULL) {
      return terrno;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      // grow the payload buffer when the incoming data does not fit
      if (len > 0 && pColInfoData->varmeta.allocLen < len) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, len);
        if (tmp == NULL) {
          return terrno;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = len;
      }

      pColInfoData->varmeta.length = len;
    } else {
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;

    if (len > 0) {
      memcpy(pColInfoData->pData, pStart, len);
    } else if (!colDataIsNull_s(pColInfoData, 0) && pColInfoData->info.type != TSDB_DATA_TYPE_NULL) {
      // an empty payload is only valid when the first row is NULL or the column type is NULL
      uError("block decode colLen:%d error, colIdx:%d, type:%d", len, i, pColInfoData->info.type);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return terrno;
    }

    pStart += len;
  }

  bool blankFill = *(const bool*)pStart;
  pStart += sizeof(bool);

  pBlock->info.dataLoad = 1;
  pBlock->info.rows = numOfRows;
  pBlock->info.blankFill = blankFill;

  // the number of consumed bytes must match the total length recorded in the header
  if (pStart - pData != dataLen) {
    uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return terrno;
  }

  if (pEndPos != NULL) {
    *pEndPos = pStart;
  }

  code = blockDataCheck(pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
3333

3334
/*
 * Compact pBlock in place, keeping only the rows whose entry in pBoolList is non-zero.
 *
 * totalRows is the number of rows (and pBoolList entries) to examine. Columns without data
 * (pData == NULL) and var-length columns whose varmeta.length is 0 are skipped.
 * On success pBlock->info.rows is set to the largest number of rows kept in any column.
 *
 * When pBoolList is NULL no row is kept: var-length columns get varmeta.length = 0 and
 * fixed-length columns get their null bitmap cleared; pBlock->info.rows is left unchanged.
 *
 * Returns 0 on success, or the error from a failed allocation / colDataSetVal(). The scratch
 * bitmap is released on every path, including the error paths.
 */
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
  //  int32_t totalRows = pBlock->info.rows;
  int32_t code = 0;
  int32_t bmLen = BitmapLen(totalRows);
  char*   pBitmap = NULL;  // scratch copy of the current column's null bitmap, shared by all columns
  int32_t maxRows = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  if (!pBoolList) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      // it is a reserved column for scalar function, and no data in this column yet.
      if (pDst->pData == NULL) {
        continue;
      }

      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
        pDst->varmeta.length = 0;
      } else {
        memset(pDst->nullbitmap, 0, bmLen);
      }
    }
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
      continue;
    }

    int32_t numOfRows = 0;  // rows kept so far in this column; also the next write position
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      int32_t j = 0;
      pDst->varmeta.length = 0;

      while (j < totalRows) {
        if (pBoolList[j] == 0) {
          j += 1;
          continue;
        }

        if (colDataIsNull_var(pDst, j)) {
          colDataSetNull_var(pDst, numOfRows);
        } else {
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
          // copy it to p2
          char*   p1 = colDataGetVarData(pDst, j);
          int32_t len = 0;
          if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
            len = getJsonValueLen(p1);
          } else {
            len = varDataTLen(p1);
          }

          char* p2 = taosMemoryMalloc(len);
          if (p2 == NULL) {
            code = terrno;
            goto _end;
          }

          memcpy(p2, p1, len);
          code = colDataSetVal(pDst, numOfRows, p2, false);
          taosMemoryFree(p2);
          if (code) {
            goto _end;
          }
        }
        numOfRows += 1;
        j += 1;
      }
    } else {
      if (pBitmap == NULL) {
        pBitmap = taosMemoryCalloc(1, bmLen);
        if (pBitmap == NULL) {
          return terrno;
        }
      }

      // keep the old null flags in the scratch bitmap and rebuild the column's bitmap from scratch
      memcpy(pBitmap, pDst->nullbitmap, bmLen);
      memset(pDst->nullbitmap, 0, bmLen);

      int32_t j = 0;

      // one compaction loop per value width
      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_UBIGINT:
        case TSDB_DATA_TYPE_DOUBLE:
        case TSDB_DATA_TYPE_TIMESTAMP:
          while (j < totalRows) {
            if (pBoolList[j] == 0) {
              j += 1;
              continue;
            }

            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
            }
            numOfRows += 1;
            j += 1;
          }
          break;
        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_UINT:
          while (j < totalRows) {
            if (pBoolList[j] == 0) {
              j += 1;
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
            }
            numOfRows += 1;
            j += 1;
          }
          break;
        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_USMALLINT:
          while (j < totalRows) {
            if (pBoolList[j] == 0) {
              j += 1;
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
            }
            numOfRows += 1;
            j += 1;
          }
          break;
        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_UTINYINT:
          while (j < totalRows) {
            if (pBoolList[j] == 0) {
              j += 1;
              continue;
            }
            if (colDataIsNull_f(pBitmap, j)) {
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
            } else {
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
            }
            numOfRows += 1;
            j += 1;
          }
          break;
      }
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  // only a fully trimmed block gets its row count updated
  pBlock->info.rows = maxRows;

_end:
  if (pBitmap != NULL) {
    taosMemoryFree(pBitmap);
  }

  return code;
}
3508

3509
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
5,885,064✔
3510
  return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
5,885,064✔
3511
}
3512

3513
/*
 * Return the index at which the leading ordered run of pDataBlock ends.
 *
 * Every entry of pOrderInfo is first bound to its column (by slotId) and given the matching
 * key comparator; entries or columns that cannot be fetched are left untouched. Adjacent rows
 * are then compared with dataBlockCompar() until a row compares lower than its predecessor.
 *
 * Returns 0 when either argument is NULL, otherwise the index of the first out-of-order row
 * (or the index reached when the scan runs off the end of the block).
 */
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock == NULL || pOrderInfo == NULL) {
    return 0;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(pOrderInfo); ++idx) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, idx);
    if (pInfo != NULL) {
      pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
      if (pInfo->pColData != NULL) {
        pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
      }
    }
  }

  SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};

  int32_t prevRow = 0;
  int32_t nextRow = 1;
  while (prevRow < pDataBlock->info.rows && nextRow < pDataBlock->info.rows) {
    // stop at the first row that sorts before the row preceding it
    if (dataBlockCompar(&nextRow, &prevRow, &sortHelper) < 0) {
      break;
    }
    ++prevRow;
    ++nextRow;
  }

  return nextRow;
}
3540

3541
// Validation helper for blockDataCheck(): when the condition does not hold, log the line of the
// failing check and return TSDB_CODE_INTERNAL_ERROR from the enclosing function.
// Wrapped in do { } while (0) so that it behaves as a single statement, e.g. in an unbraced if/else.
#define BLOCK_DATA_CHECK_TRESSA(o)                        \
  do {                                                    \
    if (!(o)) {                                           \
      uError("blockDataCheck failed! line:%d", __LINE__); \
      return TSDB_CODE_INTERNAL_ERROR;                    \
    }                                                     \
  } while (0)
3546
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
29,948,652✔
3547
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
29,948,652!
3548
    return TSDB_CODE_SUCCESS;
4,190,684✔
3549
  }
3550

3551
  BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
25,757,968!
3552
  if (!pDataBlock->info.dataLoad) {
25,757,968✔
3553
    return TSDB_CODE_SUCCESS;
1,245,223✔
3554
  }
3555

3556
  bool isVarType = false;
24,512,745✔
3557
  int32_t colLen = 0;
24,512,745✔
3558
  int32_t nextPos = 0;
24,512,745✔
3559
  int64_t checkRows = 0;
24,512,745✔
3560
  int64_t typeValue = 0;
24,512,745✔
3561
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
24,512,745✔
3562
  for (int32_t i = 0; i < colNum; ++i) {
149,920,517✔
3563
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
122,550,896✔
3564
    BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
122,526,234!
3565
    isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
123,168,467!
3566
    checkRows = pDataBlock->info.rows;
123,168,467✔
3567
    if (pCol->info.noData == true) continue;
123,168,467✔
3568

3569
    if (isVarType) {
121,027,377✔
3570
      BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
28,095,104!
3571
    } else {
3572
      BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
92,932,273!
3573
    }
3574

3575
    nextPos = -1;
121,027,377✔
3576
    for (int64_t r = 0; r < checkRows; ++r) {
2,147,483,647✔
3577
      if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
2,147,483,647✔
3578
      if (!colDataIsNull_s(pCol, r)) {
2,147,483,647✔
3579
        BLOCK_DATA_CHECK_TRESSA(pCol->pData);
2,147,483,647!
3580
        BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
2,147,483,647!
3581

3582
        if (isVarType) {
2,147,483,647✔
3583
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
2,147,483,647!
3584
          BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
2,147,483,647!
3585
          if (pCol->reassigned) {
2,147,483,647!
3586
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
×
3587
          } else if (0 == r || nextPos == -1) {
2,147,483,647✔
3588
            nextPos = pCol->varmeta.offset[r];
22,273,730✔
3589
          } else {
3590
            BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
2,147,483,647!
3591
          }
3592

3593
          char*   pColData = pCol->pData + pCol->varmeta.offset[r];
2,147,483,647✔
3594
          int32_t colSize = 0;
2,147,483,647✔
3595
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3596
            colLen = getJsonValueLen(pColData);
8,282✔
3597
          } else {
3598
            colLen = varDataTLen(pColData);
2,147,483,647✔
3599
          }
3600

3601
          if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3602
            BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
8,282!
3603
          } else {
3604
            BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
2,147,483,647!
3605
          }
3606
          BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
2,147,483,647!
3607

3608
          if (pCol->reassigned) {
2,147,483,647!
3609
            BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
×
3610
          } else {
3611
            nextPos += colLen;
2,147,483,647✔
3612
            BLOCK_DATA_CHECK_TRESSA(nextPos <= pCol->varmeta.length);
2,147,483,647!
3613
          }
3614

3615
          typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
2,147,483,647✔
3616
        } else {
3617
          if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
2,147,483,647✔
3618
            float v = 0;
290,161,178✔
3619
            GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r));
290,161,178!
3620
          } else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
2,147,483,647✔
3621
            double v = 0;
396,049,473✔
3622
            GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r));
396,049,473!
3623
          } else {
3624
            GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
2,147,483,647!
3625
          }
3626
        }
3627
      }
3628
    }
3629
  }
3630

3631
  return TSDB_CODE_SUCCESS;
27,369,621✔
3632
}
3633

3634

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc