• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4889

18 Dec 2025 07:22AM UTC coverage: 65.487% (+0.2%) from 65.281%
#4889

push

travis-ci

web-flow
merge: from main to 3.0 branch #33964

9 of 13 new or added lines in 3 files covered. (69.23%)

594 existing lines in 113 files now uncovered.

182473 of 278642 relevant lines covered (65.49%)

105098310.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.04
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36

37
typedef struct tagFilterAssist {
38
  SHashObj* colHash;
39
  int32_t   index;
40
  SArray*   cInfoList;
41
  int32_t   code;
42
} tagFilterAssist;
43

44
typedef struct STransTagExprCtx {
45
  int32_t      code;
46
  SMetaReader* pReader;
47
} STransTagExprCtx;
48

49
/* Result type of checkTagCond(); numbering starts at 1. */
typedef enum {
  FILTER_NO_LOGIC = 1,
  FILTER_AND = 2,
  FILTER_OTHER = 3,
} FilterCondType;
54

55
static FilterCondType checkTagCond(SNode* cond);
56
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
57
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
58
                                        uint64_t suid);
59

60
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
61
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
62

63
static int64_t getLimit(const SNode* pLimit) {
577,357,039✔
64
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
577,357,039✔
65
}
66
static int64_t getOffset(const SNode* pLimit) {
577,359,278✔
67
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
577,359,278✔
68
}
69
/* Forward declaration: destroys and frees a heap-allocated SColumnInfoData (defined later in this file). */
static void releaseColInfoData(void* pCol);
70

71
/* Resets a result-row info: zero size and an invalid (-1) current page id. */
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
  pResultRowInfo->cur.pageId = -1;
  pResultRowInfo->size = 0;
}
226,540,920✔
75

76
/* Marks the result row as closed. */
void closeResultRow(SResultRow* pResultRow) {
  pResultRow->closed = true;
}
6,070,333✔
77

78
/*
 * Returns a result row to its pristine state: no rows, not closed, no
 * interpolation flags, and the first entrySize bytes of the entry area zeroed.
 */
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
  pResultRow->startInterp = false;
  pResultRow->endInterp = false;
  pResultRow->closed = false;
  pResultRow->numOfRows = 0;

  if (entrySize == 0) {
    return;  // nothing to clear
  }
  memset(pResultRow->pEntryInfo, 0, entrySize);
}
402,218,971✔
88

89
// TODO refactor: use macro
90
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
91
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
92
}
93

94
/*
 * Size in bytes of one result row: the SResultRow header, one
 * SResultRowEntryInfo per output, plus each output's intermediate buffer.
 */
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t total = sizeof(SResultRow) + (numOfOutput * sizeof(SResultRowEntryInfo));

  int32_t idx = 0;
  while (idx < numOfOutput) {
    total += pCtx[idx].resDataInfo.interBufSize;
    ++idx;
  }

  return total;
}
103

104
// Convert buf read from rocksdb to result row
105
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
106
  if (inBuf == NULL || pSup == NULL) {
×
107
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
108
    return TSDB_CODE_INVALID_PARA;
×
109
  }
110
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
111
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
112
  SResultRow*     pResultRow = NULL;
×
113
  size_t          processedSize = 0;
×
114
  int32_t         code = TSDB_CODE_SUCCESS;
×
115

116
  // calculate the size of output buffer
117
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
118
  *outBuf = taosMemoryMalloc(*outBufSize);
×
119
  if (*outBuf == NULL) {
×
120
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
121
    return terrno;
×
122
  }
123
  pResultRow = (SResultRow*)*outBuf;
×
124
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
125
  inBuf += sizeof(SResultRow);
×
126
  processedSize += sizeof(SResultRow);
×
127

128
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
129
    int32_t len = *(int32_t*)inBuf;
×
130
    inBuf += sizeof(int32_t);
×
131
    processedSize += sizeof(int32_t);
×
132
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
133
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
134
      if (code != TSDB_CODE_SUCCESS) {
×
135
        qError("failed to decode result row, code:%d", code);
×
136
        return code;
×
137
      }
138
    } else {
139
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
140
    }
141
    inBuf += len;
×
142
    processedSize += len;
×
143
  }
144

145
  if (processedSize < inBufSize) {
×
146
    // stream stores extra data after result row
147
    size_t leftLen = inBufSize - processedSize;
×
148
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
149
    if (*outBuf == NULL) {
×
150
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
151
      return terrno;
×
152
    }
153
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
154
    inBuf += leftLen;
×
155
    processedSize += leftLen;
×
156
    *outBufSize += leftLen;
×
157
  }
158

159
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
160
  return TSDB_CODE_SUCCESS;
×
161
}
162

163
// Convert result row to buf for rocksdb
164
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
165
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
166
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
167
    return TSDB_CODE_INVALID_PARA;
×
168
  }
169

170
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
171
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
172
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
173
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
174

175
  if (rowSize > inBufSize) {
×
176
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
177
    return TSDB_CODE_INVALID_PARA;
×
178
  }
179

180
  // calculate the size of output buffer
181
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
182
  if (rowSize < inBufSize) {
×
183
    *outBufSize += inBufSize - rowSize;
×
184
  }
185

186
  *outBuf = taosMemoryMalloc(*outBufSize);
×
187
  if (*outBuf == NULL) {
×
188
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
189
    return terrno;
×
190
  }
191

192
  char* pBuf = *outBuf;
×
193
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
194
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
195
  pBuf += sizeof(SResultRow);
×
196
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
197
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
198
    *(int32_t*)pBuf = (int32_t)len;
×
199
    pBuf += sizeof(int32_t);
×
200
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
201
    pBuf += len;
×
202
  }
203

204
  if (rowSize < inBufSize) {
×
205
    // stream stores extra data after result row
206
    size_t leftLen = inBufSize - rowSize;
×
207
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
208
    pBuf += leftLen;
×
209
  }
210

211
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
212
  return TSDB_CODE_SUCCESS;
×
213
}
214

215
/* Array element destructor: p points at a stored pointer, and that pointer is freed. */
static void freeEx(void* p) {
  void** ppItem = (void**)p;
  taosMemoryFree(*ppItem);
}
×
216

217
/*
 * Releases the buffer and row array of a group result info and resets its
 * cursors. When freeItem is set, each array element is also freed via freeEx.
 */
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
  taosMemoryFreeClear(pGroupResInfo->pBuf);

  if (!pGroupResInfo->freeItem) {
    taosArrayDestroy(pGroupResInfo->pRows);
  } else {
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    pGroupResInfo->freeItem = false;
  }
  pGroupResInfo->pRows = NULL;

  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
65,700,880✔
231

232
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
233
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
234
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
235

236
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
237
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
238
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
239

240
    if (pts1 == pts2) {
2,147,483,647✔
241
      return 0;
×
242
    } else {
243
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
244
    }
245
  } else {
246
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
247
  }
248
}
249

250
/* Descending comparator: the ascending comparator with its arguments swapped. */
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
  return resultrowComparAsc(p2, p1);
}
1,942,050,749✔
251

252
/*
 * Rebuilds pGroupResInfo from the entries of pHashmap.
 *
 * Every hash entry becomes one SResKeyPos packed into a single buffer (pBuf);
 * pRows holds pointers into that buffer. The first 8 bytes of each hash key
 * are taken as the group id, the remainder as the row key. When order is
 * TSDB_ORDER_ASC or TSDB_ORDER_DESC the pointers are sorted accordingly.
 *
 * Returns TSDB_CODE_SUCCESS, or the error recorded when an allocation or an
 * array push fails.
 */
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pData = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  int64_t bufLen = 0, offset = 0;

  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
  }
  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  // extract the result rows information from the hash map
  int32_t size = tSimpleHashGetSize(pHashmap);

  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);

  // todo move away and record this during create window
  // first pass: total bytes needed to pack every entry
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    (void)tSimpleHashGetKey(pData, &keyLen);
    bufLen += keyLen + sizeof(SResultRowPosition);
  }

  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);

  // second pass: pack the entries and remember where each one starts
  iter = 0;
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pData, &keyLen);

    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);

    p->groupId = *(uint64_t*)key;
    p->pos = *(SResultRowPosition*)pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));

    // check the push result itself; pBuf is known to be non-NULL here
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    offset += keyLen + sizeof(struct SResultRowPosition);
  }

  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn);
  }

  pGroupResInfo->index = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
312

313
/*
 * Replaces the row array of pGroupResInfo with pArrayList and marks its items
 * as owned (freeItem), so cleanupGroupResInfo frees them element by element.
 * Any previous row array is destroyed first.
 */
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  SArray* pPrevRows = pGroupResInfo->pRows;
  if (pPrevRows != NULL) {
    taosArrayDestroy(pPrevRows);
  }

  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->freeItem = true;
  pGroupResInfo->delIndex = 0;
  pGroupResInfo->index = 0;
}
×
323

324
/* True when a row array exists and the read index has not reached its end. */
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
  return (pGroupResInfo->pRows != NULL) && (pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows));
}
331

332
/* Number of rows held by the group result info; 0 when no row array exists. */
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows == NULL) ? 0 : (int32_t)taosArrayGetSize(pRows);
}
339

340
/*
 * Builds an array of SBlockOrderInfo from an ORDER BY node list. Every sort
 * key must be a plain column; its slot id, direction and null ordering are
 * recorded. A NULL list yields an empty array.
 *
 * Returns NULL on failure. terrno is set to
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a missing or non-column sort key.
 */
SArray* createSortInfo(SNodeList* pNodeList) {
  size_t numOfCols = (pNodeList != NULL) ? LIST_LENGTH(pNodeList) : 0;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
    if (pSortKey == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    SBlockOrderInfo bi = {0};
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);

    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    bi.slotId = ((SColumnNode*)pSortKey->pExpr)->slotId;

    if (taosArrayPush(pList, &bi) == NULL) {
      taosArrayDestroy(pList);
      return NULL;
    }
  }

  return pList;
}
387

388
SSDataBlock* createDataBlockFromDescNode(void* p) {
343,099,584✔
389
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
343,099,584✔
390
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
343,099,584✔
391
  SSDataBlock* pBlock = NULL;
343,137,191✔
392
  int32_t      code = createDataBlock(&pBlock);
343,139,643✔
393
  if (code) {
343,088,963✔
394
    terrno = code;
×
395
    return NULL;
×
396
  }
397

398
  pBlock->info.id.blockId = pNode->dataBlockId;
343,088,963✔
399
  pBlock->info.type = STREAM_INVALID;
343,078,900✔
400
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
343,092,665✔
401
  pBlock->info.watermark = INT64_MIN;
343,142,228✔
402

403
  for (int32_t i = 0; i < numOfCols; ++i) {
2,110,126,502✔
404
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
1,766,947,841✔
405
    if (!pDescNode) {
1,766,864,833✔
406
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
407
      blockDataDestroy(pBlock);
×
408
      pBlock = NULL;
×
409
      terrno = TSDB_CODE_INVALID_PARA;
×
410
      break;
×
411
    }
412
    SColumnInfoData idata =
1,766,808,529✔
413
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
1,766,945,682✔
414
    idata.info.scale = pDescNode->dataType.scale;
1,766,979,657✔
415
    idata.info.precision = pDescNode->dataType.precision;
1,767,002,213✔
416
    idata.info.noData = pDescNode->reserve;
1,766,999,911✔
417

418
    code = blockDataAppendColInfo(pBlock, &idata);
1,767,025,272✔
419
    if (code != TSDB_CODE_SUCCESS) {
1,767,032,885✔
420
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
38,912✔
421
      blockDataDestroy(pBlock);
38,912✔
422
      pBlock = NULL;
×
423
      terrno = code;
×
424
      break;
×
425
    }
426
  }
427

428
  return pBlock;
343,185,986✔
429
}
430

431
/*
 * Prepares the two primary-key slots (pks[0], pks[1]) of a data block from
 * the first matched column flagged as primary key. Both slots get the
 * column's type; for variable-length types each slot also gets a zeroed
 * buffer of the column's byte width.
 *
 * Returns TSDB_CODE_SUCCESS (also when no primary-key column is found), or
 * terrno when a lookup or an allocation fails.
 */
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  for (int32_t idx = 0; idx < taosArrayGetSize(pMatchInfo->pList); ++idx) {
    SColMatchItem* pMatch = taosArrayGet(pMatchInfo->pList, idx);
    if (NULL == pMatch) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    if (!pMatch->isPk) {
      continue;
    }

    SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pMatch->dstSlotId);
    if (NULL == pPkCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    pInfo->pks[0].type = pPkCol->info.type;
    pInfo->pks[1].type = pPkCol->info.type;

    // only variable-length keys need a backing buffer, sized by the column width
    if (IS_VAR_DATA_TYPE(pMatch->dataType.type)) {
      pInfo->pks[0].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[0].pData) {
        return terrno;
      }

      pInfo->pks[1].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[1].pData) {
        taosMemoryFreeClear(pInfo->pks[0].pData);
        return terrno;
      }

      pInfo->pks[0].nData = pPkCol->info.bytes;
      pInfo->pks[1].nData = pPkCol->info.bytes;
    }

    // only the first primary-key column is considered
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}
473

474
/*
 * Expression-rewrite callback: replaces tag columns and tbname references
 * with value nodes built from the table entry held by the meta reader in
 * pContext (an STransTagExprCtx*).
 *
 *  - column node with colType COLUMN_TYPE_TBNAME, or function node of type
 *    FUNCTION_TYPE_TBNAME: replaced by the table name as a var-length value;
 *  - any other column node: treated as a tag and replaced by its tag value
 *    (NULL type when the tag is absent);
 *  - everything else is left untouched.
 *
 * Returns DEAL_RES_CONTINUE on success. On failure returns DEAL_RES_ERROR
 * with the reason stored in pCtx->code; the original node is left in place
 * and the partially built value node is destroyed.
 */
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
  STransTagExprCtx* pCtx = pContext;
  SMetaReader*      mr = pCtx->pReader;
  bool              isTagCol = false, isTbname = false;
  int32_t           code = TSDB_CODE_SUCCESS;

  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
    SColumnNode* pCol = (SColumnNode*)*pNode;
    if (pCol->colType == COLUMN_TYPE_TBNAME) {
      isTbname = true;
    } else {
      isTagCol = true;
    }
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) {
      isTbname = true;
    }
  }

  if (!isTagCol && !isTbname) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode* res = NULL;
  pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
  if (NULL == res) {
    return DEAL_RES_ERROR;
  }
  res->translate = true;

  if (isTagCol) {
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
    res->node.resType = pSColumnNode->node.resType;

    STagVal tagVal = {0};
    tagVal.cid = pSColumnNode->colId;
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
    if (p == NULL) {
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
      // for JSON the returned pointer is the whole tag blob; copy it verbatim
      int32_t len = ((const STag*)p)->len;
      res->datum.p = taosMemoryCalloc(len + 1, 1);
      if (NULL == res->datum.p) {
        code = terrno;
        goto _error;
      }
      memcpy(res->datum.p, p, len);
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
        // report through the context; an error code is not a valid EDealRes
        code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
        goto _error;
      }

      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        code = terrno;
        goto _error;
      }
      memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
      varDataSetLen(res->datum.p, tagVal.nData);
    } else {
      code = nodesSetValueNodeValue(res, &(tagVal.i64));
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
    }
  } else {
    res->node.resType = ((SExprNode*)(*pNode))->resType;

    int32_t len = strlen(mr->me.name);
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
    if (NULL == res->datum.p) {
      code = terrno;
      goto _error;
    }
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
    varDataSetLen(res->datum.p, len);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;
  return DEAL_RES_CONTINUE;

_error:
  pCtx->code = code;
  nodesDestroyNode((SNode*)res);
  return DEAL_RES_ERROR;
}
560

561
/*
 * Evaluates a tag condition against one table.
 *
 * The condition is cloned, tag/tbname references in the clone are replaced by
 * the table's actual values (doTranslateTagExpr), and the resulting constant
 * expression is folded; *pQualified receives the boolean outcome.
 *
 * Returns TSDB_CODE_SUCCESS with *pQualified = false when the table entry
 * cannot be loaded. Any other failure sets *pQualified = false and returns
 * the error code.
 */
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SMetaReader mr = {0};

  pAPI->metaReaderFn.initReader(&mr, metaHandle, META_READER_LOCK, &pAPI->metaFn);
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
  if (TSDB_CODE_SUCCESS != code) {
    pAPI->metaReaderFn.clearReader(&mr);
    *pQualified = false;

    return TSDB_CODE_SUCCESS;
  }

  SNode* pTagCondTmp = NULL;
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
  if (TSDB_CODE_SUCCESS != code) {
    *pQualified = false;
    pAPI->metaReaderFn.clearReader(&mr);
    return code;
  }

  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
  pAPI->metaReaderFn.clearReader(&mr);
  if (TSDB_CODE_SUCCESS != ctx.code) {
    // the failure reason lives in ctx.code; `code` is still success at this point
    *pQualified = false;
    nodesDestroyNode(pTagCondTmp);
    terrno = ctx.code;
    return ctx.code;
  }

  SNode* pNew = NULL;
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    nodesDestroyNode(pTagCondTmp);
    *pQualified = false;

    return code;
  }

  SValueNode* pValue = (SValueNode*)pNew;
  *pQualified = pValue->datum.b;

  nodesDestroyNode(pNew);
  return TSDB_CODE_SUCCESS;
}
606

607
/*
 * Expression-rewrite callback that collects the distinct columns of a tag
 * condition into the tagFilterAssist passed as pContext.
 *
 *  - A tbname function node is replaced by a synthetic column node
 *    (colId -1, COLUMN_TYPE_TBNAME, VARCHAR).
 *  - The first time a column id is seen it gets the next slot id and an
 *    SColumnInfo is appended to cInfoList; later occurrences reuse the slot
 *    id of the first one.
 *  - Other nodes are skipped.
 *
 * Returns DEAL_RES_ERROR with the reason stored in pData->code on failure.
 */
static EDealRes getColumn(SNode** pNode, void* pContext) {
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
  SColumnNode*     pSColumnNode = NULL;

  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
    pSColumnNode = *(SColumnNode**)pNode;
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
    if (pFuncNode->funcType != FUNCTION_TYPE_TBNAME) {
      return DEAL_RES_CONTINUE;
    }

    pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
    if (NULL == pSColumnNode) {
      return DEAL_RES_ERROR;
    }
    pSColumnNode->colId = -1;
    pSColumnNode->colType = COLUMN_TYPE_TBNAME;
    pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
    pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)pSColumnNode;
  } else {
    return DEAL_RES_CONTINUE;
  }

  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
  if (data) {
    // column already registered: share its slot
    SColumnNode* col = *(SColumnNode**)data;
    pSColumnNode->slotId = col->slotId;
    return DEAL_RES_CONTINUE;
  }

  // the hash value is the node pointer itself (pointer-sized copy of *pNode)
  int32_t tempRes =
      taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
  if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
    pData->code = tempRes;
    return DEAL_RES_ERROR;
  }

  pSColumnNode->slotId = pData->index++;
  SColumnInfo cInfo = {.colId = pSColumnNode->colId,
                       .type = pSColumnNode->node.resType.type,
                       .bytes = pSColumnNode->node.resType.bytes,
                       .pk = pSColumnNode->isPk};
#if TAG_FILTER_DEBUG
  qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif
  if (NULL == taosArrayPush(pData->cInfoList, &cInfo)) {
    pData->code = terrno;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}
658

659
/*
 * Allocates a column of the given type with room for numOfRows rows and
 * attaches it to pParam, marking the column as owned by the param
 * (colAlloced). Returns terrno on allocation failure.
 */
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    return terrno;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pCol, numOfRows, true);
  if (TSDB_CODE_SUCCESS == code) {
    pParam->columnData = pCol;
    pParam->colAlloced = true;
    return TSDB_CODE_SUCCESS;
  }

  terrno = code;
  releaseColInfoData(pCol);
  return terrno;
}
681

682
static void releaseColInfoData(void* pCol) {
2,140,809✔
683
  if (pCol) {
2,140,809✔
684
    SColumnInfoData* col = (SColumnInfoData*)pCol;
2,140,809✔
685
    colDataDestroy(col);
2,140,809✔
686
    taosMemoryFree(col);
2,140,128✔
687
  }
688
}
2,139,589✔
689

690
void freeItem(void* p) {
188,556,969✔
691
  STUidTagInfo* pInfo = p;
188,556,969✔
692
  if (pInfo->pTagVal != NULL) {
188,556,969✔
693
    taosMemoryFree(pInfo->pTagVal);
188,134,552✔
694
  }
695
}
188,554,305✔
696

697
typedef struct {
698
  col_id_t  colId;
699
  SNode*    pValueNode;
700
  int32_t   bytes;  // length defined in schema
701
} STagDataEntry;
702

703
static int compareTagDataEntry(const void* a, const void* b) {
39,490✔
704
  STagDataEntry* p1 = (STagDataEntry*)a;
39,490✔
705
  STagDataEntry* p2 = (STagDataEntry*)b;
39,490✔
706
  return compareInt16Val(&p1->colId, &p2->colId);
39,490✔
707
}
708

709
/*
 * Serialises the (column id, value) entries of pIdWithValue into a newly
 * allocated, zero-initialised buffer of keyLen bytes.
 *
 * For every entry the column id is written first, followed by the value bytes
 * unless the value is NULL. Fixed-width values contribute
 * min(schema bytes, value bytes) bytes; var-length values contribute their
 * payload without the length header.
 *
 * On success the caller owns *keyBuf. On failure *keyBuf is freed and set to
 * NULL, so the caller may free it unconditionally.
 * NOTE(review): keyLen is trusted to be large enough for all entries.
 */
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
  if (NULL == *keyBuf) {
    qError(
      "failed to allocate memory for tag filter optimization key, size:%d",
      keyLen);
    return terrno;
  }

  char* pStart = *keyBuf;
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
    // num type may have different bytes length, use the smaller one
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);

    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
    pStart += sizeof(col_id_t);

    if (pValueNode->isNull) {
      continue;  // a NULL value contributes only its column id
    }

    const void* pSrc = NULL;
    size_t      copyLen = (size_t)bytes;
    switch (pValueNode->node.resType.type) {
      case TSDB_DATA_TYPE_BOOL:
        pSrc = &pValueNode->datum.b;
        break;
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_TIMESTAMP:
        pSrc = &pValueNode->datum.i;
        break;
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_UBIGINT:
        pSrc = &pValueNode->datum.u;
        break;
      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_DOUBLE:
        pSrc = &pValueNode->datum.d;
        break;
      case TSDB_DATA_TYPE_VARCHAR:
      case TSDB_DATA_TYPE_VARBINARY:
      case TSDB_DATA_TYPE_NCHAR:
        pSrc = varDataVal(pValueNode->datum.p);
        copyLen = varDataLen(pValueNode->datum.p);
        break;
      default:
        qError("unsupported tag data type %d in tag filter optimization",
          pValueNode->node.resType.type);
        // do not leave a half-built key behind
        taosMemoryFree(*keyBuf);
        *keyBuf = NULL;
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    (void)memcpy(pStart, pSrc, copyLen);
    pStart += copyLen;
  }

  return TSDB_CODE_SUCCESS;
}
774

775
static void extractTagDataEntry(
39,490✔
776
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
777
  SNode* pLeft = pOpNode->pLeft;
39,490✔
778
  SNode* pRight = pOpNode->pRight;
39,490✔
779
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
39,490✔
780
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
39,490✔
781
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
39,490✔
782
    (SValueNode*)pLeft : (SValueNode*)pRight;
39,490✔
783

784
  STagDataEntry entry = {0};
39,490✔
785
  entry.colId = pColNode->colId;
39,490✔
786
  entry.pValueNode = (SNode*)pValueNode;
39,490✔
787
  entry.bytes = pColNode->node.resType.bytes;
39,490✔
788
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
39,490✔
789
}
39,490✔
790

791
static int32_t extractTagFilterTagDataEntries(
19,745✔
792
  const SNode* pTagCond, SArray* pIdWithVal) {
793
  if (NULL == pTagCond || NULL == pIdWithVal ||
19,745✔
794
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
19,745✔
795
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
19,745✔
796
    qError("invalid parameter to extract tag filter symbol");
×
797
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
798
  }
799

800
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
19,745✔
801
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
802
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
19,745✔
803
    SNode* pChild = NULL;
19,745✔
804
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
59,235✔
805
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
39,490✔
806
    }
807
  }
808

809
  taosArraySort(pIdWithVal, compareTagDataEntry);
19,745✔
810

811
  return TSDB_CODE_SUCCESS;
19,745✔
812
}
813

814
/*
 * Computes an MD5 digest for a tag condition from its sorted
 * (column id, value) entries and leaves the result in pContext.
 *
 * A NULL pTagCond is a no-op that returns TSDB_CODE_SUCCESS without touching
 * pContext. On failure the error code is returned and logged together with
 * the line at which the failure was detected.
 */
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (pTagCond == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  char*   payload = NULL;
  int32_t len = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
  // report the allocation failure itself rather than a generic "invalid parameter"
  QUERY_CHECK_NULL(pIdWithVal, code, lino, _end, terrno);

  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
  QUERY_CHECK_CODE(code, lino, _end);

  // payload size: one column id plus the column's byte width per entry
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
    QUERY_CHECK_NULL(pEntry, code, lino, _end, terrno);
    len += sizeof(col_id_t) + pEntry->bytes;
  }
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
  QUERY_CHECK_CODE(code, lino, _end);

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s",
      __func__, lino, tstrerror(code));
  }
  taosArrayDestroy(pIdWithVal);
  taosMemoryFree(payload);
  return code;
}
847

848
/*
 * Serializes the tag condition with nodesNodeToMsg and stores the MD5 of the
 * serialized bytes in pContext.
 *
 * Returns TSDB_CODE_SUCCESS immediately (pContext untouched) when pTagCond is
 * NULL; otherwise returns the serialization status.
 */
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (NULL == pTagCond) {
    return TSDB_CODE_SUCCESS;
  }

  char*   pMsg = NULL;
  int32_t msgLen = 0;
  int32_t code = nodesNodeToMsg(pTagCond, &pMsg, &msgLen);

  if (TSDB_CODE_SUCCESS == code) {
    tMD5Init(pContext);
    tMD5Update(pContext, (uint8_t*)pMsg, (uint32_t)msgLen);
    tMD5Final(pContext);

    taosMemoryFree(pMsg);
  } else {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  return code;
}
874

875
/*
 * Computes the MD5 digest identifying a table grouping and stores it in
 * pContext.
 *
 * The digest covers the serialized pGroup node. When filterDigest[0] is
 * non-zero, the tListLen(pContext->digest) bytes starting at filterDigest + 1
 * are appended to the payload before hashing.
 *
 * Returns TSDB_CODE_SUCCESS or the serialization / allocation error.
 */
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  code = nodesNodeToMsg(pGroup, &payload, &len);
  QUERY_CHECK_CODE(code, lino, _end);

  if (filterDigest != NULL && filterDigest[0]) {
    // Keep the old pointer until the realloc is known to have succeeded;
    // assigning the result directly to payload would leak it on failure.
    char* pExtended = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
    QUERY_CHECK_NULL(pExtended, code, lino, _end, terrno);
    payload = pExtended;

    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
    len += tListLen(pContext->digest);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
901

902
/*
 * Walks an expression (or a list of expressions) with the getColumn rewriter
 * and collects the resulting SColumnInfo items.
 *
 * data     - an SNodeList* when isList is true, otherwise a single SNode*.
 * isList   - selects how data is interpreted.
 * pColList - optional output; on success receives the collected array, which
 *            the caller must then destroy. May be NULL, in which case the
 *            array is released here.
 *
 * Returns TSDB_CODE_SUCCESS, an allocation error (terrno), or the error code
 * recorded by the rewriter in the traversal context.
 */
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
  int32_t code = TSDB_CODE_SUCCESS;
  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (ctx.colHash == NULL) {
    code = terrno;
    goto end;
  }

  ctx.index = 0;
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
  if (ctx.cInfoList == NULL) {
    code = terrno;
    goto end;
  }

  if (isList) {
    SNode* pNode = NULL;
    FOREACH(pNode, (SNodeList*)data) {
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
      if (TSDB_CODE_SUCCESS != ctx.code) {
        code = ctx.code;
        goto end;
      }
      REPLACE_NODE(pNode);
    }
  } else {
    SNode* pNode = (SNode*)data;
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      code = ctx.code;
      goto end;
    }
  }

  // Give up ownership only if the caller actually takes the array; otherwise
  // it stays in ctx and is destroyed below instead of being leaked.
  if (pColList != NULL) {
    *pColList = ctx.cInfoList;
    ctx.cInfoList = NULL;
  }

end:
  taosHashCleanup(ctx.colHash);
  taosArrayDestroy(ctx.cInfoList);
  return code;
}
945

946
/*
 * Appends one SStreamGroupValue to gInfo describing row i of column pValue.
 *
 *  - NULL cells (and JSON null) are recorded with isNull = true.
 *  - JSON objects are rejected with TSDB_CODE_QRY_JSON_IN_GROUP_ERROR.
 *  - JSON scalars, variable-length values and DECIMAL values are copied into
 *    a freshly allocated buffer referenced by data.pData / data.nData.
 *  - All other types are copied into data.val.
 *
 * On failure the reserved element (if any) is marked as NULL and the error
 * code is returned.
 */
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
  if (v == NULL) {
    code = terrno;
    goto end;
  }
  if (colDataIsNull_s(pValue, i)) {
    v->isNull = true;
  } else {
    v->isNull = false;
    char* data = colDataGetData(pValue, i);
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(data)) {
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        goto end;
      }
      if (tTagIsJsonNull(data)) {
        v->isNull = true;
        goto end;
      }
      int32_t len = getJsonValueLen(data);
      v->data.type = pValue->info.type;
      v->data.nData = len;
      // one extra zeroed byte so the buffer can be logged as a string
      v->data.pData = taosMemoryCalloc(1, len + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, data, len);
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
      if (varDataTLen(data) > pValue->info.bytes) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto end;
      }
      v->data.type = pValue->info.type;
      v->data.nData = varDataLen(data);
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
      v->data.type = pValue->info.type;
      v->data.nData = pValue->info.bytes;
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      // Copy into the allocated buffer. The previous code copied into
      // &v->data.pData, i.e. over the pointer field itself, which lost the
      // buffer and left pData pointing at arbitrary memory.
      memcpy(v->data.pData, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add decimal data type:%d, len:%d", i, pValue->info.type, (int32_t)pValue->info.bytes);
    } else {  // reader todo decimal
      v->data.type = pValue->info.type;
      memcpy(&v->data.val, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
    }
  }
end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // v is NULL when the reserve itself failed; nothing to mark in that case
    if (v != NULL) {
      v->isNull = true;
    }
  }
  return code;
}
1014

1015
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
107,509✔
1016
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1017
  int32_t      code = TSDB_CODE_SUCCESS;
107,509✔
1018
  int32_t      lino = 0;
107,509✔
1019
  SArray*      pBlockList = NULL;
107,509✔
1020
  SSDataBlock* pResBlock = NULL;
107,509✔
1021
  SArray*      groupData = NULL;
107,509✔
1022
  SArray*      pUidTagList = NULL;
107,509✔
1023
  SArray*      gInfo = NULL;
107,509✔
1024
  int32_t      tbNameIndex = 0;
107,509✔
1025

1026
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
107,509✔
1027
  if (rows == 0) {
107,509✔
1028
    return;
×
1029
  }
1030

1031
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
107,509✔
1032
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
107,509✔
1033

1034
  for (int32_t i = 0; i < rows; ++i) {
452,101✔
1035
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
344,592✔
1036
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
344,592✔
1037
    STUidTagInfo info = {.uid = pkeyInfo->uid};
344,592✔
1038
    void*        tmp = taosArrayPush(pUidTagList, &info);
344,592✔
1039
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
344,592✔
1040
  }
1041
 
1042
  if (taosArrayGetSize(pUidTagList) > 0) {
107,509✔
1043
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
107,509✔
1044
  } else {
1045
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1046
  }
1047
  if (code != TSDB_CODE_SUCCESS) {
107,509✔
1048
    goto end;
×
1049
  }
1050

1051
  SArray* pColList = NULL;
107,509✔
1052
  code = qGetColumnsFromNodeList(group, true, &pColList);
107,509✔
1053
  if (code != TSDB_CODE_SUCCESS) {
107,509✔
1054
    goto end;
×
1055
  }
1056

1057
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
281,991✔
1058
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
174,482✔
1059
    if (tmp != NULL && tmp->colId == -1) {
174,482✔
1060
      tbNameIndex = i;
107,509✔
1061
    }
1062
  }
1063
  
1064
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
107,509✔
1065
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
107,509✔
1066
  taosArrayDestroy(pColList);
107,509✔
1067
  if (pResBlock == NULL) {
107,509✔
1068
    code = terrno;
×
1069
    goto end;
×
1070
  }
1071

1072
  pBlockList = taosArrayInit(2, POINTER_BYTES);
107,509✔
1073
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
107,509✔
1074

1075
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
107,077✔
1076
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
107,077✔
1077

1078
  groupData = taosArrayInit(2, POINTER_BYTES);
107,077✔
1079
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
107,509✔
1080

1081
  SNode* pNode = NULL;
107,509✔
1082
  FOREACH(pNode, group) {
281,991✔
1083
    SScalarParam output = {0};
174,482✔
1084

1085
    switch (nodeType(pNode)) {
174,482✔
1086
      case QUERY_NODE_VALUE:
×
1087
        break;
×
1088
      case QUERY_NODE_COLUMN:
174,482✔
1089
      case QUERY_NODE_OPERATOR:
1090
      case QUERY_NODE_FUNCTION: {
1091
        SExprNode* expNode = (SExprNode*)pNode;
174,482✔
1092
        code = createResultData(&expNode->resType, rows, &output);
174,482✔
1093
        if (code != TSDB_CODE_SUCCESS) {
174,482✔
1094
          goto end;
×
1095
        }
1096
        break;
174,482✔
1097
      }
1098

1099
      default:
×
1100
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1101
        goto end;
×
1102
    }
1103

1104
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
174,482✔
1105
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
174,482✔
1106
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
174,482✔
1107
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
174,482✔
1108
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
174,482✔
1109
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
1110
      continue;
×
1111
    } else {
1112
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
×
1113
    }
1114

1115
    if (code != TSDB_CODE_SUCCESS) {
174,482✔
1116
      releaseColInfoData(output.columnData);
×
1117
      goto end;
×
1118
    }
1119

1120
    void* tmp = taosArrayPush(groupData, &output.columnData);
174,482✔
1121
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
174,482✔
1122
  }
1123

1124
  for (int i = 0; i < rows; i++) {
452,101✔
1125
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
344,592✔
1126
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
344,592✔
1127

1128
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
344,592✔
1129
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
344,592✔
1130

1131
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
862,311✔
1132
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
517,719✔
1133
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
517,719✔
1134
        if (ret != TSDB_CODE_SUCCESS) {
517,719✔
1135
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1136
          goto end;
×
1137
        }
1138
        if (j == tbNameIndex) {
517,719✔
1139
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
344,592✔
1140
          if (v != NULL){
344,592✔
1141
            v->isTbname = true;
344,592✔
1142
            v->uid = info->uid;
344,592✔
1143
          }
1144
        }
1145
    }
1146

1147
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
344,592✔
1148
    if (ret != TSDB_CODE_SUCCESS) {
344,592✔
1149
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1150
      goto end;
×
1151
    }
1152
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
344,592✔
1153
    gInfo = NULL;
344,592✔
1154
  }
1155

1156
end:
107,509✔
1157
  blockDataDestroy(pResBlock);
107,509✔
1158
  taosArrayDestroy(pBlockList);
107,509✔
1159
  taosArrayDestroyEx(pUidTagList, freeItem);
107,509✔
1160
  taosArrayDestroyP(groupData, releaseColInfoData);
107,509✔
1161
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
107,509✔
1162

1163
  if (code != TSDB_CODE_SUCCESS) {
107,509✔
1164
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1165
  }
1166
}
1167

1168
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
1,841,796✔
1169
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1170
  int32_t      code = TSDB_CODE_SUCCESS;
1,841,796✔
1171
  int32_t      lino = 0;
1,841,796✔
1172
  SArray*      pBlockList = NULL;
1,841,796✔
1173
  SSDataBlock* pResBlock = NULL;
1,841,796✔
1174
  void*        keyBuf = NULL;
1,842,186✔
1175
  SArray*      groupData = NULL;
1,842,186✔
1176
  SArray*      pUidTagList = NULL;
1,842,186✔
1177
  SArray*      tableList = NULL;
1,842,186✔
1178
  SArray*      gInfo = NULL;
1,842,186✔
1179

1180
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
1,842,186✔
1181
  if (rows == 0) {
1,842,186✔
1182
    return TSDB_CODE_SUCCESS;
×
1183
  } 
1184

1185
  T_MD5_CTX context = {0};
1,842,186✔
1186
  if (tsTagFilterCache && groupIdMap == NULL) {
1,842,186✔
1187
    SNodeListNode* listNode = NULL;
1,436✔
1188
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
1,436✔
1189
    if (TSDB_CODE_SUCCESS != code) {
1,436✔
1190
      goto end;
×
1191
    }
1192
    listNode->pNodeList = group;
1,436✔
1193
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
1,436✔
1194
    QUERY_CHECK_CODE(code, lino, end);
1,436✔
1195

1196
    nodesFree(listNode);
1,436✔
1197

1198
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
1,436✔
1199
                                             tListLen(context.digest), &tableList);
1200
    QUERY_CHECK_CODE(code, lino, end);
1,436✔
1201

1202
    if (tableList) {
1,436✔
1203
      taosArrayDestroy(pTableListInfo->pTableList);
×
1204
      pTableListInfo->pTableList = tableList;
×
1205
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1206
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
1207
      goto end;
×
1208
    }
1209
  }
1210

1211
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
1,842,186✔
1212
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
1,842,186✔
1213

1214
  for (int32_t i = 0; i < rows; ++i) {
10,779,870✔
1215
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
8,937,279✔
1216
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
8,936,889✔
1217
    STUidTagInfo info = {.uid = pkeyInfo->uid};
8,936,889✔
1218
    void*        tmp = taosArrayPush(pUidTagList, &info);
8,937,294✔
1219
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
8,937,294✔
1220
  }
1221

1222
  if (taosArrayGetSize(pUidTagList) > 0) {
1,842,591✔
1223
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
1,842,186✔
1224
  } else {
1225
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1226
  }
1227
  if (code != TSDB_CODE_SUCCESS) {
1,842,186✔
1228
    goto end;
×
1229
  }
1230

1231
  SArray* pColList = NULL;
1,842,186✔
1232
  code = qGetColumnsFromNodeList(group, true, &pColList); 
1,842,186✔
1233
  if (code != TSDB_CODE_SUCCESS) {
1,842,043✔
1234
    goto end;
×
1235
  }
1236

1237
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1,842,043✔
1238
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
1,842,043✔
1239
  taosArrayDestroy(pColList);
1,842,186✔
1240
  if (pResBlock == NULL) {
1,841,505✔
1241
    code = terrno;
×
1242
    goto end;
×
1243
  }
1244

1245
  //  int64_t st1 = taosGetTimestampUs();
1246
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1247

1248
  pBlockList = taosArrayInit(2, POINTER_BYTES);
1,841,505✔
1249
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1,842,186✔
1250

1251
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
1,842,186✔
1252
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1,842,186✔
1253

1254
  groupData = taosArrayInit(2, POINTER_BYTES);
1,842,186✔
1255
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
1,842,186✔
1256

1257
  SNode* pNode = NULL;
1,842,186✔
1258
  FOREACH(pNode, group) {
3,808,805✔
1259
    SScalarParam output = {0};
1,966,619✔
1260

1261
    switch (nodeType(pNode)) {
1,965,938✔
1262
      case QUERY_NODE_VALUE:
×
1263
        break;
×
1264
      case QUERY_NODE_COLUMN:
1,966,412✔
1265
      case QUERY_NODE_OPERATOR:
1266
      case QUERY_NODE_FUNCTION: {
1267
        SExprNode* expNode = (SExprNode*)pNode;
1,966,412✔
1268
        code = createResultData(&expNode->resType, rows, &output);
1,966,412✔
1269
        if (code != TSDB_CODE_SUCCESS) {
1,965,773✔
1270
          goto end;
×
1271
        }
1272
        break;
1,965,773✔
1273
      }
1274

1275
      default:
×
1276
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1277
        goto end;
×
1278
    }
1279

1280
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
1,965,773✔
1281
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
1,956,227✔
1282
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
1,956,227✔
1283
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
1,955,546✔
1284
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
1,955,546✔
1285
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
10,392✔
1286
      continue;
×
1287
    } else {
1288
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
10,392✔
1289
    }
1290

1291
    if (code != TSDB_CODE_SUCCESS) {
1,966,333✔
1292
      releaseColInfoData(output.columnData);
×
1293
      goto end;
×
1294
    }
1295

1296
    void* tmp = taosArrayPush(groupData, &output.columnData);
1,966,619✔
1297
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1,966,619✔
1298
  }
1299

1300
  int32_t keyLen = 0;
1,842,186✔
1301
  SNode*  node;
1302
  FOREACH(node, group) {
3,808,519✔
1303
    SExprNode* pExpr = (SExprNode*)node;
1,966,333✔
1304
    keyLen += pExpr->resType.bytes;
1,966,333✔
1305
  }
1306

1307
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
1,841,219✔
1308
  keyLen += nullFlagSize;
1,841,362✔
1309

1310
  keyBuf = taosMemoryCalloc(1, keyLen);
1,841,362✔
1311
  if (keyBuf == NULL) {
1,842,043✔
1312
    code = terrno;
×
1313
    goto end;
×
1314
  }
1315

1316
  if (initRemainGroups) {
1,842,043✔
1317
    pTableListInfo->remainGroups =
817,443✔
1318
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
817,300✔
1319
    if (pTableListInfo->remainGroups == NULL) {
817,443✔
1320
      code = terrno;
×
1321
      goto end;
×
1322
    }
1323
  }
1324

1325
  for (int i = 0; i < rows; i++) {
10,778,150✔
1326
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
8,935,815✔
1327
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
8,935,815✔
1328

1329
    if (groupIdMap != NULL){
8,935,815✔
1330
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
176,091✔
1331
    }
1332
    
1333
    char* isNull = (char*)keyBuf;
8,935,134✔
1334
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
8,935,134✔
1335
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
18,540,268✔
1336
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
9,603,388✔
1337

1338
      if (groupIdMap != NULL && gInfo != NULL) {
9,604,167✔
1339
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
199,635✔
1340
        if (ret != TSDB_CODE_SUCCESS) {
200,025✔
1341
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1342
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1343
          gInfo = NULL;
×
1344
        }
1345
      }
1346
      
1347
      if (colDataIsNull_s(pValue, i)) {
19,209,088✔
1348
        isNull[j] = 1;
95,615✔
1349
      } else {
1350
        isNull[j] = 0;
9,508,916✔
1351
        char* data = colDataGetData(pValue, i);
9,507,680✔
1352
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
9,509,613✔
1353
          // if (tTagIsJson(data)) {
1354
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1355
          //   goto end;
1356
          // }
1357
          if (tTagIsJsonNull(data)) {
89,976✔
1358
            isNull[j] = 1;
×
1359
            continue;
×
1360
          }
1361
          int32_t len = getJsonValueLen(data);
89,976✔
1362
          memcpy(pStart, data, len);
89,976✔
1363
          pStart += len;
89,976✔
1364
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
9,419,589✔
1365
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
6,369,596✔
1366
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
143✔
1367
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1368
              goto end;
×
1369
            }
1370
            memcpy(pStart, data, blobDataTLen(data));
×
1371
            pStart += blobDataTLen(data);
×
1372
          } else {
1373
            if (varDataTLen(data) > pValue->info.bytes) {
6,368,614✔
1374
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1375
              goto end;
×
1376
            }
1377
            memcpy(pStart, data, varDataTLen(data));
6,368,757✔
1378
            pStart += varDataTLen(data);
6,368,900✔
1379
          }
1380
        } else {
1381
          memcpy(pStart, data, pValue->info.bytes);
3,049,985✔
1382
          pStart += pValue->info.bytes;
3,051,497✔
1383
        }
1384
      }
1385
    }
1386

1387
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
8,934,376✔
1388
    info->groupId = calcGroupId(keyBuf, len);
8,934,376✔
1389
    if (groupIdMap != NULL && gInfo != NULL) {
8,932,914✔
1390
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
176,091✔
1391
      if (ret != TSDB_CODE_SUCCESS) {
176,091✔
1392
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1393
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1394
      }
1395
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
176,091✔
1396
      gInfo = NULL;
176,091✔
1397
    }
1398
    if (initRemainGroups) {
8,932,914✔
1399
      // groupId ~ table uid
1400
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
4,352,600✔
1401
                         sizeof(info->uid));
1402
      if (code == TSDB_CODE_DUP_KEY) {
4,355,650✔
1403
        code = TSDB_CODE_SUCCESS;
834,733✔
1404
      }
1405
      QUERY_CHECK_CODE(code, lino, end);
4,355,650✔
1406
    }
1407
  }
1408

1409
  if (tsTagFilterCache && groupIdMap == NULL) {
1,842,335✔
1410
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
1,436✔
1411
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
1,436✔
1412

1413
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
2,872✔
1414
                                              tListLen(context.digest), tableList,
1415
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
1,436✔
1416
    QUERY_CHECK_CODE(code, lino, end);
1,436✔
1417
  }
1418

1419
  //  int64_t st2 = taosGetTimestampUs();
1420
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1421

1422
end:
1,842,049✔
1423
  taosMemoryFreeClear(keyBuf);
1,842,186✔
1424
  blockDataDestroy(pResBlock);
1,842,043✔
1425
  taosArrayDestroy(pBlockList);
1,841,362✔
1426
  taosArrayDestroyEx(pUidTagList, freeItem);
1,841,900✔
1427
  taosArrayDestroyP(groupData, releaseColInfoData);
1,841,375✔
1428
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
1,841,298✔
1429

1430
  if (code != TSDB_CODE_SUCCESS) {
1,841,155✔
1431
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1432
  }
1433
  return code;
1,841,362✔
1434
}
1435

1436
/*
 * Comparator for arrays whose elements are `char*` names.
 * Each argument points at an element (i.e. at a `const char*`).
 * Returns -1, 0 or 1 according to strcmp order of the two names.
 */
static int32_t nameComparFn(const void* p1, const void* p2) {
  const char* const* ppLhs = (const char* const*)p1;
  const char* const* ppRhs = (const char* const*)p2;

  int32_t order = strcmp(*ppLhs, *ppRhs);

  // normalise strcmp's arbitrary magnitude to exactly -1 / 0 / 1
  return (order > 0) - (order < 0);
}
1447

1448
static SArray* getTableNameList(const SNodeListNode* pList) {
426,984✔
1449
  int32_t    code = TSDB_CODE_SUCCESS;
426,984✔
1450
  int32_t    lino = 0;
426,984✔
1451
  int32_t    len = LIST_LENGTH(pList->pNodeList);
426,984✔
1452
  SListCell* cell = pList->pNodeList->pHead;
426,984✔
1453

1454
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
426,984✔
1455
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
426,984✔
1456

1457
  for (int i = 0; i < pList->pNodeList->length; i++) {
1,158,616✔
1458
    SValueNode* valueNode = (SValueNode*)cell->pNode;
731,632✔
1459
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
731,632✔
1460
      terrno = TSDB_CODE_INVALID_PARA;
×
1461
      taosArrayDestroy(pTbList);
×
1462
      return NULL;
×
1463
    }
1464

1465
    char* name = varDataVal(valueNode->datum.p);
731,632✔
1466
    void* tmp = taosArrayPush(pTbList, &name);
731,632✔
1467
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
731,632✔
1468
    cell = cell->pNext;
731,632✔
1469
  }
1470

1471
  size_t numOfTables = taosArrayGetSize(pTbList);
426,984✔
1472

1473
  // order the name
1474
  taosArraySort(pTbList, nameComparFn);
426,984✔
1475

1476
  // remove the duplicates
1477
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
426,984✔
1478
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
426,984✔
1479
  void* tmpTbl = taosArrayGet(pTbList, 0);
426,984✔
1480
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
426,984✔
1481
  void* tmp = taosArrayPush(pNewList, tmpTbl);
426,984✔
1482
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
426,984✔
1483

1484
  for (int32_t i = 1; i < numOfTables; ++i) {
731,632✔
1485
    char** name = taosArrayGetLast(pNewList);
304,648✔
1486
    char** nameInOldList = taosArrayGet(pTbList, i);
304,648✔
1487
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
304,648✔
1488
    if (strcmp(*name, *nameInOldList) == 0) {
304,648✔
1489
      continue;
9,906✔
1490
    }
1491

1492
    tmp = taosArrayPush(pNewList, nameInOldList);
294,742✔
1493
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
294,742✔
1494
  }
1495

1496
_end:
426,984✔
1497
  taosArrayDestroy(pTbList);
426,984✔
1498
  if (code != TSDB_CODE_SUCCESS) {
426,984✔
UNCOV
1499
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1500
    return NULL;
×
1501
  }
1502
  return pNewList;
426,984✔
1503
}
1504

1505
/*
 * Comparator for uint64_t table uids: each argument points at a uint64_t.
 * Returns -1, 0 or 1 for less-than, equal, greater-than.
 */
static int tableUidCompare(const void* a, const void* b) {
  const uint64_t lhs = *(const uint64_t*)a;
  const uint64_t rhs = *(const uint64_t*)b;

  // branch-free three-way compare; avoids the overflow of a subtraction
  return (lhs > rhs) - (lhs < rhs);
}
1515

1516
static int32_t filterTableInfoCompare(const void* a, const void* b) {
18,649,031✔
1517
  STUidTagInfo* p1 = (STUidTagInfo*)a;
18,649,031✔
1518
  STUidTagInfo* p2 = (STUidTagInfo*)b;
18,649,031✔
1519

1520
  if (p1->uid == p2->uid) {
18,649,031✔
1521
    return 0;
×
1522
  }
1523

1524
  return p1->uid < p2->uid ? -1 : 1;
18,650,075✔
1525
}
1526

1527
/*
 * Classifies the top-level shape of a tag condition:
 *   - operator node                          -> FILTER_NO_LOGIC
 *   - logic condition of type AND            -> FILTER_OTHER
 *   - anything else (incl. non-AND logic)    -> FILTER_AND
 *
 * NOTE(review): the FILTER_AND / FILTER_OTHER names read as if they were
 * swapped relative to the conditions; the mapping above is exactly what the
 * code does - confirm against the callers before changing it.
 */
static FilterCondType checkTagCond(SNode* cond) {
  switch (nodeType(cond)) {
    case QUERY_NODE_OPERATOR:
      return FILTER_NO_LOGIC;

    case QUERY_NODE_LOGIC_CONDITION: {
      bool isAnd = (LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)cond)->condType);
      return isAnd ? FILTER_OTHER : FILTER_AND;
    }

    default:
      return FILTER_AND;
  }
}
1536

1537
// Tries to resolve a "tbname IN (...)" tag condition directly into `list`
// (an array of STUidTagInfo).
//   - cond is an operator node: the result of optimizeTbnameInCondImpl() is returned as is.
//   - cond is an AND logic condition: its parameters are probed in order until one of
//     them is handled by optimizeTbnameInCondImpl(); `list` is then sorted by uid and
//     de-duplicated, and, if a parameter was handled, the tags of the listed uids are
//     loaded through metaFn.getTableTagsByUid() whose result is returned.
//   - otherwise: -1.
// A return value of 0 means the optimization was applied.
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
  const int32_t kind = nodeType(cond);

  if (kind == QUERY_NODE_OPERATOR) {
    return optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
  }
  if (kind != QUERY_NODE_LOGIC_CONDITION) {
    return -1;
  }

  SLogicConditionNode* pLogic = (SLogicConditionNode*)cond;
  if (pLogic->condType != LOGIC_COND_TYPE_AND) {
    return -1;
  }

  SNodeList* pParams = (SNodeList*)pLogic->pParameterList;
  int32_t    numOfParams = LIST_LENGTH(pParams);
  if (numOfParams <= 0) {
    return -1;
  }

  // stop at the first parameter that is a usable "tbname IN (...)" condition
  bool    tbnameCondFound = false;
  int32_t visited = 0;
  for (SListCell* pCell = pParams->pHead; pCell != NULL && visited < numOfParams; pCell = pCell->pNext, ++visited) {
    if (optimizeTbnameInCondImpl(pVnode, list, pCell->pNode, pAPI, suid) == 0) {
      tbnameCondFound = true;
      break;
    }
  }

  taosArraySort(list, filterTableInfoCompare);
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);

  if (!tbnameCondFound) {
    return -1;
  }
  return pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
}
1577

1578
// only return uid that does not contained in pExistedUidList
1579
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
14,980,247✔
1580
                                        uint64_t suid) {
1581
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
14,980,247✔
1582
    return -1;
6,342✔
1583
  }
1584

1585
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
14,973,838✔
1586
  if (pNode->opType != OP_TYPE_IN) {
14,973,838✔
1587
    return -1;
14,250,988✔
1588
  }
1589

1590
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
722,117✔
1591
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
426,984✔
1592
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
295,140✔
1593
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
426,984✔
1594
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
426,984✔
1595

1596
    int32_t len = LIST_LENGTH(pList->pNodeList);
426,984✔
1597
    if (len <= 0) {
426,984✔
1598
      return -1;
×
1599
    }
1600

1601
    SArray*   pTbList = getTableNameList(pList);
426,984✔
1602
    int32_t   numOfTables = taosArrayGetSize(pTbList);
426,984✔
1603
    SHashObj* uHash = NULL;
426,984✔
1604

1605
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
426,984✔
1606
    if (numOfExisted > 0) {
426,984✔
1607
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,390✔
1608
      if (!uHash) {
2,390✔
1609
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1610
        return terrno;
×
1611
      }
1612

1613
      for (int i = 0; i < numOfExisted; i++) {
2,388,805✔
1614
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
2,386,415✔
1615
        if (!pTInfo) {
2,386,415✔
1616
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1617
          return terrno;
×
1618
        }
1619
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
2,386,415✔
1620
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
2,386,415✔
1621
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1622
          return tempRes;
×
1623
        }
1624
      }
1625
    }
1626

1627
    for (int i = 0; i < numOfTables; i++) {
1,132,210✔
1628
      char* name = taosArrayGetP(pTbList, i);
711,820✔
1629

1630
      uint64_t uid = 0, csuid = 0;
711,820✔
1631
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
711,820✔
1632
        ETableType tbType = TSDB_TABLE_MAX;
427,102✔
1633
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
427,102✔
1634
            tbType == TSDB_CHILD_TABLE) {
426,646✔
1635
          if (suid != csuid) {
420,052✔
1636
            continue;
892✔
1637
          }
1638
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
419,160✔
1639
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
417,965✔
1640
            void*        tmp = taosArrayPush(pExistedUidList, &s);
418,421✔
1641
            if (!tmp) {
418,421✔
1642
              return terrno;
×
1643
            }
1644
          }
1645
        } else {
1646
          taosArrayDestroy(pTbList);
6,594✔
1647
          taosHashCleanup(uHash);
6,594✔
1648
          return -1;
6,594✔
1649
        }
1650
      } else {
1651
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1652
        terrno = 0;
284,718✔
1653
      }
1654
    }
1655

1656
    taosHashCleanup(uHash);
420,390✔
1657
    taosArrayDestroy(pTbList);
420,390✔
1658
    return 0;
420,390✔
1659
  }
1660

1661
  return -1;
295,542✔
1662
}
1663

1664
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
13,780,162✔
1665
                                        SStorageAPI* pStorageAPI) {
1666
  int32_t      code = TSDB_CODE_SUCCESS;
13,780,162✔
1667
  int32_t      lino = 0;
13,780,162✔
1668
  SSDataBlock* pResBlock = NULL;
13,780,162✔
1669
  code = createDataBlock(&pResBlock);
13,781,419✔
1670
  QUERY_CHECK_CODE(code, lino, _end);
13,780,743✔
1671

1672
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
28,899,713✔
1673
    SColumnInfoData colInfo = {0};
15,115,596✔
1674
    void*           tmp = taosArrayGet(pColList, i);
15,114,587✔
1675
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,111,146✔
1676
    colInfo.info = *(SColumnInfo*)tmp;
15,111,146✔
1677
    code = blockDataAppendColInfo(pResBlock, &colInfo);
15,110,673✔
1678
    QUERY_CHECK_CODE(code, lino, _end);
15,117,365✔
1679
  }
1680

1681
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
13,777,210✔
1682
  if (code != TSDB_CODE_SUCCESS) {
13,779,005✔
1683
    terrno = code;
×
1684
    blockDataDestroy(pResBlock);
×
1685
    return NULL;
×
1686
  }
1687

1688
  pResBlock->info.rows = numOfTables;
13,779,005✔
1689

1690
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
13,780,764✔
1691

1692
  for (int32_t i = 0; i < numOfTables; i++) {
203,326,897✔
1693
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
189,542,282✔
1694
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
189,542,934✔
1695

1696
    for (int32_t j = 0; j < numOfCols; j++) {
387,012,301✔
1697
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
197,423,511✔
1698
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
197,444,280✔
1699

1700
      if (pColInfo->info.colId == -1) {  // tbname
197,444,280✔
1701
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8,158,687✔
1702
        if (p1->name != NULL) {
8,159,286✔
1703
          STR_TO_VARSTR(str, p1->name);
418,211✔
1704
        } else {  // name is not retrieved during filter
1705
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
7,737,885✔
1706
          QUERY_CHECK_CODE(code, lino, _end);
7,728,108✔
1707
        }
1708

1709
        code = colDataSetVal(pColInfo, i, str, false);
8,146,529✔
1710
        QUERY_CHECK_CODE(code, lino, _end);
8,153,684✔
1711
#if TAG_FILTER_DEBUG
1712
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1713
#endif
1714
      } else {
1715
        STagVal tagVal = {0};
189,284,517✔
1716
        tagVal.cid = pColInfo->info.colId;
189,292,081✔
1717
        if (p1->pTagVal == NULL) {
189,285,948✔
1718
          colDataSetNULL(pColInfo, i);
8,975✔
1719
        } else {
1720
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
189,290,440✔
1721

1722
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
189,318,415✔
1723
            colDataSetNULL(pColInfo, i);
3,988,870✔
1724
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
185,331,268✔
1725
            code = colDataSetVal(pColInfo, i, p, false);
711,671✔
1726
            QUERY_CHECK_CODE(code, lino, _end);
711,671✔
1727
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
298,051,534✔
1728
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
113,462,683✔
1729
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1730
            }
1731
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
113,451,426✔
1732
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
113,443,005✔
1733
            varDataSetLen(tmp, tagVal.nData);
113,443,005✔
1734
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
113,445,640✔
1735
            code = colDataSetVal(pColInfo, i, tmp, false);
113,441,154✔
1736
#if TAG_FILTER_DEBUG
1737
            qDebug("tagfilter varch:%s", tmp + 2);
1738
#endif
1739
            taosMemoryFree(tmp);
113,449,176✔
1740
            QUERY_CHECK_CODE(code, lino, _end);
113,437,580✔
1741
          } else {
1742
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
71,155,426✔
1743
            QUERY_CHECK_CODE(code, lino, _end);
71,181,477✔
1744
#if TAG_FILTER_DEBUG
1745
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1746
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1747
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1748
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1749
            }
1750
#endif
1751
          }
1752
        }
1753
      }
1754
    }
1755
  }
1756

1757
_end:
13,779,773✔
1758
  if (code != TSDB_CODE_SUCCESS) {
13,784,615✔
1759
    blockDataDestroy(pResBlock);
3,427✔
1760
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1761
    terrno = code;
×
1762
    return NULL;
×
1763
  }
1764
  return pResBlock;
13,781,188✔
1765
}
1766

1767
// Collects the tables whose filter result is true.
//   pListInfo   - every qualified uid is appended to pListInfo->pTableList (groupId 0)
//   pUidList    - cleared on entry; receives the qualified uids only when addUid is true
//   pUidTagList - array of STUidTagInfo, one entry per evaluated row
//   pResultList - per-row filter result, indexed like pUidTagList
// Returns TSDB_CODE_SUCCESS, or terrno when an array access or append fails.
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
                                 bool* pResultList, bool addUid) {
  taosArrayClear(pUidList);

  STableKeyInfo info = {.uid = 0, .groupId = 0};
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);

  for (int32_t i = 0; i < numOfTables; ++i) {
    if (!pResultList[i]) {
      continue;  // row rejected by the filter
    }

    STUidTagInfo* pTagInfo = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
    if (pTagInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    uint64_t uid = pTagInfo->uid;
    // uid is unsigned, so it is printed with the unsigned conversion
    qDebug("tagfilter get uid:%" PRIu64 ", res:%d", uid, pResultList[i]);

    info.uid = uid;
    if (taosArrayPush(pListInfo->pTableList, &info) == NULL) {
      return terrno;
    }

    if (addUid && taosArrayPush(pUidList, &uid) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1804

1805
// Appends one STUidTagInfo (only .uid set, other fields zero) to pUidTagList for
// every uint64_t uid stored in pUidList.
// Returns TSDB_CODE_SUCCESS, or terrno when reading from or appending to an array
// fails (a failed append used to be logged but reported as success).
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
  int32_t numOfExisted = taosArrayGetSize(pUidList);

  for (int32_t i = 0; i < numOfExisted; ++i) {
    uint64_t* uid = taosArrayGet(pUidList, i);
    if (uid == NULL) {
      int32_t code = terrno;  // captured before logging
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    STUidTagInfo info = {.uid = *uid};
    if (taosArrayPush(pUidTagList, &info) == NULL) {
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1827

1828
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
121,660,847✔
1829
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1830
  *listAdded = false;
121,660,847✔
1831
  if (pTagCond == NULL) {
121,671,802✔
1832
    return TSDB_CODE_SUCCESS;
108,673,557✔
1833
  }
1834

1835
  terrno = TSDB_CODE_SUCCESS;
12,998,245✔
1836

1837
  int32_t      lino = 0;
12,968,974✔
1838
  int32_t      code = TSDB_CODE_SUCCESS;
12,968,974✔
1839
  SArray*      pBlockList = NULL;
12,968,974✔
1840
  SSDataBlock* pResBlock = NULL;
12,968,974✔
1841
  SScalarParam output = {0};
12,968,657✔
1842
  SArray*      pUidTagList = NULL;
12,968,047✔
1843

1844
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
12,968,047✔
1845

1846
  //  int64_t stt = taosGetTimestampUs();
1847
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
12,969,172✔
1848
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
12,966,898✔
1849

1850
  code = copyExistedUids(pUidTagList, pUidList);
12,966,898✔
1851
  QUERY_CHECK_CODE(code, lino, end);
12,968,504✔
1852

1853
  FilterCondType condType = checkTagCond(pTagCond);
12,968,504✔
1854

1855
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
12,968,028✔
1856
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
12,966,780✔
1857
    taosArrayClear(pUidList);
420,390✔
1858

1859
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
420,390✔
1860
    code = taosArrayEnsureCap(pUidList, numOfRows);
420,390✔
1861
    QUERY_CHECK_CODE(code, lino, end);
420,390✔
1862

1863
    for (int32_t i = 0; i < numOfRows; ++i) {
3,225,226✔
1864
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,804,836✔
1865
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,804,836✔
1866
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
2,804,836✔
1867
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,804,836✔
1868
    }
1869
    terrno = 0;
420,390✔
1870
  } else {
1871
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
12,546,390✔
1872
    
1873
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) ||
21,971,578✔
1874
          taosArrayGetSize(pUidTagList) > 0) {
9,422,912✔
1875
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
3,623,026✔
1876
    } else {
1877
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
8,925,640✔
1878
    }
1879
    if (code != TSDB_CODE_SUCCESS) {
12,547,741✔
1880
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1881
      terrno = code;
×
1882
      QUERY_CHECK_CODE(code, lino, end);
×
1883
    }
1884
  }
1885

1886
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
12,968,131✔
1887

1888
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
12,970,614✔
1889
  if (numOfTables == 0) {
12,970,181✔
1890
    goto end;
1,800,082✔
1891
  }
1892

1893
  SArray* pColList = NULL;
11,170,099✔
1894
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
11,170,099✔
1895
  if (code != TSDB_CODE_SUCCESS) {
11,164,260✔
1896
    goto end;
×
1897
  }
1898
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
11,164,260✔
1899
  taosArrayDestroy(pColList);
11,165,204✔
1900
  if (pResBlock == NULL) {
11,167,112✔
1901
    code = terrno;
×
1902
    QUERY_CHECK_CODE(code, lino, end);
×
1903
  }
1904

1905
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1906

1907
  //  int64_t st1 = taosGetTimestampUs();
1908
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1909
  pBlockList = taosArrayInit(2, POINTER_BYTES);
11,167,112✔
1910
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
11,165,243✔
1911

1912
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
11,169,980✔
1913
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
11,169,980✔
1914

1915
  code = createResultData(&type, numOfTables, &output);
11,169,980✔
1916
  if (code != TSDB_CODE_SUCCESS) {
11,164,134✔
1917
    terrno = code;
×
1918
    QUERY_CHECK_CODE(code, lino, end);
×
1919
  }
1920

1921
  code = scalarCalculate(pTagCond, pBlockList, &output, pStreamInfo, NULL);
11,164,134✔
1922
  if (code != TSDB_CODE_SUCCESS) {
11,155,989✔
1923
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
1,104✔
1924
    terrno = code;
1,104✔
1925
    QUERY_CHECK_CODE(code, lino, end);
1,104✔
1926
  }
1927

1928
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
11,154,885✔
1929
  if (code != TSDB_CODE_SUCCESS) {
11,166,472✔
1930
    terrno = code;
×
1931
    QUERY_CHECK_CODE(code, lino, end);
×
1932
  }
1933
  *listAdded = true;
11,166,472✔
1934

1935
end:
12,968,255✔
1936
  if (code != TSDB_CODE_SUCCESS) {
12,965,894✔
1937
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,104✔
1938
  }
1939
  blockDataDestroy(pResBlock);
12,965,894✔
1940
  taosArrayDestroy(pBlockList);
12,962,121✔
1941
  taosArrayDestroyEx(pUidTagList, freeItem);
12,964,269✔
1942

1943
  colDataDestroy(output.columnData);
12,967,158✔
1944
  taosMemoryFreeClear(output.columnData);
12,967,686✔
1945
  return code;
12,966,285✔
1946
}
1947

1948
typedef struct {
1949
  int32_t code;
1950
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1951
} PlaceHolderContext;
1952

1953
// Rewrite callback: replaces a stream pseudo-column function node by a clone of its
// first parameter, after fmSetStreamPseudoFuncParamVal() has filled that parameter in.
// Nodes that are not such functions are left untouched.
//   pNode    - in/out: the node being visited; replaced in place on success
//   pContext - PlaceHolderContext; its `code` field receives the failure reason
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with pContext->code set to a
// non-success value.
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)(*pNode);
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
  if (pData->code != TSDB_CODE_SUCCESS) {
    return DEAL_RES_ERROR;
  }

  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
  if (pFirstParam == NULL) {
    // nothing to substitute; report it instead of dereferencing NULL
    pData->code = TSDB_CODE_INTERNAL_ERROR;
    return DEAL_RES_ERROR;
  }
  ((SValueNode*)pFirstParam)->translate = true;

  SNode* pReplacement = NULL;
  pData->code = nodesCloneNode(pFirstParam, &pReplacement);
  if (NULL == pReplacement) {
    // the caller only inspects `code`, so it must never stay "success" on failure
    if (pData->code == TSDB_CODE_SUCCESS) {
      pData->code = TSDB_CODE_INTERNAL_ERROR;
    }
    return DEAL_RES_ERROR;
  }

  nodesDestroyNode(*pNode);
  *pNode = pReplacement;

  return DEAL_RES_CONTINUE;
}
1978

1979
// Appends the column id referenced by an operator node to pColIdArray.
// The left operand is used when it is a column node, otherwise the right operand
// is treated as the column node.
// NOTE(review): the right operand is not type-checked before it is read as a column
// node, and a failed append is silently ignored; confirm callers only pass
// "column <op> value" style operators.
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
  SColumnNode* pColNode = (SColumnNode*)pOpNode->pRight;
  if (nodeType(pOpNode->pLeft) == QUERY_NODE_COLUMN) {
    pColNode = (SColumnNode*)pOpNode->pLeft;
  }

  col_id_t colId = pColNode->colId;
  (void)taosArrayPush(pColIdArray, &colId);
}
39,490✔
1988

1989
static int32_t buildTagCondKey(
19,745✔
1990
  const SNode* pTagCond, char** pTagCondKey,
1991
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
1992
  if (NULL == pTagCond ||
19,745✔
1993
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
19,745✔
1994
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
19,745✔
1995
    qError("invalid parameter to extract tag filter symbol");
×
1996
    return TSDB_CODE_INTERNAL_ERROR;
×
1997
  }
1998
  int32_t code = TSDB_CODE_SUCCESS;
19,745✔
1999
  int32_t lino = 0;
19,745✔
2000
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
19,745✔
2001

2002
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
19,745✔
2003
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2004
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
19,745✔
2005
    SNode* pChild = NULL;
19,745✔
2006
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
59,235✔
2007
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
39,490✔
2008
    }
2009
  }
2010

2011
  taosArraySort(*pTagColIds, compareUint16Val);
19,745✔
2012

2013
  // encode ordered colIds into key string, separated by ','
2014
  *tagCondKeyLen =
39,490✔
2015
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
19,745✔
2016
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
19,745✔
2017
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
19,745✔
2018
  char* pStart = *pTagCondKey;
19,745✔
2019
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
59,235✔
2020
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
39,490✔
2021
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
39,490✔
2022
    memcpy(pStart, pColId, sizeof(col_id_t));
39,490✔
2023
    pStart += sizeof(col_id_t);
39,490✔
2024
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
39,490✔
2025
      *pStart = ',';
19,745✔
2026
      pStart += 1;
19,745✔
2027
    }
2028
  }
2029

2030
_end:
19,745✔
2031
  if (TSDB_CODE_SUCCESS != code) {
19,745✔
2032
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2033
    terrno = code;
×
2034
  }
2035
  return code;
19,745✔
2036
}
2037

2038
// Walker callback deciding whether a tag condition only consists of node kinds the
// stable tag filter cache accepts: value and column nodes, '=' operators, AND logic
// conditions and stream pseudo-column functions.
// On the first other node (or a NULL node) the bool behind pContext is set to false
// and the walk is ended; otherwise the walk continues and the flag is left untouched.
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
  bool accepted = false;

  if (NULL != pTagCond) {
    switch (nodeType(pTagCond)) {
      case QUERY_NODE_VALUE:
      case QUERY_NODE_COLUMN:
        accepted = true;
        break;
      case QUERY_NODE_OPERATOR:
        accepted = (((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL);
        break;
      case QUERY_NODE_LOGIC_CONDITION:
        accepted = (((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND);
        break;
      case QUERY_NODE_FUNCTION:
        accepted = fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId);
        break;
      default:
        break;
    }
  }

  if (accepted) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)pContext = false;
  return DEAL_RES_END;
}
2062

2063
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
121,582,875✔
2064
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2065
  int32_t code = TSDB_CODE_SUCCESS;
121,582,875✔
2066
  int32_t lino = 0;
121,582,875✔
2067
  size_t  numOfTables = 0;
121,582,875✔
2068
  bool    listAdded = false;
121,582,875✔
2069

2070
  pListInfo->idInfo.suid = pScanNode->suid;
121,606,920✔
2071
  pListInfo->idInfo.tableType = pScanNode->tableType;
121,585,535✔
2072

2073
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
121,528,656✔
2074
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
121,555,957✔
2075

2076
  SIdxFltStatus status = SFLT_NOT_INDEX;
121,555,957✔
2077
  char*   pTagCondKey = NULL;
121,543,743✔
2078
  int32_t tagCondKeyLen;
121,561,100✔
2079
  SArray* pTagColIds = NULL;
121,564,830✔
2080
  char*   pPayload = NULL;
121,574,171✔
2081
  qTrace("getTableList called, suid:%" PRIu64
121,574,171✔
2082
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2083
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2084
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
121,574,171✔
2085
    pListInfo->idInfo.uid = pScanNode->uid;
52,491,172✔
2086
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
52,482,830✔
2087
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
52,444,936✔
2088
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
52,442,253✔
2089
    }
2090
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded, pStreamInfo);
52,497,031✔
2091
    QUERY_CHECK_CODE(code, lino, _end);
52,496,734✔
2092
  } else {
2093
    bool      isStream = (pStreamInfo != NULL);
69,107,534✔
2094
    bool      hasTagCond = (pTagCond != NULL);
69,107,534✔
2095
    bool      canCacheTagEqCondFilter = false;
69,107,534✔
2096
    T_MD5_CTX context = {0};
69,086,530✔
2097

2098
    qTrace("start to get table list by tag filter, suid:%" PRIu64
69,134,714✔
2099
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2100
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2101

2102
    bool acquired = false;
69,134,714✔
2103
    // first, check whether we can use stable tag filter cache
2104
    if (tsStableTagFilterCache && isStream && hasTagCond) {
69,069,549✔
2105
      canCacheTagEqCondFilter = true;
20,822✔
2106
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
20,822✔
2107
        (void*)&canCacheTagEqCondFilter);
2108
    }
2109
    if (canCacheTagEqCondFilter) {
69,084,062✔
2110
      qDebug("%s, stable tag filter condition can be optimized", idstr);
19,745✔
2111
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
19,745✔
2112
        SNode* tmp = NULL;
19,745✔
2113
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
19,745✔
2114
        QUERY_CHECK_CODE(code, lino, _error);
19,745✔
2115

2116
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
19,745✔
2117
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
19,745✔
2118
        if (TSDB_CODE_SUCCESS != ctx.code) {
19,745✔
2119
          nodesDestroyNode(tmp);
×
2120
          code = ctx.code;
×
2121
          goto _error;
×
2122
        }
2123
        code = genStableTagFilterDigest(tmp, &context);
19,745✔
2124
        nodesDestroyNode(tmp);
19,745✔
2125
      } else {
2126
        code = genStableTagFilterDigest(pTagCond, &context);
×
2127
      }
2128
      QUERY_CHECK_CODE(code, lino, _error);
19,745✔
2129

2130
      code = buildTagCondKey(
19,745✔
2131
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2132
      QUERY_CHECK_CODE(code, lino, _error);
19,745✔
2133
      code = pStorageAPI->metaFn.getStableCachedTableList(
19,745✔
2134
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
19,745✔
2135
        context.digest, tListLen(context.digest), pUidList, &acquired);
2136
      QUERY_CHECK_CODE(code, lino, _error);
19,745✔
2137
    } else if (tsTagFilterCache) {
69,064,317✔
2138
      // second, try to use normal tag filter cache
2139
      qDebug("%s using normal tag filter cache", idstr);
64,780✔
2140
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
67,186✔
2141
        SNode* tmp = NULL;
2,406✔
2142
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
2,406✔
2143
        QUERY_CHECK_CODE(code, lino, _error);
2,406✔
2144

2145
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
2,406✔
2146
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
2,406✔
2147
        if (TSDB_CODE_SUCCESS != ctx.code) {
2,406✔
2148
          nodesDestroyNode(tmp);
×
2149
          code = ctx.code;
×
2150
          goto _error;
×
2151
        }
2152
        code = genTagFilterDigest(tmp, &context);
2,406✔
2153
        nodesDestroyNode(tmp);
2,406✔
2154
      } else {
2155
        code = genTagFilterDigest(pTagCond, &context);
62,374✔
2156
      }
2157
      // try to retrieve the result from meta cache
2158
      QUERY_CHECK_CODE(code, lino, _error);      
64,780✔
2159
      code = pStorageAPI->metaFn.getCachedTableList(
64,780✔
2160
        pVnode, pScanNode->suid, context.digest,
64,780✔
2161
        tListLen(context.digest), pUidList, &acquired);
2162
      QUERY_CHECK_CODE(code, lino, _error);
29,204✔
2163
    }
2164
    if (acquired) {
69,037,482✔
2165
      taosArrayDestroy(pTagColIds);
58,998✔
2166
      pTagColIds = NULL;
58,998✔
2167
      
2168
      digest[0] = 1;
58,998✔
2169
      memcpy(
117,996✔
2170
        digest + 1, context.digest, tListLen(context.digest));
58,998✔
2171
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
58,998✔
2172
        " numOfTables:%d", 
2173
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2174
      goto _end;
58,998✔
2175
    } else {
2176
      qDebug("suid:%" PRIu64 
68,978,484✔
2177
        ", failed to get table uid list from cache", pScanNode->suid);
2178
    }
2179

2180
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
69,083,299✔
2181
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
56,387,825✔
2182
      QUERY_CHECK_CODE(code, lino, _error);
56,371,849✔
2183
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
56,371,849✔
2184
    } else {
2185
      // failed to find the result in the cache, let try to calculate the results
2186
      if (pTagIndexCond) {
12,695,474✔
2187
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
4,448,688✔
2188

2189
        SIndexMetaArg metaArg = {.metaEx = pVnode,
4,448,756✔
2190
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
4,448,688✔
2191
                                 .ivtIdx = pIndex,
2192
                                 .suid = pScanNode->uid};
4,448,688✔
2193

2194
        status = SFLT_NOT_INDEX;
4,448,688✔
2195
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
4,448,688✔
2196
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
4,438,711✔
2197
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,079,046✔
2198
        } else {
2199
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
3,359,665✔
2200
        }
2201
      }
2202
    }
2203
    qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
69,069,679✔
2204
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
69,066,656✔
2205
      pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
69,066,656✔
2206
      &listAdded, pStreamInfo);
2207
    QUERY_CHECK_CODE(code, lino, _error);
69,059,253✔
2208

2209
    // let's add the filter results into meta-cache
2210
    numOfTables = taosArrayGetSize(pUidList);
69,058,149✔
2211

2212
    if (canCacheTagEqCondFilter) {
69,053,255✔
2213
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
10,052✔
2214
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2215
            idstr, pScanNode->suid, (int32_t)numOfTables,
2216
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2217

2218
      code = pStorageAPI->metaFn.putStableCachedTableList(
10,052✔
2219
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2220
        context.digest, tListLen(context.digest),
2221
        pUidList, &pTagColIds);
2222
      QUERY_CHECK_CODE(code, lino, _end);
10,052✔
2223

2224
      digest[0] = 1;
10,052✔
2225
      memcpy(digest + 1, context.digest, tListLen(context.digest));
10,052✔
2226
    } else if (tsTagFilterCache) {
69,043,203✔
2227
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
15,475✔
2228
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2229
            idstr, pScanNode->suid, (int32_t)numOfTables,
2230
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2231
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
15,475✔
2232
      pPayload = taosMemoryMalloc(size);
15,475✔
2233
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
15,475✔
2234

2235
      *(int32_t*)pPayload = (int32_t)numOfTables;
15,475✔
2236
      if (numOfTables > 0) {
15,475✔
2237
        void* tmp = taosArrayGet(pUidList, 0);
12,794✔
2238
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
12,794✔
2239
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
12,794✔
2240
      }
2241

2242
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
15,475✔
2243
                                                    context.digest,
2244
                                                    tListLen(context.digest),
2245
                                                    pPayload, size, 1);
2246
      if (TSDB_CODE_SUCCESS == code) {
15,475✔
2247
        /*
2248
          data referenced by pPayload is used in lru cache,
2249
          reset pPayload to NULL to avoid being freed in _error block
2250
        */
2251
        pPayload = NULL;
15,475✔
2252
      } else {
2253
        if (TSDB_CODE_DUP_KEY == code) {
×
2254
          /*
2255
            another thread has already put the same key into cache,
2256
            we can just ignore this error
2257
          */
2258
          code = TSDB_CODE_SUCCESS;
×
2259
        }
2260
        QUERY_CHECK_CODE(code, lino, _end);
×
2261
      }
2262

2263

2264
      digest[0] = 1;
15,475✔
2265
      memcpy(digest + 1, context.digest, tListLen(context.digest));
15,475✔
2266
    }
2267
  }
2268

2269
_end:
121,604,515✔
2270
  if (!listAdded) {
121,626,176✔
2271
    numOfTables = taosArrayGetSize(pUidList);
110,433,838✔
2272
    for (int i = 0; i < numOfTables; i++) {
429,136,528✔
2273
      void* tmp = taosArrayGet(pUidList, i);
318,696,527✔
2274
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
318,713,722✔
2275
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
318,713,722✔
2276

2277
      void* p = taosArrayPush(pListInfo->pTableList, &info);
318,716,275✔
2278
      if (p == NULL) {
318,746,068✔
2279
        taosArrayDestroy(pUidList);
×
2280
        return terrno;
×
2281
      }
2282

2283
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
318,746,068✔
2284
    }
2285
  }
2286

2287
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
121,632,339✔
2288

2289
_error:
121,635,185✔
2290
  taosArrayDestroy(pUidList);
121,646,792✔
2291
  taosArrayDestroy(pTagColIds);
121,631,097✔
2292
  taosMemFreeClear(pTagCondKey);
121,631,949✔
2293
  taosMemFreeClear(pPayload);
121,631,949✔
2294
  if (code != TSDB_CODE_SUCCESS) {
121,631,949✔
2295
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,104✔
2296
  }
2297
  return code;
121,629,846✔
2298
}
2299

2300
/*
 * Build the list of child tables of super table `suid` that satisfy the tag
 * conditions carried by the sub-plan `node` (may be NULL: no tag filter).
 *
 * @param suid       uid of the super table
 * @param pVnode     vnode handle handed through to getTableList()
 * @param node       SSubplan* providing pTagCond / pTagIndexCond, or NULL
 * @param tableList  [out] receives the resulting table array on success; the caller owns it
 * @param pTaskInfo  SExecTaskInfo* whose storageAPI is used for the meta access
 * @return TSDB_CODE_SUCCESS or an error code; *tableList is untouched on failure
 */
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pSubplan = (SSubplan*)node;
  STableListInfo* pTableListInfo = NULL;
  uint8_t         digest[17] = {0};

  SScanPhysiNode pNode = {0};
  pNode.suid = suid;
  pNode.uid = suid;
  pNode.tableType = TSDB_SUPER_TABLE;

  pTableListInfo = tableListCreate();
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);

  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // hand the array over to the caller and detach it so the destroy below does not free it
  *tableList = pTableListInfo->pTableList;
  pTableListInfo->pTableList = NULL;

_end:
  // released on every path; previously the list info leaked when getTableList() failed
  if (pTableListInfo != NULL) {
    tableListDestroy(pTableListInfo);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2325

2326
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
2327
  size_t keyLen = 0;
×
2328

2329
  SNode* node;
2330
  FOREACH(node, pGroups) {
×
2331
    SExprNode* pExpr = (SExprNode*)node;
×
2332
    keyLen += pExpr->resType.bytes;
×
2333
  }
2334

2335
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
2336
  return keyLen;
×
2337
}
2338

2339
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2340
                              SStorageAPI* pAPI) {
2341
  SMetaReader mr = {0};
×
2342

2343
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2344
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2345
    pAPI->metaReaderFn.clearReader(&mr);
×
2346
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2347
  }
2348

2349
  SNodeList* groupNew = NULL;
×
2350
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2351
  if (TSDB_CODE_SUCCESS != code) {
×
2352
    pAPI->metaReaderFn.clearReader(&mr);
×
2353
    return code;
×
2354
  }
2355

2356
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2357
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2358
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2359
    nodesDestroyList(groupNew);
×
2360
    pAPI->metaReaderFn.clearReader(&mr);
×
2361
    return code;
×
2362
  }
2363
  char* isNull = (char*)keyBuf;
×
2364
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2365

2366
  SNode*  pNode;
2367
  int32_t index = 0;
×
2368
  FOREACH(pNode, groupNew) {
×
2369
    SNode*  pNew = NULL;
×
2370
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2371
    if (TSDB_CODE_SUCCESS == code) {
×
2372
      REPLACE_NODE(pNew);
×
2373
    } else {
2374
      nodesDestroyList(groupNew);
×
2375
      pAPI->metaReaderFn.clearReader(&mr);
×
2376
      return code;
×
2377
    }
2378

2379
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2380
      nodesDestroyList(groupNew);
×
2381
      pAPI->metaReaderFn.clearReader(&mr);
×
2382
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2383
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2384
    }
2385
    SValueNode* pValue = (SValueNode*)pNew;
×
2386

2387
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2388
      isNull[index++] = 1;
×
2389
      continue;
×
2390
    } else {
2391
      isNull[index++] = 0;
×
2392
      char* data = nodesGetValueFromNode(pValue);
×
2393
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2394
        if (tTagIsJson(data)) {
×
2395
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2396
          nodesDestroyList(groupNew);
×
2397
          pAPI->metaReaderFn.clearReader(&mr);
×
2398
          return terrno;
×
2399
        }
2400
        int32_t len = getJsonValueLen(data);
×
2401
        memcpy(pStart, data, len);
×
2402
        pStart += len;
×
2403
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2404
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2405
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2406
        }
2407
        memcpy(pStart, data, varDataTLen(data));
×
2408
        pStart += varDataTLen(data);
×
2409
      } else {
2410
        memcpy(pStart, data, pValue->node.resType.bytes);
×
2411
        pStart += pValue->node.resType.bytes;
×
2412
      }
2413
    }
2414
  }
2415

2416
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
2417
  *pGroupId = calcGroupId(keyBuf, len);
×
2418

2419
  nodesDestroyList(groupNew);
×
2420
  pAPI->metaReaderFn.clearReader(&mr);
×
2421

2422
  return TSDB_CODE_SUCCESS;
×
2423
}
2424

2425
/*
 * Convert a list of column nodes into a newly allocated SArray of SColumn.
 * Returns NULL when the list is NULL, when allocation fails, or when a list
 * entry cannot be fetched; the partially built array is released in that case.
 */
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
  if (pNodeList == NULL) {
    return NULL;
  }

  size_t  total = LIST_LENGTH(pNodeList);
  SArray* pColumns = taosArrayInit(total, sizeof(SColumn));
  if (NULL == pColumns) {
    return NULL;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SColumnNode* pSrc = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    if (NULL == pSrc) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return NULL;
    }

    // copy the identifying ids and the result-type description of the column
    const SDataType* pResType = &pSrc->node.resType;

    SColumn col = {0};
    col.colId = pSrc->colId;
    col.slotId = pSrc->slotId;
    col.type = pResType->type;
    col.bytes = pResType->bytes;
    col.scale = pResType->scale;
    col.precision = pResType->precision;

    if (taosArrayPush(pColumns, &col) == NULL) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return NULL;
    }
  }

  return pColumns;
}
2463

2464
/*
 * Build the column match list for a scan: one SColMatchItem per target whose
 * expression is a plain column, then derive the number of output columns and
 * the per-item needOutput flag from the output slot descriptors.
 *
 * @param pNodeList        list of STargetNode
 * @param pOutputNodeList  output data block descriptor (its pSlots are inspected)
 * @param numOfOutputCols  [out] number of slots counted as output
 * @param type             stored into pMatchInfo->matchType
 * @param pMatchInfo       [out] receives the match list on success (takes ownership)
 * @return TSDB_CODE_SUCCESS or an error code; pMatchInfo->pList is not set on failure
 */
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
                            int32_t type, SColMatchInfo* pMatchInfo) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pMatchInfo->matchType = type;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
  if (pList == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    if (nodeType(pNode->pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }

    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SColMatchItem c = {.needOutput = true};
    c.colId = pColNode->colId;
    c.srcSlotId = pColNode->slotId;
    c.dstSlotId = pNode->slotId;
    c.isPk = pColNode->isPk;
    c.dataType = pColNode->node.resType;
    void* tmp = taosArrayPush(pList, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

  // set the output flag for each column in SColMatchInfo, according to the output slot descriptors
  *numOfOutputCols = 0;
  size_t  numOfItems = taosArrayGetSize(pList);  // the list is not modified below
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
  for (int32_t i = 0; i < num; ++i) {
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    // todo: add reserve flag check
    // it is a column reserved for the arithmetic expression calculation
    if (pNode->slotId >= numOfCols) {
      (*numOfOutputCols) += 1;
      continue;
    }

    // NOTE(review): when no item matches the slot, `info` is left pointing at the last
    // inspected item rather than NULL; kept as-is to preserve existing behaviour.
    SColMatchItem* info = NULL;
    for (int32_t j = 0; j < numOfItems; ++j) {
      info = taosArrayGet(pList, j);
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
      if (info->dstSlotId == pNode->slotId) {
        break;
      }
    }

    if (pNode->output) {
      (*numOfOutputCols) += 1;
    } else if (info != NULL) {
      // select distinct tbname from stb where tbname='abc';
      info->needOutput = false;
    }
  }

  // success: ownership of the list moves to pMatchInfo
  pMatchInfo->pList = pList;
  pList = NULL;

_end:
  // still non-NULL only when bailing out early; previously leaked on those paths
  taosArrayDestroy(pList);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2534

2535
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
674,789,893✔
2536
                                  const char* name) {
2537
  SResSchema s = {0};
674,789,893✔
2538
  s.scale = scale;
674,885,249✔
2539
  s.type = type;
674,885,249✔
2540
  s.bytes = bytes;
674,885,249✔
2541
  s.slotId = slotId;
674,885,249✔
2542
  s.precision = precision;
674,885,249✔
2543
  tstrncpy(s.name, name, tListLen(s.name));
674,885,249✔
2544

2545
  return s;
674,885,249✔
2546
}
2547

2548
/*
 * Allocate a zeroed SColumn and fill it from the given ids and data type.
 * Returns NULL when the allocation fails; the caller owns the returned object.
 */
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
  SColumn* pResult = taosMemoryCalloc(1, sizeof(SColumn));
  if (NULL == pResult) {
    return NULL;
  }

  // location of the column
  pResult->dataBlockId = blockId;
  pResult->slotId = slotId;
  pResult->colId = colId;
  pResult->colType = colType;

  // data type description
  pResult->type = pType->type;
  pResult->bytes = pType->bytes;
  pResult->precision = pType->precision;
  pResult->scale = pType->scale;

  return pResult;
}
2564

2565
/*
 * Initialise *pExp from a single plan node.
 *
 * Supported node types: column, value, function, operator, case-when and
 * logic condition (the last three are all recorded as QUERY_NODE_OPERATOR with
 * pNode as the root). Any other type yields TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * @param pExp    expression info to fill; pExp->pExpr and pExp->base.pParam are
 *                allocated here and are left in place on failure
 * @param pNode   source node
 * @param slotId  destination slot recorded in the result schema
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  pExp->base.numOfParams = 0;
  pExp->base.pParam = NULL;
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);

  pExp->pExpr->_function.num = 1;
  pExp->pExpr->_function.functionId = -1;

  int32_t type = nodeType(pNode);
  // it is a project query, or group by column
  if (type == QUERY_NODE_COLUMN) {
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
    SColumnNode* pColNode = (SColumnNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pColNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);

    pExp->base.pParam[0].pCol =
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);

    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
  } else if (type == QUERY_NODE_VALUE) {
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
    SValueNode* pValNode = (SValueNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pValNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (type == QUERY_NODE_FUNCTION) {
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;

    SDataType* pType = &pFuncNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
    tExprNode* pExprNode = pExp->pExpr;

    pExprNode->_function.functionId = pFuncNode->funcId;
    pExprNode->_function.pFunctNode = pFuncNode;
    pExprNode->_function.functionType = pFuncNode->funcType;

    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));

    pExp->base.pParamList = pFuncNode->pParameterList;
#if 1
    // todo refactor: add the parameter for tbname function
    if (!pFuncNode->pParameterList && strcmp(pExprNode->_function.functionName, "tbname") == 0) {
      // Every step assigns the function-level `code`. A block-local `code` used to shadow it
      // here, so a failure jumped to _end and was reported as success.
      code = nodesMakeList(&pFuncNode->pParameterList);
      QUERY_CHECK_CODE(code, lino, _end);

      SValueNode* res = NULL;
      code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
      QUERY_CHECK_CODE(code, lino, _end);

      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
      if (code != TSDB_CODE_SUCCESS) {
        // the node was not linked into the list, release it here
        nodesDestroyNode((SNode*)res);
        res = NULL;
      }
      QUERY_CHECK_CODE(code, lino, _end);
    }
#endif

    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);

    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = numOfParam;

    for (int32_t j = 0; j < numOfParam; ++j) {
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
      if (p1->type == QUERY_NODE_COLUMN) {
        SColumnNode* pcn = (SColumnNode*)p1;

        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
        pExp->base.pParam[j].pCol =
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
      } else if (p1->type == QUERY_NODE_VALUE) {
        SValueNode* pvn = (SValueNode*)p1;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      // parameters of any other node type are left zero-initialised
    }
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
  } else if (type == QUERY_NODE_OPERATOR || type == QUERY_NODE_CASE_WHEN || type == QUERY_NODE_LOGIC_CONDITION) {
    // all three are evaluated through the operator root; only the common SExprNode header is read
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SExprNode* pExprBase = (SExprNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pExprBase->resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pExprBase->aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2720

2721
/*
 * Convenience wrapper: build *pExp from the expression of a target node,
 * using the target's own slot id as the destination slot.
 */
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
  SNode*  pSrcExpr = pTargetNode->pExpr;
  int16_t dstSlotId = pTargetNode->slotId;

  return createExprFromOneNode(pExp, pSrcExpr, dstSlotId);
}
2724

2725
/*
 * Create an SExprInfo array with one entry per node of pNodeList; entry i uses
 * slot id i + UD_TAG_COLUMN_INDEX.
 *
 * @param pNodeList   source node list
 * @param numOfExprs  [out] set to the list length
 * @return the new array (owned by the caller), or NULL on failure with terrno
 *         set when an expression could not be built
 */
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
  *numOfExprs = LIST_LENGTH(pNodeList);
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (!pExprs) {
    return NULL;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    SExprInfo* pExp = &pExprs[i];
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
    if (code != TSDB_CODE_SUCCESS) {
      // release what the entries built so far own before dropping the array,
      // the same way createExprInfo() cleans up
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      terrno = code;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return NULL;
    }
  }

  return pExprs;
}
2745

2746
/*
 * Create the SExprInfo array for a list of function targets followed by an
 * optional list of group-key targets.
 *
 * @param pNodeList   list of STargetNode for the functions (may be empty)
 * @param pGroupKeys  list of STargetNode for the group keys, or NULL
 * @param pExprInfo   [out] receives the new array; validated by QRY_PARAM_CHECK
 * @param numOfExprs  [out] total number of expressions
 * @return TSDB_CODE_SUCCESS (also when there is nothing to create) or an error code
 */
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
  QRY_PARAM_CHECK(pExprInfo);

  int32_t code = 0;
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
  int32_t numOfGroupKeys = 0;
  if (pGroupKeys != NULL) {
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
  }

  *numOfExprs = numOfFuncs + numOfGroupKeys;
  if (*numOfExprs == 0) {
    return code;
  }

  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (pExprs == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    // function targets come first, group keys after them
    STargetNode* pTargetNode = NULL;
    if (i < numOfFuncs) {
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    } else {
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
    }
    if (!pTargetNode) {
      // Capture the error before cleanup runs, and never report success for a missing
      // node: terrno may be stale or zero at this point.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    SExprInfo* pExp = &pExprs[i];
    code = createExprFromTargetNode(pExp, pTargetNode);
    if (code != TSDB_CODE_SUCCESS) {
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  *pExprInfo = pExprs;
  return code;
}
2793

2794
static void deleteSubsidiareCtx(void* pData) {
×
2795
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2796
  if (pCtx->pCtx) {
×
2797
    taosMemoryFreeClear(pCtx->pCtx);
×
2798
  }
2799
}
×
2800

2801
// set the output buffer for the selectivity + tag query
2802
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
201,352,922✔
2803
  int32_t num = 0;
201,352,922✔
2804
  int32_t code = TSDB_CODE_SUCCESS;
201,352,922✔
2805
  int32_t lino = 0;
201,352,922✔
2806

2807
  SArray* pValCtxArray = NULL;
201,352,922✔
2808
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
680,035,800✔
2809
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
478,732,853✔
2810
    if (funcIdx > 0) {
478,737,750✔
2811
      if (pValCtxArray == NULL) {
1,846,496✔
2812
        // the end of the list is the select function of biggest index
2813
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,324,039✔
2814
        if (pValCtxArray == NULL) {
1,325,049✔
2815
          return terrno;
×
2816
        }
2817
      }
2818
      if (funcIdx > pValCtxArray->size) {
1,847,506✔
2819
        qError("funcIdx:%d is out of range", funcIdx);
×
2820
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2821
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2822
      }
2823
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,844,981✔
2824
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,847,001✔
2825
      if (pSubsidiary->pCtx == NULL) {
1,847,001✔
2826
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2827
        return terrno;
×
2828
      }
2829
      pSubsidiary->num = 0;
1,847,001✔
2830
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,845,991✔
2831
    }
2832
  }
2833

2834
  SqlFunctionCtx*  p = NULL;
201,302,947✔
2835
  SqlFunctionCtx** pValCtx = NULL;
201,302,947✔
2836
  if (pValCtxArray == NULL) {
201,302,947✔
2837
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
199,996,161✔
2838
    if (pValCtx == NULL) {
199,992,199✔
2839
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2840
    }
2841
  }
2842

2843
  for (int32_t i = 0; i < numOfOutput; ++i) {
868,448,693✔
2844
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
667,149,908✔
2845
    if ((strcmp(pName, "_select_value") == 0)) {
667,203,118✔
2846
      if (pValCtxArray == NULL) {
5,833,689✔
2847
        pValCtx[num++] = &pCtx[i];
3,240,559✔
2848
      } else {
2849
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2,593,130✔
2850
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,592,111✔
2851
          bindFuncIndex -= 1;
2,533,026✔
2852
        }
2853
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2,592,111✔
2854
        if (pSubsidiary == NULL) {
2,594,131✔
2855
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2856
        }
2857
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2,594,131✔
2858
        (*pSubsidiary)->num++;
2,593,626✔
2859
      }
2860
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
661,369,429✔
2861
      if (pValCtxArray == NULL) {
52,825,393✔
2862
        p = &pCtx[i];
50,579,126✔
2863
      }
2864
    }
2865
  }
2866

2867
  if (p != NULL) {
201,298,785✔
2868
    p->subsidiaries.pCtx = pValCtx;
20,867,515✔
2869
    p->subsidiaries.num = num;
20,866,536✔
2870
  } else {
2871
    taosMemoryFreeClear(pValCtx);
180,431,270✔
2872
  }
2873

2874
_end:
1,299,147✔
2875
  if (code != TSDB_CODE_SUCCESS) {
201,309,652✔
2876
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2877
    taosMemoryFreeClear(pValCtx);
×
2878
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2879
  } else {
2880
    taosArrayDestroy(pValCtxArray);
201,309,652✔
2881
  }
2882
  return code;
201,332,030✔
2883
}
2884

2885
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
201,347,230✔
2886
                                     SFunctionStateStore* pStore) {
2887
  int32_t         code = TSDB_CODE_SUCCESS;
201,347,230✔
2888
  int32_t         lino = 0;
201,347,230✔
2889
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
201,347,230✔
2890
  if (pFuncCtx == NULL) {
201,273,056✔
2891
    return NULL;
×
2892
  }
2893

2894
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
201,273,056✔
2895
  if (*rowEntryInfoOffset == 0) {
201,371,762✔
2896
    taosMemoryFreeClear(pFuncCtx);
×
2897
    return NULL;
×
2898
  }
2899

2900
  for (int32_t i = 0; i < numOfOutput; ++i) {
868,594,822✔
2901
    SExprInfo* pExpr = &pExprInfo[i];
667,249,166✔
2902

2903
    SExprBasicInfo* pFunct = &pExpr->base;
667,223,749✔
2904
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
667,243,540✔
2905

2906
    pCtx->functionId = -1;
667,250,020✔
2907
    pCtx->pExpr = pExpr;
667,254,710✔
2908

2909
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
667,237,840✔
2910
      SFuncExecEnv env = {0};
207,662,746✔
2911
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
207,665,366✔
2912
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
207,670,999✔
2913
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
207,671,599✔
2914

2915
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
207,659,549✔
2916
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
332,860,815✔
2917
        if (!isUdaf) {
125,215,684✔
2918
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
125,172,352✔
2919
          QUERY_CHECK_CODE(code, lino, _end);
125,172,036✔
2920
        } else {
2921
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
43,332✔
2922
          pCtx->udfName = taosStrdup(udfName);
43,332✔
2923
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
43,332✔
2924

2925
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
43,332✔
2926
          QUERY_CHECK_CODE(code, lino, _end);
43,332✔
2927
        }
2928
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
125,215,368✔
2929
        if (!tmp) {
125,204,227✔
2930
          code = terrno;
×
2931
          QUERY_CHECK_CODE(code, lino, _end);
×
2932
        }
2933
      } else {
2934
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
82,436,496✔
2935
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
8,710,947✔
2936
          QUERY_CHECK_CODE(code, lino, _end);
8,711,827✔
2937
        }      
2938
        
2939
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
82,444,815✔
2940
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
82,449,514✔
2941
          code = TSDB_CODE_SUCCESS;
26,568✔
2942
        }
2943
        QUERY_CHECK_CODE(code, lino, _end);
82,449,514✔
2944

2945
        if (pCtx->sfp.getEnv != NULL) {
82,449,514✔
2946
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
16,667,523✔
2947
          if (!tmp) {
16,668,644✔
2948
            code = terrno;
×
2949
            QUERY_CHECK_CODE(code, lino, _end);
×
2950
          }
2951
        }
2952
      }
2953
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
207,650,981✔
2954
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
459,535,676✔
2955
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
8,300,290✔
2956
      // for simple column, the result buffer needs to hold at least one element.
2957
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
459,597,059✔
2958
    }
2959

2960
    pCtx->input.numOfInputCols = pFunct->numOfParams;
667,272,583✔
2961
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
667,240,137✔
2962
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
667,241,115✔
2963
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
667,269,707✔
2964
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
667,253,114✔
2965

2966
    pCtx->pTsOutput = NULL;
667,270,891✔
2967
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
667,298,462✔
2968
    pCtx->resDataInfo.type = pFunct->resSchema.type;
667,297,117✔
2969
    pCtx->order = TSDB_ORDER_ASC;
667,259,587✔
2970
    pCtx->start.key = INT64_MIN;
667,255,492✔
2971
    pCtx->end.key = INT64_MIN;
667,266,821✔
2972
    pCtx->numOfParams = pExpr->base.numOfParams;
667,284,401✔
2973
    pCtx->param = pFunct->pParam;
667,238,183✔
2974
    pCtx->saveHandle.currentPage = -1;
667,308,687✔
2975
    pCtx->pStore = pStore;
667,274,477✔
2976
    pCtx->hasWindowOrGroup = false;
667,242,735✔
2977
    pCtx->needCleanup = false;
667,183,852✔
2978
  }
2979

2980
  for (int32_t i = 1; i < numOfOutput; ++i) {
680,132,706✔
2981
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
957,528,077✔
2982
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
478,786,135✔
2983
  }
2984

2985
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
201,356,280✔
2986
  QUERY_CHECK_CODE(code, lino, _end);
201,341,230✔
2987

2988
_end:
201,341,230✔
2989
  if (code != TSDB_CODE_SUCCESS) {
201,303,536✔
2990
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2991
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
2992
      taosMemoryFree(pFuncCtx[i].input.pData);
×
2993
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
2994
    }
2995
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
2996
    taosMemoryFreeClear(pFuncCtx);
×
2997

2998
    terrno = code;
×
2999
    return NULL;
×
3000
  }
3001
  return pFuncCtx;
201,303,536✔
3002
}
3003

3004
// NOTE: sources columns are more than the destination SSDatablock columns.
3005
// doFilter in table scan needs every column even its output is false
3006
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
9,540,987✔
3007
  int32_t code = TSDB_CODE_SUCCESS;
9,540,987✔
3008
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
9,540,987✔
3009

3010
  int32_t i = 0, j = 0;
9,541,668✔
3011
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
88,102,937✔
3012
    SColumnInfoData* p = taosArrayGet(pCols, i);
78,562,158✔
3013
    if (!p) {
78,560,168✔
3014
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3015
      return terrno;
×
3016
    }
3017
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
78,560,168✔
3018
    if (!pmInfo) {
78,559,279✔
3019
      return terrno;
×
3020
    }
3021

3022
    if (p->info.colId == pmInfo->colId) {
78,559,279✔
3023
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
69,746,129✔
3024
      if (!pDst) {
69,746,241✔
3025
        return terrno;
×
3026
      }
3027
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
69,746,241✔
3028
      if (code != TSDB_CODE_SUCCESS) {
69,746,013✔
3029
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3030
        return code;
×
3031
      }
3032
      i++;
69,746,013✔
3033
      j++;
69,746,013✔
3034
    } else if (p->info.colId < pmInfo->colId) {
8,814,675✔
3035
      i++;
8,815,256✔
3036
    } else {
3037
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3038
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3039
    }
3040
  }
3041
  return code;
9,541,876✔
3042
}
3043

3044
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
98,751,533✔
3045
  SInterval interval = {
197,497,907✔
3046
      .interval = pTableScanNode->interval,
98,729,394✔
3047
      .sliding = pTableScanNode->sliding,
98,735,548✔
3048
      .intervalUnit = pTableScanNode->intervalUnit,
98,765,038✔
3049
      .slidingUnit = pTableScanNode->slidingUnit,
98,697,525✔
3050
      .offset = pTableScanNode->offset,
98,701,147✔
3051
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
98,724,527✔
3052
      .timeRange = pTableScanNode->scanRange,
3053
  };
3054
  calcIntervalAutoOffset(&interval);
98,762,050✔
3055

3056
  return interval;
98,698,025✔
3057
}
3058

3059
// Build an SColumn from the slot id, column id and result type of a column node.
// Fields not listed below are zero-initialized.
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
  SColumn col = {
      .slotId = pColNode->slotId,
      .colId = pColNode->colId,
      .type = pColNode->node.resType.type,
      .bytes = pColNode->node.resType.bytes,
      .scale = pColNode->node.resType.scale,
      .precision = pColNode->node.resType.precision,
  };
  return col;
}
3070

3071

3072
/**
3073
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3074
 * @param[in] cond The range specified by WHERE condition.
3075
 * @param[in] range The range specified by RANGE clause.
3076
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3077
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3078
 * @note `cond` and `twindow` may be the same address.
3079
 */
3080
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
1,648,398✔
3081
                                 STimeWindow* extTwindows) {
3082
  int32_t     code = TSDB_CODE_SUCCESS;
1,648,398✔
3083
  int32_t     lino = 0;
1,648,398✔
3084
  STimeWindow tempWindow;
3085

3086
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
1,648,398✔
3087
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
3,025✔
3088
    return code;
3,025✔
3089
  }
3090

3091
  if (range->ekey < cond->skey) {
1,645,373✔
3092
    extTwindows[1] = *cond;
254,645✔
3093
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
254,645✔
3094
    return code;
254,645✔
3095
  }
3096

3097
  if (cond->ekey < range->skey) {
1,390,728✔
3098
    extTwindows[0] = *cond;
176,523✔
3099
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
176,523✔
3100
    return code;
176,523✔
3101
  }
3102

3103
  // Only scan data in the time range intersecion.
3104
  extTwindows[0] = extTwindows[1] = *cond;
1,214,205✔
3105
  twindow->skey = TMAX(cond->skey, range->skey);
1,214,205✔
3106
  twindow->ekey = TMIN(cond->ekey, range->ekey);
1,214,205✔
3107
  extTwindows[0].ekey = twindow->skey - 1;
1,214,205✔
3108
  extTwindows[1].skey = twindow->ekey + 1;
1,214,205✔
3109

3110
  return code;
1,214,205✔
3111
}
3112

3113
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
121,462,831✔
3114
                               const SReadHandle* readHandle, bool applyExtWin) {
3115
  int32_t code = 0;                             
121,462,831✔
3116
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
121,462,831✔
3117
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
121,474,365✔
3118

3119
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
121,471,910✔
3120
  if (!pCond->colList) {
121,456,064✔
3121
    return terrno;
×
3122
  }
3123
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
121,411,165✔
3124
  if (pCond->pSlotList == NULL) {
121,453,127✔
3125
    taosMemoryFreeClear(pCond->colList);
×
3126
    return terrno;
×
3127
  }
3128

3129
  // TODO: get it from stable scan node
3130
  pCond->twindows = pTableScanNode->scanRange;
121,393,690✔
3131
  pCond->suid = pTableScanNode->scan.suid;
121,468,216✔
3132
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
121,424,471✔
3133
  pCond->startVersion = -1;
121,451,398✔
3134
  pCond->endVersion = -1;
121,478,902✔
3135
  pCond->skipRollup = readHandle->skipRollup;
121,428,258✔
3136
  if (readHandle->winRangeValid) {
121,442,505✔
3137
    pCond->twindows = readHandle->winRange;
292,064✔
3138
  }
3139
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
121,470,499✔
3140
  // allowed read stt file optimization mode
3141
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
242,923,363✔
3142
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
121,447,774✔
3143

3144
  int32_t j = 0;
121,442,929✔
3145
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
769,660,134✔
3146
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
648,231,418✔
3147
    if (!pNode) {
648,101,971✔
3148
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3149
      return terrno;
×
3150
    }
3151
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
648,101,971✔
3152
    if (pColNode->colType == COLUMN_TYPE_TAG) {
648,195,789✔
3153
      continue;
×
3154
    }
3155

3156
    pCond->colList[j].type = pColNode->node.resType.type;
648,205,148✔
3157
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
648,198,415✔
3158
    pCond->colList[j].colId = pColNode->colId;
648,213,092✔
3159
    pCond->colList[j].pk = pColNode->isPk;
648,235,641✔
3160

3161
    pCond->pSlotList[j] = pNode->slotId;
648,236,187✔
3162
    j += 1;
648,217,205✔
3163
  }
3164

3165
  pCond->numOfCols = j;
121,479,998✔
3166

3167
  if (applyExtWin) {
121,478,724✔
3168
    if (NULL != pTableScanNode->pExtScanRange) {
99,052,823✔
3169
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
1,648,398✔
3170
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
1,648,398✔
3171
    } else if (readHandle->extWinRangeValid) {
97,348,609✔
3172
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
×
3173
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
3174
    }
3175
  }
3176
  
3177
  return code;
121,450,237✔
3178
}
3179

3180
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
5,283,095✔
3181
                                           const SReadHandle* readHandle, SArray* colArray) {
3182
  int32_t code = TSDB_CODE_SUCCESS;
5,283,095✔
3183
  int32_t lino = 0;
5,283,095✔
3184

3185
  pCond->order = TSDB_ORDER_ASC;
5,283,095✔
3186
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
5,283,095✔
3187

3188
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
5,283,095✔
3189
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
5,283,095✔
3190

3191
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
5,283,095✔
3192
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
5,283,095✔
3193

3194
  pCond->twindows = pOrgCond->twindows;
5,283,095✔
3195
  pCond->type = pOrgCond->type;
5,283,095✔
3196
  pCond->startVersion = -1;
5,283,095✔
3197
  pCond->endVersion = -1;
5,283,095✔
3198
  pCond->skipRollup = true;
5,283,095✔
3199
  pCond->notLoadData = false;
5,283,095✔
3200

3201
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
27,494,727✔
3202
    SColIdPair* pColPair = taosArrayGet(colArray, i);
22,211,632✔
3203
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
22,211,632✔
3204

3205
    bool find = false;
22,211,632✔
3206
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
143,597,751✔
3207
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
143,597,751✔
3208
        pCond->colList[i].type = pOrgCond->colList[j].type;
22,211,632✔
3209
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
22,211,632✔
3210
        pCond->colList[i].colId = pColPair->orgColId;
22,211,632✔
3211
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
22,211,632✔
3212
        pCond->pSlotList[i] = i;
22,211,632✔
3213
        find = true;
22,211,632✔
3214
        break;
22,211,632✔
3215
      }
3216
    }
3217
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
22,211,632✔
3218
  }
3219

3220
  return code;
5,283,095✔
3221
_return:
×
3222
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3223
  taosMemoryFreeClear(pCond->colList);
×
3224
  taosMemoryFreeClear(pCond->pSlotList);
×
3225
  return code;
×
3226
}
3227

3228
// Release the slot and column arrays held by a query condition via taosMemoryFreeClear().
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
  taosMemoryFreeClear(pCond->pSlotList);
  taosMemoryFreeClear(pCond->colList);
}
271,247,164✔
3232

3233
// Map a FILL_MODE_* value to the corresponding TSDB_FILL_* value.
// FILL_MODE_NONE and any unrecognized mode map to TSDB_FILL_NONE.
int32_t convertFillType(int32_t mode) {
  switch (mode) {
    case FILL_MODE_PREV:
      return TSDB_FILL_PREV;
    case FILL_MODE_NULL:
      return TSDB_FILL_NULL;
    case FILL_MODE_NULL_F:
      return TSDB_FILL_NULL_F;
    case FILL_MODE_NEXT:
      return TSDB_FILL_NEXT;
    case FILL_MODE_VALUE:
      return TSDB_FILL_SET_VALUE;
    case FILL_MODE_VALUE_F:
      return TSDB_FILL_SET_VALUE_F;
    case FILL_MODE_LINEAR:
      return TSDB_FILL_LINEAR;
    case FILL_MODE_NEAR:
      return TSDB_FILL_NEAR;
    case FILL_MODE_NONE:
    default:
      return TSDB_FILL_NONE;
  }
}
3269

3270
// Compute the first time window for timestamp ts.
// Ascending queries use the aligned query window as is. For descending queries the window start is
// advanced while the next window start does not exceed ts, and the end key is recomputed from it.
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
  *w = getAlignQueryTimeWindow(pInterval, ts);
  if (ascQuery) {
    return;
  }

  // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
  for (int64_t cursor = w->skey; cursor < ts;) {  // moving towards end
    cursor = getNextTimeWindowStart(pInterval, cursor, TSDB_ORDER_ASC);
    if (cursor > ts) {
      break;
    }
    w->skey = cursor;
  }

  w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
1,445,989✔
3289

3290
// Compute the window for ts directly: the start comes from taosTimeTruncate(),
// the end from taosTimeGetIntervalEnd() applied to that start.
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow win = {0};

  win.skey = taosTimeTruncate(ts, pInterval);
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);

  return win;
}
3297

3298
// Starting from *pWindow, step through windows in the direction opposite to `order` for as long as the
// window still contains ts, and return the last window that contained it. If *pWindow does not contain
// ts, *pWindow itself is returned.
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
  const int32_t reverseOrder = (order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

  STimeWindow cursor = *pWindow;
  STimeWindow lastMatch = cursor;

  for (; cursor.skey <= ts && cursor.ekey >= ts; getNextTimeWindow(pInterval, &cursor, reverseOrder)) {
    // remember the window before moving to the previous one
    lastMatch = cursor;
  }

  return lastMatch;
}
3309

3310
// get the correct time window according to the handled timestamp
3311
// todo refactor
3312
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
40,396,152✔
3313
                                int32_t order) {
3314
  STimeWindow w = {0};
40,396,152✔
3315
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
40,396,442✔
3316
    getInitialStartTimeWindow(pInterval, ts, &w, (order == TSDB_ORDER_ASC));
1,445,782✔
3317
    return w;
1,445,989✔
3318
  }
3319

3320
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
38,951,645✔
3321
  if (pRow) {
38,950,050✔
3322
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
38,951,164✔
3323
  }
3324

3325
  // in case of typical time window, we can calculate time window directly.
3326
  if (w.skey > ts || w.ekey < ts) {
38,950,656✔
3327
    w = doCalculateTimeWindow(ts, pInterval);
28,299,462✔
3328
  }
3329

3330
  if (pInterval->interval != pInterval->sliding) {
38,951,714✔
3331
    // it is an sliding window query, in which sliding value is not equalled to
3332
    // interval value, and we need to find the first qualified time window.
3333
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
1,085,038✔
3334
  }
3335

3336
  return w;
38,949,411✔
3337
}
3338

3339
// Compute the start of the neighbouring window: remove the interval offset from `start`, move by one
// sliding step in the direction given by `order`, then add the offset back.
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);

  TSKEY withoutOffset = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
  TSKEY shifted =
      taosTimeAdd(withoutOffset, direction * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);

  return taosTimeAdd(shifted, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
}
3346

3347
// Move *tw to the neighbouring window in the given order: the new start comes from
// getNextTimeWindowStart(), the new end is one less than start plus one interval.
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
  TSKEY newStart = getNextTimeWindowStart(pInterval, tw->skey, order);

  tw->skey = newStart;
  tw->ekey = taosTimeAdd(newStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
2,147,483,647✔
3351

3352
// True when any of limit, offset, slimit or soffset differs from -1.
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
  if (pLimitInfo->limit.limit != -1) {
    return true;
  }
  if (pLimitInfo->limit.offset != -1) {
    return true;
  }
  if (pLimitInfo->slimit.limit != -1) {
    return true;
  }
  return pLimitInfo->slimit.offset != -1;
}
3356

3357
// True when the slimit or its offset differs from -1.
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
  if (pLimitInfo->slimit.limit != -1) {
    return true;
  }
  return pLimitInfo->slimit.offset != -1;
}
3360

3361
// Initialize pLimitInfo from the LIMIT and SLIMIT nodes: store both limit/offset pairs, seed the
// remaining offsets with them and reset all output counters and the current group id to 0.
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
  pLimitInfo->limit = (SLimit){.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
  pLimitInfo->slimit = (SLimit){.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};

  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->remainGroupOffset = pLimitInfo->slimit.offset;

  pLimitInfo->numOfOutputRows = 0;
  pLimitInfo->numOfOutputGroups = 0;
  pLimitInfo->currentGroupId = 0;
}
288,695,210✔
3373

3374
// Prepare the limit info for the next group: restore the row offset and clear the output row counter.
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->numOfOutputRows = 0;
}
47,294,802✔
3378

3379
// Store the number of tables in *pRes.
// Fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the array and the hash map disagree on the size.
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
  if (taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)) {
    (*pRes) = taosArrayGetSize(pTableList->pTableList);
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
3387

3388
// Return the suid stored in the id info of the table list.
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
  return pTableList->idInfo.suid;
}
3,196,865✔
3389

3390
// Return the table key info at `index`, or NULL when the list is empty.
// For a non-empty list the result is whatever taosArrayGet() returns for that index.
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
  bool isEmpty = (taosArrayGetSize(pTableList->pTableList) == 0);
  return isEmpty ? NULL : taosArrayGet(pTableList->pTableList, index);
}
3397

3398
// Search the table list for `uid`, beginning at startIndex.
// Returns the index of the first match, or -1 when nothing matches, startIndex is past the end,
// or an element cannot be fetched.
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
  int32_t total = taosArrayGetSize(pTableList->pTableList);
  if (startIndex >= total) {
    return -1;
  }

  int32_t pos = startIndex;
  while (pos < total) {
    STableKeyInfo* pEntry = taosArrayGet(pTableList->pTableList, pos);
    if (pEntry == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return -1;
    }
    if (pEntry->uid == uid) {
      return pos;
    }
    ++pos;
  }

  return -1;
}
3416

3417
// Copy the table type, uid and suid from the id info of the table list into the output parameters.
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
  *type = pTableList->idInfo.tableType;
  *uid = pTableList->idInfo.uid;
  *psuid = pTableList->idInfo.suid;
}
69,447✔
3422

3423
// Look up the group id of a table by uid: the map yields the slot, the array yields the key info.
// Returns -1 (converted to uint64_t) when the uid is not in the map or the slot cannot be fetched.
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
  int32_t* pSlot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
  if (!pSlot) {
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
    return -1;
  }

  STableKeyInfo* pEntry = taosArrayGet(pTableList->pTableList, *pSlot);
  if (!pEntry) {
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
    return -1;
  }

  return pEntry->groupId;
}
3437

3438
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3439
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3440
//   int32_t code = TSDB_CODE_SUCCESS;
3441
//   int32_t lino = 0;
3442

3443
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3444
//   if (slot == NULL) {
3445
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3446
//     return 0;
3447
//   }
3448

3449
//   taosArrayRemove(pTableList->pTableList, *slot);
3450
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3451

3452
//   _end:
3453
//   if (code != TSDB_CODE_SUCCESS) {
3454
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3455
//   } else {
3456
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3457
//   }
3458

3459
//   return code;
3460
// }
3461

3462
/**
 * @brief Append a table (uid, group id) to the table list and index it in the uid map.
 *
 * A uid that is already in the map is ignored and reported as success. If the map insertion fails,
 * the element just appended to the array is removed again so that array and map stay the same size.
 *
 * @param pTableList  Table list to extend; its map is created here when it is still NULL.
 * @param uid         Table uid.
 * @param gid         Group id of the table.
 * @return TSDB_CODE_SUCCESS, or an error code from the allocation / array / hash operations.
 *         TSDB_CODE_DUP_KEY from the map insertion is reported as TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 */
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       slot = -1;
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};

  if (pTableList->map == NULL) {
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
  }

  if (taosHashGet(pTableList->map, &uid, sizeof(uid)) != NULL) {
    // nothing was added, so return here instead of logging the "added" message with an unset slot
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
    return code;
  }

  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
  if (code != TSDB_CODE_SUCCESS) {
    // roll back the array push first so the list and the map never disagree on their size
    taosArrayPopTailBatch(pTableList->pTableList, 1);
    // we have checked the existence of uid in hash map above
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
  }

  return code;
}
3497

3498
// Return the tables of the group with the given ordinal index: *pKeyInfo points at the first table of
// the group inside the table array and *size is the number of tables in it.
// Returns TSDB_CODE_INVALID_PARA for an index outside [0, number of output groups).
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
                              int32_t* size) {
  int32_t groupCount = tableListGetOutputGroups(pTableList);
  int32_t tableCount = 0;

  int32_t code = tableListGetSize(pTableList, &tableCount);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= groupCount) {
    return TSDB_CODE_INVALID_PARA;
  }

  // here handle two special cases:
  // 1. only one group exists, and 2. one table exists for each group.
  if (groupCount == 1) {
    *size = tableCount;
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
    return TSDB_CODE_SUCCESS;
  }

  if (groupCount == tableCount) {
    *size = 1;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
    return TSDB_CODE_SUCCESS;
  }

  // general case: a group spans from its own offset up to the next group's offset (or the end of the list)
  int32_t begin = pTableList->groupOffset[ordinalGroupIndex];
  bool    isLastGroup = !(ordinalGroupIndex < groupCount - 1);
  int32_t end = isLastGroup ? tableCount : pTableList->groupOffset[ordinalGroupIndex + 1];

  *size = end - begin;
  *pKeyInfo = taosArrayGet(pTableList->pTableList, begin);
  return TSDB_CODE_SUCCESS;
}
3534

3535
// Return the number of output groups recorded in the table list.
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) {
  return pTableList->numOfOuputGroups;
}
294,674,648✔
3536

3537
// Return the oneTableForEachGroup flag of the table list.
bool oneTableForEachGroup(const STableListInfo* pTableList) {
  return pTableList->oneTableForEachGroup;
}
575,758✔
3538

3539
STableListInfo* tableListCreate() {
126,929,771✔
3540
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
126,929,771✔
3541
  if (pListInfo == NULL) {
126,898,031✔
3542
    return NULL;
×
3543
  }
3544

3545
  pListInfo->remainGroups = NULL;
126,898,031✔
3546
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
126,903,387✔
3547
  if (pListInfo->pTableList == NULL) {
126,921,940✔
3548
    goto _error;
×
3549
  }
3550

3551
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
126,916,102✔
3552
  if (pListInfo->map == NULL) {
126,957,908✔
3553
    goto _error;
×
3554
  }
3555

3556
  pListInfo->numOfOuputGroups = 1;
126,959,785✔
3557
  return pListInfo;
126,957,812✔
3558

3559
_error:
×
3560
  tableListDestroy(pListInfo);
×
3561
  return NULL;
×
3562
}
3563

3564
// Destroy a table list: its table array, group offsets, both hash maps and the list object itself.
// A NULL argument is accepted and ignored.
void tableListDestroy(STableListInfo* pTableListInfo) {
  if (pTableListInfo != NULL) {
    taosArrayDestroy(pTableListInfo->pTableList);
    pTableListInfo->pTableList = NULL;

    taosMemoryFreeClear(pTableListInfo->groupOffset);

    taosHashCleanup(pTableListInfo->map);
    pTableListInfo->map = NULL;

    taosHashCleanup(pTableListInfo->remainGroups);

    taosMemoryFree(pTableListInfo);
  }
}
3578

3579
/**
 * @brief Empty a table list so it can be reused, without destroying the list object.
 *
 * Clears the table array and both hash maps, releases the group offsets and restores the
 * group bookkeeping (one output group, oneTableForEachGroup = false).
 *
 * @param pTableListInfo  Table list to clear; NULL is accepted and ignored.
 */
void tableListClear(STableListInfo* pTableListInfo) {
  if (pTableListInfo == NULL) {
    return;
  }

  taosArrayClear(pTableListInfo->pTableList);
  taosHashClear(pTableListInfo->map);
  taosHashClear(pTableListInfo->remainGroups);
  // use the clearing variant: the list object lives on, and tableListDestroy()/tableListClear()
  // release groupOffset again, which must not see the stale pointer
  taosMemoryFreeClear(pTableListInfo->groupOffset);
  pTableListInfo->numOfOuputGroups = 1;
  pTableListInfo->oneTableForEachGroup = false;
}
3591

3592
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
479,974,877✔
3593
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
479,974,877✔
3594
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
479,974,877✔
3595

3596
  if (pInfo1->groupId == pInfo2->groupId) {
479,974,877✔
3597
    return 0;
464,096,375✔
3598
  } else {
3599
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
15,879,053✔
3600
  }
3601
}
3602

3603
/*
 * Sort the table list by group id and record where every group starts.
 *
 * On success groupOffset holds one int32_t start index per group and
 * numOfOuputGroups holds the number of groups. An empty table list sets
 * numOfOuputGroups to 0 and returns success without touching groupOffset.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno value observed when an array
 * operation or the allocation of groupOffset fails.
 */
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray*  pOffsets = NULL;
  uint64_t curGid = 0;

  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
  if (0 == numOfTables) {
    pTableListInfo->numOfOuputGroups = 0;
    return code;
  }

  pOffsets = taosArrayInit(4, sizeof(int32_t));
  if (NULL == pOffsets) {
    code = terrno;
    goto end;
  }

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKey = taosArrayGet(pTableListInfo->pTableList, i);
    if (NULL == pKey) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }

    // the list is sorted, so a group starts at index 0 and wherever the id changes
    if (i > 0 && pKey->groupId == curGid) {
      continue;
    }

    if (NULL == taosArrayPush(pOffsets, &i)) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }
    curGid = pKey->groupId;
  }

  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pOffsets);
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
  if (NULL == pTableListInfo->groupOffset) {
    code = terrno;
    goto end;
  }

  memcpy(pTableListInfo->groupOffset, taosArrayGet(pOffsets, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);

end:
  taosArrayDestroy(pOffsets);
  return code;
}
3665

3666
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
109,497,753✔
3667
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3668
  int32_t code = TSDB_CODE_SUCCESS;
109,497,753✔
3669

3670
  bool   groupByTbname = groupbyTbname(group);
109,497,753✔
3671
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
109,499,636✔
3672
  if (!numOfTables) {
109,502,216✔
3673
    return code;
5,794✔
3674
  }
3675
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
109,496,422✔
3676
  if (group == NULL || groupByTbname) {
109,486,662✔
3677
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
107,644,476✔
3678
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
83,475,864✔
3679
      pTableListInfo->remainGroups =
4,499,544✔
3680
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
4,499,544✔
3681
      if (pTableListInfo->remainGroups == NULL) {
4,499,544✔
3682
        return terrno;
×
3683
      }
3684

3685
      for (int i = 0; i < numOfTables; i++) {
19,555,132✔
3686
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
15,055,731✔
3687
        if (!info) {
15,055,588✔
3688
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3689
          return terrno;
×
3690
        }
3691
        info->groupId = groupByTbname ? info->uid : 0;
15,055,588✔
3692
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
15,055,588✔
3693
                                      &(info->uid), sizeof(info->uid));
15,055,588✔
3694
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
15,055,588✔
3695
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3696
          return tempRes;
×
3697
        }
3698
      }
3699
    } else {
3700
      for (int32_t i = 0; i < numOfTables; i++) {
477,711,253✔
3701
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
374,587,462✔
3702
        if (!info) {
374,586,013✔
3703
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3704
          return terrno;
×
3705
        }
3706
        info->groupId = groupByTbname ? info->uid : 0;
374,586,013✔
3707
        
3708
      }
3709
    }
3710
    if (groupIdMap && group != NULL){
107,623,192✔
3711
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
107,509✔
3712
    }
3713

3714
    pTableListInfo->oneTableForEachGroup = groupByTbname;
107,623,192✔
3715
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
107,642,565✔
3716
      pTableListInfo->oneTableForEachGroup = true;
33,071,201✔
3717
    }
3718

3719
    if (groupSort && groupByTbname) {
107,643,458✔
3720
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
1,455,758✔
3721
      pTableListInfo->numOfOuputGroups = numOfTables;
1,455,758✔
3722
    } else if (groupByTbname && pScanNode->groupOrderScan) {
106,187,700✔
3723
      pTableListInfo->numOfOuputGroups = numOfTables;
30,115✔
3724
    } else {
3725
      pTableListInfo->numOfOuputGroups = 1;
106,157,786✔
3726
    }
3727
    if (groupSort || pScanNode->groupOrderScan) {
107,652,817✔
3728
      code = sortTableGroup(pTableListInfo);
17,739,417✔
3729
    }
3730
  } else {
3731
    bool initRemainGroups = false;
1,842,186✔
3732
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
1,842,186✔
3733
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
1,660,515✔
3734
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
1,660,515✔
3735
          !(groupSort || pScanNode->groupOrderScan)) {
844,237✔
3736
        initRemainGroups = true;
817,443✔
3737
      }
3738
    }
3739

3740
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
1,842,186✔
3741
    if (code != TSDB_CODE_SUCCESS) {
1,841,155✔
3742
      return code;
×
3743
    }
3744

3745
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
1,841,155✔
3746

3747
    if (groupSort || pScanNode->groupOrderScan) {
1,841,569✔
3748
      code = sortTableGroup(pTableListInfo);
152,197✔
3749
    }
3750
  }
3751

3752
  // add all table entry in the hash map
3753
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
109,477,245✔
3754
  for (int32_t i = 0; i < size; ++i) {
508,053,493✔
3755
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
398,568,907✔
3756
    if (!p) {
398,512,058✔
3757
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3758
      return terrno;
×
3759
    }
3760
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
398,512,058✔
3761
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
398,572,480✔
3762
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3763
      return tempRes;
×
3764
    }
3765
  }
3766

3767
  return code;
109,500,069✔
3768
}
3769

3770
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
121,578,185✔
3771
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3772
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3773
  int64_t     st = taosGetTimestampUs();
121,600,511✔
3774
  const char* idStr = GET_TASKID(pTaskInfo);
121,600,511✔
3775

3776
  if (pHandle == NULL) {
121,527,369✔
3777
    qError("invalid handle, in creating operator tree, %s", idStr);
×
3778
    return TSDB_CODE_INVALID_PARA;
×
3779
  }
3780

3781
  if (pHandle->uid != 0) {
121,527,369✔
3782
    pScanNode->uid = pHandle->uid;
37,788✔
3783
    pScanNode->tableType = TSDB_CHILD_TABLE;
37,788✔
3784
  }
3785
  uint8_t digest[17] = {0};
121,571,967✔
3786
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
121,548,858✔
3787
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
121,578,676✔
3788
  if (code != TSDB_CODE_SUCCESS) {
121,629,304✔
3789
    qError("failed to getTableList, code:%s", tstrerror(code));
1,104✔
3790
    return code;
1,104✔
3791
  }
3792

3793
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
121,628,200✔
3794

3795
  int64_t st1 = taosGetTimestampUs();
121,638,263✔
3796
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
121,638,263✔
3797
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
121,630,812✔
3798
         pTaskInfo->cost.extractListTime, idStr);
3799

3800
  if (numOfTables == 0) {
121,629,474✔
3801
    qDebug("no table qualified for query, %s", idStr);
12,237,344✔
3802
    return TSDB_CODE_SUCCESS;
12,237,344✔
3803
  }
3804

3805
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
109,392,130✔
3806
  if (code != TSDB_CODE_SUCCESS) {
109,401,936✔
3807
    return code;
×
3808
  }
3809

3810
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
109,404,094✔
3811
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
109,398,310✔
3812

3813
  return TSDB_CODE_SUCCESS;
109,402,350✔
3814
}
3815

3816
/*
 * Map a physical plan node type to a short display name.
 * Types without a dedicated name yield "error name".
 */
char* getStreamOpName(uint16_t opType) {
  char* pName = "error name";

  switch (opType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      pName = "stream scan";
      break;
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
      pName = "project";
      break;
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
      pName = "external window";
      break;
    default:
      break;
  }

  return pName;
}
3827

3828
/*
 * Write the content of a data block to the debug log.
 *
 * Nothing happens unless the DEBUG_TRACE bit is set in qDebugFlag. A NULL or
 * empty block is reported with a single line; otherwise the block is dumped
 * through dumpBlockData() and the resulting buffer is logged and freed.
 */
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  if (NULL == pBlock) {
    qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
    return;
  }

  if (0 == pBlock->info.rows) {
    qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
    return;
  }

  char* pDump = NULL;
  if (0 == dumpBlockData(pBlock, flag, &pDump, taskIdStr, qId)) {
    qDebugL("%" PRIx64 " %s %s", qId, __func__, pDump);
    taosMemoryFree(pDump);
  }
}
3846

3847
/*
 * Write the content of a stream data block to the debug log, tagged with the
 * given flag and operator name.
 *
 * A NULL or empty block is reported with a single line regardless of the
 * trace setting; the full dump is only produced when the DEBUG_TRACE bit is
 * set in qDebugFlag.
 */
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
  if (NULL == pBlock) {
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
    return;
  }

  if (0 == pBlock->info.rows) {
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
           pBlock->info.version);
    return;
  }

  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  // the dump is labelled with "<flag> <operator>"
  char label[64];
  snprintf(label, sizeof(label), "%s %s", flag, opStr);

  char* pDump = NULL;
  if (0 == dumpBlockData(pBlock, label, &pDump, taskIdStr, 0)) {
    qDebug("%s", pDump);
    taosMemoryFree(pDump);
  }
}
3868

3869
/*
 * Return the first timestamp of tsCols, or the start key of the window when no
 * timestamp column is supplied.
 */
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (NULL != tsCols) {
    return tsCols[0];
  }
  return win->skey;
}
42,217,225✔
3870

3871
/*
 * Store the description of a time window in the int64_t slots of pColData:
 * slot 2 receives the duration, slot 3 the start key and slot 4 the end key.
 * delta is added to both the duration and the end key.
 */
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
  int64_t* pSlots = (int64_t*)pColData->pData;

  pSlots[2] = pWin->ekey - pWin->skey + delta;  // set the duration
  pSlots[3] = pWin->skey;                       // window start key
  pSlots[4] = pWin->ekey + delta;               // window end key
}
2,147,483,647✔
3879

3880
/*
 * Compare the group key of one block row against a previously serialized key.
 *
 * The serialized key starts with one null-flag byte per group column, followed
 * by the values of the non-null columns in column order. The row matches when
 * every null flag and every value is identical and the number of bytes walked
 * equals oldKeysLen.
 *
 * Returns 0 when the keys are equal and 1 otherwise.
 */
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
                 int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;
  const char*     pNullFlags = oldkeyBuf;
  const char*     pCursor = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;

  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg) {
      pColAgg = &pBlock->pBlockAgg[pCol->slotId];
    }

    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      // a null value stores no payload, only the flag has to agree
      if (pNullFlags[i] != 1) {
        return 1;
      }
      continue;
    }

    if (pNullFlags[i] != 0) {
      return 1;
    }

    const char* pVal = colDataGetData(pColInfoData, rowIndex);
    if (TSDB_DATA_TYPE_JSON == pCol->type) {
      int32_t jsonLen = getJsonValueLen(pVal);
      if (0 != memcmp(pCursor, pVal, jsonLen)) {
        return 1;
      }
      pCursor += jsonLen;
    } else if (!IS_VAR_DATA_TYPE(pCol->type)) {
      if (0 != memcmp(pCursor, pVal, pCol->bytes)) {
        return 1;
      }
      pCursor += pCol->bytes;
    } else if (IS_STR_DATA_BLOB(pCol->type)) {
      if (0 != memcmp(pCursor, pVal, blobDataTLen(pVal))) {
        return 1;
      }
      pCursor += blobDataTLen(pVal);
    } else {
      if (0 != memcmp(pCursor, pVal, varDataTLen(pVal))) {
        return 1;
      }
      pCursor += varDataTLen(pVal);
    }
  }

  if ((int32_t)(pCursor - oldkeyBuf) != oldKeysLen) {
    return 1;
  }
  return 0;
}
3917

3918
/*
 * Serialize the group key of one block row into keyBuf.
 *
 * Layout: one null-flag byte per group column (1 = null, 0 = has value),
 * followed by the values of the non-null columns in column order. JSON values
 * use getJsonValueLen() bytes, variable-length values their total length
 * (header included) and all other types pCol->bytes bytes.
 *
 * A column whose slotId does not address an existing column of the block is
 * skipped: neither its null flag nor any value bytes are written for it.
 *
 * Returns the number of bytes from the start of keyBuf to the end of the last
 * value written.
 */
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
  uint32_t        colNum = pSortGroupCols->size;
  SColumnDataAgg* pColAgg = NULL;
  char*           isNull = keyBuf;
  char*           p = keyBuf + sizeof(int8_t) * colNum;

  for (int32_t i = 0; i < colNum; ++i) {
    const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);

    // Valid slots are [0, size): slotId == size is already one past the last
    // column, so it must be rejected too. Check before forming the element
    // address so no out-of-range pointer is ever computed.
    if (pCol->slotId >= pBlock->pDataBlock->size) continue;

    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];

    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      isNull[i] = 1;
    } else {
      isNull[i] = 0;
      const char* val = colDataGetData(pColInfoData, rowIndex);
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
        int32_t len = getJsonValueLen(val);
        memcpy(p, val, len);
        p += len;
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
        if (IS_STR_DATA_BLOB(pCol->type)) {
          blobDataCopy(p, val);
          p += blobDataTLen(val);
        } else {
          varDataCopy(p, val);
          p += varDataTLen(val);
        }
      } else {
        memcpy(p, val, pCol->bytes);
        p += pCol->bytes;
      }
    }
  }
  return (int32_t)(p - keyBuf);
}
3956

3957
/*
 * Derive a 64-bit group id from the MD5 digest of len bytes at pData.
 *
 * The id is taken from the first 8 bytes of the digest; when those are all
 * zero, the next 8 bytes are used instead.
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX ctx;
  tMD5Init(&ctx);
  tMD5Update(&ctx, (uint8_t*)pData, len);
  tMD5Final(&ctx);

  // NOTE: only extract the initial 8 bytes of the final MD5 digest
  uint64_t groupId = 0;
  memcpy(&groupId, ctx.digest, sizeof(uint64_t));
  if (0 != groupId) {
    return groupId;
  }

  memcpy(&groupId, ctx.digest + 8, sizeof(uint64_t));
  return groupId;
}
3969

3970
/*
 * Build a node list containing the expression (pExpr) of every sort key in
 * pSortKeys, in the same order.
 *
 * Returns the new list, or NULL with terrno set to the failing code when an
 * append fails.
 */
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNodeList* pCols = NULL;
  SNode*     pNode;

  FOREACH(pNode, pSortKeys) {
    SOrderByExprNode* pOrderBy = (SOrderByExprNode*)pNode;

    int32_t code = nodesListMakeAppend(&pCols, pOrderBy->pExpr);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      terrno = code;
      return NULL;
    }
  }

  return pCols;
}
3984

3985
/*
 * Compute the buffer size needed for a serialized group key: the sum of the
 * byte width of every key column plus one null-flag byte per column.
 *
 * The size is stored in *pLen only on success. Returns TSDB_CODE_SUCCESS, or
 * the error code set when a column cannot be fetched from the array.
 */
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfKeys = taosArrayGetSize(keys);
  int32_t total = 0;

  for (int32_t idx = 0; idx < numOfKeys; ++idx) {
    SColumn* pKeyCol = (SColumn*)taosArrayGet(keys, idx);
    QUERY_CHECK_NULL(pKeyCol, code, lino, _end, terrno);
    total += pKeyCol->bytes;
  }

  total += sizeof(int8_t) * numOfKeys;  // null flag
  *pLen = total;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
4004

4005
/*
 * Translate the "msg" field of an error reply into an error code.
 *
 * Returns TSDB_CODE_ANA_WN_DATA when the message mentions "white noise" or
 * "white-noise", TSDB_CODE_ANA_ALGO_NOT_LOAD when it contains
 * "[Errno 111] Connection refused", and TSDB_CODE_ANA_ANODE_RETURN_ERROR in
 * every other case (NULL json, missing "msg" field, unrecognized message).
 */
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
  if (NULL == pJson) {
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;

  // NOTE(review): the message is copied into a fixed 1024-byte buffer; confirm
  // that tjsonGetStringValue() cannot write more than that.
  char pMsg[1024] = {0};
  if (0 != tjsonGetStringValue(pJson, "msg", pMsg)) {
    qError("%s failed to extract msg from server, unknown error", pId);
    return code;
  }

  qError("%s failed to exec imputation, msg:%s", pId, pMsg);

  if (NULL != strstr(pMsg, "white noise") || NULL != strstr(pMsg, "white-noise")) {
    code = TSDB_CODE_ANA_WN_DATA;
  } else if (NULL != strstr(pMsg, "[Errno 111] Connection refused")) {
    code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc