• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4838

08 Nov 2025 04:37AM UTC coverage: 71.256% (+12.3%) from 58.963%
#4838

push

travis-ci

web-flow
test: adjust source list (#33506)

243241 of 341361 relevant lines covered (71.26%)

281946921.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.47
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttime.h"
28

29
#include "executil.h"
30
#include "executorInt.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "tcompression.h"
34
#include "tutil.h"
35

36
typedef struct tagFilterAssist {
37
  SHashObj* colHash;
38
  int32_t   index;
39
  SArray*   cInfoList;
40
  int32_t   code;
41
} tagFilterAssist;
42

43
typedef struct STransTagExprCtx {
44
  int32_t      code;
45
  SMetaReader* pReader;
46
} STransTagExprCtx;
47

48
typedef enum {
49
  FILTER_NO_LOGIC = 1,
50
  FILTER_AND,
51
  FILTER_OTHER,
52
} FilterCondType;
53

54
static FilterCondType checkTagCond(SNode* cond);
55
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
56
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
57
                                        uint64_t suid);
58

59
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
60
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
61

62
static int64_t getLimit(const SNode* pLimit) {
2,147,483,647✔
63
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
2,147,483,647✔
64
}
65
static int64_t getOffset(const SNode* pLimit) {
2,147,483,647✔
66
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
2,147,483,647✔
67
}
68
static void releaseColInfoData(void* pCol);
69

70
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
2,147,483,647✔
71
  pResultRowInfo->size = 0;
2,147,483,647✔
72
  pResultRowInfo->cur.pageId = -1;
2,147,483,647✔
73
}
2,147,483,647✔
74

75
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
56,200,658✔
76

77
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
2,147,483,647✔
78
  pResultRow->numOfRows = 0;
2,147,483,647✔
79
  pResultRow->closed = false;
2,147,483,647✔
80
  pResultRow->endInterp = false;
2,147,483,647✔
81
  pResultRow->startInterp = false;
2,147,483,647✔
82

83
  if (entrySize > 0) {
2,147,483,647✔
84
    memset(pResultRow->pEntryInfo, 0, entrySize);
2,147,483,647✔
85
  }
86
}
2,147,483,647✔
87

88
// TODO refactor: use macro
89
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
90
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
91
}
92

93
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
1,909,805,113✔
94
  int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
1,909,805,113✔
95

96
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
97
    rowSize += pCtx[i].resDataInfo.interBufSize;
2,147,483,647✔
98
  }
99

100
  return rowSize;
1,909,697,961✔
101
}
102

103
// Convert buf read from rocksdb to result row
104
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
105
  if (inBuf == NULL || pSup == NULL) {
×
106
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
107
    return TSDB_CODE_INVALID_PARA;
×
108
  }
109
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
110
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
111
  SResultRow*     pResultRow = NULL;
×
112
  size_t          processedSize = 0;
×
113
  int32_t         code = TSDB_CODE_SUCCESS;
×
114

115
  // calculate the size of output buffer
116
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
117
  *outBuf = taosMemoryMalloc(*outBufSize);
×
118
  if (*outBuf == NULL) {
×
119
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
120
    return terrno;
×
121
  }
122
  pResultRow = (SResultRow*)*outBuf;
×
123
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
124
  inBuf += sizeof(SResultRow);
×
125
  processedSize += sizeof(SResultRow);
×
126

127
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
128
    int32_t len = *(int32_t*)inBuf;
×
129
    inBuf += sizeof(int32_t);
×
130
    processedSize += sizeof(int32_t);
×
131
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
132
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
133
      if (code != TSDB_CODE_SUCCESS) {
×
134
        qError("failed to decode result row, code:%d", code);
×
135
        return code;
×
136
      }
137
    } else {
138
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
139
    }
140
    inBuf += len;
×
141
    processedSize += len;
×
142
  }
143

144
  if (processedSize < inBufSize) {
×
145
    // stream stores extra data after result row
146
    size_t leftLen = inBufSize - processedSize;
×
147
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
148
    if (*outBuf == NULL) {
×
149
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
150
      return terrno;
×
151
    }
152
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
153
    inBuf += leftLen;
×
154
    processedSize += leftLen;
×
155
    *outBufSize += leftLen;
×
156
  }
157

158
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
159
  return TSDB_CODE_SUCCESS;
×
160
}
161

162
// Convert result row to buf for rocksdb
163
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
164
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
165
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
166
    return TSDB_CODE_INVALID_PARA;
×
167
  }
168

169
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
170
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
171
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
172
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
173

174
  if (rowSize > inBufSize) {
×
175
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
176
    return TSDB_CODE_INVALID_PARA;
×
177
  }
178

179
  // calculate the size of output buffer
180
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
181
  if (rowSize < inBufSize) {
×
182
    *outBufSize += inBufSize - rowSize;
×
183
  }
184

185
  *outBuf = taosMemoryMalloc(*outBufSize);
×
186
  if (*outBuf == NULL) {
×
187
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
188
    return terrno;
×
189
  }
190

191
  char* pBuf = *outBuf;
×
192
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
193
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
194
  pBuf += sizeof(SResultRow);
×
195
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
196
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
197
    *(int32_t*)pBuf = (int32_t)len;
×
198
    pBuf += sizeof(int32_t);
×
199
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
200
    pBuf += len;
×
201
  }
202

203
  if (rowSize < inBufSize) {
×
204
    // stream stores extra data after result row
205
    size_t leftLen = inBufSize - rowSize;
×
206
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
207
    pBuf += leftLen;
×
208
  }
209

210
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
211
  return TSDB_CODE_SUCCESS;
×
212
}
213

214
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
×
215

216
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
987,791,557✔
217
  taosMemoryFreeClear(pGroupResInfo->pBuf);
987,791,557✔
218
  if (pGroupResInfo->freeItem) {
987,803,729✔
219
    //    taosArrayDestroy(pGroupResInfo->pRows);
220
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
×
221
    pGroupResInfo->freeItem = false;
×
222
    pGroupResInfo->pRows = NULL;
×
223
  } else {
224
    taosArrayDestroy(pGroupResInfo->pRows);
987,757,542✔
225
    pGroupResInfo->pRows = NULL;
987,777,431✔
226
  }
227
  pGroupResInfo->index = 0;
987,793,317✔
228
  pGroupResInfo->delIndex = 0;
987,793,502✔
229
}
987,764,587✔
230

231
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
232
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
233
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
234

235
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
236
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
237
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
238

239
    if (pts1 == pts2) {
2,147,483,647✔
240
      return 0;
×
241
    } else {
242
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
243
    }
244
  } else {
245
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
246
  }
247
}
248

249
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
2,147,483,647✔
250

251
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
676,076,985✔
252
  int32_t code = TSDB_CODE_SUCCESS;
676,076,985✔
253
  int32_t lino = 0;
676,076,985✔
254
  if (pGroupResInfo->pRows != NULL) {
676,076,985✔
255
    taosArrayDestroy(pGroupResInfo->pRows);
60,347,681✔
256
  }
257
  if (pGroupResInfo->pBuf) {
676,104,789✔
258
    taosMemoryFree(pGroupResInfo->pBuf);
60,347,681✔
259
    pGroupResInfo->pBuf = NULL;
60,347,681✔
260
  }
261

262
  // extract the result rows information from the hash map
263
  int32_t size = tSimpleHashGetSize(pHashmap);
676,066,472✔
264

265
  void* pData = NULL;
676,071,745✔
266
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
676,071,745✔
267
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
676,073,975✔
268

269
  size_t  keyLen = 0;
676,048,934✔
270
  int32_t iter = 0;
676,059,702✔
271
  int64_t bufLen = 0, offset = 0;
676,092,332✔
272

273
  // todo move away and record this during create window
274
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
275
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
276
    bufLen += keyLen + sizeof(SResultRowPosition);
2,147,483,647✔
277
  }
278

279
  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
675,398,610✔
280
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
676,112,180✔
281

282
  iter = 0;
676,080,131✔
283
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
284
    void* key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
285

286
    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);
2,147,483,647✔
287

288
    p->groupId = *(uint64_t*)key;
2,147,483,647✔
289
    p->pos = *(SResultRowPosition*)pData;
2,147,483,647✔
290
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
291
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
2,147,483,647✔
292
    QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
2,147,483,647✔
293

294
    offset += keyLen + sizeof(struct SResultRowPosition);
2,147,483,647✔
295
  }
296

297
  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
675,758,551✔
298
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
42,146,714✔
299
    size = POINTER_BYTES;
42,146,714✔
300
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
42,146,714✔
301
  }
302

303
  pGroupResInfo->index = 0;
675,757,556✔
304

305
_end:
675,758,253✔
306
  if (code != TSDB_CODE_SUCCESS) {
676,090,888✔
307
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
308
  }
309
  return code;
676,090,888✔
310
}
311

312
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
×
313
  if (pGroupResInfo->pRows != NULL) {
×
314
    taosArrayDestroy(pGroupResInfo->pRows);
×
315
  }
316

317
  pGroupResInfo->freeItem = true;
×
318
  pGroupResInfo->pRows = pArrayList;
×
319
  pGroupResInfo->index = 0;
×
320
  pGroupResInfo->delIndex = 0;
×
321
}
×
322

323
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
2,147,483,647✔
324
  if (pGroupResInfo->pRows == NULL) {
2,147,483,647✔
325
    return false;
×
326
  }
327

328
  return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
2,147,483,647✔
329
}
330

331
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
1,464,295,320✔
332
  if (pGroupResInfo->pRows == 0) {
1,464,295,320✔
333
    return 0;
×
334
  }
335

336
  return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
1,464,294,945✔
337
}
338

339
SArray* createSortInfo(SNodeList* pNodeList) {
379,509,584✔
340
  size_t numOfCols = 0;
379,509,584✔
341

342
  if (pNodeList != NULL) {
379,509,584✔
343
    numOfCols = LIST_LENGTH(pNodeList);
379,137,313✔
344
  } else {
345
    numOfCols = 0;
372,313✔
346
  }
347

348
  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
379,506,955✔
349
  if (pList == NULL) {
379,461,460✔
350
    return pList;
×
351
  }
352

353
  for (int32_t i = 0; i < numOfCols; ++i) {
830,323,769✔
354
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
450,845,672✔
355
    if (!pSortKey) {
450,876,885✔
356
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
357
      taosArrayDestroy(pList);
×
358
      pList = NULL;
×
359
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
360
      break;
×
361
    }
362
    SBlockOrderInfo bi = {0};
450,876,885✔
363
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
450,848,495✔
364
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
450,860,084✔
365

366
    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
450,855,323✔
367
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
×
368
      taosArrayDestroy(pList);
×
369
      pList = NULL;
×
370
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
371
      break;
×
372
    }
373
    
374
    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
450,778,494✔
375
    bi.slotId = pColNode->slotId;
450,860,821✔
376
    void* tmp = taosArrayPush(pList, &bi);
450,866,329✔
377
    if (!tmp) {
450,866,329✔
378
      taosArrayDestroy(pList);
×
379
      pList = NULL;
×
380
      break;
×
381
    }
382
  }
383

384
  return pList;
379,481,345✔
385
}
386

387
SSDataBlock* createDataBlockFromDescNode(void* p) {
2,147,483,647✔
388
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
2,147,483,647✔
389
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
2,147,483,647✔
390
  SSDataBlock* pBlock = NULL;
2,147,483,647✔
391
  int32_t      code = createDataBlock(&pBlock);
2,147,483,647✔
392
  if (code) {
2,147,483,647✔
393
    terrno = code;
×
394
    return NULL;
×
395
  }
396

397
  pBlock->info.id.blockId = pNode->dataBlockId;
2,147,483,647✔
398
  pBlock->info.type = STREAM_INVALID;
2,147,483,647✔
399
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
2,147,483,647✔
400
  pBlock->info.watermark = INT64_MIN;
2,147,483,647✔
401

402
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
403
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
404
    if (!pDescNode) {
2,147,483,647✔
405
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
406
      blockDataDestroy(pBlock);
×
407
      pBlock = NULL;
×
408
      terrno = TSDB_CODE_INVALID_PARA;
×
409
      break;
×
410
    }
411
    SColumnInfoData idata =
2,147,483,647✔
412
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
413
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
414
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
415
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
416

417
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
418
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
419
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
661,148✔
420
      blockDataDestroy(pBlock);
661,148✔
421
      pBlock = NULL;
×
422
      terrno = code;
×
423
      break;
×
424
    }
425
  }
426

427
  return pBlock;
2,147,483,647✔
428
}
429

430
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
1,656,566,117✔
431
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
1,656,566,117✔
432

433
  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
2,147,483,647✔
434
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
2,147,483,647✔
435
    if (!pItem) {
2,147,483,647✔
436
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
437
      return terrno;
×
438
    }
439

440
    if (pItem->isPk) {
2,147,483,647✔
441
      SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
62,678,988✔
442
      if (!pInfoData) {
62,223,402✔
443
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
444
        return terrno;
×
445
      }
446
      pBlockInfo->pks[0].type = pInfoData->info.type;
62,223,402✔
447
      pBlockInfo->pks[1].type = pInfoData->info.type;
62,321,231✔
448

449
      // allocate enough buffer size, which is pInfoData->info.bytes
450
      if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
62,344,333✔
451
        pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
20,853,235✔
452
        if (pBlockInfo->pks[0].pData == NULL) {
20,701,079✔
453
          return terrno;
×
454
        }
455

456
        pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
20,706,111✔
457
        if (pBlockInfo->pks[1].pData == NULL) {
20,709,144✔
458
          taosMemoryFreeClear(pBlockInfo->pks[0].pData);
×
459
          return terrno;
×
460
        }
461

462
        pBlockInfo->pks[0].nData = pInfoData->info.bytes;
20,709,655✔
463
        pBlockInfo->pks[1].nData = pInfoData->info.bytes;
20,716,207✔
464
      }
465

466
      break;
62,233,881✔
467
    }
468
  }
469

470
  return TSDB_CODE_SUCCESS;
1,656,943,890✔
471
}
472

473
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
22,902,103✔
474
  STransTagExprCtx* pCtx = pContext;
22,902,103✔
475
  SMetaReader*      mr = pCtx->pReader;
22,902,103✔
476
  bool              isTagCol = false, isTbname = false;
22,902,103✔
477
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
22,902,103✔
478
    SColumnNode* pCol = (SColumnNode*)*pNode;
6,543,458✔
479
    if (pCol->colType == COLUMN_TYPE_TBNAME)
6,543,458✔
480
      isTbname = true;
×
481
    else
482
      isTagCol = true;
6,543,458✔
483
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
16,358,645✔
484
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
485
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
486
  }
487
  if (isTagCol) {
22,902,103✔
488
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
6,543,458✔
489

490
    SValueNode* res = NULL;
6,543,458✔
491
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
6,543,458✔
492
    if (NULL == res) {
6,543,458✔
493
      return DEAL_RES_ERROR;
×
494
    }
495

496
    res->translate = true;
6,543,458✔
497
    res->node.resType = pSColumnNode->node.resType;
6,543,458✔
498

499
    STagVal tagVal = {0};
6,543,458✔
500
    tagVal.cid = pSColumnNode->colId;
6,543,458✔
501
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
6,543,458✔
502
    if (p == NULL) {
6,543,458✔
503
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
504
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
6,543,458✔
505
      int32_t len = ((const STag*)p)->len;
×
506
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
507
      if (NULL == res->datum.p) {
×
508
        return DEAL_RES_ERROR;
×
509
      }
510
      memcpy(res->datum.p, p, len);
×
511
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
6,543,458✔
512
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
6,543,458✔
513
        return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
514
      }
515

516
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
6,543,458✔
517
      if (NULL == res->datum.p) {
6,543,458✔
518
        return DEAL_RES_ERROR;
×
519
      }
520

521
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
6,543,458✔
522
        memcpy(blobDataVal(res->datum.p), tagVal.pData, tagVal.nData);
×
523
        blobDataSetLen(res->datum.p, tagVal.nData);
×
524
      } else {
525
        memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
6,543,458✔
526
        varDataSetLen(res->datum.p, tagVal.nData);
6,543,458✔
527
      }
528
    } else {
529
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
×
530
      if (code != TSDB_CODE_SUCCESS) {
×
531
        return DEAL_RES_ERROR;
×
532
      }
533
    }
534
    nodesDestroyNode(*pNode);
6,543,458✔
535
    *pNode = (SNode*)res;
6,543,458✔
536
  } else if (isTbname) {
16,358,645✔
537
    SValueNode* res = NULL;
×
538
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
539
    if (NULL == res) {
×
540
      return DEAL_RES_ERROR;
×
541
    }
542

543
    res->translate = true;
×
544
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
545

546
    int32_t len = strlen(mr->me.name);
×
547
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
548
    if (NULL == res->datum.p) {
×
549
      return DEAL_RES_ERROR;
×
550
    }
551
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
552
    varDataSetLen(res->datum.p, len);
×
553
    nodesDestroyNode(*pNode);
×
554
    *pNode = (SNode*)res;
×
555
  }
556

557
  return DEAL_RES_CONTINUE;
22,902,103✔
558
}
559

560
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI) {
3,271,729✔
561
  int32_t     code = TSDB_CODE_SUCCESS;
3,271,729✔
562
  SMetaReader mr = {0};
3,271,729✔
563

564
  pAPI->metaReaderFn.initReader(&mr, metaHandle, META_READER_LOCK, &pAPI->metaFn);
3,271,729✔
565
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
3,271,729✔
566
  if (TSDB_CODE_SUCCESS != code) {
3,271,729✔
567
    pAPI->metaReaderFn.clearReader(&mr);
×
568
    *pQualified = false;
×
569

570
    return TSDB_CODE_SUCCESS;
×
571
  }
572

573
  SNode* pTagCondTmp = NULL;
3,271,729✔
574
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
3,271,729✔
575
  if (TSDB_CODE_SUCCESS != code) {
3,271,729✔
576
    *pQualified = false;
×
577
    pAPI->metaReaderFn.clearReader(&mr);
×
578
    return code;
×
579
  }
580
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
3,271,729✔
581
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
3,271,729✔
582
  pAPI->metaReaderFn.clearReader(&mr);
3,271,729✔
583
  if (TSDB_CODE_SUCCESS != ctx.code) {
3,271,729✔
584
    *pQualified = false;
×
585
    terrno = code;
×
586
    return code;
×
587
  }
588

589
  SNode* pNew = NULL;
3,271,729✔
590
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
3,271,729✔
591
  if (TSDB_CODE_SUCCESS != code) {
3,271,729✔
592
    terrno = code;
×
593
    nodesDestroyNode(pTagCondTmp);
×
594
    *pQualified = false;
×
595

596
    return code;
×
597
  }
598

599
  SValueNode* pValue = (SValueNode*)pNew;
3,271,729✔
600
  *pQualified = pValue->datum.b;
3,271,729✔
601

602
  nodesDestroyNode(pNew);
3,271,729✔
603
  return TSDB_CODE_SUCCESS;
3,271,729✔
604
}
605

606
static EDealRes getColumn(SNode** pNode, void* pContext) {
575,155,065✔
607
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
575,155,065✔
608
  SColumnNode*     pSColumnNode = NULL;
575,155,065✔
609
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
575,219,955✔
610
    pSColumnNode = *(SColumnNode**)pNode;
192,203,354✔
611
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
383,353,594✔
612
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
8,214,490✔
613
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
8,220,478✔
614
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
7,766,544✔
615
      if (NULL == pSColumnNode) {
7,774,136✔
616
        return DEAL_RES_ERROR;
×
617
      }
618
      pSColumnNode->colId = -1;
7,774,136✔
619
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
7,774,136✔
620
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
7,775,743✔
621
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
7,772,327✔
622
      nodesDestroyNode(*pNode);
7,768,458✔
623
      *pNode = (SNode*)pSColumnNode;
7,770,701✔
624
    } else {
625
      return DEAL_RES_CONTINUE;
448,236✔
626
    }
627
  } else {
628
    return DEAL_RES_CONTINUE;
374,851,448✔
629
  }
630

631
  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
199,967,843✔
632
  if (!data) {
199,905,486✔
633
    int32_t tempRes =
634
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
174,185,049✔
635
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
174,270,035✔
636
      return DEAL_RES_ERROR;
×
637
    }
638
    pSColumnNode->slotId = pData->index++;
174,270,035✔
639
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
174,235,283✔
640
                         .type = pSColumnNode->node.resType.type,
174,223,118✔
641
                         .bytes = pSColumnNode->node.resType.bytes,
174,150,883✔
642
                         .pk = pSColumnNode->isPk};
174,231,726✔
643
#if TAG_FILTER_DEBUG
644
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
645
#endif
646
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
174,189,744✔
647
    if (!tmp) {
174,224,666✔
648
      return DEAL_RES_ERROR;
×
649
    }
650
  } else {
651
    SColumnNode* col = *(SColumnNode**)data;
25,720,437✔
652
    pSColumnNode->slotId = col->slotId;
25,722,424✔
653
  }
654

655
  return DEAL_RES_CONTINUE;
199,917,853✔
656
}
657

658
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
165,146,401✔
659
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
165,146,401✔
660
  if (pColumnData == NULL) {
165,122,947✔
661
    return terrno;
×
662
  }
663

664
  pColumnData->info.type = pType->type;
165,122,947✔
665
  pColumnData->info.bytes = pType->bytes;
165,164,480✔
666
  pColumnData->info.scale = pType->scale;
165,121,365✔
667
  pColumnData->info.precision = pType->precision;
165,159,534✔
668

669
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
164,996,059✔
670
  if (code != TSDB_CODE_SUCCESS) {
165,087,750✔
671
    terrno = code;
×
672
    releaseColInfoData(pColumnData);
×
673
    return terrno;
×
674
  }
675

676
  pParam->columnData = pColumnData;
165,087,750✔
677
  pParam->colAlloced = true;
165,098,162✔
678
  return TSDB_CODE_SUCCESS;
165,089,743✔
679
}
680

681
static void releaseColInfoData(void* pCol) {
36,919,695✔
682
  if (pCol) {
36,919,695✔
683
    SColumnInfoData* col = (SColumnInfoData*)pCol;
36,923,941✔
684
    colDataDestroy(col);
36,923,941✔
685
    taosMemoryFree(col);
36,921,726✔
686
  }
687
}
36,908,942✔
688

689
void freeItem(void* p) {
2,027,866,145✔
690
  STUidTagInfo* pInfo = p;
2,027,866,145✔
691
  if (pInfo->pTagVal != NULL) {
2,027,866,145✔
692
    taosMemoryFree(pInfo->pTagVal);
2,023,309,229✔
693
  }
694
}
2,027,861,589✔
695

696
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
77,490✔
697
  if (pTagCond == NULL) {
77,490✔
698
    return TSDB_CODE_SUCCESS;
51,660✔
699
  }
700

701
  char*   payload = NULL;
25,830✔
702
  int32_t len = 0;
25,830✔
703
  int32_t code = nodesNodeToMsg(pTagCond, &payload, &len);
25,830✔
704
  if (code != TSDB_CODE_SUCCESS) {
25,830✔
705
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
706
    return code;
×
707
  }
708

709
  tMD5Init(pContext);
25,830✔
710
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
25,830✔
711
  tMD5Final(pContext);
25,830✔
712

713
  // void* tmp = NULL;
714
  // uint32_t size = 0;
715
  // (void)taosAscii2Hex((const char*)pContext->digest, 16, &tmp, &size);
716
  // qInfo("tag filter digest payload: %s", tmp);
717
  // taosMemoryFree(tmp);
718

719
  taosMemoryFree(payload);
25,830✔
720
  return TSDB_CODE_SUCCESS;
25,830✔
721
}
722

723
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
×
724
  int32_t code = TSDB_CODE_SUCCESS;
×
725
  int32_t lino = 0;
×
726
  char*   payload = NULL;
×
727
  int32_t len = 0;
×
728
  code = nodesNodeToMsg(pGroup, &payload, &len);
×
729
  QUERY_CHECK_CODE(code, lino, _end);
×
730

731
  if (filterDigest[0]) {
×
732
    payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
×
733
    QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
×
734
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
×
735
    len += tListLen(pContext->digest);
×
736
  }
737

738
  tMD5Init(pContext);
×
739
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
740
  tMD5Final(pContext);
×
741

742
_end:
×
743
  taosMemoryFree(payload);
×
744
  if (code != TSDB_CODE_SUCCESS) {
×
745
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
746
  }
747
  return code;
×
748
}
749

750
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
162,479,010✔
751
  int32_t code = TSDB_CODE_SUCCESS;
162,479,010✔
752
  tagFilterAssist ctx = {0};
162,479,010✔
753
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
162,496,069✔
754
  if (ctx.colHash == NULL) {
162,444,251✔
755
    code = terrno;
×
756
    goto end;
×
757
  }
758

759
  ctx.index = 0;
162,444,251✔
760
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
162,444,251✔
761
  if (ctx.cInfoList == NULL) {
162,507,630✔
762
    code = terrno;
87,114✔
763
    goto end;
×
764
  }
765

766
  if (isList) {
162,420,516✔
767
    SNode* pNode = NULL;
34,233,496✔
768
    FOREACH(pNode, (SNodeList*)data) {
71,153,126✔
769
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
36,918,324✔
770
      if (TSDB_CODE_SUCCESS != ctx.code) {
36,919,223✔
771
        code = ctx.code;
×
772
        goto end;
×
773
      }
774
      REPLACE_NODE(pNode);
36,919,223✔
775
    }
776
  } else {
777
    SNode* pNode = (SNode*)data;
128,187,020✔
778
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
128,193,773✔
779
    if (TSDB_CODE_SUCCESS != ctx.code) {
128,223,247✔
780
      code = ctx.code;
×
781
      goto end;
×
782
    }
783
  }
784
  
785
  if (pColList != NULL) *pColList = ctx.cInfoList;
162,418,462✔
786
  ctx.cInfoList = NULL;
162,376,865✔
787

788
end:
162,421,039✔
789
  taosHashCleanup(ctx.colHash);
162,422,690✔
790
  taosArrayDestroy(ctx.cInfoList);
162,237,194✔
791
  return code;
162,304,521✔
792
}
793

794
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
9,461,766✔
795
  int32_t code = TSDB_CODE_SUCCESS;
9,461,766✔
796
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
9,461,766✔
797
  if (v == NULL) {
9,469,772✔
798
    code = terrno;
×
799
    goto end;
×
800
  }
801
  if (colDataIsNull_s(pValue, i)) {
18,935,142✔
802
    v->isNull = true;
×
803
  } else {
804
    v->isNull = false;
9,465,370✔
805
    char* data = colDataGetData(pValue, i);
9,465,410✔
806
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
9,458,841✔
807
      if (tTagIsJson(data)) {
×
808
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
809
        goto end;
×
810
      }
811
      if (tTagIsJsonNull(data)) {
×
812
        v->isNull = true;
×
813
        goto end;
×
814
      }
815
      int32_t len = getJsonValueLen(data);
×
816
      v->data.type = pValue->info.type;
×
817
      v->data.nData = len;
×
818
      v->data.pData = taosMemoryCalloc(1, len + 1);
×
819
      if (v->data.pData == NULL) {
×
820
        code = terrno;
×
821
        goto end;
×
822
      }
823
      memcpy(v->data.pData, data, len);
×
824
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
×
825
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
9,457,415✔
826
      if (varDataTLen(data) > pValue->info.bytes) {
6,896,484✔
827
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
828
        goto end;
×
829
      }
830
      v->data.type = pValue->info.type;
6,897,995✔
831
      v->data.nData = varDataLen(data);
6,903,065✔
832
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
6,900,851✔
833
      if (v->data.pData == NULL) {
6,905,261✔
834
        code = terrno;
×
835
        goto end;
×
836
      }
837
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
6,901,642✔
838
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
6,901,642✔
839
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
2,559,434✔
840
      v->data.type = pValue->info.type;
×
841
      v->data.nData = pValue->info.bytes;
×
842
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
×
843
      if (v->data.pData == NULL) {
×
844
        code = terrno;
×
845
        goto end;
×
846
      }
847
      memcpy(&v->data.pData, data, pValue->info.bytes);
×
848
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
×
849
    } else {  // reader todo decimal
850
      v->data.type = pValue->info.type;
2,555,821✔
851
      memcpy(&v->data.val, data, pValue->info.bytes);
2,560,893✔
852
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
2,560,202✔
853
    }
854
  }
855
end:
1,955,209✔
856
  if (code != TSDB_CODE_SUCCESS) {
9,466,914✔
857
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
858
    v->isNull = true;
×
859
  }
860
  return code;
9,466,914✔
861
}
862

863
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
1,610,688✔
864
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
865
  int32_t      code = TSDB_CODE_SUCCESS;
1,610,688✔
866
  int32_t      lino = 0;
1,610,688✔
867
  SArray*      pBlockList = NULL;
1,610,688✔
868
  SSDataBlock* pResBlock = NULL;
1,610,688✔
869
  SArray*      groupData = NULL;
1,612,882✔
870
  SArray*      pUidTagList = NULL;
1,612,882✔
871
  SArray*      gInfo = NULL;
1,612,882✔
872
  int32_t      tbNameIndex = 0;
1,612,882✔
873

874
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
1,612,882✔
875
  if (rows == 0) {
1,612,882✔
876
    return;
×
877
  }
878

879
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
1,612,882✔
880
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
1,612,882✔
881

882
  for (int32_t i = 0; i < rows; ++i) {
6,445,488✔
883
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
4,834,807✔
884
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
4,834,807✔
885
    STUidTagInfo info = {.uid = pkeyInfo->uid};
4,834,807✔
886
    void*        tmp = taosArrayPush(pUidTagList, &info);
4,832,606✔
887
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,832,606✔
888
  }
889
 
890
  if (taosArrayGetSize(pUidTagList) > 0) {
1,610,681✔
891
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
1,612,882✔
892
  } else {
893
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
894
  }
895
  if (code != TSDB_CODE_SUCCESS) {
1,612,882✔
896
    goto end;
×
897
  }
898

899
  SArray* pColList = NULL;
1,612,882✔
900
  code = qGetColumnsFromNodeList(group, true, &pColList);
1,612,882✔
901
  if (code != TSDB_CODE_SUCCESS) {
1,612,882✔
902
    goto end;
×
903
  }
904

905
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
4,210,565✔
906
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
2,597,683✔
907
    if (tmp != NULL && tmp->colId == -1) {
2,597,683✔
908
      tbNameIndex = i;
1,611,453✔
909
    }
910
  }
911
  
912
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1,612,882✔
913
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
1,612,882✔
914
  taosArrayDestroy(pColList);
1,611,444✔
915
  if (pResBlock == NULL) {
1,611,444✔
916
    code = terrno;
×
917
    goto end;
×
918
  }
919

920
  pBlockList = taosArrayInit(2, POINTER_BYTES);
1,611,444✔
921
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1,611,444✔
922

923
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
1,612,882✔
924
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1,612,882✔
925

926
  groupData = taosArrayInit(2, POINTER_BYTES);
1,612,882✔
927
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
1,611,444✔
928

929
  SNode* pNode = NULL;
1,611,444✔
930
  FOREACH(pNode, group) {
4,209,127✔
931
    SScalarParam output = {0};
2,597,683✔
932

933
    switch (nodeType(pNode)) {
2,597,683✔
934
      case QUERY_NODE_VALUE:
×
935
        break;
×
936
      case QUERY_NODE_COLUMN:
2,596,245✔
937
      case QUERY_NODE_OPERATOR:
938
      case QUERY_NODE_FUNCTION: {
939
        SExprNode* expNode = (SExprNode*)pNode;
2,596,245✔
940
        code = createResultData(&expNode->resType, rows, &output);
2,596,245✔
941
        if (code != TSDB_CODE_SUCCESS) {
2,596,245✔
942
          goto end;
×
943
        }
944
        break;
2,596,245✔
945
      }
946

947
      default:
×
948
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
949
        goto end;
×
950
    }
951

952
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
2,596,245✔
953
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
2,596,245✔
954
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
2,596,245✔
955
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
2,596,245✔
956
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
2,596,245✔
957
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
958
      continue;
×
959
    } else {
960
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
×
961
    }
962

963
    if (code != TSDB_CODE_SUCCESS) {
2,596,245✔
964
      releaseColInfoData(output.columnData);
×
965
      goto end;
×
966
    }
967

968
    void* tmp = taosArrayPush(groupData, &output.columnData);
2,597,683✔
969
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,597,683✔
970
  }
971

972
  for (int i = 0; i < rows; i++) {
6,446,251✔
973
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
4,833,369✔
974
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
4,833,369✔
975

976
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
4,833,369✔
977
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
4,833,369✔
978

979
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
12,229,931✔
980
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
7,393,625✔
981
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
7,392,187✔
982
        if (ret != TSDB_CODE_SUCCESS) {
7,398,758✔
983
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
984
          goto end;
×
985
        }
986
        if (j == tbNameIndex) {
7,398,758✔
987
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
4,833,369✔
988
          if (v != NULL){
4,833,374✔
989
            v->isTbname = true;
4,833,374✔
990
            v->uid = info->uid;
4,833,374✔
991
          }
992
        }
993
    }
994

995
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
4,829,735✔
996
    if (ret != TSDB_CODE_SUCCESS) {
4,834,807✔
997
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
998
      goto end;
×
999
    }
1000
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
4,834,807✔
1001
    gInfo = NULL;
4,833,369✔
1002
  }
1003

1004
end:
1,611,444✔
1005
  blockDataDestroy(pResBlock);
1,611,444✔
1006
  taosArrayDestroy(pBlockList);
1,612,882✔
1007
  taosArrayDestroyEx(pUidTagList, freeItem);
1,611,444✔
1008
  taosArrayDestroyP(groupData, releaseColInfoData);
1,611,444✔
1009
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
1,611,444✔
1010

1011
  if (code != TSDB_CODE_SUCCESS) {
1,611,444✔
1012
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1013
  }
1014
}
1015

1016
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
32,613,707✔
1017
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1018
  int32_t      code = TSDB_CODE_SUCCESS;
32,613,707✔
1019
  int32_t      lino = 0;
32,613,707✔
1020
  SArray*      pBlockList = NULL;
32,613,707✔
1021
  SSDataBlock* pResBlock = NULL;
32,613,707✔
1022
  void*        keyBuf = NULL;
32,628,593✔
1023
  SArray*      groupData = NULL;
32,628,593✔
1024
  SArray*      pUidTagList = NULL;
32,628,593✔
1025
  SArray*      tableList = NULL;
32,628,593✔
1026
  SArray*      gInfo = NULL;
32,628,593✔
1027

1028
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
32,628,593✔
1029
  if (rows == 0) {
32,628,593✔
1030
    return TSDB_CODE_SUCCESS;
×
1031
  } 
1032

1033
  T_MD5_CTX context = {0};
32,628,593✔
1034
  if (tsTagFilterCache && groupIdMap == NULL) {
32,628,593✔
1035
    SNodeListNode* listNode = NULL;
×
1036
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1037
    if (TSDB_CODE_SUCCESS != code) {
×
1038
      goto end;
×
1039
    }
1040
    listNode->pNodeList = group;
×
1041
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
1042
    QUERY_CHECK_CODE(code, lino, end);
×
1043

1044
    nodesFree(listNode);
×
1045

1046
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1047
                                             tListLen(context.digest), &tableList);
1048
    QUERY_CHECK_CODE(code, lino, end);
×
1049

1050
    if (tableList) {
×
1051
      taosArrayDestroy(pTableListInfo->pTableList);
×
1052
      pTableListInfo->pTableList = tableList;
×
1053
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1054
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
1055
      goto end;
×
1056
    }
1057
  }
1058

1059
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
32,628,593✔
1060
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
32,628,575✔
1061

1062
  for (int32_t i = 0; i < rows; ++i) {
214,687,211✔
1063
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
182,052,127✔
1064
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
182,061,237✔
1065
    STUidTagInfo info = {.uid = pkeyInfo->uid};
182,061,237✔
1066
    void*        tmp = taosArrayPush(pUidTagList, &info);
182,059,588✔
1067
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
182,059,588✔
1068
  }
1069

1070
  if (taosArrayGetSize(pUidTagList) > 0) {
32,635,084✔
1071
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
32,628,593✔
1072
  } else {
1073
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1074
  }
1075
  if (code != TSDB_CODE_SUCCESS) {
32,628,557✔
1076
    goto end;
×
1077
  }
1078

1079
  SArray* pColList = NULL;
32,628,557✔
1080
  code = qGetColumnsFromNodeList(group, true, &pColList); 
32,628,557✔
1081
  if (code != TSDB_CODE_SUCCESS) {
32,619,062✔
1082
    goto end;
×
1083
  }
1084

1085
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
32,619,062✔
1086
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
32,621,597✔
1087
  taosArrayDestroy(pColList);
32,625,603✔
1088
  if (pResBlock == NULL) {
32,623,737✔
1089
    code = terrno;
×
1090
    goto end;
×
1091
  }
1092

1093
  //  int64_t st1 = taosGetTimestampUs();
1094
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1095

1096
  pBlockList = taosArrayInit(2, POINTER_BYTES);
32,623,737✔
1097
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
32,624,318✔
1098

1099
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
32,628,593✔
1100
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
32,628,593✔
1101

1102
  groupData = taosArrayInit(2, POINTER_BYTES);
32,628,593✔
1103
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
32,618,324✔
1104

1105
  SNode* pNode = NULL;
32,618,930✔
1106
  FOREACH(pNode, group) {
66,950,510✔
1107
    SScalarParam output = {0};
34,327,797✔
1108

1109
    switch (nodeType(pNode)) {
34,323,081✔
1110
      case QUERY_NODE_VALUE:
×
1111
        break;
×
1112
      case QUERY_NODE_COLUMN:
34,323,594✔
1113
      case QUERY_NODE_OPERATOR:
1114
      case QUERY_NODE_FUNCTION: {
1115
        SExprNode* expNode = (SExprNode*)pNode;
34,323,594✔
1116
        code = createResultData(&expNode->resType, rows, &output);
34,323,594✔
1117
        if (code != TSDB_CODE_SUCCESS) {
34,332,173✔
1118
          goto end;
×
1119
        }
1120
        break;
34,332,173✔
1121
      }
1122

1123
      default:
×
1124
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1125
        goto end;
×
1126
    }
1127

1128
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
34,332,173✔
1129
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
34,140,762✔
1130
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
34,140,762✔
1131
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
34,130,502✔
1132
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
34,130,502✔
1133
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
192,411✔
1134
      continue;
×
1135
    } else {
1136
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
183,979✔
1137
    }
1138

1139
    if (code != TSDB_CODE_SUCCESS) {
34,316,065✔
1140
      releaseColInfoData(output.columnData);
×
1141
      goto end;
×
1142
    }
1143

1144
    void* tmp = taosArrayPush(groupData, &output.columnData);
34,327,134✔
1145
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
34,327,134✔
1146
  }
1147

1148
  int32_t keyLen = 0;
32,621,210✔
1149
  SNode*  node;
1150
  FOREACH(node, group) {
66,928,862✔
1151
    SExprNode* pExpr = (SExprNode*)node;
34,304,618✔
1152
    keyLen += pExpr->resType.bytes;
34,304,618✔
1153
  }
1154

1155
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
32,603,225✔
1156
  keyLen += nullFlagSize;
32,605,139✔
1157

1158
  keyBuf = taosMemoryCalloc(1, keyLen);
32,605,139✔
1159
  if (keyBuf == NULL) {
32,597,839✔
1160
    code = terrno;
×
1161
    goto end;
×
1162
  }
1163

1164
  if (initRemainGroups) {
32,597,839✔
1165
    pTableListInfo->remainGroups =
15,014,951✔
1166
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
15,001,652✔
1167
    if (pTableListInfo->remainGroups == NULL) {
15,014,951✔
1168
      code = terrno;
×
1169
      goto end;
×
1170
    }
1171
  }
1172

1173
  for (int i = 0; i < rows; i++) {
214,371,132✔
1174
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
181,750,074✔
1175
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
181,792,073✔
1176

1177
    if (groupIdMap != NULL){
181,792,073✔
1178
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
1,131,832✔
1179
    }
1180
    
1181
    char* isNull = (char*)keyBuf;
181,788,299✔
1182
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
181,788,299✔
1183
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
376,495,288✔
1184
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
194,728,111✔
1185

1186
      if (groupIdMap != NULL && gInfo != NULL) {
194,738,393✔
1187
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
2,067,373✔
1188
        if (ret != TSDB_CODE_SUCCESS) {
2,069,594✔
1189
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1190
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1191
          gInfo = NULL;
×
1192
        }
1193
      }
1194
      
1195
      if (colDataIsNull_s(pValue, i)) {
389,481,989✔
1196
        isNull[j] = 1;
628,683✔
1197
      } else {
1198
        isNull[j] = 0;
194,112,692✔
1199
        char* data = colDataGetData(pValue, i);
194,100,386✔
1200
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
194,106,626✔
1201
          if (tTagIsJson(data)) {
352,110✔
1202
            code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
16,005✔
1203
            goto end;
16,005✔
1204
          }
1205
          if (tTagIsJsonNull(data)) {
336,105✔
1206
            isNull[j] = 1;
×
1207
            continue;
×
1208
          }
1209
          int32_t len = getJsonValueLen(data);
336,105✔
1210
          memcpy(pStart, data, len);
336,105✔
1211
          pStart += len;
336,105✔
1212
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
193,741,959✔
1213
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
165,786,360✔
1214
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
3,472✔
1215
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1216
              goto end;
×
1217
            }
1218
            memcpy(pStart, data, blobDataTLen(data));
×
1219
            pStart += blobDataTLen(data);
×
1220
          } else {
1221
            if (varDataTLen(data) > pValue->info.bytes) {
165,788,924✔
1222
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1223
              goto end;
×
1224
            }
1225
            memcpy(pStart, data, varDataTLen(data));
165,803,367✔
1226
            pStart += varDataTLen(data);
165,802,451✔
1227
          }
1228
        } else {
1229
          memcpy(pStart, data, pValue->info.bytes);
27,949,711✔
1230
          pStart += pValue->info.bytes;
27,961,470✔
1231
        }
1232
      }
1233
    }
1234

1235
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
181,741,322✔
1236
    info->groupId = calcGroupId(keyBuf, len);
181,741,322✔
1237
    if (groupIdMap != NULL && gInfo != NULL) {
181,777,808✔
1238
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
1,129,642✔
1239
      if (ret != TSDB_CODE_SUCCESS) {
1,131,832✔
1240
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1241
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1242
      }
1243
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
1,131,832✔
1244
      gInfo = NULL;
1,131,832✔
1245
    }
1246
    if (initRemainGroups) {
181,779,998✔
1247
      // groupId ~ table uid
1248
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
85,066,375✔
1249
                         sizeof(info->uid));
1250
      if (code == TSDB_CODE_DUP_KEY) {
85,044,111✔
1251
        code = TSDB_CODE_SUCCESS;
4,921,475✔
1252
      }
1253
      QUERY_CHECK_CODE(code, lino, end);
85,044,111✔
1254
    }
1255
  }
1256

1257
  if (tsTagFilterCache && groupIdMap == NULL) {
32,621,058✔
1258
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
1259
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1260

1261
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1262
                                              tListLen(context.digest), tableList,
1263
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
1264
    QUERY_CHECK_CODE(code, lino, end);
×
1265
  }
1266

1267
  //  int64_t st2 = taosGetTimestampUs();
1268
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1269

1270
end:
32,630,944✔
1271
  taosMemoryFreeClear(keyBuf);
32,621,172✔
1272
  blockDataDestroy(pResBlock);
32,627,881✔
1273
  taosArrayDestroy(pBlockList);
32,619,373✔
1274
  taosArrayDestroyEx(pUidTagList, freeItem);
32,614,443✔
1275
  taosArrayDestroyP(groupData, releaseColInfoData);
32,614,953✔
1276
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
32,609,115✔
1277

1278
  if (code != TSDB_CODE_SUCCESS) {
32,608,422✔
1279
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
16,005✔
1280
  }
1281
  return code;
32,608,465✔
1282
}
1283

1284
static int32_t nameComparFn(const void* p1, const void* p2) {
7,169,428✔
1285
  const char* pName1 = *(const char**)p1;
7,169,428✔
1286
  const char* pName2 = *(const char**)p2;
7,170,828✔
1287

1288
  int32_t ret = strcmp(pName1, pName2);
7,179,625✔
1289
  if (ret == 0) {
7,179,625✔
1290
    return 0;
53,418✔
1291
  } else {
1292
    return (ret > 0) ? 1 : -1;
7,126,207✔
1293
  }
1294
}
1295

1296
static SArray* getTableNameList(const SNodeListNode* pList) {
4,654,913✔
1297
  int32_t    code = TSDB_CODE_SUCCESS;
4,654,913✔
1298
  int32_t    lino = 0;
4,654,913✔
1299
  int32_t    len = LIST_LENGTH(pList->pNodeList);
4,654,913✔
1300
  SListCell* cell = pList->pNodeList->pHead;
4,654,913✔
1301

1302
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
4,656,382✔
1303
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
4,657,851✔
1304

1305
  for (int i = 0; i < pList->pNodeList->length; i++) {
12,319,338✔
1306
    SValueNode* valueNode = (SValueNode*)cell->pNode;
7,657,715✔
1307
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
7,653,943✔
1308
      terrno = TSDB_CODE_INVALID_PARA;
×
1309
      taosArrayDestroy(pTbList);
×
1310
      return NULL;
×
1311
    }
1312

1313
    char* name = varDataVal(valueNode->datum.p);
7,658,549✔
1314
    void* tmp = taosArrayPush(pTbList, &name);
7,658,020✔
1315
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
7,658,020✔
1316
    cell = cell->pNext;
7,658,020✔
1317
  }
1318

1319
  size_t numOfTables = taosArrayGetSize(pTbList);
4,657,851✔
1320

1321
  // order the name
1322
  taosArraySort(pTbList, nameComparFn);
4,657,851✔
1323

1324
  // remove the duplicates
1325
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
4,657,151✔
1326
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
4,653,550✔
1327
  void* tmpTbl = taosArrayGet(pTbList, 0);
4,653,550✔
1328
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
4,652,081✔
1329
  void* tmp = taosArrayPush(pNewList, tmpTbl);
4,652,744✔
1330
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
4,652,744✔
1331

1332
  for (int32_t i = 1; i < numOfTables; ++i) {
7,657,424✔
1333
    char** name = taosArrayGetLast(pNewList);
3,000,063✔
1334
    char** nameInOldList = taosArrayGet(pTbList, i);
3,003,770✔
1335
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
3,003,770✔
1336
    if (strcmp(*name, *nameInOldList) == 0) {
3,003,770✔
1337
      continue;
30,860✔
1338
    }
1339

1340
    tmp = taosArrayPush(pNewList, nameInOldList);
2,973,820✔
1341
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2,973,820✔
1342
  }
1343

1344
_end:
4,657,361✔
1345
  taosArrayDestroy(pTbList);
4,657,361✔
1346
  if (code != TSDB_CODE_SUCCESS) {
4,648,653✔
1347
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1348
    return NULL;
×
1349
  }
1350
  return pNewList;
4,653,684✔
1351
}
1352

1353
static int tableUidCompare(const void* a, const void* b) {
×
1354
  uint64_t u1 = *(uint64_t*)a;
×
1355
  uint64_t u2 = *(uint64_t*)b;
×
1356

1357
  if (u1 == u2) {
×
1358
    return 0;
×
1359
  }
1360

1361
  return u1 < u2 ? -1 : 1;
×
1362
}
1363

1364
static int32_t filterTableInfoCompare(const void* a, const void* b) {
55,531,994✔
1365
  STUidTagInfo* p1 = (STUidTagInfo*)a;
55,531,994✔
1366
  STUidTagInfo* p2 = (STUidTagInfo*)b;
55,531,994✔
1367

1368
  if (p1->uid == p2->uid) {
55,531,994✔
1369
    return 0;
×
1370
  }
1371

1372
  return p1->uid < p2->uid ? -1 : 1;
55,535,639✔
1373
}
1374

1375
static FilterCondType checkTagCond(SNode* cond) {
151,749,616✔
1376
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
151,749,616✔
1377
    return FILTER_NO_LOGIC;
115,962,216✔
1378
  }
1379
  if (nodeType(cond) != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
35,821,181✔
1380
    return FILTER_AND;
4,991,408✔
1381
  }
1382
  return FILTER_OTHER;
30,817,491✔
1383
}
1384

1385
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
151,759,493✔
1386
  int32_t ret = -1;
151,759,493✔
1387
  int32_t ntype = nodeType(cond);
151,759,493✔
1388

1389
  if (ntype == QUERY_NODE_OPERATOR) {
151,784,769✔
1390
    ret = optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
115,973,780✔
1391
  }
1392

1393
  if (ntype != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
151,681,264✔
1394
    return ret;
120,889,754✔
1395
  }
1396

1397
  bool                 hasTbnameCond = false;
30,780,398✔
1398
  SLogicConditionNode* pNode = (SLogicConditionNode*)cond;
30,780,398✔
1399
  SNodeList*           pList = (SNodeList*)pNode->pParameterList;
30,780,398✔
1400

1401
  int32_t len = LIST_LENGTH(pList);
30,816,104✔
1402
  if (len <= 0) {
30,802,129✔
1403
    return ret;
×
1404
  }
1405

1406
  SListCell* cell = pList->pHead;
30,802,129✔
1407
  for (int i = 0; i < len; i++) {
95,052,301✔
1408
    if (cell == NULL) break;
64,312,924✔
1409
    if (optimizeTbnameInCondImpl(pVnode, list, cell->pNode, pAPI, suid) == 0) {
64,312,924✔
1410
      hasTbnameCond = true;
44,912✔
1411
      break;
44,912✔
1412
    }
1413
    cell = cell->pNext;
64,238,900✔
1414
  }
1415

1416
  taosArraySort(list, filterTableInfoCompare);
30,784,289✔
1417
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);
30,790,216✔
1418

1419
  if (hasTbnameCond) {
30,809,673✔
1420
    ret = pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
44,912✔
1421
  }
1422

1423
  return ret;
30,810,935✔
1424
}
1425

1426
// only return uid that does not contained in pExistedUidList
1427
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
180,284,376✔
1428
                                        uint64_t suid) {
1429
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
180,284,376✔
1430
    return -1;
61,506✔
1431
  }
1432

1433
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
180,235,254✔
1434
  if (pNode->opType != OP_TYPE_IN) {
180,235,254✔
1435
    return -1;
170,335,205✔
1436
  }
1437

1438
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
9,825,815✔
1439
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
4,654,913✔
1440
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
5,169,485✔
1441
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
4,650,295✔
1442
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
4,657,851✔
1443

1444
    int32_t len = LIST_LENGTH(pList->pNodeList);
4,654,913✔
1445
    if (len <= 0) {
4,654,913✔
1446
      return -1;
×
1447
    }
1448

1449
    SArray*   pTbList = getTableNameList(pList);
4,654,913✔
1450
    int32_t   numOfTables = taosArrayGetSize(pTbList);
4,652,081✔
1451
    SHashObj* uHash = NULL;
4,652,081✔
1452

1453
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
4,652,081✔
1454
    if (numOfExisted > 0) {
4,654,384✔
1455
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
4,752✔
1456
      if (!uHash) {
4,752✔
1457
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1458
        return terrno;
×
1459
      }
1460

1461
      for (int i = 0; i < numOfExisted; i++) {
4,718,736✔
1462
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
4,713,984✔
1463
        if (!pTInfo) {
4,713,984✔
1464
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1465
          return terrno;
×
1466
        }
1467
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
4,713,984✔
1468
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
4,713,984✔
1469
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1470
          return tempRes;
×
1471
        }
1472
      }
1473
    }
1474

1475
    for (int i = 0; i < numOfTables; i++) {
12,220,974✔
1476
      char* name = taosArrayGetP(pTbList, i);
7,597,991✔
1477

1478
      uint64_t uid = 0, csuid = 0;
7,595,716✔
1479
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
7,599,423✔
1480
        ETableType tbType = TSDB_TABLE_MAX;
4,658,119✔
1481
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
4,658,119✔
1482
            tbType == TSDB_CHILD_TABLE) {
4,653,206✔
1483
          if (suid != csuid) {
4,624,541✔
1484
            continue;
21,184✔
1485
          }
1486
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
4,603,357✔
1487
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
4,600,981✔
1488
            void*        tmp = taosArrayPush(pExistedUidList, &s);
4,606,810✔
1489
            if (!tmp) {
4,606,810✔
1490
              return terrno;
×
1491
            }
1492
          }
1493
        } else {
1494
          taosArrayDestroy(pTbList);
28,665✔
1495
          taosHashCleanup(uHash);
28,398✔
1496
          return -1;
28,398✔
1497
        }
1498
      } else {
1499
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1500
        terrno = 0;
2,943,302✔
1501
      }
1502
    }
1503

1504
    taosHashCleanup(uHash);
4,622,983✔
1505
    taosArrayDestroy(pTbList);
4,629,453✔
1506
    return 0;
4,629,453✔
1507
  }
1508

1509
  return -1;
5,176,077✔
1510
}
1511

1512
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
177,263,516✔
1513
                                        SStorageAPI* pStorageAPI) {
1514
  int32_t      code = TSDB_CODE_SUCCESS;
177,263,516✔
1515
  int32_t      lino = 0;
177,263,516✔
1516
  SSDataBlock* pResBlock = NULL;
177,263,516✔
1517
  code = createDataBlock(&pResBlock);
177,310,071✔
1518
  QUERY_CHECK_CODE(code, lino, _end);
177,264,540✔
1519

1520
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
366,362,809✔
1521
    SColumnInfoData colInfo = {0};
189,123,624✔
1522
    void*           tmp = taosArrayGet(pColList, i);
189,020,732✔
1523
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
189,027,345✔
1524
    colInfo.info = *(SColumnInfo*)tmp;
189,027,345✔
1525
    code = blockDataAppendColInfo(pResBlock, &colInfo);
188,939,672✔
1526
    QUERY_CHECK_CODE(code, lino, _end);
189,056,064✔
1527
  }
1528

1529
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
177,251,035✔
1530
  if (code != TSDB_CODE_SUCCESS) {
177,230,579✔
1531
    terrno = code;
×
1532
    blockDataDestroy(pResBlock);
×
1533
    return NULL;
×
1534
  }
1535

1536
  pResBlock->info.rows = numOfTables;
177,230,579✔
1537

1538
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
177,236,815✔
1539

1540
  for (int32_t i = 0; i < numOfTables; i++) {
2,147,483,647✔
1541
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
2,049,733,042✔
1542
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
2,049,756,092✔
1543

1544
    for (int32_t j = 0; j < numOfCols; j++) {
2,147,483,647✔
1545
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
2,111,882,365✔
1546
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,111,835,255✔
1547

1548
      if (pColInfo->info.colId == -1) {  // tbname
2,111,835,255✔
1549
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
26,739,054✔
1550
        if (p1->name != NULL) {
26,748,736✔
1551
          STR_TO_VARSTR(str, p1->name);
4,601,174✔
1552
        } else {  // name is not retrieved during filter
1553
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
22,158,234✔
1554
          QUERY_CHECK_CODE(code, lino, _end);
22,160,926✔
1555
        }
1556

1557
        code = colDataSetVal(pColInfo, i, str, false);
26,761,657✔
1558
        QUERY_CHECK_CODE(code, lino, _end);
26,769,180✔
1559
#if TAG_FILTER_DEBUG
1560
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1561
#endif
1562
      } else {
1563
        STagVal tagVal = {0};
2,084,943,921✔
1564
        tagVal.cid = pColInfo->info.colId;
2,085,125,155✔
1565
        if (p1->pTagVal == NULL) {
2,085,147,718✔
1566
          colDataSetNULL(pColInfo, i);
×
1567
        } else {
1568
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
2,085,220,033✔
1569

1570
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
2,085,762,573✔
1571
            colDataSetNULL(pColInfo, i);
39,496,447✔
1572
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
2,046,263,163✔
1573
            code = colDataSetVal(pColInfo, i, p, false);
6,492,916✔
1574
            QUERY_CHECK_CODE(code, lino, _end);
6,492,916✔
1575
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
2,147,483,647✔
1576
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
1,320,544,075✔
1577
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1578
            }
1579
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
1,320,276,615✔
1580
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,320,435,915✔
1581
            varDataSetLen(tmp, tagVal.nData);
1,320,435,915✔
1582
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
1,320,437,728✔
1583
            code = colDataSetVal(pColInfo, i, tmp, false);
1,320,427,098✔
1584
#if TAG_FILTER_DEBUG
1585
            qDebug("tagfilter varch:%s", tmp + 2);
1586
#endif
1587
            taosMemoryFree(tmp);
1,320,587,378✔
1588
            QUERY_CHECK_CODE(code, lino, _end);
1,320,302,009✔
1589
          } else {
1590
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
719,370,119✔
1591
            QUERY_CHECK_CODE(code, lino, _end);
719,690,930✔
1592
#if TAG_FILTER_DEBUG
1593
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1594
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1595
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1596
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1597
            }
1598
#endif
1599
          }
1600
        }
1601
      }
1602
    }
1603
  }
1604

1605
_end:
177,378,198✔
1606
  if (code != TSDB_CODE_SUCCESS) {
177,401,376✔
1607
    blockDataDestroy(pResBlock);
42,450✔
1608
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1609
    terrno = code;
×
1610
    return NULL;
×
1611
  }
1612
  return pResBlock;
177,359,000✔
1613
}
1614

1615
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
128,045,797✔
1616
                                 bool* pResultList, bool addUid) {
1617
  taosArrayClear(pUidList);
128,045,797✔
1618

1619
  STableKeyInfo info = {.uid = 0, .groupId = 0};
127,990,872✔
1620
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);
128,009,495✔
1621
  for (int32_t i = 0; i < numOfTables; ++i) {
1,969,051,572✔
1622
    if (pResultList[i]) {
1,840,805,951✔
1623
      STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
917,301,789✔
1624
      if (!tmpTag) {
917,357,170✔
1625
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1626
        return terrno;
×
1627
      }
1628
      uint64_t uid = tmpTag->uid;
917,357,170✔
1629
      qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
917,364,922✔
1630

1631
      info.uid = uid;
917,606,709✔
1632
      void* p = taosArrayPush(pListInfo->pTableList, &info);
917,606,709✔
1633
      if (p == NULL) {
917,517,323✔
1634
        return terrno;
×
1635
      }
1636

1637
      if (addUid) {
917,517,323✔
1638
        void* tmp = taosArrayPush(pUidList, &uid);
18,450✔
1639
        if (tmp == NULL) {
18,450✔
1640
          return terrno;
×
1641
        }
1642
      }
1643
    }
1644
  }
1645

1646
  return TSDB_CODE_SUCCESS;
128,245,621✔
1647
}
1648

1649
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
151,771,934✔
1650
  int32_t code = TSDB_CODE_SUCCESS;
151,771,934✔
1651
  int32_t numOfExisted = taosArrayGetSize(pUidList);
151,771,934✔
1652
  if (numOfExisted == 0) {
151,747,282✔
1653
    return code;
123,575,017✔
1654
  }
1655

1656
  for (int32_t i = 0; i < numOfExisted; ++i) {
351,968,126✔
1657
    uint64_t* uid = taosArrayGet(pUidList, i);
323,790,283✔
1658
    if (!uid) {
323,801,301✔
1659
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1660
      return terrno;
×
1661
    }
1662
    STUidTagInfo info = {.uid = *uid};
323,801,301✔
1663
    void*        tmp = taosArrayPush(pUidTagList, &info);
323,795,861✔
1664
    if (!tmp) {
323,795,861✔
1665
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1666
      return code;
×
1667
    }
1668
  }
1669
  return code;
28,177,843✔
1670
}
1671

1672
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
1,707,320,281✔
1673
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1674
  *listAdded = false;
1,707,320,281✔
1675
  if (pTagCond == NULL) {
1,707,543,766✔
1676
    return TSDB_CODE_SUCCESS;
1,555,367,575✔
1677
  }
1678

1679
  terrno = TSDB_CODE_SUCCESS;
152,176,191✔
1680

1681
  int32_t      lino = 0;
151,747,736✔
1682
  int32_t      code = TSDB_CODE_SUCCESS;
151,747,736✔
1683
  SArray*      pBlockList = NULL;
151,747,736✔
1684
  SSDataBlock* pResBlock = NULL;
151,747,736✔
1685
  SScalarParam output = {0};
151,738,727✔
1686
  SArray*      pUidTagList = NULL;
151,718,006✔
1687

1688
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
151,718,006✔
1689

1690
  //  int64_t stt = taosGetTimestampUs();
1691
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
151,734,049✔
1692
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
151,690,730✔
1693

1694
  code = copyExistedUids(pUidTagList, pUidList);
151,690,730✔
1695
  QUERY_CHECK_CODE(code, lino, end);
151,752,277✔
1696

1697
  FilterCondType condType = checkTagCond(pTagCond);
151,752,277✔
1698

1699
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
151,732,828✔
1700
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
151,670,633✔
1701
    taosArrayClear(pUidList);
4,629,453✔
1702

1703
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
4,629,453✔
1704
    code = taosArrayEnsureCap(pUidList, numOfRows);
4,629,453✔
1705
    QUERY_CHECK_CODE(code, lino, end);
4,629,453✔
1706

1707
    for (int32_t i = 0; i < numOfRows; ++i) {
13,981,135✔
1708
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
9,351,682✔
1709
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
9,351,682✔
1710
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
9,351,682✔
1711
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
9,351,682✔
1712
    }
1713
    terrno = 0;
4,629,453✔
1714
  } else {
1715
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) ||
265,067,479✔
1716
          taosArrayGetSize(pUidTagList) > 0) {
118,008,280✔
1717
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
33,915,160✔
1718
    } else {
1719
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
113,144,039✔
1720
    }
1721
    if (code != TSDB_CODE_SUCCESS) {
147,138,581✔
1722
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1723
      terrno = code;
×
1724
      QUERY_CHECK_CODE(code, lino, end);
×
1725
    }
1726
  }
1727

1728
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
151,768,034✔
1729
  if (numOfTables == 0) {
151,786,527✔
1730
    goto end;
23,539,241✔
1731
  }
1732

1733
  SArray* pColList = NULL;
128,247,286✔
1734
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
128,254,875✔
1735
  if (code != TSDB_CODE_SUCCESS) {
128,105,699✔
1736
    goto end;
×
1737
  }
1738
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
128,105,699✔
1739
  taosArrayDestroy(pColList);
128,210,074✔
1740
  if (pResBlock == NULL) {
128,181,937✔
1741
    code = terrno;
×
1742
    QUERY_CHECK_CODE(code, lino, end);
×
1743
  }
1744

1745
  //  int64_t st1 = taosGetTimestampUs();
1746
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1747
  pBlockList = taosArrayInit(2, POINTER_BYTES);
128,181,937✔
1748
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
128,232,418✔
1749

1750
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
128,238,357✔
1751
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
128,238,357✔
1752

1753
  code = createResultData(&type, numOfTables, &output);
128,238,357✔
1754
  if (code != TSDB_CODE_SUCCESS) {
128,157,781✔
1755
    terrno = code;
×
1756
    QUERY_CHECK_CODE(code, lino, end);
×
1757
  }
1758

1759
  code = scalarCalculate(pTagCond, pBlockList, &output, pStreamInfo, NULL);
128,157,781✔
1760
  if (code != TSDB_CODE_SUCCESS) {
128,043,886✔
1761
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
10,670✔
1762
    terrno = code;
10,670✔
1763
    QUERY_CHECK_CODE(code, lino, end);
10,670✔
1764
  }
1765

1766
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
128,033,216✔
1767
  if (code != TSDB_CODE_SUCCESS) {
128,250,986✔
1768
    terrno = code;
×
1769
    QUERY_CHECK_CODE(code, lino, end);
×
1770
  }
1771
  *listAdded = true;
128,250,986✔
1772

1773
end:
151,788,856✔
1774
  if (code != TSDB_CODE_SUCCESS) {
151,756,147✔
1775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
10,670✔
1776
  }
1777
  blockDataDestroy(pResBlock);
151,756,147✔
1778
  taosArrayDestroy(pBlockList);
151,665,514✔
1779
  taosArrayDestroyEx(pUidTagList, freeItem);
151,652,063✔
1780

1781
  colDataDestroy(output.columnData);
151,753,096✔
1782
  taosMemoryFreeClear(output.columnData);
151,736,004✔
1783
  return code;
151,726,839✔
1784
}
1785

1786
typedef struct {
1787
  int32_t code;
1788
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1789
} PlaceHolderContext;
1790

1791
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
33,210✔
1792
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
33,210✔
1793
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
33,210✔
1794
    return DEAL_RES_CONTINUE;
14,760✔
1795
  }
1796
  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
18,450✔
1797
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
18,450✔
1798
    return DEAL_RES_CONTINUE;
7,380✔
1799
  }
1800
  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
11,070✔
1801
  if (pData->code != TSDB_CODE_SUCCESS) {
11,070✔
1802
    return DEAL_RES_ERROR;
×
1803
  }
1804
  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
11,070✔
1805
  ((SValueNode*)pFirstParam)->translate = true;
11,070✔
1806
  SValueNode* res = NULL;
11,070✔
1807
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
11,070✔
1808
  if (NULL == res) {
11,070✔
1809
    return DEAL_RES_ERROR;
×
1810
  }
1811
  nodesDestroyNode(*pNode);
11,070✔
1812
  *pNode = (SNode*)res;
11,070✔
1813

1814
  return DEAL_RES_CONTINUE;
11,070✔
1815
}
1816

1817
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
1,706,952,176✔
1818
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
1819
  int32_t code = TSDB_CODE_SUCCESS;
1,706,952,176✔
1820
  int32_t lino = 0;
1,706,952,176✔
1821
  size_t  numOfTables = 0;
1,706,952,176✔
1822
  bool    listAdded = false;
1,706,952,176✔
1823

1824
  pListInfo->idInfo.suid = pScanNode->suid;
1,707,125,436✔
1825
  pListInfo->idInfo.tableType = pScanNode->tableType;
1,706,703,591✔
1826

1827
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
1,706,946,249✔
1828
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
1,706,212,571✔
1829

1830
  SIdxFltStatus status = SFLT_NOT_INDEX;
1,706,212,571✔
1831
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
1,706,289,911✔
1832
    pListInfo->idInfo.uid = pScanNode->uid;
736,847,557✔
1833
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
737,144,857✔
1834
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
735,449,178✔
1835
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
735,456,031✔
1836
    }
1837
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded, pStreamInfo);
737,339,461✔
1838
    QUERY_CHECK_CODE(code, lino, _end);
737,258,009✔
1839
  } else {
1840
    T_MD5_CTX context = {0};
969,877,325✔
1841

1842
    if (tsTagFilterCache) {
969,742,935✔
1843
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
153,801✔
1844
        SNode* tmp = NULL;
11,070✔
1845
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
11,070✔
1846
        QUERY_CHECK_CODE(code, lino, _error);
11,070✔
1847

1848
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
11,070✔
1849
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
11,070✔
1850
        if (TSDB_CODE_SUCCESS != ctx.code) {
11,070✔
1851
          nodesDestroyNode(tmp);
×
1852
          code = ctx.code;
×
1853
          goto _error;
×
1854
        }
1855
        code = genTagFilterDigest(tmp, &context);
11,070✔
1856
        nodesDestroyNode(tmp);
11,070✔
1857
      } else {
1858
        code = genTagFilterDigest(pTagCond, &context);
131,661✔
1859
      }
1860
      // try to retrieve the result from meta cache
1861
      QUERY_CHECK_CODE(code, lino, _error);      
77,490✔
1862
      bool acquired = false;
77,490✔
1863
      code = pStorageAPI->metaFn.getCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
77,490✔
1864
                                                    pUidList, &acquired);
1865
      QUERY_CHECK_CODE(code, lino, _error);
77,490✔
1866

1867
      if (acquired) {
77,490✔
1868
        digest[0] = 1;
44,280✔
1869
        memcpy(digest + 1, context.digest, tListLen(context.digest));
44,280✔
1870
        qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
44,280✔
1871
        goto _end;
44,280✔
1872
      }
1873
    }
1874

1875
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
969,633,414✔
1876
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
820,871,868✔
1877
      QUERY_CHECK_CODE(code, lino, _error);
820,982,145✔
1878
    } else {
1879
      // failed to find the result in the cache, let try to calculate the results
1880
      if (pTagIndexCond) {
148,761,546✔
1881
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
41,652,925✔
1882

1883
        SIndexMetaArg metaArg = {.metaEx = pVnode,
41,662,023✔
1884
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
41,654,098✔
1885
                                 .ivtIdx = pIndex,
1886
                                 .suid = pScanNode->uid};
41,662,622✔
1887

1888
        status = SFLT_NOT_INDEX;
41,684,240✔
1889
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
41,684,240✔
1890
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
41,638,728✔
1891
          qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
10,527,752✔
1892
        } else {
1893
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
31,110,976✔
1894
        }
1895
      }
1896
    }
1897

1898
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, tsTagFilterCache, &listAdded, pStreamInfo);
969,797,025✔
1899
    QUERY_CHECK_CODE(code, lino, _end);
969,945,691✔
1900

1901
    // let's add the filter results into meta-cache
1902
    numOfTables = taosArrayGetSize(pUidList);
969,935,021✔
1903

1904
    if (tsTagFilterCache) {
969,921,856✔
1905
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
33,210✔
1906
      char*  pPayload = taosMemoryMalloc(size);
33,210✔
1907
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
33,210✔
1908

1909
      *(int32_t*)pPayload = numOfTables;
33,210✔
1910
      if (numOfTables > 0) {
33,210✔
1911
        void* tmp = taosArrayGet(pUidList, 0);
25,830✔
1912
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,830✔
1913
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
25,830✔
1914
      }
1915

1916
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
33,210✔
1917
                                                    pPayload, size, 1);
1918
      QUERY_CHECK_CODE(code, lino, _error);
33,210✔
1919

1920
      digest[0] = 1;
33,210✔
1921
      memcpy(digest + 1, context.digest, tListLen(context.digest));
33,210✔
1922
    }
1923
  }
1924

1925
_end:
1,707,049,117✔
1926
  if (!listAdded) {
1,708,060,363✔
1927
    numOfTables = taosArrayGetSize(pUidList);
1,578,911,420✔
1928
    for (int i = 0; i < numOfTables; i++) {
2,147,483,647✔
1929
      void* tmp = taosArrayGet(pUidList, i);
2,147,483,647✔
1930
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
2,147,483,647✔
1931
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
2,147,483,647✔
1932

1933
      void* p = taosArrayPush(pListInfo->pTableList, &info);
2,147,483,647✔
1934
      if (p == NULL) {
2,147,483,647✔
1935
        taosArrayDestroy(pUidList);
×
1936
        return terrno;
×
1937
      }
1938

1939
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
2,147,483,647✔
1940
    }
1941
  }
1942

1943
  qDebug("table list with %d uids built", (int32_t)taosArrayGetSize(pListInfo->pTableList));
1,707,722,221✔
1944

1945
_error:
1,707,862,944✔
1946

1947
  taosArrayDestroy(pUidList);
1,708,085,730✔
1948
  if (code != TSDB_CODE_SUCCESS) {
1,707,907,804✔
1949
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
10,670✔
1950
  }
1951
  return code;
1,707,887,706✔
1952
}
1953

1954
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
223,391✔
1955
  int32_t        code = TSDB_CODE_SUCCESS;
223,391✔
1956
  int32_t        lino = 0;
223,391✔
1957
  SSubplan*      pSubplan = (SSubplan*)node;
223,391✔
1958
  SScanPhysiNode pNode = {0};
223,391✔
1959
  pNode.suid = suid;
223,391✔
1960
  pNode.uid = suid;
223,391✔
1961
  pNode.tableType = TSDB_SUPER_TABLE;
223,391✔
1962

1963
  STableListInfo* pTableListInfo = tableListCreate();
223,391✔
1964
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
223,391✔
1965
  uint8_t digest[17] = {0};
223,391✔
1966
  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
223,391✔
1967
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
1968
  QUERY_CHECK_CODE(code, lino, _end);
223,391✔
1969
  *tableList = pTableListInfo->pTableList;
223,391✔
1970
  pTableListInfo->pTableList = NULL;
223,391✔
1971
  tableListDestroy(pTableListInfo);
223,391✔
1972

1973
_end:
223,391✔
1974
  if (code != TSDB_CODE_SUCCESS) {
223,391✔
1975
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1976
  }
1977
  return code;
223,391✔
1978
}
1979

1980
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
1981
  size_t keyLen = 0;
×
1982

1983
  SNode* node;
1984
  FOREACH(node, pGroups) {
×
1985
    SExprNode* pExpr = (SExprNode*)node;
×
1986
    keyLen += pExpr->resType.bytes;
×
1987
  }
1988

1989
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
1990
  return keyLen;
×
1991
}
1992

1993
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
1994
                              SStorageAPI* pAPI) {
1995
  SMetaReader mr = {0};
×
1996

1997
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
1998
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
1999
    pAPI->metaReaderFn.clearReader(&mr);
×
2000
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2001
  }
2002

2003
  SNodeList* groupNew = NULL;
×
2004
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2005
  if (TSDB_CODE_SUCCESS != code) {
×
2006
    pAPI->metaReaderFn.clearReader(&mr);
×
2007
    return code;
×
2008
  }
2009

2010
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2011
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2012
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2013
    nodesDestroyList(groupNew);
×
2014
    pAPI->metaReaderFn.clearReader(&mr);
×
2015
    return code;
×
2016
  }
2017
  char* isNull = (char*)keyBuf;
×
2018
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2019

2020
  SNode*  pNode;
2021
  int32_t index = 0;
×
2022
  FOREACH(pNode, groupNew) {
×
2023
    SNode*  pNew = NULL;
×
2024
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2025
    if (TSDB_CODE_SUCCESS == code) {
×
2026
      REPLACE_NODE(pNew);
×
2027
    } else {
2028
      nodesDestroyList(groupNew);
×
2029
      pAPI->metaReaderFn.clearReader(&mr);
×
2030
      return code;
×
2031
    }
2032

2033
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2034
      nodesDestroyList(groupNew);
×
2035
      pAPI->metaReaderFn.clearReader(&mr);
×
2036
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2037
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2038
    }
2039
    SValueNode* pValue = (SValueNode*)pNew;
×
2040

2041
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2042
      isNull[index++] = 1;
×
2043
      continue;
×
2044
    } else {
2045
      isNull[index++] = 0;
×
2046
      char* data = nodesGetValueFromNode(pValue);
×
2047
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2048
        if (tTagIsJson(data)) {
×
2049
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2050
          nodesDestroyList(groupNew);
×
2051
          pAPI->metaReaderFn.clearReader(&mr);
×
2052
          return terrno;
×
2053
        }
2054
        int32_t len = getJsonValueLen(data);
×
2055
        memcpy(pStart, data, len);
×
2056
        pStart += len;
×
2057
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2058
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2059
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2060
        }
2061
        memcpy(pStart, data, varDataTLen(data));
×
2062
        pStart += varDataTLen(data);
×
2063
      } else {
2064
        memcpy(pStart, data, pValue->node.resType.bytes);
×
2065
        pStart += pValue->node.resType.bytes;
×
2066
      }
2067
    }
2068
  }
2069

2070
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
2071
  *pGroupId = calcGroupId(keyBuf, len);
×
2072

2073
  nodesDestroyList(groupNew);
×
2074
  pAPI->metaReaderFn.clearReader(&mr);
×
2075

2076
  return TSDB_CODE_SUCCESS;
×
2077
}
2078

2079
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
25,802,471✔
2080
  if (!pNodeList) {
25,802,471✔
2081
    return NULL;
×
2082
  }
2083

2084
  size_t  numOfCols = LIST_LENGTH(pNodeList);
25,802,471✔
2085
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
25,801,335✔
2086
  if (pList == NULL) {
25,802,618✔
2087
    return NULL;
×
2088
  }
2089

2090
  for (int32_t i = 0; i < numOfCols; ++i) {
60,006,268✔
2091
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
34,218,373✔
2092
    if (!pColNode) {
34,223,191✔
2093
      taosArrayDestroy(pList);
×
2094
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2095
      return NULL;
×
2096
    }
2097

2098
    // todo extract method
2099
    SColumn c = {0};
34,223,191✔
2100
    c.slotId = pColNode->slotId;
34,212,037✔
2101
    c.colId = pColNode->colId;
34,223,983✔
2102
    c.type = pColNode->node.resType.type;
34,224,179✔
2103
    c.bytes = pColNode->node.resType.bytes;
34,212,045✔
2104
    c.precision = pColNode->node.resType.precision;
34,221,699✔
2105
    c.scale = pColNode->node.resType.scale;
34,209,684✔
2106

2107
    void* tmp = taosArrayPush(pList, &c);
34,216,180✔
2108
    if (!tmp) {
34,216,180✔
2109
      taosArrayDestroy(pList);
×
2110
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2111
      return NULL;
×
2112
    }
2113
  }
2114

2115
  return pList;
25,787,895✔
2116
}
2117

2118
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
2,147,483,647✔
2119
                            int32_t type, SColMatchInfo* pMatchInfo) {
2120
  size_t  numOfCols = LIST_LENGTH(pNodeList);
2,147,483,647✔
2121
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
2122
  int32_t lino = 0;
2,147,483,647✔
2123

2124
  pMatchInfo->matchType = type;
2,147,483,647✔
2125

2126
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
2,147,483,647✔
2127
  if (pList == NULL) {
2,147,483,647✔
2128
    code = terrno;
×
2129
    return code;
×
2130
  }
2131

2132
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
2133
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
2,147,483,647✔
2134
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
2,147,483,647✔
2135
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
2,147,483,647✔
2136
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
2,147,483,647✔
2137

2138
      SColMatchItem c = {.needOutput = true};
2,147,483,647✔
2139
      c.colId = pColNode->colId;
2,147,483,647✔
2140
      c.srcSlotId = pColNode->slotId;
2,147,483,647✔
2141
      c.dstSlotId = pNode->slotId;
2,147,483,647✔
2142
      c.isPk = pColNode->isPk;
2,147,483,647✔
2143
      c.dataType = pColNode->node.resType;
2,147,483,647✔
2144
      void* tmp = taosArrayPush(pList, &c);
2,147,483,647✔
2145
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2,147,483,647✔
2146
    }
2147
  }
2148

2149
  // set the output flag for each column in SColMatchInfo, according to the
2150
  *numOfOutputCols = 0;
2,147,483,647✔
2151
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
2,147,483,647✔
2152
  for (int32_t i = 0; i < num; ++i) {
2,147,483,647✔
2153
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
2,147,483,647✔
2154
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
2,147,483,647✔
2155

2156
    // todo: add reserve flag check
2157
    // it is a column reserved for the arithmetic expression calculation
2158
    if (pNode->slotId >= numOfCols) {
2,147,483,647✔
2159
      (*numOfOutputCols) += 1;
1,260,523,768✔
2160
      continue;
1,260,557,788✔
2161
    }
2162

2163
    SColMatchItem* info = NULL;
2,147,483,647✔
2164
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
2,147,483,647✔
2165
      info = taosArrayGet(pList, j);
2,147,483,647✔
2166
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
2,147,483,647✔
2167
      if (info->dstSlotId == pNode->slotId) {
2,147,483,647✔
2168
        break;
2,147,483,647✔
2169
      }
2170
    }
2171

2172
    if (pNode->output) {
12,358,299✔
2173
      (*numOfOutputCols) += 1;
2,147,483,647✔
2174
    } else if (info != NULL) {
82,001,684✔
2175
      // select distinct tbname from stb where tbname='abc';
2176
      info->needOutput = false;
81,752,608✔
2177
    }
2178
  }
2179

2180
  pMatchInfo->pList = pList;
2,147,483,647✔
2181

2182
_end:
2,147,483,647✔
2183
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2184
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2185
  }
2186
  return code;
2,147,483,647✔
2187
}
2188

2189
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
2,147,483,647✔
2190
                                  const char* name) {
2191
  SResSchema s = {0};
2,147,483,647✔
2192
  s.scale = scale;
2,147,483,647✔
2193
  s.type = type;
2,147,483,647✔
2194
  s.bytes = bytes;
2,147,483,647✔
2195
  s.slotId = slotId;
2,147,483,647✔
2196
  s.precision = precision;
2,147,483,647✔
2197
  tstrncpy(s.name, name, tListLen(s.name));
2,147,483,647✔
2198

2199
  return s;
2,147,483,647✔
2200
}
2201

2202
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
2,147,483,647✔
2203
  SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
2,147,483,647✔
2204
  if (pCol == NULL) {
2,147,483,647✔
2205
    return NULL;
×
2206
  }
2207

2208
  pCol->slotId = slotId;
2,147,483,647✔
2209
  pCol->colId = colId;
2,147,483,647✔
2210
  pCol->bytes = pType->bytes;
2,147,483,647✔
2211
  pCol->type = pType->type;
2,147,483,647✔
2212
  pCol->scale = pType->scale;
2,147,483,647✔
2213
  pCol->precision = pType->precision;
2,147,483,647✔
2214
  pCol->dataBlockId = blockId;
2,147,483,647✔
2215
  pCol->colType = colType;
2,147,483,647✔
2216
  return pCol;
2,147,483,647✔
2217
}
2218

2219
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
2,147,483,647✔
2220
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
2221
  int32_t lino = 0;
2,147,483,647✔
2222
  pExp->base.numOfParams = 0;
2,147,483,647✔
2223
  pExp->base.pParam = NULL;
2,147,483,647✔
2224
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
2,147,483,647✔
2225
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
2,147,483,647✔
2226

2227
  pExp->pExpr->_function.num = 1;
2,147,483,647✔
2228
  pExp->pExpr->_function.functionId = -1;
2,147,483,647✔
2229

2230
  int32_t type = nodeType(pNode);
2,147,483,647✔
2231
  // it is a project query, or group by column
2232
  if (type == QUERY_NODE_COLUMN) {
2,147,483,647✔
2233
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
2,147,483,647✔
2234
    SColumnNode* pColNode = (SColumnNode*)pNode;
2,147,483,647✔
2235

2236
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
2,147,483,647✔
2237
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2,147,483,647✔
2238

2239
    pExp->base.numOfParams = 1;
2,147,483,647✔
2240

2241
    SDataType* pType = &pColNode->node.resType;
2,147,483,647✔
2242
    pExp->base.resSchema =
2243
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
2,147,483,647✔
2244

2245
    pExp->base.pParam[0].pCol =
2,147,483,647✔
2246
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
2,147,483,647✔
2247
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
2,147,483,647✔
2248

2249
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
2,147,483,647✔
2250
  } else if (type == QUERY_NODE_VALUE) {
2,147,483,647✔
2251
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
152,103,825✔
2252
    SValueNode* pValNode = (SValueNode*)pNode;
152,117,430✔
2253

2254
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
152,117,430✔
2255
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
152,060,068✔
2256

2257
    pExp->base.numOfParams = 1;
152,009,273✔
2258

2259
    SDataType* pType = &pValNode->node.resType;
152,123,326✔
2260
    pExp->base.resSchema =
2261
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
152,106,850✔
2262
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
152,028,502✔
2263
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
152,127,624✔
2264
    QUERY_CHECK_CODE(code, lino, _end);
152,061,102✔
2265
  } else if (type == QUERY_NODE_FUNCTION) {
2,147,483,647✔
2266
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
2,147,483,647✔
2267
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
2,147,483,647✔
2268

2269
    SDataType* pType = &pFuncNode->node.resType;
2,147,483,647✔
2270
    pExp->base.resSchema =
2271
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
2,147,483,647✔
2272
    tExprNode* pExprNode = pExp->pExpr;
2,147,483,647✔
2273

2274
    pExprNode->_function.functionId = pFuncNode->funcId;
2,147,483,647✔
2275
    pExprNode->_function.pFunctNode = pFuncNode;
2,147,483,647✔
2276
    pExprNode->_function.functionType = pFuncNode->funcType;
2,147,483,647✔
2277

2278
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
2,147,483,647✔
2279

2280
    pExp->base.pParamList = pFuncNode->pParameterList;
2,147,483,647✔
2281
#if 1
2282
    // todo refactor: add the parameter for tbname function
2283
    const char* name = "tbname";
2,147,483,647✔
2284
    int32_t     len = strlen(name);
2,147,483,647✔
2285

2286
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
2,147,483,647✔
2287
        pExprNode->_function.functionName[len] == 0) {
186,000,811✔
2288
      pFuncNode->pParameterList = NULL;
185,940,457✔
2289
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
185,970,326✔
2290
      SValueNode* res = NULL;
186,008,408✔
2291
      if (TSDB_CODE_SUCCESS == code) {
186,012,628✔
2292
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
186,019,106✔
2293
      }
2294
      QUERY_CHECK_CODE(code, lino, _end);
186,086,452✔
2295
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
186,086,452✔
2296
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
185,957,094✔
2297
      if (code != TSDB_CODE_SUCCESS) {
186,023,289✔
2298
        nodesDestroyNode((SNode*)res);
×
2299
        res = NULL;
×
2300
      }
2301
      QUERY_CHECK_CODE(code, lino, _end);
186,023,289✔
2302
    }
2303
#endif
2304

2305
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
2,147,483,647✔
2306

2307
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
2,147,483,647✔
2308
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2,147,483,647✔
2309
    pExp->base.numOfParams = numOfParam;
2,147,483,647✔
2310

2311
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
2,147,483,647✔
2312
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
2,147,483,647✔
2313
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
2,147,483,647✔
2314
      if (p1->type == QUERY_NODE_COLUMN) {
2,147,483,647✔
2315
        SColumnNode* pcn = (SColumnNode*)p1;
2,147,483,647✔
2316

2317
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
2,147,483,647✔
2318
        pExp->base.pParam[j].pCol =
2,147,483,647✔
2319
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
2,147,483,647✔
2320
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
2,147,483,647✔
2321
      } else if (p1->type == QUERY_NODE_VALUE) {
1,659,743,927✔
2322
        SValueNode* pvn = (SValueNode*)p1;
939,443,759✔
2323
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
939,443,759✔
2324
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
939,373,483✔
2325
        QUERY_CHECK_CODE(code, lino, _end);
938,724,948✔
2326
      }
2327
    }
2328
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
2,147,483,647✔
2329
  } else if (type == QUERY_NODE_OPERATOR) {
182,898,506✔
2330
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
170,567,372✔
2331
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
170,567,704✔
2332

2333
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
170,567,704✔
2334
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
170,459,797✔
2335
    pExp->base.numOfParams = 1;
170,365,059✔
2336

2337
    SDataType* pType = &pOpNode->node.resType;
170,518,857✔
2338
    pExp->base.resSchema =
2339
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
170,450,365✔
2340
    pExp->pExpr->_optrRoot.pRootNode = pNode;
170,469,249✔
2341
  } else if (type == QUERY_NODE_CASE_WHEN) {
12,331,134✔
2342
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
12,284,292✔
2343
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
12,283,781✔
2344

2345
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
12,283,781✔
2346
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
12,282,743✔
2347
    pExp->base.numOfParams = 1;
12,284,292✔
2348

2349
    SDataType* pType = &pCaseNode->node.resType;
12,283,670✔
2350
    pExp->base.resSchema =
2351
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
12,282,480✔
2352
    pExp->pExpr->_optrRoot.pRootNode = pNode;
12,280,699✔
2353
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
46,842✔
2354
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
29,212✔
2355
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
29,212✔
2356
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
29,212✔
2357
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
29,212✔
2358
    pExp->base.numOfParams = 1;
29,212✔
2359
    SDataType* pType = &pCond->node.resType;
29,212✔
2360
    pExp->base.resSchema =
2361
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
29,212✔
2362
    pExp->pExpr->_optrRoot.pRootNode = pNode;
29,212✔
2363
  } else {
2364
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
17,845✔
2365
    QUERY_CHECK_CODE(code, lino, _end);
17,845✔
2366
  }
2367
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
2,147,483,647✔
2368
_end:
2,147,483,647✔
2369
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2370
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2371
  }
2372
  return code;
2,147,483,647✔
2373
}
2374

2375
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
2,147,483,647✔
2376
  return createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
2,147,483,647✔
2377
}
2378

2379
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
×
2380
  *numOfExprs = LIST_LENGTH(pNodeList);
×
2381
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
×
2382
  if (!pExprs) {
×
2383
    return NULL;
×
2384
  }
2385

2386
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
×
2387
    SExprInfo* pExp = &pExprs[i];
×
2388
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
×
2389
    if (code != TSDB_CODE_SUCCESS) {
×
2390
      taosMemoryFreeClear(pExprs);
×
2391
      terrno = code;
×
2392
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2393
      return NULL;
×
2394
    }
2395
  }
2396

2397
  return pExprs;
×
2398
}
2399

2400
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
2,147,483,647✔
2401
  QRY_PARAM_CHECK(pExprInfo);
2,147,483,647✔
2402

2403
  int32_t code = 0;
2,147,483,647✔
2404
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
2,147,483,647✔
2405
  int32_t numOfGroupKeys = 0;
2,147,483,647✔
2406
  if (pGroupKeys != NULL) {
2,147,483,647✔
2407
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
302,322,434✔
2408
  }
2409

2410
  *numOfExprs = numOfFuncs + numOfGroupKeys;
2,147,483,647✔
2411
  if (*numOfExprs == 0) {
2,147,483,647✔
2412
    return code;
315,342,019✔
2413
  }
2414

2415
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
2,147,483,647✔
2416
  if (pExprs == NULL) {
2,147,483,647✔
2417
    return terrno;
×
2418
  }
2419

2420
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
2,147,483,647✔
2421
    STargetNode* pTargetNode = NULL;
2,147,483,647✔
2422
    if (i < numOfFuncs) {
2,147,483,647✔
2423
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
2,147,483,647✔
2424
    } else {
2425
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
464,331,593✔
2426
    }
2427
    if (!pTargetNode) {
2,147,483,647✔
2428
      destroyExprInfo(pExprs, *numOfExprs);
×
2429
      taosMemoryFreeClear(pExprs);
×
2430
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2431
      return terrno;
×
2432
    }
2433

2434
    SExprInfo* pExp = &pExprs[i];
2,147,483,647✔
2435
    code = createExprFromTargetNode(pExp, pTargetNode);
2,147,483,647✔
2436
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2437
      destroyExprInfo(pExprs, *numOfExprs);
×
2438
      taosMemoryFreeClear(pExprs);
×
2439
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2440
      return code;
×
2441
    }
2442
  }
2443

2444
  *pExprInfo = pExprs;
2,147,483,647✔
2445
  return code;
2,147,483,647✔
2446
}
2447

2448
static void deleteSubsidiareCtx(void* pData) {
×
2449
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2450
  if (pCtx->pCtx) {
×
2451
    taosMemoryFreeClear(pCtx->pCtx);
×
2452
  }
2453
}
×
2454

2455
// set the output buffer for the selectivity + tag query
2456
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
2,147,483,647✔
2457
  int32_t num = 0;
2,147,483,647✔
2458
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
2459
  int32_t lino = 0;
2,147,483,647✔
2460

2461
  SArray* pValCtxArray = NULL;
2,147,483,647✔
2462
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
2,147,483,647✔
2463
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
2,147,483,647✔
2464
    if (funcIdx > 0) {
2,147,483,647✔
2465
      if (pValCtxArray == NULL) {
18,465,286✔
2466
        // the end of the list is the select function of biggest index
2467
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
13,233,315✔
2468
        if (pValCtxArray == NULL) {
13,224,670✔
2469
          return terrno;
×
2470
        }
2471
      }
2472
      if (funcIdx > pValCtxArray->size) {
18,456,641✔
2473
        qError("funcIdx:%d is out of range", funcIdx);
×
2474
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2475
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2476
      }
2477
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
18,460,612✔
2478
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
18,470,119✔
2479
      if (pSubsidiary->pCtx == NULL) {
18,458,683✔
2480
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2481
        return terrno;
×
2482
      }
2483
      pSubsidiary->num = 0;
18,456,707✔
2484
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
18,457,554✔
2485
    }
2486
  }
2487

2488
  SqlFunctionCtx*  p = NULL;
2,147,483,647✔
2489
  SqlFunctionCtx** pValCtx = NULL;
2,147,483,647✔
2490
  if (pValCtxArray == NULL) {
2,147,483,647✔
2491
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
2,147,483,647✔
2492
    if (pValCtx == NULL) {
2,147,483,647✔
2493
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2494
    }
2495
  }
2496

2497
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
2498
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
2,147,483,647✔
2499
    if ((strcmp(pName, "_select_value") == 0)) {
2,147,483,647✔
2500
      if (pValCtxArray == NULL) {
66,493,796✔
2501
        pValCtx[num++] = &pCtx[i];
40,479,271✔
2502
      } else {
2503
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
26,014,642✔
2504
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
26,016,382✔
2505
          bindFuncIndex -= 1;
25,389,994✔
2506
        }
2507
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
26,016,382✔
2508
        if (pSubsidiary == NULL) {
26,014,991✔
2509
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2510
        }
2511
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
26,014,991✔
2512
        (*pSubsidiary)->num++;
25,999,248✔
2513
      }
2514
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
2,147,483,647✔
2515
      if (pValCtxArray == NULL) {
578,954,170✔
2516
        p = &pCtx[i];
556,565,714✔
2517
      }
2518
    }
2519
  }
2520

2521
  if (p != NULL) {
2,147,483,647✔
2522
    p->subsidiaries.pCtx = pValCtx;
228,640,931✔
2523
    p->subsidiaries.num = num;
228,604,023✔
2524
  } else {
2525
    taosMemoryFreeClear(pValCtx);
2,147,483,647✔
2526
  }
2527

2528
_end:
13,054,300✔
2529
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2530
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2531
    taosMemoryFreeClear(pValCtx);
×
2532
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2533
  } else {
2534
    taosArrayDestroy(pValCtxArray);
2,147,483,647✔
2535
  }
2536
  return code;
2,147,483,647✔
2537
}
2538

2539
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
2,147,483,647✔
2540
                                     SFunctionStateStore* pStore) {
2541
  int32_t         code = TSDB_CODE_SUCCESS;
2,147,483,647✔
2542
  int32_t         lino = 0;
2,147,483,647✔
2543
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
2,147,483,647✔
2544
  if (pFuncCtx == NULL) {
2,147,483,647✔
2545
    return NULL;
×
2546
  }
2547

2548
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
2,147,483,647✔
2549
  if (*rowEntryInfoOffset == 0) {
2,147,483,647✔
2550
    taosMemoryFreeClear(pFuncCtx);
×
2551
    return NULL;
×
2552
  }
2553

2554
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
2555
    SExprInfo* pExpr = &pExprInfo[i];
2,147,483,647✔
2556

2557
    SExprBasicInfo* pFunct = &pExpr->base;
2,147,483,647✔
2558
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
2,147,483,647✔
2559

2560
    pCtx->functionId = -1;
2,147,483,647✔
2561
    pCtx->pExpr = pExpr;
2,147,483,647✔
2562

2563
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
2,147,483,647✔
2564
      SFuncExecEnv env = {0};
2,147,483,647✔
2565
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
2,147,483,647✔
2566
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
2,147,483,647✔
2567
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
2,147,483,647✔
2568

2569
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
2,147,483,647✔
2570
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
2,147,483,647✔
2571
        if (!isUdaf) {
1,822,998,253✔
2572
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
1,822,500,084✔
2573
          QUERY_CHECK_CODE(code, lino, _end);
1,821,772,313✔
2574
        } else {
2575
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
498,169✔
2576
          pCtx->udfName = taosStrdup(udfName);
498,169✔
2577
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
498,169✔
2578

2579
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
498,169✔
2580
          QUERY_CHECK_CODE(code, lino, _end);
498,169✔
2581
        }
2582
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
1,822,270,482✔
2583
        if (!tmp) {
1,822,309,291✔
2584
          code = terrno;
×
2585
          QUERY_CHECK_CODE(code, lino, _end);
×
2586
        }
2587
      } else {
2588
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
1,217,169,879✔
2589
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
101,944,454✔
2590
          QUERY_CHECK_CODE(code, lino, _end);
101,944,481✔
2591
        }      
2592
        
2593
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
1,217,623,202✔
2594
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
1,217,456,846✔
2595
          code = TSDB_CODE_SUCCESS;
636,732✔
2596
        }
2597
        QUERY_CHECK_CODE(code, lino, _end);
1,217,456,846✔
2598

2599
        if (pCtx->sfp.getEnv != NULL) {
1,217,456,846✔
2600
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
228,186,954✔
2601
          if (!tmp) {
228,180,093✔
2602
            code = terrno;
×
2603
            QUERY_CHECK_CODE(code, lino, _end);
×
2604
          }
2605
        }
2606
      }
2607
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
2,147,483,647✔
2608
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
2,147,483,647✔
2609
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
152,094,850✔
2610
      // for simple column, the result buffer needs to hold at least one element.
2611
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
2,147,483,647✔
2612
    }
2613

2614
    pCtx->input.numOfInputCols = pFunct->numOfParams;
2,147,483,647✔
2615
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
2,147,483,647✔
2616
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
2,147,483,647✔
2617
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
2,147,483,647✔
2618
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
2,147,483,647✔
2619

2620
    pCtx->pTsOutput = NULL;
2,147,483,647✔
2621
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
2,147,483,647✔
2622
    pCtx->resDataInfo.type = pFunct->resSchema.type;
2,147,483,647✔
2623
    pCtx->order = TSDB_ORDER_ASC;
2,147,483,647✔
2624
    pCtx->start.key = INT64_MIN;
2,147,483,647✔
2625
    pCtx->end.key = INT64_MIN;
2,147,483,647✔
2626
    pCtx->numOfParams = pExpr->base.numOfParams;
2,147,483,647✔
2627
    pCtx->param = pFunct->pParam;
2,147,483,647✔
2628
    pCtx->saveHandle.currentPage = -1;
2,147,483,647✔
2629
    pCtx->pStore = pStore;
2,147,483,647✔
2630
    pCtx->hasWindowOrGroup = false;
2,147,483,647✔
2631
    pCtx->needCleanup = false;
2,147,483,647✔
2632
  }
2633

2634
  for (int32_t i = 1; i < numOfOutput; ++i) {
2,147,483,647✔
2635
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
2,147,483,647✔
2636
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
2,147,483,647✔
2637
  }
2638

2639
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
2,147,483,647✔
2640
  QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
2641

2642
_end:
2,147,483,647✔
2643
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2644
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2645
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
2646
      taosMemoryFree(pFuncCtx[i].input.pData);
×
2647
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
2648
    }
2649
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
2650
    taosMemoryFreeClear(pFuncCtx);
×
2651

2652
    terrno = code;
×
2653
    return NULL;
×
2654
  }
2655
  return pFuncCtx;
2,147,483,647✔
2656
}
2657

2658
// NOTE: sources columns are more than the destination SSDatablock columns.
2659
// doFilter in table scan needs every column even its output is false
2660
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
111,503,752✔
2661
  int32_t code = TSDB_CODE_SUCCESS;
111,503,752✔
2662
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
111,503,752✔
2663

2664
  int32_t i = 0, j = 0;
111,510,203✔
2665
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
869,406,146✔
2666
    SColumnInfoData* p = taosArrayGet(pCols, i);
757,910,372✔
2667
    if (!p) {
757,893,273✔
2668
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2669
      return terrno;
×
2670
    }
2671
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
757,893,273✔
2672
    if (!pmInfo) {
757,885,625✔
2673
      return terrno;
×
2674
    }
2675

2676
    if (p->info.colId == pmInfo->colId) {
757,885,625✔
2677
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
666,242,759✔
2678
      if (!pDst) {
666,235,382✔
2679
        return terrno;
×
2680
      }
2681
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
666,235,382✔
2682
      if (code != TSDB_CODE_SUCCESS) {
666,231,652✔
2683
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2684
        return code;
×
2685
      }
2686
      i++;
666,231,652✔
2687
      j++;
666,231,652✔
2688
    } else if (p->info.colId < pmInfo->colId) {
91,658,426✔
2689
      i++;
91,664,291✔
2690
    } else {
2691
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2692
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2693
    }
2694
  }
2695
  return code;
111,509,322✔
2696
}
2697

2698
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
1,414,142,803✔
2699
  SInterval interval = {
2,147,483,647✔
2700
      .interval = pTableScanNode->interval,
1,412,688,693✔
2701
      .sliding = pTableScanNode->sliding,
1,413,230,525✔
2702
      .intervalUnit = pTableScanNode->intervalUnit,
1,414,407,980✔
2703
      .slidingUnit = pTableScanNode->slidingUnit,
1,413,804,582✔
2704
      .offset = pTableScanNode->offset,
1,414,419,430✔
2705
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
1,414,398,937✔
2706
      .timeRange = pTableScanNode->scanRange,
2707
  };
2708
  calcIntervalAutoOffset(&interval);
1,413,013,742✔
2709

2710
  return interval;
1,413,703,024✔
2711
}
2712

2713
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
495,044,654✔
2714
  SColumn c = {0};
495,044,654✔
2715

2716
  c.slotId = pColNode->slotId;
495,044,654✔
2717
  c.colId = pColNode->colId;
495,033,622✔
2718
  c.type = pColNode->node.resType.type;
495,058,959✔
2719
  c.bytes = pColNode->node.resType.bytes;
494,987,875✔
2720
  c.scale = pColNode->node.resType.scale;
495,037,478✔
2721
  c.precision = pColNode->node.resType.precision;
495,065,290✔
2722
  return c;
494,951,867✔
2723
}
2724

2725

2726
/**
2727
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
2728
 * @param[in] cond The range specified by WHERE condition.
2729
 * @param[in] range The range specified by RANGE clause.
2730
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
2731
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
2732
 * @note `cond` and `twindow` may be the same address.
2733
 */
2734
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
22,897,741✔
2735
                                 STimeWindow* extTwindows) {
2736
  int32_t     code = TSDB_CODE_SUCCESS;
22,897,741✔
2737
  int32_t     lino = 0;
22,897,741✔
2738
  STimeWindow tempWindow;
2739

2740
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
22,897,741✔
2741
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
33,898✔
2742
    return code;
33,015✔
2743
  }
2744

2745
  if (range->ekey < cond->skey) {
22,863,218✔
2746
    extTwindows[1] = *cond;
3,672,620✔
2747
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
3,672,620✔
2748
    return code;
3,672,620✔
2749
  }
2750

2751
  if (cond->ekey < range->skey) {
19,192,106✔
2752
    extTwindows[0] = *cond;
2,596,537✔
2753
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
2,596,537✔
2754
    return code;
2,596,537✔
2755
  }
2756

2757
  // Only scan data in the time range intersecion.
2758
  extTwindows[0] = extTwindows[1] = *cond;
16,593,436✔
2759
  twindow->skey = TMAX(cond->skey, range->skey);
16,595,569✔
2760
  twindow->ekey = TMIN(cond->ekey, range->ekey);
16,593,436✔
2761
  extTwindows[0].ekey = twindow->skey - 1;
16,594,061✔
2762
  extTwindows[1].skey = twindow->ekey + 1;
16,594,061✔
2763

2764
  return code;
16,595,569✔
2765
}
2766

2767
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
1,708,498,608✔
2768
                               const SReadHandle* readHandle, bool applyExtWin) {
2769
  int32_t code = 0;                             
1,708,498,608✔
2770
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
1,708,498,608✔
2771
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
1,708,623,838✔
2772

2773
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
1,708,582,011✔
2774
  if (!pCond->colList) {
1,708,033,915✔
2775
    return terrno;
×
2776
  }
2777
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
1,707,728,441✔
2778
  if (pCond->pSlotList == NULL) {
1,708,295,737✔
2779
    taosMemoryFreeClear(pCond->colList);
×
2780
    return terrno;
×
2781
  }
2782

2783
  // TODO: get it from stable scan node
2784
  pCond->twindows = pTableScanNode->scanRange;
1,707,745,556✔
2785
  pCond->suid = pTableScanNode->scan.suid;
1,708,808,796✔
2786
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
1,707,669,323✔
2787
  pCond->startVersion = -1;
1,708,161,256✔
2788
  pCond->endVersion = -1;
1,708,489,942✔
2789
  pCond->skipRollup = readHandle->skipRollup;
1,707,826,137✔
2790
  if (readHandle->winRangeValid) {
1,708,105,218✔
2791
    pCond->twindows = readHandle->winRange;
1,223,736✔
2792
  }
2793
  // allowed read stt file optimization mode
2794
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
2,147,483,647✔
2795
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
1,708,458,383✔
2796

2797
  int32_t j = 0;
1,707,569,604✔
2798
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
2,147,483,647✔
2799
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
2,147,483,647✔
2800
    if (!pNode) {
2,147,483,647✔
2801
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2802
      return terrno;
×
2803
    }
2804
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
2,147,483,647✔
2805
    if (pColNode->colType == COLUMN_TYPE_TAG) {
2,147,483,647✔
2806
      continue;
×
2807
    }
2808

2809
    pCond->colList[j].type = pColNode->node.resType.type;
2,147,483,647✔
2810
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
2,147,483,647✔
2811
    pCond->colList[j].colId = pColNode->colId;
2,147,483,647✔
2812
    pCond->colList[j].pk = pColNode->isPk;
2,147,483,647✔
2813

2814
    pCond->pSlotList[j] = pNode->slotId;
2,147,483,647✔
2815
    j += 1;
2,147,483,647✔
2816
  }
2817

2818
  pCond->numOfCols = j;
1,708,250,972✔
2819

2820
  if (applyExtWin) {
1,708,602,984✔
2821
    if (NULL != pTableScanNode->pExtScanRange) {
1,415,820,596✔
2822
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
22,897,741✔
2823
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
22,897,741✔
2824
    } else if (readHandle->extWinRangeValid) {
1,391,990,664✔
2825
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
×
2826
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
2827
    }
2828
  }
2829
  
2830
  return code;
1,708,155,507✔
2831
}
2832

2833
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
238,065,276✔
2834
                                           const SReadHandle* readHandle, SArray* colArray) {
2835
  int32_t code = TSDB_CODE_SUCCESS;
238,065,276✔
2836
  int32_t lino = 0;
238,065,276✔
2837

2838
  pCond->order = TSDB_ORDER_ASC;
238,065,276✔
2839
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
238,065,276✔
2840

2841
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
238,065,276✔
2842
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
238,065,276✔
2843

2844
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
238,065,276✔
2845
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
238,065,276✔
2846

2847
  pCond->twindows = pOrgCond->twindows;
238,065,276✔
2848
  pCond->type = pOrgCond->type;
238,065,276✔
2849
  pCond->startVersion = -1;
238,065,276✔
2850
  pCond->endVersion = -1;
238,065,276✔
2851
  pCond->skipRollup = true;
238,065,276✔
2852
  pCond->notLoadData = false;
238,065,276✔
2853

2854
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
881,323,862✔
2855
    SColIdPair* pColPair = taosArrayGet(colArray, i);
643,258,586✔
2856
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
643,258,586✔
2857

2858
    bool find = false;
643,258,586✔
2859
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
2,147,483,647✔
2860
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
2,147,483,647✔
2861
        pCond->colList[i].type = pOrgCond->colList[j].type;
643,258,586✔
2862
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
643,258,586✔
2863
        pCond->colList[i].colId = pColPair->orgColId;
643,258,586✔
2864
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
643,258,586✔
2865
        pCond->pSlotList[i] = i;
643,258,586✔
2866
        find = true;
643,258,586✔
2867
        break;
643,258,586✔
2868
      }
2869
    }
2870
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
643,258,586✔
2871
  }
2872

2873
  return code;
238,065,276✔
2874
_return:
×
2875
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
2876
  taosMemoryFreeClear(pCond->colList);
×
2877
  taosMemoryFreeClear(pCond->pSlotList);
×
2878
  return code;
×
2879
}
2880

2881
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
2,147,483,647✔
2882
  taosMemoryFreeClear(pCond->colList);
2,147,483,647✔
2883
  taosMemoryFreeClear(pCond->pSlotList);
2,147,483,647✔
2884
}
2,147,483,647✔
2885

2886
int32_t convertFillType(int32_t mode) {
29,814,656✔
2887
  int32_t type = TSDB_FILL_NONE;
29,814,656✔
2888
  switch (mode) {
29,814,656✔
2889
    case FILL_MODE_PREV:
1,441,031✔
2890
      type = TSDB_FILL_PREV;
1,441,031✔
2891
      break;
1,441,031✔
2892
    case FILL_MODE_NONE:
×
2893
      type = TSDB_FILL_NONE;
×
2894
      break;
×
2895
    case FILL_MODE_NULL:
2,013,556✔
2896
      type = TSDB_FILL_NULL;
2,013,556✔
2897
      break;
2,013,556✔
2898
    case FILL_MODE_NULL_F:
456,169✔
2899
      type = TSDB_FILL_NULL_F;
456,169✔
2900
      break;
456,169✔
2901
    case FILL_MODE_NEXT:
1,433,809✔
2902
      type = TSDB_FILL_NEXT;
1,433,809✔
2903
      break;
1,433,809✔
2904
    case FILL_MODE_VALUE:
1,633,709✔
2905
      type = TSDB_FILL_SET_VALUE;
1,633,709✔
2906
      break;
1,633,709✔
2907
    case FILL_MODE_VALUE_F:
48,944✔
2908
      type = TSDB_FILL_SET_VALUE_F;
48,944✔
2909
      break;
48,944✔
2910
    case FILL_MODE_LINEAR:
2,252,012✔
2911
      type = TSDB_FILL_LINEAR;
2,252,012✔
2912
      break;
2,252,012✔
2913
    case FILL_MODE_NEAR:
20,535,426✔
2914
      type = TSDB_FILL_NEAR;
20,535,426✔
2915
      break;
20,535,426✔
2916
    default:
×
2917
      type = TSDB_FILL_NONE;
×
2918
  }
2919

2920
  return type;
29,814,656✔
2921
}
2922

2923
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
22,152,298✔
2924
  if (ascQuery) {
22,152,298✔
2925
    *w = getAlignQueryTimeWindow(pInterval, ts);
18,766,779✔
2926
  } else {
2927
    // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
2928
    *w = getAlignQueryTimeWindow(pInterval, ts);
3,385,519✔
2929

2930
    int64_t key = w->skey;
3,387,225✔
2931
    while (key < ts) {  // moving towards end
3,572,229✔
2932
      key = getNextTimeWindowStart(pInterval, key, TSDB_ORDER_ASC);
1,619,981✔
2933
      if (key > ts) {
1,612,919✔
2934
        break;
1,427,915✔
2935
      }
2936

2937
      w->skey = key;
185,004✔
2938
    }
2939
    w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
3,380,163✔
2940
  }
2941
}
22,162,215✔
2942

2943
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
729,430,067✔
2944
  STimeWindow w = {0};
729,430,067✔
2945

2946
  w.skey = taosTimeTruncate(ts, pInterval);
729,430,067✔
2947
  w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
729,400,544✔
2948
  return w;
729,433,480✔
2949
}
2950

2951
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
10,327,515✔
2952
  STimeWindow win = *pWindow;
10,327,515✔
2953
  STimeWindow save = win;
10,327,515✔
2954
  while (win.skey <= ts && win.ekey >= ts) {
45,532,469✔
2955
    save = win;
35,204,954✔
2956
    // get previous time window
2957
    getNextTimeWindow(pInterval, &win, order == TSDB_ORDER_ASC ? TSDB_ORDER_DESC : TSDB_ORDER_ASC);
35,204,954✔
2958
  }
2959

2960
  return save;
10,327,515✔
2961
}
2962

2963
// get the correct time window according to the handled timestamp
2964
// todo refactor
2965
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
1,018,091,157✔
2966
                                int32_t order) {
2967
  STimeWindow w = {0};
1,018,091,157✔
2968
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
1,018,147,127✔
2969
    getInitialStartTimeWindow(pInterval, ts, &w, (order == TSDB_ORDER_ASC));
22,151,800✔
2970
    return w;
22,156,161✔
2971
  }
2972

2973
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
995,995,893✔
2974
  if (pRow) {
995,982,090✔
2975
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
996,019,239✔
2976
  }
2977

2978
  // in case of typical time window, we can calculate time window directly.
2979
  if (w.skey > ts || w.ekey < ts) {
996,004,031✔
2980
    w = doCalculateTimeWindow(ts, pInterval);
729,399,995✔
2981
  }
2982

2983
  if (pInterval->interval != pInterval->sliding) {
996,036,894✔
2984
    // it is an sliding window query, in which sliding value is not equalled to
2985
    // interval value, and we need to find the first qualified time window.
2986
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
10,327,515✔
2987
  }
2988

2989
  return w;
995,930,635✔
2990
}
2991

2992
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
2,147,483,647✔
2993
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
2994
  TSKEY   nextStart = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
2995
  nextStart = taosTimeAdd(nextStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
2,147,483,647✔
2996
  nextStart = taosTimeAdd(nextStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
2997
  return nextStart;
2,147,483,647✔
2998
}
2999

3000
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
2,147,483,647✔
3001
  tw->skey = getNextTimeWindowStart(pInterval, tw->skey, order);
2,147,483,647✔
3002
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
2,147,483,647✔
3003
}
2,147,483,647✔
3004

3005
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
2,147,483,647✔
3006
  return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
2,147,483,647✔
3007
          pLimitInfo->slimit.offset != -1);
2,147,483,647✔
3008
}
3009

3010
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
×
3011
  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
×
3012
}
3013

3014
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
2,147,483,647✔
3015
  SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
2,147,483,647✔
3016
  SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
2,147,483,647✔
3017

3018
  pLimitInfo->limit = limit;
2,147,483,647✔
3019
  pLimitInfo->slimit = slimit;
2,147,483,647✔
3020
  pLimitInfo->remainOffset = limit.offset;
2,147,483,647✔
3021
  pLimitInfo->remainGroupOffset = slimit.offset;
2,147,483,647✔
3022
  pLimitInfo->numOfOutputRows = 0;
2,147,483,647✔
3023
  pLimitInfo->numOfOutputGroups = 0;
2,147,483,647✔
3024
  pLimitInfo->currentGroupId = 0;
2,147,483,647✔
3025
}
2,147,483,647✔
3026

3027
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
769,847,684✔
3028
  pLimitInfo->numOfOutputRows = 0;
769,847,684✔
3029
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
769,901,502✔
3030
}
769,828,180✔
3031

3032
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
2,147,483,647✔
3033
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
2,147,483,647✔
3034
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3035
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3036
  }
3037
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
2,147,483,647✔
3038
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
3039
}
3040

3041
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
29,467,801✔
3042

3043
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
2,012,956,339✔
3044
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
2,012,956,339✔
3045
    return NULL;
32,764✔
3046
  }
3047

3048
  return taosArrayGet(pTableList->pTableList, index);
2,012,594,369✔
3049
}
3050

3051
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
458,974✔
3052
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
458,974✔
3053
  if (startIndex >= numOfTables) {
458,974✔
3054
    return -1;
×
3055
  }
3056

3057
  for (int32_t i = startIndex; i < numOfTables; ++i) {
4,899,616✔
3058
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
4,899,616✔
3059
    if (!p) {
4,899,616✔
3060
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3061
      return -1;
×
3062
    }
3063
    if (p->uid == uid) {
4,899,616✔
3064
      return i;
458,974✔
3065
    }
3066
  }
3067
  return -1;
×
3068
}
3069

3070
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
3,632,213✔
3071
  *psuid = pTableList->idInfo.suid;
3,632,213✔
3072
  *uid = pTableList->idInfo.uid;
3,632,213✔
3073
  *type = pTableList->idInfo.tableType;
3,632,213✔
3074
}
3,632,213✔
3075

3076
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
2,147,483,647✔
3077
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
2,147,483,647✔
3078
  if (slot == NULL) {
2,147,483,647✔
3079
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
20,292,461✔
3080
    return -1;
20,294,669✔
3081
  }
3082

3083
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
2,147,483,647✔
3084
  return pKeyInfo->groupId;
2,147,483,647✔
3085
}
3086

3087
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3088
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
18,458,875✔
3089
  int32_t code = TSDB_CODE_SUCCESS;
18,458,875✔
3090
  int32_t lino = 0;
18,458,875✔
3091
  if (pTableList->map == NULL) {
18,458,875✔
3092
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
3093
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
×
3094
  }
3095

3096
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
18,468,160✔
3097
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
18,463,215✔
3098
  if (p != NULL) {
18,455,161✔
3099
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
8,934✔
3100
    goto _end;
8,934✔
3101
  }
3102

3103
  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
18,446,227✔
3104
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
18,451,788✔
3105

3106
  int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
18,451,788✔
3107
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
18,462,517✔
3108
  if (code != TSDB_CODE_SUCCESS) {
18,468,401✔
3109
    // we have checked the existence of uid in hash map above
3110
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3111
    taosArrayPopTailBatch(pTableList->pTableList, 1);  // let's pop the last element in the array list
×
3112
  }
3113

3114
_end:
18,477,335✔
3115
  if (code != TSDB_CODE_SUCCESS) {
18,461,813✔
3116
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3117
  } else {
3118
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
18,461,813✔
3119
  }
3120

3121
  return code;
18,481,009✔
3122
}
3123

3124
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
1,502,858,686✔
3125
                              int32_t* size) {
3126
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
1,502,858,686✔
3127
  int32_t numOfTables = 0;
1,502,930,471✔
3128
  int32_t code = tableListGetSize(pTableList, &numOfTables);
1,503,048,500✔
3129
  if (code != TSDB_CODE_SUCCESS) {
1,502,778,627✔
3130
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3131
    return code;
×
3132
  }
3133

3134
  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
1,502,778,627✔
3135
    return TSDB_CODE_INVALID_PARA;
×
3136
  }
3137

3138
  // here handle two special cases:
3139
  // 1. only one group exists, and 2. one table exists for each group.
3140
  if (totalGroups == 1) {
1,502,778,627✔
3141
    *size = numOfTables;
1,494,016,463✔
3142
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
1,493,859,672✔
3143
    return TSDB_CODE_SUCCESS;
1,494,031,467✔
3144
  } else if (totalGroups == numOfTables) {
8,762,347✔
3145
    *size = 1;
7,871,291✔
3146
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
7,871,291✔
3147
    return TSDB_CODE_SUCCESS;
7,871,291✔
3148
  }
3149

3150
  int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
891,078✔
3151
  if (ordinalGroupIndex < totalGroups - 1) {
992,636✔
3152
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
753,628✔
3153
  } else {
3154
    *size = numOfTables - offset;
239,008✔
3155
  }
3156

3157
  *pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
991,944✔
3158
  return TSDB_CODE_SUCCESS;
991,944✔
3159
}
3160

3161
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
2,147,483,647✔
3162

3163
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
6,361,599✔
3164

3165
STableListInfo* tableListCreate() {
1,834,729,348✔
3166
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
1,834,729,348✔
3167
  if (pListInfo == NULL) {
1,834,155,596✔
3168
    return NULL;
×
3169
  }
3170

3171
  pListInfo->remainGroups = NULL;
1,834,155,596✔
3172
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
1,834,226,312✔
3173
  if (pListInfo->pTableList == NULL) {
1,834,418,451✔
3174
    goto _error;
×
3175
  }
3176

3177
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,834,652,873✔
3178
  if (pListInfo->map == NULL) {
1,835,356,703✔
3179
    goto _error;
×
3180
  }
3181

3182
  pListInfo->numOfOuputGroups = 1;
1,835,319,258✔
3183
  return pListInfo;
1,835,320,812✔
3184

3185
_error:
×
3186
  tableListDestroy(pListInfo);
×
3187
  return NULL;
×
3188
}
3189

3190
void tableListDestroy(STableListInfo* pTableListInfo) {
1,928,968,050✔
3191
  if (pTableListInfo == NULL) {
1,928,968,050✔
3192
    return;
94,076,138✔
3193
  }
3194

3195
  taosArrayDestroy(pTableListInfo->pTableList);
1,834,891,912✔
3196
  taosMemoryFreeClear(pTableListInfo->groupOffset);
1,834,716,071✔
3197

3198
  taosHashCleanup(pTableListInfo->map);
1,834,756,253✔
3199
  taosHashCleanup(pTableListInfo->remainGroups);
1,835,076,827✔
3200
  pTableListInfo->pTableList = NULL;
1,835,016,883✔
3201
  pTableListInfo->map = NULL;
1,835,183,667✔
3202
  taosMemoryFree(pTableListInfo);
1,835,103,214✔
3203
}
3204

3205
void tableListClear(STableListInfo* pTableListInfo) {
7,617,460✔
3206
  if (pTableListInfo == NULL) {
7,617,460✔
3207
    return;
×
3208
  }
3209

3210
  taosArrayClear(pTableListInfo->pTableList);
7,617,460✔
3211
  taosHashClear(pTableListInfo->map);
7,636,233✔
3212
  taosHashClear(pTableListInfo->remainGroups);
7,646,713✔
3213
  taosMemoryFree(pTableListInfo->groupOffset);
7,646,713✔
3214
  pTableListInfo->numOfOuputGroups = 1;
7,646,713✔
3215
  pTableListInfo->oneTableForEachGroup = false;
7,646,713✔
3216
}
3217

3218
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
2,147,483,647✔
3219
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
2,147,483,647✔
3220
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
2,147,483,647✔
3221

3222
  if (pInfo1->groupId == pInfo2->groupId) {
2,147,483,647✔
3223
    return 0;
2,147,483,647✔
3224
  } else {
3225
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
316,535,051✔
3226
  }
3227
}
3228

3229
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
278,065,344✔
3230
  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
278,065,344✔
3231
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
278,155,382✔
3232

3233
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
278,140,746✔
3234
  if (!pList) {
278,110,448✔
3235
    return terrno;
×
3236
  }
3237

3238
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
278,110,448✔
3239
  if (!pInfo) {
277,959,493✔
3240
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3241
    return terrno;
×
3242
  }
3243
  uint64_t gid = pInfo->groupId;
277,959,493✔
3244

3245
  int32_t start = 0;
278,131,705✔
3246
  void*   tmp = taosArrayPush(pList, &start);
278,140,171✔
3247
  if (!tmp) {
278,140,171✔
3248
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3249
    return terrno;
×
3250
  }
3251

3252
  for (int32_t i = 1; i < size; ++i) {
1,512,699,208✔
3253
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
1,234,556,830✔
3254
    if (!pInfo) {
1,234,462,329✔
3255
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3256
      return terrno;
×
3257
    }
3258
    if (pInfo->groupId != gid) {
1,234,462,329✔
3259
      tmp = taosArrayPush(pList, &i);
66,910,212✔
3260
      if (!tmp) {
66,910,212✔
3261
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3262
        return terrno;
×
3263
      }
3264
      gid = pInfo->groupId;
66,910,212✔
3265
    }
3266
  }
3267

3268
  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
278,166,183✔
3269
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
278,171,446✔
3270
  if (pTableListInfo->groupOffset == NULL) {
278,033,870✔
3271
    taosArrayDestroy(pList);
×
3272
    return terrno;
×
3273
  }
3274

3275
  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
277,979,914✔
3276
  taosArrayDestroy(pList);
278,097,433✔
3277
  return TSDB_CODE_SUCCESS;
277,846,224✔
3278
}
3279

3280
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
1,569,815,417✔
3281
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3282
  int32_t code = TSDB_CODE_SUCCESS;
1,569,815,417✔
3283

3284
  bool   groupByTbname = groupbyTbname(group);
1,569,815,417✔
3285
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
1,569,552,476✔
3286
  if (!numOfTables) {
1,569,733,484✔
3287
    return code;
×
3288
  }
3289
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
1,569,733,484✔
3290
  if (group == NULL || groupByTbname) {
1,569,699,309✔
3291
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
1,537,070,716✔
3292
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
1,163,174,704✔
3293
      pTableListInfo->remainGroups =
90,551,356✔
3294
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
90,551,356✔
3295
      if (pTableListInfo->remainGroups == NULL) {
90,551,356✔
3296
        return terrno;
×
3297
      }
3298

3299
      for (int i = 0; i < numOfTables; i++) {
465,096,299✔
3300
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
374,549,703✔
3301
        if (!info) {
374,550,219✔
3302
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3303
          return terrno;
×
3304
        }
3305
        info->groupId = groupByTbname ? info->uid : 0;
374,550,219✔
3306
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
374,558,593✔
3307
                                      &(info->uid), sizeof(info->uid));
374,554,165✔
3308
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
374,546,482✔
3309
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3310
          return tempRes;
×
3311
        }
3312
      }
3313
    } else {
3314
      for (int32_t i = 0; i < numOfTables; i++) {
2,147,483,647✔
3315
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
2,147,483,647✔
3316
        if (!info) {
2,147,483,647✔
3317
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3318
          return terrno;
×
3319
        }
3320
        info->groupId = groupByTbname ? info->uid : 0;
2,147,483,647✔
3321
        
3322
      }
3323
    }
3324
    if (groupIdMap && group != NULL){
1,536,784,644✔
3325
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
1,612,882✔
3326
    }
3327

3328
    pTableListInfo->oneTableForEachGroup = groupByTbname;
1,536,784,644✔
3329
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
1,537,215,257✔
3330
      pTableListInfo->oneTableForEachGroup = true;
408,347,278✔
3331
    }
3332

3333
    if (groupSort && groupByTbname) {
1,537,204,371✔
3334
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
23,377,004✔
3335
      pTableListInfo->numOfOuputGroups = numOfTables;
23,373,985✔
3336
    } else if (groupByTbname && pScanNode->groupOrderScan) {
1,513,827,367✔
3337
      pTableListInfo->numOfOuputGroups = numOfTables;
330,615✔
3338
    } else {
3339
      pTableListInfo->numOfOuputGroups = 1;
1,513,500,665✔
3340
    }
3341
    if (groupSort || pScanNode->groupOrderScan) {
1,537,073,170✔
3342
      code = sortTableGroup(pTableListInfo);
276,024,277✔
3343
    }
3344
  } else {
3345
    bool initRemainGroups = false;
32,628,593✔
3346
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
32,628,593✔
3347
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
31,370,212✔
3348
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
31,370,212✔
3349
          !(groupSort || pScanNode->groupOrderScan)) {
15,256,319✔
3350
        initRemainGroups = true;
15,016,865✔
3351
      }
3352
    }
3353

3354
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
32,628,593✔
3355
    if (code != TSDB_CODE_SUCCESS) {
32,620,497✔
3356
      return code;
16,005✔
3357
    }
3358

3359
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
32,604,492✔
3360

3361
    if (groupSort || pScanNode->groupOrderScan) {
32,608,341✔
3362
      code = sortTableGroup(pTableListInfo);
2,023,091✔
3363
    }
3364
  }
3365

3366
  // add all table entry in the hash map
3367
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
1,569,465,948✔
3368
  for (int32_t i = 0; i < size; ++i) {
2,147,483,647✔
3369
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
2,147,483,647✔
3370
    if (!p) {
2,147,483,647✔
3371
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3372
      return terrno;
×
3373
    }
3374
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
2,147,483,647✔
3375
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
2,147,483,647✔
3376
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3377
      return tempRes;
×
3378
    }
3379
  }
3380

3381
  return code;
1,570,011,696✔
3382
}
3383

3384
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
1,706,857,949✔
3385
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3386
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3387
  int64_t     st = taosGetTimestampUs();
1,706,896,812✔
3388
  const char* idStr = GET_TASKID(pTaskInfo);
1,706,896,812✔
3389

3390
  if (pHandle == NULL) {
1,705,056,905✔
3391
    qError("invalid handle, in creating operator tree, %s", idStr);
×
3392
    return TSDB_CODE_INVALID_PARA;
×
3393
  }
3394

3395
  if (pHandle->uid != 0) {
1,705,056,905✔
3396
    pScanNode->uid = pHandle->uid;
466,748✔
3397
    pScanNode->tableType = TSDB_CHILD_TABLE;
466,748✔
3398
  }
3399
  uint8_t digest[17] = {0};
1,706,616,137✔
3400
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
1,706,738,031✔
3401
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
1,706,788,151✔
3402
  if (code != TSDB_CODE_SUCCESS) {
1,707,681,295✔
3403
    qError("failed to getTableList, code:%s", tstrerror(code));
10,670✔
3404
    return code;
10,670✔
3405
  }
3406

3407
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
1,707,670,625✔
3408

3409
  int64_t st1 = taosGetTimestampUs();
1,707,827,139✔
3410
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
1,707,827,139✔
3411
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
1,707,893,862✔
3412
         pTaskInfo->cost.extractListTime, idStr);
3413

3414
  if (numOfTables == 0) {
1,707,659,674✔
3415
    qDebug("no table qualified for query, %s", idStr);
137,896,413✔
3416
    return TSDB_CODE_SUCCESS;
137,901,448✔
3417
  }
3418

3419
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
1,569,763,261✔
3420
  if (code != TSDB_CODE_SUCCESS) {
1,569,970,191✔
3421
    return code;
16,005✔
3422
  }
3423

3424
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
1,569,990,385✔
3425
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
1,569,914,066✔
3426

3427
  return TSDB_CODE_SUCCESS;
1,569,901,972✔
3428
}
3429

3430
char* getStreamOpName(uint16_t opType) {
78,383,899✔
3431
  switch (opType) {
78,383,899✔
3432
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
3433
      return "stream scan";
×
3434
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
77,645,451✔
3435
      return "project";
77,645,451✔
3436
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
738,448✔
3437
      return "external window";
738,448✔
3438
  }
3439
  return "error name";
×
3440
}
3441

3442
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
2,147,483,647✔
3443
  if (qDebugFlag & DEBUG_DEBUG) {
2,147,483,647✔
3444
    if (!pBlock) {
2,147,483,647✔
3445
      qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
133,665,533✔
3446
      return;
133,670,393✔
3447
    } else if (pBlock->info.rows == 0) {
2,147,483,647✔
3448
      qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
259,410,758✔
3449
      return;
259,444,255✔
3450
    }
3451
    
3452
    char*   pBuf = NULL;
2,147,483,647✔
3453
    int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr, qId);
2,147,483,647✔
3454
    if (code == 0) {
2,147,483,647✔
3455
      qDebugL("%" PRIx64 " %s %s", qId, __func__, pBuf);
2,147,483,647✔
3456
      taosMemoryFree(pBuf);
2,147,483,647✔
3457
    }
3458
  }
3459
}
3460

3461
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
×
3462
  if (!pBlock) {
×
3463
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
×
3464
    return;
×
3465
  } else if (pBlock->info.rows == 0) {
×
3466
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
×
3467
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
3468
           pBlock->info.version);
3469
    return;
×
3470
  }
3471
  if (qDebugFlag & DEBUG_DEBUG) {
×
3472
    char* pBuf = NULL;
×
3473
    char  flagBuf[64];
×
3474
    snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
×
3475
    int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr, 0);
×
3476
    if (code == 0) {
×
3477
      qDebug("%s", pBuf);
×
3478
      taosMemoryFree(pBuf);
×
3479
    }
3480
  }
3481
}
3482

3483
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
1,055,239,703✔
3484

3485
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
2,147,483,647✔
3486
  int64_t* ts = (int64_t*)pColData->pData;
2,147,483,647✔
3487

3488
  int64_t duration = pWin->ekey - pWin->skey + delta;
2,147,483,647✔
3489
  ts[2] = duration;            // set the duration
2,147,483,647✔
3490
  ts[3] = pWin->skey;          // window start key
2,147,483,647✔
3491
  ts[4] = pWin->ekey + delta;  // window end key
2,147,483,647✔
3492
}
2,147,483,647✔
3493

3494
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
2,147,483,647✔
3495
                 int32_t rowIndex) {
3496
  SColumnDataAgg* pColAgg = NULL;
2,147,483,647✔
3497
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3498
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3499

3500
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
3501
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
3502
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
3503
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
3504

3505
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
3506
      if (isNull[i] != 1) return 1;
2,147,483,647✔
3507
    } else {
3508
      if (isNull[i] != 0) return 1;
2,147,483,647✔
3509
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
3510
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3511
        int32_t len = getJsonValueLen(val);
×
3512
        if (memcmp(p, val, len) != 0) return 1;
×
3513
        p += len;
×
3514
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,147,483,647✔
3515
        if (IS_STR_DATA_BLOB(pCol->type)) {
2,147,483,647✔
3516
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
3517
          p += blobDataTLen(val);
×
3518
        } else {
3519
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
2,147,483,647✔
3520
          p += varDataTLen(val);
2,147,483,647✔
3521
        }
3522
      } else {
3523
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
2,147,483,647✔
3524
        p += pCol->bytes;
2,147,483,647✔
3525
      }
3526
    }
3527
  }
3528
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
2,147,483,647✔
3529
  return 0;
2,147,483,647✔
3530
}
3531

3532
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
525,142,655✔
3533
  uint32_t        colNum = pSortGroupCols->size;
525,142,655✔
3534
  SColumnDataAgg* pColAgg = NULL;
525,147,125✔
3535
  char*           isNull = keyBuf;
525,147,125✔
3536
  char*           p = keyBuf + sizeof(int8_t) * colNum;
525,147,125✔
3537

3538
  for (int32_t i = 0; i < colNum; ++i) {
1,568,440,928✔
3539
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
1,043,237,907✔
3540
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
1,043,338,340✔
3541
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
1,043,396,356✔
3542

3543
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
1,043,418,962✔
3544

3545
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,086,691,248✔
3546
      isNull[i] = 1;
37,512,690✔
3547
    } else {
3548
      isNull[i] = 0;
1,005,920,157✔
3549
      const char* val = colDataGetData(pColInfoData, rowIndex);
1,005,817,745✔
3550
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
1,005,934,117✔
3551
        int32_t len = getJsonValueLen(val);
×
3552
        memcpy(p, val, len);
×
3553
        p += len;
×
3554
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
1,005,851,117✔
3555
        if (IS_STR_DATA_BLOB(pCol->type)) {
151,910,177✔
3556
          blobDataCopy(p, val);
×
3557
          p += blobDataTLen(val);
×
3558
        } else {
3559
          varDataCopy(p, val);
151,996,164✔
3560
          p += varDataTLen(val);
151,985,352✔
3561
        }
3562
      } else {
3563
        memcpy(p, val, pCol->bytes);
853,953,733✔
3564
        p += pCol->bytes;
853,941,678✔
3565
      }
3566
    }
3567
  }
3568
  return (int32_t)(p - keyBuf);
525,203,021✔
3569
}
3570

3571
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
3572
  T_MD5_CTX context;
2,147,483,647✔
3573
  tMD5Init(&context);
2,147,483,647✔
3574
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
3575
  tMD5Final(&context);
2,147,483,647✔
3576

3577
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
3578
  uint64_t id = 0;
2,147,483,647✔
3579
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
3580
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
3581
  return id;
2,147,483,647✔
3582
}
3583

3584
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
980,796✔
3585
  SNode*     node;
3586
  SNodeList* ret = NULL;
980,796✔
3587
  FOREACH(node, pSortKeys) {
2,991,120✔
3588
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
2,010,324✔
3589
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
2,010,324✔
3590
    if (code != TSDB_CODE_SUCCESS) {
2,010,324✔
3591
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3592
      terrno = code;
×
3593
      return NULL;
×
3594
    }
3595
  }
3596
  return ret;
980,796✔
3597
}
3598

3599
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
980,796✔
3600
  int32_t code = TSDB_CODE_SUCCESS;
980,796✔
3601
  int32_t lino = 0;
980,796✔
3602
  int32_t len = 0;
980,796✔
3603
  int32_t keyNum = taosArrayGetSize(keys);
980,796✔
3604
  for (int32_t i = 0; i < keyNum; ++i) {
2,495,400✔
3605
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
1,514,604✔
3606
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
1,512,052✔
3607
    len += pCol->bytes;
1,512,052✔
3608
  }
3609
  len += sizeof(int8_t) * keyNum;  // null flag
980,796✔
3610
  *pLen = len;
980,796✔
3611

3612
_end:
980,796✔
3613
  if (code != TSDB_CODE_SUCCESS) {
980,796✔
3614
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3615
  }
3616
  return code;
980,796✔
3617
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc