• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4898

26 Dec 2025 09:58AM UTC coverage: 65.061% (-0.7%) from 65.717%
#4898

push

travis-ci

web-flow
feat: support encryption of configuration files, data files and metadata files (#33801)

350 of 1333 new or added lines in 31 files covered. (26.26%)

2796 existing lines in 159 files now uncovered.

184024 of 282850 relevant lines covered (65.06%)

113940470.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.53
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36
#include "trpc.h"
37
#include "filter.h"
38

39
typedef struct tagFilterAssist {
40
  SHashObj* colHash;
41
  int32_t   index;
42
  SArray*   cInfoList;
43
  int32_t   code;
44
} tagFilterAssist;
45

46
typedef struct STransTagExprCtx {
47
  int32_t      code;
48
  SMetaReader* pReader;
49
} STransTagExprCtx;
50

51
typedef enum {
52
  FILTER_NO_LOGIC = 1,
53
  FILTER_AND,
54
  FILTER_OTHER,
55
} FilterCondType;
56

57
static FilterCondType checkTagCond(SNode* cond);
58
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
59
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
60
                                        uint64_t suid);
61

62
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
63
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
64

65
static int64_t getLimit(const SNode* pLimit) {
1,063,282,711✔
66
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
1,063,282,711✔
67
}
68
static int64_t getOffset(const SNode* pLimit) {
1,063,184,299✔
69
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
1,063,184,299✔
70
}
71
static void releaseColInfoData(void* pCol);
72

73
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
439,663,802✔
74
  pResultRowInfo->size = 0;
439,663,802✔
75
  pResultRowInfo->cur.pageId = -1;
439,703,890✔
76
}
439,751,911✔
77

78
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
5,161,844✔
79

80
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
837,734,130✔
81
  pResultRow->numOfRows = 0;
837,734,130✔
82
  pResultRow->closed = false;
837,734,130✔
83
  pResultRow->endInterp = false;
837,481,420✔
84
  pResultRow->startInterp = false;
837,734,130✔
85

86
  if (entrySize > 0) {
837,733,390✔
87
    memset(pResultRow->pEntryInfo, 0, entrySize);
837,733,390✔
88
  }
89
}
837,735,240✔
90

91
// TODO refactor: use macro
92
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
93
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
94
}
95

96
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
257,191,532✔
97
  int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
257,191,532✔
98

99
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,050,157,113✔
100
    rowSize += pCtx[i].resDataInfo.interBufSize;
793,027,272✔
101
  }
102

103
  return rowSize;
257,129,841✔
104
}
105

106
// Convert buf read from rocksdb to result row
107
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
108
  if (inBuf == NULL || pSup == NULL) {
×
109
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
110
    return TSDB_CODE_INVALID_PARA;
×
111
  }
112
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
113
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
114
  SResultRow*     pResultRow = NULL;
×
115
  size_t          processedSize = 0;
×
116
  int32_t         code = TSDB_CODE_SUCCESS;
×
117

118
  // calculate the size of output buffer
119
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
120
  *outBuf = taosMemoryMalloc(*outBufSize);
×
121
  if (*outBuf == NULL) {
×
122
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
123
    return terrno;
×
124
  }
125
  pResultRow = (SResultRow*)*outBuf;
×
126
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
127
  inBuf += sizeof(SResultRow);
×
128
  processedSize += sizeof(SResultRow);
×
129

130
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
131
    int32_t len = *(int32_t*)inBuf;
×
132
    inBuf += sizeof(int32_t);
×
133
    processedSize += sizeof(int32_t);
×
134
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
135
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
136
      if (code != TSDB_CODE_SUCCESS) {
×
137
        qError("failed to decode result row, code:%d", code);
×
138
        return code;
×
139
      }
140
    } else {
141
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
142
    }
143
    inBuf += len;
×
144
    processedSize += len;
×
145
  }
146

147
  if (processedSize < inBufSize) {
×
148
    // stream stores extra data after result row
149
    size_t leftLen = inBufSize - processedSize;
×
150
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
151
    if (*outBuf == NULL) {
×
152
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
153
      return terrno;
×
154
    }
155
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
156
    inBuf += leftLen;
×
157
    processedSize += leftLen;
×
158
    *outBufSize += leftLen;
×
159
  }
160

161
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
162
  return TSDB_CODE_SUCCESS;
×
163
}
164

165
// Convert result row to buf for rocksdb
166
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
167
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
168
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
169
    return TSDB_CODE_INVALID_PARA;
×
170
  }
171

172
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
173
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
174
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
175
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
176

177
  if (rowSize > inBufSize) {
×
178
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181

182
  // calculate the size of output buffer
183
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
184
  if (rowSize < inBufSize) {
×
185
    *outBufSize += inBufSize - rowSize;
×
186
  }
187

188
  *outBuf = taosMemoryMalloc(*outBufSize);
×
189
  if (*outBuf == NULL) {
×
190
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
191
    return terrno;
×
192
  }
193

194
  char* pBuf = *outBuf;
×
195
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
196
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
197
  pBuf += sizeof(SResultRow);
×
198
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
199
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
200
    *(int32_t*)pBuf = (int32_t)len;
×
201
    pBuf += sizeof(int32_t);
×
202
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
203
    pBuf += len;
×
204
  }
205

206
  if (rowSize < inBufSize) {
×
207
    // stream stores extra data after result row
208
    size_t leftLen = inBufSize - rowSize;
×
209
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
210
    pBuf += leftLen;
×
211
  }
212

213
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
214
  return TSDB_CODE_SUCCESS;
×
215
}
216

217
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
×
218

219
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
132,120,604✔
220
  taosMemoryFreeClear(pGroupResInfo->pBuf);
132,120,604✔
221
  if (pGroupResInfo->freeItem) {
132,126,752✔
222
    //    taosArrayDestroy(pGroupResInfo->pRows);
223
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
×
224
    pGroupResInfo->freeItem = false;
×
225
    pGroupResInfo->pRows = NULL;
×
226
  } else {
227
    taosArrayDestroy(pGroupResInfo->pRows);
132,119,601✔
228
    pGroupResInfo->pRows = NULL;
132,115,769✔
229
  }
230
  pGroupResInfo->index = 0;
132,117,611✔
231
  pGroupResInfo->delIndex = 0;
132,117,743✔
232
}
132,112,596✔
233

234
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
235
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
236
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
237

238
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
239
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
240
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
241

242
    if (pts1 == pts2) {
2,147,483,647✔
243
      return 0;
×
244
    } else {
245
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
246
    }
247
  } else {
248
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
249
  }
250
}
251

252
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
2,147,483,647✔
253

254
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
87,213,650✔
255
  int32_t code = TSDB_CODE_SUCCESS;
87,213,650✔
256
  int32_t lino = 0;
87,213,650✔
257
  if (pGroupResInfo->pRows != NULL) {
87,213,650✔
258
    taosArrayDestroy(pGroupResInfo->pRows);
627,616✔
259
  }
260
  if (pGroupResInfo->pBuf) {
87,221,017✔
261
    taosMemoryFree(pGroupResInfo->pBuf);
627,616✔
262
    pGroupResInfo->pBuf = NULL;
627,616✔
263
  }
264

265
  // extract the result rows information from the hash map
266
  int32_t size = tSimpleHashGetSize(pHashmap);
87,200,854✔
267

268
  void* pData = NULL;
87,214,252✔
269
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
87,214,252✔
270
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
87,220,673✔
271

272
  size_t  keyLen = 0;
87,205,516✔
273
  int32_t iter = 0;
87,210,616✔
274
  int64_t bufLen = 0, offset = 0;
87,211,118✔
275

276
  // todo move away and record this during create window
277
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
278
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
279
    bufLen += keyLen + sizeof(SResultRowPosition);
2,147,483,647✔
280
  }
281

282
  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
87,218,941✔
283
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
87,220,980✔
284

285
  iter = 0;
87,217,078✔
286
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
287
    void* key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
288

289
    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);
2,147,483,647✔
290

291
    p->groupId = *(uint64_t*)key;
2,147,483,647✔
292
    p->pos = *(SResultRowPosition*)pData;
2,147,483,647✔
293
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
294
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
2,147,483,647✔
295
    QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
2,147,483,647✔
296

297
    offset += keyLen + sizeof(struct SResultRowPosition);
2,147,483,647✔
298
  }
299

300
  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
87,213,447✔
301
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
12,278,776✔
302
    size = POINTER_BYTES;
12,278,776✔
303
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
12,278,776✔
304
  }
305

306
  pGroupResInfo->index = 0;
87,213,077✔
307

308
_end:
87,220,386✔
309
  if (code != TSDB_CODE_SUCCESS) {
87,226,042✔
310
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
311
  }
312
  return code;
87,226,042✔
313
}
314

315
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
×
316
  if (pGroupResInfo->pRows != NULL) {
×
317
    taosArrayDestroy(pGroupResInfo->pRows);
×
318
  }
319

320
  pGroupResInfo->freeItem = true;
×
321
  pGroupResInfo->pRows = pArrayList;
×
322
  pGroupResInfo->index = 0;
×
323
  pGroupResInfo->delIndex = 0;
×
324
}
×
325

326
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
335,955,796✔
327
  if (pGroupResInfo->pRows == NULL) {
335,955,796✔
328
    return false;
×
329
  }
330

331
  return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
335,958,708✔
332
}
333

334
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
179,330,828✔
335
  if (pGroupResInfo->pRows == 0) {
179,330,828✔
336
    return 0;
×
337
  }
338

339
  return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
179,334,008✔
340
}
341

342
SArray* createSortInfo(SNodeList* pNodeList) {
53,456,876✔
343
  size_t numOfCols = 0;
53,456,876✔
344

345
  if (pNodeList != NULL) {
53,456,876✔
346
    numOfCols = LIST_LENGTH(pNodeList);
53,259,501✔
347
  } else {
348
    numOfCols = 0;
197,415✔
349
  }
350

351
  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
53,457,884✔
352
  if (pList == NULL) {
53,457,795✔
353
    return pList;
×
354
  }
355

356
  for (int32_t i = 0; i < numOfCols; ++i) {
121,121,589✔
357
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
67,653,263✔
358
    if (!pSortKey) {
67,659,286✔
359
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
360
      taosArrayDestroy(pList);
×
361
      pList = NULL;
×
362
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
363
      break;
×
364
    }
365
    SBlockOrderInfo bi = {0};
67,659,286✔
366
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
67,659,743✔
367
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
67,658,592✔
368

369
    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
67,659,396✔
370
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
×
371
      taosArrayDestroy(pList);
×
372
      pList = NULL;
×
373
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
374
      break;
×
375
    }
376
    
377
    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
67,650,006✔
378
    bi.slotId = pColNode->slotId;
67,652,317✔
379
    void* tmp = taosArrayPush(pList, &bi);
67,664,402✔
380
    if (!tmp) {
67,664,402✔
381
      taosArrayDestroy(pList);
×
382
      pList = NULL;
×
383
      break;
×
384
    }
385
  }
386

387
  return pList;
53,468,934✔
388
}
389

390
SSDataBlock* createDataBlockFromDescNode(void* p) {
685,210,543✔
391
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
685,210,543✔
392
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
685,210,543✔
393
  SSDataBlock* pBlock = NULL;
685,299,201✔
394
  int32_t      code = createDataBlock(&pBlock);
685,276,024✔
395
  if (code) {
685,056,259✔
396
    terrno = code;
×
397
    return NULL;
×
398
  }
399

400
  pBlock->info.id.blockId = pNode->dataBlockId;
685,056,259✔
401
  pBlock->info.type = STREAM_INVALID;
685,128,140✔
402
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
685,160,440✔
403
  pBlock->info.watermark = INT64_MIN;
685,229,265✔
404

405
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
406
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
407
    if (!pDescNode) {
2,147,483,647✔
408
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
409
      blockDataDestroy(pBlock);
×
410
      pBlock = NULL;
×
411
      terrno = TSDB_CODE_INVALID_PARA;
×
412
      break;
×
413
    }
414
    SColumnInfoData idata =
2,147,483,647✔
415
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
416
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
417
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
418
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
419

420
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
421
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
422
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
131,928✔
423
      blockDataDestroy(pBlock);
131,928✔
424
      pBlock = NULL;
×
425
      terrno = code;
×
426
      break;
×
427
    }
428
  }
429

430
  return pBlock;
685,436,103✔
431
}
432

433
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
236,314,049✔
434
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
236,314,049✔
435

436
  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
987,836,192✔
437
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
772,255,797✔
438
    if (!pItem) {
772,068,221✔
439
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
440
      return terrno;
×
441
    }
442

443
    if (pItem->isPk) {
772,068,221✔
444
      SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
20,637,907✔
445
      if (!pInfoData) {
20,579,366✔
446
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
447
        return terrno;
×
448
      }
449
      pBlockInfo->pks[0].type = pInfoData->info.type;
20,579,366✔
450
      pBlockInfo->pks[1].type = pInfoData->info.type;
20,591,786✔
451

452
      // allocate enough buffer size, which is pInfoData->info.bytes
453
      if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
20,591,263✔
454
        pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
6,361,815✔
455
        if (pBlockInfo->pks[0].pData == NULL) {
6,354,041✔
456
          return terrno;
×
457
        }
458

459
        pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
6,358,701✔
460
        if (pBlockInfo->pks[1].pData == NULL) {
6,356,309✔
461
          taosMemoryFreeClear(pBlockInfo->pks[0].pData);
×
462
          return terrno;
×
463
        }
464

465
        pBlockInfo->pks[0].nData = pInfoData->info.bytes;
6,356,835✔
466
        pBlockInfo->pks[1].nData = pInfoData->info.bytes;
6,362,732✔
467
      }
468

469
      break;
20,593,775✔
470
    }
471
  }
472

473
  return TSDB_CODE_SUCCESS;
236,211,389✔
474
}
475

476
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
2,155,482✔
477
  STransTagExprCtx* pCtx = pContext;
2,155,482✔
478
  SMetaReader*      mr = pCtx->pReader;
2,155,482✔
479
  bool              isTagCol = false, isTbname = false;
2,155,482✔
480
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
2,155,482✔
481
    SColumnNode* pCol = (SColumnNode*)*pNode;
615,852✔
482
    if (pCol->colType == COLUMN_TYPE_TBNAME)
615,852✔
483
      isTbname = true;
×
484
    else
485
      isTagCol = true;
615,852✔
486
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
1,539,630✔
487
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
488
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
489
  }
490
  if (isTagCol) {
2,155,482✔
491
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
615,852✔
492

493
    SValueNode* res = NULL;
615,852✔
494
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
615,852✔
495
    if (NULL == res) {
615,852✔
496
      return DEAL_RES_ERROR;
×
497
    }
498

499
    res->translate = true;
615,852✔
500
    res->node.resType = pSColumnNode->node.resType;
615,852✔
501

502
    STagVal tagVal = {0};
615,852✔
503
    tagVal.cid = pSColumnNode->colId;
615,852✔
504
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
615,852✔
505
    if (p == NULL) {
615,852✔
506
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
507
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
615,852✔
508
      int32_t len = ((const STag*)p)->len;
×
509
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
510
      if (NULL == res->datum.p) {
×
511
        return DEAL_RES_ERROR;
×
512
      }
513
      memcpy(res->datum.p, p, len);
×
514
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
615,852✔
515
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
615,852✔
516
        return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
517
      }
518

519
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
615,852✔
520
      if (NULL == res->datum.p) {
615,852✔
521
        return DEAL_RES_ERROR;
×
522
      }
523

524
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
615,852✔
525
        memcpy(blobDataVal(res->datum.p), tagVal.pData, tagVal.nData);
×
526
        blobDataSetLen(res->datum.p, tagVal.nData);
×
527
      } else {
528
        memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
615,852✔
529
        varDataSetLen(res->datum.p, tagVal.nData);
615,852✔
530
      }
531
    } else {
532
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
×
533
      if (code != TSDB_CODE_SUCCESS) {
×
534
        return DEAL_RES_ERROR;
×
535
      }
536
    }
537
    nodesDestroyNode(*pNode);
615,852✔
538
    *pNode = (SNode*)res;
615,852✔
539
  } else if (isTbname) {
1,539,630✔
540
    SValueNode* res = NULL;
×
541
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
542
    if (NULL == res) {
×
543
      return DEAL_RES_ERROR;
×
544
    }
545

546
    res->translate = true;
×
547
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
548

549
    int32_t len = strlen(mr->me.name);
×
550
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
551
    if (NULL == res->datum.p) {
×
552
      return DEAL_RES_ERROR;
×
553
    }
554
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
555
    varDataSetLen(res->datum.p, len);
×
556
    nodesDestroyNode(*pNode);
×
557
    *pNode = (SNode*)res;
×
558
  }
559

560
  return DEAL_RES_CONTINUE;
2,155,482✔
561
}
562

563
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
307,926✔
564
  int32_t     code = TSDB_CODE_SUCCESS;
307,926✔
565
  SMetaReader mr = {0};
307,926✔
566

567
  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
307,926✔
568
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
307,926✔
569
  if (TSDB_CODE_SUCCESS != code) {
307,926✔
570
    pAPI->metaReaderFn.clearReader(&mr);
×
571
    *pQualified = false;
×
572

573
    return TSDB_CODE_SUCCESS;
×
574
  }
575

576
  SNode* pTagCondTmp = NULL;
307,926✔
577
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
307,926✔
578
  if (TSDB_CODE_SUCCESS != code) {
307,926✔
579
    *pQualified = false;
×
580
    pAPI->metaReaderFn.clearReader(&mr);
×
581
    return code;
×
582
  }
583
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
307,926✔
584
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
307,926✔
585
  pAPI->metaReaderFn.clearReader(&mr);
307,926✔
586
  if (TSDB_CODE_SUCCESS != ctx.code) {
307,926✔
587
    *pQualified = false;
×
588
    nodesDestroyNode(pTagCondTmp);
×
589
    terrno = code;
×
590
    return code;
×
591
  }
592

593
  SNode* pNew = NULL;
307,926✔
594
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
307,926✔
595
  if (TSDB_CODE_SUCCESS != code) {
307,926✔
596
    terrno = code;
×
597
    nodesDestroyNode(pTagCondTmp);
×
598
    *pQualified = false;
×
599

600
    return code;
×
601
  }
602

603
  SValueNode* pValue = (SValueNode*)pNew;
307,926✔
604
  *pQualified = pValue->datum.b;
307,926✔
605

606
  nodesDestroyNode(pNew);
307,926✔
607
  return TSDB_CODE_SUCCESS;
307,926✔
608
}
609

610
static EDealRes getColumn(SNode** pNode, void* pContext) {
50,408,545✔
611
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
50,408,545✔
612
  SColumnNode*     pSColumnNode = NULL;
50,408,545✔
613
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
50,412,745✔
614
    pSColumnNode = *(SColumnNode**)pNode;
17,359,942✔
615
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
33,066,733✔
616
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
560,025✔
617
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
560,025✔
618
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
516,746✔
619
      if (NULL == pSColumnNode) {
516,746✔
620
        return DEAL_RES_ERROR;
×
621
      }
622
      pSColumnNode->colId = -1;
516,746✔
623
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
516,409✔
624
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
516,409✔
625
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
516,409✔
626
      nodesDestroyNode(*pNode);
516,409✔
627
      *pNode = (SNode*)pSColumnNode;
516,746✔
628
    } else {
629
      return DEAL_RES_CONTINUE;
43,279✔
630
    }
631
  } else {
632
    return DEAL_RES_CONTINUE;
32,503,213✔
633
  }
634

635
  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
17,873,878✔
636
  if (!data) {
17,870,342✔
637
    int32_t tempRes =
638
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
15,366,645✔
639
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
15,371,193✔
640
      return DEAL_RES_ERROR;
×
641
    }
642
    pSColumnNode->slotId = pData->index++;
15,371,193✔
643
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
15,366,379✔
644
                         .type = pSColumnNode->node.resType.type,
15,362,297✔
645
                         .bytes = pSColumnNode->node.resType.bytes,
15,369,890✔
646
                         .pk = pSColumnNode->isPk};
15,362,010✔
647
#if TAG_FILTER_DEBUG
648
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
649
#endif
650
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
15,361,670✔
651
    if (!tmp) {
15,372,564✔
652
      return DEAL_RES_ERROR;
×
653
    }
654
  } else {
655
    SColumnNode* col = *(SColumnNode**)data;
2,503,697✔
656
    pSColumnNode->slotId = col->slotId;
2,503,701✔
657
  }
658

659
  return DEAL_RES_CONTINUE;
17,868,290✔
660
}
661

662
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
14,423,161✔
663
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
14,423,161✔
664
  if (pColumnData == NULL) {
14,423,142✔
665
    return terrno;
×
666
  }
667

668
  pColumnData->info.type = pType->type;
14,423,142✔
669
  pColumnData->info.bytes = pType->bytes;
14,420,133✔
670
  pColumnData->info.scale = pType->scale;
14,422,800✔
671
  pColumnData->info.precision = pType->precision;
14,419,169✔
672

673
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
14,420,816✔
674
  if (code != TSDB_CODE_SUCCESS) {
14,415,413✔
675
    terrno = code;
×
676
    releaseColInfoData(pColumnData);
×
677
    return terrno;
×
678
  }
679

680
  pParam->columnData = pColumnData;
14,415,413✔
681
  pParam->colAlloced = true;
14,415,069✔
682
  return TSDB_CODE_SUCCESS;
14,415,744✔
683
}
684

685
static void releaseColInfoData(void* pCol) {
3,938,651✔
686
  if (pCol) {
3,938,651✔
687
    SColumnInfoData* col = (SColumnInfoData*)pCol;
3,938,651✔
688
    colDataDestroy(col);
3,938,651✔
689
    taosMemoryFree(col);
3,938,358✔
690
  }
691
}
3,939,157✔
692

693
void freeItem(void* p) {
179,463,494✔
694
  STUidTagInfo* pInfo = p;
179,463,494✔
695
  if (pInfo->pTagVal != NULL) {
179,463,494✔
696
    taosMemoryFree(pInfo->pTagVal);
179,129,979✔
697
  }
698
}
179,452,054✔
699

700
typedef struct {
701
  col_id_t  colId;
702
  SNode*    pValueNode;
703
  int32_t   bytes;  // length defined in schema
704
} STagDataEntry;
705

706
static int compareTagDataEntry(const void* a, const void* b) {
18,048✔
707
  STagDataEntry* p1 = (STagDataEntry*)a;
18,048✔
708
  STagDataEntry* p2 = (STagDataEntry*)b;
18,048✔
709
  return compareInt16Val(&p1->colId, &p2->colId);
18,048✔
710
}
711

712
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
8,883✔
713
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
8,883✔
714
  if (NULL == *keyBuf) {
9,024✔
715
    qError(
×
716
      "failed to allocate memory for tag filter optimization key, size:%d",
717
      keyLen);
718
    return terrno;
×
719
  }
720
  char* pStart = *keyBuf;
9,024✔
721
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
27,072✔
722
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
17,766✔
723
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
17,766✔
724
    // num type may have different bytes length, use the smaller one
725
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);
17,766✔
726

727
    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
17,907✔
728
    pStart += sizeof(col_id_t);
17,907✔
729

730
    if (!pValueNode->isNull) {
18,048✔
731
      switch (pValueNode->node.resType.type) {
15,228✔
732
        case TSDB_DATA_TYPE_BOOL:
1,692✔
733
          (void)memcpy(
1,692✔
734
            pStart, &pValueNode->datum.b, bytes);
1,692✔
735
          pStart += bytes;
1,692✔
736
          break;
1,692✔
737
        case TSDB_DATA_TYPE_TINYINT:
7,050✔
738
        case TSDB_DATA_TYPE_SMALLINT:
739
        case TSDB_DATA_TYPE_INT:
740
        case TSDB_DATA_TYPE_BIGINT:
741
        case TSDB_DATA_TYPE_TIMESTAMP:
742
          (void)memcpy(
7,050✔
743
            pStart, &pValueNode->datum.i, bytes);
7,050✔
744
          pStart += bytes;
7,050✔
745
          break;
7,050✔
746
        case TSDB_DATA_TYPE_UTINYINT:
×
747
        case TSDB_DATA_TYPE_USMALLINT:
748
        case TSDB_DATA_TYPE_UINT:
749
        case TSDB_DATA_TYPE_UBIGINT:
750
          (void)memcpy(
×
751
            pStart, &pValueNode->datum.u, bytes);
×
752
          pStart += bytes;
×
753
          break;
×
754
        case TSDB_DATA_TYPE_FLOAT:
2,397✔
755
        case TSDB_DATA_TYPE_DOUBLE:
756
          (void)memcpy(
2,397✔
757
            pStart, &pValueNode->datum.d, bytes);
2,397✔
758
          pStart += bytes;
2,397✔
759
          break;
2,397✔
760
        case TSDB_DATA_TYPE_VARCHAR:
4,089✔
761
        case TSDB_DATA_TYPE_VARBINARY:
762
        case TSDB_DATA_TYPE_NCHAR:
763
          (void)memcpy(pStart,
4,089✔
764
            varDataVal(pValueNode->datum.p), varDataLen(pValueNode->datum.p));
4,089✔
765
          pStart += varDataLen(pValueNode->datum.p);
4,089✔
766
          break;
4,089✔
767
        default:
×
768
          qError("unsupported tag data type %d in tag filter optimization",
×
769
            pValueNode->node.resType.type);
770
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
771
      }
772
    }
773
  }
774

775
  return TSDB_CODE_SUCCESS;
9,024✔
776
}
777

778
static void extractTagDataEntry(
18,048✔
779
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
780
  SNode* pLeft = pOpNode->pLeft;
18,048✔
781
  SNode* pRight = pOpNode->pRight;
18,048✔
782
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
17,907✔
783
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
17,907✔
784
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
17,907✔
785
    (SValueNode*)pLeft : (SValueNode*)pRight;
17,907✔
786

787
  STagDataEntry entry = {0};
17,907✔
788
  entry.colId = pColNode->colId;
18,048✔
789
  entry.pValueNode = (SNode*)pValueNode;
17,907✔
790
  entry.bytes = pColNode->node.resType.bytes;
17,907✔
791
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
18,048✔
792
}
18,048✔
793

794
static int32_t extractTagFilterTagDataEntries(
8,883✔
795
  const SNode* pTagCond, SArray* pIdWithVal) {
796
  if (NULL == pTagCond || NULL == pIdWithVal ||
8,883✔
797
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
9,024✔
798
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
9,024✔
799
    qError("invalid parameter to extract tag filter symbol");
×
800
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
801
  }
802

803
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
8,883✔
804
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
805
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
8,883✔
806
    SNode* pChild = NULL;
8,883✔
807
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
26,790✔
808
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
17,907✔
809
    }
810
  }
811

812
  taosArraySort(pIdWithVal, compareTagDataEntry);
9,306✔
813

814
  return TSDB_CODE_SUCCESS;
9,024✔
815
}
816

817
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
9,024✔
818
  if (pTagCond == NULL) {
9,024✔
819
    return TSDB_CODE_SUCCESS;
×
820
  }
821

822
  char*   payload = NULL;
9,024✔
823
  int32_t len = 0;
9,024✔
824
  int32_t code = TSDB_CODE_SUCCESS;
9,024✔
825
  int32_t lino = 0;
9,024✔
826

827
  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
9,024✔
828
  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
8,883✔
829
  QUERY_CHECK_CODE(code, lino, _end);
9,024✔
830
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
26,931✔
831
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
18,048✔
832
    len += sizeof(col_id_t) + pEntry->bytes;
17,766✔
833
  }
834
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
8,883✔
835
  QUERY_CHECK_CODE(code, lino, _end);
9,024✔
836

837
  tMD5Init(pContext);
9,024✔
838
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
9,024✔
839
  tMD5Final(pContext);
9,024✔
840

841
_end:
8,883✔
842
  if (TSDB_CODE_SUCCESS != code) {
8,883✔
843
    qError("%s failed at line %d since %s",
×
844
      __func__, __LINE__, tstrerror(code));
845
  }
846
  taosArrayDestroy(pIdWithVal);
8,883✔
847
  taosMemoryFree(payload);
9,024✔
848
  return code;
9,024✔
849
}
850

851
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
26,987✔
852
  if (pTagCond == NULL) {
26,987✔
853
    return TSDB_CODE_SUCCESS;
25,399✔
854
  }
855

856
  char*   payload = NULL;
1,588✔
857
  int32_t len = 0;
1,588✔
858
  int32_t code = nodesNodeToMsg(pTagCond, &payload, &len);
1,588✔
859
  if (code != TSDB_CODE_SUCCESS) {
1,588✔
860
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
861
    return code;
×
862
  }
863

864
  tMD5Init(pContext);
1,588✔
865
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
1,588✔
866
  tMD5Final(pContext);
1,588✔
867

868
  // void* tmp = NULL;
869
  // uint32_t size = 0;
870
  // (void)taosAscii2Hex((const char*)pContext->digest, 16, &tmp, &size);
871
  // qInfo("tag filter digest payload: %s", tmp);
872
  // taosMemoryFree(tmp);
873

874
  taosMemoryFree(payload);
1,588✔
875
  return TSDB_CODE_SUCCESS;
1,588✔
876
}
877

878
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
×
879
  int32_t code = TSDB_CODE_SUCCESS;
×
880
  int32_t lino = 0;
×
881
  char*   payload = NULL;
×
882
  int32_t len = 0;
×
883
  code = nodesNodeToMsg(pGroup, &payload, &len);
×
884
  QUERY_CHECK_CODE(code, lino, _end);
×
885

886
  if (filterDigest[0]) {
×
887
    payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
×
888
    QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
×
889
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
×
890
    len += tListLen(pContext->digest);
×
891
  }
892

893
  tMD5Init(pContext);
×
894
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
895
  tMD5Final(pContext);
×
896

897
_end:
×
898
  taosMemoryFree(payload);
×
899
  if (code != TSDB_CODE_SUCCESS) {
×
900
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
901
  }
902
  return code;
×
903
}
904

905
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
14,268,388✔
906
  int32_t code = TSDB_CODE_SUCCESS;
14,268,388✔
907
  tagFilterAssist ctx = {0};
14,268,388✔
908
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
14,268,388✔
909
  if (ctx.colHash == NULL) {
14,268,135✔
910
    code = terrno;
×
911
    goto end;
×
912
  }
913

914
  ctx.index = 0;
14,268,135✔
915
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
14,268,135✔
916
  if (ctx.cInfoList == NULL) {
14,269,639✔
917
    code = terrno;
1,504✔
918
    goto end;
×
919
  }
920

921
  if (isList) {
14,268,135✔
922
    SNode* pNode = NULL;
3,780,199✔
923
    FOREACH(pNode, (SNodeList*)data) {
7,725,204✔
924
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
3,945,433✔
925
      if (TSDB_CODE_SUCCESS != ctx.code) {
3,944,798✔
926
        code = ctx.code;
×
927
        goto end;
×
928
      }
929
      REPLACE_NODE(pNode);
3,944,798✔
930
    }
931
  } else {
932
    SNode* pNode = (SNode*)data;
10,487,936✔
933
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
10,486,807✔
934
    if (TSDB_CODE_SUCCESS != ctx.code) {
10,488,965✔
935
      code = ctx.code;
×
936
      goto end;
×
937
    }
938
  }
939
  
940
  if (pColList != NULL) *pColList = ctx.cInfoList;
14,258,785✔
941
  ctx.cInfoList = NULL;
14,264,199✔
942

943
end:
14,274,578✔
944
  taosHashCleanup(ctx.colHash);
14,268,066✔
945
  taosArrayDestroy(ctx.cInfoList);
14,256,747✔
946
  return code;
14,257,222✔
947
}
948

949
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
388,342✔
950
  int32_t code = TSDB_CODE_SUCCESS;
388,342✔
951
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
388,342✔
952
  if (v == NULL) {
388,342✔
953
    code = terrno;
×
954
    goto end;
×
955
  }
956
  if (colDataIsNull_s(pValue, i)) {
776,684✔
957
    v->isNull = true;
6,345✔
958
  } else {
959
    v->isNull = false;
381,997✔
960
    char* data = colDataGetData(pValue, i);
381,833✔
961
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
381,833✔
962
      if (tTagIsJson(data)) {
×
963
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
964
        goto end;
×
965
      }
966
      if (tTagIsJsonNull(data)) {
×
967
        v->isNull = true;
×
968
        goto end;
×
969
      }
970
      int32_t len = getJsonValueLen(data);
×
971
      v->data.type = pValue->info.type;
×
972
      v->data.nData = len;
×
973
      v->data.pData = taosMemoryCalloc(1, len + 1);
×
974
      if (v->data.pData == NULL) {
×
975
        code = terrno;
×
976
        goto end;
×
977
      }
978
      memcpy(v->data.pData, data, len);
×
979
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
×
980
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
381,833✔
981
      if (varDataTLen(data) > pValue->info.bytes) {
285,575✔
982
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
983
        goto end;
×
984
      }
985
      v->data.type = pValue->info.type;
284,659✔
986
      v->data.nData = varDataLen(data);
284,659✔
987
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
284,871✔
988
      if (v->data.pData == NULL) {
284,871✔
989
        code = terrno;
×
990
        goto end;
×
991
      }
992
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
284,871✔
993
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
284,871✔
994
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
96,962✔
995
      v->data.type = pValue->info.type;
×
996
      v->data.nData = pValue->info.bytes;
×
997
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
×
998
      if (v->data.pData == NULL) {
×
999
        code = terrno;
×
1000
        goto end;
×
1001
      }
1002
      memcpy(&v->data.pData, data, pValue->info.bytes);
×
1003
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
×
1004
    } else {  // reader todo decimal
1005
      v->data.type = pValue->info.type;
97,126✔
1006
      memcpy(&v->data.val, data, pValue->info.bytes);
96,962✔
1007
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
96,962✔
1008
    }
1009
  }
1010
end:
40,227✔
1011
  if (code != TSDB_CODE_SUCCESS) {
388,342✔
1012
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1013
    v->isNull = true;
×
1014
  }
1015
  return code;
388,342✔
1016
}
1017

1018
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
59,851✔
1019
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1020
  int32_t      code = TSDB_CODE_SUCCESS;
59,851✔
1021
  int32_t      lino = 0;
59,851✔
1022
  SArray*      pBlockList = NULL;
59,851✔
1023
  SSDataBlock* pResBlock = NULL;
59,851✔
1024
  SArray*      groupData = NULL;
59,851✔
1025
  SArray*      pUidTagList = NULL;
59,851✔
1026
  SArray*      gInfo = NULL;
59,851✔
1027
  int32_t      tbNameIndex = 0;
59,851✔
1028

1029
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
59,851✔
1030
  if (rows == 0) {
59,851✔
1031
    return;
×
1032
  }
1033

1034
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
59,851✔
1035
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
59,851✔
1036

1037
  for (int32_t i = 0; i < rows; ++i) {
273,940✔
1038
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
214,089✔
1039
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
214,089✔
1040
    STUidTagInfo info = {.uid = pkeyInfo->uid};
214,089✔
1041
    void*        tmp = taosArrayPush(pUidTagList, &info);
214,089✔
1042
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
214,089✔
1043
  }
1044
 
1045
  if (taosArrayGetSize(pUidTagList) > 0) {
59,851✔
1046
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
59,851✔
1047
  } else {
1048
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1049
  }
1050
  if (code != TSDB_CODE_SUCCESS) {
59,851✔
1051
    goto end;
×
1052
  }
1053

1054
  SArray* pColList = NULL;
59,851✔
1055
  code = qGetColumnsFromNodeList(group, true, &pColList);
59,851✔
1056
  if (code != TSDB_CODE_SUCCESS) {
59,851✔
1057
    goto end;
×
1058
  }
1059

1060
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
154,831✔
1061
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
94,980✔
1062
    if (tmp != NULL && tmp->colId == -1) {
94,980✔
1063
      tbNameIndex = i;
59,851✔
1064
    }
1065
  }
1066
  
1067
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
59,851✔
1068
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
59,851✔
1069
  taosArrayDestroy(pColList);
59,851✔
1070
  if (pResBlock == NULL) {
59,851✔
1071
    code = terrno;
×
1072
    goto end;
×
1073
  }
1074

1075
  pBlockList = taosArrayInit(2, POINTER_BYTES);
59,851✔
1076
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
59,851✔
1077

1078
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
59,851✔
1079
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
59,851✔
1080

1081
  groupData = taosArrayInit(2, POINTER_BYTES);
59,851✔
1082
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
59,851✔
1083

1084
  SNode* pNode = NULL;
59,851✔
1085
  FOREACH(pNode, group) {
154,831✔
1086
    SScalarParam output = {0};
94,980✔
1087

1088
    switch (nodeType(pNode)) {
94,980✔
1089
      case QUERY_NODE_VALUE:
×
1090
        break;
×
1091
      case QUERY_NODE_COLUMN:
94,980✔
1092
      case QUERY_NODE_OPERATOR:
1093
      case QUERY_NODE_FUNCTION: {
1094
        SExprNode* expNode = (SExprNode*)pNode;
94,980✔
1095
        code = createResultData(&expNode->resType, rows, &output);
94,980✔
1096
        if (code != TSDB_CODE_SUCCESS) {
94,980✔
1097
          goto end;
×
1098
        }
1099
        break;
94,980✔
1100
      }
1101

1102
      default:
×
1103
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1104
        goto end;
×
1105
    }
1106

1107
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
94,980✔
1108
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
94,980✔
1109
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
94,980✔
1110
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
94,980✔
1111
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
94,980✔
1112
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
1113
      continue;
×
1114
    } else {
1115
      gTaskScalarExtra.pStreamInfo = NULL;
×
1116
      gTaskScalarExtra.pStreamRange = NULL;
×
1117
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1118
    }
1119

1120
    if (code != TSDB_CODE_SUCCESS) {
94,980✔
1121
      releaseColInfoData(output.columnData);
×
1122
      goto end;
×
1123
    }
1124

1125
    void* tmp = taosArrayPush(groupData, &output.columnData);
94,980✔
1126
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
94,980✔
1127
  }
1128

1129
  for (int i = 0; i < rows; i++) {
273,940✔
1130
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
214,089✔
1131
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
214,089✔
1132

1133
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
214,089✔
1134
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
214,089✔
1135

1136
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
531,529✔
1137
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
317,440✔
1138
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
317,440✔
1139
        if (ret != TSDB_CODE_SUCCESS) {
317,440✔
1140
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1141
          goto end;
×
1142
        }
1143
        if (j == tbNameIndex) {
317,440✔
1144
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
214,089✔
1145
          if (v != NULL){
214,089✔
1146
            v->isTbname = true;
214,089✔
1147
            v->uid = info->uid;
214,089✔
1148
          }
1149
        }
1150
    }
1151

1152
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
214,089✔
1153
    if (ret != TSDB_CODE_SUCCESS) {
214,089✔
1154
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1155
      goto end;
×
1156
    }
1157
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
214,089✔
1158
    gInfo = NULL;
214,089✔
1159
  }
1160

1161
end:
59,851✔
1162
  blockDataDestroy(pResBlock);
59,851✔
1163
  taosArrayDestroy(pBlockList);
59,851✔
1164
  taosArrayDestroyEx(pUidTagList, freeItem);
59,851✔
1165
  taosArrayDestroyP(groupData, releaseColInfoData);
59,851✔
1166
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
59,851✔
1167

1168
  if (code != TSDB_CODE_SUCCESS) {
59,851✔
1169
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1170
  }
1171
}
1172

1173
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
3,719,972✔
1174
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1175
  int32_t      code = TSDB_CODE_SUCCESS;
3,719,972✔
1176
  int32_t      lino = 0;
3,719,972✔
1177
  SArray*      pBlockList = NULL;
3,719,972✔
1178
  SSDataBlock* pResBlock = NULL;
3,719,972✔
1179
  void*        keyBuf = NULL;
3,720,348✔
1180
  SArray*      groupData = NULL;
3,720,348✔
1181
  SArray*      pUidTagList = NULL;
3,720,348✔
1182
  SArray*      tableList = NULL;
3,720,348✔
1183
  SArray*      gInfo = NULL;
3,720,348✔
1184

1185
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
3,720,348✔
1186
  if (rows == 0) {
3,720,207✔
1187
    return TSDB_CODE_SUCCESS;
×
1188
  } 
1189

1190
  T_MD5_CTX context = {0};
3,720,207✔
1191
  if (tsTagFilterCache && groupIdMap == NULL) {
3,720,207✔
1192
    SNodeListNode* listNode = NULL;
×
1193
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1194
    if (TSDB_CODE_SUCCESS != code) {
×
1195
      goto end;
×
1196
    }
1197
    listNode->pNodeList = group;
×
1198
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
1199
    QUERY_CHECK_CODE(code, lino, end);
×
1200

1201
    nodesFree(listNode);
×
1202

1203
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1204
                                             tListLen(context.digest), &tableList);
1205
    QUERY_CHECK_CODE(code, lino, end);
×
1206

1207
    if (tableList) {
×
1208
      taosArrayDestroy(pTableListInfo->pTableList);
×
1209
      pTableListInfo->pTableList = tableList;
×
1210
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1211
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
1212
      goto end;
×
1213
    }
1214
  }
1215

1216
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
3,720,207✔
1217
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
3,720,348✔
1218

1219
  for (int32_t i = 0; i < rows; ++i) {
24,669,619✔
1220
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
20,948,805✔
1221
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
20,948,215✔
1222
    STUidTagInfo info = {.uid = pkeyInfo->uid};
20,948,215✔
1223
    void*        tmp = taosArrayPush(pUidTagList, &info);
20,949,271✔
1224
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
20,949,271✔
1225
  }
1226

1227
  if (taosArrayGetSize(pUidTagList) > 0) {
3,720,814✔
1228
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
3,720,348✔
1229
  } else {
1230
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1231
  }
1232
  if (code != TSDB_CODE_SUCCESS) {
3,720,348✔
1233
    goto end;
×
1234
  }
1235

1236
  SArray* pColList = NULL;
3,720,348✔
1237
  code = qGetColumnsFromNodeList(group, true, &pColList); 
3,720,348✔
1238
  if (code != TSDB_CODE_SUCCESS) {
3,719,877✔
1239
    goto end;
×
1240
  }
1241

1242
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
3,719,877✔
1243
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
3,719,877✔
1244
  taosArrayDestroy(pColList);
3,720,348✔
1245
  if (pResBlock == NULL) {
3,720,348✔
1246
    code = terrno;
×
1247
    goto end;
×
1248
  }
1249

1250
  //  int64_t st1 = taosGetTimestampUs();
1251
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1252

1253
  pBlockList = taosArrayInit(2, POINTER_BYTES);
3,720,348✔
1254
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
3,720,348✔
1255

1256
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
3,720,348✔
1257
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,720,348✔
1258

1259
  groupData = taosArrayInit(2, POINTER_BYTES);
3,720,348✔
1260
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
3,719,882✔
1261

1262
  SNode* pNode = NULL;
3,719,882✔
1263
  FOREACH(pNode, group) {
7,570,002✔
1264
    SScalarParam output = {0};
3,849,987✔
1265

1266
    switch (nodeType(pNode)) {
3,849,987✔
1267
      case QUERY_NODE_VALUE:
×
1268
        break;
×
1269
      case QUERY_NODE_COLUMN:
3,844,177✔
1270
      case QUERY_NODE_OPERATOR:
1271
      case QUERY_NODE_FUNCTION: {
1272
        SExprNode* expNode = (SExprNode*)pNode;
3,844,177✔
1273
        code = createResultData(&expNode->resType, rows, &output);
3,844,177✔
1274
        if (code != TSDB_CODE_SUCCESS) {
3,843,081✔
1275
          goto end;
×
1276
        }
1277
        break;
3,843,081✔
1278
      }
1279
      case QUERY_NODE_REMOTE_VALUE: {
6,276✔
1280
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
6,276✔
1281
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
6,276✔
1282
        QUERY_CHECK_CODE(code, lino, end);
6,276✔
1283
        break;
6,276✔
1284
      }
1285
      
1286
      default:
×
1287
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1288
        goto end;
×
1289
    }
1290

1291
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
3,849,357✔
1292
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
3,820,365✔
1293
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
3,820,365✔
1294
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
3,819,100✔
1295
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
3,819,100✔
1296
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
29,924✔
1297
      continue;
6,276✔
1298
    } else {
1299
      gTaskScalarExtra.pStreamInfo = NULL;
23,812✔
1300
      gTaskScalarExtra.pStreamRange = NULL;
23,812✔
1301
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
23,812✔
1302
    }
1303

1304
    if (code != TSDB_CODE_SUCCESS) {
3,843,490✔
1305
      releaseColInfoData(output.columnData);
×
1306
      goto end;
×
1307
    }
1308

1309
    void* tmp = taosArrayPush(groupData, &output.columnData);
3,843,844✔
1310
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,843,844✔
1311
  }
1312

1313
  int32_t keyLen = 0;
3,720,015✔
1314
  SNode*  node;
1315
  FOREACH(node, group) {
7,567,442✔
1316
    SExprNode* pExpr = (SExprNode*)node;
3,849,215✔
1317
    keyLen += pExpr->resType.bytes;
3,849,215✔
1318
  }
1319

1320
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
3,718,560✔
1321
  keyLen += nullFlagSize;
3,718,724✔
1322

1323
  keyBuf = taosMemoryCalloc(1, keyLen);
3,718,724✔
1324
  if (keyBuf == NULL) {
3,719,416✔
1325
    code = terrno;
×
1326
    goto end;
×
1327
  }
1328

1329
  if (initRemainGroups) {
3,719,416✔
1330
    pTableListInfo->remainGroups =
1,802,881✔
1331
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,801,949✔
1332
    if (pTableListInfo->remainGroups == NULL) {
1,802,881✔
1333
      code = terrno;
×
1334
      goto end;
×
1335
    }
1336
  }
1337

1338
  for (int i = 0; i < rows; i++) {
24,666,460✔
1339
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
20,946,587✔
1340
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
20,945,546✔
1341

1342
    if (groupIdMap != NULL){
20,945,546✔
1343
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
59,600✔
1344
    }
1345
    
1346
    char* isNull = (char*)keyBuf;
20,945,320✔
1347
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
20,945,320✔
1348
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
42,931,205✔
1349
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
21,982,121✔
1350

1351
      if (groupIdMap != NULL && gInfo != NULL) {
21,982,266✔
1352
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
70,738✔
1353
        if (ret != TSDB_CODE_SUCCESS) {
70,902✔
1354
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1355
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1356
          gInfo = NULL;
×
1357
        }
1358
      }
1359
      
1360
      if (colDataIsNull_s(pValue, i)) {
43,964,209✔
1361
        isNull[j] = 1;
93,807✔
1362
      } else {
1363
        isNull[j] = 0;
21,887,972✔
1364
        char* data = colDataGetData(pValue, i);
21,887,596✔
1365
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
21,886,567✔
1366
          // if (tTagIsJson(data)) {
1367
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1368
          //   goto end;
1369
          // }
1370
          if (tTagIsJsonNull(data)) {
71,394✔
1371
            isNull[j] = 1;
×
1372
            continue;
×
1373
          }
1374
          int32_t len = getJsonValueLen(data);
71,394✔
1375
          memcpy(pStart, data, len);
71,394✔
1376
          pStart += len;
71,394✔
1377
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
21,813,988✔
1378
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
19,003,886✔
UNCOV
1379
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
×
1380
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1381
              goto end;
×
1382
            }
1383
            memcpy(pStart, data, blobDataTLen(data));
×
1384
            pStart += blobDataTLen(data);
×
1385
          } else {
1386
            if (varDataTLen(data) > pValue->info.bytes) {
19,003,312✔
1387
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1388
              goto end;
×
1389
            }
1390
            memcpy(pStart, data, varDataTLen(data));
19,002,375✔
1391
            pStart += varDataTLen(data);
19,002,375✔
1392
          }
1393
        } else {
1394
          memcpy(pStart, data, pValue->info.bytes);
2,813,174✔
1395
          pStart += pValue->info.bytes;
2,813,422✔
1396
        }
1397
      }
1398
    }
1399

1400
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
20,940,249✔
1401
    info->groupId = calcGroupId(keyBuf, len);
20,940,249✔
1402
    if (groupIdMap != NULL && gInfo != NULL) {
20,945,334✔
1403
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
59,764✔
1404
      if (ret != TSDB_CODE_SUCCESS) {
59,764✔
1405
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1406
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1407
      }
1408
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
59,764✔
1409
      gInfo = NULL;
59,764✔
1410
    }
1411
    if (initRemainGroups) {
20,945,334✔
1412
      // groupId ~ table uid
1413
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
10,262,841✔
1414
                         sizeof(info->uid));
1415
      if (code == TSDB_CODE_DUP_KEY) {
10,263,783✔
1416
        code = TSDB_CODE_SUCCESS;
733,535✔
1417
      }
1418
      QUERY_CHECK_CODE(code, lino, end);
10,263,783✔
1419
    }
1420
  }
1421

1422
  if (tsTagFilterCache && groupIdMap == NULL) {
3,719,873✔
1423
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
1424
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1425

1426
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1427
                                              tListLen(context.digest), tableList,
1428
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
1429
    QUERY_CHECK_CODE(code, lino, end);
×
1430
  }
1431

1432
  //  int64_t st2 = taosGetTimestampUs();
1433
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1434

1435
end:
3,719,873✔
1436
  taosMemoryFreeClear(keyBuf);
3,720,348✔
1437
  blockDataDestroy(pResBlock);
3,720,348✔
1438
  taosArrayDestroy(pBlockList);
3,720,348✔
1439
  taosArrayDestroyEx(pUidTagList, freeItem);
3,719,842✔
1440
  taosArrayDestroyP(groupData, releaseColInfoData);
3,720,348✔
1441
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
3,718,803✔
1442

1443
  if (code != TSDB_CODE_SUCCESS) {
3,718,803✔
1444
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1445
  }
1446
  return code;
3,718,302✔
1447
}
1448

1449
static int32_t nameComparFn(const void* p1, const void* p2) {
627,916✔
1450
  const char* pName1 = *(const char**)p1;
627,916✔
1451
  const char* pName2 = *(const char**)p2;
627,916✔
1452

1453
  int32_t ret = strcmp(pName1, pName2);
627,916✔
1454
  if (ret == 0) {
627,916✔
1455
    return 0;
15,570✔
1456
  } else {
1457
    return (ret > 0) ? 1 : -1;
612,346✔
1458
  }
1459
}
1460

1461
static SArray* getTableNameList(const SNodeListNode* pList) {
349,329✔
1462
  int32_t    code = TSDB_CODE_SUCCESS;
349,329✔
1463
  int32_t    lino = 0;
349,329✔
1464
  int32_t    len = LIST_LENGTH(pList->pNodeList);
349,329✔
1465
  SListCell* cell = pList->pNodeList->pHead;
349,329✔
1466

1467
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
349,329✔
1468
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
349,329✔
1469

1470
  for (int i = 0; i < pList->pNodeList->length; i++) {
963,836✔
1471
    SValueNode* valueNode = (SValueNode*)cell->pNode;
614,507✔
1472
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
614,507✔
1473
      terrno = TSDB_CODE_INVALID_PARA;
×
1474
      taosArrayDestroy(pTbList);
×
1475
      return NULL;
×
1476
    }
1477

1478
    char* name = varDataVal(valueNode->datum.p);
614,507✔
1479
    void* tmp = taosArrayPush(pTbList, &name);
614,507✔
1480
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
614,507✔
1481
    cell = cell->pNext;
614,507✔
1482
  }
1483

1484
  size_t numOfTables = taosArrayGetSize(pTbList);
349,329✔
1485

1486
  // order the name
1487
  taosArraySort(pTbList, nameComparFn);
349,329✔
1488

1489
  // remove the duplicates
1490
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
349,329✔
1491
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
349,329✔
1492
  void* tmpTbl = taosArrayGet(pTbList, 0);
349,329✔
1493
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
349,329✔
1494
  void* tmp = taosArrayPush(pNewList, tmpTbl);
349,329✔
1495
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
349,329✔
1496

1497
  for (int32_t i = 1; i < numOfTables; ++i) {
614,507✔
1498
    char** name = taosArrayGetLast(pNewList);
265,178✔
1499
    char** nameInOldList = taosArrayGet(pTbList, i);
265,178✔
1500
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
265,178✔
1501
    if (strcmp(*name, *nameInOldList) == 0) {
265,178✔
1502
      continue;
8,400✔
1503
    }
1504

1505
    tmp = taosArrayPush(pNewList, nameInOldList);
256,778✔
1506
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
256,778✔
1507
  }
1508

1509
_end:
349,329✔
1510
  taosArrayDestroy(pTbList);
349,329✔
1511
  if (code != TSDB_CODE_SUCCESS) {
349,329✔
1512
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1513
    return NULL;
×
1514
  }
1515
  return pNewList;
349,329✔
1516
}
1517

1518
static int tableUidCompare(const void* a, const void* b) {
×
1519
  uint64_t u1 = *(uint64_t*)a;
×
1520
  uint64_t u2 = *(uint64_t*)b;
×
1521

1522
  if (u1 == u2) {
×
1523
    return 0;
×
1524
  }
1525

1526
  return u1 < u2 ? -1 : 1;
×
1527
}
1528

1529
static int32_t filterTableInfoCompare(const void* a, const void* b) {
15,093,028✔
1530
  STUidTagInfo* p1 = (STUidTagInfo*)a;
15,093,028✔
1531
  STUidTagInfo* p2 = (STUidTagInfo*)b;
15,093,028✔
1532

1533
  if (p1->uid == p2->uid) {
15,093,028✔
1534
    return 0;
×
1535
  }
1536

1537
  return p1->uid < p2->uid ? -1 : 1;
15,211,646✔
1538
}
1539

1540
static FilterCondType checkTagCond(SNode* cond) {
11,695,757✔
1541
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
11,695,757✔
1542
    return FILTER_NO_LOGIC;
8,505,218✔
1543
  }
1544
  if (nodeType(cond) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND) {
3,190,872✔
1545
    return FILTER_AND;
2,711,246✔
1546
  }
1547
  return FILTER_OTHER;
480,625✔
1548
}
1549

1550
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
12,038,873✔
1551
  int32_t ret = -1;
12,038,873✔
1552
  int32_t ntype = nodeType(cond);
12,038,873✔
1553

1554
  if (ntype == QUERY_NODE_OPERATOR) {
12,040,974✔
1555
    ret = optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
8,841,410✔
1556
    return ret;
8,839,816✔
1557
  }
1558
  if (ntype != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
3,199,564✔
1559
    return ret;
480,292✔
1560
  }
1561

1562
  bool                 hasTbnameCond = false;
2,719,139✔
1563
  SLogicConditionNode* pNode = (SLogicConditionNode*)cond;
2,719,139✔
1564
  SNodeList*           pList = (SNodeList*)pNode->pParameterList;
2,719,139✔
1565

1566
  int32_t len = LIST_LENGTH(pList);
2,719,756✔
1567
  if (len <= 0) {
2,718,615✔
1568
    return ret;
×
1569
  }
1570

1571
  SListCell* cell = pList->pHead;
2,718,615✔
1572
  for (int i = 0; i < len; i++) {
8,513,809✔
1573
    if (cell == NULL) break;
5,799,970✔
1574
    if (optimizeTbnameInCondImpl(pVnode, list, cell->pNode, pAPI, suid) == 0) {
5,799,970✔
1575
      hasTbnameCond = true;
7,418✔
1576
      break;
7,418✔
1577
    }
1578
    cell = cell->pNext;
5,794,337✔
1579
  }
1580

1581
  taosArraySort(list, filterTableInfoCompare);
2,721,257✔
1582
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);
2,719,281✔
1583

1584
  if (hasTbnameCond) {
2,717,523✔
1585
    ret = pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
7,418✔
1586
  }
1587

1588
  return ret;
2,718,331✔
1589
}
1590

1591
// only return uid that does not contained in pExistedUidList
1592
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
14,644,116✔
1593
                                        uint64_t suid) {
1594
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
14,644,116✔
1595
    return -1;
4,970✔
1596
  }
1597

1598
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
14,639,146✔
1599
  if (pNode->opType != OP_TYPE_IN) {
14,639,146✔
1600
    return -1;
13,801,841✔
1601
  }
1602

1603
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
838,235✔
1604
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
349,329✔
1605
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
489,239✔
1606
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
349,662✔
1607
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
349,329✔
1608

1609
    int32_t len = LIST_LENGTH(pList->pNodeList);
349,329✔
1610
    if (len <= 0) {
349,329✔
1611
      return -1;
×
1612
    }
1613

1614
    SArray*   pTbList = getTableNameList(pList);
349,329✔
1615
    int32_t   numOfTables = taosArrayGetSize(pTbList);
349,329✔
1616
    SHashObj* uHash = NULL;
349,329✔
1617

1618
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
349,329✔
1619
    if (numOfExisted > 0) {
349,329✔
1620
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,980✔
1621
      if (!uHash) {
1,980✔
1622
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1623
        return terrno;
×
1624
      }
1625

1626
      for (int i = 0; i < numOfExisted; i++) {
1,955,728✔
1627
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
1,955,730✔
1628
        if (!pTInfo) {
1,949,782✔
1629
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1630
          return terrno;
×
1631
        }
1632
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
1,949,782✔
1633
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
1,953,748✔
1634
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1635
          return tempRes;
×
1636
        }
1637
      }
1638
    }
1639

1640
    for (int i = 0; i < numOfTables; i++) {
940,858✔
1641
      char* name = taosArrayGetP(pTbList, i);
597,707✔
1642

1643
      uint64_t uid = 0, csuid = 0;
597,707✔
1644
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
597,707✔
1645
        ETableType tbType = TSDB_TABLE_MAX;
350,444✔
1646
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
350,444✔
1647
            tbType == TSDB_CHILD_TABLE) {
350,444✔
1648
          if (suid != csuid) {
344,266✔
1649
            continue;
1,976✔
1650
          }
1651
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
342,290✔
1652
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
341,300✔
1653
            void*        tmp = taosArrayPush(pExistedUidList, &s);
341,300✔
1654
            if (!tmp) {
341,300✔
1655
              return terrno;
×
1656
            }
1657
          }
1658
        } else {
1659
          taosArrayDestroy(pTbList);
6,178✔
1660
          taosHashCleanup(uHash);
6,178✔
1661
          return -1;
6,178✔
1662
        }
1663
      } else {
1664
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1665
        terrno = 0;
247,263✔
1666
      }
1667
    }
1668

1669
    taosHashCleanup(uHash);
343,151✔
1670
    taosArrayDestroy(pTbList);
343,151✔
1671
    return 0;
343,151✔
1672
  }
1673

1674
  return -1;
489,054✔
1675
}
1676

1677
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
15,821,232✔
1678
                                        SStorageAPI* pStorageAPI) {
1679
  int32_t      code = TSDB_CODE_SUCCESS;
15,821,232✔
1680
  int32_t      lino = 0;
15,821,232✔
1681
  SSDataBlock* pResBlock = NULL;
15,821,232✔
1682
  code = createDataBlock(&pResBlock);
15,826,715✔
1683
  QUERY_CHECK_CODE(code, lino, _end);
15,817,477✔
1684

1685
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
32,745,443✔
1686
    SColumnInfoData colInfo = {0};
16,929,075✔
1687
    void*           tmp = taosArrayGet(pColList, i);
16,926,600✔
1688
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
16,913,864✔
1689
    colInfo.info = *(SColumnInfo*)tmp;
16,913,864✔
1690
    code = blockDataAppendColInfo(pResBlock, &colInfo);
16,913,864✔
1691
    QUERY_CHECK_CODE(code, lino, _end);
16,927,057✔
1692
  }
1693

1694
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
15,824,028✔
1695
  if (code != TSDB_CODE_SUCCESS) {
15,815,359✔
1696
    terrno = code;
×
1697
    blockDataDestroy(pResBlock);
×
1698
    return NULL;
×
1699
  }
1700

1701
  pResBlock->info.rows = numOfTables;
15,815,359✔
1702

1703
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
15,815,533✔
1704

1705
  for (int32_t i = 0; i < numOfTables; i++) {
197,464,437✔
1706
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
181,636,459✔
1707
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
181,619,334✔
1708

1709
    for (int32_t j = 0; j < numOfCols; j++) {
371,185,284✔
1710
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
189,449,166✔
1711
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
189,427,172✔
1712

1713
      if (pColInfo->info.colId == -1) {  // tbname
189,427,172✔
1714
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
6,705,407✔
1715
        if (p1->name != NULL) {
6,706,399✔
1716
          STR_TO_VARSTR(str, p1->name);
341,300✔
1717
        } else {  // name is not retrieved during filter
1718
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
6,363,772✔
1719
          QUERY_CHECK_CODE(code, lino, _end);
6,364,605✔
1720
        }
1721

1722
        code = colDataSetVal(pColInfo, i, str, false);
6,705,905✔
1723
        QUERY_CHECK_CODE(code, lino, _end);
6,706,895✔
1724
#if TAG_FILTER_DEBUG
1725
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1726
#endif
1727
      } else {
1728
        STagVal tagVal = {0};
182,736,705✔
1729
        tagVal.cid = pColInfo->info.colId;
182,754,489✔
1730
        if (p1->pTagVal == NULL) {
182,791,369✔
1731
          colDataSetNULL(pColInfo, i);
3,525✔
1732
        } else {
1733
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
182,793,455✔
1734

1735
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
182,828,169✔
1736
            colDataSetNULL(pColInfo, i);
2,463,995✔
1737
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
180,374,097✔
1738
            code = colDataSetVal(pColInfo, i, p, false);
564,430✔
1739
            QUERY_CHECK_CODE(code, lino, _end);
564,430✔
1740
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
302,238,718✔
1741
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
122,402,290✔
1742
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1743
            }
1744
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
122,417,493✔
1745
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
122,419,897✔
1746
            varDataSetLen(tmp, tagVal.nData);
122,419,897✔
1747
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
122,435,946✔
1748
            code = colDataSetVal(pColInfo, i, tmp, false);
122,454,194✔
1749
#if TAG_FILTER_DEBUG
1750
            qDebug("tagfilter varch:%s", tmp + 2);
1751
#endif
1752
            taosMemoryFree(tmp);
122,458,024✔
1753
            QUERY_CHECK_CODE(code, lino, _end);
122,429,010✔
1754
          } else {
1755
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
57,402,240✔
1756
            QUERY_CHECK_CODE(code, lino, _end);
57,411,561✔
1757
#if TAG_FILTER_DEBUG
1758
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1759
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1760
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1761
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1762
            }
1763
#endif
1764
          }
1765
        }
1766
      }
1767
    }
1768
  }
1769

1770
_end:
15,824,386✔
1771
  if (code != TSDB_CODE_SUCCESS) {
15,829,119✔
1772
    blockDataDestroy(pResBlock);
4,884✔
1773
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1774
    terrno = code;
×
1775
    return NULL;
×
1776
  }
1777
  return pResBlock;
15,824,235✔
1778
}
1779

1780
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
10,472,858✔
1781
                                 bool* pResultList, bool addUid) {
1782
  taosArrayClear(pUidList);
10,472,858✔
1783

1784
  STableKeyInfo info = {.uid = 0, .groupId = 0};
10,471,269✔
1785
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);
10,473,132✔
1786
  for (int32_t i = 0; i < numOfTables; ++i) {
168,747,987✔
1787
    if (pResultList[i]) {
158,264,574✔
1788
      STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
71,572,923✔
1789
      if (!tmpTag) {
71,574,475✔
1790
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1791
        return terrno;
×
1792
      }
1793
      uint64_t uid = tmpTag->uid;
71,574,475✔
1794
      qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
71,577,622✔
1795

1796
      info.uid = uid;
71,589,950✔
1797
      //qInfo("doSetQualifiedUid row:%d added to pTableList", i);
1798
      void* p = taosArrayPush(pListInfo->pTableList, &info);
71,589,950✔
1799
      if (p == NULL) {
71,583,668✔
1800
        return terrno;
×
1801
      }
1802

1803
      if (addUid) {
71,583,668✔
1804
        //qInfo("doSetQualifiedUid row:%d added to pUidList", i);
1805
        void* tmp = taosArrayPush(pUidList, &uid);
8,477✔
1806
        if (tmp == NULL) {
8,477✔
1807
          return terrno;
×
1808
        }
1809
      }
1810
    } else {
1811
      //qInfo("doSetQualifiedUid row:%d failed", i);
1812
    }
1813
  }
1814

1815
  return TSDB_CODE_SUCCESS;
10,483,413✔
1816
}
1817

1818
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
12,040,446✔
1819
  int32_t code = TSDB_CODE_SUCCESS;
12,040,446✔
1820
  int32_t numOfExisted = taosArrayGetSize(pUidList);
12,040,446✔
1821
  if (numOfExisted == 0) {
12,041,584✔
1822
    return code;
9,591,318✔
1823
  }
1824

1825
  for (int32_t i = 0; i < numOfExisted; ++i) {
27,803,110✔
1826
    uint64_t* uid = taosArrayGet(pUidList, i);
25,352,844✔
1827
    if (!uid) {
25,352,844✔
1828
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1829
      return terrno;
×
1830
    }
1831
    STUidTagInfo info = {.uid = *uid};
25,352,844✔
1832
    void*        tmp = taosArrayPush(pUidTagList, &info);
25,352,844✔
1833
    if (!tmp) {
25,352,844✔
1834
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1835
      return code;
×
1836
    }
1837
  }
1838
  return code;
2,450,266✔
1839
}
1840

1841
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
238,670,932✔
1842
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1843
  *listAdded = false;
238,670,932✔
1844
  if (pTagCond == NULL) {
238,698,803✔
1845
    return TSDB_CODE_SUCCESS;
226,627,051✔
1846
  }
1847

1848
  terrno = TSDB_CODE_SUCCESS;
12,071,752✔
1849

1850
  int32_t      lino = 0;
12,040,763✔
1851
  int32_t      code = TSDB_CODE_SUCCESS;
12,040,763✔
1852
  SArray*      pBlockList = NULL;
12,040,763✔
1853
  SSDataBlock* pResBlock = NULL;
12,040,763✔
1854
  SScalarParam output = {0};
12,039,813✔
1855
  SArray*      pUidTagList = NULL;
12,039,813✔
1856

1857
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
12,039,813✔
1858

1859
  //  int64_t stt = taosGetTimestampUs();
1860
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
12,039,622✔
1861
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
12,039,776✔
1862

1863
  code = copyExistedUids(pUidTagList, pUidList);
12,039,776✔
1864
  QUERY_CHECK_CODE(code, lino, end);
12,040,426✔
1865

1866
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
12,040,426✔
1867
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
12,036,672✔
1868
    taosArrayClear(pUidList);
343,151✔
1869

1870
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
343,151✔
1871
    code = taosArrayEnsureCap(pUidList, numOfRows);
343,151✔
1872
    QUERY_CHECK_CODE(code, lino, end);
343,151✔
1873

1874
    for (int32_t i = 0; i < numOfRows; ++i) {
2,661,481✔
1875
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,318,330✔
1876
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,318,330✔
1877
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
2,318,330✔
1878
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,318,330✔
1879
    }
1880
    terrno = 0;
343,151✔
1881
  } else {
1882
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
11,693,521✔
1883

1884
    FilterCondType condType = checkTagCond(pTagCond);
11,694,719✔
1885
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) ||
20,604,911✔
1886
          taosArrayGetSize(pUidTagList) > 0) {
8,906,604✔
1887
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
3,046,857✔
1888
    } else {
1889
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
8,651,450✔
1890
    }
1891
    if (code != TSDB_CODE_SUCCESS) {
11,696,154✔
1892
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1893
      terrno = code;
×
1894
      QUERY_CHECK_CODE(code, lino, end);
×
1895
    }
1896
  }
1897

1898
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
12,039,305✔
1899

1900
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
12,041,197✔
1901
  if (numOfTables == 0) {
12,042,266✔
1902
    goto end;
1,553,618✔
1903
  }
1904

1905
  SArray* pColList = NULL;
10,488,648✔
1906
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
10,488,985✔
1907
  if (code != TSDB_CODE_SUCCESS) {
10,474,953✔
1908
    goto end;
×
1909
  }
1910
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
10,474,953✔
1911
  taosArrayDestroy(pColList);
10,484,179✔
1912
  if (pResBlock == NULL) {
10,484,676✔
1913
    code = terrno;
×
1914
    QUERY_CHECK_CODE(code, lino, end);
×
1915
  }
1916

1917
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1918

1919
  //  int64_t st1 = taosGetTimestampUs();
1920
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1921
  pBlockList = taosArrayInit(2, POINTER_BYTES);
10,484,676✔
1922
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
10,484,733✔
1923

1924
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
10,489,304✔
1925
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
10,489,304✔
1926

1927
  code = createResultData(&type, numOfTables, &output);
10,489,304✔
1928
  if (code != TSDB_CODE_SUCCESS) {
10,476,010✔
1929
    terrno = code;
×
1930
    QUERY_CHECK_CODE(code, lino, end);
×
1931
  }
1932

1933
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
10,476,010✔
1934
  gTaskScalarExtra.pStreamRange = NULL;
10,476,010✔
1935
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
10,479,737✔
1936
  if (code != TSDB_CODE_SUCCESS) {
10,475,381✔
1937
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
876✔
1938
    terrno = code;
876✔
1939
    QUERY_CHECK_CODE(code, lino, end);
876✔
1940
  }
1941

1942
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
10,474,505✔
1943
  if (code != TSDB_CODE_SUCCESS) {
10,484,416✔
1944
    terrno = code;
×
1945
    QUERY_CHECK_CODE(code, lino, end);
×
1946
  }
1947
  *listAdded = true;
10,484,416✔
1948

1949
end:
12,038,719✔
1950
  if (code != TSDB_CODE_SUCCESS) {
12,038,599✔
1951
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
876✔
1952
  }
1953
  blockDataDestroy(pResBlock);
12,038,599✔
1954
  taosArrayDestroy(pBlockList);
12,035,406✔
1955
  taosArrayDestroyEx(pUidTagList, freeItem);
12,035,960✔
1956

1957
  colDataDestroy(output.columnData);
12,040,107✔
1958
  taosMemoryFreeClear(output.columnData);
12,038,067✔
1959
  return code;
12,037,530✔
1960
}
1961

1962
typedef struct {
1963
  int32_t code;
1964
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1965
} PlaceHolderContext;
1966

1967
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
66,393✔
1968
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
66,393✔
1969
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
66,393✔
1970
    return DEAL_RES_CONTINUE;
54,794✔
1971
  }
1972
  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
11,740✔
1973
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
11,740✔
1974
    return DEAL_RES_CONTINUE;
466✔
1975
  }
1976
  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
11,274✔
1977
  if (pData->code != TSDB_CODE_SUCCESS) {
11,274✔
1978
    return DEAL_RES_ERROR;
×
1979
  }
1980
  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
11,274✔
1981
  ((SValueNode*)pFirstParam)->translate = true;
11,274✔
1982
  SValueNode* res = NULL;
11,274✔
1983
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
11,133✔
1984
  if (NULL == res) {
11,274✔
1985
    return DEAL_RES_ERROR;
×
1986
  }
1987
  nodesDestroyNode(*pNode);
11,274✔
1988
  *pNode = (SNode*)res;
11,133✔
1989

1990
  return DEAL_RES_CONTINUE;
11,133✔
1991
}
1992

1993
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
18,048✔
1994
  SNode* pLeft = pOpNode->pLeft;
18,048✔
1995
  SNode* pRight = pOpNode->pRight;
18,048✔
1996
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
18,048✔
1997
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
18,048✔
1998

1999
  col_id_t colId = pColNode->colId;
18,048✔
2000
  void* _tmp = taosArrayPush(pColIdArray, &colId);
18,048✔
2001
}
18,048✔
2002

2003
static int32_t buildTagCondKey(
9,024✔
2004
  const SNode* pTagCond, char** pTagCondKey,
2005
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
2006
  if (NULL == pTagCond ||
9,024✔
2007
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
9,024✔
2008
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
9,024✔
2009
    qError("invalid parameter to extract tag filter symbol");
×
2010
    return TSDB_CODE_INTERNAL_ERROR;
×
2011
  }
2012
  int32_t code = TSDB_CODE_SUCCESS;
9,024✔
2013
  int32_t lino = 0;
9,024✔
2014
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
9,024✔
2015

2016
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
9,024✔
2017
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2018
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
9,024✔
2019
    SNode* pChild = NULL;
9,024✔
2020
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
27,072✔
2021
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
18,048✔
2022
    }
2023
  }
2024

2025
  taosArraySort(*pTagColIds, compareUint16Val);
9,024✔
2026

2027
  // encode ordered colIds into key string, separated by ','
2028
  *tagCondKeyLen =
18,048✔
2029
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
9,024✔
2030
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
9,024✔
2031
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
9,024✔
2032
  char* pStart = *pTagCondKey;
9,024✔
2033
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
27,072✔
2034
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
18,048✔
2035
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
18,048✔
2036
    memcpy(pStart, pColId, sizeof(col_id_t));
18,048✔
2037
    pStart += sizeof(col_id_t);
18,048✔
2038
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
18,048✔
2039
      *pStart = ',';
9,024✔
2040
      pStart += 1;
9,024✔
2041
    }
2042
  }
2043

2044
_end:
9,024✔
2045
  if (TSDB_CODE_SUCCESS != code) {
9,024✔
2046
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2047
    terrno = code;
×
2048
  }
2049
  return code;
9,024✔
2050
}
2051

2052
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
83,895✔
2053
  if (NULL == pTagCond) {
83,895✔
2054
    *(bool*)pContext = false;
×
2055
    return DEAL_RES_END;
×
2056
  }
2057
  if (nodeType(pTagCond) == QUERY_NODE_VALUE ||
83,895✔
2058
    nodeType(pTagCond) == QUERY_NODE_COLUMN) {
55,695✔
2059
    return DEAL_RES_CONTINUE;
46,248✔
2060
  }
2061
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR &&
37,647✔
2062
    ((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL) {
18,471✔
2063
    return DEAL_RES_CONTINUE;
18,048✔
2064
  }
2065
  if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION &&
19,599✔
2066
    ((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND) {
9,024✔
2067
    return DEAL_RES_CONTINUE;
9,024✔
2068
  }
2069
  if (nodeType(pTagCond) == QUERY_NODE_FUNCTION &&
20,727✔
2070
    fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId)) {
10,152✔
2071
    return DEAL_RES_CONTINUE;
10,152✔
2072
  }
2073
  *(bool*)pContext = false;
423✔
2074
  return DEAL_RES_END;
423✔
2075
}
2076

2077
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
238,589,819✔
2078
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2079
  int32_t code = TSDB_CODE_SUCCESS;
238,589,819✔
2080
  int32_t lino = 0;
238,589,819✔
2081
  size_t  numOfTables = 0;
238,589,819✔
2082
  bool    listAdded = false;
238,589,819✔
2083

2084
  pListInfo->idInfo.suid = pScanNode->suid;
238,650,324✔
2085
  pListInfo->idInfo.tableType = pScanNode->tableType;
238,530,201✔
2086

2087
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
238,477,788✔
2088
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
238,456,379✔
2089

2090
  SIdxFltStatus status = SFLT_NOT_INDEX;
238,456,379✔
2091
  char*   pTagCondKey = NULL;
238,488,516✔
2092
  int32_t tagCondKeyLen;
238,516,267✔
2093
  SArray* pTagColIds = NULL;
238,534,070✔
2094
  char*   pPayload = NULL;
238,525,478✔
2095
  qTrace("getTableList called, suid:%" PRIu64
238,525,478✔
2096
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2097
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2098
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
238,525,478✔
2099
    pListInfo->idInfo.uid = pScanNode->uid;
136,975,197✔
2100
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
136,932,793✔
2101
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
137,006,051✔
2102
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
136,995,580✔
2103
    }
2104
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded, pStreamInfo);
137,039,116✔
2105
    QUERY_CHECK_CODE(code, lino, _end);
137,018,017✔
2106
  } else {
2107
    bool      isStream = (pStreamInfo != NULL);
101,580,540✔
2108
    bool      hasTagCond = (pTagCond != NULL);
101,580,540✔
2109
    bool      canCacheTagEqCondFilter = false;
101,580,540✔
2110
    T_MD5_CTX context = {0};
101,599,546✔
2111

2112
    qTrace("start to get table list by tag filter, suid:%" PRIu64
101,669,795✔
2113
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2114
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2115

2116
    bool acquired = false;
101,669,795✔
2117
    // first, check whether we can use stable tag filter cache
2118
    if (tsStableTagFilterCache && isStream && hasTagCond) {
101,549,595✔
2119
      canCacheTagEqCondFilter = true;
9,447✔
2120
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
9,447✔
2121
        (void*)&canCacheTagEqCondFilter);
2122
    }
2123
    if (canCacheTagEqCondFilter) {
101,569,866✔
2124
      qDebug("%s, stable tag filter condition can be optimized", idstr);
9,024✔
2125
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
9,024✔
2126
        SNode* tmp = NULL;
9,024✔
2127
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
9,024✔
2128
        QUERY_CHECK_CODE(code, lino, _error);
9,024✔
2129

2130
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
9,024✔
2131
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
9,024✔
2132
        if (TSDB_CODE_SUCCESS != ctx.code) {
9,024✔
2133
          nodesDestroyNode(tmp);
×
2134
          code = ctx.code;
×
2135
          goto _error;
×
2136
        }
2137
        code = genStableTagFilterDigest(tmp, &context);
9,024✔
2138
        nodesDestroyNode(tmp);
9,024✔
2139
      } else {
2140
        code = genStableTagFilterDigest(pTagCond, &context);
×
2141
      }
2142
      QUERY_CHECK_CODE(code, lino, _error);
9,024✔
2143

2144
      code = buildTagCondKey(
9,024✔
2145
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2146
      QUERY_CHECK_CODE(code, lino, _error);
9,024✔
2147
      code = pStorageAPI->metaFn.getStableCachedTableList(
9,024✔
2148
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
9,024✔
2149
        context.digest, tListLen(context.digest), pUidList, &acquired);
2150
      QUERY_CHECK_CODE(code, lino, _error);
9,024✔
2151
    } else if (tsTagFilterCache) {
101,560,842✔
2152
      // second, try to use normal tag filter cache
2153
      qDebug("%s using normal tag filter cache", idstr);
26,987✔
2154
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
28,109✔
2155
        SNode* tmp = NULL;
1,122✔
2156
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
1,122✔
2157
        QUERY_CHECK_CODE(code, lino, _error);
1,122✔
2158

2159
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
1,122✔
2160
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
1,122✔
2161
        if (TSDB_CODE_SUCCESS != ctx.code) {
1,122✔
2162
          nodesDestroyNode(tmp);
×
2163
          code = ctx.code;
×
2164
          goto _error;
×
2165
        }
2166
        code = genTagFilterDigest(tmp, &context);
1,122✔
2167
        nodesDestroyNode(tmp);
1,122✔
2168
      } else {
2169
        code = genTagFilterDigest(pTagCond, &context);
25,865✔
2170
      }
2171
      // try to retrieve the result from meta cache
2172
      QUERY_CHECK_CODE(code, lino, _error);      
26,987✔
2173
      code = pStorageAPI->metaFn.getCachedTableList(
26,987✔
2174
        pVnode, pScanNode->suid, context.digest,
26,987✔
2175
        tListLen(context.digest), pUidList, &acquired);
2176
      QUERY_CHECK_CODE(code, lino, _error);
7,747✔
2177
    }
2178
    if (acquired) {
101,508,094✔
2179
      taosArrayDestroy(pTagColIds);
25,313✔
2180
      pTagColIds = NULL;
25,313✔
2181
      
2182
      digest[0] = 1;
25,313✔
2183
      memcpy(
50,626✔
2184
        digest + 1, context.digest, tListLen(context.digest));
25,313✔
2185
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
25,313✔
2186
        " numOfTables:%d", 
2187
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2188
      goto _end;
25,313✔
2189
    } else {
2190
      qDebug("suid:%" PRIu64 
101,482,781✔
2191
        ", failed to get table uid list from cache", pScanNode->suid);
2192
    }
2193

2194
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
101,624,649✔
2195
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
89,834,622✔
2196
      QUERY_CHECK_CODE(code, lino, _error);
89,835,627✔
2197
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
89,835,627✔
2198
    } else {
2199
      // failed to find the result in the cache, let try to calculate the results
2200
      if (pTagIndexCond) {
11,790,027✔
2201
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
3,801,172✔
2202

2203
        SIndexMetaArg metaArg = {.metaEx = pVnode,
3,801,198✔
2204
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
3,801,172✔
2205
                                 .ivtIdx = pIndex,
2206
                                 .suid = pScanNode->uid};
3,801,172✔
2207

2208
        status = SFLT_NOT_INDEX;
3,801,172✔
2209
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
3,801,172✔
2210
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
3,795,086✔
2211
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,002,828✔
2212
        } else {
2213
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,792,258✔
2214
        }
2215
      }
2216
    }
2217
    qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
101,608,630✔
2218
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
101,626,976✔
2219
      pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
101,626,976✔
2220
      &listAdded, pStreamInfo);
2221
    QUERY_CHECK_CODE(code, lino, _error);
101,601,771✔
2222

2223
    // let's add the filter results into meta-cache
2224
    numOfTables = taosArrayGetSize(pUidList);
101,600,895✔
2225

2226
    if (canCacheTagEqCondFilter) {
101,613,978✔
2227
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
3,961✔
2228
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2229
            idstr, pScanNode->suid, (int32_t)numOfTables,
2230
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2231

2232
      code = pStorageAPI->metaFn.putStableCachedTableList(
3,961✔
2233
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2234
        context.digest, tListLen(context.digest),
2235
        pUidList, &pTagColIds);
2236
      QUERY_CHECK_CODE(code, lino, _end);
3,948✔
2237

2238
      digest[0] = 1;
3,948✔
2239
      memcpy(digest + 1, context.digest, tListLen(context.digest));
3,948✔
2240
    } else if (tsTagFilterCache) {
101,610,017✔
2241
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
6,750✔
2242
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2243
            idstr, pScanNode->suid, (int32_t)numOfTables,
2244
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2245
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
6,750✔
2246
      pPayload = taosMemoryMalloc(size);
6,750✔
2247
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
6,750✔
2248

2249
      *(int32_t*)pPayload = (int32_t)numOfTables;
6,750✔
2250
      if (numOfTables > 0) {
6,750✔
2251
        void* tmp = taosArrayGet(pUidList, 0);
5,579✔
2252
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
5,579✔
2253
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
5,579✔
2254
      }
2255

2256
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
6,750✔
2257
                                                    context.digest,
2258
                                                    tListLen(context.digest),
2259
                                                    pPayload, size, 1);
2260
      if (TSDB_CODE_SUCCESS == code) {
6,750✔
2261
        /*
2262
          data referenced by pPayload is used in lru cache,
2263
          reset pPayload to NULL to avoid being freed in _error block
2264
        */
2265
        pPayload = NULL;
6,609✔
2266
      } else {
2267
        if (TSDB_CODE_DUP_KEY == code) {
141✔
2268
          /*
2269
            another thread has already put the same key into cache,
2270
            we can just ignore this error
2271
          */
2272
          code = TSDB_CODE_SUCCESS;
141✔
2273
        }
2274
        QUERY_CHECK_CODE(code, lino, _end);
141✔
2275
      }
2276

2277

2278
      digest[0] = 1;
6,750✔
2279
      memcpy(digest + 1, context.digest, tListLen(context.digest));
6,750✔
2280
    }
2281
  }
2282

2283
_end:
238,625,576✔
2284
  if (!listAdded) {
238,647,281✔
2285
    numOfTables = taosArrayGetSize(pUidList);
228,179,132✔
2286
    for (int i = 0; i < numOfTables; i++) {
759,459,605✔
2287
      void* tmp = taosArrayGet(pUidList, i);
531,233,730✔
2288
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
531,281,564✔
2289
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
531,281,564✔
2290

2291
      void* p = taosArrayPush(pListInfo->pTableList, &info);
531,266,845✔
2292
      if (p == NULL) {
531,334,679✔
2293
        taosArrayDestroy(pUidList);
×
2294
        return terrno;
×
2295
      }
2296

2297
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
531,334,679✔
2298
    }
2299
  }
2300

2301
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
238,694,024✔
2302

2303
_error:
238,704,195✔
2304
  taosArrayDestroy(pUidList);
238,713,448✔
2305
  taosArrayDestroy(pTagColIds);
238,676,928✔
2306
  taosMemFreeClear(pTagCondKey);
238,688,189✔
2307
  taosMemFreeClear(pPayload);
238,688,189✔
2308
  if (code != TSDB_CODE_SUCCESS) {
238,688,189✔
2309
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
876✔
2310
  }
2311
  return code;
238,634,034✔
2312
}
2313

2314
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
18,918✔
2315
  int32_t        code = TSDB_CODE_SUCCESS;
18,918✔
2316
  int32_t        lino = 0;
18,918✔
2317
  SSubplan*      pSubplan = (SSubplan*)node;
18,918✔
2318
  SScanPhysiNode pNode = {0};
18,918✔
2319
  pNode.suid = suid;
18,918✔
2320
  pNode.uid = suid;
18,918✔
2321
  pNode.tableType = TSDB_SUPER_TABLE;
18,918✔
2322

2323
  STableListInfo* pTableListInfo = tableListCreate();
18,918✔
2324
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
18,918✔
2325
  uint8_t digest[17] = {0};
18,918✔
2326
  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
18,918✔
2327
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
2328
  QUERY_CHECK_CODE(code, lino, _end);
18,918✔
2329
  *tableList = pTableListInfo->pTableList;
18,918✔
2330
  pTableListInfo->pTableList = NULL;
18,918✔
2331
  tableListDestroy(pTableListInfo);
18,918✔
2332

2333
_end:
18,918✔
2334
  if (code != TSDB_CODE_SUCCESS) {
18,918✔
2335
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2336
  }
2337
  return code;
18,918✔
2338
}
2339

2340
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
2341
  size_t keyLen = 0;
×
2342

2343
  SNode* node;
2344
  FOREACH(node, pGroups) {
×
2345
    SExprNode* pExpr = (SExprNode*)node;
×
2346
    keyLen += pExpr->resType.bytes;
×
2347
  }
2348

2349
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
2350
  return keyLen;
×
2351
}
2352

2353
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2354
                              SStorageAPI* pAPI) {
2355
  SMetaReader mr = {0};
×
2356

2357
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2358
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2359
    pAPI->metaReaderFn.clearReader(&mr);
×
2360
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2361
  }
2362

2363
  SNodeList* groupNew = NULL;
×
2364
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2365
  if (TSDB_CODE_SUCCESS != code) {
×
2366
    pAPI->metaReaderFn.clearReader(&mr);
×
2367
    return code;
×
2368
  }
2369

2370
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2371
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2372
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2373
    nodesDestroyList(groupNew);
×
2374
    pAPI->metaReaderFn.clearReader(&mr);
×
2375
    return code;
×
2376
  }
2377
  char* isNull = (char*)keyBuf;
×
2378
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2379

2380
  SNode*  pNode;
2381
  int32_t index = 0;
×
2382
  FOREACH(pNode, groupNew) {
×
2383
    SNode*  pNew = NULL;
×
2384
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2385
    if (TSDB_CODE_SUCCESS == code) {
×
2386
      REPLACE_NODE(pNew);
×
2387
    } else {
2388
      nodesDestroyList(groupNew);
×
2389
      pAPI->metaReaderFn.clearReader(&mr);
×
2390
      return code;
×
2391
    }
2392

2393
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2394
      nodesDestroyList(groupNew);
×
2395
      pAPI->metaReaderFn.clearReader(&mr);
×
2396
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2397
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2398
    }
2399
    SValueNode* pValue = (SValueNode*)pNew;
×
2400

2401
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2402
      isNull[index++] = 1;
×
2403
      continue;
×
2404
    } else {
2405
      isNull[index++] = 0;
×
2406
      char* data = nodesGetValueFromNode(pValue);
×
2407
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2408
        if (tTagIsJson(data)) {
×
2409
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2410
          nodesDestroyList(groupNew);
×
2411
          pAPI->metaReaderFn.clearReader(&mr);
×
2412
          return terrno;
×
2413
        }
2414
        int32_t len = getJsonValueLen(data);
×
2415
        memcpy(pStart, data, len);
×
2416
        pStart += len;
×
2417
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2418
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2419
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2420
        }
2421
        memcpy(pStart, data, varDataTLen(data));
×
2422
        pStart += varDataTLen(data);
×
2423
      } else {
2424
        memcpy(pStart, data, pValue->node.resType.bytes);
×
2425
        pStart += pValue->node.resType.bytes;
×
2426
      }
2427
    }
2428
  }
2429

2430
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
2431
  *pGroupId = calcGroupId(keyBuf, len);
×
2432

2433
  nodesDestroyList(groupNew);
×
2434
  pAPI->metaReaderFn.clearReader(&mr);
×
2435

2436
  return TSDB_CODE_SUCCESS;
×
2437
}
2438

2439
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
7,115,252✔
2440
  if (!pNodeList) {
7,115,252✔
2441
    return NULL;
×
2442
  }
2443

2444
  size_t  numOfCols = LIST_LENGTH(pNodeList);
7,115,252✔
2445
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
7,116,779✔
2446
  if (pList == NULL) {
7,115,091✔
2447
    return NULL;
×
2448
  }
2449

2450
  for (int32_t i = 0; i < numOfCols; ++i) {
16,280,771✔
2451
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
9,165,856✔
2452
    if (!pColNode) {
9,166,874✔
2453
      taosArrayDestroy(pList);
×
2454
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2455
      return NULL;
×
2456
    }
2457

2458
    // todo extract method
2459
    SColumn c = {0};
9,166,874✔
2460
    c.slotId = pColNode->slotId;
9,166,698✔
2461
    c.colId = pColNode->colId;
9,166,698✔
2462
    c.type = pColNode->node.resType.type;
9,165,856✔
2463
    c.bytes = pColNode->node.resType.bytes;
9,165,172✔
2464
    c.precision = pColNode->node.resType.precision;
9,165,852✔
2465
    c.scale = pColNode->node.resType.scale;
9,165,006✔
2466

2467
    void* tmp = taosArrayPush(pList, &c);
9,166,698✔
2468
    if (!tmp) {
9,166,698✔
2469
      taosArrayDestroy(pList);
×
2470
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2471
      return NULL;
×
2472
    }
2473
  }
2474

2475
  return pList;
7,114,915✔
2476
}
2477

2478
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
307,203,796✔
2479
                            int32_t type, SColMatchInfo* pMatchInfo) {
2480
  size_t  numOfCols = LIST_LENGTH(pNodeList);
307,203,796✔
2481
  int32_t code = TSDB_CODE_SUCCESS;
307,238,557✔
2482
  int32_t lino = 0;
307,238,557✔
2483

2484
  pMatchInfo->matchType = type;
307,238,557✔
2485

2486
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
307,226,303✔
2487
  if (pList == NULL) {
307,128,772✔
2488
    code = terrno;
×
2489
    return code;
×
2490
  }
2491

2492
  for (int32_t i = 0; i < numOfCols; ++i) {
1,330,820,491✔
2493
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
1,023,586,464✔
2494
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,023,684,184✔
2495
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
1,023,684,184✔
2496
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
1,014,666,823✔
2497

2498
      SColMatchItem c = {.needOutput = true};
1,014,695,330✔
2499
      c.colId = pColNode->colId;
1,014,707,337✔
2500
      c.srcSlotId = pColNode->slotId;
1,014,687,405✔
2501
      c.dstSlotId = pNode->slotId;
1,014,704,408✔
2502
      c.isPk = pColNode->isPk;
1,014,698,819✔
2503
      c.dataType = pColNode->node.resType;
1,014,671,780✔
2504
      void* tmp = taosArrayPush(pList, &c);
1,014,750,320✔
2505
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,014,750,320✔
2506
    }
2507
  }
2508

2509
  // set the output flag for each column in SColMatchInfo, according to the
2510
  *numOfOutputCols = 0;
307,234,027✔
2511
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
307,281,885✔
2512
  for (int32_t i = 0; i < num; ++i) {
1,527,911,415✔
2513
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
1,220,641,460✔
2514
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,220,722,302✔
2515

2516
    // todo: add reserve flag check
2517
    // it is a column reserved for the arithmetic expression calculation
2518
    if (pNode->slotId >= numOfCols) {
1,220,722,302✔
2519
      (*numOfOutputCols) += 1;
197,111,855✔
2520
      continue;
197,110,424✔
2521
    }
2522

2523
    SColMatchItem* info = NULL;
1,023,667,263✔
2524
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
2,147,483,647✔
2525
      info = taosArrayGet(pList, j);
2,147,483,647✔
2526
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
2,147,483,647✔
2527
      if (info->dstSlotId == pNode->slotId) {
2,147,483,647✔
2528
        break;
1,013,857,442✔
2529
      }
2530
    }
2531

2532
    if (pNode->output) {
15,636,075✔
2533
      (*numOfOutputCols) += 1;
1,012,399,068✔
2534
    } else if (info != NULL) {
11,177,238✔
2535
      // select distinct tbname from stb where tbname='abc';
2536
      info->needOutput = false;
11,194,981✔
2537
    }
2538
  }
2539

2540
  pMatchInfo->pList = pList;
307,269,955✔
2541

2542
_end:
307,294,672✔
2543
  if (code != TSDB_CODE_SUCCESS) {
307,294,672✔
2544
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2545
  }
2546
  return code;
307,174,773✔
2547
}
2548

2549
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
1,054,366,379✔
2550
                                  const char* name) {
2551
  SResSchema s = {0};
1,054,366,379✔
2552
  s.scale = scale;
1,054,425,878✔
2553
  s.type = type;
1,054,425,878✔
2554
  s.bytes = bytes;
1,054,425,878✔
2555
  s.slotId = slotId;
1,054,425,878✔
2556
  s.precision = precision;
1,054,425,878✔
2557
  tstrncpy(s.name, name, tListLen(s.name));
1,054,425,878✔
2558

2559
  return s;
1,054,425,878✔
2560
}
2561

2562
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
981,281,458✔
2563
  SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
981,281,458✔
2564
  if (pCol == NULL) {
980,860,154✔
2565
    return NULL;
×
2566
  }
2567

2568
  pCol->slotId = slotId;
980,860,154✔
2569
  pCol->colId = colId;
980,860,844✔
2570
  pCol->bytes = pType->bytes;
980,894,094✔
2571
  pCol->type = pType->type;
980,999,064✔
2572
  pCol->scale = pType->scale;
981,076,061✔
2573
  pCol->precision = pType->precision;
981,139,033✔
2574
  pCol->dataBlockId = blockId;
981,265,769✔
2575
  pCol->colType = colType;
981,241,033✔
2576
  return pCol;
981,270,492✔
2577
}
2578

2579
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
1,058,127,016✔
2580
  int32_t code = TSDB_CODE_SUCCESS;
1,058,127,016✔
2581
  int32_t lino = 0;
1,058,127,016✔
2582
  pExp->base.numOfParams = 0;
1,058,127,016✔
2583
  pExp->base.pParam = NULL;
1,058,223,801✔
2584
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
1,058,113,491✔
2585
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
1,057,700,176✔
2586

2587
  pExp->pExpr->_function.num = 1;
1,057,780,999✔
2588
  pExp->pExpr->_function.functionId = -1;
1,057,738,382✔
2589

2590
  int32_t type = nodeType(pNode);
1,058,078,951✔
2591
  // it is a project query, or group by column
2592
  if (type == QUERY_NODE_COLUMN) {
1,058,158,515✔
2593
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
566,009,865✔
2594
    SColumnNode* pColNode = (SColumnNode*)pNode;
566,022,965✔
2595

2596
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
566,022,965✔
2597
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
565,878,752✔
2598

2599
    pExp->base.numOfParams = 1;
565,836,807✔
2600

2601
    SDataType* pType = &pColNode->node.resType;
565,917,499✔
2602
    pExp->base.resSchema =
2603
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
565,840,617✔
2604

2605
    pExp->base.pParam[0].pCol =
1,131,838,524✔
2606
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
1,131,955,926✔
2607
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
565,881,996✔
2608

2609
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
565,866,355✔
2610
  } else if (type == QUERY_NODE_VALUE) {
492,148,650✔
2611
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
21,379,482✔
2612
    SValueNode* pValNode = (SValueNode*)pNode;
21,382,774✔
2613

2614
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
21,382,774✔
2615
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
21,379,465✔
2616

2617
    pExp->base.numOfParams = 1;
21,373,719✔
2618

2619
    SDataType* pType = &pValNode->node.resType;
21,379,251✔
2620
    pExp->base.resSchema =
2621
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
21,374,825✔
2622
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
21,378,413✔
2623
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
21,378,896✔
2624
    QUERY_CHECK_CODE(code, lino, _end);
21,380,470✔
2625
  } else if (type == QUERY_NODE_REMOTE_VALUE) {
470,769,168✔
2626
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
22,524,771✔
2627
    code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
22,524,771✔
2628
    QUERY_CHECK_CODE(code, lino, _end);
22,533,089✔
2629

2630
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
18,681,377✔
2631
    SValueNode* pValNode = (SValueNode*)pNode;
18,681,813✔
2632

2633
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
18,681,813✔
2634
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
18,680,941✔
2635

2636
    pExp->base.numOfParams = 1;
18,680,941✔
2637

2638
    SDataType* pType = &pValNode->node.resType;
18,681,377✔
2639
    pExp->base.resSchema =
2640
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
18,681,813✔
2641
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
18,681,813✔
2642
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
18,681,813✔
2643
    QUERY_CHECK_CODE(code, lino, _end);
18,680,941✔
2644
  } else if (type == QUERY_NODE_FUNCTION) {
448,244,397✔
2645
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
412,962,528✔
2646
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
412,981,667✔
2647

2648
    SDataType* pType = &pFuncNode->node.resType;
412,981,667✔
2649
    pExp->base.resSchema =
2650
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
412,973,062✔
2651
    tExprNode* pExprNode = pExp->pExpr;
412,968,631✔
2652

2653
    pExprNode->_function.functionId = pFuncNode->funcId;
412,972,904✔
2654
    pExprNode->_function.pFunctNode = pFuncNode;
412,969,473✔
2655
    pExprNode->_function.functionType = pFuncNode->funcType;
412,967,002✔
2656

2657
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
412,955,424✔
2658

2659
    pExp->base.pParamList = pFuncNode->pParameterList;
412,982,368✔
2660
#if 1
2661
    // todo refactor: add the parameter for tbname function
2662
    const char* name = "tbname";
412,925,949✔
2663
    int32_t     len = strlen(name);
412,925,949✔
2664

2665
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
412,925,949✔
2666
        pExprNode->_function.functionName[len] == 0) {
27,663,488✔
2667
      pFuncNode->pParameterList = NULL;
27,652,275✔
2668
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
27,649,854✔
2669
      SValueNode* res = NULL;
27,657,959✔
2670
      if (TSDB_CODE_SUCCESS == code) {
27,657,959✔
2671
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
27,657,959✔
2672
      }
2673
      QUERY_CHECK_CODE(code, lino, _end);
27,662,334✔
2674
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
27,662,334✔
2675
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
27,656,482✔
2676
      if (code != TSDB_CODE_SUCCESS) {
27,660,522✔
2677
        nodesDestroyNode((SNode*)res);
×
2678
        res = NULL;
×
2679
      }
2680
      QUERY_CHECK_CODE(code, lino, _end);
27,660,522✔
2681
    }
2682
#endif
2683

2684
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
413,087,116✔
2685

2686
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
412,985,369✔
2687
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
412,888,349✔
2688
    pExp->base.numOfParams = numOfParam;
412,864,314✔
2689

2690
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
996,354,744✔
2691
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
583,698,496✔
2692
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
583,763,057✔
2693
      if (p1->type == QUERY_NODE_COLUMN) {
583,763,057✔
2694
        SColumnNode* pcn = (SColumnNode*)p1;
415,331,346✔
2695

2696
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
415,331,346✔
2697
        pExp->base.pParam[j].pCol =
830,593,778✔
2698
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
830,641,443✔
2699
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
415,309,642✔
2700
      } else if (p1->type == QUERY_NODE_VALUE) {
168,432,309✔
2701
        SValueNode* pvn = (SValueNode*)p1;
115,270,693✔
2702
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
115,270,693✔
2703
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
115,262,882✔
2704
        QUERY_CHECK_CODE(code, lino, _end);
115,254,946✔
2705
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
53,202,854✔
2706
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
1,144,667✔
2707
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
1,144,667✔
2708
        QUERY_CHECK_CODE(code, lino, _end);
1,144,667✔
2709

2710
        SValueNode* pvn = (SValueNode*)pRemote;
968,568✔
2711
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
968,568✔
2712
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
968,568✔
2713
        QUERY_CHECK_CODE(code, lino, _end);
901,218✔
2714
      }
2715
    }
2716
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
412,656,248✔
2717
  } else if (type == QUERY_NODE_OPERATOR) {
35,281,869✔
2718
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
30,753,870✔
2719
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
30,747,155✔
2720

2721
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
30,747,155✔
2722
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
30,737,429✔
2723
    pExp->base.numOfParams = 1;
30,734,498✔
2724

2725
    SDataType* pType = &pOpNode->node.resType;
30,740,233✔
2726
    pExp->base.resSchema =
2727
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
30,737,180✔
2728
    pExp->pExpr->_optrRoot.pRootNode = pNode;
30,744,783✔
2729
  } else if (type == QUERY_NODE_CASE_WHEN) {
4,531,998✔
2730
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
4,587,595✔
2731
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
4,587,595✔
2732

2733
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
4,587,595✔
2734
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
4,587,595✔
2735
    pExp->base.numOfParams = 1;
4,587,595✔
2736

2737
    SDataType* pType = &pCaseNode->node.resType;
4,587,595✔
2738
    pExp->base.resSchema =
2739
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
4,587,595✔
2740
    pExp->pExpr->_optrRoot.pRootNode = pNode;
4,587,595✔
2741
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
×
2742
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
2,974✔
2743
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
2,974✔
2744
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
2,974✔
2745
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2,974✔
2746
    pExp->base.numOfParams = 1;
2,974✔
2747
    SDataType* pType = &pCond->node.resType;
2,974✔
2748
    pExp->base.resSchema =
2749
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
2,974✔
2750
    pExp->pExpr->_optrRoot.pRootNode = pNode;
2,974✔
2751
  } else {
2752
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2753
    QUERY_CHECK_CODE(code, lino, _end);
×
2754
  }
2755
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
1,053,994,960✔
2756
_end:
1,058,046,404✔
2757
  if (code != TSDB_CODE_SUCCESS) {
1,058,046,404✔
2758
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4,027,811✔
2759
  }
2760
  return code;
1,057,971,606✔
2761
}
2762

2763
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
1,058,119,984✔
2764
  return createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
1,058,119,984✔
2765
}
2766

2767
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
×
2768
  *numOfExprs = LIST_LENGTH(pNodeList);
×
2769
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
×
2770
  if (!pExprs) {
×
2771
    return NULL;
×
2772
  }
2773

2774
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
×
2775
    SExprInfo* pExp = &pExprs[i];
×
2776
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
×
2777
    if (code != TSDB_CODE_SUCCESS) {
×
2778
      taosMemoryFreeClear(pExprs);
×
2779
      terrno = code;
×
2780
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2781
      return NULL;
×
2782
    }
2783
  }
2784

2785
  return pExprs;
×
2786
}
2787

2788
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
430,291,661✔
2789
  QRY_PARAM_CHECK(pExprInfo);
430,291,661✔
2790

2791
  int32_t code = 0;
430,343,509✔
2792
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
430,343,509✔
2793
  int32_t numOfGroupKeys = 0;
430,298,568✔
2794
  if (pGroupKeys != NULL) {
430,298,568✔
2795
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
40,423,296✔
2796
  }
2797

2798
  *numOfExprs = numOfFuncs + numOfGroupKeys;
430,298,901✔
2799
  if (*numOfExprs == 0) {
430,291,302✔
2800
    return code;
49,997,295✔
2801
  }
2802

2803
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
380,317,597✔
2804
  if (pExprs == NULL) {
380,026,835✔
2805
    return terrno;
×
2806
  }
2807

2808
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
1,433,893,348✔
2809
    STargetNode* pTargetNode = NULL;
1,057,843,041✔
2810
    if (i < numOfFuncs) {
1,057,843,041✔
2811
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
986,320,197✔
2812
    } else {
2813
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
71,522,844✔
2814
    }
2815
    if (!pTargetNode) {
1,058,083,847✔
2816
      destroyExprInfo(pExprs, *numOfExprs);
×
2817
      taosMemoryFreeClear(pExprs);
×
2818
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2819
      return terrno;
×
2820
    }
2821

2822
    SExprInfo* pExp = &pExprs[i];
1,058,083,847✔
2823
    code = createExprFromTargetNode(pExp, pTargetNode);
1,058,122,305✔
2824
    if (code != TSDB_CODE_SUCCESS) {
1,057,894,324✔
2825
      destroyExprInfo(pExprs, *numOfExprs);
4,027,811✔
2826
      taosMemoryFreeClear(pExprs);
4,027,811✔
2827
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
4,027,811✔
2828
      return code;
4,027,811✔
2829
    }
2830
  }
2831

2832
  *pExprInfo = pExprs;
376,203,683✔
2833
  return code;
376,235,053✔
2834
}
2835

2836
static void deleteSubsidiareCtx(void* pData) {
×
2837
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2838
  if (pCtx->pCtx) {
×
2839
    taosMemoryFreeClear(pCtx->pCtx);
×
2840
  }
2841
}
×
2842

2843
// set the output buffer for the selectivity + tag query
2844
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
409,343,811✔
2845
  int32_t num = 0;
409,343,811✔
2846
  int32_t code = TSDB_CODE_SUCCESS;
409,343,811✔
2847
  int32_t lino = 0;
409,343,811✔
2848

2849
  SArray* pValCtxArray = NULL;
409,343,811✔
2850
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
1,081,583,317✔
2851
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
672,279,700✔
2852
    if (funcIdx > 0) {
672,312,614✔
2853
      if (pValCtxArray == NULL) {
1,416,192✔
2854
        // the end of the list is the select function of biggest index
2855
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,016,853✔
2856
        if (pValCtxArray == NULL) {
1,016,467✔
2857
          return terrno;
×
2858
        }
2859
      }
2860
      if (funcIdx > pValCtxArray->size) {
1,415,806✔
2861
        qError("funcIdx:%d is out of range", funcIdx);
×
2862
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2863
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2864
      }
2865
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,415,806✔
2866
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,416,192✔
2867
      if (pSubsidiary->pCtx == NULL) {
1,416,192✔
2868
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2869
        return terrno;
×
2870
      }
2871
      pSubsidiary->num = 0;
1,415,806✔
2872
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,415,806✔
2873
    }
2874
  }
2875

2876
  SqlFunctionCtx*  p = NULL;
409,303,617✔
2877
  SqlFunctionCtx** pValCtx = NULL;
409,303,617✔
2878
  if (pValCtxArray == NULL) {
409,303,617✔
2879
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
408,346,399✔
2880
    if (pValCtx == NULL) {
408,425,099✔
2881
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2882
    }
2883
  }
2884

2885
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,450,027,856✔
2886
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
1,040,544,208✔
2887
    if ((strcmp(pName, "_select_value") == 0)) {
1,040,659,416✔
2888
      if (pValCtxArray == NULL) {
6,747,802✔
2889
        pValCtx[num++] = &pCtx[i];
4,760,489✔
2890
      } else {
2891
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
1,987,313✔
2892
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
1,987,699✔
2893
          bindFuncIndex -= 1;
1,941,379✔
2894
        }
2895
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
1,987,699✔
2896
        if (pSubsidiary == NULL) {
1,987,313✔
2897
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2898
        }
2899
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
1,987,313✔
2900
        (*pSubsidiary)->num++;
1,987,313✔
2901
      }
2902
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
1,033,911,614✔
2903
      if (pValCtxArray == NULL) {
77,463,836✔
2904
        p = &pCtx[i];
75,735,805✔
2905
      }
2906
    }
2907
  }
2908

2909
  if (p != NULL) {
409,483,648✔
2910
    p->subsidiaries.pCtx = pValCtx;
29,636,350✔
2911
    p->subsidiaries.num = num;
29,638,358✔
2912
  } else {
2913
    taosMemoryFreeClear(pValCtx);
379,847,298✔
2914
  }
2915

2916
_end:
1,125,954✔
2917
  if (code != TSDB_CODE_SUCCESS) {
409,355,302✔
2918
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2919
    taosMemoryFreeClear(pValCtx);
×
2920
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2921
  } else {
2922
    taosArrayDestroy(pValCtxArray);
409,355,302✔
2923
  }
2924
  return code;
409,323,123✔
2925
}
2926

2927
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
409,412,544✔
2928
                                     SFunctionStateStore* pStore) {
2929
  int32_t         code = TSDB_CODE_SUCCESS;
409,412,544✔
2930
  int32_t         lino = 0;
409,412,544✔
2931
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
409,412,544✔
2932
  if (pFuncCtx == NULL) {
409,255,360✔
2933
    return NULL;
×
2934
  }
2935

2936
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
409,255,360✔
2937
  if (*rowEntryInfoOffset == 0) {
409,413,164✔
2938
    taosMemoryFreeClear(pFuncCtx);
×
2939
    return NULL;
×
2940
  }
2941

2942
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,449,961,443✔
2943
    SExprInfo* pExpr = &pExprInfo[i];
1,040,589,696✔
2944

2945
    SExprBasicInfo* pFunct = &pExpr->base;
1,040,668,516✔
2946
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
1,040,715,429✔
2947

2948
    pCtx->functionId = -1;
1,040,738,781✔
2949
    pCtx->pExpr = pExpr;
1,040,746,054✔
2950

2951
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
1,040,693,743✔
2952
      SFuncExecEnv env = {0};
411,977,090✔
2953
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
411,986,469✔
2954
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
411,991,975✔
2955
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
411,974,463✔
2956

2957
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
411,999,731✔
2958
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
684,508,161✔
2959
        if (!isUdaf) {
272,650,332✔
2960
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
272,632,575✔
2961
          QUERY_CHECK_CODE(code, lino, _end);
272,566,817✔
2962
        } else {
2963
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
17,757✔
2964
          pCtx->udfName = taosStrdup(udfName);
17,757✔
2965
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
17,757✔
2966

2967
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
17,757✔
2968
          QUERY_CHECK_CODE(code, lino, _end);
17,757✔
2969
        }
2970
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
272,584,574✔
2971
        if (!tmp) {
272,577,361✔
2972
          code = terrno;
×
UNCOV
2973
          QUERY_CHECK_CODE(code, lino, _end);
×
2974
        }
2975
      } else {
2976
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
139,313,509✔
2977
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
6,203,445✔
2978
          QUERY_CHECK_CODE(code, lino, _end);
6,203,464✔
2979
        }      
2980
        
2981
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
139,321,361✔
2982
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
139,332,796✔
2983
          code = TSDB_CODE_SUCCESS;
46,548✔
2984
        }
2985
        QUERY_CHECK_CODE(code, lino, _end);
139,332,796✔
2986

2987
        if (pCtx->sfp.getEnv != NULL) {
139,332,796✔
2988
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
17,380,461✔
2989
          if (!tmp) {
17,382,283✔
2990
            code = terrno;
×
2991
            QUERY_CHECK_CODE(code, lino, _end);
×
2992
          }
2993
        }
2994
      }
2995
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
411,909,563✔
2996
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
628,660,824✔
2997
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
40,017,857✔
2998
      // for simple column, the result buffer needs to hold at least one element.
2999
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
628,776,538✔
3000
    }
3001

3002
    pCtx->input.numOfInputCols = pFunct->numOfParams;
1,040,778,207✔
3003
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,040,719,024✔
3004
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
1,040,753,138✔
3005
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,040,722,388✔
3006
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
1,040,769,902✔
3007

3008
    pCtx->pTsOutput = NULL;
1,040,744,822✔
3009
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
1,040,768,055✔
3010
    pCtx->resDataInfo.type = pFunct->resSchema.type;
1,040,700,536✔
3011
    pCtx->order = TSDB_ORDER_ASC;
1,040,673,314✔
3012
    pCtx->start.key = INT64_MIN;
1,040,728,958✔
3013
    pCtx->end.key = INT64_MIN;
1,040,760,897✔
3014
    pCtx->numOfParams = pExpr->base.numOfParams;
1,040,735,187✔
3015
    pCtx->param = pFunct->pParam;
1,040,798,052✔
3016
    pCtx->saveHandle.currentPage = -1;
1,040,809,346✔
3017
    pCtx->pStore = pStore;
1,040,674,183✔
3018
    pCtx->hasWindowOrGroup = false;
1,040,788,065✔
3019
    pCtx->needCleanup = false;
1,040,702,525✔
3020
    pCtx->skipDynDataCheck = false;
1,040,750,257✔
3021
  }
3022

3023
  for (int32_t i = 1; i < numOfOutput; ++i) {
1,081,755,468✔
3024
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,344,702,309✔
3025
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
672,364,150✔
3026
  }
3027

3028
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
409,384,998✔
3029
  QUERY_CHECK_CODE(code, lino, _end);
409,343,149✔
3030

3031
_end:
409,343,149✔
3032
  if (code != TSDB_CODE_SUCCESS) {
409,300,596✔
3033
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3034
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3035
      taosMemoryFree(pFuncCtx[i].input.pData);
×
3036
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
3037
    }
3038
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
3039
    taosMemoryFreeClear(pFuncCtx);
×
3040

3041
    terrno = code;
×
3042
    return NULL;
×
3043
  }
3044
  return pFuncCtx;
409,300,596✔
3045
}
3046

3047
// NOTE: sources columns are more than the destination SSDatablock columns.
3048
// doFilter in table scan needs every column even its output is false
3049
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
9,285,763✔
3050
  int32_t code = TSDB_CODE_SUCCESS;
9,285,763✔
3051
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
9,285,763✔
3052

3053
  int32_t i = 0, j = 0;
9,285,282✔
3054
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
63,957,192✔
3055
    SColumnInfoData* p = taosArrayGet(pCols, i);
54,672,391✔
3056
    if (!p) {
54,672,050✔
3057
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3058
      return terrno;
×
3059
    }
3060
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
54,672,050✔
3061
    if (!pmInfo) {
54,670,460✔
3062
      return terrno;
×
3063
    }
3064

3065
    if (p->info.colId == pmInfo->colId) {
54,670,460✔
3066
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
48,981,558✔
3067
      if (!pDst) {
48,981,108✔
3068
        return terrno;
×
3069
      }
3070
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
48,981,108✔
3071
      if (code != TSDB_CODE_SUCCESS) {
48,980,322✔
3072
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3073
        return code;
×
3074
      }
3075
      i++;
48,980,322✔
3076
      j++;
48,980,322✔
3077
    } else if (p->info.colId < pmInfo->colId) {
5,690,971✔
3078
      i++;
5,691,588✔
3079
    } else {
3080
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3081
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3082
    }
3083
  }
3084
  return code;
9,284,801✔
3085
}
3086

3087
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
211,298,759✔
3088
  SInterval interval = {
422,566,309✔
3089
      .interval = pTableScanNode->interval,
211,237,698✔
3090
      .sliding = pTableScanNode->sliding,
211,192,612✔
3091
      .intervalUnit = pTableScanNode->intervalUnit,
211,293,673✔
3092
      .slidingUnit = pTableScanNode->slidingUnit,
211,321,761✔
3093
      .offset = pTableScanNode->offset,
211,317,254✔
3094
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
211,344,042✔
3095
      .timeRange = pTableScanNode->scanRange,
3096
  };
3097
  calcIntervalAutoOffset(&interval);
211,290,355✔
3098

3099
  return interval;
211,424,427✔
3100
}
3101

3102
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
76,395,191✔
3103
  SColumn c = {0};
76,395,191✔
3104

3105
  c.slotId = pColNode->slotId;
76,395,191✔
3106
  c.colId = pColNode->colId;
76,399,355✔
3107
  c.type = pColNode->node.resType.type;
76,398,994✔
3108
  c.bytes = pColNode->node.resType.bytes;
76,395,933✔
3109
  c.scale = pColNode->node.resType.scale;
76,394,743✔
3110
  c.precision = pColNode->node.resType.precision;
76,394,693✔
3111
  return c;
76,391,132✔
3112
}
3113

3114

3115
/**
3116
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3117
 * @param[in] cond The range specified by WHERE condition.
3118
 * @param[in] range The range specified by RANGE clause.
3119
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3120
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3121
 * @note `cond` and `twindow` may be the same address.
3122
 */
3123
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
1,935,005✔
3124
                                 STimeWindow* extTwindows) {
3125
  int32_t     code = TSDB_CODE_SUCCESS;
1,935,005✔
3126
  int32_t     lino = 0;
1,935,005✔
3127
  STimeWindow tempWindow;
3128

3129
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
1,935,005✔
3130
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
2,435✔
3131
    return code;
2,435✔
3132
  }
3133

3134
  if (range->ekey < cond->skey) {
1,932,570✔
3135
    extTwindows[1] = *cond;
324,365✔
3136
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
324,365✔
3137
    return code;
324,365✔
3138
  }
3139

3140
  if (cond->ekey < range->skey) {
1,608,205✔
3141
    extTwindows[0] = *cond;
217,808✔
3142
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
217,808✔
3143
    return code;
217,808✔
3144
  }
3145

3146
  // Only scan data in the time range intersecion.
3147
  extTwindows[0] = extTwindows[1] = *cond;
1,390,397✔
3148
  twindow->skey = TMAX(cond->skey, range->skey);
1,390,397✔
3149
  twindow->ekey = TMIN(cond->ekey, range->ekey);
1,390,397✔
3150
  extTwindows[0].ekey = twindow->skey - 1;
1,390,397✔
3151
  extTwindows[1].skey = twindow->ekey + 1;
1,390,397✔
3152

3153
  return code;
1,390,397✔
3154
}
3155

3156
static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
10,460✔
3157
  SNode*  pNew = NULL;
10,460✔
3158
  int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew);
10,460✔
3159
  if (TSDB_CODE_SUCCESS == code) {
10,460✔
3160
    *pPrimaryKeyCond = pNew;
10,460✔
3161
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
10,460✔
3162
      code = filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, isStrict, NULL);
10,460✔
3163
    }
3164
  }
3165
  return code;
10,460✔
3166
}
3167

3168
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* pTableScanNode,
239,220,485✔
3169
                               const SReadHandle* readHandle, bool applyExtWin) {
3170
  int32_t code = 0;                             
239,220,485✔
3171
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
239,220,485✔
3172
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
239,143,472✔
3173

3174
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
239,240,143✔
3175
  if (!pCond->colList) {
239,131,119✔
3176
    return terrno;
×
3177
  }
3178
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
239,056,556✔
3179
  if (pCond->pSlotList == NULL) {
239,144,314✔
3180
    taosMemoryFreeClear(pCond->colList);
×
3181
    return terrno;
×
3182
  }
3183

3184
  // TODO: get it from stable scan node
3185
  pCond->twindows = pTableScanNode->scanRange;
238,919,257✔
3186
  pCond->suid = pTableScanNode->scan.suid;
239,232,980✔
3187
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
239,085,092✔
3188
  pCond->startVersion = -1;
239,086,719✔
3189
  pCond->endVersion = -1;
239,061,485✔
3190
  pCond->skipRollup = readHandle->skipRollup;
239,093,728✔
3191
  if (readHandle->winRangeValid) {
239,049,382✔
3192
    pCond->twindows = readHandle->winRange;
199,895✔
3193
  }
3194
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
239,164,280✔
3195
  // allowed read stt file optimization mode
3196
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
478,270,339✔
3197
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
239,014,282✔
3198

3199
  int32_t j = 0;
239,096,407✔
3200
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
1,067,585,676✔
3201
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
828,421,814✔
3202
    if (!pNode) {
828,266,296✔
3203
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3204
      return terrno;
×
3205
    }
3206
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
828,266,296✔
3207
    if (pColNode->colType == COLUMN_TYPE_TAG) {
828,407,245✔
3208
      continue;
×
3209
    }
3210

3211
    pCond->colList[j].type = pColNode->node.resType.type;
828,463,334✔
3212
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
828,485,787✔
3213
    pCond->colList[j].colId = pColNode->colId;
828,448,797✔
3214
    pCond->colList[j].pk = pColNode->isPk;
828,472,725✔
3215

3216
    pCond->pSlotList[j] = pNode->slotId;
828,519,697✔
3217
    j += 1;
828,489,269✔
3218
  }
3219

3220
  pCond->numOfCols = j;
239,251,557✔
3221

3222
  if (applyExtWin) {
239,253,128✔
3223
    if (NULL != pTableScanNode->pExtScanRange) {
211,713,570✔
3224
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
1,935,005✔
3225
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
1,935,005✔
3226
    } else if (readHandle->extWinRangeValid) {
209,576,972✔
3227
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
×
3228
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
3229
    }
3230
  }
3231

3232
  if (pTableScanNode->pPrimaryCond) {
239,089,101✔
3233
    bool isStrict = false;
10,460✔
3234
    code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
10,460✔
3235
    if (code || !isStrict) {
10,460✔
3236
      code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
×
3237
    }
3238
  }
3239

3240
  return code;
238,976,901✔
3241
}
3242

3243
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
303,834✔
3244
                                           const SReadHandle* readHandle, SArray* colArray) {
3245
  int32_t code = TSDB_CODE_SUCCESS;
303,834✔
3246
  int32_t lino = 0;
303,834✔
3247

3248
  pCond->order = TSDB_ORDER_ASC;
303,834✔
3249
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
303,834✔
3250

3251
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
303,834✔
3252
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
303,834✔
3253

3254
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
303,834✔
3255
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
303,834✔
3256

3257
  pCond->twindows = pOrgCond->twindows;
303,834✔
3258
  pCond->order = pOrgCond->order;
303,834✔
3259
  pCond->type = pOrgCond->type;
303,834✔
3260
  pCond->startVersion = -1;
303,834✔
3261
  pCond->endVersion = -1;
303,834✔
3262
  pCond->skipRollup = true;
303,834✔
3263
  pCond->notLoadData = false;
303,834✔
3264

3265
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
1,465,692✔
3266
    SColIdPair* pColPair = taosArrayGet(colArray, i);
1,161,858✔
3267
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
1,161,858✔
3268

3269
    bool find = false;
1,161,858✔
3270
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
6,122,370✔
3271
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
6,122,370✔
3272
        pCond->colList[i].type = pOrgCond->colList[j].type;
1,161,858✔
3273
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
1,161,858✔
3274
        pCond->colList[i].colId = pColPair->orgColId;
1,161,858✔
3275
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
1,161,858✔
3276
        pCond->pSlotList[i] = i;
1,161,858✔
3277
        find = true;
1,161,858✔
3278
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
1,161,858✔
3279
        break;
1,161,858✔
3280
      }
3281
    }
3282
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
1,161,858✔
3283
  }
3284

3285
  return code;
303,834✔
3286
_return:
×
3287
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3288
  taosMemoryFreeClear(pCond->colList);
×
3289
  taosMemoryFreeClear(pCond->pSlotList);
×
3290
  return code;
×
3291
}
3292

3293
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
491,329,281✔
3294
  taosMemoryFreeClear(pCond->colList);
491,329,281✔
3295
  taosMemoryFreeClear(pCond->pSlotList);
491,295,583✔
3296
}
491,266,361✔
3297

3298
int32_t convertFillType(int32_t mode) {
2,496,792✔
3299
  int32_t type = TSDB_FILL_NONE;
2,496,792✔
3300
  switch (mode) {
2,496,792✔
3301
    case FILL_MODE_PREV:
128,832✔
3302
      type = TSDB_FILL_PREV;
128,832✔
3303
      break;
128,832✔
3304
    case FILL_MODE_NONE:
×
3305
      type = TSDB_FILL_NONE;
×
3306
      break;
×
3307
    case FILL_MODE_NULL:
159,921✔
3308
      type = TSDB_FILL_NULL;
159,921✔
3309
      break;
159,921✔
3310
    case FILL_MODE_NULL_F:
27,517✔
3311
      type = TSDB_FILL_NULL_F;
27,517✔
3312
      break;
27,517✔
3313
    case FILL_MODE_NEXT:
125,856✔
3314
      type = TSDB_FILL_NEXT;
125,856✔
3315
      break;
125,856✔
3316
    case FILL_MODE_VALUE:
141,204✔
3317
      type = TSDB_FILL_SET_VALUE;
141,204✔
3318
      break;
141,204✔
3319
    case FILL_MODE_VALUE_F:
10,522✔
3320
      type = TSDB_FILL_SET_VALUE_F;
10,522✔
3321
      break;
10,522✔
3322
    case FILL_MODE_LINEAR:
179,997✔
3323
      type = TSDB_FILL_LINEAR;
179,997✔
3324
      break;
179,997✔
3325
    case FILL_MODE_NEAR:
1,722,943✔
3326
      type = TSDB_FILL_NEAR;
1,722,943✔
3327
      break;
1,722,943✔
UNCOV
3328
    default:
×
UNCOV
3329
      type = TSDB_FILL_NONE;
×
3330
  }
3331

3332
  return type;
2,496,792✔
3333
}
3334

3335
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
2,147,483,647✔
3336
  if (ascQuery) {
2,147,483,647✔
3337
    *w = getAlignQueryTimeWindow(pInterval, ts);
2,147,483,647✔
3338
  } else {
3339
    // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
3340
    *w = getAlignQueryTimeWindow(pInterval, ts);
19,933✔
3341

3342
    int64_t key = w->skey;
226,214✔
3343
    while (key < ts) {  // moving towards end
241,174✔
3344
      key = getNextTimeWindowStart(pInterval, key, TSDB_ORDER_ASC);
105,231✔
3345
      if (key > ts) {
105,231✔
3346
        break;
90,271✔
3347
      }
3348

3349
      w->skey = key;
14,960✔
3350
    }
3351
    w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
226,214✔
3352
  }
3353
}
2,147,483,647✔
3354

3355
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
63,467,673✔
3356
  STimeWindow w = {0};
63,467,673✔
3357

3358
  w.skey = taosTimeTruncate(ts, pInterval);
63,467,673✔
3359
  w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
63,462,542✔
3360
  return w;
63,465,569✔
3361
}
3362

3363
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
624,969✔
3364
  STimeWindow win = *pWindow;
624,969✔
3365
  STimeWindow save = win;
624,969✔
3366
  while (win.skey <= ts && win.ekey >= ts) {
2,780,440✔
3367
    save = win;
2,155,471✔
3368
    // get previous time window
3369
    getNextTimeWindow(pInterval, &win, order == TSDB_ORDER_DESC ? TSDB_ORDER_ASC : TSDB_ORDER_DESC);
2,155,471✔
3370
  }
3371

3372
  return save;
624,969✔
3373
}
3374

3375
// get the correct time window according to the handled timestamp
3376
// todo refactor
3377
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
99,239,345✔
3378
                                int32_t order) {
3379
  STimeWindow w = {0};
99,239,345✔
3380
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
99,252,165✔
3381
    getInitialStartTimeWindow(pInterval, ts, &w, (order != TSDB_ORDER_DESC));
5,647,427✔
3382
    return w;
5,651,307✔
3383
  }
3384

3385
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
93,600,863✔
3386
  if (pRow) {
93,605,728✔
3387
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
93,612,082✔
3388
  }
3389

3390
  // in case of typical time window, we can calculate time window directly.
3391
  if (w.skey > ts || w.ekey < ts) {
93,609,276✔
3392
    w = doCalculateTimeWindow(ts, pInterval);
63,465,269✔
3393
  }
3394

3395
  if (pInterval->interval != pInterval->sliding) {
93,610,432✔
3396
    // it is an sliding window query, in which sliding value is not equalled to
3397
    // interval value, and we need to find the first qualified time window.
3398
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
624,969✔
3399
  }
3400

3401
  return w;
93,604,934✔
3402
}
3403

3404
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
2,147,483,647✔
3405
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
3406
  TSKEY   nextStart = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3407
  nextStart = taosTimeAdd(nextStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
2,147,483,647✔
3408
  nextStart = taosTimeAdd(nextStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3409
  return nextStart;
2,147,483,647✔
3410
}
3411

3412
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
2,147,483,647✔
3413
  tw->skey = getNextTimeWindowStart(pInterval, tw->skey, order);
2,147,483,647✔
3414
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
2,147,483,647✔
3415
}
2,147,483,647✔
3416

3417
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
349,559,999✔
3418
  return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
695,223,318✔
3419
          pLimitInfo->slimit.offset != -1);
345,662,738✔
3420
}
3421

3422
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
×
3423
  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
×
3424
}
3425

3426
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
531,799,401✔
3427
  SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
531,799,401✔
3428
  SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
531,648,631✔
3429

3430
  pLimitInfo->limit = limit;
531,629,007✔
3431
  pLimitInfo->slimit = slimit;
531,647,978✔
3432
  pLimitInfo->remainOffset = limit.offset;
531,672,090✔
3433
  pLimitInfo->remainGroupOffset = slimit.offset;
531,668,745✔
3434
  pLimitInfo->numOfOutputRows = 0;
531,653,941✔
3435
  pLimitInfo->numOfOutputGroups = 0;
531,698,869✔
3436
  pLimitInfo->currentGroupId = 0;
531,635,260✔
3437
}
531,752,534✔
3438

3439
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
79,159,989✔
3440
  pLimitInfo->numOfOutputRows = 0;
79,159,989✔
3441
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
79,164,878✔
3442
}
79,156,745✔
3443

3444
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
569,859,461✔
3445
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
569,859,461✔
3446
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3447
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3448
  }
3449
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
569,808,918✔
3450
  return TSDB_CODE_SUCCESS;
569,749,187✔
3451
}
3452

3453
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
3,409,857✔
3454

3455
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
189,989,250✔
3456
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
189,989,250✔
3457
    return NULL;
1,951✔
3458
  }
3459

3460
  return taosArrayGet(pTableList->pTableList, index);
189,969,617✔
3461
}
3462

3463
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
42,680✔
3464
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
42,680✔
3465
  if (startIndex >= numOfTables) {
42,680✔
3466
    return -1;
×
3467
  }
3468

3469
  for (int32_t i = startIndex; i < numOfTables; ++i) {
411,274✔
3470
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
411,274✔
3471
    if (!p) {
411,274✔
3472
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3473
      return -1;
×
3474
    }
3475
    if (p->uid == uid) {
411,274✔
3476
      return i;
42,680✔
3477
    }
3478
  }
3479
  return -1;
×
3480
}
3481

3482
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
339,348✔
3483
  *psuid = pTableList->idInfo.suid;
339,348✔
3484
  *uid = pTableList->idInfo.uid;
339,348✔
3485
  *type = pTableList->idInfo.tableType;
339,348✔
3486
}
339,348✔
3487

3488
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
717,166,684✔
3489
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
717,166,684✔
3490
  if (slot == NULL) {
717,311,227✔
3491
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3492
    return -1;
×
3493
  }
3494

3495
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
717,311,227✔
3496
  if (pKeyInfo == NULL) {
717,376,834✔
3497
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3498
    return -1;
×
3499
  }
3500
  return pKeyInfo->groupId;
717,376,834✔
3501
}
3502

3503
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3504
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3505
//   int32_t code = TSDB_CODE_SUCCESS;
3506
//   int32_t lino = 0;
3507

3508
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3509
//   if (slot == NULL) {
3510
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3511
//     return 0;
3512
//   }
3513

3514
//   taosArrayRemove(pTableList->pTableList, *slot);
3515
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3516

3517
//   _end:
3518
//   if (code != TSDB_CODE_SUCCESS) {
3519
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3520
//   } else {
3521
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3522
//   }
3523

3524
//   return code;
3525
// }
3526

3527
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
911,514✔
3528
  int32_t code = TSDB_CODE_SUCCESS;
911,514✔
3529
  int32_t lino = 0;
911,514✔
3530
  if (pTableList->map == NULL) {
911,514✔
3531
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
3532
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
×
3533
  }
3534

3535
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
911,233✔
3536
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
911,223✔
3537
  if (p != NULL) {
910,651✔
3538
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
614✔
3539
    goto _end;
614✔
3540
  }
3541

3542
  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
910,037✔
3543
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
910,298✔
3544

3545
  int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
910,298✔
3546
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
910,880✔
3547
  if (code != TSDB_CODE_SUCCESS) {
911,181✔
3548
    // we have checked the existence of uid in hash map above
3549
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3550
    taosArrayPopTailBatch(pTableList->pTableList, 1);  // let's pop the last element in the array list
×
3551
  }
3552

3553
_end:
911,795✔
3554
  if (code != TSDB_CODE_SUCCESS) {
910,661✔
3555
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3556
  } else {
3557
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
910,661✔
3558
  }
3559

3560
  return code;
911,785✔
3561
}
3562

3563
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
204,064,208✔
3564
                              int32_t* size) {
3565
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
204,064,208✔
3566
  int32_t numOfTables = 0;
204,105,403✔
3567
  int32_t code = tableListGetSize(pTableList, &numOfTables);
204,109,467✔
3568
  if (code != TSDB_CODE_SUCCESS) {
204,163,847✔
3569
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3570
    return code;
×
3571
  }
3572

3573
  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
204,163,847✔
3574
    return TSDB_CODE_INVALID_PARA;
×
3575
  }
3576

3577
  // here handle two special cases:
3578
  // 1. only one group exists, and 2. one table exists for each group.
3579
  if (totalGroups == 1) {
204,163,847✔
3580
    *size = numOfTables;
203,850,093✔
3581
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
203,876,317✔
3582
    return TSDB_CODE_SUCCESS;
203,839,146✔
3583
  } else if (totalGroups == numOfTables) {
313,901✔
3584
    *size = 1;
272,142✔
3585
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
272,142✔
3586
    return TSDB_CODE_SUCCESS;
272,641✔
3587
  }
3588

3589
  int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
42,350✔
3590
  if (ordinalGroupIndex < totalGroups - 1) {
52,621✔
3591
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
39,529✔
3592
  } else {
3593
    *size = numOfTables - offset;
13,092✔
3594
  }
3595

3596
  *pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
52,621✔
3597
  return TSDB_CODE_SUCCESS;
52,621✔
3598
}
3599

3600
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
608,165,966✔
3601

3602
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
734,399✔
3603

3604
STableListInfo* tableListCreate() {
247,867,873✔
3605
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
247,867,873✔
3606
  if (pListInfo == NULL) {
247,673,367✔
3607
    return NULL;
×
3608
  }
3609

3610
  pListInfo->remainGroups = NULL;
247,673,367✔
3611
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
247,677,398✔
3612
  if (pListInfo->pTableList == NULL) {
247,724,507✔
3613
    goto _error;
×
3614
  }
3615

3616
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
247,787,136✔
3617
  if (pListInfo->map == NULL) {
247,964,160✔
3618
    goto _error;
×
3619
  }
3620

3621
  pListInfo->numOfOuputGroups = 1;
247,966,940✔
3622
  return pListInfo;
247,966,519✔
3623

3624
_error:
×
3625
  tableListDestroy(pListInfo);
×
3626
  return NULL;
×
3627
}
3628

3629
void tableListDestroy(STableListInfo* pTableListInfo) {
257,422,348✔
3630
  if (pTableListInfo == NULL) {
257,422,348✔
3631
    return;
9,563,436✔
3632
  }
3633

3634
  taosArrayDestroy(pTableListInfo->pTableList);
247,858,912✔
3635
  taosMemoryFreeClear(pTableListInfo->groupOffset);
247,825,950✔
3636

3637
  taosHashCleanup(pTableListInfo->map);
247,826,842✔
3638
  taosHashCleanup(pTableListInfo->remainGroups);
247,901,977✔
3639
  pTableListInfo->pTableList = NULL;
247,915,749✔
3640
  pTableListInfo->map = NULL;
247,911,254✔
3641
  taosMemoryFree(pTableListInfo);
247,904,034✔
3642
}
3643

3644
void tableListClear(STableListInfo* pTableListInfo) {
707,513✔
3645
  if (pTableListInfo == NULL) {
707,513✔
3646
    return;
×
3647
  }
3648

3649
  taosArrayClear(pTableListInfo->pTableList);
707,513✔
3650
  taosHashClear(pTableListInfo->map);
708,918✔
3651
  taosHashClear(pTableListInfo->remainGroups);
708,928✔
3652
  taosMemoryFree(pTableListInfo->groupOffset);
709,209✔
3653
  pTableListInfo->numOfOuputGroups = 1;
709,209✔
3654
  pTableListInfo->oneTableForEachGroup = false;
709,209✔
3655
}
3656

3657
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
494,326,343✔
3658
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
494,326,343✔
3659
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
494,326,343✔
3660

3661
  if (pInfo1->groupId == pInfo2->groupId) {
494,326,343✔
3662
    return 0;
463,693,534✔
3663
  } else {
3664
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
30,647,954✔
3665
  }
3666
}
3667

3668
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
27,332,780✔
3669
  int32_t code = TSDB_CODE_SUCCESS;
27,332,780✔
3670
  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
27,332,780✔
3671
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
27,360,879✔
3672
  if (size == 0) {
27,354,224✔
3673
    pTableListInfo->numOfOuputGroups = 0;
×
3674
    return code;
×
3675
  }
3676

3677
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
27,354,224✔
3678
  if (!pList) {
27,359,924✔
3679
    code = terrno;
×
3680
    goto end;
×
3681
  }
3682

3683
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
27,359,924✔
3684
  if (pInfo == NULL) {
27,340,517✔
3685
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3686
    code = terrno;
×
3687
    goto end;
×
3688
  }
3689
  uint64_t gid = pInfo->groupId;
27,340,517✔
3690

3691
  int32_t start = 0;
27,346,877✔
3692
  void*   tmp = taosArrayPush(pList, &start);
27,358,684✔
3693
  if (tmp == NULL) {
27,358,684✔
3694
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3695
    code = terrno;
×
3696
    goto end;
×
3697
  }
3698

3699
  for (int32_t i = 1; i < size; ++i) {
142,456,418✔
3700
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
115,098,989✔
3701
    if (pInfo == NULL) {
115,095,958✔
3702
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3703
      code = terrno;
×
3704
      goto end;
×
3705
    }
3706
    if (pInfo->groupId != gid) {
115,095,958✔
3707
      tmp = taosArrayPush(pList, &i);
6,279,009✔
3708
      if (tmp == NULL) {
6,279,009✔
3709
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3710
        code = terrno;
×
3711
        goto end;
×
3712
      }
3713
      gid = pInfo->groupId;
6,279,009✔
3714
    }
3715
  }
3716

3717
  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
27,355,806✔
3718
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
27,357,990✔
3719
  if (pTableListInfo->groupOffset == NULL) {
27,345,228✔
3720
    code = terrno;
×
3721
    goto end;
×
3722
  }
3723

3724
  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
27,339,635✔
3725

3726
end:
27,347,565✔
3727
  taosArrayDestroy(pList);
27,350,725✔
3728
  return code;
27,340,657✔
3729
}
3730

3731
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
225,645,480✔
3732
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3733
  int32_t code = TSDB_CODE_SUCCESS;
225,645,480✔
3734

3735
  bool   groupByTbname = groupbyTbname(group);
225,645,480✔
3736
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
225,639,857✔
3737
  if (!numOfTables) {
225,630,935✔
3738
    return code;
3,110✔
3739
  }
3740
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
225,627,825✔
3741
  if (group == NULL || groupByTbname) {
225,582,826✔
3742
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
221,862,478✔
3743
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
185,489,116✔
3744
      pTableListInfo->remainGroups =
16,576,652✔
3745
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
16,578,172✔
3746
      if (pTableListInfo->remainGroups == NULL) {
16,578,172✔
3747
        return terrno;
×
3748
      }
3749

3750
      for (int i = 0; i < numOfTables; i++) {
68,895,311✔
3751
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
52,317,139✔
3752
        if (!info) {
52,317,038✔
3753
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3754
          return terrno;
×
3755
        }
3756
        info->groupId = groupByTbname ? info->uid : 0;
52,317,038✔
3757
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
52,318,659✔
3758
                                      &(info->uid), sizeof(info->uid));
52,318,659✔
3759
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
52,317,605✔
3760
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3761
          return tempRes;
×
3762
        }
3763
      }
3764
    } else {
3765
      for (int32_t i = 0; i < numOfTables; i++) {
734,513,193✔
3766
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
529,282,853✔
3767
        if (!info) {
529,268,702✔
3768
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3769
          return terrno;
×
3770
        }
3771
        info->groupId = groupByTbname ? info->uid : 0;
529,268,702✔
3772
        
3773
      }
3774
    }
3775
    if (groupIdMap && group != NULL){
221,808,512✔
3776
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
59,851✔
3777
    }
3778

3779
    pTableListInfo->oneTableForEachGroup = groupByTbname;
221,808,512✔
3780
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
221,883,949✔
3781
      pTableListInfo->oneTableForEachGroup = true;
79,635,172✔
3782
    }
3783

3784
    if (groupSort && groupByTbname) {
221,888,346✔
3785
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
2,029,264✔
3786
      pTableListInfo->numOfOuputGroups = numOfTables;
2,030,198✔
3787
    } else if (groupByTbname && pScanNode->groupOrderScan) {
219,859,082✔
3788
      pTableListInfo->numOfOuputGroups = numOfTables;
23,400✔
3789
    } else {
3790
      pTableListInfo->numOfOuputGroups = 1;
219,836,096✔
3791
    }
3792
    if (groupSort || pScanNode->groupOrderScan) {
221,966,948✔
3793
      code = sortTableGroup(pTableListInfo);
27,266,195✔
3794
    }
3795
  } else {
3796
    bool initRemainGroups = false;
3,720,348✔
3797
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
3,720,348✔
3798
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
3,641,656✔
3799
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
3,641,656✔
3800
          !(groupSort || pScanNode->groupOrderScan)) {
1,825,701✔
3801
        initRemainGroups = true;
1,802,881✔
3802
      }
3803
    }
3804

3805
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
3,720,348✔
3806
    if (code != TSDB_CODE_SUCCESS) {
3,719,882✔
3807
      return code;
×
3808
    }
3809

3810
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
3,719,882✔
3811

3812
    if (groupSort || pScanNode->groupOrderScan) {
3,719,882✔
3813
      code = sortTableGroup(pTableListInfo);
125,211✔
3814
    }
3815
  }
3816

3817
  // add all table entry in the hash map
3818
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
225,576,090✔
3819
  for (int32_t i = 0; i < size; ++i) {
828,243,663✔
3820
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
602,580,235✔
3821
    if (!p) {
602,390,589✔
3822
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3823
      return terrno;
×
3824
    }
3825
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
602,390,589✔
3826
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
602,637,410✔
3827
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3828
      return tempRes;
×
3829
    }
3830
  }
3831

3832
  return code;
225,706,336✔
3833
}
3834

3835
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
238,586,018✔
3836
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3837
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3838
  int64_t     st = taosGetTimestampUs();
238,512,729✔
3839
  const char* idStr = GET_TASKID(pTaskInfo);
238,512,729✔
3840

3841
  if (pHandle == NULL) {
238,439,440✔
3842
    qError("invalid handle, in creating operator tree, %s", idStr);
×
3843
    return TSDB_CODE_INVALID_PARA;
×
3844
  }
3845

3846
  if (pHandle->uid != 0) {
238,439,440✔
3847
    pScanNode->uid = pHandle->uid;
36,253✔
3848
    pScanNode->tableType = TSDB_CHILD_TABLE;
36,253✔
3849
  }
3850
  uint8_t digest[17] = {0};
238,539,851✔
3851
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
238,552,322✔
3852
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
238,540,757✔
3853
  if (code != TSDB_CODE_SUCCESS) {
238,606,468✔
3854
    qError("failed to getTableList, code:%s", tstrerror(code));
876✔
3855
    return code;
876✔
3856
  }
3857

3858
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
238,605,592✔
3859

3860
  int64_t st1 = taosGetTimestampUs();
238,663,154✔
3861
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
238,663,154✔
3862
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
238,639,447✔
3863
         pTaskInfo->cost.extractListTime, idStr);
3864

3865
  if (numOfTables == 0) {
238,596,515✔
3866
    qDebug("no table qualified for query, %s", idStr);
13,025,489✔
3867
    return TSDB_CODE_SUCCESS;
13,025,489✔
3868
  }
3869

3870
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
225,571,026✔
3871
  if (code != TSDB_CODE_SUCCESS) {
225,676,270✔
3872
    return code;
×
3873
  }
3874

3875
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
225,689,752✔
3876
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
225,673,044✔
3877

3878
  return TSDB_CODE_SUCCESS;
225,632,170✔
3879
}
3880

3881
char* getStreamOpName(uint16_t opType) {
5,881,037✔
3882
  switch (opType) {
5,881,037✔
3883
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
3884
      return "stream scan";
×
3885
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
5,754,832✔
3886
      return "project";
5,754,832✔
3887
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
126,205✔
3888
      return "external window";
126,205✔
3889
  }
3890
  return "error name";
×
3891
}
3892

3893
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
340,380,934✔
3894
  if (qDebugFlag & DEBUG_TRACE) {
340,380,934✔
3895
    if (!pBlock) {
19,082✔
3896
      qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
3,451✔
3897
      return;
3,451✔
3898
    } else if (pBlock->info.rows == 0) {
15,631✔
3899
      qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
×
3900
      return;
×
3901
    }
3902
    
3903
    char*   pBuf = NULL;
15,631✔
3904
    int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr, qId);
15,631✔
3905
    if (code == 0) {
15,631✔
3906
      qDebugL("%" PRIx64 " %s %s", qId, __func__, pBuf);
15,631✔
3907
      taosMemoryFree(pBuf);
15,631✔
3908
    }
3909
  }
3910
}
3911

3912
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
×
3913
  if (!pBlock) {
×
3914
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
×
3915
    return;
×
3916
  } else if (pBlock->info.rows == 0) {
×
3917
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
×
3918
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
3919
           pBlock->info.version);
3920
    return;
×
3921
  }
3922
  if (qDebugFlag & DEBUG_TRACE) {
×
3923
    char* pBuf = NULL;
×
3924
    char  flagBuf[64];
×
3925
    snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
×
3926
    int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr, 0);
×
3927
    if (code == 0) {
×
3928
      qDebug("%s", pBuf);
×
3929
      taosMemoryFree(pBuf);
×
3930
    }
3931
  }
3932
}
3933

3934
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
12,275,585✔
3935

3936
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
2,147,483,647✔
3937
  int64_t* ts = (int64_t*)pColData->pData;
2,147,483,647✔
3938

3939
  int64_t duration = pWin->ekey > pWin->skey ? pWin->ekey - pWin->skey + delta : pWin->skey - pWin->ekey + delta;
2,147,483,647✔
3940
  ts[2] = duration;            // set the duration
2,147,483,647✔
3941
  ts[3] = pWin->skey;          // window start key
2,147,483,647✔
3942
  ts[4] = pWin->ekey + delta;  // window end key
2,147,483,647✔
3943
}
2,147,483,647✔
3944

3945
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
2,147,483,647✔
3946
                 int32_t rowIndex) {
3947
  SColumnDataAgg* pColAgg = NULL;
2,147,483,647✔
3948
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3949
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3950

3951
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
3952
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
3953
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
3954
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
3955

3956
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
3957
      if (isNull[i] != 1) return 1;
250,064,136✔
3958
    } else {
3959
      if (isNull[i] != 0) return 1;
2,147,483,647✔
3960
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
3961
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3962
        int32_t len = getJsonValueLen(val);
×
3963
        if (memcmp(p, val, len) != 0) return 1;
×
3964
        p += len;
×
3965
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,147,483,647✔
3966
        if (IS_STR_DATA_BLOB(pCol->type)) {
1,155,690,308✔
3967
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
3968
          p += blobDataTLen(val);
×
3969
        } else {
3970
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
1,155,882,908✔
3971
          p += varDataTLen(val);
1,155,273,436✔
3972
        }
3973
      } else {
3974
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
2,147,483,647✔
3975
        p += pCol->bytes;
2,147,483,647✔
3976
      }
3977
    }
3978
  }
3979
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
2,147,483,647✔
3980
  return 0;
2,147,483,647✔
3981
}
3982

3983
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
1,527,532✔
3984
  uint32_t        colNum = pSortGroupCols->size;
1,527,532✔
3985
  SColumnDataAgg* pColAgg = NULL;
1,527,532✔
3986
  char*           isNull = keyBuf;
1,527,532✔
3987
  char*           p = keyBuf + sizeof(int8_t) * colNum;
1,527,532✔
3988

3989
  for (int32_t i = 0; i < colNum; ++i) {
4,180,704✔
3990
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,653,172✔
3991
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,653,172✔
3992
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
2,653,172✔
3993

3994
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,653,172✔
3995

3996
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
5,306,344✔
3997
      isNull[i] = 1;
128,400✔
3998
    } else {
3999
      isNull[i] = 0;
2,524,772✔
4000
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,524,772✔
4001
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,524,772✔
4002
        int32_t len = getJsonValueLen(val);
×
4003
        memcpy(p, val, len);
×
4004
        p += len;
×
4005
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,524,772✔
4006
        if (IS_STR_DATA_BLOB(pCol->type)) {
943,312✔
4007
          blobDataCopy(p, val);
×
4008
          p += blobDataTLen(val);
×
4009
        } else {
4010
          varDataCopy(p, val);
943,312✔
4011
          p += varDataTLen(val);
943,312✔
4012
        }
4013
      } else {
4014
        memcpy(p, val, pCol->bytes);
1,581,460✔
4015
        p += pCol->bytes;
1,581,460✔
4016
      }
4017
    }
4018
  }
4019
  return (int32_t)(p - keyBuf);
1,527,532✔
4020
}
4021

4022
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
4023
  T_MD5_CTX context;
2,147,483,647✔
4024
  tMD5Init(&context);
2,147,483,647✔
4025
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4026
  tMD5Final(&context);
2,147,483,647✔
4027

4028
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4029
  uint64_t id = 0;
2,147,483,647✔
4030
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4031
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4032
  return id;
2,147,483,647✔
4033
}
4034

4035
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
101,008✔
4036
  SNode*     node;
4037
  SNodeList* ret = NULL;
101,008✔
4038
  FOREACH(node, pSortKeys) {
308,160✔
4039
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
207,152✔
4040
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
207,152✔
4041
    if (code != TSDB_CODE_SUCCESS) {
207,152✔
4042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4043
      terrno = code;
×
4044
      return NULL;
×
4045
    }
4046
  }
4047
  return ret;
101,008✔
4048
}
4049

4050
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
101,008✔
4051
  int32_t code = TSDB_CODE_SUCCESS;
101,008✔
4052
  int32_t lino = 0;
101,008✔
4053
  int32_t len = 0;
101,008✔
4054
  int32_t keyNum = taosArrayGetSize(keys);
101,008✔
4055
  for (int32_t i = 0; i < keyNum; ++i) {
256,800✔
4056
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
155,792✔
4057
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
155,792✔
4058
    len += pCol->bytes;
155,792✔
4059
  }
4060
  len += sizeof(int8_t) * keyNum;  // null flag
101,008✔
4061
  *pLen = len;
101,008✔
4062

4063
_end:
101,008✔
4064
  if (code != TSDB_CODE_SUCCESS) {
101,008✔
4065
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4066
  }
4067
  return code;
101,008✔
4068
}
4069

4070
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
×
4071
  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
4072
  if (pJson == NULL) {
×
4073
    return code;
×
4074
  }
4075

4076
  char    pMsg[1024] = {0};
×
4077
  int32_t ret = tjsonGetStringValue(pJson, "msg", pMsg);
×
4078

4079
  if (ret == 0) {
×
4080
    qError("%s failed to exec imputation, msg:%s", pId, pMsg);
×
4081
    if (strstr(pMsg, "white noise") != NULL) {
×
4082
      code = TSDB_CODE_ANA_WN_DATA;
×
4083
    } else if (strstr(pMsg, "white-noise") != NULL) {
×
4084
      code = TSDB_CODE_ANA_WN_DATA;
×
4085
    } else if (strstr(pMsg, "[Errno 111] Connection refused") != NULL) {
×
4086
      code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
4087
    }
4088
  } else {
4089
    qError("%s failed to extract msg from server, unknown error", pId);
×
4090
  }
4091

4092
  return code;
×
4093
}
4094

4095

4096
int32_t createBlockFromRemoteValueNode(SSDataBlock** ppBlock, SRemoteValueNode* pRemote) {
20,976,894✔
4097
  SValueNode* pVal = (SValueNode*)pRemote;
20,976,894✔
4098
  int32_t code = 0;
20,976,894✔
4099
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
20,976,894✔
4100
  if (pBlock == NULL) {
20,975,586✔
4101
    return terrno;
×
4102
  }
4103

4104
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
20,975,586✔
4105
  if (pBlock->pDataBlock == NULL) {
20,976,894✔
4106
    code = terrno;
×
4107
    taosMemoryFree(pBlock);
×
4108
    return code;
×
4109
  }
4110

4111
  SColumnInfoData idata =
20,976,894✔
4112
      createColumnInfoData(pVal->node.resType.type, pVal->node.resType.bytes, 0);
20,976,458✔
4113
  idata.info.scale = pVal->node.resType.scale;
20,977,330✔
4114
  idata.info.precision = pVal->node.resType.precision;
20,977,330✔
4115

4116
  code = blockDataAppendColInfo(pBlock, &idata);
20,976,894✔
4117
  if (code != TSDB_CODE_SUCCESS) {
20,975,513✔
4118
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4119
    blockDataDestroy(pBlock);
×
4120
    *ppBlock = NULL;
×
4121
    return code;
×
4122
  }
4123

4124
  *ppBlock = pBlock;
20,975,513✔
4125

4126
  return code;
20,975,513✔
4127
}
4128

4129

4130
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
20,976,371✔
4131
  int32_t            code = TSDB_CODE_SUCCESS;
20,976,371✔
4132
  int32_t            lino = 0;
20,976,371✔
4133
  void*              decompBuf = NULL;
20,976,371✔
4134

4135
  char* pNextStart = pRetrieveRsp->data;
20,976,371✔
4136
  char* pStart = pNextStart;
20,976,821✔
4137

4138
  int32_t index = 0;
20,975,862✔
4139

4140
  if (pRetrieveRsp->compressed) {  // decompress the data
20,975,862✔
4141
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
4142
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
×
4143
  }
4144

4145
  int32_t compLen = *(int32_t*)pStart;
20,975,935✔
4146
  pStart += sizeof(int32_t);
20,974,903✔
4147

4148
  int32_t rawLen = *(int32_t*)pStart;
20,976,371✔
4149
  pStart += sizeof(int32_t);
20,976,807✔
4150
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
20,976,821✔
4151

4152
  pNextStart = pStart + compLen;
20,976,821✔
4153
  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
20,974,467✔
4154
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
4155
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
4156
    pStart = decompBuf;
×
4157
  }
4158

4159
  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
20,974,467✔
4160
  if (code != 0) {
20,976,371✔
4161
    taosMemoryFreeClear(pRetrieveRsp);
×
4162
    goto _end;
×
4163
  }
4164

4165
_end:
20,976,371✔
4166
  if (code != TSDB_CODE_SUCCESS) {
20,975,499✔
4167
    blockDataDestroy(pb);
×
4168
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4169
  }
4170
  return code;
20,975,499✔
4171
}
4172

4173
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SRemoteValueNode* pRes, SSDataBlock* pBlock) {
20,975,935✔
4174
  int32_t code = 0;
20,975,935✔
4175
  bool needFree = true;
20,975,935✔
4176
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);
20,975,935✔
4177
  if (NULL == pBlock->pDataBlock || 1 != colNum || pBlock->info.rows > 1) {
20,974,045✔
UNCOV
4178
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
×
4179
      ctx->idStr, pBlock->pDataBlock, colNum, pBlock->info.rows);
4180
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
×
4181
  }
4182
  
4183
  pRes->val.node.type = QUERY_NODE_VALUE;
20,975,063✔
4184
  pRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
20,973,668✔
4185
  pRes->val.translate = true;
20,976,371✔
4186
  
4187
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
20,974,554✔
4188
  if (colDataIsNull_s(pCol, 0)) {
20,973,260✔
4189
    pRes->val.isNull = true;
1,712,355✔
4190
  } else {
4191
    code = nodesSetValueNodeValueExt(&pRes->val, colDataGetData(pCol, 0), &needFree);
19,260,905✔
4192
  }
4193

4194
  if (!needFree) {
20,975,499✔
4195
    pCol->pData = NULL;
14,644✔
4196
  }
4197

4198
  return code;
20,975,499✔
4199
}
4200

4201
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
37,974,166✔
4202
  SScalarFetchParam* pParam = (SScalarFetchParam*)param;
37,974,166✔
4203
  STaskSubJobCtx* ctx = pParam->pSubJobCtx;
37,974,166✔
4204
  SSDataBlock* pResBlock = NULL;
37,973,657✔
4205
  
4206
  taosMemoryFreeClear(pMsg->pEpSet);
37,974,675✔
4207

4208
  if (NULL == ctx) {
37,971,331✔
4209
    qWarn("scl fetch ctx not exists since it may have been released");
5,599✔
4210
    goto _exit;
5,599✔
4211
  }
4212

4213
  qDebug("%s subQIdx %d got rsp, code:%d, rsp:%p", ctx->idStr, pParam->subQIdx, code, pMsg->pData);
37,965,732✔
4214

4215
  taosWLockLatch(&ctx->lock);
37,965,732✔
4216
  ctx->param = NULL;
37,968,567✔
4217
  taosWUnLockLatch(&ctx->lock);
37,968,058✔
4218

4219
  if (ctx->transporterId > 0) {
37,969,512✔
4220
    int32_t ret = asyncFreeConnById(ctx->rpcHandle, ctx->transporterId);
37,969,969✔
4221
    if (ret != 0) {
37,967,643✔
4222
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", ctx->idStr, tstrerror(ret), pParam->subQIdx);
×
4223
    }
4224
    ctx->transporterId = -1;
37,967,643✔
4225
  }
4226

4227
  if (0 == code && NULL == pMsg->pData) {
37,967,622✔
4228
    qError("%s invalid rsp msg, msgType:%d, len:%d", ctx->idStr, pMsg->msgType, pMsg->len);
×
4229
    code = TSDB_CODE_QRY_INVALID_MSG;
×
4230
  }
4231

4232
  if (code == TSDB_CODE_SUCCESS) {
37,966,698✔
4233
    SRetrieveTableRsp* pRsp = pMsg->pData;
31,236,182✔
4234
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
31,237,999✔
4235
    pRsp->compLen = htonl(pRsp->compLen);
31,237,999✔
4236
    pRsp->payloadLen = htonl(pRsp->payloadLen);
31,237,490✔
4237
    pRsp->numOfCols = htonl(pRsp->numOfCols);
31,236,691✔
4238
    pRsp->useconds = htobe64(pRsp->useconds);
31,237,054✔
4239
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
31,236,691✔
4240

4241
    if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
31,237,926✔
4242
      qError("%s invalid scl fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
724,068✔
4243
        ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
4244
      ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
724,068✔
4245
    } else if (0 == pRsp->numOfRows) {
30,514,876✔
4246
      SRemoteValueNode* pRemote = (SRemoteValueNode*)pParam->pRes;
9,539,363✔
4247
      pRemote->val.node.type = QUERY_NODE_VALUE;
9,539,363✔
4248
      pRemote->val.isNull = true;
9,539,363✔
4249
      pRemote->val.translate = true;
9,539,363✔
4250
      pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
9,539,363✔
4251
      taosArraySet(ctx->subResValues, pParam->subQIdx, &pParam->pRes);
9,538,491✔
4252
    } else {
4253
      qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
20,975,586✔
4254
      ctx->code = createBlockFromRemoteValueNode(&pResBlock, pParam->pRes);
20,975,586✔
4255
      if (TSDB_CODE_SUCCESS == ctx->code) {
20,976,385✔
4256
        ctx->code = blockDataEnsureCapacity(pResBlock, 1);
20,976,022✔
4257
      }
4258
      if (TSDB_CODE_SUCCESS == ctx->code) {
20,976,371✔
4259
        ctx->code = extractSingleRspBlock(pRsp, pResBlock);
20,975,935✔
4260
      }
4261
      if (TSDB_CODE_SUCCESS == ctx->code) {
20,975,949✔
4262
        ctx->code = setValueFromResBlock(ctx, pParam->pRes, pResBlock);
20,975,586✔
4263
      }
4264
      if (TSDB_CODE_SUCCESS == ctx->code) {
20,974,045✔
4265
        taosArraySet(ctx->subResValues, pParam->subQIdx, &pParam->pRes);
20,973,239✔
4266
      }
4267
    }
4268
  } else {
4269
    ctx->code = rpcCvtErrCode(code);
6,730,516✔
4270
    if (ctx->code != code) {
6,732,043✔
4271
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", ctx->idStr, pParam->subQIdx,
×
4272
             tstrerror(code), tstrerror(ctx->code));
4273
    } else {
4274
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", ctx->idStr, pParam->subQIdx, tstrerror(code));
6,730,516✔
4275
    }
4276
  }
4277
  
4278
  code = tsem_post(&pParam->pSubJobCtx->ready);
37,968,560✔
4279
  if (code != TSDB_CODE_SUCCESS) {
37,970,478✔
4280
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
×
4281
  }
4282

4283
_exit:
37,976,077✔
4284

4285
  taosMemoryFree(pMsg->pData);
37,975,554✔
4286
  blockDataDestroy(pResBlock);
37,975,641✔
4287

4288
  return code;
37,974,682✔
4289
}
4290

4291

4292
int32_t fetchRemoteValueImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SRemoteValueNode* pRes) {
37,965,869✔
4293
  int32_t          code = TSDB_CODE_SUCCESS;
37,965,869✔
4294
  int32_t          lino = 0;
37,965,869✔
4295
  SDownstreamSourceNode* pSource = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);
37,965,869✔
4296

4297
  SResFetchReq req = {0};
37,964,080✔
4298
  req.header.vgId = pSource->addr.nodeId;
37,963,135✔
4299
  req.sId = pSource->sId;
37,957,798✔
4300
  req.clientId = pSource->clientId;
37,963,724✔
4301
  req.taskId = pSource->taskId;
37,961,109✔
4302
  req.queryId = ctx->queryId;
37,942,125✔
4303
  req.execId = pSource->execId;
37,955,201✔
4304

4305
  int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, false);
37,949,199✔
4306
  if (msgSize < 0) {
37,941,638✔
4307
    return msgSize;
×
4308
  }
4309

4310
  void* msg = taosMemoryCalloc(1, msgSize);
37,941,638✔
4311
  if (NULL == msg) {
37,935,181✔
4312
    return terrno;
×
4313
  }
4314

4315
  msgSize = tSerializeSResFetchReq(msg, msgSize, &req, false);
37,935,181✔
4316
  if (msgSize < 0) {
37,959,525✔
4317
    taosMemoryFree(msg);
×
4318
    return msgSize;
×
4319
  }
4320

4321
  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
37,959,525✔
4322
         ", execId:%d",
4323
         ctx->idStr, pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
4324
         pSource->taskId, pSource->execId);
4325

4326
  // send the fetch remote task result reques
4327
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
37,963,180✔
4328
  if (NULL == pMsgSendInfo) {
37,930,635✔
4329
    taosMemoryFreeClear(msg);
×
4330
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
×
4331
    return terrno;
×
4332
  }
4333

4334
  SScalarFetchParam* param = taosMemoryMalloc(sizeof(SScalarFetchParam));
37,930,635✔
4335
  if (NULL == param) {
37,944,103✔
4336
    taosMemoryFreeClear(msg);
×
4337
    taosMemoryFreeClear(pMsgSendInfo);
×
4338
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
×
4339
    return terrno;
×
4340
  }
4341

4342
  taosWLockLatch(&ctx->lock);
37,944,103✔
4343
  
4344
  if (ctx->code) {
37,969,552✔
4345
    qError("task has been killed, error:%s", tstrerror(ctx->code));
×
4346
    taosMemoryFree(param);
×
4347
    code = ctx->code;
×
4348
    goto _end;
×
4349
  } else {
4350
    ctx->param = param;
37,947,500✔
4351
  }
4352
  
4353
  taosWUnLockLatch(&ctx->lock);
37,955,979✔
4354

4355
  param->subQIdx = subQIdx;
37,965,405✔
4356
  param->pRes = pRes;
37,965,768✔
4357
  param->pSubJobCtx = ctx;
37,965,049✔
4358

4359
  pMsgSendInfo->param = param;
37,960,958✔
4360
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
37,942,027✔
4361
  pMsgSendInfo->msgInfo.pData = msg;
37,938,584✔
4362
  pMsgSendInfo->msgInfo.len = msgSize;
37,952,696✔
4363
  pMsgSendInfo->msgType = pSource->fetchMsgType;
37,939,356✔
4364
  pMsgSendInfo->fp = remoteFetchCallBack;
37,953,459✔
4365
  pMsgSendInfo->requestId = ctx->queryId;
37,945,961✔
4366

4367
  code = asyncSendMsgToServer(ctx->rpcHandle, &pSource->addr.epSet, &ctx->transporterId, pMsgSendInfo);
37,939,154✔
4368
  QUERY_CHECK_CODE(code, lino, _end);
37,972,816✔
4369

4370
  code = qSemWait(ctx->pTaskInfo, &ctx->ready);
37,972,816✔
4371
  if (isTaskKilled(ctx->pTaskInfo)) {
37,976,150✔
4372
    code = getTaskCode(ctx->pTaskInfo);
9,162✔
4373
  } else {
4374
    code = ctx->code;
37,966,479✔
4375
  }
4376
      
4377
_end:
37,974,260✔
4378

4379
  taosWLockLatch(&ctx->lock);
37,974,260✔
4380
  ctx->param = NULL;
37,974,239✔
4381
  taosWUnLockLatch(&ctx->lock);
37,975,184✔
4382

4383
  if (code != TSDB_CODE_SUCCESS) {
37,976,129✔
4384
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
7,459,436✔
4385
  }
4386
  return code;
37,975,191✔
4387
}
4388

4389

4390
int32_t qFetchRemoteValue(void* pCtx, int32_t subQIdx, SRemoteValueNode* pRes) {
39,219,185✔
4391
  STaskSubJobCtx*  ctx = (STaskSubJobCtx*)pCtx;
39,219,185✔
4392
  int32_t code = 0, lino = 0;
39,219,185✔
4393
  int32_t       subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);
39,219,185✔
4394
  if (subQIdx >= subEndPoinsNum) {
39,203,722✔
4395
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
×
4396
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
×
4397
  }
4398

4399
  SValueNode** ppRes = taosArrayGet(ctx->subResValues, subQIdx);
39,203,722✔
4400
  if (NULL == *ppRes) {
39,212,413✔
4401
    TAOS_CHECK_EXIT(fetchRemoteValueImpl(ctx, subQIdx, pRes));
37,967,474✔
4402
    *ppRes = (SValueNode*)pRes;
30,514,789✔
4403
  } else {
4404
    TAOS_CHECK_EXIT(valueNodeCopy(*ppRes, &pRes->val));
1,254,261✔
4405
    pRes->val.node.type = QUERY_NODE_VALUE;
1,254,261✔
4406
  }
4407

4408
_exit:
39,229,452✔
4409

4410
  if (code) {
39,229,452✔
4411
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
7,459,893✔
4412
  }
4413

4414
  return code;
39,229,888✔
4415
}
4416

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc