• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.08
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "tarray.h"
23
#include "tdatablock.h"
24
#include "thash.h"
25
#include "tmsg.h"
26
#include "ttime.h"
27

28
#include "executil.h"
29
#include "executorInt.h"
30
#include "querytask.h"
31
#include "storageapi.h"
32
#include "tcompression.h"
33

34
typedef struct tagFilterAssist {
35
  SHashObj* colHash;
36
  int32_t   index;
37
  SArray*   cInfoList;
38
  int32_t   code;
39
} tagFilterAssist;
40

41
typedef struct STransTagExprCtx {
42
  int32_t      code;
43
  SMetaReader* pReader;
44
} STransTagExprCtx;
45

46
typedef enum {
47
  FILTER_NO_LOGIC = 1,
48
  FILTER_AND,
49
  FILTER_OTHER,
50
} FilterCondType;
51

52
static FilterCondType checkTagCond(SNode* cond);
53
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
54
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI, uint64_t suid);
55

56
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
57
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
58

59
static int64_t getLimit(const SNode* pLimit) { return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i; }
27,384,488!
60
static int64_t getOffset(const SNode* pLimit) { return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i; }
27,391,061✔
61
static void    releaseColInfoData(void* pCol);
62

63
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
11,461,226✔
64
  pResultRowInfo->size = 0;
11,461,226✔
65
  pResultRowInfo->cur.pageId = -1;
11,461,226✔
66
}
11,461,226✔
67

68
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
7,242,437✔
69

70
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
62,674,372✔
71
  pResultRow->numOfRows = 0;
62,674,372✔
72
  pResultRow->closed = false;
62,674,372✔
73
  pResultRow->endInterp = false;
62,674,372✔
74
  pResultRow->startInterp = false;
62,674,372✔
75

76
  if (entrySize > 0) {
62,674,372!
77
    memset(pResultRow->pEntryInfo, 0, entrySize);
62,678,688✔
78
  }
79
}
62,674,372✔
80

81
// TODO refactor: use macro
82
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
83
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
84
}
85

86
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
8,744,873✔
87
  int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
8,744,873✔
88

89
  for (int32_t i = 0; i < numOfOutput; ++i) {
39,780,158✔
90
    rowSize += pCtx[i].resDataInfo.interBufSize;
31,035,285✔
91
  }
92

93
  return rowSize;
8,744,873✔
94
}
95

96
// Convert buf read from rocksdb to result row
97
int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
2,849✔
98
  if (inBuf == NULL || pSup == NULL) {
2,849!
99
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
31!
100
    return TSDB_CODE_INVALID_PARA;
31✔
101
  }
102
  SqlFunctionCtx *pCtx = pSup->pCtx;
2,818✔
103
  int32_t        *offset = pSup->rowEntryInfoOffset;
2,818✔
104
  SResultRow     *pResultRow  = NULL;
2,818✔
105
  size_t          processedSize = 0;
2,818✔
106
  int32_t         code = TSDB_CODE_SUCCESS;
2,818✔
107

108
  // calculate the size of output buffer
109
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
2,818✔
110
  *outBuf = taosMemoryMalloc(*outBufSize);
2,818!
111
  if (*outBuf == NULL) {
2,818!
112
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
113
    return terrno;
×
114
  }
115
  pResultRow = (SResultRow*)*outBuf;
2,818✔
116
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
2,818✔
117
  inBuf += sizeof(SResultRow);
2,818✔
118
  processedSize += sizeof(SResultRow);
2,818✔
119

120
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
43,699✔
121
    int32_t len = *(int32_t*)inBuf;
40,882✔
122
    inBuf += sizeof(int32_t);
40,882✔
123
    processedSize += sizeof(int32_t);
40,882✔
124
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
40,882!
125
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
126
      if (code != TSDB_CODE_SUCCESS) {
×
127
        qError("failed to decode result row, code:%d", code);
×
128
        return code;
×
129
      }
130
    } else {
131
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
40,882✔
132
    }
133
    inBuf += len;
40,881✔
134
    processedSize += len;
40,881✔
135
  }
136

137
  if (processedSize < inBufSize) {
2,817✔
138
    // stream stores extra data after result row
139
    size_t leftLen = inBufSize - processedSize;
1,035✔
140
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
1,035!
141
    if (*outBuf == NULL) {
1,035!
142
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
143
      return terrno;
×
144
    }
145
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
1,035✔
146
    inBuf += leftLen;
1,035✔
147
    processedSize += leftLen;
1,035✔
148
    *outBufSize += leftLen;
1,035✔
149
  }
150
  return TSDB_CODE_SUCCESS;
2,817✔
151
}
152

153
// Convert result row to buf for rocksdb
154
int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
2,032,903✔
155
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
2,032,903!
156
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
157
    return TSDB_CODE_INVALID_PARA;
×
158
  }
159

160
  SqlFunctionCtx *pCtx = pSup->pCtx;
2,032,907✔
161
  int32_t        *offset = pSup->rowEntryInfoOffset;
2,032,907✔
162
  SResultRow     *pResultRow = (SResultRow*)inBuf;
2,032,907✔
163
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
2,032,907✔
164

165
  if (rowSize > inBufSize) {
2,032,893!
166
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
167
    return TSDB_CODE_INVALID_PARA;
×
168
  }
169

170
  // calculate the size of output buffer
171
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
2,032,893✔
172
  if (rowSize < inBufSize) {
2,032,893✔
173
    *outBufSize += inBufSize - rowSize;
1,286✔
174
  }
175

176
  *outBuf = taosMemoryMalloc(*outBufSize);
2,032,893!
177
  if (*outBuf == NULL) {
2,032,890!
178
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
179
    return terrno;
×
180
  }
181

182
  char *pBuf = *outBuf;
2,032,890✔
183
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
2,032,890✔
184
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
2,032,890✔
185
  pBuf += sizeof(SResultRow);
2,032,890✔
186
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
13,026,791✔
187
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
10,994,004✔
188
    *(int32_t *) pBuf = (int32_t)len;
10,994,004✔
189
    pBuf += sizeof(int32_t);
10,994,004✔
190
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
10,994,004✔
191
    pBuf += len;
10,993,901✔
192
  }
193

194
  if (rowSize < inBufSize) {
2,032,787✔
195
    // stream stores extra data after result row
196
    size_t leftLen = inBufSize - rowSize;
1,286✔
197
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
1,286✔
198
    pBuf += leftLen;
1,286✔
199
  }
200
  return TSDB_CODE_SUCCESS;
2,032,787✔
201
}
202

203
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
×
204

205
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
4,526,614✔
206
  taosMemoryFreeClear(pGroupResInfo->pBuf);
4,526,614!
207
  if (pGroupResInfo->freeItem) {
4,527,211!
208
    //    taosArrayDestroy(pGroupResInfo->pRows);
209
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
×
210
    pGroupResInfo->freeItem = false;
×
211
    pGroupResInfo->pRows = NULL;
×
212
  } else {
213
    taosArrayDestroy(pGroupResInfo->pRows);
4,527,211✔
214
    pGroupResInfo->pRows = NULL;
4,527,820✔
215
  }
216
  pGroupResInfo->index = 0;
4,527,820✔
217
  pGroupResInfo->delIndex = 0;
4,527,820✔
218
}
4,527,820✔
219

220
int32_t resultrowComparAsc(const void* p1, const void* p2) {
1,842,787,075✔
221
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
1,842,787,075✔
222
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
1,842,787,075✔
223

224
  if (pp1->groupId == pp2->groupId) {
1,842,787,075✔
225
    int64_t pts1 = *(int64_t*)pp1->key;
1,677,608,806✔
226
    int64_t pts2 = *(int64_t*)pp2->key;
1,677,608,806✔
227

228
    if (pts1 == pts2) {
1,677,608,806!
229
      return 0;
×
230
    } else {
231
      return pts1 < pts2 ? -1 : 1;
1,677,608,806✔
232
    }
233
  } else {
234
    return pp1->groupId < pp2->groupId ? -1 : 1;
165,178,269✔
235
  }
236
}
237

238
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
68,005,647✔
239

240
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
3,655,529✔
241
  int32_t code = TSDB_CODE_SUCCESS;
3,655,529✔
242
  int32_t lino = 0;
3,655,529✔
243
  if (pGroupResInfo->pRows != NULL) {
3,655,529✔
244
    taosArrayDestroy(pGroupResInfo->pRows);
49,945✔
245
  }
246
  if (pGroupResInfo->pBuf) {
3,655,529✔
247
    taosMemoryFree(pGroupResInfo->pBuf);
49,945!
248
    pGroupResInfo->pBuf = NULL;
49,945✔
249
  }
250

251
  // extract the result rows information from the hash map
252
  int32_t size = tSimpleHashGetSize(pHashmap);
3,655,529✔
253

254
  void* pData = NULL;
3,655,665✔
255
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
3,655,665✔
256
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
3,656,324!
257

258
  size_t  keyLen = 0;
3,656,324✔
259
  int32_t iter = 0;
3,656,324✔
260
  int64_t bufLen = 0, offset = 0;
3,656,324✔
261

262
  // todo move away and record this during create window
263
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
204,855,838✔
264
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
265
    bufLen += keyLen + sizeof(SResultRowPosition);
201,199,514✔
266
  }
267

268
  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
3,641,167!
269
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
3,656,337!
270

271
  iter = 0;
3,656,337✔
272
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
204,802,550✔
273
    void* key = tSimpleHashGetKey(pData, &keyLen);
201,172,038✔
274

275
    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);
201,172,038✔
276

277
    p->groupId = *(uint64_t*)key;
201,172,038✔
278
    p->pos = *(SResultRowPosition*)pData;
201,172,038✔
279
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
201,172,038✔
280
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
201,172,038✔
281
    QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
201,146,213!
282

283
    offset += keyLen + sizeof(struct SResultRowPosition);
201,146,213✔
284
  }
285

286
  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
3,665,903✔
287
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
1,642,778✔
288
    size = POINTER_BYTES;
1,642,778✔
289
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
1,642,778✔
290
  }
291

292
  pGroupResInfo->index = 0;
3,656,803✔
293

294
_end:
3,656,803✔
295
  if (code != TSDB_CODE_SUCCESS) {
3,656,803!
296
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
297
  }
298
  return code;
3,656,252✔
299
}
300

301
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
36,422✔
302
  if (pGroupResInfo->pRows != NULL) {
36,422✔
303
    taosArrayDestroy(pGroupResInfo->pRows);
32,407✔
304
  }
305

306
  pGroupResInfo->freeItem = true;
36,422✔
307
  pGroupResInfo->pRows = pArrayList;
36,422✔
308
  pGroupResInfo->index = 0;
36,422✔
309
  pGroupResInfo->delIndex = 0;
36,422✔
310
}
36,422✔
311

312
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
10,531,195✔
313
  if (pGroupResInfo->pRows == NULL) {
10,531,195✔
314
    return false;
1,522✔
315
  }
316

317
  return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
10,529,673✔
318
}
319

320
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
6,969,792✔
321
  if (pGroupResInfo->pRows == 0) {
6,969,792✔
322
    return 0;
2,518✔
323
  }
324

325
  return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
6,967,274✔
326
}
327

328
SArray* createSortInfo(SNodeList* pNodeList) {
2,246,243✔
329
  size_t numOfCols = 0;
2,246,243✔
330

331
  if (pNodeList != NULL) {
2,246,243✔
332
    numOfCols = LIST_LENGTH(pNodeList);
2,245,994!
333
  } else {
334
    numOfCols = 0;
249✔
335
  }
336

337
  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
2,246,243✔
338
  if (pList == NULL) {
2,246,323!
339
    return pList;
×
340
  }
341

342
  for (int32_t i = 0; i < numOfCols; ++i) {
4,618,938✔
343
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
2,372,439✔
344
    if (!pSortKey) {
2,372,526✔
345
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
13!
346
      taosArrayDestroy(pList);
13✔
347
      pList = NULL;
×
348
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
349
      break;
×
350
    }
351
    SBlockOrderInfo   bi = {0};
2,372,513✔
352
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
2,372,513✔
353
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
2,372,513✔
354

355
    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
2,372,513✔
356
    bi.slotId = pColNode->slotId;
2,372,513✔
357
    void* tmp = taosArrayPush(pList, &bi);
2,372,615✔
358
    if (!tmp) {
2,372,615!
359
      taosArrayDestroy(pList);
×
360
      pList = NULL;
×
361
      break;
×
362
    }
363
  }
364

365
  return pList;
2,246,499✔
366
}
367

368
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
20,669,618✔
369
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
20,669,618!
370
  SSDataBlock* pBlock = NULL;
20,669,618✔
371
  int32_t      code = createDataBlock(&pBlock);
20,669,618✔
372
  if (code) {
20,677,576!
373
    terrno = code;
×
374
    return NULL;
×
375
  }
376

377
  pBlock->info.id.blockId = pNode->dataBlockId;
20,682,048✔
378
  pBlock->info.type = STREAM_INVALID;
20,682,048✔
379
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
20,682,048✔
380
  pBlock->info.watermark = INT64_MIN;
20,682,048✔
381

382
  for (int32_t i = 0; i < numOfCols; ++i) {
112,613,233✔
383
    SSlotDescNode*  pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
91,913,994✔
384
    if (!pDescNode) {
91,979,209!
385
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
386
      blockDataDestroy(pBlock);
×
387
      pBlock = NULL;
×
388
      terrno = TSDB_CODE_INVALID_PARA;
×
389
      break;
×
390
    }
391
    SColumnInfoData idata =
392
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
91,979,209✔
393
    idata.info.scale = pDescNode->dataType.scale;
92,042,881✔
394
    idata.info.precision = pDescNode->dataType.precision;
92,042,881✔
395
    idata.info.noData = pDescNode->reserve;
92,042,881✔
396

397
    code = blockDataAppendColInfo(pBlock, &idata);
92,042,881✔
398
    if (code != TSDB_CODE_SUCCESS) {
91,931,185!
399
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
400
      blockDataDestroy(pBlock);
×
401
      pBlock = NULL;
×
402
      terrno = code;
×
403
      break;
×
404
    }
405
  }
406

407
  return pBlock;
20,699,239✔
408
}
409

410
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
4,764,473✔
411
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
4,764,473✔
412

413
  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
27,830,516✔
414
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
23,229,445✔
415
    if (!pItem) {
23,239,542✔
416
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
2,281!
417
      return terrno;
2,281✔
418
    }
419

420
    if (pItem->isPk) {
23,237,261✔
421
      SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
171,218✔
422
      if (!pInfoData) {
171,217!
423
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
424
        return terrno;
×
425
      }
426
      pBlockInfo->pks[0].type = pInfoData->info.type;
171,217✔
427
      pBlockInfo->pks[1].type = pInfoData->info.type;
171,217✔
428

429
      // allocate enough buffer size, which is pInfoData->info.bytes
430
      if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
171,217!
431
        pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
48!
432
        if (pBlockInfo->pks[0].pData == NULL) {
33!
433
          return terrno;
×
434
        }
435

436
        pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
33!
437
        if (pBlockInfo->pks[1].pData == NULL) {
33!
438
          taosMemoryFreeClear(pBlockInfo->pks[0].pData);
×
439
          return terrno;
×
440
        }
441

442
        pBlockInfo->pks[0].nData = pInfoData->info.bytes;
33✔
443
        pBlockInfo->pks[1].nData = pInfoData->info.bytes;
33✔
444
      }
445

446
      break;
171,202✔
447
    }
448
  }
449

450
  return TSDB_CODE_SUCCESS;
4,762,205✔
451
}
452

453
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
7,145✔
454
  STransTagExprCtx* pCtx = pContext;
7,145✔
455
  SMetaReader*      mr = pCtx->pReader;
7,145✔
456
  bool              isTagCol = false, isTbname = false;
7,145✔
457
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
7,145✔
458
    SColumnNode* pCol = (SColumnNode*)*pNode;
2,053✔
459
    if (pCol->colType == COLUMN_TYPE_TBNAME)
2,053!
460
      isTbname = true;
×
461
    else
462
      isTagCol = true;
2,053✔
463
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
5,092!
464
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
465
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
466
  }
467
  if (isTagCol) {
7,145✔
468
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
2,053✔
469

470
    SValueNode* res = NULL;
2,053✔
471
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
2,053✔
472
    if (NULL == res) {
2,053!
473
      return DEAL_RES_ERROR;
×
474
    }
475

476
    res->translate = true;
2,053✔
477
    res->node.resType = pSColumnNode->node.resType;
2,053✔
478

479
    STagVal tagVal = {0};
2,053✔
480
    tagVal.cid = pSColumnNode->colId;
2,053✔
481
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
2,053✔
482
    if (p == NULL) {
2,053!
483
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
484
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
2,053!
485
      int32_t len = ((const STag*)p)->len;
×
486
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
487
      if (NULL == res->datum.p) {
×
488
        return DEAL_RES_ERROR;
×
489
      }
490
      memcpy(res->datum.p, p, len);
×
491
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
2,053!
492
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
2,030!
493
      if (NULL == res->datum.p) {
2,030!
494
        return DEAL_RES_ERROR;
×
495
      }
496
      memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
2,030✔
497
      varDataSetLen(res->datum.p, tagVal.nData);
2,030✔
498
    } else {
499
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
23✔
500
      if (code != TSDB_CODE_SUCCESS) {
23!
501
        return DEAL_RES_ERROR;
×
502
      }
503
    }
504
    nodesDestroyNode(*pNode);
2,053✔
505
    *pNode = (SNode*)res;
2,053✔
506
  } else if (isTbname) {
5,092!
507
    SValueNode* res = NULL;
×
508
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
509
    if (NULL == res) {
×
510
      return DEAL_RES_ERROR;
×
511
    }
512

513
    res->translate = true;
×
514
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
515

516
    int32_t len = strlen(mr->me.name);
×
517
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
518
    if (NULL == res->datum.p) {
×
519
      return DEAL_RES_ERROR;
×
520
    }
521
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
522
    varDataSetLen(res->datum.p, len);
×
523
    nodesDestroyNode(*pNode);
×
524
    *pNode = (SNode*)res;
×
525
  }
526

527
  return DEAL_RES_CONTINUE;
7,145✔
528
}
529

530
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI) {
1,025✔
531
  int32_t     code = TSDB_CODE_SUCCESS;
1,025✔
532
  SMetaReader mr = {0};
1,025✔
533

534
  pAPI->metaReaderFn.initReader(&mr, metaHandle, META_READER_LOCK, &pAPI->metaFn);
1,025✔
535
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
1,025✔
536
  if (TSDB_CODE_SUCCESS != code) {
1,025!
537
    pAPI->metaReaderFn.clearReader(&mr);
×
538
    *pQualified = false;
×
539

540
    return TSDB_CODE_SUCCESS;
×
541
  }
542

543
  SNode* pTagCondTmp = NULL;
1,025✔
544
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
1,025✔
545
  if (TSDB_CODE_SUCCESS != code) {
1,025!
546
    *pQualified = false;
×
547
    pAPI->metaReaderFn.clearReader(&mr);
×
548
    return code;
×
549
  }
550
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
1,025✔
551
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
1,025✔
552
  pAPI->metaReaderFn.clearReader(&mr);
1,025✔
553
  if (TSDB_CODE_SUCCESS != ctx.code) {
1,025!
554
    *pQualified = false;
×
555
    terrno = code;
×
556
    return code;
×
557
  }
558

559
  SNode* pNew = NULL;
1,025✔
560
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
1,025✔
561
  if (TSDB_CODE_SUCCESS != code) {
1,025!
562
    terrno = code;
×
563
    nodesDestroyNode(pTagCondTmp);
×
564
    *pQualified = false;
×
565

566
    return code;
×
567
  }
568

569
  SValueNode* pValue = (SValueNode*)pNew;
1,025✔
570
  *pQualified = pValue->datum.b;
1,025✔
571

572
  nodesDestroyNode(pNew);
1,025✔
573
  return TSDB_CODE_SUCCESS;
1,025✔
574
}
575

576
static EDealRes getColumn(SNode** pNode, void* pContext) {
74,771,561✔
577
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
74,771,561✔
578
  SColumnNode*     pSColumnNode = NULL;
74,771,561✔
579
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
74,771,561✔
580
    pSColumnNode = *(SColumnNode**)pNode;
4,240,362✔
581
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
70,531,199✔
582
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
432,616✔
583
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
432,616✔
584
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
421,366✔
585
      if (NULL == pSColumnNode) {
421,371!
586
        return DEAL_RES_ERROR;
×
587
      }
588
      pSColumnNode->colId = -1;
421,371✔
589
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
421,371✔
590
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
421,371✔
591
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
421,371✔
592
      nodesDestroyNode(*pNode);
421,371✔
593
      *pNode = (SNode*)pSColumnNode;
421,373✔
594
    } else {
595
      return DEAL_RES_CONTINUE;
11,250✔
596
    }
597
  } else {
598
    return DEAL_RES_CONTINUE;
70,098,583✔
599
  }
600

601
  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
4,661,735✔
602
  if (!data) {
4,660,386✔
603
    int32_t tempRes =
604
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
3,592,807✔
605
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
3,595,514!
606
      return DEAL_RES_ERROR;
×
607
    }
608
    pSColumnNode->slotId = pData->index++;
3,595,514✔
609
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
3,595,514✔
610
                         .type = pSColumnNode->node.resType.type,
3,595,514✔
611
                         .bytes = pSColumnNode->node.resType.bytes,
3,595,514✔
612
                         .pk = pSColumnNode->isPk};
3,595,514✔
613
#if TAG_FILTER_DEBUG
614
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
615
#endif
616
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
3,595,514✔
617
    if (!tmp) {
3,595,132!
618
      return DEAL_RES_ERROR;
×
619
    }
620
  } else {
621
    SColumnNode* col = *(SColumnNode**)data;
1,067,579✔
622
    pSColumnNode->slotId = col->slotId;
1,067,579✔
623
  }
624

625
  return DEAL_RES_CONTINUE;
4,662,711✔
626
}
627

628
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
1,379,831✔
629
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
1,379,831!
630
  if (pColumnData == NULL) {
1,380,285!
631
    return terrno;
×
632
  }
633

634
  pColumnData->info.type = pType->type;
1,380,285✔
635
  pColumnData->info.bytes = pType->bytes;
1,380,285✔
636
  pColumnData->info.scale = pType->scale;
1,380,285✔
637
  pColumnData->info.precision = pType->precision;
1,380,285✔
638

639
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
1,380,285✔
640
  if (code != TSDB_CODE_SUCCESS) {
1,380,200!
UNCOV
641
    terrno = code;
×
642
    releaseColInfoData(pColumnData);
×
643
    return terrno;
×
644
  }
645

646
  pParam->columnData = pColumnData;
1,380,216✔
647
  pParam->colAlloced = true;
1,380,216✔
648
  return TSDB_CODE_SUCCESS;
1,380,216✔
649
}
650

651
static void releaseColInfoData(void* pCol) {
75,894✔
652
  if (pCol) {
75,894!
653
    SColumnInfoData* col = (SColumnInfoData*)pCol;
75,900✔
654
    colDataDestroy(col);
75,900✔
655
    taosMemoryFree(col);
75,909!
656
  }
657
}
75,904✔
658

659
void freeItem(void* p) {
3,370,736✔
660
  STUidTagInfo* pInfo = p;
3,370,736✔
661
  if (pInfo->pTagVal != NULL) {
3,370,736✔
662
    taosMemoryFree(pInfo->pTagVal);
3,207,841!
663
  }
664
}
3,371,051✔
665

666
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
×
667
  if (pTagCond == NULL) {
×
668
    return TSDB_CODE_SUCCESS;
×
669
  }
670

671
  char*   payload = NULL;
×
672
  int32_t len = 0;
×
673
  int32_t code = nodesNodeToMsg(pTagCond, &payload, &len);
×
674
  if (code != TSDB_CODE_SUCCESS) {
×
675
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
676
    return code;
×
677
  }
678

679
  tMD5Init(pContext);
×
680
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
681
  tMD5Final(pContext);
×
682

683
  taosMemoryFree(payload);
×
684
  return TSDB_CODE_SUCCESS;
×
685
}
686

687
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
×
688
  int32_t code = TSDB_CODE_SUCCESS;
×
689
  int32_t lino = 0;
×
690
  char*   payload = NULL;
×
691
  int32_t len = 0;
×
692
  code = nodesNodeToMsg(pGroup, &payload, &len);
×
693
  QUERY_CHECK_CODE(code, lino, _end);
×
694

695
  if (filterDigest[0]) {
×
696
    payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
×
697
    QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
×
698
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
×
699
    len += tListLen(pContext->digest);
×
700
  }
701

702
  tMD5Init(pContext);
×
703
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
704
  tMD5Final(pContext);
×
705

706
_end:
×
707
  taosMemoryFree(payload);
×
708
  if (code != TSDB_CODE_SUCCESS) {
×
709
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
710
  }
711
  return code;
×
712
}
713

714
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
63,415✔
715
                                   SStorageAPI* pAPI, bool initRemainGroups) {
716
  int32_t      code = TSDB_CODE_SUCCESS;
63,415✔
717
  int32_t      lino = 0;
63,415✔
718
  SArray*      pBlockList = NULL;
63,415✔
719
  SSDataBlock* pResBlock = NULL;
63,415✔
720
  void*        keyBuf = NULL;
63,415✔
721
  SArray*      groupData = NULL;
63,415✔
722
  SArray*      pUidTagList = NULL;
63,415✔
723
  SArray*      tableList = NULL;
63,415✔
724

725
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
63,415✔
726
  if (rows == 0) {
63,420!
727
    return TSDB_CODE_SUCCESS;
×
728
  }
729

730
  tagFilterAssist ctx = {0};
63,420✔
731
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
63,420✔
732
  if (ctx.colHash == NULL) {
63,457!
733
    code = terrno;
×
734
    goto end;
×
735
  }
736

737
  ctx.index = 0;
63,457✔
738
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
63,457✔
739
  if (ctx.cInfoList == NULL) {
63,462✔
740
    code = terrno;
3✔
741
    goto end;
×
742
  }
743

744
  SNode* pNode = NULL;
63,459✔
745
  FOREACH(pNode, group) {
139,367!
746
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
75,885✔
747
    if (TSDB_CODE_SUCCESS != ctx.code) {
75,908!
748
      code = ctx.code;
×
749
      goto end;
×
750
    }
751
    REPLACE_NODE(pNode);
75,908✔
752
  }
753

754
  T_MD5_CTX context = {0};
63,482✔
755
  if (tsTagFilterCache) {
63,482!
756
    SNodeListNode* listNode = NULL;
×
757
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
758
    if (TSDB_CODE_SUCCESS != code) {
×
759
      goto end;
×
760
    }
761
    listNode->pNodeList = group;
×
762
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
763
    QUERY_CHECK_CODE(code, lino, end);
×
764

765
    nodesFree(listNode);
×
766

767
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
768
                                             tListLen(context.digest), &tableList);
769
    QUERY_CHECK_CODE(code, lino, end);
×
770

771
    if (tableList) {
×
772
      taosArrayDestroy(pTableListInfo->pTableList);
×
773
      pTableListInfo->pTableList = tableList;
×
774
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
775
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
776
      goto end;
×
777
    }
778
  }
779

780
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
63,482✔
781
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
63,428!
782

783
  for (int32_t i = 0; i < rows; ++i) {
319,324✔
784
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
255,805✔
785
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
255,787!
786
    STUidTagInfo   info = {.uid = pkeyInfo->uid};
255,787✔
787
    void*          tmp = taosArrayPush(pUidTagList, &info);
255,892✔
788
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
255,892!
789
  }
790

791
  code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
63,519✔
792
  if (code != TSDB_CODE_SUCCESS) {
63,465!
793
    goto end;
×
794
  }
795

796
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
63,465✔
797
  pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList, pVnode, pAPI);
63,447✔
798
  if (pResBlock == NULL) {
63,451!
799
    code = terrno;
×
800
    goto end;
×
801
  }
802

803
  //  int64_t st1 = taosGetTimestampUs();
804
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
805

806
  pBlockList = taosArrayInit(2, POINTER_BYTES);
63,451✔
807
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
63,457!
808

809
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
63,458✔
810
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
63,458!
811

812
  groupData = taosArrayInit(2, POINTER_BYTES);
63,458✔
813
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
63,459!
814

815
  FOREACH(pNode, group) {
139,346!
816
    SScalarParam output = {0};
75,889✔
817

818
    switch (nodeType(pNode)) {
75,889!
819
      case QUERY_NODE_VALUE:
×
820
        break;
×
821
      case QUERY_NODE_COLUMN:
75,889✔
822
      case QUERY_NODE_OPERATOR:
823
      case QUERY_NODE_FUNCTION: {
824
        SExprNode* expNode = (SExprNode*)pNode;
75,889✔
825
        code = createResultData(&expNode->resType, rows, &output);
75,889✔
826
        if (code != TSDB_CODE_SUCCESS) {
75,885!
827
          goto end;
×
828
        }
829
        break;
75,885✔
830
      }
831

832
      default:
×
833
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
834
        goto end;
×
835
    }
836

837
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
75,885✔
838
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
71,757✔
839
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
71,757✔
840
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
71,753!
841
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
71,753✔
842
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
4,128!
843
      continue;
×
844
    } else {
845
      code = scalarCalculate(pNode, pBlockList, &output);
4,128✔
846
    }
847

848
    if (code != TSDB_CODE_SUCCESS) {
75,885!
849
      releaseColInfoData(output.columnData);
×
850
      goto end;
×
851
    }
852

853
    void* tmp = taosArrayPush(groupData, &output.columnData);
75,884✔
854
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
75,884!
855
  }
856

857
  int32_t keyLen = 0;
63,457✔
858
  SNode*  node;
859
  FOREACH(node, group) {
139,342✔
860
    SExprNode* pExpr = (SExprNode*)node;
75,885✔
861
    keyLen += pExpr->resType.bytes;
75,885✔
862
  }
863

864
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
63,457✔
865
  keyLen += nullFlagSize;
63,457✔
866

867
  keyBuf = taosMemoryCalloc(1, keyLen);
63,457✔
868
  if (keyBuf == NULL) {
63,449!
869
    code = terrno;
×
870
    goto end;
×
871
  }
872

873
  if (initRemainGroups) {
63,449✔
874
    pTableListInfo->remainGroups =
13,153✔
875
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
13,165✔
876
    if (pTableListInfo->remainGroups == NULL) {
13,153!
877
      code = terrno;
×
878
      goto end;
×
879
    }
880
  }
881

882
  for (int i = 0; i < rows; i++) {
319,167✔
883
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
255,680✔
884
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
255,627✔
885

886
    char* isNull = (char*)keyBuf;
255,618✔
887
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
255,618!
888
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
540,840✔
889
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
285,224✔
890

891
      if (colDataIsNull_s(pValue, i)) {
570,474✔
892
        isNull[j] = 1;
488✔
893
      } else {
894
        isNull[j] = 0;
284,749✔
895
        char* data = colDataGetData(pValue, i);
284,749!
896
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
284,749✔
897
          if (tTagIsJson(data)) {
336✔
898
            code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
12✔
899
            goto end;
12✔
900
          }
901
          if (tTagIsJsonNull(data)) {
324!
902
            isNull[j] = 1;
×
903
            continue;
×
904
          }
905
          int32_t len = getJsonValueLen(data);
324✔
906
          memcpy(pStart, data, len);
324✔
907
          pStart += len;
324✔
908
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
284,413!
909
          if (varDataTLen(data) > pValue->info.bytes) {
253,874✔
910
            code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
3✔
911
            goto end;
3✔
912
          }
913
          memcpy(pStart, data, varDataTLen(data));
253,871✔
914
          pStart += varDataTLen(data);
253,871✔
915
        } else {
916
          memcpy(pStart, data, pValue->info.bytes);
30,539✔
917
          pStart += pValue->info.bytes;
30,539✔
918
        }
919
      }
920
    }
921

922
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
254,758✔
923
    info->groupId = calcGroupId(keyBuf, len);
254,758✔
924
    if (initRemainGroups) {
255,722✔
925
      // groupId ~ table uid
926
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
76,771✔
927
                         sizeof(info->uid));
928
      if (code == TSDB_CODE_DUP_KEY) {
76,779✔
929
        code = TSDB_CODE_SUCCESS;
1,320✔
930
      }
931
      QUERY_CHECK_CODE(code, lino, end);
76,779!
932
    }
933
  }
934

935
  if (tsTagFilterCache) {
63,487!
936
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
937
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
938

939
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
940
                                              tListLen(context.digest), tableList,
941
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
942
    QUERY_CHECK_CODE(code, lino, end);
×
943
  }
944

945
  //  int64_t st2 = taosGetTimestampUs();
946
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
947

948
end:
63,487✔
949
  taosMemoryFreeClear(keyBuf);
63,502!
950
  taosHashCleanup(ctx.colHash);
63,506✔
951
  taosArrayDestroy(ctx.cInfoList);
63,446✔
952
  blockDataDestroy(pResBlock);
63,447✔
953
  taosArrayDestroy(pBlockList);
63,461✔
954
  taosArrayDestroyEx(pUidTagList, freeItem);
63,460✔
955
  taosArrayDestroyP(groupData, releaseColInfoData);
63,464✔
956
  if (code != TSDB_CODE_SUCCESS) {
63,460✔
957
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
15!
958
  }
959
  return code;
63,431✔
960
}
961

962
static int32_t nameComparFn(const void* p1, const void* p2) {
2,421✔
963
  const char* pName1 = *(const char**)p1;
2,421✔
964
  const char* pName2 = *(const char**)p2;
2,421✔
965

966
  int32_t ret = strcmp(pName1, pName2);
2,421✔
967
  if (ret == 0) {
2,421✔
968
    return 0;
16✔
969
  } else {
970
    return (ret > 0) ? 1 : -1;
2,405✔
971
  }
972
}
973

974
static SArray* getTableNameList(const SNodeListNode* pList) {
416,112✔
975
  int32_t    code = TSDB_CODE_SUCCESS;
416,112✔
976
  int32_t    lino = 0;
416,112✔
977
  int32_t    len = LIST_LENGTH(pList->pNodeList);
416,112!
978
  SListCell* cell = pList->pNodeList->pHead;
416,112✔
979

980
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
416,112✔
981
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
416,121!
982

983
  for (int i = 0; i < pList->pNodeList->length; i++) {
833,971✔
984
    SValueNode* valueNode = (SValueNode*)cell->pNode;
417,853✔
985
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
417,853!
986
      terrno = TSDB_CODE_INVALID_PARA;
×
987
      taosArrayDestroy(pTbList);
×
988
      return NULL;
×
989
    }
990

991
    char* name = varDataVal(valueNode->datum.p);
417,853✔
992
    void* tmp = taosArrayPush(pTbList, &name);
417,850✔
993
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
417,850!
994
    cell = cell->pNext;
417,850✔
995
  }
996

997
  size_t numOfTables = taosArrayGetSize(pTbList);
416,118✔
998

999
  // order the name
1000
  taosArraySort(pTbList, nameComparFn);
416,110✔
1001

1002
  // remove the duplicates
1003
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
416,103✔
1004
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
416,117!
1005
  void*   tmpTbl = taosArrayGet(pTbList, 0);
416,117✔
1006
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
416,110!
1007
  void*   tmp = taosArrayPush(pNewList, tmpTbl);
416,114✔
1008
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
416,114!
1009

1010
  for (int32_t i = 1; i < numOfTables; ++i) {
417,858✔
1011
    char** name = taosArrayGetLast(pNewList);
1,740✔
1012
    char** nameInOldList = taosArrayGet(pTbList, i);
1,741✔
1013
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
1,740!
1014
    if (strcmp(*name, *nameInOldList) == 0) {
1,740✔
1015
      continue;
16✔
1016
    }
1017

1018
    tmp = taosArrayPush(pNewList, nameInOldList);
1,728✔
1019
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,728!
1020
  }
1021

1022
_end:
416,118✔
1023
  taosArrayDestroy(pTbList);
416,118✔
1024
  if (code != TSDB_CODE_SUCCESS) {
416,116✔
1025
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3!
1026
    return NULL;
×
1027
  }
1028
  return pNewList;
416,113✔
1029
}
1030

1031
static int tableUidCompare(const void* a, const void* b) {
×
1032
  uint64_t u1 = *(uint64_t*)a;
×
1033
  uint64_t u2 = *(uint64_t*)b;
×
1034

1035
  if (u1 == u2) {
×
1036
    return 0;
×
1037
  }
1038

1039
  return u1 < u2 ? -1 : 1;
×
1040
}
1041

1042
static int32_t filterTableInfoCompare(const void* a, const void* b) {
3,336✔
1043
  STUidTagInfo* p1 = (STUidTagInfo*)a;
3,336✔
1044
  STUidTagInfo* p2 = (STUidTagInfo*)b;
3,336✔
1045

1046
  if (p1->uid == p2->uid) {
3,336!
1047
    return 0;
×
1048
  }
1049

1050
  return p1->uid < p2->uid ? -1 : 1;
3,336!
1051
}
1052

1053
static FilterCondType checkTagCond(SNode* cond) {
1,341,549✔
1054
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
1,341,549✔
1055
    return FILTER_NO_LOGIC;
292,044✔
1056
  }
1057
  if (nodeType(cond) != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
1,049,505!
1058
    return FILTER_AND;
9,210✔
1059
  }
1060
  return FILTER_OTHER;
1,040,295✔
1061
}
1062

1063
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
1,341,726✔
1064
  int32_t ret = -1;
1,341,726✔
1065
  int32_t ntype = nodeType(cond);
1,341,726✔
1066

1067
  if (ntype == QUERY_NODE_OPERATOR) {
1,341,726✔
1068
    ret = optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
292,045✔
1069
  }
1070

1071
  if (ntype != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
1,342,010✔
1072
    return ret;
302,124✔
1073
  }
1074

1075
  bool                 hasTbnameCond = false;
1,039,886✔
1076
  SLogicConditionNode* pNode = (SLogicConditionNode*)cond;
1,039,886✔
1077
  SNodeList*           pList = (SNodeList*)pNode->pParameterList;
1,039,886✔
1078

1079
  int32_t len = LIST_LENGTH(pList);
1,039,886✔
1080
  if (len <= 0) {
1,039,886!
1081
    return ret;
×
1082
  }
1083

1084
  SListCell* cell = pList->pHead;
1,039,886✔
1085
  for (int i = 0; i < len; i++) {
3,763,915✔
1086
    if (cell == NULL) break;
2,977,215!
1087
    if (optimizeTbnameInCondImpl(pVnode, list, cell->pNode, pAPI, suid) == 0) {
2,977,215✔
1088
      hasTbnameCond = true;
252,506✔
1089
      break;
252,506✔
1090
    }
1091
    cell = cell->pNext;
2,724,029✔
1092
  }
1093

1094
  taosArraySort(list, filterTableInfoCompare);
1,039,206✔
1095
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);
1,038,543✔
1096

1097
  if (hasTbnameCond) {
1,038,283✔
1098
    ret = pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
252,505✔
1099
  }
1100

1101
  return ret;
1,038,673✔
1102
}
1103

1104
// only return uid that does not contained in pExistedUidList
1105
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond,
3,268,842✔
1106
                                        SStorageAPI* pStoreAPI, uint64_t suid) {
1107
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
3,268,842✔
1108
    return -1;
394,756✔
1109
  }
1110

1111
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
2,874,086✔
1112
  if (pNode->opType != OP_TYPE_IN) {
2,874,086✔
1113
    return -1;
1,973,783✔
1114
  }
1115

1116
  if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN &&
900,303!
1117
       ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) &&
902,516✔
1118
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
416,121!
1119
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
416,113✔
1120

1121
    int32_t len = LIST_LENGTH(pList->pNodeList);
416,113✔
1122
    if (len <= 0) {
416,113!
1123
      return -1;
×
1124
    }
1125

1126
    SArray*   pTbList = getTableNameList(pList);
416,113✔
1127
    int32_t   numOfTables = taosArrayGetSize(pTbList);
416,115✔
1128
    SHashObj* uHash = NULL;
416,116✔
1129

1130
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
416,116✔
1131
    if (numOfExisted > 0) {
416,110!
1132
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1133
      if (!uHash) {
×
1134
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1135
        return terrno;
×
1136
      }
1137

1138
      for (int i = 0; i < numOfExisted; i++) {
×
1139
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
×
1140
        if (!pTInfo) {
×
1141
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1142
          return terrno;
×
1143
        }
1144
        int32_t       tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
×
1145
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
×
1146
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1147
          return tempRes;
×
1148
        }
1149
      }
1150
    }
1151

1152
    for (int i = 0; i < numOfTables; i++) {
833,921✔
1153
      char* name = taosArrayGetP(pTbList, i);
417,819✔
1154

1155
      uint64_t uid = 0, csuid = 0;
417,821✔
1156
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
417,821✔
1157
        ETableType tbType = TSDB_TABLE_MAX;
416,116✔
1158
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 && tbType == TSDB_CHILD_TABLE) {
416,116!
1159
          if (suid != csuid) {
416,100✔
1160
            continue;
20✔
1161
          }
1162
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
416,080!
1163
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
416,078✔
1164
            void*        tmp = taosArrayPush(pExistedUidList, &s);
416,074✔
1165
            if (!tmp) {
416,074!
1166
              return terrno;
×
1167
            }
1168
          }
1169
        } else {
1170
          taosArrayDestroy(pTbList);
21✔
1171
          taosHashCleanup(uHash);
21✔
1172
          return -1;
21✔
1173
        }
1174
      } else {
1175
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1176
        terrno = 0;
1,714✔
1177
      }
1178
    }
1179

1180
    taosHashCleanup(uHash);
416,102✔
1181
    taosArrayDestroy(pTbList);
416,088✔
1182
    return 0;
416,094✔
1183
  }
1184

1185
  return -1;
484,190✔
1186
}
1187

1188
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
1,381,382✔
1189
                                        SStorageAPI* pStorageAPI) {
1190
  int32_t      code = TSDB_CODE_SUCCESS;
1,381,382✔
1191
  int32_t      lino = 0;
1,381,382✔
1192
  SSDataBlock* pResBlock = NULL;
1,381,382✔
1193
  code = createDataBlock(&pResBlock);
1,381,382✔
1194
  QUERY_CHECK_CODE(code, lino, _end);
1,381,623!
1195

1196
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
4,873,923✔
1197
    SColumnInfoData colInfo = {0};
3,490,861✔
1198
    void* tmp = taosArrayGet(pColList, i);
3,490,861✔
1199
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
3,489,498!
1200
    colInfo.info = *(SColumnInfo*)tmp;
3,489,498✔
1201
    code = blockDataAppendColInfo(pResBlock, &colInfo);
3,489,498✔
1202
    QUERY_CHECK_CODE(code, lino, _end);
3,492,300!
1203
  }
1204

1205
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
1,380,711✔
1206
  if (code != TSDB_CODE_SUCCESS) {
1,382,215!
1207
    terrno = code;
×
1208
    blockDataDestroy(pResBlock);
×
1209
    return NULL;
×
1210
  }
1211

1212
  pResBlock->info.rows = numOfTables;
1,382,215✔
1213

1214
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
1,382,215✔
1215

1216
  for (int32_t i = 0; i < numOfTables; i++) {
4,773,490✔
1217
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
3,386,494✔
1218
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
3,385,350!
1219

1220
    for (int32_t j = 0; j < numOfCols; j++) {
11,491,159✔
1221
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
8,099,050✔
1222
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
8,094,649!
1223

1224
      if (pColInfo->info.colId == -1) {  // tbname
8,094,699✔
1225
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
432,566✔
1226
        if (p1->name != NULL) {
432,566✔
1227
          STR_TO_VARSTR(str, p1->name);
416,072✔
1228
        } else {  // name is not retrieved during filter
1229
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
16,494✔
1230
          QUERY_CHECK_CODE(code, lino, _end);
16,512!
1231
        }
1232

1233
        code = colDataSetVal(pColInfo, i, str, false);
432,584✔
1234
        QUERY_CHECK_CODE(code, lino, _end);
432,582!
1235
#if TAG_FILTER_DEBUG
1236
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1237
#endif
1238
      } else {
1239
        STagVal tagVal = {0};
7,662,133✔
1240
        tagVal.cid = pColInfo->info.colId;
7,662,133✔
1241
        if (p1->pTagVal == NULL) {
7,662,133!
1242
          colDataSetNULL(pColInfo, i);
×
1243
        } else {
1244
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
7,662,133✔
1245

1246
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
7,678,874✔
1247
            colDataSetNULL(pColInfo, i);
3,766✔
1248
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
7,675,108✔
1249
            code = colDataSetVal(pColInfo, i, p, false);
6,004✔
1250
            QUERY_CHECK_CODE(code, lino, _end);
6,005!
1251
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
9,813,707!
1252
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
2,146,709✔
1253
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2,143,751!
1254
            varDataSetLen(tmp, tagVal.nData);
2,143,751✔
1255
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
2,143,751✔
1256
            code = colDataSetVal(pColInfo, i, tmp, false);
2,143,751✔
1257
#if TAG_FILTER_DEBUG
1258
            qDebug("tagfilter varch:%s", tmp + 2);
1259
#endif
1260
            taosMemoryFree(tmp);
2,142,994!
1261
            QUERY_CHECK_CODE(code, lino, _end);
2,144,603!
1262
          } else {
1263
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
5,522,395✔
1264
            QUERY_CHECK_CODE(code, lino, _end);
5,518,691!
1265
#if TAG_FILTER_DEBUG
1266
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1267
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1268
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1269
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1270
            }
1271
#endif
1272
          }
1273
        }
1274
      }
1275
    }
1276
  }
1277

1278
_end:
1,386,996✔
1279
  if (code != TSDB_CODE_SUCCESS) {
1,386,996!
1280
    blockDataDestroy(pResBlock);
×
1281
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1282
    terrno = code;
×
1283
    return NULL;
×
1284
  }
1285
  return pResBlock;
1,386,996✔
1286
}
1287

1288
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
1,302,892✔
1289
                                 bool* pResultList, bool addUid) {
1290
  taosArrayClear(pUidList);
1,302,892✔
1291

1292
  STableKeyInfo info = {.uid = 0, .groupId = 0};
1,302,846✔
1293
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);
1,302,846✔
1294
  for (int32_t i = 0; i < numOfTables; ++i) {
4,417,795✔
1295
    if (pResultList[i]) {
3,113,669✔
1296
      STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
2,483,863✔
1297
      if (!tmpTag) {
2,483,140!
1298
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1299
        return terrno;
×
1300
      }
1301
      uint64_t uid = tmpTag->uid;
2,483,140✔
1302
      qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
2,483,140✔
1303

1304
      info.uid = uid;
2,483,168✔
1305
      void* p = taosArrayPush(pListInfo->pTableList, &info);
2,483,168✔
1306
      if (p == NULL) {
2,484,377!
1307
        return terrno;
×
1308
      }
1309

1310
      if (addUid) {
2,484,377!
1311
        void* tmp = taosArrayPush(pUidList, &uid);
×
1312
        if (tmp == NULL) {
×
1313
          return terrno;
×
1314
        }
1315
      }
1316
    }
1317
  }
1318

1319
  return TSDB_CODE_SUCCESS;
1,304,126✔
1320
}
1321

1322
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
1,341,524✔
1323
  int32_t code = TSDB_CODE_SUCCESS;
1,341,524✔
1324
  int32_t numOfExisted = taosArrayGetSize(pUidList);
1,341,524✔
1325
  if (numOfExisted == 0) {
1,341,955✔
1326
    return code;
1,312,249✔
1327
  }
1328

1329
  for (int32_t i = 0; i < numOfExisted; ++i) {
61,058✔
1330
    uint64_t*    uid = taosArrayGet(pUidList, i);
31,352✔
1331
    if (!uid) {
31,352!
1332
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1333
      return terrno;
×
1334
    }
1335
    STUidTagInfo info = {.uid = *uid};
31,352✔
1336
    void*        tmp = taosArrayPush(pUidTagList, &info);
31,352✔
1337
    if (!tmp) {
31,352!
1338
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1339
      return code;
×
1340
    }
1341
  }
1342
  return code;
29,706✔
1343
}
1344

1345
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
4,731,130✔
1346
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded) {
1347
  *listAdded = false;
4,731,130✔
1348
  if (pTagCond == NULL) {
4,731,130✔
1349
    return TSDB_CODE_SUCCESS;
3,390,555✔
1350
  }
1351

1352
  terrno = TSDB_CODE_SUCCESS;
1,340,575✔
1353

1354
  int32_t      lino = 0;
1,342,293✔
1355
  int32_t      code = TSDB_CODE_SUCCESS;
1,342,293✔
1356
  SArray*      pBlockList = NULL;
1,342,293✔
1357
  SSDataBlock* pResBlock = NULL;
1,342,293✔
1358
  SScalarParam output = {0};
1,342,293✔
1359
  SArray*      pUidTagList = NULL;
1,342,293✔
1360

1361
  tagFilterAssist ctx = {0};
1,342,293✔
1362
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
1,342,293✔
1363
  if (ctx.colHash == NULL) {
1,342,967!
1364
    code = terrno;
×
1365
    QUERY_CHECK_CODE(code, lino, end);
×
1366
  }
1367

1368
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
1,342,967✔
1369
  if (ctx.cInfoList == NULL) {
1,343,054!
1370
    code = terrno;
×
1371
    QUERY_CHECK_CODE(code, lino, end);
×
1372
  }
1373

1374
  nodesRewriteExprPostOrder(&pTagCond, getColumn, (void*)&ctx);
1,343,054✔
1375
  if (TSDB_CODE_SUCCESS != ctx.code) {
1,341,725!
1376
    terrno = code = ctx.code;
×
1377
    goto end;
×
1378
  }
1379

1380
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
1,341,725✔
1381

1382
  //  int64_t stt = taosGetTimestampUs();
1383
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
1,341,725✔
1384
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
1,341,700!
1385

1386
  code = copyExistedUids(pUidTagList, pUidList);
1,341,700✔
1387
  QUERY_CHECK_CODE(code, lino, end);
1,342,253!
1388

1389
  FilterCondType condType = checkTagCond(pTagCond);
1,342,253✔
1390

1391
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
1,342,102✔
1392
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
1,339,970✔
1393
    taosArrayClear(pUidList);
416,101✔
1394

1395
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
416,095✔
1396
    code = taosArrayEnsureCap(pUidList, numOfRows);
416,096✔
1397
    QUERY_CHECK_CODE(code, lino, end);
416,097!
1398

1399
    for (int32_t i = 0; i < numOfRows; ++i) {
832,174✔
1400
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
416,073✔
1401
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
416,072!
1402
      void*         tmp = taosArrayPush(pUidList, &pInfo->uid);
416,072✔
1403
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
416,077!
1404
    }
1405
    terrno = 0;
416,101✔
1406
  } else {
1407
    if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) {
923,869✔
1408
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
63✔
1409
    } else {
1410
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
923,806✔
1411
    }
1412
    if (code != TSDB_CODE_SUCCESS) {
926,589!
1413
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1414
      terrno = code;
×
1415
      QUERY_CHECK_CODE(code, lino, end);
×
1416
    }
1417
  }
1418

1419
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1,342,687✔
1420
  if (numOfTables == 0) {
1,341,834✔
1421
    goto end;
38,708✔
1422
  }
1423

1424
  pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList, pVnode, pAPI);
1,303,126✔
1425
  if (pResBlock == NULL) {
1,304,120!
1426
    code = terrno;
×
1427
    QUERY_CHECK_CODE(code, lino, end);
×
1428
  }
1429

1430
  //  int64_t st1 = taosGetTimestampUs();
1431
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1432
  pBlockList = taosArrayInit(2, POINTER_BYTES);
1,304,120✔
1433
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1,304,242!
1434

1435
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
1,304,250✔
1436
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1,304,250!
1437

1438
  code = createResultData(&type, numOfTables, &output);
1,304,250✔
1439
  if (code != TSDB_CODE_SUCCESS) {
1,304,317!
1440
    terrno = code;
×
1441
    QUERY_CHECK_CODE(code, lino, end);
×
1442
  }
1443

1444
  code = scalarCalculate(pTagCond, pBlockList, &output);
1,304,317✔
1445
  if (code != TSDB_CODE_SUCCESS) {
1,303,732✔
1446
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
14!
1447
    terrno = code;
14✔
1448
    QUERY_CHECK_CODE(code, lino, end);
14!
1449
  }
1450

1451
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
1,303,718✔
1452
  if (code != TSDB_CODE_SUCCESS) {
1,303,469✔
1453
    terrno = code;
3✔
1454
    QUERY_CHECK_CODE(code, lino, end);
×
1455
  }
1456
  *listAdded = true;
1,303,466✔
1457

1458
end:
1,342,188✔
1459
  if (code != TSDB_CODE_SUCCESS) {
1,342,188✔
1460
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
14!
1461
  }
1462
  taosHashCleanup(ctx.colHash);
1,342,188✔
1463
  taosArrayDestroy(ctx.cInfoList);
1,342,726✔
1464
  blockDataDestroy(pResBlock);
1,343,029✔
1465
  taosArrayDestroy(pBlockList);
1,343,110✔
1466
  taosArrayDestroyEx(pUidTagList, freeItem);
1,343,116✔
1467

1468
  colDataDestroy(output.columnData);
1,342,604✔
1469
  taosMemoryFreeClear(output.columnData);
1,343,129!
1470
  return code;
1,343,101✔
1471
}
1472

1473
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
4,729,380✔
1474
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI) {
1475
  int32_t code = TSDB_CODE_SUCCESS;
4,729,380✔
1476
  int32_t lino = 0;
4,729,380✔
1477
  size_t  numOfTables = 0;
4,729,380✔
1478
  bool    listAdded = false;
4,729,380✔
1479

1480
  pListInfo->idInfo.suid = pScanNode->suid;
4,729,380✔
1481
  pListInfo->idInfo.tableType = pScanNode->tableType;
4,729,380✔
1482

1483
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
4,729,380✔
1484
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
4,735,355✔
1485

1486
  SIdxFltStatus status = SFLT_NOT_INDEX;
4,731,519✔
1487
  if (pScanNode->tableType != TSDB_SUPER_TABLE) {
4,731,519✔
1488
    pListInfo->idInfo.uid = pScanNode->uid;
725,786✔
1489
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
725,786✔
1490
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
724,051✔
1491
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
724,043!
1492
    }
1493
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded);
725,844✔
1494
    QUERY_CHECK_CODE(code, lino, _end);
725,795!
1495
  } else {
1496
    T_MD5_CTX context = {0};
4,005,733✔
1497

1498
    if (tsTagFilterCache) {
4,005,733!
1499
      // try to retrieve the result from meta cache
1500
      code = genTagFilterDigest(pTagCond, &context);
×
1501
      QUERY_CHECK_CODE(code, lino, _error);
×
1502

1503
      bool acquired = false;
×
1504
      code = pStorageAPI->metaFn.getCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
×
1505
                                                    pUidList, &acquired);
1506
      QUERY_CHECK_CODE(code, lino, _error);
×
1507

1508
      if (acquired) {
×
1509
        digest[0] = 1;
×
1510
        memcpy(digest + 1, context.digest, tListLen(context.digest));
×
1511
        qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
×
1512
        goto _end;
×
1513
      }
1514
    }
1515

1516
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
4,005,733✔
1517
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
2,691,474✔
1518
      QUERY_CHECK_CODE(code, lino, _error);
2,695,420!
1519
    } else {
1520
      // failed to find the result in the cache, let try to calculate the results
1521
      if (pTagIndexCond) {
1,314,259✔
1522
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
52,963✔
1523

1524
        SIndexMetaArg metaArg = {.metaEx = pVnode,
105,974✔
1525
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
52,993✔
1526
                                 .ivtIdx = pIndex,
1527
                                 .suid = pScanNode->uid};
52,981✔
1528

1529
        status = SFLT_NOT_INDEX;
52,981✔
1530
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
52,981✔
1531
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
52,975✔
1532
          qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
52,856✔
1533
        } else {
1534
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
119✔
1535
        }
1536
      }
1537
    }
1538

1539
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, tsTagFilterCache, &listAdded);
4,009,661✔
1540
    QUERY_CHECK_CODE(code, lino, _end);
4,005,205✔
1541

1542
    // let's add the filter results into meta-cache
1543
    numOfTables = taosArrayGetSize(pUidList);
4,005,191✔
1544

1545
    if (tsTagFilterCache) {
4,004,348✔
1546
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
613✔
1547
      char*  pPayload = taosMemoryMalloc(size);
613!
1548
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
×
1549

1550
      *(int32_t*)pPayload = numOfTables;
×
1551
      if (numOfTables > 0) {
×
1552
        void* tmp = taosArrayGet(pUidList, 0);
×
1553
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1554
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
×
1555
      }
1556

1557
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
×
1558
                                                    pPayload, size, 1);
1559
      QUERY_CHECK_CODE(code, lino, _error);
×
1560

1561
      digest[0] = 1;
×
1562
      memcpy(digest + 1, context.digest, tListLen(context.digest));
×
1563
    }
1564
  }
1565

1566
_end:
4,729,544✔
1567
  if (!listAdded) {
4,729,544✔
1568
    numOfTables = taosArrayGetSize(pUidList);
3,427,163✔
1569
    for (int i = 0; i < numOfTables; i++) {
12,935,251✔
1570
      void* tmp = taosArrayGet(pUidList, i);
9,506,920✔
1571
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
9,501,058!
1572
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
9,501,058✔
1573

1574
      void* p = taosArrayPush(pListInfo->pTableList, &info);
9,501,058✔
1575
      if (p == NULL) {
9,509,890!
1576
        taosArrayDestroy(pUidList);
×
1577
        return terrno;
×
1578
      }
1579

1580
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
9,509,890✔
1581
    }
1582
  }
1583

1584
_error:
4,730,712✔
1585
  taosArrayDestroy(pUidList);
4,730,712✔
1586
  if (code != TSDB_CODE_SUCCESS) {
4,734,328✔
1587
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
14!
1588
  }
1589
  return code;
4,732,465✔
1590
}
1591

1592
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
81✔
1593
  int32_t        code = TSDB_CODE_SUCCESS;
81✔
1594
  int32_t        lino = 0;
81✔
1595
  SSubplan*      pSubplan = (SSubplan*)node;
81✔
1596
  SScanPhysiNode pNode = {0};
81✔
1597
  pNode.suid = suid;
81✔
1598
  pNode.uid = suid;
81✔
1599
  pNode.tableType = TSDB_SUPER_TABLE;
81✔
1600

1601
  STableListInfo* pTableListInfo = tableListCreate();
81✔
1602
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
81!
1603
  uint8_t         digest[17] = {0};
81✔
1604
  code =
1605
      getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
81✔
1606
                   pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
1607
  QUERY_CHECK_CODE(code, lino, _end);
81!
1608
  *tableList = pTableListInfo->pTableList;
81✔
1609
  pTableListInfo->pTableList = NULL;
81✔
1610
  tableListDestroy(pTableListInfo);
81✔
1611

1612
_end:
81✔
1613
  if (code != TSDB_CODE_SUCCESS) {
81!
1614
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1615
  }
1616
  return code;
81✔
1617
}
1618

1619
size_t getTableTagsBufLen(const SNodeList* pGroups) {
9,777✔
1620
  size_t keyLen = 0;
9,777✔
1621

1622
  SNode* node;
1623
  FOREACH(node, pGroups) {
55,051!
1624
    SExprNode* pExpr = (SExprNode*)node;
45,274✔
1625
    keyLen += pExpr->resType.bytes;
45,274✔
1626
  }
1627

1628
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
9,777!
1629
  return keyLen;
9,777✔
1630
}
1631

1632
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
14✔
1633
                              SStorageAPI* pAPI) {
1634
  SMetaReader mr = {0};
14✔
1635

1636
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
14✔
1637
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
14!
1638
    pAPI->metaReaderFn.clearReader(&mr);
×
1639
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
1640
  }
1641

1642
  SNodeList* groupNew = NULL;
14✔
1643
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
14✔
1644
  if (TSDB_CODE_SUCCESS != code) {
14!
1645
    pAPI->metaReaderFn.clearReader(&mr);
×
1646
    return code;
×
1647
  }
1648

1649
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
14✔
1650
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
14✔
1651
  if (TSDB_CODE_SUCCESS != ctx.code) {
14!
1652
    nodesDestroyList(groupNew);
×
1653
    pAPI->metaReaderFn.clearReader(&mr);
×
1654
    return code;
×
1655
  }
1656
  char* isNull = (char*)keyBuf;
14✔
1657
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
14!
1658

1659
  SNode*  pNode;
1660
  int32_t index = 0;
14✔
1661
  FOREACH(pNode, groupNew) {
28!
1662
    SNode*  pNew = NULL;
14✔
1663
    int32_t code = scalarCalculateConstants(pNode, &pNew);
14✔
1664
    if (TSDB_CODE_SUCCESS == code) {
14!
1665
      REPLACE_NODE(pNew);
14✔
1666
    } else {
1667
      nodesDestroyList(groupNew);
×
1668
      pAPI->metaReaderFn.clearReader(&mr);
×
1669
      return code;
×
1670
    }
1671

1672
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
14!
1673
      nodesDestroyList(groupNew);
×
1674
      pAPI->metaReaderFn.clearReader(&mr);
×
1675
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
1676
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1677
    }
1678
    SValueNode* pValue = (SValueNode*)pNew;
14✔
1679

1680
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
14!
1681
      isNull[index++] = 1;
×
1682
      continue;
×
1683
    } else {
1684
      isNull[index++] = 0;
14✔
1685
      char* data = nodesGetValueFromNode(pValue);
14✔
1686
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
14!
1687
        if (tTagIsJson(data)) {
×
1688
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
1689
          nodesDestroyList(groupNew);
×
1690
          pAPI->metaReaderFn.clearReader(&mr);
×
1691
          return terrno;
×
1692
        }
1693
        int32_t len = getJsonValueLen(data);
×
1694
        memcpy(pStart, data, len);
×
1695
        pStart += len;
×
1696
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
14!
1697
        memcpy(pStart, data, varDataTLen(data));
2✔
1698
        pStart += varDataTLen(data);
2✔
1699
      } else {
1700
        memcpy(pStart, data, pValue->node.resType.bytes);
12✔
1701
        pStart += pValue->node.resType.bytes;
12✔
1702
      }
1703
    }
1704
  }
1705

1706
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
14✔
1707
  *pGroupId = calcGroupId(keyBuf, len);
14✔
1708

1709
  nodesDestroyList(groupNew);
14✔
1710
  pAPI->metaReaderFn.clearReader(&mr);
14✔
1711
  
1712
  return TSDB_CODE_SUCCESS;
14✔
1713
}
1714

1715
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
27,998✔
1716
  if (!pNodeList) {
27,998!
1717
    return NULL;
×
1718
  }
1719

1720
  size_t  numOfCols = LIST_LENGTH(pNodeList);
27,998!
1721
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
27,998✔
1722
  if (pList == NULL) {
27,995!
1723
    return NULL;
×
1724
  }
1725

1726
  for (int32_t i = 0; i < numOfCols; ++i) {
106,493✔
1727
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
78,435✔
1728
    if (!pColNode) {
78,443!
1729
      taosArrayDestroy(pList);
×
1730
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
1731
      return NULL;
×
1732
    }
1733

1734
    // todo extract method
1735
    SColumn c = {0};
78,443✔
1736
    c.slotId = pColNode->slotId;
78,443✔
1737
    c.colId = pColNode->colId;
78,443✔
1738
    c.type = pColNode->node.resType.type;
78,443✔
1739
    c.bytes = pColNode->node.resType.bytes;
78,443✔
1740
    c.precision = pColNode->node.resType.precision;
78,443✔
1741
    c.scale = pColNode->node.resType.scale;
78,443✔
1742

1743
    void* tmp = taosArrayPush(pList, &c);
78,498✔
1744
    if (!tmp) {
78,498!
1745
      taosArrayDestroy(pList);
×
1746
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1747
      return NULL;
×
1748
    }
1749
  }
1750

1751
  return pList;
28,058✔
1752
}
1753

1754
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
8,760,415✔
1755
                            int32_t type, SColMatchInfo* pMatchInfo) {
1756
  size_t  numOfCols = LIST_LENGTH(pNodeList);
8,760,415!
1757
  int32_t code = TSDB_CODE_SUCCESS;
8,760,415✔
1758
  int32_t lino = 0;
8,760,415✔
1759

1760
  pMatchInfo->matchType = type;
8,760,415✔
1761

1762
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
8,760,415✔
1763
  if (pList == NULL) {
8,761,037✔
1764
    code = terrno;
456✔
1765
    return code;
×
1766
  }
1767

1768
  for (int32_t i = 0; i < numOfCols; ++i) {
45,199,701✔
1769
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
36,420,258✔
1770
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
36,425,799!
1771
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
36,433,224✔
1772
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
36,340,151✔
1773

1774
      SColMatchItem c = {.needOutput = true};
36,340,151✔
1775
      c.colId = pColNode->colId;
36,340,151✔
1776
      c.srcSlotId = pColNode->slotId;
36,340,151✔
1777
      c.dstSlotId = pNode->slotId;
36,340,151✔
1778
      c.isPk = pColNode->isPk;
36,340,151✔
1779
      c.dataType = pColNode->node.resType;
36,340,151✔
1780
      void* tmp = taosArrayPush(pList, &c);
36,346,047✔
1781
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
36,346,047!
1782
    }
1783
  }
1784

1785
  // set the output flag for each column in SColMatchInfo, according to the
1786
  *numOfOutputCols = 0;
8,779,443✔
1787
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
8,779,443✔
1788
  for (int32_t i = 0; i < num; ++i) {
52,908,870✔
1789
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
44,117,455✔
1790
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
44,184,309!
1791

1792
    // todo: add reserve flag check
1793
    // it is a column reserved for the arithmetic expression calculation
1794
    if (pNode->slotId >= numOfCols) {
44,201,165✔
1795
      (*numOfOutputCols) += 1;
7,759,300✔
1796
      continue;
7,759,300✔
1797
    }
1798

1799
    SColMatchItem* info = NULL;
36,441,865✔
1800
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
905,769,635✔
1801
      info = taosArrayGet(pList, j);
906,681,711✔
1802
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
907,746,804✔
1803
      if (info->dstSlotId == pNode->slotId) {
905,426,989✔
1804
        break;
36,099,219✔
1805
      }
1806
    }
1807

1808
    if (pNode->output) {
36,370,127✔
1809
      (*numOfOutputCols) += 1;
36,301,818✔
1810
    } else if (info != NULL) {
68,309!
1811
      // select distinct tbname from stb where tbname='abc';
1812
      info->needOutput = false;
68,314✔
1813
    }
1814
  }
1815

1816
  pMatchInfo->pList = pList;
8,791,415✔
1817

1818
_end:
8,791,415✔
1819
  if (code != TSDB_CODE_SUCCESS) {
8,791,415!
1820
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1821
  }
1822
  return code;
8,759,969✔
1823
}
1824

1825
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
29,006,163✔
1826
                                  const char* name) {
1827
  SResSchema s = {0};
29,006,163✔
1828
  s.scale = scale;
29,006,163✔
1829
  s.type = type;
29,006,163✔
1830
  s.bytes = bytes;
29,006,163✔
1831
  s.slotId = slotId;
29,006,163✔
1832
  s.precision = precision;
29,006,163✔
1833
  tstrncpy(s.name, name, tListLen(s.name));
29,006,163✔
1834

1835
  return s;
29,006,163✔
1836
}
1837

1838
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
27,585,267✔
1839
  SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
27,585,267!
1840
  if (pCol == NULL) {
27,588,773!
1841
    return NULL;
×
1842
  }
1843

1844
  pCol->slotId = slotId;
27,588,773✔
1845
  pCol->colId = colId;
27,588,773✔
1846
  pCol->bytes = pType->bytes;
27,588,773✔
1847
  pCol->type = pType->type;
27,588,773✔
1848
  pCol->scale = pType->scale;
27,588,773✔
1849
  pCol->precision = pType->precision;
27,588,773✔
1850
  pCol->dataBlockId = blockId;
27,588,773✔
1851
  pCol->colType = colType;
27,588,773✔
1852
  return pCol;
27,588,773✔
1853
}
1854

1855
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
28,999,190✔
1856
  int32_t code = TSDB_CODE_SUCCESS;
28,999,190✔
1857
  int32_t lino = 0;
28,999,190✔
1858
  pExp->base.numOfParams = 0;
28,999,190✔
1859
  pExp->base.pParam = NULL;
28,999,190✔
1860
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
28,999,190!
1861
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
29,015,689!
1862

1863
  pExp->pExpr->_function.num = 1;
29,015,689✔
1864
  pExp->pExpr->_function.functionId = -1;
29,015,689✔
1865

1866
  int32_t type = nodeType(pNode);
29,015,689✔
1867
  // it is a project query, or group by column
1868
  if (type == QUERY_NODE_COLUMN) {
29,015,689✔
1869
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
18,345,480✔
1870
    SColumnNode* pColNode = (SColumnNode*)pNode;
18,345,480✔
1871

1872
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
18,345,480✔
1873
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
18,346,159!
1874

1875
    pExp->base.numOfParams = 1;
18,346,159✔
1876

1877
    SDataType* pType = &pColNode->node.resType;
18,346,159✔
1878
    pExp->base.resSchema =
1879
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
18,346,159✔
1880

1881
    pExp->base.pParam[0].pCol =
36,677,686✔
1882
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
18,341,189✔
1883
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
18,336,497!
1884

1885
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
18,336,497✔
1886
  } else if (type == QUERY_NODE_VALUE) {
10,670,209✔
1887
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
251,985✔
1888
    SValueNode* pValNode = (SValueNode*)pNode;
251,985✔
1889

1890
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
251,985!
1891
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
252,069!
1892

1893
    pExp->base.numOfParams = 1;
252,069✔
1894

1895
    SDataType* pType = &pValNode->node.resType;
252,069✔
1896
    pExp->base.resSchema =
1897
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
252,069✔
1898
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
252,038✔
1899
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
252,038✔
1900
    QUERY_CHECK_CODE(code, lino, _end);
236,868!
1901
  } else if (type == QUERY_NODE_FUNCTION) {
10,418,224✔
1902
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
10,184,258✔
1903
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
10,184,258✔
1904

1905
    SDataType* pType = &pFuncNode->node.resType;
10,184,258✔
1906
    pExp->base.resSchema =
1907
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
10,184,258✔
1908
    tExprNode* pExprNode = pExp->pExpr;
10,185,038✔
1909

1910
    pExprNode->_function.functionId = pFuncNode->funcId;
10,185,038✔
1911
    pExprNode->_function.pFunctNode = pFuncNode;
10,185,038✔
1912
    pExprNode->_function.functionType = pFuncNode->funcType;
10,185,038✔
1913

1914
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
10,185,038✔
1915

1916
#if 1
1917
    // todo refactor: add the parameter for tbname function
1918
    const char* name = "tbname";
10,185,038✔
1919
    int32_t     len = strlen(name);
10,185,038✔
1920

1921
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
10,185,038✔
1922
        pExprNode->_function.functionName[len] == 0) {
622,859!
1923
      pFuncNode->pParameterList = NULL;
622,980✔
1924
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
622,980✔
1925
      SValueNode* res = NULL;
622,807✔
1926
      if (TSDB_CODE_SUCCESS == code) {
622,807!
1927
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
622,858✔
1928
      }
1929
      QUERY_CHECK_CODE(code, lino, _end);
622,825!
1930
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
622,825✔
1931
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
622,825✔
1932
      if (code != TSDB_CODE_SUCCESS) {
623,006✔
1933
        nodesDestroyNode((SNode*)res);
8✔
1934
        res = NULL;
×
1935
      }
1936
      QUERY_CHECK_CODE(code, lino, _end);
622,998!
1937
    }
1938
#endif
1939

1940
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
10,185,056✔
1941

1942
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
10,185,056!
1943
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
10,188,551!
1944
    pExp->base.numOfParams = numOfParam;
10,188,551✔
1945

1946
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
21,874,015!
1947
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
11,684,823✔
1948
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
11,685,866!
1949
      if (p1->type == QUERY_NODE_COLUMN) {
11,686,611✔
1950
        SColumnNode* pcn = (SColumnNode*)p1;
9,250,485✔
1951

1952
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
9,250,485✔
1953
        pExp->base.pParam[j].pCol =
18,501,412✔
1954
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
9,250,485✔
1955
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
9,250,927✔
1956
      } else if (p1->type == QUERY_NODE_VALUE) {
2,436,126✔
1957
        SValueNode* pvn = (SValueNode*)p1;
2,017,948✔
1958
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
2,017,948✔
1959
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
2,017,948✔
1960
        QUERY_CHECK_CODE(code, lino, _end);
2,017,293!
1961
      }
1962
    }
1963
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
10,189,192✔
1964
  } else if (type == QUERY_NODE_OPERATOR) {
233,966✔
1965
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
224,017✔
1966
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
224,017✔
1967

1968
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
224,017!
1969
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
224,028!
1970
    pExp->base.numOfParams = 1;
224,028✔
1971

1972
    SDataType* pType = &pOpNode->node.resType;
224,028✔
1973
    pExp->base.resSchema =
1974
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
224,028✔
1975
    pExp->pExpr->_optrRoot.pRootNode = pNode;
223,997✔
1976
  } else if (type == QUERY_NODE_CASE_WHEN) {
9,949✔
1977
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
6,192✔
1978
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
6,192✔
1979

1980
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
6,192!
1981
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
6,193!
1982
    pExp->base.numOfParams = 1;
6,193✔
1983

1984
    SDataType* pType = &pCaseNode->node.resType;
6,193✔
1985
    pExp->base.resSchema =
1986
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
6,193✔
1987
    pExp->pExpr->_optrRoot.pRootNode = pNode;
6,190✔
1988
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
3,757✔
1989
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
2✔
1990
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
2✔
1991
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
2!
1992
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2!
1993
    pExp->base.numOfParams = 1;
2✔
1994
    SDataType* pType = &pCond->node.resType;
2✔
1995
    pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
2✔
1996
    pExp->pExpr->_optrRoot.pRootNode = pNode;
2✔
1997
  } else {
1998
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
3,755✔
1999
    QUERY_CHECK_CODE(code, lino, _end);
3,755!
2000
  }
2001
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
28,996,501✔
2002
_end:
28,996,501✔
2003
  if (code != TSDB_CODE_SUCCESS) {
28,996,501!
2004
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2005
  }
2006
  return code;
28,992,355✔
2007
}
2008

2009
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
28,974,953✔
2010
  return createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
28,974,953✔
2011
}
2012

2013
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
2,699✔
2014
  *numOfExprs = LIST_LENGTH(pNodeList);
2,699!
2015
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
2,699!
2016
  if (!pExprs) {
2,699!
2017
    return NULL;
×
2018
  }
2019

2020
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
22,794✔
2021
    SExprInfo* pExp = &pExprs[i];
20,095✔
2022
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
20,095✔
2023
    if (code != TSDB_CODE_SUCCESS) {
20,095!
2024
      taosMemoryFreeClear(pExprs);
×
2025
      terrno = code;
×
2026
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2027
      return NULL;
×
2028
    }
2029
  }
2030

2031
  return pExprs;
2,699✔
2032
}
2033

2034
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
11,314,811✔
2035
  QRY_PARAM_CHECK(pExprInfo);
11,314,811!
2036

2037
  int32_t code = 0;
11,314,811✔
2038
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
11,314,811✔
2039
  int32_t numOfGroupKeys = 0;
11,314,811✔
2040
  if (pGroupKeys != NULL) {
11,314,811✔
2041
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
364,523✔
2042
  }
2043

2044
  *numOfExprs = numOfFuncs + numOfGroupKeys;
11,314,811✔
2045
  if (*numOfExprs == 0) {
11,314,811✔
2046
    return code;
1,574,953✔
2047
  }
2048

2049
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
9,739,858!
2050
  if (pExprs == NULL) {
9,741,472!
2051
    return terrno;
×
2052
  }
2053

2054
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
38,324,476✔
2055
    STargetNode* pTargetNode = NULL;
28,579,158✔
2056
    if (i < numOfFuncs) {
28,579,158✔
2057
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
28,098,630✔
2058
    } else {
2059
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
480,528✔
2060
    }
2061
    if (!pTargetNode) {
28,593,545!
2062
      destroyExprInfo(pExprs, *numOfExprs);
×
2063
      taosMemoryFreeClear(pExprs);
×
2064
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2065
      return terrno;
×
2066
    }
2067

2068
    SExprInfo* pExp = &pExprs[i];
28,593,545✔
2069
    code = createExprFromTargetNode(pExp, pTargetNode);
28,593,545✔
2070
    if (code != TSDB_CODE_SUCCESS) {
28,583,525✔
2071
      destroyExprInfo(pExprs, *numOfExprs);
521✔
2072
      taosMemoryFreeClear(pExprs);
×
2073
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2074
      return code;
×
2075
    }
2076
  }
2077

2078
  *pExprInfo = pExprs;
9,745,318✔
2079
  return code;
9,745,318✔
2080
}
2081

2082
static void deleteSubsidiareCtx(void* pData) {
×
2083
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2084
  if (pCtx->pCtx) {
×
2085
    taosMemoryFreeClear(pCtx->pCtx);
×
2086
  }
2087
}
×
2088

2089
// set the output buffer for the selectivity + tag query
2090
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
10,978,705✔
2091
  int32_t num = 0;
10,978,705✔
2092
  int32_t code = TSDB_CODE_SUCCESS;
10,978,705✔
2093
  int32_t lino = 0;
10,978,705✔
2094

2095
  SArray* pValCtxArray = NULL;
10,978,705✔
2096
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
29,617,643✔
2097
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
18,638,942✔
2098
    if (funcIdx > 0) {
18,638,942✔
2099
      if (pValCtxArray == NULL) {
3,492✔
2100
        // the end of the list is the select function of biggest index
2101
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
2,476✔
2102
        if (pValCtxArray == NULL) {
2,474✔
2103
          return terrno;
3✔
2104
        }
2105
      }
2106
      if (funcIdx > pValCtxArray->size) {
3,487!
2107
        qError("funcIdx:%d is out of range", funcIdx);
×
2108
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2109
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2110
      }
2111
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
3,487✔
2112
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
3,487!
2113
      if (pSubsidiary->pCtx == NULL) {
3,496!
2114
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2115
        return terrno;
×
2116
      }
2117
      pSubsidiary->num = 0;
3,496✔
2118
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
3,496✔
2119
    }
2120
  }
2121

2122
  SqlFunctionCtx*  p = NULL;
10,978,701✔
2123
  SqlFunctionCtx** pValCtx = NULL;
10,978,701✔
2124
  if (pValCtxArray == NULL) {
10,978,701✔
2125
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
10,973,357!
2126
    if (pValCtx == NULL) {
10,979,716!
2127
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2128
    }
2129
  }
2130

2131
  for (int32_t i = 0; i < numOfOutput; ++i) {
39,694,955✔
2132
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
28,715,090✔
2133
    if ((strcmp(pName, "_select_value") == 0)) {
28,715,090✔
2134
      if (pValCtxArray == NULL) {
1,122,883✔
2135
        pValCtx[num++] = &pCtx[i];
1,118,274✔
2136
      } else {
2137
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
4,609✔
2138
        if (bindFuncIndex > 0) {  // 0 is default index related to the select function
4,609!
2139
          bindFuncIndex -= 1;
4,706✔
2140
        }
2141
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
4,609✔
2142
        if(pSubsidiary == NULL) {
4,826!
2143
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2144
        }
2145
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
4,826✔
2146
        (*pSubsidiary)->num++;
4,826✔
2147
      }
2148
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
27,592,207✔
2149
       if (pValCtxArray == NULL) {
2,477,889✔
2150
        p = &pCtx[i];
2,473,314✔
2151
       }
2152
    }
2153
  }
2154

2155
  if (p != NULL) {
10,979,865✔
2156
    p->subsidiaries.pCtx = pValCtx;
2,131,872✔
2157
    p->subsidiaries.num = num;
2,131,872✔
2158
  } else {
2159
    taosMemoryFreeClear(pValCtx);
8,847,993!
2160
  }
2161

2162
_end:
6,813✔
2163
  if (code != TSDB_CODE_SUCCESS) {
10,984,072!
2164
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2165
    taosMemoryFreeClear(pValCtx);
×
2166
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2167
  } else {
2168
    taosArrayDestroy(pValCtxArray);
10,984,072✔
2169
  }
2170
  return code;
10,972,516✔
2171
}
2172

2173
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
10,977,430✔
2174
                                     SFunctionStateStore* pStore) {
2175
  int32_t         code = TSDB_CODE_SUCCESS;
10,977,430✔
2176
  int32_t         lino = 0;
10,977,430✔
2177
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
10,977,430!
2178
  if (pFuncCtx == NULL) {
10,975,912!
2179
    return NULL;
×
2180
  }
2181

2182
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
10,975,912!
2183
  if (*rowEntryInfoOffset == 0) {
10,984,849!
2184
    taosMemoryFreeClear(pFuncCtx);
×
2185
    return NULL;
×
2186
  }
2187

2188
  for (int32_t i = 0; i < numOfOutput; ++i) {
39,737,797✔
2189
    SExprInfo* pExpr = &pExprInfo[i];
28,742,600✔
2190

2191
    SExprBasicInfo* pFunct = &pExpr->base;
28,742,600✔
2192
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
28,742,600✔
2193

2194
    pCtx->functionId = -1;
28,742,600✔
2195
    pCtx->pExpr = pExpr;
28,742,600✔
2196

2197
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
28,742,600✔
2198
      SFuncExecEnv env = {0};
10,182,026✔
2199
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
10,182,026✔
2200
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId);
10,182,026✔
2201
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
10,175,988✔
2202

2203
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
10,177,663✔
2204
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
17,103,438✔
2205
        if (!isUdaf) {
6,925,931✔
2206
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
6,925,899✔
2207
          QUERY_CHECK_CODE(code, lino, _end);
6,924,999!
2208
        } else {
2209
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
32✔
2210
          pCtx->udfName = taosStrdup(udfName);
32!
2211
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
32!
2212

2213
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
32✔
2214
          QUERY_CHECK_CODE(code, lino, _end);
32!
2215
        }
2216
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
6,925,031✔
2217
        if (!tmp) {
6,926,336!
2218
          code = terrno;
×
2219
          QUERY_CHECK_CODE(code, lino, _end);
×
2220
        }
2221
      } else {
2222
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
3,246,112✔
2223
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
3,243,801!
2224
          code = TSDB_CODE_SUCCESS;
262✔
2225
        }
2226
        QUERY_CHECK_CODE(code, lino, _end);
3,243,801!
2227

2228
        if (pCtx->sfp.getEnv != NULL) {
3,243,801✔
2229
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
1,843,473✔
2230
          if (!tmp) {
1,842,037✔
2231
            code = terrno;
1,235✔
2232
            QUERY_CHECK_CODE(code, lino, _end);
×
2233
          }
2234
        }
2235
      }
2236
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
10,169,481✔
2237
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
18,560,574✔
2238
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
252,026!
2239
      // for simple column, the result buffer needs to hold at least one element.
2240
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
18,563,527✔
2241
    }
2242

2243
    pCtx->input.numOfInputCols = pFunct->numOfParams;
28,730,055✔
2244
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
28,730,055!
2245
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
28,746,484!
2246
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
28,746,484!
2247
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
28,752,948!
2248

2249
    pCtx->pTsOutput = NULL;
28,752,948✔
2250
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
28,752,948✔
2251
    pCtx->resDataInfo.type = pFunct->resSchema.type;
28,752,948✔
2252
    pCtx->order = TSDB_ORDER_ASC;
28,752,948✔
2253
    pCtx->start.key = INT64_MIN;
28,752,948✔
2254
    pCtx->end.key = INT64_MIN;
28,752,948✔
2255
    pCtx->numOfParams = pExpr->base.numOfParams;
28,752,948✔
2256
    pCtx->param = pFunct->pParam;
28,752,948✔
2257
    pCtx->saveHandle.currentPage = -1;
28,752,948✔
2258
    pCtx->pStore = pStore;
28,752,948✔
2259
    pCtx->hasWindowOrGroup = false;
28,752,948✔
2260
    pCtx->needCleanup = false;
28,752,948✔
2261
  }
2262

2263
  for (int32_t i = 1; i < numOfOutput; ++i) {
29,645,006✔
2264
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
18,649,809✔
2265
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
18,649,809✔
2266
  }
2267

2268
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
10,995,197✔
2269
  QUERY_CHECK_CODE(code, lino, _end);
10,973,730!
2270

2271
_end:
10,973,730✔
2272
  if (code != TSDB_CODE_SUCCESS) {
10,973,730!
2273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2274
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
2275
      taosMemoryFree(pFuncCtx[i].input.pData);
×
2276
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
2277
    }
2278
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
2279
    taosMemoryFreeClear(pFuncCtx);
×
2280

2281
    terrno = code;
×
2282
    return NULL;
×
2283
  }
2284
  return pFuncCtx;
10,973,730✔
2285
}
2286

2287
// NOTE: sources columns are more than the destination SSDatablock columns.
2288
// doFilter in table scan needs every column even its output is false
2289
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
1,088,858✔
2290
  int32_t code = TSDB_CODE_SUCCESS;
1,088,858✔
2291
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
1,088,858✔
2292

2293
  int32_t i = 0, j = 0;
1,089,029✔
2294
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
2,480,628✔
2295
    SColumnInfoData* p = taosArrayGet(pCols, i);
1,392,027✔
2296
    if (!p) {
1,391,557!
2297
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2298
      return terrno;
×
2299
    }
2300
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
1,391,557✔
2301
    if (!pmInfo) {
1,391,343!
2302
      return terrno;
×
2303
    }
2304

2305
    if (p->info.colId == pmInfo->colId) {
1,391,607✔
2306
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
1,275,043✔
2307
      if (!pDst) {
1,274,906!
2308
        return terrno;
×
2309
      }
2310
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
1,274,906✔
2311
      if (code != TSDB_CODE_SUCCESS) {
1,274,970!
2312
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2313
        return code;
×
2314
      }
2315
      i++;
1,275,037✔
2316
      j++;
1,275,037✔
2317
    } else if (p->info.colId < pmInfo->colId) {
116,564✔
2318
      i++;
116,562✔
2319
    } else {
2320
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
2!
2321
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2322
    }
2323
  }
2324
  return code;
1,088,963✔
2325
}
2326

2327
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
2,785,303✔
2328
  SInterval interval = {
2,785,303✔
2329
      .interval = pTableScanNode->interval,
2,785,303✔
2330
      .sliding = pTableScanNode->sliding,
2,785,303✔
2331
      .intervalUnit = pTableScanNode->intervalUnit,
2,785,303✔
2332
      .slidingUnit = pTableScanNode->slidingUnit,
2,785,303✔
2333
      .offset = pTableScanNode->offset,
2,785,303✔
2334
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
2,785,303✔
2335
      .timeRange = pTableScanNode->scanRange,
2336
  };
2337
  calcIntervalAutoOffset(&interval);
2,785,303✔
2338

2339
  return interval;
2,787,953✔
2340
}
2341

2342
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
613,251✔
2343
  SColumn c = {0};
613,251✔
2344

2345
  c.slotId = pColNode->slotId;
613,251✔
2346
  c.colId = pColNode->colId;
613,251✔
2347
  c.type = pColNode->node.resType.type;
613,251✔
2348
  c.bytes = pColNode->node.resType.bytes;
613,251✔
2349
  c.scale = pColNode->node.resType.scale;
613,251✔
2350
  c.precision = pColNode->node.resType.precision;
613,251✔
2351
  return c;
613,251✔
2352
}
2353

2354
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
4,763,079✔
2355
                               const SReadHandle* readHandle) {
2356
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
4,763,079✔
2357
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
4,763,079!
2358

2359
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
4,763,079!
2360
  if (!pCond->colList) {
4,771,098!
2361
    return terrno;
×
2362
  }
2363
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
4,771,098!
2364
  if (pCond->pSlotList == NULL) {
4,763,185!
2365
    taosMemoryFreeClear(pCond->colList);
×
2366
    return terrno;
×
2367
  }
2368

2369
  // TODO: get it from stable scan node
2370
  pCond->twindows = pTableScanNode->scanRange;
4,763,185✔
2371
  pCond->suid = pTableScanNode->scan.suid;
4,763,185✔
2372
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
4,763,185✔
2373
  pCond->startVersion = -1;
4,763,185✔
2374
  pCond->endVersion = -1;
4,763,185✔
2375
  pCond->skipRollup = readHandle->skipRollup;
4,763,185✔
2376

2377
  // allowed read stt file optimization mode
2378
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
9,710,553✔
2379
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
4,763,185✔
2380

2381
  int32_t j = 0;
4,763,185✔
2382
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
28,028,409✔
2383
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
23,266,935✔
2384
    if (!pNode) {
23,278,538✔
2385
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
13,314!
2386
      return terrno;
13,314✔
2387
    }
2388
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
23,265,224✔
2389
    if (pColNode->colType == COLUMN_TYPE_TAG) {
23,265,224!
2390
      continue;
×
2391
    }
2392

2393
    pCond->colList[j].type = pColNode->node.resType.type;
23,265,224✔
2394
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
23,265,224✔
2395
    pCond->colList[j].colId = pColNode->colId;
23,265,224✔
2396
    pCond->colList[j].pk = pColNode->isPk;
23,265,224✔
2397

2398
    pCond->pSlotList[j] = pNode->slotId;
23,265,224✔
2399
    j += 1;
23,265,224✔
2400
  }
2401

2402
  pCond->numOfCols = j;
4,761,474✔
2403
  return TSDB_CODE_SUCCESS;
4,761,474✔
2404
}
2405

UNCOV
2406
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
×
2407
                                           const SReadHandle* readHandle, SArray* colArray) {
UNCOV
2408
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
2409
  int32_t lino = 0;
×
2410

UNCOV
2411
  pCond->order = TSDB_ORDER_ASC;
×
UNCOV
2412
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
×
2413

UNCOV
2414
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
×
UNCOV
2415
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
×
2416

2417
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
×
2418
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
×
2419

UNCOV
2420
  pCond->twindows = pOrgCond->twindows;
×
UNCOV
2421
  pCond->type = pOrgCond->type;
×
UNCOV
2422
  pCond->startVersion = -1;
×
UNCOV
2423
  pCond->endVersion = -1;
×
UNCOV
2424
  pCond->skipRollup = true;
×
UNCOV
2425
  pCond->notLoadData = false;
×
2426

UNCOV
2427
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
×
UNCOV
2428
    SColIdPair* pColPair = taosArrayGet(colArray, i);
×
UNCOV
2429
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
×
2430

UNCOV
2431
    bool find = false;
×
UNCOV
2432
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
×
UNCOV
2433
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
×
UNCOV
2434
        pCond->colList[i].type = pOrgCond->colList[j].type;
×
UNCOV
2435
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
×
UNCOV
2436
        pCond->colList[i].colId = pColPair->orgColId;
×
UNCOV
2437
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
×
UNCOV
2438
        pCond->pSlotList[i] = i;
×
UNCOV
2439
        find = true;
×
UNCOV
2440
        break;
×
2441
      }
2442
    }
UNCOV
2443
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
×
2444
  }
2445

UNCOV
2446
  return code;
×
UNCOV
2447
_return:
×
UNCOV
2448
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
UNCOV
2449
  taosMemoryFreeClear(pCond->colList);
×
UNCOV
2450
  taosMemoryFreeClear(pCond->pSlotList);
×
UNCOV
2451
  return code;
×
2452
}
2453

2454
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
11,529,118✔
2455
  taosMemoryFreeClear(pCond->colList);
11,529,118!
2456
  taosMemoryFreeClear(pCond->pSlotList);
11,529,379!
2457
}
11,529,465✔
2458

2459
int32_t convertFillType(int32_t mode) {
538,171✔
2460
  int32_t type = TSDB_FILL_NONE;
538,171✔
2461
  switch (mode) {
538,171!
2462
    case FILL_MODE_PREV:
13,744✔
2463
      type = TSDB_FILL_PREV;
13,744✔
2464
      break;
13,744✔
UNCOV
2465
    case FILL_MODE_NONE:
×
UNCOV
2466
      type = TSDB_FILL_NONE;
×
UNCOV
2467
      break;
×
2468
    case FILL_MODE_NULL:
26,678✔
2469
      type = TSDB_FILL_NULL;
26,678✔
2470
      break;
26,678✔
2471
    case FILL_MODE_NULL_F:
30,894✔
2472
      type = TSDB_FILL_NULL_F;
30,894✔
2473
      break;
30,894✔
2474
    case FILL_MODE_NEXT:
15,012✔
2475
      type = TSDB_FILL_NEXT;
15,012✔
2476
      break;
15,012✔
2477
    case FILL_MODE_VALUE:
138,727✔
2478
      type = TSDB_FILL_SET_VALUE;
138,727✔
2479
      break;
138,727✔
2480
    case FILL_MODE_VALUE_F:
289,927✔
2481
      type = TSDB_FILL_SET_VALUE_F;
289,927✔
2482
      break;
289,927✔
2483
    case FILL_MODE_LINEAR:
13,169✔
2484
      type = TSDB_FILL_LINEAR;
13,169✔
2485
      break;
13,169✔
2486
    case FILL_MODE_NEAR:
10,030✔
2487
      type = TSDB_FILL_NEAR;
10,030✔
2488
      break;
10,030✔
UNCOV
2489
    default:
×
UNCOV
2490
      type = TSDB_FILL_NONE;
×
2491
  }
2492

2493
  return type;
538,171✔
2494
}
2495

2496
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
33,155,561✔
2497
  if (ascQuery) {
33,155,561✔
2498
    *w = getAlignQueryTimeWindow(pInterval, ts);
32,866,276✔
2499
  } else {
2500
    // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
2501
    *w = getAlignQueryTimeWindow(pInterval, ts);
289,285✔
2502

2503
    int64_t key = w->skey;
290,747✔
2504
    while (key < ts) {  // moving towards end
956,289✔
2505
      key = getNextTimeWindowStart(pInterval, key, TSDB_ORDER_ASC);
940,929✔
2506
      if (key > ts) {
940,922✔
2507
        break;
275,380✔
2508
      }
2509

2510
      w->skey = key;
665,542✔
2511
    }
2512
    w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
290,740✔
2513
  }
2514
}
33,142,846✔
2515

2516
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
962,200✔
2517
  STimeWindow w = {0};
962,200✔
2518

2519
  w.skey = taosTimeTruncate(ts, pInterval);
962,200✔
2520
  w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
962,356✔
2521
  return w;
962,299✔
2522
}
2523

2524
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
362,059✔
2525
  STimeWindow win = *pWindow;
362,059✔
2526
  STimeWindow save = win;
362,059✔
2527
  while (win.skey <= ts && win.ekey >= ts) {
1,691,786✔
2528
    save = win;
1,329,726✔
2529
    // get previous time window
2530
    getNextTimeWindow(pInterval, &win, order == TSDB_ORDER_ASC ? TSDB_ORDER_DESC : TSDB_ORDER_ASC);
1,329,726✔
2531
  }
2532

2533
  return save;
362,060✔
2534
}
2535

2536
// get the correct time window according to the handled timestamp
2537
// todo refactor
2538
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
34,663,174✔
2539
                                int32_t order) {
2540
  STimeWindow w = {0};
34,663,174✔
2541
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
34,663,174✔
2542
    getInitialStartTimeWindow(pInterval, ts, &w, (order == TSDB_ORDER_ASC));
33,157,416✔
2543
    return w;
33,142,803✔
2544
  }
2545

2546
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
1,505,758✔
2547
  if (pRow) {
1,507,425!
2548
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
1,507,431✔
2549
  }
2550

2551
  // in case of typical time window, we can calculate time window directly.
2552
  if (w.skey > ts || w.ekey < ts) {
1,507,425✔
2553
    w = doCalculateTimeWindow(ts, pInterval);
962,190✔
2554
  }
2555

2556
  if (pInterval->interval != pInterval->sliding) {
1,507,526✔
2557
    // it is an sliding window query, in which sliding value is not equalled to
2558
    // interval value, and we need to find the first qualified time window.
2559
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
362,061✔
2560
  }
2561

2562
  return w;
1,507,521✔
2563
}
2564

2565
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
226,366,826✔
2566
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
226,366,826✔
2567
  TSKEY   nextStart = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
226,366,826✔
2568
  nextStart = taosTimeAdd(nextStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
226,439,131✔
2569
  nextStart = taosTimeAdd(nextStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
226,404,140✔
2570
  return nextStart;
226,014,833✔
2571
}
2572

2573
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
225,457,304✔
2574
  tw->skey = getNextTimeWindowStart(pInterval, tw->skey, order);
225,457,304✔
2575
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
225,118,633✔
2576
}
225,255,834✔
2577

2578
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
11,863,724✔
2579
  return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
23,705,444!
2580
          pLimitInfo->slimit.offset != -1);
11,841,720✔
2581
}
2582

2583
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
×
UNCOV
2584
  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
×
2585
}
2586

2587
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
13,694,239✔
2588
  SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
13,694,239✔
2589
  SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
13,698,325✔
2590

2591
  pLimitInfo->limit = limit;
13,701,066✔
2592
  pLimitInfo->slimit = slimit;
13,701,066✔
2593
  pLimitInfo->remainOffset = limit.offset;
13,701,066✔
2594
  pLimitInfo->remainGroupOffset = slimit.offset;
13,701,066✔
2595
}
13,701,066✔
2596

2597
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
2,258,346✔
2598
  pLimitInfo->numOfOutputRows = 0;
2,258,346✔
2599
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
2,258,346✔
2600
}
2,258,346✔
2601

2602
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
16,022,519✔
2603
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
16,022,519!
UNCOV
2604
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
2605
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2606
  }
2607
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
16,026,526✔
2608
  return TSDB_CODE_SUCCESS;
16,025,049✔
2609
}
2610

2611
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
78,269✔
2612

2613
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
7,100,497✔
2614
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
7,100,497✔
2615
    return NULL;
554✔
2616
  }
2617

2618
  return taosArrayGet(pTableList->pTableList, index);
7,100,464✔
2619
}
2620

2621
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
180✔
2622
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
180✔
2623
  if (startIndex >= numOfTables) {
180!
UNCOV
2624
    return -1;
×
2625
  }
2626

2627
  for (int32_t i = startIndex; i < numOfTables; ++i) {
3,399✔
2628
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
3,398✔
2629
    if (!p) {
3,398!
UNCOV
2630
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2631
      return -1;
×
2632
    }
2633
    if (p->uid == uid) {
3,399✔
2634
      return i;
180✔
2635
    }
2636
  }
2637
  return -1;
1✔
2638
}
2639

2640
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
19,755✔
2641
  *psuid = pTableList->idInfo.suid;
19,755✔
2642
  *uid = pTableList->idInfo.uid;
19,755✔
2643
  *type = pTableList->idInfo.tableType;
19,755✔
2644
}
19,755✔
2645

2646
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
13,602,356✔
2647
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
13,602,356✔
2648
  if (slot == NULL) {
13,606,345!
UNCOV
2649
    return -1;
×
2650
  }
2651

2652
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
13,606,345✔
2653

2654
  return pKeyInfo->groupId;
13,604,873✔
2655
}
2656

2657
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
2658
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
10,563✔
2659
  int32_t code = TSDB_CODE_SUCCESS;
10,563✔
2660
  int32_t lino = 0;
10,563✔
2661
  if (pTableList->map == NULL) {
10,563!
UNCOV
2662
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
UNCOV
2663
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
×
2664
  }
2665

2666
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
10,563✔
2667
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
10,563✔
2668
  if (p != NULL) {
10,567✔
2669
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
3!
2670
    goto _end;
3✔
2671
  }
2672

2673
  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
10,564✔
2674
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
10,560!
2675

2676
  int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
10,560✔
2677
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
10,556✔
2678
  if (code != TSDB_CODE_SUCCESS) {
10,570!
2679
    // we have checked the existence of uid in hash map above
UNCOV
2680
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
2681
    taosArrayPopTailBatch(pTableList->pTableList, 1);  // let's pop the last element in the array list
×
2682
  }
2683

2684
_end:
10,570✔
2685
  if (code != TSDB_CODE_SUCCESS) {
10,572!
UNCOV
2686
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2687
  } else {
2688
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
10,572✔
2689
  }
2690

2691
  return code;
10,570✔
2692
}
2693

2694
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
2,752,397✔
2695
                              int32_t* size) {
2696
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
2,752,397✔
2697
  int32_t numOfTables = 0;
2,755,939✔
2698
  int32_t code = tableListGetSize(pTableList, &numOfTables);
2,755,939✔
2699
  if (code != TSDB_CODE_SUCCESS) {
2,752,584!
UNCOV
2700
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2701
    return code;
×
2702
  }
2703

2704
  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
2,752,584!
UNCOV
2705
    return TSDB_CODE_INVALID_PARA;
×
2706
  }
2707

2708
  // here handle two special cases:
2709
  // 1. only one group exists, and 2. one table exists for each group.
2710
  if (totalGroups == 1) {
2,752,584✔
2711
    *size = numOfTables;
2,751,159✔
2712
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
2,751,159✔
2713
    return TSDB_CODE_SUCCESS;
2,750,760✔
2714
  } else if (totalGroups == numOfTables) {
1,425✔
2715
    *size = 1;
1,037✔
2716
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
1,037✔
2717
    return TSDB_CODE_SUCCESS;
1,038✔
2718
  }
2719

2720
  int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
388✔
2721
  if (ordinalGroupIndex < totalGroups - 1) {
388!
2722
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
528✔
2723
  } else {
UNCOV
2724
    *size = numOfTables - offset;
×
2725
  }
2726

2727
  *pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
388✔
2728
  return TSDB_CODE_SUCCESS;
660✔
2729
}
2730

2731
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
8,220,645✔
2732

2733
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
4,682✔
2734

2735
STableListInfo* tableListCreate() {
4,821,342✔
2736
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
4,821,342!
2737
  if (pListInfo == NULL) {
4,828,455!
UNCOV
2738
    return NULL;
×
2739
  }
2740

2741
  pListInfo->remainGroups = NULL;
4,828,455✔
2742
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
4,828,455✔
2743
  if (pListInfo->pTableList == NULL) {
4,828,558!
UNCOV
2744
    goto _error;
×
2745
  }
2746

2747
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,828,558✔
2748
  if (pListInfo->map == NULL) {
4,828,599!
UNCOV
2749
    goto _error;
×
2750
  }
2751

2752
  pListInfo->numOfOuputGroups = 1;
4,828,599✔
2753
  return pListInfo;
4,828,599✔
2754

UNCOV
2755
_error:
×
UNCOV
2756
  tableListDestroy(pListInfo);
×
2757
  return NULL;
×
2758
}
2759

2760
void tableListDestroy(STableListInfo* pTableListInfo) {
4,829,054✔
2761
  if (pTableListInfo == NULL) {
4,829,054✔
2762
    return;
176✔
2763
  }
2764

2765
  taosArrayDestroy(pTableListInfo->pTableList);
4,828,878✔
2766
  taosMemoryFreeClear(pTableListInfo->groupOffset);
4,829,651!
2767

2768
  taosHashCleanup(pTableListInfo->map);
4,829,651✔
2769
  taosHashCleanup(pTableListInfo->remainGroups);
4,829,506✔
2770
  pTableListInfo->pTableList = NULL;
4,829,714✔
2771
  pTableListInfo->map = NULL;
4,829,714✔
2772
  taosMemoryFree(pTableListInfo);
4,829,714!
2773
}
2774

2775
void tableListClear(STableListInfo* pTableListInfo) {
2,511✔
2776
  if (pTableListInfo == NULL) {
2,511!
UNCOV
2777
    return;
×
2778
  }
2779

2780
  taosArrayClear(pTableListInfo->pTableList);
2,511✔
2781
  taosHashClear(pTableListInfo->map);
2,512✔
2782
  taosHashClear(pTableListInfo->remainGroups);
2,512✔
2783
  taosMemoryFree(pTableListInfo->groupOffset);
2,512!
2784
  pTableListInfo->numOfOuputGroups = 1;
2,512✔
2785
  pTableListInfo->oneTableForEachGroup = false;
2,512✔
2786
}
2787

2788
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
91,628✔
2789
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
91,628✔
2790
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
91,628✔
2791

2792
  if (pInfo1->groupId == pInfo2->groupId) {
91,628✔
2793
    return 0;
168✔
2794
  } else {
2795
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
91,460✔
2796
  }
2797
}
2798

2799
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
914✔
2800
  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
914✔
2801
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
915✔
2802

2803
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
915✔
2804
  if (!pList) {
914!
UNCOV
2805
    return terrno;
×
2806
  }
2807

2808
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
914✔
2809
  if (!pInfo) {
915!
UNCOV
2810
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2811
    return terrno;
×
2812
  }
2813
  uint64_t       gid = pInfo->groupId;
915✔
2814

2815
  int32_t start = 0;
915✔
2816
  void*   tmp = taosArrayPush(pList, &start);
915✔
2817
  if (!tmp) {
915!
UNCOV
2818
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2819
    return terrno;
×
2820
  }
2821

2822
  for (int32_t i = 1; i < size; ++i) {
2,882✔
2823
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
1,966✔
2824
    if (!pInfo) {
1,965!
UNCOV
2825
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2826
      return terrno;
×
2827
    }
2828
    if (pInfo->groupId != gid) {
1,966✔
2829
      tmp = taosArrayPush(pList, &i);
1,799✔
2830
      if (!tmp) {
1,799!
2831
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2832
        return terrno;
×
2833
      }
2834
      gid = pInfo->groupId;
1,799✔
2835
    }
2836
  }
2837

2838
  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
916✔
2839
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
914!
2840
  if (pTableListInfo->groupOffset == NULL) {
914!
UNCOV
2841
    taosArrayDestroy(pList);
×
UNCOV
2842
    return terrno;
×
2843
  }
2844

2845
  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
914✔
2846
  taosArrayDestroy(pList);
914✔
2847
  return TSDB_CODE_SUCCESS;
915✔
2848
}
2849

2850
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
4,462,514✔
2851
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI) {
2852
  int32_t code = TSDB_CODE_SUCCESS;
4,462,514✔
2853

2854
  bool   groupByTbname = groupbyTbname(group);
4,462,514✔
2855
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
4,458,631✔
2856
  if (!numOfTables) {
4,460,888!
UNCOV
2857
    return code;
×
2858
  }
2859
  if (group == NULL || groupByTbname) {
4,460,888✔
2860
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
4,397,454✔
2861
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
2,477,920✔
2862
      pTableListInfo->remainGroups =
193,810✔
2863
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
193,814✔
2864
      if (pTableListInfo->remainGroups == NULL) {
193,810!
UNCOV
2865
        return terrno;
×
2866
      }
2867

2868
      for (int i = 0; i < numOfTables; i++) {
669,422✔
2869
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
475,508✔
2870
        if (!info) {
475,456✔
2871
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
22!
2872
          return terrno;
22✔
2873
        }
2874
        info->groupId = groupByTbname ? info->uid : 0;
475,434✔
2875

2876
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
475,434✔
2877
                                      &(info->uid), sizeof(info->uid));
475,434✔
2878
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
475,612!
UNCOV
2879
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
2880
          return tempRes;
×
2881
        }
2882
      }
2883
    } else {
2884
      for (int32_t i = 0; i < numOfTables; i++) {
15,446,016✔
2885
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
11,245,703✔
2886
        if (!info) {
11,243,387✔
2887
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
1,011!
2888
          return terrno;
1,011✔
2889
        }
2890
        info->groupId = groupByTbname ? info->uid : 0;
11,242,376✔
2891
      }
2892
    }
2893

2894
    pTableListInfo->oneTableForEachGroup = groupByTbname;
4,394,227✔
2895
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
4,394,227✔
2896
      pTableListInfo->oneTableForEachGroup = true;
443,033✔
2897
    }
2898

2899
    if (groupSort && groupByTbname) {
4,394,227✔
2900
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
27,933✔
2901
      pTableListInfo->numOfOuputGroups = numOfTables;
27,954✔
2902
    } else if (groupByTbname && pScanNode->groupOrderScan) {
4,366,294✔
2903
      pTableListInfo->numOfOuputGroups = numOfTables;
163✔
2904
    } else {
2905
      pTableListInfo->numOfOuputGroups = 1;
4,366,131✔
2906
    }
2907
  } else {
2908
    bool initRemainGroups = false;
63,434✔
2909
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
63,434✔
2910
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
62,817✔
2911
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
62,817✔
2912
          !(groupSort || pScanNode->groupOrderScan)) {
13,210!
2913
        initRemainGroups = true;
13,166✔
2914
      }
2915
    }
2916

2917
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups);
63,434✔
2918
    if (code != TSDB_CODE_SUCCESS) {
63,434✔
2919
      return code;
15✔
2920
    }
2921

2922
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
63,419✔
2923

2924
    if (groupSort || pScanNode->groupOrderScan) {
63,419✔
2925
      code = sortTableGroup(pTableListInfo);
903✔
2926
    }
2927
  }
2928

2929
  // add all table entry in the hash map
2930
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
4,457,679✔
2931
  for (int32_t i = 0; i < size; ++i) {
16,467,559✔
2932
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
11,993,445✔
2933
    if (!p) {
11,987,838!
UNCOV
2934
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2935
      return terrno;
×
2936
    }
2937
    int32_t        tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
11,987,838✔
2938
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
12,012,142!
UNCOV
2939
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
2940
      return tempRes;
×
2941
    }
2942
  }
2943

2944
  return code;
4,474,114✔
2945
}
2946

2947
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
4,730,161✔
2948
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
2949
                                SExecTaskInfo* pTaskInfo) {
2950
  int64_t     st = taosGetTimestampUs();
4,734,020✔
2951
  const char* idStr = GET_TASKID(pTaskInfo);
4,734,020✔
2952

2953
  if (pHandle == NULL) {
4,734,020!
UNCOV
2954
    qError("invalid handle, in creating operator tree, %s", idStr);
×
UNCOV
2955
    return TSDB_CODE_INVALID_PARA;
×
2956
  }
2957

2958
  uint8_t digest[17] = {0};
4,734,020✔
2959
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
4,734,020✔
2960
                              &pTaskInfo->storageAPI);
2961
  if (code != TSDB_CODE_SUCCESS) {
4,732,005✔
2962
    qError("failed to getTableList, code:%s", tstrerror(code));
14!
2963
    return code;
14✔
2964
  }
2965

2966
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
4,731,991✔
2967

2968
  int64_t st1 = taosGetTimestampUs();
4,732,945✔
2969
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
4,732,945✔
2970
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
4,732,945✔
2971
         pTaskInfo->cost.extractListTime, idStr);
2972

2973
  if (numOfTables == 0) {
4,731,588✔
2974
    qDebug("no table qualified for query, %s" PRIx64, idStr);
266,901✔
2975
    return TSDB_CODE_SUCCESS;
266,873✔
2976
  }
2977

2978
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest,
4,464,687✔
2979
                                     &pTaskInfo->storageAPI);
2980
  if (code != TSDB_CODE_SUCCESS) {
4,470,021✔
2981
    return code;
15✔
2982
  }
2983

2984
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
4,469,963✔
2985
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
4,469,963✔
2986

2987
  return TSDB_CODE_SUCCESS;
4,469,317✔
2988
}
2989

2990
char* getStreamOpName(uint16_t opType) {
2,610,844✔
2991
  switch (opType) {
2,610,844!
2992
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
459,723✔
2993
      return "stream scan";
459,723✔
2994
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
29,455✔
2995
      return "project";
29,455✔
2996
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
1,285,334✔
2997
      return "interval single";
1,285,334✔
2998
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
12,450✔
2999
      return "interval final";
12,450✔
3000
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
35,742✔
3001
      return "interval semi";
35,742✔
3002
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
332✔
3003
      return "interval mid";
332✔
3004
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
5,551✔
3005
      return "stream fill";
5,551✔
3006
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
9,270✔
3007
      return "session single";
9,270✔
3008
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
1,241✔
3009
      return "session semi";
1,241✔
3010
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
992✔
3011
      return "session final";
992✔
3012
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
7,310✔
3013
      return "state single";
7,310✔
3014
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
739,312✔
3015
      return "stream partitionby";
739,312✔
3016
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
938✔
3017
      return "stream event";
938✔
3018
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
9,971✔
3019
      return "stream count";
9,971✔
3020
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
7,333✔
3021
      return "stream interp";
7,333✔
3022
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL:
4,777✔
3023
      return "interval continue";
4,777✔
3024
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL:
155✔
3025
      return "interval continue semi";
155✔
3026
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_INTERVAL:
102✔
3027
      return "interval continue final";
102✔
UNCOV
3028
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SESSION:
×
UNCOV
3029
      return "session continue";
×
UNCOV
3030
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_SESSION:
×
UNCOV
3031
      return "session continue semi";
×
UNCOV
3032
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_SESSION:
×
UNCOV
3033
      return "session continue final";
×
UNCOV
3034
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_STATE:
×
UNCOV
3035
      return "state continue";
×
UNCOV
3036
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_EVENT:
×
UNCOV
3037
      return "event continue";
×
UNCOV
3038
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_COUNT:
×
UNCOV
3039
      return "count continue";
×
3040
  }
3041
  return "error name";
856✔
3042
}
3043

3044
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
1,248,631✔
3045
  if (!pBlock) {
1,248,631✔
3046
    qInfo("%s===stream===%s: Block is Null", taskIdStr, flag);
57!
3047
    return;
57✔
3048
  } else if (pBlock->info.rows == 0) {
1,248,574✔
3049
    qInfo("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
21,538!
3050
    return;
21,585✔
3051
  }
3052
  if (qDebugFlag & DEBUG_DEBUG) {
1,227,036✔
3053
    char*   pBuf = NULL;
47,070✔
3054
    int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
47,070✔
3055
    if (code == 0) {
47,070!
3056
      qInfo("%s", pBuf);
47,070!
3057
      taosMemoryFree(pBuf);
47,070!
3058
    }
3059
  }
3060
}
3061

3062
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
1,296,623✔
3063
  if (!pBlock) {
1,296,623!
UNCOV
3064
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
×
UNCOV
3065
    return;
×
3066
  } else if (pBlock->info.rows == 0) {
1,296,623✔
3067
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
12,980✔
3068
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
3069
           pBlock->info.version);
3070
    return;
12,979✔
3071
  }
3072
  if (qDebugFlag & DEBUG_DEBUG) {
1,283,643✔
3073
    char* pBuf = NULL;
59,829✔
3074
    char  flagBuf[64];
3075
    snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
59,829✔
3076
    int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr);
59,829✔
3077
    if (code == 0) {
59,828!
3078
      qDebug("%s", pBuf);
59,828!
3079
      taosMemoryFree(pBuf);
59,829!
3080
    }
3081
  }
3082
}
3083

3084
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
3,745,969!
3085

3086
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
388,715,920✔
3087
  int64_t* ts = (int64_t*)pColData->pData;
388,715,920✔
3088

3089
  int64_t duration = pWin->ekey - pWin->skey + delta;
388,715,920✔
3090
  ts[2] = duration;            // set the duration
388,715,920✔
3091
  ts[3] = pWin->skey;          // window start key
388,715,920✔
3092
  ts[4] = pWin->ekey + delta;  // window end key
388,715,920✔
3093
}
388,715,920✔
3094

3095
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
21,343,468✔
3096
                 int32_t rowIndex) {
3097
  SColumnDataAgg* pColAgg = NULL;
21,343,468✔
3098
  const char*     isNull = oldkeyBuf;
21,343,468✔
3099
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
21,343,468✔
3100

3101
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
53,615,068✔
3102
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
32,754,237✔
3103
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
32,754,237✔
3104
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
32,754,237!
3105

3106
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
65,508,474!
3107
      if (isNull[i] != 1) return 1;
2,178,730✔
3108
    } else {
3109
      if (isNull[i] != 0) return 1;
30,575,507✔
3110
      const char* val = colDataGetData(pColInfoData, rowIndex);
30,561,286!
3111
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
30,561,286!
UNCOV
3112
        int32_t len = getJsonValueLen(val);
×
UNCOV
3113
        if (memcmp(p, val, len) != 0) return 1;
×
UNCOV
3114
        p += len;
×
3115
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
30,561,286!
3116
        if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
10,379,297✔
3117
        p += varDataTLen(val);
10,242,959✔
3118
      } else {
3119
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
20,181,989✔
3120
        p += pCol->bytes;
19,850,103✔
3121
      }
3122
    }
3123
  }
3124
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
20,860,831✔
3125
  return 0;
20,860,535✔
3126
}
3127

3128
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
482,583✔
3129
  uint32_t        colNum = pSortGroupCols->size;
482,583✔
3130
  SColumnDataAgg* pColAgg = NULL;
482,583✔
3131
  char*           isNull = keyBuf;
482,583✔
3132
  char*           p = keyBuf + sizeof(int8_t) * colNum;
482,583✔
3133

3134
  for (int32_t i = 0; i < colNum; ++i) {
1,437,456✔
3135
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
954,873✔
3136
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
954,873✔
3137
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
954,873!
3138

3139
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
954,873!
3140

3141
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
1,909,746!
3142
      isNull[i] = 1;
34,521✔
3143
    } else {
3144
      isNull[i] = 0;
920,352✔
3145
      const char* val = colDataGetData(pColInfoData, rowIndex);
920,352!
3146
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
920,352!
UNCOV
3147
        int32_t len = getJsonValueLen(val);
×
UNCOV
3148
        memcpy(p, val, len);
×
UNCOV
3149
        p += len;
×
3150
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
920,352!
3151
        varDataCopy(p, val);
139,859✔
3152
        p += varDataTLen(val);
139,859✔
3153
      } else {
3154
        memcpy(p, val, pCol->bytes);
780,493✔
3155
        p += pCol->bytes;
780,493✔
3156
      }
3157
    }
3158
  }
3159
  return (int32_t)(p - keyBuf);
482,583✔
3160
}
3161

3162
uint64_t calcGroupId(char* pData, int32_t len) {
18,087,909✔
3163
  T_MD5_CTX context;
3164
  tMD5Init(&context);
18,087,909✔
3165
  tMD5Update(&context, (uint8_t*)pData, len);
18,060,510✔
3166
  tMD5Final(&context);
18,172,175✔
3167

3168
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
3169
  uint64_t id = 0;
18,304,948✔
3170
  memcpy(&id, context.digest, sizeof(uint64_t));
18,304,948✔
3171
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
18,304,948!
3172
  return id;
18,304,948✔
3173
}
3174

3175
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
943✔
3176
  SNode*     node;
3177
  SNodeList* ret = NULL;
943✔
3178
  FOREACH(node, pSortKeys) {
2,879!
3179
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
1,935✔
3180
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
1,935✔
3181
    if (code != TSDB_CODE_SUCCESS) {
1,936!
UNCOV
3182
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3183
      terrno = code;
×
UNCOV
3184
      return NULL;
×
3185
    }
3186
  }
3187
  return ret;
944✔
3188
}
3189

3190
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
942✔
3191
  int32_t code = TSDB_CODE_SUCCESS;
942✔
3192
  int32_t lino = 0;
942✔
3193
  int32_t len = 0;
942✔
3194
  int32_t keyNum = taosArrayGetSize(keys);
942✔
3195
  for (int32_t i = 0; i < keyNum; ++i) {
2,399✔
3196
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
1,456✔
3197
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
1,455!
3198
    len += pCol->bytes;
1,455✔
3199
  }
3200
  len += sizeof(int8_t) * keyNum;  // null flag
943✔
3201
  *pLen = len;
943✔
3202

3203
_end:
943✔
3204
  if (code != TSDB_CODE_SUCCESS) {
943!
UNCOV
3205
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3206
  }
3207
  return code;
942✔
3208
}
3209

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc