• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4841

09 Nov 2025 09:08AM UTC coverage: 65.517% (-2.3%) from 67.809%
#4841

push

travis-ci

web-flow
Merge a8d8f75bf into b63644c82

246 of 409 new or added lines in 4 files covered. (60.15%)

443 existing lines in 3 files now uncovered.

143261 of 218662 relevant lines covered (65.52%)

299566096.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.47
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tcompression.h"
35
#include "tutil.h"
36

37
typedef struct tagFilterAssist {
38
  SHashObj* colHash;
39
  int32_t   index;
40
  SArray*   cInfoList;
41
  int32_t   code;
42
} tagFilterAssist;
43

44
typedef struct STransTagExprCtx {
45
  int32_t      code;
46
  SMetaReader* pReader;
47
} STransTagExprCtx;
48

49
// Classification of a tag condition, as returned by checkTagCond().
// NOTE(review): member meanings are inferred from their names only - confirm against checkTagCond().
typedef enum {
  FILTER_NO_LOGIC = 1,
  FILTER_AND,
  FILTER_OTHER,
} FilterCondType;
54

55
static FilterCondType checkTagCond(SNode* cond);
56
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
57
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
58
                                        uint64_t suid);
59

60
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
61
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
62

2,147,483,647✔
63
static int64_t getLimit(const SNode* pLimit) {
2,147,483,647✔
64
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
65
}
2,147,483,647✔
66
static int64_t getOffset(const SNode* pLimit) {
2,147,483,647✔
67
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
68
}
69
static void releaseColInfoData(void* pCol);
70

2,147,483,647✔
71
// Reset the row info to its empty state: zero size and an invalid (-1) current page id.
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
  pResultRowInfo->cur.pageId = -1;
  pResultRowInfo->size = 0;
}
75

56,200,658✔
76
// Mark the result row as closed.
void closeResultRow(SResultRow* pResultRow) {
  pResultRow->closed = true;
}
77

2,147,483,647✔
78
// Clear the row's bookkeeping flags and counters; when entrySize is non-zero, also zero the
// first entrySize bytes of the entry-info area.
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
  pResultRow->startInterp = false;
  pResultRow->endInterp = false;
  pResultRow->closed = false;
  pResultRow->numOfRows = 0;

  if (entrySize == 0) {
    return;
  }

  memset(pResultRow->pEntryInfo, 0, entrySize);
}
88

89
// TODO refactor: use macro
2,147,483,647✔
90
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
91
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
92
}
93

1,909,805,113✔
94
// Size in bytes of one result row: the SResultRow header, one SResultRowEntryInfo per output,
// plus every output's intermediate buffer (resDataInfo.interBufSize).
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t total = sizeof(SResultRow) + (numOfOutput * sizeof(SResultRowEntryInfo));

  int32_t idx = 0;
  while (idx < numOfOutput) {
    total += pCtx[idx].resDataInfo.interBufSize;
    ++idx;
  }

  return total;
}
103

UNCOV
104
// Convert buf read from rocksdb to result row
×
105
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
106
  if (inBuf == NULL || pSup == NULL) {
×
107
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
108
    return TSDB_CODE_INVALID_PARA;
UNCOV
109
  }
×
110
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
111
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
112
  SResultRow*     pResultRow = NULL;
×
113
  size_t          processedSize = 0;
×
114
  int32_t         code = TSDB_CODE_SUCCESS;
115

UNCOV
116
  // calculate the size of output buffer
×
117
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
118
  *outBuf = taosMemoryMalloc(*outBufSize);
×
119
  if (*outBuf == NULL) {
×
120
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
121
    return terrno;
UNCOV
122
  }
×
123
  pResultRow = (SResultRow*)*outBuf;
×
124
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
125
  inBuf += sizeof(SResultRow);
×
126
  processedSize += sizeof(SResultRow);
UNCOV
127

×
128
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
129
    int32_t len = *(int32_t*)inBuf;
×
130
    inBuf += sizeof(int32_t);
×
131
    processedSize += sizeof(int32_t);
×
132
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
133
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
134
      if (code != TSDB_CODE_SUCCESS) {
×
135
        qError("failed to decode result row, code:%d", code);
×
136
        return code;
137
      }
UNCOV
138
    } else {
×
139
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
UNCOV
140
    }
×
141
    inBuf += len;
×
142
    processedSize += len;
143
  }
UNCOV
144

×
145
  if (processedSize < inBufSize) {
UNCOV
146
    // stream stores extra data after result row
×
147
    size_t leftLen = inBufSize - processedSize;
×
148
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
149
    if (*outBuf == NULL) {
×
150
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
151
      return terrno;
UNCOV
152
    }
×
153
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
154
    inBuf += leftLen;
×
155
    processedSize += leftLen;
×
156
    *outBufSize += leftLen;
157
  }
UNCOV
158

×
159
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
160
  return TSDB_CODE_SUCCESS;
161
}
162

UNCOV
163
// Convert result row to buf for rocksdb
×
164
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
165
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
166
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
167
    return TSDB_CODE_INVALID_PARA;
168
  }
UNCOV
169

×
170
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
171
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
172
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
173
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
UNCOV
174

×
175
  if (rowSize > inBufSize) {
×
176
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
177
    return TSDB_CODE_INVALID_PARA;
178
  }
179

UNCOV
180
  // calculate the size of output buffer
×
181
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
182
  if (rowSize < inBufSize) {
×
183
    *outBufSize += inBufSize - rowSize;
184
  }
UNCOV
185

×
186
  *outBuf = taosMemoryMalloc(*outBufSize);
×
187
  if (*outBuf == NULL) {
×
188
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
189
    return terrno;
190
  }
UNCOV
191

×
192
  char* pBuf = *outBuf;
×
193
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
194
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
195
  pBuf += sizeof(SResultRow);
×
196
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
197
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
198
    *(int32_t*)pBuf = (int32_t)len;
×
199
    pBuf += sizeof(int32_t);
×
200
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
201
    pBuf += len;
202
  }
UNCOV
203

×
204
  if (rowSize < inBufSize) {
UNCOV
205
    // stream stores extra data after result row
×
206
    size_t leftLen = inBufSize - rowSize;
×
207
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
208
    pBuf += leftLen;
209
  }
UNCOV
210

×
211
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
212
  return TSDB_CODE_SUCCESS;
213
}
UNCOV
214

×
215
// Array-element destructor: p points at a stored pointer; free the pointed-to memory.
static void freeEx(void* p) {
  void** ppItem = (void**)p;
  taosMemoryFree(*ppItem);
}
216

987,791,557✔
217
// Release everything held by the group result info and reset its cursors.
// When freeItem is set, the row array's elements are freed as well (via freeEx).
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
  taosMemoryFreeClear(pGroupResInfo->pBuf);

  if (!pGroupResInfo->freeItem) {
    taosArrayDestroy(pGroupResInfo->pRows);
  } else {
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    pGroupResInfo->freeItem = false;
  }
  pGroupResInfo->pRows = NULL;

  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
231

2,147,483,647✔
232
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
233
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
234
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
235

2,147,483,647✔
236
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
237
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
238
    int64_t pts2 = *(int64_t*)pp2->key;
239

2,147,483,647✔
UNCOV
240
    if (pts1 == pts2) {
×
241
      return 0;
242
    } else {
2,147,483,647✔
243
      return pts1 < pts2 ? -1 : 1;
244
    }
245
  } else {
2,147,483,647✔
246
    return pp1->groupId < pp2->groupId ? -1 : 1;
247
  }
248
}
249

2,147,483,647✔
250
// Descending comparator: the ascending comparator with its arguments swapped.
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
  return resultrowComparAsc(p2, p1);
}
251

676,076,985✔
252
// Rebuild the group result info from the window hash map.
//
// Every hash entry (key = uint64 group id followed by the window key bytes, value = an
// SResultRowPosition) is copied into one contiguous buffer (pBuf) as an SResKeyPos, and a pointer
// to each copy is appended to pRows. When order is TSDB_ORDER_ASC or TSDB_ORDER_DESC the pointer
// array is sorted with the matching comparator; any other value leaves hash-iteration order.
//
// Returns TSDB_CODE_SUCCESS, or the error code (terrno) of the failed allocation/push.
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
  }
  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  // extract the result rows information from the hash map
  int32_t size = tSimpleHashGetSize(pHashmap);

  void* pData = NULL;
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);

  size_t  keyLen = 0;
  int32_t iter = 0;
  int64_t bufLen = 0, offset = 0;

  // todo move away and record this during create window
  // first pass: compute the total buffer size needed for all entries
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
    bufLen += keyLen + sizeof(SResultRowPosition);
  }

  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);

  // second pass: copy each entry into the buffer and remember where it lives
  iter = 0;
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pData, &keyLen);

    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);

    p->groupId = *(uint64_t*)key;
    p->pos = *(SResultRowPosition*)pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
    // check the push result itself; pBuf is known to be non-NULL here
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    offset += keyLen + sizeof(struct SResultRowPosition);
  }

  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
    size = POINTER_BYTES;
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
  }

  pGroupResInfo->index = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
UNCOV
312

×
313
// Make the group result info take over pArrayList as its row array and reset the cursors.
// The info is flagged freeItem, so cleanupGroupResInfo() will free the array's elements too.
//
// A row array left over from an earlier call is released first. If that array was itself
// installed with freeItem set, its elements are owned by the info and are freed with it
// (the same way cleanupGroupResInfo() does); otherwise only the array is destroyed.
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  if (pGroupResInfo->pRows != NULL) {
    if (pGroupResInfo->freeItem) {
      taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    } else {
      taosArrayDestroy(pGroupResInfo->pRows);
    }
  }

  pGroupResInfo->freeItem = true;
  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
323

2,147,483,647✔
324
// True when a row array exists and the read cursor has not yet reached its end.
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows != NULL) && (pGroupResInfo->index < taosArrayGetSize(pRows));
}
331

1,464,295,320✔
332
// Number of rows held by the group result info; 0 when there is no row array.
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows == 0) ? 0 : (int32_t)taosArrayGetSize(pRows);
}
339

379,509,584✔
340
// Build an array of SBlockOrderInfo, one per ORDER BY key in pNodeList.
//
// Each key must be a column expression; its slot id, sort direction and null ordering are
// copied into the entry. A NULL list yields an empty array.
//
// Returns the array (caller destroys it), or NULL on failure. For a missing or non-column key
// terrno is set to TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; allocation failures leave whatever
// terrno the array routines set.
SArray* createSortInfo(SNodeList* pNodeList) {
  int32_t numOfCols = (pNodeList != NULL) ? (int32_t)LIST_LENGTH(pNodeList) : 0;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    return pList;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
    if (!pSortKey) {
      // set the error first so the log line reports it rather than a stale terrno
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      taosArrayDestroy(pList);
      pList = NULL;
      break;
    }
    SBlockOrderInfo bi = {0};
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);

    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
      taosArrayDestroy(pList);
      pList = NULL;
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      break;
    }

    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
    bi.slotId = pColNode->slotId;
    void* tmp = taosArrayPush(pList, &bi);
    if (!tmp) {
      taosArrayDestroy(pList);
      pList = NULL;
      break;
    }
  }

  return pList;
}
387

2,147,483,647✔
388
SSDataBlock* createDataBlockFromDescNode(void* p) {
2,147,483,647✔
389
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
2,147,483,647✔
390
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
2,147,483,647✔
391
  SSDataBlock* pBlock = NULL;
2,147,483,647✔
392
  int32_t      code = createDataBlock(&pBlock);
2,147,483,647✔
UNCOV
393
  if (code) {
×
394
    terrno = code;
×
395
    return NULL;
396
  }
397

2,147,483,647✔
398
  pBlock->info.id.blockId = pNode->dataBlockId;
2,147,483,647✔
399
  pBlock->info.type = STREAM_INVALID;
2,147,483,647✔
400
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
2,147,483,647✔
401
  pBlock->info.watermark = INT64_MIN;
402

2,147,483,647✔
403
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
404
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
UNCOV
405
    if (!pDescNode) {
×
406
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
407
      blockDataDestroy(pBlock);
×
408
      pBlock = NULL;
×
409
      terrno = TSDB_CODE_INVALID_PARA;
×
410
      break;
411
    }
2,147,483,647✔
412
    SColumnInfoData idata =
2,147,483,647✔
413
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
414
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
415
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
416
    idata.info.noData = pDescNode->reserve;
417

2,147,483,647✔
418
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
419
    if (code != TSDB_CODE_SUCCESS) {
661,148✔
420
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
661,148✔
UNCOV
421
      blockDataDestroy(pBlock);
×
422
      pBlock = NULL;
×
423
      terrno = code;
×
424
      break;
425
    }
426
  }
427

2,147,483,647✔
428
  return pBlock;
429
}
430

1,656,566,117✔
431
// Prepare the two primary-key slots (pks[0], pks[1]) of the block info.
//
// The first match item flagged isPk decides: both slots take the type of the destination column,
// and for variable-length key types each slot also gets a zeroed buffer of info.bytes bytes.
// Nothing happens when no item is a primary key.
//
// Returns TSDB_CODE_SUCCESS, or terrno when an array lookup or an allocation fails.
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  for (int32_t idx = 0; idx < taosArrayGetSize(pMatchInfo->pList); ++idx) {
    SColMatchItem* pMatch = taosArrayGet(pMatchInfo->pList, idx);
    if (NULL == pMatch) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    if (!pMatch->isPk) {
      continue;
    }

    SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pMatch->dstSlotId);
    if (NULL == pPkCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    pInfo->pks[0].type = pPkCol->info.type;
    pInfo->pks[1].type = pPkCol->info.type;

    // allocate enough buffer size, which is pPkCol->info.bytes
    if (IS_VAR_DATA_TYPE(pMatch->dataType.type)) {
      pInfo->pks[0].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[0].pData) {
        return terrno;
      }

      pInfo->pks[1].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[1].pData) {
        taosMemoryFreeClear(pInfo->pks[0].pData);
        return terrno;
      }

      pInfo->pks[0].nData = pPkCol->info.bytes;
      pInfo->pks[1].nData = pPkCol->info.bytes;
    }

    // only the first primary-key item is considered
    break;
  }

  return TSDB_CODE_SUCCESS;
}
473

22,902,103✔
474
// Expression rewriter: replace tag-column and tbname references by constant value nodes taken
// from the table entry currently loaded in the meta reader.
//
// pContext is an STransTagExprCtx*. A column node whose colType is COLUMN_TYPE_TBNAME, or a
// function node of type FUNCTION_TYPE_TBNAME, becomes a string value holding the entry's name;
// any other column node becomes the value of the tag with the same column id (NULL-typed when
// the tag has no value). All other nodes are left untouched.
//
// Returns DEAL_RES_CONTINUE on success. On failure returns DEAL_RES_ERROR with the reason stored
// in pCtx->code; the partially built value node is destroyed and *pNode is left unchanged.
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
  STransTagExprCtx* pCtx = pContext;
  SMetaReader*      mr = pCtx->pReader;
  bool              isTagCol = false, isTbname = false;
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
    SColumnNode* pCol = (SColumnNode*)*pNode;
    if (pCol->colType == COLUMN_TYPE_TBNAME) {
      isTbname = true;
    } else {
      isTagCol = true;
    }
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
  }
  if (isTagCol) {
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;

    SValueNode* res = NULL;
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
    if (NULL == res) {
      return DEAL_RES_ERROR;
    }

    res->translate = true;
    res->node.resType = pSColumnNode->node.resType;

    STagVal tagVal = {0};
    tagVal.cid = pSColumnNode->colId;
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
    if (p == NULL) {
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
      int32_t len = ((const STag*)p)->len;
      res->datum.p = taosMemoryCalloc(len + 1, 1);
      if (NULL == res->datum.p) {
        pCtx->code = terrno;
        nodesDestroyNode((SNode*)res);
        return DEAL_RES_ERROR;
      }
      memcpy(res->datum.p, p, len);
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
        // blob is not a valid tag type: report it through the context, not as a walker result
        pCtx->code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
        nodesDestroyNode((SNode*)res);
        return DEAL_RES_ERROR;
      }

      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        pCtx->code = terrno;
        nodesDestroyNode((SNode*)res);
        return DEAL_RES_ERROR;
      }

      memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
      varDataSetLen(res->datum.p, tagVal.nData);
    } else {
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
      if (code != TSDB_CODE_SUCCESS) {
        pCtx->code = code;
        nodesDestroyNode((SNode*)res);
        return DEAL_RES_ERROR;
      }
    }
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  } else if (isTbname) {
    SValueNode* res = NULL;
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
    if (NULL == res) {
      return DEAL_RES_ERROR;
    }

    res->translate = true;
    res->node.resType = ((SExprNode*)(*pNode))->resType;

    int32_t len = strlen(mr->me.name);
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
    if (NULL == res->datum.p) {
      pCtx->code = terrno;
      nodesDestroyNode((SNode*)res);
      return DEAL_RES_ERROR;
    }
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
    varDataSetLen(res->datum.p, len);
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  }

  return DEAL_RES_CONTINUE;
}
560

3,271,729✔
561
// Evaluate a tag condition against a single table.
//
// The table's meta entry is loaded by uid, a clone of pTagCond has its tag/tbname references
// replaced by the table's actual values (doTranslateTagExpr), and the resulting constant
// expression is folded; its boolean value is stored in *pQualified.
//
// A table whose meta entry cannot be loaded is reported as not qualified with
// TSDB_CODE_SUCCESS. Clone, translate and constant-folding failures set *pQualified to false
// and return the error code (translate and folding failures also set terrno).
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SMetaReader mr = {0};

  pAPI->metaReaderFn.initReader(&mr, metaHandle, META_READER_LOCK, &pAPI->metaFn);
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, info->uid);
  if (TSDB_CODE_SUCCESS != code) {
    pAPI->metaReaderFn.clearReader(&mr);
    *pQualified = false;

    return TSDB_CODE_SUCCESS;
  }

  SNode* pTagCondTmp = NULL;
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
  if (TSDB_CODE_SUCCESS != code) {
    *pQualified = false;
    pAPI->metaReaderFn.clearReader(&mr);
    return code;
  }
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
  pAPI->metaReaderFn.clearReader(&mr);
  if (TSDB_CODE_SUCCESS != ctx.code) {
    // the failure reason lives in ctx.code; `code` still holds the successful clone result
    nodesDestroyNode(pTagCondTmp);
    *pQualified = false;
    terrno = ctx.code;
    return ctx.code;
  }

  SNode* pNew = NULL;
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    nodesDestroyNode(pTagCondTmp);
    *pQualified = false;

    return code;
  }

  SValueNode* pValue = (SValueNode*)pNew;
  *pQualified = pValue->datum.b;

  nodesDestroyNode(pNew);
  return TSDB_CODE_SUCCESS;
}
606

575,155,065✔
607
// Expression walker that assigns slot ids to the columns referenced by a tag filter.
//
// pContext is a tagFilterAssist*. A tbname() function node is first replaced by an equivalent
// column node (colId -1, COLUMN_TYPE_TBNAME, varchar). The first time a column id is seen it is
// recorded in colHash, given the next slot id and described in cInfoList; later occurrences
// reuse the slot id of the recorded node. Nodes of any other kind are skipped.
static EDealRes getColumn(SNode** pNode, void* pContext) {
  tagFilterAssist* pAssist = (tagFilterAssist*)pContext;
  SColumnNode*     pColumn = NULL;

  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
    pColumn = *(SColumnNode**)pNode;
  } else if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
    return DEAL_RES_CONTINUE;
  } else {
    SFunctionNode* pFunc = *(SFunctionNode**)(pNode);
    if (pFunc->funcType != FUNCTION_TYPE_TBNAME) {
      return DEAL_RES_CONTINUE;
    }

    pAssist->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumn);
    if (NULL == pColumn) {
      return DEAL_RES_ERROR;
    }
    pColumn->colId = -1;
    pColumn->colType = COLUMN_TYPE_TBNAME;
    pColumn->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
    pColumn->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)pColumn;
  }

  void* pSeen = taosHashGet(pAssist->colHash, &pColumn->colId, sizeof(pColumn->colId));
  if (pSeen) {
    // already registered: share the slot of the first occurrence
    SColumnNode* pFirst = *(SColumnNode**)pSeen;
    pColumn->slotId = pFirst->slotId;
    return DEAL_RES_CONTINUE;
  }

  int32_t putRes = taosHashPut(pAssist->colHash, &pColumn->colId, sizeof(pColumn->colId), pNode, sizeof((*pNode)));
  if (putRes != TSDB_CODE_SUCCESS && putRes != TSDB_CODE_DUP_KEY) {
    return DEAL_RES_ERROR;
  }
  pColumn->slotId = pAssist->index++;
  SColumnInfo cInfo = {.colId = pColumn->colId,
                       .type = pColumn->node.resType.type,
                       .bytes = pColumn->node.resType.bytes,
                       .pk = pColumn->isPk};
#if TAG_FILTER_DEBUG
  qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pColumn->slotId, cInfo.colId, cInfo.type);
#endif
  void* pPushed = taosArrayPush(pAssist->cInfoList, &cInfo);
  if (!pPushed) {
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}
658

165,146,401✔
659
// Allocate a column of the given data type with capacity for numOfRows rows and attach it to
// pParam as an owned column (colAlloced = true).
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation or capacity reservation fails.
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pResCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pResCol) {
    return terrno;
  }

  pResCol->info.type = pType->type;
  pResCol->info.bytes = pType->bytes;
  pResCol->info.scale = pType->scale;
  pResCol->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pResCol, numOfRows, true);
  if (TSDB_CODE_SUCCESS == code) {
    pParam->columnData = pResCol;
    pParam->colAlloced = true;
    return TSDB_CODE_SUCCESS;
  }

  terrno = code;
  releaseColInfoData(pResCol);
  return terrno;
}
681

36,919,695✔
682
static void releaseColInfoData(void* pCol) {
36,919,695✔
683
  if (pCol) {
36,923,941✔
684
    SColumnInfoData* col = (SColumnInfoData*)pCol;
36,923,941✔
685
    colDataDestroy(col);
36,921,726✔
686
    taosMemoryFree(col);
687
  }
36,908,942✔
688
}
689

2,027,866,145✔
690
void freeItem(void* p) {
2,027,866,145✔
691
  STUidTagInfo* pInfo = p;
2,027,866,145✔
692
  if (pInfo->pTagVal != NULL) {
2,023,309,229✔
693
    taosMemoryFree(pInfo->pTagVal);
694
  }
2,027,861,589✔
695
}
696

77,490✔
697
static bool canOptimizeTagFilter(const SNode* pTagCond) {
77,490✔
698
  if (NULL == pTagCond) return false;
51,660✔
699
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR &&
700
    ((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL) {
701
    return true;
25,830✔
702
  }
25,830✔
703
  if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION &&
25,830✔
704
    ((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND) {
25,830✔
NEW
705
    SNode* pChild = NULL;
×
NEW
706
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
×
707
      if (QUERY_NODE_OPERATOR != nodeType(pChild) ||
708
        ((SOperatorNode*)pChild)->opType != OP_TYPE_EQUAL) {
709
        return false;
25,830✔
710
      }
25,830✔
711
    }
25,830✔
712
    return true;
713
  }
714
  return false;
715
}
716

717
typedef struct {
718
  col_id_t  colId;
719
  SNode*    pValueNode;
25,830✔
720
} STagDataEntry;
25,830✔
721

722
static int compareTagDataEntry(const void* a, const void* b) {
NEW
723
  STagDataEntry* p1 = (STagDataEntry*)a;
×
NEW
724
  STagDataEntry* p2 = (STagDataEntry*)b;
×
NEW
725
  
×
NEW
726
  return compareInt16Val(&p1->colId, &p2->colId);
×
NEW
727
}
×
NEW
728

×
NEW
729
/*
 * Serialize the (colId, value) entries of pIdWithValue into a freshly
 * allocated, zero-filled buffer of keyLen bytes.
 *
 * For every entry the column id is written first, followed by the raw value
 * bytes: resType.bytes bytes of the datum for fixed-width types, the var-data
 * payload for VARCHAR/VARBINARY/NCHAR and the JSON value bytes for JSON.
 *
 * @param pIdWithValue  array of STagDataEntry
 * @param keyBuf        out: receives the allocated buffer on success; set to
 *                      NULL on failure (nothing for the caller to free)
 * @param keyLen        size of the buffer to allocate
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure,
 *         TSDB_CODE_STREAM_INTERNAL_ERROR for an unsupported value type, or
 *         TSDB_CODE_INVALID_PARA when an entry is invalid or would not fit.
 *
 * NOTE(review): for FLOAT the first resType.bytes bytes of datum.d are copied
 * as-is (no conversion from what is presumably a double) - confirm that this
 * is the intended key encoding.
 */
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
  int32_t code = TSDB_CODE_SUCCESS;

  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
  if (NULL == *keyBuf) {
    qError(
      "failed to allocate memory for tag filter optimization key, size:%d",
      keyLen);
    return terrno;
  }

  char*   pStart = *keyBuf;
  int32_t remain = keyLen;
  int32_t numOfEntries = (int32_t)taosArrayGetSize(pIdWithValue);
  for (int32_t i = 0; i < numOfEntries; ++i) {
    STagDataEntry* entry = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
    if (NULL == entry || NULL == entry->pValueNode) {
      qError("invalid tag data entry at index %d in tag filter optimization", i);
      code = TSDB_CODE_INVALID_PARA;
      goto _error;
    }
    SValueNode* pValueNode = (SValueNode*)entry->pValueNode;

    // locate the value bytes first so the write can be bounds-checked
    const void* pSrc = NULL;
    int32_t     srcLen = 0;
    switch (pValueNode->node.resType.type) {
      case TSDB_DATA_TYPE_BOOL:
        pSrc = &pValueNode->datum.b;
        srcLen = pValueNode->node.resType.bytes;
        break;
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_TIMESTAMP:
        pSrc = &pValueNode->datum.i;
        srcLen = pValueNode->node.resType.bytes;
        break;
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_UBIGINT:
        pSrc = &pValueNode->datum.u;
        srcLen = pValueNode->node.resType.bytes;
        break;
      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_DOUBLE:
        pSrc = &pValueNode->datum.d;
        srcLen = pValueNode->node.resType.bytes;
        break;
      case TSDB_DATA_TYPE_VARCHAR:
      case TSDB_DATA_TYPE_VARBINARY:
      case TSDB_DATA_TYPE_NCHAR:
        pSrc = varDataVal(pValueNode->datum.p);
        srcLen = (int32_t)varDataLen(pValueNode->datum.p);
        break;
      case TSDB_DATA_TYPE_JSON:
        pSrc = varDataVal(pValueNode->datum.p);
        srcLen = getJsonValueLen(pValueNode->datum.p);
        break;
      default:
        qError("unsupported tag data type %d in tag filter optimization",
          pValueNode->node.resType.type);
        code = TSDB_CODE_STREAM_INTERNAL_ERROR;
        goto _error;
    }

    // a value longer than the space reserved by the caller (e.g. a string
    // literal wider than its column) must not run past the end of the buffer
    if (srcLen < 0 || (int64_t)sizeof(col_id_t) + srcLen > (int64_t)remain) {
      qError("tag filter optimization key overflow, colId:%d, value len:%d, remain:%d",
        (int32_t)entry->colId, srcLen, remain);
      code = TSDB_CODE_INVALID_PARA;
      goto _error;
    }

    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
    pStart += sizeof(col_id_t);
    (void)memcpy(pStart, pSrc, srcLen);
    pStart += srcLen;
    remain -= (int32_t)sizeof(col_id_t) + srcLen;
  }

  return TSDB_CODE_SUCCESS;

_error:
  // do not hand a partially built key back to the caller
  taosMemoryFree(*keyBuf);
  *keyBuf = NULL;
  return code;
}
9,461,766✔
796

9,461,766✔
797
static void extractTagDataEntry(
9,469,772✔
NEW
798
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
×
NEW
799
  SNode* pLeft = pOpNode->pLeft;
×
800
  SNode* pRight = pOpNode->pRight;
801
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
18,935,142✔
NEW
802
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
×
803
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
804
    (SValueNode*)pLeft : (SValueNode*)pRight;
9,465,370✔
805

9,465,410✔
806
  STagDataEntry entry = {0};
9,458,841✔
NEW
807
  entry.colId = pColNode->colId;
×
NEW
808
  entry.pValueNode = (SNode*)pValueNode;
×
NEW
809
  taosArrayPush(pIdWithValue, &entry);
×
810
  STagDataEntry* pLastEntry = taosArrayGetLast(pIdWithValue);
NEW
811
  ((SValueNode*)pLastEntry->pValueNode)->node.resType = pColNode->node.resType;
×
NEW
812
}
×
NEW
813

×
814
static int32_t extractTagFilterTagDataEntries(
NEW
815
  const SNode* pTagCond, SArray* pIdWithVal) {
×
NEW
816
  if (NULL == pTagCond || NULL == pIdWithVal ||
×
NEW
817
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
×
NEW
818
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
×
NEW
819
    qError("invalid parameter to extract tag filter symbol");
×
NEW
820
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
NEW
821
  }
×
822

NEW
823
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
×
NEW
824
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
825
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
9,457,415✔
826
    SNode* pChild = NULL;
6,896,484✔
NEW
827
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
×
NEW
828
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
×
829
    }
830
  }
6,897,995✔
831

6,903,065✔
832
  taosArraySort(pIdWithVal, compareTagDataEntry);
6,900,851✔
833

6,905,261✔
NEW
834
  return TSDB_CODE_SUCCESS;
×
NEW
835
}
×
836

837
/*
 * Compute the MD5 digest identifying a tag filter condition.
 *
 * When canOptimizeTagFilter() accepts the condition, the digest is taken over
 * a compact key built from the sorted (colId, value) pairs; otherwise it is
 * taken over the serialized form produced by nodesNodeToMsg().
 *
 * @param pTagCond  tag condition; NULL leaves pContext untouched
 * @param pContext  MD5 context receiving the digest
 * @return TSDB_CODE_SUCCESS or the error of the failing step. All temporary
 *         buffers are released on every path.
 */
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (pTagCond == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  SArray* pIdWithVal = NULL;

  if (canOptimizeTagFilter(pTagCond)) {
    pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
    QUERY_CHECK_NULL(pIdWithVal, code, lino, _end, terrno);

    code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
    QUERY_CHECK_CODE(code, lino, _end);

    // reserve the column id plus the full width of the value type per entry
    int32_t numOfEntries = (int32_t)taosArrayGetSize(pIdWithVal);
    for (int32_t i = 0; i < numOfEntries; ++i) {
      STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
      QUERY_CHECK_NULL(pEntry, code, lino, _end, terrno);
      len += (int32_t)sizeof(col_id_t) +
        ((SValueNode*)pEntry->pValueNode)->node.resType.bytes;
    }

    code = buildTagDataEntryKey(pIdWithVal, &payload, len);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = nodesNodeToMsg(pTagCond, &payload, &len);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  // both helpers tolerate NULL, so this is safe on every path
  taosArrayDestroy(pIdWithVal);
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
4,832,606✔
888

889
/*
 * Compute the MD5 digest identifying a table grouping.
 *
 * The digest covers the serialized pGroup node. When filterDigest[0] is
 * non-zero, the bytes starting at filterDigest + 1 (as many as a digest
 * holds) are appended first, so the result also depends on the tag filter.
 *
 * @param pGroup        node describing the grouping
 * @param filterDigest  flag byte followed by the tag filter digest
 * @param pContext      MD5 context receiving the digest
 * @return TSDB_CODE_SUCCESS or the error of the failing step.
 */
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  code = nodesNodeToMsg(pGroup, &payload, &len);
  QUERY_CHECK_CODE(code, lino, _end);

  if (filterDigest[0]) {
    // keep the original pointer until the resize is known to have succeeded,
    // otherwise a failed realloc would leak the serialized payload
    char* pResized = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
    QUERY_CHECK_NULL(pResized, code, lino, _end, terrno);
    payload = pResized;

    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
    len += tListLen(pContext->digest);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1,611,444✔
915

1,611,444✔
UNCOV
916
/*
 * Walk an expression (or a list of expressions) with the getColumn rewriter
 * and collect the SColumnInfo entries it gathers.
 *
 * @param data      an SNodeList* when isList is true, otherwise a single SNode*
 * @param isList    selects how data is interpreted
 * @param pColList  out, optional: receives the collected column list on
 *                  success; the caller then owns it. When NULL, the collected
 *                  list is released here.
 * @return TSDB_CODE_SUCCESS, or the error reported by allocation or by the
 *         rewriter (ctx.code).
 */
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
  int32_t code = TSDB_CODE_SUCCESS;
  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (ctx.colHash == NULL) {
    code = terrno;
    goto end;
  }

  ctx.index = 0;
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
  if (ctx.cInfoList == NULL) {
    code = terrno;
    goto end;
  }

  if (isList) {
    SNode* pNode = NULL;
    FOREACH(pNode, (SNodeList*)data) {
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
      if (TSDB_CODE_SUCCESS != ctx.code) {
        code = ctx.code;
        goto end;
      }
      // store the (possibly replaced) node back into the list cell
      REPLACE_NODE(pNode);
    }
  } else {
    SNode* pNode = (SNode*)data;
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      code = ctx.code;
      goto end;
    }
  }

  if (pColList != NULL) {
    // ownership moves to the caller only when there is somewhere to put it;
    // otherwise the list stays in ctx and is destroyed below
    *pColList = ctx.cInfoList;
    ctx.cInfoList = NULL;
  }

end:
  taosHashCleanup(ctx.colHash);
  taosArrayDestroy(ctx.cInfoList);
  return code;
}
×
959

UNCOV
960
/*
 * Append one SStreamGroupValue to gInfo describing row i of column pValue.
 *
 * - NULL cells (and JSON null values) are recorded with isNull = true.
 * - JSON, var-length and DECIMAL values are copied into a newly allocated
 *   buffer stored in data.pData, with the length in data.nData.
 * - All other types are copied inline into data.val.
 *
 * @param pValue  source column
 * @param i       row index within pValue
 * @param gInfo   array of SStreamGroupValue that receives the new element
 * @return TSDB_CODE_SUCCESS; terrno on allocation failure;
 *         TSDB_CODE_QRY_JSON_IN_GROUP_ERROR for a JSON object value;
 *         TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER when a var-length value is
 *         longer than the column width. On failure the appended element, if
 *         any, is marked as null.
 */
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
  if (v == NULL) {
    code = terrno;
    goto end;
  }
  if (colDataIsNull_s(pValue, i)) {
    v->isNull = true;
  } else {
    v->isNull = false;
    char* data = colDataGetData(pValue, i);
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(data)) {
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        goto end;
      }
      if (tTagIsJsonNull(data)) {
        v->isNull = true;
        goto end;
      }
      int32_t len = getJsonValueLen(data);
      v->data.type = pValue->info.type;
      v->data.nData = len;
      // one extra zeroed byte keeps the copy printable as a C string
      v->data.pData = taosMemoryCalloc(1, len + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, data, len);
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
      if (varDataTLen(data) > pValue->info.bytes) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto end;
      }
      v->data.type = pValue->info.type;
      v->data.nData = varDataLen(data);
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
      v->data.type = pValue->info.type;
      v->data.nData = pValue->info.bytes;
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      // copy into the allocated buffer, not over the pointer that refers to it
      memcpy(v->data.pData, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add decimal data type:%d, len:%d", i, pValue->info.type, pValue->info.bytes);
    } else {  // reader todo decimal
      v->data.type = pValue->info.type;
      memcpy(&v->data.val, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
    }
  }
end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // v is NULL when reserving the element itself failed
    if (v != NULL) {
      v->isNull = true;
    }
  }
  return code;
}
1028

32,628,593✔
1029
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
32,628,593✔
UNCOV
1030
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
×
1031
  int32_t      code = TSDB_CODE_SUCCESS;
1032
  int32_t      lino = 0;
1033
  SArray*      pBlockList = NULL;
32,628,593✔
1034
  SSDataBlock* pResBlock = NULL;
32,628,593✔
UNCOV
1035
  SArray*      groupData = NULL;
×
UNCOV
1036
  SArray*      pUidTagList = NULL;
×
UNCOV
1037
  SArray*      gInfo = NULL;
×
UNCOV
1038
  int32_t      tbNameIndex = 0;
×
1039

UNCOV
1040
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
×
UNCOV
1041
  if (rows == 0) {
×
1042
    return;
×
1043
  }
UNCOV
1044

×
1045
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
UNCOV
1046
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
×
1047

UNCOV
1048
  for (int32_t i = 0; i < rows; ++i) {
×
1049
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
UNCOV
1050
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
×
UNCOV
1051
    STUidTagInfo info = {.uid = pkeyInfo->uid};
×
UNCOV
1052
    void*        tmp = taosArrayPush(pUidTagList, &info);
×
UNCOV
1053
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
×
1054
  }
UNCOV
1055

×
1056
  code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
1057
  if (code != TSDB_CODE_SUCCESS) {
1058
    goto end;
1059
  }
32,628,593✔
1060

32,628,575✔
1061
  SArray* pColList = NULL;
1062
  code = qGetColumnsFromNodeList(group, true, &pColList);
214,687,211✔
1063
  if (code != TSDB_CODE_SUCCESS) {
182,052,127✔
1064
    goto end;
182,061,237✔
1065
  }
182,061,237✔
1066

182,059,588✔
1067
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
182,059,588✔
1068
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
1069
    if (tmp != NULL && tmp->colId == -1) {
1070
      tbNameIndex = i;
32,635,084✔
1071
    }
32,628,593✔
1072
  }
UNCOV
1073
  
×
1074
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1075
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
32,628,557✔
UNCOV
1076
  taosArrayDestroy(pColList);
×
1077
  if (pResBlock == NULL) {
1078
    code = terrno;
1079
    goto end;
32,628,557✔
1080
  }
32,628,557✔
1081

32,619,062✔
1082
  pBlockList = taosArrayInit(2, POINTER_BYTES);
×
1083
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1084

1085
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
32,619,062✔
1086
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
32,621,597✔
1087

32,625,603✔
1088
  groupData = taosArrayInit(2, POINTER_BYTES);
32,623,737✔
UNCOV
1089
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
×
UNCOV
1090

×
1091
  SNode* pNode = NULL;
1092
  FOREACH(pNode, group) {
1093
    SScalarParam output = {0};
1094

1095
    switch (nodeType(pNode)) {
1096
      case QUERY_NODE_VALUE:
32,623,737✔
1097
        break;
32,624,318✔
1098
      case QUERY_NODE_COLUMN:
1099
      case QUERY_NODE_OPERATOR:
32,628,593✔
1100
      case QUERY_NODE_FUNCTION: {
32,628,593✔
1101
        SExprNode* expNode = (SExprNode*)pNode;
1102
        code = createResultData(&expNode->resType, rows, &output);
32,628,593✔
1103
        if (code != TSDB_CODE_SUCCESS) {
32,618,324✔
1104
          goto end;
1105
        }
32,618,930✔
1106
        break;
66,950,510✔
1107
      }
34,327,797✔
1108

1109
      default:
34,323,081✔
UNCOV
1110
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1111
        goto end;
×
1112
    }
34,323,594✔
1113

1114
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
1115
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
34,323,594✔
1116
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
34,323,594✔
1117
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
34,332,173✔
UNCOV
1118
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
×
1119
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
1120
      continue;
34,332,173✔
1121
    } else {
1122
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
1123
    }
×
1124

×
UNCOV
1125
    if (code != TSDB_CODE_SUCCESS) {
×
1126
      releaseColInfoData(output.columnData);
1127
      goto end;
1128
    }
34,332,173✔
1129

34,140,762✔
1130
    void* tmp = taosArrayPush(groupData, &output.columnData);
34,140,762✔
1131
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
34,130,502✔
1132
  }
34,130,502✔
1133

192,411✔
UNCOV
1134
  for (int i = 0; i < rows; i++) {
×
1135
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
1136
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
183,979✔
1137

1138
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
1139
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
34,316,065✔
UNCOV
1140

×
UNCOV
1141
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
×
1142
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
1143
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
1144
        if (ret != TSDB_CODE_SUCCESS) {
34,327,134✔
1145
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
34,327,134✔
1146
          goto end;
1147
        }
1148
        if (j == tbNameIndex) {
32,621,210✔
1149
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
1150
          if (v != NULL){
66,928,862✔
1151
            v->isTbname = true;
34,304,618✔
1152
            v->uid = info->uid;
34,304,618✔
1153
          }
1154
        }
1155
    }
32,603,225✔
1156

32,605,139✔
1157
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
1158
    if (ret != TSDB_CODE_SUCCESS) {
32,605,139✔
1159
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
32,597,839✔
UNCOV
1160
      goto end;
×
UNCOV
1161
    }
×
1162
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
1163
    gInfo = NULL;
1164
  }
32,597,839✔
1165

15,014,951✔
1166
end:
15,001,652✔
1167
  blockDataDestroy(pResBlock);
15,014,951✔
UNCOV
1168
  taosArrayDestroy(pBlockList);
×
UNCOV
1169
  taosArrayDestroyEx(pUidTagList, freeItem);
×
1170
  taosArrayDestroyP(groupData, releaseColInfoData);
1171
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
1172

1173
  if (code != TSDB_CODE_SUCCESS) {
214,371,132✔
1174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
181,750,074✔
1175
  }
181,792,073✔
1176
}
1177

181,792,073✔
1178
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
1,131,832✔
1179
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1180
  int32_t      code = TSDB_CODE_SUCCESS;
1181
  int32_t      lino = 0;
181,788,299✔
1182
  SArray*      pBlockList = NULL;
181,788,299✔
1183
  SSDataBlock* pResBlock = NULL;
376,495,288✔
1184
  void*        keyBuf = NULL;
194,728,111✔
1185
  SArray*      groupData = NULL;
1186
  SArray*      pUidTagList = NULL;
194,738,393✔
1187
  SArray*      tableList = NULL;
2,067,373✔
1188
  SArray*      gInfo = NULL;
2,069,594✔
UNCOV
1189

×
UNCOV
1190
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
×
UNCOV
1191
  if (rows == 0) {
×
1192
    return TSDB_CODE_SUCCESS;
1193
  } 
1194

1195
  T_MD5_CTX context = {0};
389,481,989✔
1196
  if (tsTagFilterCache && groupIdMap == NULL) {
628,683✔
1197
    SNodeListNode* listNode = NULL;
1198
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
194,112,692✔
1199
    if (TSDB_CODE_SUCCESS != code) {
194,100,386✔
1200
      goto end;
194,106,626✔
1201
    }
352,110✔
1202
    listNode->pNodeList = group;
16,005✔
1203
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
16,005✔
1204
    QUERY_CHECK_CODE(code, lino, end);
1205

336,105✔
1206
    nodesFree(listNode);
×
1207

×
1208
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
1209
                                             tListLen(context.digest), &tableList);
336,105✔
1210
    QUERY_CHECK_CODE(code, lino, end);
336,105✔
1211

336,105✔
1212
    if (tableList) {
193,741,959✔
1213
      taosArrayDestroy(pTableListInfo->pTableList);
165,786,360✔
1214
      pTableListInfo->pTableList = tableList;
3,472✔
UNCOV
1215
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1216
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
×
1217
      goto end;
1218
    }
×
1219
  }
×
1220

1221
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
165,788,924✔
UNCOV
1222
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
×
UNCOV
1223

×
1224
  for (int32_t i = 0; i < rows; ++i) {
1225
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
165,803,367✔
1226
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
165,802,451✔
1227
    STUidTagInfo info = {.uid = pkeyInfo->uid};
1228
    void*        tmp = taosArrayPush(pUidTagList, &info);
1229
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
27,949,711✔
1230
  }
27,961,470✔
1231

1232
  code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
1233
  if (code != TSDB_CODE_SUCCESS) {
1234
    goto end;
1235
  }
181,741,322✔
1236

181,741,322✔
1237
  SArray* pColList = NULL;
181,777,808✔
1238
  code = qGetColumnsFromNodeList(group, true, &pColList); 
1,129,642✔
1239
  if (code != TSDB_CODE_SUCCESS) {
1,131,832✔
UNCOV
1240
    goto end;
×
UNCOV
1241
  }
×
1242

1243
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1,131,832✔
1244
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
1,131,832✔
1245
  taosArrayDestroy(pColList);
1246
  if (pResBlock == NULL) {
181,779,998✔
1247
    code = terrno;
1248
    goto end;
85,066,375✔
1249
  }
1250

85,044,111✔
1251
  //  int64_t st1 = taosGetTimestampUs();
4,921,475✔
1252
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1253

85,044,111✔
1254
  pBlockList = taosArrayInit(2, POINTER_BYTES);
1255
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1256

1257
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
32,621,058✔
UNCOV
1258
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
×
UNCOV
1259

×
1260
  groupData = taosArrayInit(2, POINTER_BYTES);
UNCOV
1261
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
×
1262

UNCOV
1263
  SNode* pNode = NULL;
×
UNCOV
1264
  FOREACH(pNode, group) {
×
1265
    SScalarParam output = {0};
1266

1267
    switch (nodeType(pNode)) {
1268
      case QUERY_NODE_VALUE:
1269
        break;
1270
      case QUERY_NODE_COLUMN:
32,630,944✔
1271
      case QUERY_NODE_OPERATOR:
32,621,172✔
1272
      case QUERY_NODE_FUNCTION: {
32,627,881✔
1273
        SExprNode* expNode = (SExprNode*)pNode;
32,619,373✔
1274
        code = createResultData(&expNode->resType, rows, &output);
32,614,443✔
1275
        if (code != TSDB_CODE_SUCCESS) {
32,614,953✔
1276
          goto end;
32,609,115✔
1277
        }
1278
        break;
32,608,422✔
1279
      }
16,005✔
1280

1281
      default:
32,608,465✔
1282
        code = TSDB_CODE_OPS_NOT_SUPPORT;
1283
        goto end;
1284
    }
7,169,428✔
1285

7,169,428✔
1286
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
7,170,828✔
1287
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
1288
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
7,179,625✔
1289
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
7,179,625✔
1290
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
53,418✔
1291
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
1292
      continue;
7,126,207✔
1293
    } else {
1294
      code = scalarCalculate(pNode, pBlockList, &output, NULL, NULL);
1295
    }
1296

4,654,913✔
1297
    if (code != TSDB_CODE_SUCCESS) {
4,654,913✔
1298
      releaseColInfoData(output.columnData);
4,654,913✔
1299
      goto end;
4,654,913✔
1300
    }
4,654,913✔
1301

1302
    void* tmp = taosArrayPush(groupData, &output.columnData);
4,656,382✔
1303
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,657,851✔
1304
  }
1305

12,319,338✔
1306
  int32_t keyLen = 0;
7,657,715✔
1307
  SNode*  node;
7,653,943✔
UNCOV
1308
  FOREACH(node, group) {
×
UNCOV
1309
    SExprNode* pExpr = (SExprNode*)node;
×
UNCOV
1310
    keyLen += pExpr->resType.bytes;
×
1311
  }
1312

1313
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
7,658,549✔
1314
  keyLen += nullFlagSize;
7,658,020✔
1315

7,658,020✔
1316
  keyBuf = taosMemoryCalloc(1, keyLen);
7,658,020✔
1317
  if (keyBuf == NULL) {
1318
    code = terrno;
1319
    goto end;
4,657,851✔
1320
  }
1321

1322
  if (initRemainGroups) {
4,657,851✔
1323
    pTableListInfo->remainGroups =
1324
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1325
    if (pTableListInfo->remainGroups == NULL) {
4,657,151✔
1326
      code = terrno;
4,653,550✔
1327
      goto end;
4,653,550✔
1328
    }
4,652,081✔
1329
  }
4,652,744✔
1330

4,652,744✔
1331
  for (int i = 0; i < rows; i++) {
1332
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
7,657,424✔
1333
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
3,000,063✔
1334

3,003,770✔
1335
    if (groupIdMap != NULL){
3,003,770✔
1336
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
3,003,770✔
1337
    }
30,860✔
1338
    
1339
    char* isNull = (char*)keyBuf;
1340
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
2,973,820✔
1341
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
2,973,820✔
1342
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
1343

1344
      if (groupIdMap != NULL && gInfo != NULL) {
4,657,361✔
1345
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
4,657,361✔
1346
        if (ret != TSDB_CODE_SUCCESS) {
4,648,653✔
UNCOV
1347
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1348
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1349
          gInfo = NULL;
1350
        }
4,653,684✔
1351
      }
1352
      
UNCOV
1353
      if (colDataIsNull_s(pValue, i)) {
×
UNCOV
1354
        isNull[j] = 1;
×
1355
      } else {
×
1356
        isNull[j] = 0;
1357
        char* data = colDataGetData(pValue, i);
×
UNCOV
1358
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
×
1359
          if (tTagIsJson(data)) {
1360
            code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
UNCOV
1361
            goto end;
×
1362
          }
1363
          if (tTagIsJsonNull(data)) {
1364
            isNull[j] = 1;
55,531,994✔
1365
            continue;
55,531,994✔
1366
          }
55,531,994✔
1367
          int32_t len = getJsonValueLen(data);
1368
          memcpy(pStart, data, len);
55,531,994✔
UNCOV
1369
          pStart += len;
×
1370
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
1371
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
1372
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
55,535,639✔
1373
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
1374
              goto end;
1375
            }
151,749,616✔
1376
            memcpy(pStart, data, blobDataTLen(data));
151,749,616✔
1377
            pStart += blobDataTLen(data);
115,962,216✔
1378
          } else {
1379
            if (varDataTLen(data) > pValue->info.bytes) {
35,821,181✔
1380
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
4,991,408✔
1381
              goto end;
1382
            }
30,817,491✔
1383
            memcpy(pStart, data, varDataTLen(data));
1384
            pStart += varDataTLen(data);
1385
          }
151,759,493✔
1386
        } else {
151,759,493✔
1387
          memcpy(pStart, data, pValue->info.bytes);
151,759,493✔
1388
          pStart += pValue->info.bytes;
1389
        }
151,784,769✔
1390
      }
115,973,780✔
1391
    }
1392

1393
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
151,681,264✔
1394
    info->groupId = calcGroupId(keyBuf, len);
120,889,754✔
1395
    if (groupIdMap != NULL && gInfo != NULL) {
1396
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
1397
      if (ret != TSDB_CODE_SUCCESS) {
30,780,398✔
1398
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
30,780,398✔
1399
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
30,780,398✔
1400
      }
1401
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
30,816,104✔
1402
      gInfo = NULL;
30,802,129✔
UNCOV
1403
    }
×
1404
    if (initRemainGroups) {
1405
      // groupId ~ table uid
1406
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
30,802,129✔
1407
                         sizeof(info->uid));
95,052,301✔
1408
      if (code == TSDB_CODE_DUP_KEY) {
64,312,924✔
1409
        code = TSDB_CODE_SUCCESS;
64,312,924✔
1410
      }
44,912✔
1411
      QUERY_CHECK_CODE(code, lino, end);
44,912✔
1412
    }
1413
  }
64,238,900✔
1414

1415
  if (tsTagFilterCache && groupIdMap == NULL) {
1416
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
30,784,289✔
1417
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
30,790,216✔
1418

1419
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
30,809,673✔
1420
                                              tListLen(context.digest), tableList,
44,912✔
1421
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
1422
    QUERY_CHECK_CODE(code, lino, end);
1423
  }
30,810,935✔
1424

1425
  //  int64_t st2 = taosGetTimestampUs();
1426
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1427

180,284,376✔
1428
end:
1429
  taosMemoryFreeClear(keyBuf);
180,284,376✔
1430
  blockDataDestroy(pResBlock);
61,506✔
1431
  taosArrayDestroy(pBlockList);
1432
  taosArrayDestroyEx(pUidTagList, freeItem);
1433
  taosArrayDestroyP(groupData, releaseColInfoData);
180,235,254✔
1434
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
180,235,254✔
1435

170,335,205✔
1436
  if (code != TSDB_CODE_SUCCESS) {
1437
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1438
  }
9,825,815✔
1439
  return code;
4,654,913✔
1440
}
5,169,485✔
1441

4,650,295✔
1442
/*
 * Sort comparator for an array whose elements are C-string pointers: each
 * argument points at one `const char*` element. The strcmp() ordering is
 * collapsed to exactly -1, 0 or 1.
 */
static int32_t nameComparFn(const void* p1, const void* p2) {
  const char* lhs = *(const char* const*)p1;
  const char* rhs = *(const char* const*)p2;

  int32_t order = strcmp(lhs, rhs);
  return (order > 0) - (order < 0);
}
1453

4,652,081✔
1454
static SArray* getTableNameList(const SNodeListNode* pList) {
4,654,384✔
1455
  int32_t    code = TSDB_CODE_SUCCESS;
4,752✔
1456
  int32_t    lino = 0;
4,752✔
UNCOV
1457
  int32_t    len = LIST_LENGTH(pList->pNodeList);
×
UNCOV
1458
  SListCell* cell = pList->pNodeList->pHead;
×
1459

1460
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
1461
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
4,718,736✔
1462

4,713,984✔
1463
  for (int i = 0; i < pList->pNodeList->length; i++) {
4,713,984✔
UNCOV
1464
    SValueNode* valueNode = (SValueNode*)cell->pNode;
×
UNCOV
1465
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
×
1466
      terrno = TSDB_CODE_INVALID_PARA;
1467
      taosArrayDestroy(pTbList);
4,713,984✔
1468
      return NULL;
4,713,984✔
UNCOV
1469
    }
×
UNCOV
1470

×
1471
    char* name = varDataVal(valueNode->datum.p);
1472
    void* tmp = taosArrayPush(pTbList, &name);
1473
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1474
    cell = cell->pNext;
1475
  }
12,220,974✔
1476

7,597,991✔
1477
  size_t numOfTables = taosArrayGetSize(pTbList);
1478

7,595,716✔
1479
  // order the name
7,599,423✔
1480
  taosArraySort(pTbList, nameComparFn);
4,658,119✔
1481

4,658,119✔
1482
  // remove the duplicates
4,653,206✔
1483
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
4,624,541✔
1484
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
21,184✔
1485
  void* tmpTbl = taosArrayGet(pTbList, 0);
1486
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
4,603,357✔
1487
  void* tmp = taosArrayPush(pNewList, tmpTbl);
4,600,981✔
1488
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
4,606,810✔
1489

4,606,810✔
UNCOV
1490
  for (int32_t i = 1; i < numOfTables; ++i) {
×
1491
    char** name = taosArrayGetLast(pNewList);
1492
    char** nameInOldList = taosArrayGet(pTbList, i);
1493
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
1494
    if (strcmp(*name, *nameInOldList) == 0) {
28,665✔
1495
      continue;
28,398✔
1496
    }
28,398✔
1497

1498
    tmp = taosArrayPush(pNewList, nameInOldList);
1499
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1500
  }
2,943,302✔
1501

1502
_end:
1503
  taosArrayDestroy(pTbList);
1504
  if (code != TSDB_CODE_SUCCESS) {
4,622,983✔
1505
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4,629,453✔
1506
    return NULL;
4,629,453✔
1507
  }
1508
  return pNewList;
1509
}
5,176,077✔
1510

1511
/*
 * Three-way comparator over two uint64_t values passed by address.
 * Returns -1, 0 or 1 for less-than, equal and greater-than respectively.
 */
static int tableUidCompare(const void* a, const void* b) {
  const uint64_t lhs = *(const uint64_t*)a;
  const uint64_t rhs = *(const uint64_t*)b;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
366,362,809✔
1521

189,123,624✔
1522
static int32_t filterTableInfoCompare(const void* a, const void* b) {
189,020,732✔
1523
  STUidTagInfo* p1 = (STUidTagInfo*)a;
189,027,345✔
1524
  STUidTagInfo* p2 = (STUidTagInfo*)b;
189,027,345✔
1525

188,939,672✔
1526
  if (p1->uid == p2->uid) {
189,056,064✔
1527
    return 0;
1528
  }
1529

177,251,035✔
1530
  return p1->uid < p2->uid ? -1 : 1;
177,230,579✔
UNCOV
1531
}
×
UNCOV
1532

×
UNCOV
1533
/*
 * Classify the root node of a tag condition:
 *   - a plain operator node            -> FILTER_NO_LOGIC
 *   - a logic condition of type AND    -> FILTER_OTHER
 *   - anything else                    -> FILTER_AND
 *
 * NOTE(review): the AND/OTHER labels look swapped relative to their names
 * (an AND condition yields FILTER_OTHER). The mapping is kept exactly as it
 * was; confirm the intent against the callers before changing it.
 */
static FilterCondType checkTagCond(SNode* cond) {
  int32_t ntype = nodeType(cond);
  if (ntype == QUERY_NODE_OPERATOR) {
    return FILTER_NO_LOGIC;
  }

  // condType is only read once the node is known to be a logic condition
  bool isAndCond =
      (ntype == QUERY_NODE_LOGIC_CONDITION) && (((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND);
  return isAndCond ? FILTER_OTHER : FILTER_AND;
}
2,049,733,042✔
1542

2,049,756,092✔
1543
/*
 * Try to narrow the candidate table list using a `tbname IN (...)` condition.
 *
 * - If `cond` is a single operator, the result of optimizeTbnameInCondImpl()
 *   is returned directly.
 * - If `cond` is an AND logic condition, its children are tried in order
 *   until one is accepted by optimizeTbnameInCondImpl() (returns 0). `list`
 *   is then sorted and de-duplicated by uid and, when a child was accepted,
 *   the result of getTableTagsByUid() is returned.
 * - In every other case -1 is returned.
 */
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
  int32_t kind = nodeType(cond);

  if (kind == QUERY_NODE_OPERATOR) {
    return optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
  }
  if (kind != QUERY_NODE_LOGIC_CONDITION) {
    return -1;
  }

  SLogicConditionNode* pLogic = (SLogicConditionNode*)cond;
  if (pLogic->condType != LOGIC_COND_TYPE_AND) {
    return -1;
  }

  SNodeList* pParams = (SNodeList*)pLogic->pParameterList;
  int32_t    numOfParams = LIST_LENGTH(pParams);
  if (numOfParams <= 0) {
    return -1;
  }

  // stop at the first child that is a usable tbname-IN condition
  bool    matched = false;
  int32_t visited = 0;
  for (SListCell* pCell = pParams->pHead; pCell != NULL && visited < numOfParams; pCell = pCell->pNext, ++visited) {
    if (optimizeTbnameInCondImpl(pVnode, list, pCell->pNode, pAPI, suid) == 0) {
      matched = true;
      break;
    }
  }

  taosArraySort(list, filterTableInfoCompare);
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);

  if (!matched) {
    return -1;
  }
  return pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
}
1,320,437,728✔
1583

1,320,427,098✔
1584
// only return uid that does not contained in pExistedUidList
1585
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
1586
                                        uint64_t suid) {
1587
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
1,320,587,378✔
1588
    return -1;
1,320,302,009✔
1589
  }
1590

719,370,119✔
1591
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
719,690,930✔
1592
  if (pNode->opType != OP_TYPE_IN) {
1593
    return -1;
1594
  }
1595

1596
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
1597
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
1598
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
1599
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
1600
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
1601

1602
    int32_t len = LIST_LENGTH(pList->pNodeList);
1603
    if (len <= 0) {
1604
      return -1;
1605
    }
177,378,198✔
1606

177,401,376✔
1607
    SArray*   pTbList = getTableNameList(pList);
42,450✔
UNCOV
1608
    int32_t   numOfTables = taosArrayGetSize(pTbList);
×
UNCOV
1609
    SHashObj* uHash = NULL;
×
UNCOV
1610

×
1611
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
1612
    if (numOfExisted > 0) {
177,359,000✔
1613
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1614
      if (!uHash) {
1615
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
128,045,797✔
1616
        return terrno;
1617
      }
128,045,797✔
1618

1619
      for (int i = 0; i < numOfExisted; i++) {
127,990,872✔
1620
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
128,009,495✔
1621
        if (!pTInfo) {
1,969,051,572✔
1622
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
1,840,805,951✔
1623
          return terrno;
917,301,789✔
1624
        }
917,357,170✔
UNCOV
1625
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
×
UNCOV
1626
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
×
1627
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
1628
          return tempRes;
917,357,170✔
1629
        }
917,364,922✔
1630
      }
1631
    }
917,606,709✔
1632

917,606,709✔
1633
    for (int i = 0; i < numOfTables; i++) {
917,517,323✔
UNCOV
1634
      char* name = taosArrayGetP(pTbList, i);
×
1635

1636
      uint64_t uid = 0, csuid = 0;
1637
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
917,517,323✔
1638
        ETableType tbType = TSDB_TABLE_MAX;
18,450✔
1639
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
18,450✔
UNCOV
1640
            tbType == TSDB_CHILD_TABLE) {
×
1641
          if (suid != csuid) {
1642
            continue;
1643
          }
1644
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
1645
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
1646
            void*        tmp = taosArrayPush(pExistedUidList, &s);
128,245,621✔
1647
            if (!tmp) {
1648
              return terrno;
1649
            }
151,771,934✔
1650
          }
151,771,934✔
1651
        } else {
151,771,934✔
1652
          taosArrayDestroy(pTbList);
151,747,282✔
1653
          taosHashCleanup(uHash);
123,575,017✔
1654
          return -1;
1655
        }
1656
      } else {
351,968,126✔
1657
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
323,790,283✔
1658
        terrno = 0;
323,801,301✔
UNCOV
1659
      }
×
UNCOV
1660
    }
×
1661

1662
    taosHashCleanup(uHash);
323,801,301✔
1663
    taosArrayDestroy(pTbList);
323,795,861✔
1664
    return 0;
323,795,861✔
UNCOV
1665
  }
×
UNCOV
1666

×
1667
  return -1;
1668
}
1669

28,177,843✔
1670
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
1671
                                        SStorageAPI* pStorageAPI) {
1672
  int32_t      code = TSDB_CODE_SUCCESS;
1,707,320,281✔
1673
  int32_t      lino = 0;
1674
  SSDataBlock* pResBlock = NULL;
1,707,320,281✔
1675
  code = createDataBlock(&pResBlock);
1,707,543,766✔
1676
  QUERY_CHECK_CODE(code, lino, _end);
1,555,367,575✔
1677

1678
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
1679
    SColumnInfoData colInfo = {0};
152,176,191✔
1680
    void*           tmp = taosArrayGet(pColList, i);
1681
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
151,747,736✔
1682
    colInfo.info = *(SColumnInfo*)tmp;
151,747,736✔
1683
    code = blockDataAppendColInfo(pResBlock, &colInfo);
151,747,736✔
1684
    QUERY_CHECK_CODE(code, lino, _end);
151,747,736✔
1685
  }
151,738,727✔
1686

151,718,006✔
1687
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
1688
  if (code != TSDB_CODE_SUCCESS) {
151,718,006✔
1689
    terrno = code;
1690
    blockDataDestroy(pResBlock);
1691
    return NULL;
151,734,049✔
1692
  }
151,690,730✔
1693

1694
  pResBlock->info.rows = numOfTables;
151,690,730✔
1695

151,752,277✔
1696
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
1697

151,752,277✔
1698
  for (int32_t i = 0; i < numOfTables; i++) {
1699
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
151,732,828✔
1700
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
151,670,633✔
1701

4,629,453✔
1702
    for (int32_t j = 0; j < numOfCols; j++) {
1703
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
4,629,453✔
1704
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
4,629,453✔
1705

4,629,453✔
1706
      if (pColInfo->info.colId == -1) {  // tbname
1707
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
13,981,135✔
1708
        if (p1->name != NULL) {
9,351,682✔
1709
          STR_TO_VARSTR(str, p1->name);
9,351,682✔
1710
        } else {  // name is not retrieved during filter
9,351,682✔
1711
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
9,351,682✔
1712
          QUERY_CHECK_CODE(code, lino, _end);
1713
        }
4,629,453✔
1714

1715
        code = colDataSetVal(pColInfo, i, str, false);
265,067,479✔
1716
        QUERY_CHECK_CODE(code, lino, _end);
118,008,280✔
1717
#if TAG_FILTER_DEBUG
33,915,160✔
1718
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1719
#endif
113,144,039✔
1720
      } else {
1721
        STagVal tagVal = {0};
147,138,581✔
UNCOV
1722
        tagVal.cid = pColInfo->info.colId;
×
UNCOV
1723
        if (p1->pTagVal == NULL) {
×
UNCOV
1724
          colDataSetNULL(pColInfo, i);
×
1725
        } else {
1726
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
1727

1728
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
151,768,034✔
1729
            colDataSetNULL(pColInfo, i);
151,786,527✔
1730
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
23,539,241✔
1731
            code = colDataSetVal(pColInfo, i, p, false);
1732
            QUERY_CHECK_CODE(code, lino, _end);
1733
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
128,247,286✔
1734
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
128,254,875✔
1735
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
128,105,699✔
UNCOV
1736
            }
×
1737
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
1738
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
128,105,699✔
1739
            varDataSetLen(tmp, tagVal.nData);
128,210,074✔
1740
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
128,181,937✔
UNCOV
1741
            code = colDataSetVal(pColInfo, i, tmp, false);
×
UNCOV
1742
#if TAG_FILTER_DEBUG
×
1743
            qDebug("tagfilter varch:%s", tmp + 2);
1744
#endif
1745
            taosMemoryFree(tmp);
1746
            QUERY_CHECK_CODE(code, lino, _end);
1747
          } else {
128,181,937✔
1748
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
128,232,418✔
1749
            QUERY_CHECK_CODE(code, lino, _end);
1750
#if TAG_FILTER_DEBUG
128,238,357✔
1751
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
128,238,357✔
1752
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1753
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
128,238,357✔
1754
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
128,157,781✔
UNCOV
1755
            }
×
UNCOV
1756
#endif
×
1757
          }
1758
        }
1759
      }
128,157,781✔
1760
    }
128,043,886✔
1761
  }
10,670✔
1762

10,670✔
1763
_end:
10,670✔
1764
  if (code != TSDB_CODE_SUCCESS) {
1765
    blockDataDestroy(pResBlock);
1766
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
128,033,216✔
1767
    terrno = code;
128,250,986✔
UNCOV
1768
    return NULL;
×
UNCOV
1769
  }
×
1770
  return pResBlock;
1771
}
128,250,986✔
1772

1773
/*
 * Record the tables that passed the tag filter.
 *
 * pUidList is cleared first. Then, for every index i whose pResultList[i] is
 * true, the uid of pUidTagList[i] is appended to pListInfo->pTableList (with
 * groupId 0) and, when addUid is set, to pUidList as well.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an array operation fails.
 */
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
                                 bool* pResultList, bool addUid) {
  taosArrayClear(pUidList);

  STableKeyInfo keyInfo = {.uid = 0, .groupId = 0};
  int32_t       total = taosArrayGetSize(pUidTagList);

  for (int32_t idx = 0; idx < total; ++idx) {
    if (!pResultList[idx]) {
      continue;  // filtered out
    }

    STUidTagInfo* pTagInfo = (STUidTagInfo*)taosArrayGet(pUidTagList, idx);
    if (pTagInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    uint64_t uid = pTagInfo->uid;
    qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[idx]);

    keyInfo.uid = uid;
    if (taosArrayPush(pListInfo->pTableList, &keyInfo) == NULL) {
      return terrno;
    }

    if (addUid && taosArrayPush(pUidList, &uid) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
11,070✔
1806

11,070✔
1807
/*
 * Append one STUidTagInfo entry (only `uid` set, everything else zeroed) to
 * pUidTagList for every uid stored in pUidList.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno of the array operation that failed.
 */
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
  int32_t numOfExisted = taosArrayGetSize(pUidList);

  for (int32_t i = 0; i < numOfExisted; ++i) {
    uint64_t* uid = taosArrayGet(pUidList, i);
    if (!uid) {
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    STUidTagInfo info = {.uid = *uid};
    void*        tmp = taosArrayPush(pUidTagList, &info);
    if (!tmp) {
      // this path used to log the failure and then return TSDB_CODE_SUCCESS
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1,706,212,571✔
1829

1830
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
1,706,212,571✔
1831
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1,706,289,911✔
1832
  *listAdded = false;
736,847,557✔
1833
  if (pTagCond == NULL) {
737,144,857✔
1834
    return TSDB_CODE_SUCCESS;
735,449,178✔
1835
  }
735,456,031✔
1836

1837
  terrno = TSDB_CODE_SUCCESS;
737,339,461✔
1838

737,258,009✔
1839
  int32_t      lino = 0;
1840
  int32_t      code = TSDB_CODE_SUCCESS;
969,877,325✔
1841
  SArray*      pBlockList = NULL;
1842
  SSDataBlock* pResBlock = NULL;
969,742,935✔
1843
  SScalarParam output = {0};
153,801✔
1844
  SArray*      pUidTagList = NULL;
11,070✔
1845

11,070✔
1846
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
11,070✔
1847

1848
  //  int64_t stt = taosGetTimestampUs();
11,070✔
1849
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
11,070✔
1850
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
11,070✔
UNCOV
1851

×
UNCOV
1852
  code = copyExistedUids(pUidTagList, pUidList);
×
UNCOV
1853
  QUERY_CHECK_CODE(code, lino, end);
×
1854

1855
  FilterCondType condType = checkTagCond(pTagCond);
11,070✔
1856

11,070✔
1857
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
1858
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
131,661✔
1859
    taosArrayClear(pUidList);
1860

1861
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
77,490✔
1862
    code = taosArrayEnsureCap(pUidList, numOfRows);
77,490✔
1863
    QUERY_CHECK_CODE(code, lino, end);
77,490✔
1864

1865
    for (int32_t i = 0; i < numOfRows; ++i) {
77,490✔
1866
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
1867
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
77,490✔
1868
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
44,280✔
1869
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
44,280✔
1870
    }
44,280✔
1871
    terrno = 0;
44,280✔
1872
  } else {
1873
    if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) {
1874
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
1875
    } else {
969,633,414✔
1876
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
820,871,868✔
1877
    }
820,982,145✔
1878
    if (code != TSDB_CODE_SUCCESS) {
1879
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
1880
      terrno = code;
148,761,546✔
1881
      QUERY_CHECK_CODE(code, lino, end);
41,652,925✔
1882
    }
1883
  }
41,662,023✔
1884

41,654,098✔
1885
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1886
  if (numOfTables == 0) {
41,662,622✔
1887
    goto end;
1888
  }
41,684,240✔
1889

41,684,240✔
1890
  SArray* pColList = NULL;
41,638,728✔
1891
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
10,527,752✔
1892
  if (code != TSDB_CODE_SUCCESS) {
1893
    goto end;
31,110,976✔
1894
  }
1895
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
1896
  taosArrayDestroy(pColList);
1897
  if (pResBlock == NULL) {
1898
    code = terrno;
969,797,025✔
1899
    QUERY_CHECK_CODE(code, lino, end);
969,945,691✔
1900
  }
1901

1902
  //  int64_t st1 = taosGetTimestampUs();
969,935,021✔
1903
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1904
  pBlockList = taosArrayInit(2, POINTER_BYTES);
969,921,856✔
1905
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
33,210✔
1906

33,210✔
1907
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
33,210✔
1908
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1909

33,210✔
1910
  code = createResultData(&type, numOfTables, &output);
33,210✔
1911
  if (code != TSDB_CODE_SUCCESS) {
25,830✔
1912
    terrno = code;
25,830✔
1913
    QUERY_CHECK_CODE(code, lino, end);
25,830✔
1914
  }
1915

1916
  code = scalarCalculate(pTagCond, pBlockList, &output, pStreamInfo, NULL);
33,210✔
1917
  if (code != TSDB_CODE_SUCCESS) {
1918
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
33,210✔
1919
    terrno = code;
1920
    QUERY_CHECK_CODE(code, lino, end);
33,210✔
1921
  }
33,210✔
1922

1923
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
1924
  if (code != TSDB_CODE_SUCCESS) {
1925
    terrno = code;
1,707,049,117✔
1926
    QUERY_CHECK_CODE(code, lino, end);
1,708,060,363✔
1927
  }
1,578,911,420✔
1928
  *listAdded = true;
2,147,483,647✔
1929

2,147,483,647✔
1930
end:
2,147,483,647✔
1931
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1932
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1933
  }
2,147,483,647✔
1934
  blockDataDestroy(pResBlock);
2,147,483,647✔
1935
  taosArrayDestroy(pBlockList);
×
UNCOV
1936
  taosArrayDestroyEx(pUidTagList, freeItem);
×
1937

1938
  colDataDestroy(output.columnData);
1939
  taosMemoryFreeClear(output.columnData);
2,147,483,647✔
1940
  return code;
1941
}
1942

1943
typedef struct {
1,707,722,221✔
1944
  int32_t code;
1945
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1,707,862,944✔
1946
} PlaceHolderContext;
1947

1,708,085,730✔
1948
/*
 * Expression-rewrite callback: replaces a stream pseudo-column function node
 * with a clone of its first parameter, after that parameter has been bound
 * through fmSetStreamPseudoFuncParamVal().
 *
 * Nodes that are not functions, or functions that are not stream pseudo
 * column values, are left untouched. The status of the binding / clone call
 * is stored in the PlaceHolderContext passed as pContext.
 *
 * NOTE(review): the first parameter is dereferenced without a NULL check, and
 * a NULL clone aborts the walk even if nodesCloneNode() reported success.
 */
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
  if (nodeType((*pNode)) != QUERY_NODE_FUNCTION) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFunc = (SFunctionNode*)(*pNode);
  if (!fmIsStreamPesudoColVal(pFunc->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  PlaceHolderContext* pCtx = (PlaceHolderContext*)pContext;
  pCtx->code = fmSetStreamPseudoFuncParamVal(pFunc->funcId, pFunc->pParameterList, pCtx->pStreamRuntimeInfo);
  if (pCtx->code != TSDB_CODE_SUCCESS) {
    return DEAL_RES_ERROR;
  }

  SNode* pValue = nodesListGetNode(pFunc->pParameterList, 0);
  ((SValueNode*)pValue)->translate = true;

  SNode* pClone = NULL;
  pCtx->code = nodesCloneNode(pValue, &pClone);
  if (pClone == NULL) {
    return DEAL_RES_ERROR;
  }

  // swap the function node for the cloned value
  nodesDestroyNode(*pNode);
  *pNode = pClone;

  return DEAL_RES_CONTINUE;
}
1973

223,391✔
1974
/*
 * Append the column id referenced by a binary tag condition to pColIdArray.
 *
 * The left operand is used when it is a column node, otherwise the right
 * operand. When neither operand is a column node nothing is appended and
 * terrno is set to TSDB_CODE_INVALID_PARA; previously the right operand was
 * cast to a column node unconditionally and NULL operands were dereferenced.
 * A failed append is logged (taosArrayPush leaves the reason in terrno).
 */
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
  SNode* pLeft = pOpNode->pLeft;
  SNode* pRight = pOpNode->pRight;

  SNode* pCol = NULL;
  if (pLeft != NULL && nodeType(pLeft) == QUERY_NODE_COLUMN) {
    pCol = pLeft;
  } else if (pRight != NULL && nodeType(pRight) == QUERY_NODE_COLUMN) {
    pCol = pRight;
  }

  if (pCol == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return;
  }

  col_id_t colId = ((SColumnNode*)pCol)->colId;
  if (taosArrayPush(pColIdArray, &colId) == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  }
}
1983

NEW
1984
static int32_t buildTagCondKey(
×
NEW
1985
  SNode* pTagCond, char** pTagCondKey, int32_t* tagCondKeyLen, SArray** pTagColIds) {
×
NEW
1986
  if (NULL == pTagCond ||
×
1987
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
1988
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
NEW
1989
    qError("invalid parameter to extract tag filter symbol");
×
NEW
1990
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1991
  }
1992
  int32_t code = TSDB_CODE_SUCCESS;
NEW
1993
  int32_t lino = 0;
×
1994
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
NEW
1995

×
1996
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
NEW
1997
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
NEW
1998
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
×
NEW
1999
    SNode* pChild = NULL;
×
NEW
2000
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
×
2001
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
2002
    }
NEW
2003
  }
×
NEW
2004

×
NEW
2005
  taosArraySort(*pTagColIds, compareUint16Val);
×
NEW
2006

×
NEW
2007
  // encode ordered colIds into key string, separated by ','
×
2008
  *tagCondKeyLen =
2009
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
NEW
2010
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
×
NEW
2011
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
×
NEW
2012
  char* pStart = *pTagCondKey;
×
NEW
2013
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
×
NEW
2014
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
×
NEW
2015
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
×
2016
    memcpy(pStart, pColId, sizeof(col_id_t));
NEW
2017
    pStart += sizeof(col_id_t);
×
NEW
2018
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
×
2019
      *pStart = ',';
2020
      pStart += 1;
NEW
2021
    }
×
NEW
2022
  }
×
NEW
2023

×
NEW
2024
_end:
×
NEW
2025
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
2026
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2027
    terrno = code;
NEW
2028
  }
×
NEW
2029
  return code;
×
NEW
2030
}
×
2031

2032
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
2033
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
×
UNCOV
2034
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
2035
  int32_t lino = 0;
×
UNCOV
2036
  size_t  numOfTables = 0;
×
UNCOV
2037
  bool    listAdded = false;
×
2038

UNCOV
2039
  pListInfo->idInfo.suid = pScanNode->suid;
×
2040
  pListInfo->idInfo.tableType = pScanNode->tableType;
UNCOV
2041

×
UNCOV
2042
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
×
UNCOV
2043
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
×
2044

UNCOV
2045
  SIdxFltStatus status = SFLT_NOT_INDEX;
×
UNCOV
2046
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
×
UNCOV
2047
    pListInfo->idInfo.uid = pScanNode->uid;
×
UNCOV
2048
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
×
UNCOV
2049
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
×
UNCOV
2050
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
UNCOV
2051
    }
×
UNCOV
2052
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded, pStreamInfo);
×
2053
    QUERY_CHECK_CODE(code, lino, _end);
UNCOV
2054
  } else {
×
UNCOV
2055
    T_MD5_CTX context = {0};
×
UNCOV
2056

×
UNCOV
2057
    if (tsTagFilterCache) {
×
UNCOV
2058
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
×
UNCOV
2059
        SNode* tmp = NULL;
×
2060
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
UNCOV
2061
        QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
2062

×
2063
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
UNCOV
2064
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
×
UNCOV
2065
        if (TSDB_CODE_SUCCESS != ctx.code) {
×
2066
          nodesDestroyNode(tmp);
2067
          code = ctx.code;
2068
          goto _error;
2069
        }
UNCOV
2070
        code = genTagFilterDigest(tmp, &context);
×
UNCOV
2071
        nodesDestroyNode(tmp);
×
2072
      } else {
UNCOV
2073
        code = genTagFilterDigest(pTagCond, &context);
×
UNCOV
2074
      }
×
2075
      // try to retrieve the result from meta cache
2076
      QUERY_CHECK_CODE(code, lino, _error);      
×
2077
      bool acquired = false;
2078
      if (pTagCond != NULL && canOptimizeTagFilter(pTagCond)) {
2079
        qDebug("tag filter condition can be optimized");
25,802,471✔
2080
        char* pTagCondKey;
25,802,471✔
NEW
2081
        int32_t tagCondKeyLen;
×
2082
        SArray* pTagColIds = NULL;
2083
        buildTagCondKey(pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2084
        taosArrayDestroy(pTagColIds);
25,802,471✔
2085
        code = pStorageAPI->metaFn.getStableCachedTableList(
25,801,335✔
2086
          pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
25,802,618✔
NEW
2087
          context.digest, tListLen(context.digest), pUidList, &acquired);
×
2088
        QUERY_CHECK_CODE(code, lino, _error);
2089

2090
        if (acquired) {
60,006,268✔
2091
          digest[0] = 1;
34,218,373✔
2092
          memcpy(digest + 1, context.digest, tListLen(context.digest));
34,223,191✔
NEW
2093
          qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
×
NEW
2094
          goto _end;
×
NEW
2095
        }
×
2096
      }
2097
      code = pStorageAPI->metaFn.getCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
2098
                                                    pUidList, &acquired);
2099
      QUERY_CHECK_CODE(code, lino, _error);
34,223,191✔
2100

34,212,037✔
2101
      if (acquired) {
34,223,983✔
2102
        digest[0] = 1;
34,224,179✔
2103
        memcpy(digest + 1, context.digest, tListLen(context.digest));
34,212,045✔
2104
        qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
34,221,699✔
2105
        goto _end;
34,209,684✔
2106
      }
2107
    }
34,216,180✔
2108

34,216,180✔
UNCOV
2109
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
×
UNCOV
2110
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
×
UNCOV
2111
      QUERY_CHECK_CODE(code, lino, _error);
×
2112
    } else {
2113
      // failed to find the result in the cache, let try to calculate the results
2114
      if (pTagIndexCond) {
2115
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
25,787,895✔
2116

2117
        SIndexMetaArg metaArg = {.metaEx = pVnode,
2118
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
2,147,483,647✔
2119
                                 .ivtIdx = pIndex,
2120
                                 .suid = pScanNode->uid};
2,147,483,647✔
2121

2,147,483,647✔
2122
        status = SFLT_NOT_INDEX;
2,147,483,647✔
2123
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
2124
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
2,147,483,647✔
2125
          qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
2126
        } else {
2,147,483,647✔
2127
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,147,483,647✔
UNCOV
2128
        }
×
UNCOV
2129
      }
×
2130
    }
2131

2132
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, tsTagFilterCache, &listAdded, pStreamInfo);
2,147,483,647✔
2133
    QUERY_CHECK_CODE(code, lino, _end);
2,147,483,647✔
2134

2,147,483,647✔
2135
    // let's add the filter results into meta-cache
2,147,483,647✔
2136
    numOfTables = taosArrayGetSize(pUidList);
2,147,483,647✔
2137

2138
    if (tsTagFilterCache) {
2,147,483,647✔
2139
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
2,147,483,647✔
2140
      char*  pPayload = taosMemoryMalloc(size);
2,147,483,647✔
2141
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
2,147,483,647✔
2142

2,147,483,647✔
2143
      *(int32_t*)pPayload = numOfTables;
2,147,483,647✔
2144
      if (numOfTables > 0) {
2,147,483,647✔
2145
        void* tmp = taosArrayGet(pUidList, 0);
2,147,483,647✔
2146
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2147
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
2148
      }
2149

2150
      if (canOptimizeTagFilter(pTagCond)) {
2,147,483,647✔
2151
        qDebug("tag filter condition can be optimized, cache separately");
2,147,483,647✔
2152
        char* pTagCondKey;
2,147,483,647✔
2153
        int32_t tagCondKeyLen;
2,147,483,647✔
2154
        SArray* pTagColIds = NULL;
2,147,483,647✔
2155
        code = buildTagCondKey(pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2156
        QUERY_CHECK_CODE(code, lino, _error);
2157
        code = pStorageAPI->metaFn.putStableCachedTableList(
2158
          pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2,147,483,647✔
2159
          context.digest, tListLen(context.digest), pUidList, pTagColIds);
1,260,523,768✔
2160
        QUERY_CHECK_CODE(code, lino, _error);
1,260,557,788✔
2161
      }
2162
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
2163
                                                    pPayload, size, 1);
2,147,483,647✔
2164
      QUERY_CHECK_CODE(code, lino, _error);
2,147,483,647✔
2165

2,147,483,647✔
2166
      digest[0] = 1;
2,147,483,647✔
2167
      memcpy(digest + 1, context.digest, tListLen(context.digest));
2,147,483,647✔
2168
    }
2,147,483,647✔
2169
  }
2170

2171
_end:
2172
  if (!listAdded) {
12,358,299✔
2173
    numOfTables = taosArrayGetSize(pUidList);
2,147,483,647✔
2174
    for (int i = 0; i < numOfTables; i++) {
82,001,684✔
2175
      void* tmp = taosArrayGet(pUidList, i);
2176
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
81,752,608✔
2177
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
2178

2179
      void* p = taosArrayPush(pListInfo->pTableList, &info);
2180
      if (p == NULL) {
2,147,483,647✔
2181
        taosArrayDestroy(pUidList);
2182
        return terrno;
2,147,483,647✔
2183
      }
2,147,483,647✔
UNCOV
2184

×
2185
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
2186
    }
2,147,483,647✔
2187
  }
2188

2189
  qDebug("table list with %d uids built", (int32_t)taosArrayGetSize(pListInfo->pTableList));
2,147,483,647✔
2190

2191
_error:
2,147,483,647✔
2192

2,147,483,647✔
2193
  taosArrayDestroy(pUidList);
2,147,483,647✔
2194
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2195
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
2,147,483,647✔
2196
  }
2,147,483,647✔
2197
  return code;
2,147,483,647✔
2198
}
2199

2,147,483,647✔
2200
/*
 * Build the list of child tables of super table `suid` that satisfy the tag
 * conditions carried by the sub-plan `node` (may be NULL, meaning no tag
 * condition and no tag index condition).
 *
 * On success `*tableList` receives the resulting array and the caller becomes
 * its owner. On failure `*tableList` is left untouched and every intermediate
 * resource is released here.
 *
 * pTaskInfo is dereferenced as an SExecTaskInfo to reach its storage API and
 * must therefore not be NULL.
 */
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pSubplan = (SSubplan*)node;
  STableListInfo* pTableListInfo = NULL;
  uint8_t         digest[17] = {0};
  SScanPhysiNode  pNode = {0};

  // describe a scan over the whole super table
  pNode.suid = suid;
  pNode.uid = suid;
  pNode.tableType = TSDB_SUPER_TABLE;

  pTableListInfo = tableListCreate();
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);

  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // hand the array over to the caller and detach it from the holder so that
  // destroying the holder below does not free it
  *tableList = pTableListInfo->pTableList;
  pTableListInfo->pTableList = NULL;

_end:
  // the holder is released on every path; previously it leaked when
  // getTableList() failed
  if (pTableListInfo != NULL) {
    tableListDestroy(pTableListInfo);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2,147,483,647✔
2225

2,147,483,647✔
2226
size_t getTableTagsBufLen(const SNodeList* pGroups) {
2227
  size_t keyLen = 0;
2,147,483,647✔
2228

2,147,483,647✔
2229
  SNode* node;
2230
  FOREACH(node, pGroups) {
2,147,483,647✔
2231
    SExprNode* pExpr = (SExprNode*)node;
2232
    keyLen += pExpr->resType.bytes;
2,147,483,647✔
2233
  }
2,147,483,647✔
2234

2,147,483,647✔
2235
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
2236
  return keyLen;
2,147,483,647✔
2237
}
2,147,483,647✔
2238

2239
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
2,147,483,647✔
2240
                              SStorageAPI* pAPI) {
2241
  SMetaReader mr = {0};
2,147,483,647✔
2242

2243
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
2,147,483,647✔
2244
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
2245
    pAPI->metaReaderFn.clearReader(&mr);
2,147,483,647✔
2246
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
2,147,483,647✔
2247
  }
2,147,483,647✔
2248

2249
  SNodeList* groupNew = NULL;
2,147,483,647✔
2250
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
2,147,483,647✔
2251
  if (TSDB_CODE_SUCCESS != code) {
152,103,825✔
2252
    pAPI->metaReaderFn.clearReader(&mr);
152,117,430✔
2253
    return code;
2254
  }
152,117,430✔
2255

152,060,068✔
2256
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
2257
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
152,009,273✔
2258
  if (TSDB_CODE_SUCCESS != ctx.code) {
2259
    nodesDestroyList(groupNew);
152,123,326✔
2260
    pAPI->metaReaderFn.clearReader(&mr);
2261
    return code;
152,106,850✔
2262
  }
152,028,502✔
2263
  char* isNull = (char*)keyBuf;
152,127,624✔
2264
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
152,061,102✔
2265

2,147,483,647✔
2266
  SNode*  pNode;
2,147,483,647✔
2267
  int32_t index = 0;
2,147,483,647✔
2268
  FOREACH(pNode, groupNew) {
2269
    SNode*  pNew = NULL;
2,147,483,647✔
2270
    int32_t code = scalarCalculateConstants(pNode, &pNew);
2271
    if (TSDB_CODE_SUCCESS == code) {
2,147,483,647✔
2272
      REPLACE_NODE(pNew);
2,147,483,647✔
2273
    } else {
2274
      nodesDestroyList(groupNew);
2,147,483,647✔
2275
      pAPI->metaReaderFn.clearReader(&mr);
2,147,483,647✔
2276
      return code;
2,147,483,647✔
2277
    }
2278

2,147,483,647✔
2279
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
2280
      nodesDestroyList(groupNew);
2,147,483,647✔
2281
      pAPI->metaReaderFn.clearReader(&mr);
2282
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
2283
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
2,147,483,647✔
2284
    }
2,147,483,647✔
2285
    SValueNode* pValue = (SValueNode*)pNew;
2286

2,147,483,647✔
2287
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
186,000,811✔
2288
      isNull[index++] = 1;
185,940,457✔
2289
      continue;
185,970,326✔
2290
    } else {
186,008,408✔
2291
      isNull[index++] = 0;
186,012,628✔
2292
      char* data = nodesGetValueFromNode(pValue);
186,019,106✔
2293
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
2294
        if (tTagIsJson(data)) {
186,086,452✔
2295
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
186,086,452✔
2296
          nodesDestroyList(groupNew);
185,957,094✔
2297
          pAPI->metaReaderFn.clearReader(&mr);
186,023,289✔
2298
          return terrno;
×
UNCOV
2299
        }
×
2300
        int32_t len = getJsonValueLen(data);
2301
        memcpy(pStart, data, len);
186,023,289✔
2302
        pStart += len;
2303
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
2304
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
2305
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
2,147,483,647✔
2306
        }
2307
        memcpy(pStart, data, varDataTLen(data));
2,147,483,647✔
2308
        pStart += varDataTLen(data);
2,147,483,647✔
2309
      } else {
2,147,483,647✔
2310
        memcpy(pStart, data, pValue->node.resType.bytes);
2311
        pStart += pValue->node.resType.bytes;
2,147,483,647✔
2312
      }
2,147,483,647✔
2313
    }
2,147,483,647✔
2314
  }
2,147,483,647✔
2315

2,147,483,647✔
2316
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
2317
  *pGroupId = calcGroupId(keyBuf, len);
2,147,483,647✔
2318

2,147,483,647✔
2319
  nodesDestroyList(groupNew);
2,147,483,647✔
2320
  pAPI->metaReaderFn.clearReader(&mr);
2,147,483,647✔
2321

1,659,743,927✔
2322
  return TSDB_CODE_SUCCESS;
939,443,759✔
2323
}
939,443,759✔
2324

939,373,483✔
2325
/*
 * Convert a list of column nodes into a newly allocated SArray of SColumn.
 *
 * Returns NULL when pNodeList is NULL, when the array cannot be allocated,
 * when a list entry cannot be fetched, or when appending fails. The partially
 * built array is destroyed before returning NULL. The caller owns the result.
 */
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
  if (pNodeList == NULL) {
    return NULL;
  }

  size_t  total = LIST_LENGTH(pNodeList);
  SArray* pCols = taosArrayInit(total, sizeof(SColumn));
  if (NULL == pCols) {
    return NULL;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SColumnNode* pSrc = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    if (pSrc == NULL) {
      taosArrayDestroy(pCols);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return NULL;
    }

    // todo extract method
    // copy the descriptive fields of the column node; the rest stay zeroed
    SColumn col = {.slotId = pSrc->slotId,
                   .colId = pSrc->colId,
                   .type = pSrc->node.resType.type,
                   .bytes = pSrc->node.resType.bytes,
                   .precision = pSrc->node.resType.precision,
                   .scale = pSrc->node.resType.scale};

    if (taosArrayPush(pCols, &col) == NULL) {
      taosArrayDestroy(pCols);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return NULL;
    }
  }

  return pCols;
}
29,212✔
2363

2364
/*
 * Build the column match list for a scan.
 *
 * For every target in pNodeList whose expression is a plain column, an
 * SColMatchItem mapping source slot -> destination slot is recorded. The
 * output slots in pOutputNodeList are then walked to count the columns that
 * are produced (*numOfOutputCols) and to clear `needOutput` on matched
 * columns whose slot is not marked as output.
 *
 * On success pMatchInfo->pList receives the list and the caller owns it. On
 * failure the list is destroyed here and pMatchInfo->pList is not modified.
 */
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
                            int32_t type, SColMatchInfo* pMatchInfo) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pMatchInfo->matchType = type;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
  if (pList == NULL) {
    code = terrno;
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColMatchItem c = {.needOutput = true};
      c.colId = pColNode->colId;
      c.srcSlotId = pColNode->slotId;
      c.dstSlotId = pNode->slotId;
      c.isPk = pColNode->isPk;
      c.dataType = pColNode->node.resType;
      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // set the output flag for each column in SColMatchInfo, according to the
  // `output` attribute of the matching slot descriptor
  *numOfOutputCols = 0;
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
  for (int32_t i = 0; i < num; ++i) {
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    // todo: add reserve flag check
    // it is a column reserved for the arithmetic expression calculation
    if (pNode->slotId >= numOfCols) {
      (*numOfOutputCols) += 1;
      continue;
    }

    // NOTE(review): when no item matches this slot, `info` is left pointing at
    // the last item of the list and that item is the one updated below.
    // Behavior kept as is -- confirm whether this is intended.
    SColMatchItem* info = NULL;
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
      info = taosArrayGet(pList, j);
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
      if (info->dstSlotId == pNode->slotId) {
        break;
      }
    }

    if (pNode->output) {
      (*numOfOutputCols) += 1;
    } else if (info != NULL) {
      // select distinct tbname from stb where tbname='abc';
      info->needOutput = false;
    }
  }

  // success: ownership of the list moves to the match info
  pMatchInfo->pList = pList;
  pList = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // the list was never published, so it must be released here
    taosArrayDestroy(pList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2434

2,147,483,647✔
2435
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
2,147,483,647✔
2436
                                  const char* name) {
2,147,483,647✔
UNCOV
2437
  SResSchema s = {0};
×
UNCOV
2438
  s.scale = scale;
×
2439
  s.type = type;
×
UNCOV
2440
  s.bytes = bytes;
×
2441
  s.slotId = slotId;
2442
  s.precision = precision;
2443
  tstrncpy(s.name, name, tListLen(s.name));
2444

2,147,483,647✔
2445
  return s;
2,147,483,647✔
2446
}
2447

UNCOV
2448
// Allocate an SColumn on the heap and fill it from the given ids and data
// type. Returns NULL when the allocation fails; the caller owns the result.
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
  SColumn* pResult = taosMemoryCalloc(1, sizeof(SColumn));
  if (NULL == pResult) {
    return NULL;
  }

  // identity of the column
  pResult->dataBlockId = blockId;
  pResult->slotId = slotId;
  pResult->colId = colId;
  pResult->colType = colType;

  // data type attributes
  pResult->type = pType->type;
  pResult->bytes = pType->bytes;
  pResult->precision = pType->precision;
  pResult->scale = pType->scale;

  return pResult;
}
2,147,483,647✔
2464

2,147,483,647✔
2465
/*
 * Initialize the expression descriptor pExp from the plan node pNode, using
 * slotId as the destination slot of the result schema.
 *
 * Supported node types: column, value, function, operator, case-when and
 * logic condition; any other type yields TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * For a parameter-less function named "tbname" a single BIGINT value node is
 * appended to the function node's parameter list (the plan node itself is
 * modified).
 *
 * On failure pExp may hold partially allocated members (pExpr, base.pParam
 * and column parameters); nothing is freed here, so the caller is expected to
 * release them.
 */
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  pExp->base.numOfParams = 0;
  pExp->base.pParam = NULL;
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);

  pExp->pExpr->_function.num = 1;
  pExp->pExpr->_function.functionId = -1;

  int32_t type = nodeType(pNode);
  // it is a project query, or group by column
  if (type == QUERY_NODE_COLUMN) {
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
    SColumnNode* pColNode = (SColumnNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pColNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);

    pExp->base.pParam[0].pCol =
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);

    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
  } else if (type == QUERY_NODE_VALUE) {
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
    SValueNode* pValNode = (SValueNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pValNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (type == QUERY_NODE_FUNCTION) {
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;

    SDataType* pType = &pFuncNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
    tExprNode* pExprNode = pExp->pExpr;

    pExprNode->_function.functionId = pFuncNode->funcId;
    pExprNode->_function.pFunctNode = pFuncNode;
    pExprNode->_function.functionType = pFuncNode->funcType;

    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));

    pExp->base.pParamList = pFuncNode->pParameterList;

    // todo refactor: add the parameter for tbname function
    if (!pFuncNode->pParameterList && strcmp(pExprNode->_function.functionName, "tbname") == 0) {
      SValueNode* res = NULL;

      // use the function-level `code` here: a block-local `code` would hide
      // it, and a failure below would then be returned as success
      code = nodesMakeList(&pFuncNode->pParameterList);
      QUERY_CHECK_CODE(code, lino, _end);

      code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
      QUERY_CHECK_CODE(code, lino, _end);

      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
      if (code != TSDB_CODE_SUCCESS) {
        // the node was not taken over by the list, release it here
        nodesDestroyNode((SNode*)res);
        res = NULL;
      }
      QUERY_CHECK_CODE(code, lino, _end);
    }

    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);

    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = numOfParam;

    // only column and value parameters are materialized; entries for other
    // parameter kinds stay zero-initialized
    for (int32_t j = 0; j < numOfParam; ++j) {
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
      if (p1->type == QUERY_NODE_COLUMN) {
        SColumnNode* pcn = (SColumnNode*)p1;

        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
        pExp->base.pParam[j].pCol =
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
      } else if (p1->type == QUERY_NODE_VALUE) {
        SValueNode* pvn = (SValueNode*)p1;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
  } else if (type == QUERY_NODE_OPERATOR) {
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pOpNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_CASE_WHEN) {
    // case-when is evaluated through the operator path
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pCaseNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
    // logic conditions are evaluated through the operator path as well
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;
    SDataType* pType = &pCond->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2620

2,147,483,647✔
2621
// Initialize pExp from the expression wrapped by a target node, using the
// target's slot id as the destination slot.
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
  SNode*  pInner = pTargetNode->pExpr;
  int16_t dstSlot = pTargetNode->slotId;

  return createExprFromOneNode(pExp, pInner, dstSlot);
}
2,147,483,647✔
2624

2,147,483,647✔
2625
/*
 * Create an array of expression descriptors, one per node of pNodeList.
 * Entry i uses slot id (i + UD_TAG_COLUMN_INDEX).
 *
 * *numOfExprs is always set to the length of the list. Returns NULL when the
 * array cannot be allocated or when any entry fails to initialize; in the
 * latter case terrno carries the error code and everything built so far is
 * released. The caller owns the returned array.
 */
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
  *numOfExprs = LIST_LENGTH(pNodeList);
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (!pExprs) {
    return NULL;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    SExprInfo* pExp = &pExprs[i];
    SNode*     pNode = nodesListGetNode(pNodeList, i);

    // a missing list entry is reported as an error instead of being
    // dereferenced by createExprFromOneNode()
    int32_t code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    if (pNode != NULL) {
      code = createExprFromOneNode(pExp, pNode, i + UD_TAG_COLUMN_INDEX);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // release what the entries own before freeing the array itself, in the
      // same way createExprInfo() does; freeing only the array leaked them
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      terrno = code;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return NULL;
    }
  }

  return pExprs;
}
×
2645

×
2646
/*
 * Create the expression descriptors for the targets in pNodeList followed by
 * those in pGroupKeys (optional, may be NULL).
 *
 * *numOfExprs receives the combined count. When it is zero the function
 * returns success without allocating anything. On success *pExprInfo receives
 * the new array; on failure the partially built array is destroyed and the
 * error code is returned.
 */
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
  QRY_PARAM_CHECK(pExprInfo);

  int32_t code = 0;
  int32_t funcCnt = LIST_LENGTH(pNodeList);
  int32_t groupKeyCnt = (pGroupKeys != NULL) ? LIST_LENGTH(pGroupKeys) : 0;

  *numOfExprs = funcCnt + groupKeyCnt;
  if (0 == *numOfExprs) {
    return code;
  }

  SExprInfo* pResult = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (NULL == pResult) {
    return terrno;
  }

  for (int32_t idx = 0; idx < (*numOfExprs); ++idx) {
    // function targets come first, group keys are appended after them
    STargetNode* pTarget = (idx < funcCnt) ? (STargetNode*)nodesListGetNode(pNodeList, idx)
                                           : (STargetNode*)nodesListGetNode(pGroupKeys, idx - funcCnt);
    if (NULL == pTarget) {
      destroyExprInfo(pResult, *numOfExprs);
      taosMemoryFreeClear(pResult);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    code = createExprFromTargetNode(&pResult[idx], pTarget);
    if (TSDB_CODE_SUCCESS != code) {
      destroyExprInfo(pResult, *numOfExprs);
      taosMemoryFreeClear(pResult);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  *pExprInfo = pResult;
  return code;
}
×
2693

2694
static void deleteSubsidiareCtx(void* pData) {
2695
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
111,509,322✔
2696
  if (pCtx->pCtx) {
2697
    taosMemoryFreeClear(pCtx->pCtx);
2698
  }
1,414,142,803✔
2699
}
2,147,483,647✔
2700

1,412,688,693✔
2701
// set the output buffer for the selectivity + tag query
1,413,230,525✔
2702
/**
 * Attach every "_select_value" output column to the select function it accompanies.
 *
 * Two layouts are handled:
 *  - No context (index > 0) carries a positive bindExprID: all "_select_value" contexts are
 *    collected into one array, which is handed to the last select function found, or freed when
 *    there is none.
 *  - Otherwise each context with a positive bindExprID gets its own subsidiary array, registered
 *    at slot bindExprID - 1, and each "_select_value" context is appended to the array selected
 *    by its relatedTo value (1-based; 0 is mapped to slot 0).
 *
 * @param pCtx         function contexts, numOfOutput entries
 * @param numOfOutput  number of entries in pCtx
 * @return TSDB_CODE_SUCCESS, or an error code on allocation failure or inconsistent indices
 */
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t num = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* pValCtxArray = NULL;
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
    if (funcIdx > 0) {
      if (pValCtxArray == NULL) {
        // the end of the list is the select function of biggest index
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
        if (pValCtxArray == NULL) {
          return terrno;
        }
      }
      if (funcIdx > pValCtxArray->size) {
        qError("funcIdx:%d is out of range", funcIdx);
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
      if (pSubsidiary->pCtx == NULL) {
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
        return terrno;
      }
      pSubsidiary->num = 0;
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
    }
  }

  SqlFunctionCtx*  p = NULL;
  SqlFunctionCtx** pValCtx = NULL;
  if (pValCtxArray == NULL) {
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
    // Store the failure in `code` before jumping, so the caller does not see success.
    QUERY_CHECK_NULL(pValCtx, code, lino, _end, terrno);
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
    if ((strcmp(pName, "_select_value") == 0)) {
      if (pValCtxArray == NULL) {
        pValCtx[num++] = &pCtx[i];
      } else {
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
          bindFuncIndex -= 1;
        }
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
        // A slot that no function registered in the first pass would be dereferenced below;
        // treat it, like an out-of-range index, as an internal error.
        if (pSubsidiary == NULL || *pSubsidiary == NULL) {
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
        (*pSubsidiary)->num++;
      }
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
      if (pValCtxArray == NULL) {
        p = &pCtx[i];
      }
    }
  }

  if (p != NULL) {
    // ownership of the collected array moves to the select function's context
    p->subsidiaries.pCtx = pValCtx;
    p->subsidiaries.num = num;
  } else {
    taosMemoryFreeClear(pValCtx);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
    taosMemoryFreeClear(pValCtx);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    // on success only the lookup array is released; the per-function arrays stay with their contexts
    taosArrayDestroy(pValCtxArray);
  }
  return code;
}
2784

1,707,745,556✔
2785
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
1,708,808,796✔
2786
                                     SFunctionStateStore* pStore) {
1,707,669,323✔
2787
  int32_t         code = TSDB_CODE_SUCCESS;
1,708,161,256✔
2788
  int32_t         lino = 0;
1,708,489,942✔
2789
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
1,707,826,137✔
2790
  if (pFuncCtx == NULL) {
1,708,105,218✔
2791
    return NULL;
1,223,736✔
2792
  }
2793

2794
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
2,147,483,647✔
2795
  if (*rowEntryInfoOffset == 0) {
1,708,458,383✔
2796
    taosMemoryFreeClear(pFuncCtx);
2797
    return NULL;
1,707,569,604✔
2798
  }
2,147,483,647✔
2799

2,147,483,647✔
2800
  for (int32_t i = 0; i < numOfOutput; ++i) {
2,147,483,647✔
UNCOV
2801
    SExprInfo* pExpr = &pExprInfo[i];
×
UNCOV
2802

×
2803
    SExprBasicInfo* pFunct = &pExpr->base;
2804
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
2,147,483,647✔
2805

2,147,483,647✔
2806
    pCtx->functionId = -1;
×
2807
    pCtx->pExpr = pExpr;
2808

2809
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
2,147,483,647✔
2810
      SFuncExecEnv env = {0};
2,147,483,647✔
2811
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
2,147,483,647✔
2812
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
2,147,483,647✔
2813
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
2814

2,147,483,647✔
2815
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
2,147,483,647✔
2816
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
2817
        if (!isUdaf) {
2818
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
1,708,250,972✔
2819
          QUERY_CHECK_CODE(code, lino, _end);
2820
        } else {
1,708,602,984✔
2821
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
1,415,820,596✔
2822
          pCtx->udfName = taosStrdup(udfName);
22,897,741✔
2823
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
22,897,741✔
2824

1,391,990,664✔
UNCOV
2825
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
×
UNCOV
2826
          QUERY_CHECK_CODE(code, lino, _end);
×
2827
        }
2828
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
2829
        if (!tmp) {
2830
          code = terrno;
1,708,155,507✔
2831
          QUERY_CHECK_CODE(code, lino, _end);
2832
        }
2833
      } else {
238,065,276✔
2834
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
2835
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
238,065,276✔
2836
          QUERY_CHECK_CODE(code, lino, _end);
238,065,276✔
2837
        }      
2838
        
238,065,276✔
2839
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
238,065,276✔
2840
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
2841
          code = TSDB_CODE_SUCCESS;
238,065,276✔
2842
        }
238,065,276✔
2843
        QUERY_CHECK_CODE(code, lino, _end);
2844

238,065,276✔
2845
        if (pCtx->sfp.getEnv != NULL) {
238,065,276✔
2846
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
2847
          if (!tmp) {
238,065,276✔
2848
            code = terrno;
238,065,276✔
2849
            QUERY_CHECK_CODE(code, lino, _end);
238,065,276✔
2850
          }
238,065,276✔
2851
        }
238,065,276✔
2852
      }
238,065,276✔
2853
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
2854
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
881,323,862✔
2855
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
643,258,586✔
2856
      // for simple column, the result buffer needs to hold at least one element.
643,258,586✔
2857
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
2858
    }
643,258,586✔
2859

2,147,483,647✔
2860
    pCtx->input.numOfInputCols = pFunct->numOfParams;
2,147,483,647✔
2861
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
643,258,586✔
2862
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
643,258,586✔
2863
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
643,258,586✔
2864
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
643,258,586✔
2865

643,258,586✔
2866
    pCtx->pTsOutput = NULL;
643,258,586✔
2867
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
643,258,586✔
2868
    pCtx->resDataInfo.type = pFunct->resSchema.type;
2869
    pCtx->order = TSDB_ORDER_ASC;
2870
    pCtx->start.key = INT64_MIN;
643,258,586✔
2871
    pCtx->end.key = INT64_MIN;
2872
    pCtx->numOfParams = pExpr->base.numOfParams;
2873
    pCtx->param = pFunct->pParam;
238,065,276✔
UNCOV
2874
    pCtx->saveHandle.currentPage = -1;
×
UNCOV
2875
    pCtx->pStore = pStore;
×
UNCOV
2876
    pCtx->hasWindowOrGroup = false;
×
UNCOV
2877
    pCtx->needCleanup = false;
×
UNCOV
2878
  }
×
2879

2880
  for (int32_t i = 1; i < numOfOutput; ++i) {
2881
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
2,147,483,647✔
2882
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
2,147,483,647✔
2883
  }
2,147,483,647✔
2884

2,147,483,647✔
2885
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
2886
  QUERY_CHECK_CODE(code, lino, _end);
29,814,656✔
2887

29,814,656✔
2888
_end:
29,814,656✔
2889
  if (code != TSDB_CODE_SUCCESS) {
1,441,031✔
2890
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,441,031✔
2891
    for (int32_t i = 0; i < numOfOutput; ++i) {
1,441,031✔
UNCOV
2892
      taosMemoryFree(pFuncCtx[i].input.pData);
×
UNCOV
2893
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
UNCOV
2894
    }
×
2895
    taosMemoryFreeClear(*rowEntryInfoOffset);
2,013,556✔
2896
    taosMemoryFreeClear(pFuncCtx);
2,013,556✔
2897

2,013,556✔
2898
    terrno = code;
456,169✔
2899
    return NULL;
456,169✔
2900
  }
456,169✔
2901
  return pFuncCtx;
1,433,809✔
2902
}
1,433,809✔
2903

1,433,809✔
2904
// NOTE: there are more source columns than destination SSDataBlock columns.
1,633,709✔
2905
// doFilter in table scan needs every column, even if its output is false
1,633,709✔
2906
/**
 * Copy loaded source columns into their destination slots of pBlock.
 *
 * pCols and pColMatchInfo are walked in parallel by column id: equal ids are copied with
 * colDataAssign() into slot dstSlotId; a smaller source id is skipped; a larger source id means a
 * requested column is absent and is reported as an internal error.
 *
 * @param pBlock             destination block
 * @param pColMatchInfo      array of SColMatchItem
 * @param pCols              array of SColumnInfoData (source columns)
 * @param outputEveryColumn  not referenced by this function
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
  int32_t srcPos = 0;
  int32_t matchPos = 0;

  while (srcPos < numOfSrcCols && matchPos < taosArrayGetSize(pColMatchInfo)) {
    SColumnInfoData* pSrc = taosArrayGet(pCols, srcPos);
    if (pSrc == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    SColMatchItem* pItem = taosArrayGet(pColMatchInfo, matchPos);
    if (pItem == NULL) {
      return terrno;
    }

    if (pSrc->info.colId > pItem->colId) {
      // the wanted column id was passed without being found in the source
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    if (pSrc->info.colId < pItem->colId) {
      // source column is not requested, move on
      ++srcPos;
      continue;
    }

    SColumnInfoData* pTarget = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
    if (pTarget == NULL) {
      return terrno;
    }

    int32_t code = colDataAssign(pTarget, pSrc, pBlock->info.rows, &pBlock->info);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    ++srcPos;
    ++matchPos;
  }

  return TSDB_CODE_SUCCESS;
}
2943

729,430,067✔
2944
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
729,430,067✔
2945
  SInterval interval = {
2946
      .interval = pTableScanNode->interval,
729,430,067✔
2947
      .sliding = pTableScanNode->sliding,
729,400,544✔
2948
      .intervalUnit = pTableScanNode->intervalUnit,
729,433,480✔
2949
      .slidingUnit = pTableScanNode->slidingUnit,
2950
      .offset = pTableScanNode->offset,
2951
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
10,327,515✔
2952
      .timeRange = pTableScanNode->scanRange,
10,327,515✔
2953
  };
10,327,515✔
2954
  calcIntervalAutoOffset(&interval);
45,532,469✔
2955

35,204,954✔
2956
  return interval;
2957
}
35,204,954✔
2958

2959
/**
 * Convert a column node into an SColumn: slot id, column id and the result type's
 * type/bytes/scale/precision are copied; all other SColumn fields are zero.
 */
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
  SColumn col = {
      .slotId = pColNode->slotId,
      .colId = pColNode->colId,
      .type = pColNode->node.resType.type,
      .bytes = pColNode->node.resType.bytes,
      .scale = pColNode->node.resType.scale,
      .precision = pColNode->node.resType.precision,
  };
  return col;
}
22,151,800✔
2970

22,156,161✔
2971
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
2972
                               const SReadHandle* readHandle) {
2973
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
995,995,893✔
2974
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
995,982,090✔
2975

996,019,239✔
2976
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
2977
  if (!pCond->colList) {
2978
    return terrno;
2979
  }
996,004,031✔
2980
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
729,399,995✔
2981
  if (pCond->pSlotList == NULL) {
2982
    taosMemoryFreeClear(pCond->colList);
2983
    return terrno;
996,036,894✔
2984
  }
2985

2986
  // TODO: get it from stable scan node
10,327,515✔
2987
  pCond->twindows = pTableScanNode->scanRange;
2988
  pCond->suid = pTableScanNode->scan.suid;
2989
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
995,930,635✔
2990
  pCond->startVersion = -1;
2991
  pCond->endVersion = -1;
2992
  pCond->skipRollup = readHandle->skipRollup;
2,147,483,647✔
2993
  if (readHandle->winRangeValid) {
2,147,483,647✔
2994
    pCond->twindows = readHandle->winRange;
2,147,483,647✔
2995
  }
2,147,483,647✔
2996
  // allowed read stt file optimization mode
2,147,483,647✔
2997
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
2,147,483,647✔
2998
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
2999

3000
  int32_t j = 0;
2,147,483,647✔
3001
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
2,147,483,647✔
3002
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
2,147,483,647✔
3003
    if (!pNode) {
2,147,483,647✔
3004
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
3005
      return terrno;
2,147,483,647✔
3006
    }
2,147,483,647✔
3007
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
2,147,483,647✔
3008
    if (pColNode->colType == COLUMN_TYPE_TAG) {
3009
      continue;
UNCOV
3010
    }
×
UNCOV
3011

×
3012
    pCond->colList[j].type = pColNode->node.resType.type;
3013
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
3014
    pCond->colList[j].colId = pColNode->colId;
2,147,483,647✔
3015
    pCond->colList[j].pk = pColNode->isPk;
2,147,483,647✔
3016

2,147,483,647✔
3017
    pCond->pSlotList[j] = pNode->slotId;
3018
    j += 1;
2,147,483,647✔
3019
  }
2,147,483,647✔
3020

2,147,483,647✔
3021
  pCond->numOfCols = j;
2,147,483,647✔
3022
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
3023
}
2,147,483,647✔
3024

2,147,483,647✔
3025
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
2,147,483,647✔
3026
                                           const SReadHandle* readHandle, SArray* colArray) {
3027
  int32_t code = TSDB_CODE_SUCCESS;
769,847,684✔
3028
  int32_t lino = 0;
769,847,684✔
3029

769,901,502✔
3030
  pCond->order = TSDB_ORDER_ASC;
769,828,180✔
3031
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
3032

2,147,483,647✔
3033
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
2,147,483,647✔
3034
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
×
3035

×
3036
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
3037
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
2,147,483,647✔
3038

2,147,483,647✔
3039
  pCond->twindows = pOrgCond->twindows;
3040
  pCond->type = pOrgCond->type;
3041
  pCond->startVersion = -1;
29,467,801✔
3042
  pCond->endVersion = -1;
3043
  pCond->skipRollup = true;
2,012,956,339✔
3044
  pCond->notLoadData = false;
2,012,956,339✔
3045

32,764✔
3046
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
3047
    SColIdPair* pColPair = taosArrayGet(colArray, i);
3048
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
2,012,594,369✔
3049

3050
    bool find = false;
3051
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
458,974✔
3052
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
458,974✔
3053
        pCond->colList[i].type = pOrgCond->colList[j].type;
458,974✔
UNCOV
3054
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
×
3055
        pCond->colList[i].colId = pColPair->orgColId;
3056
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
3057
        pCond->pSlotList[i] = i;
4,899,616✔
3058
        find = true;
4,899,616✔
3059
        break;
4,899,616✔
UNCOV
3060
      }
×
3061
    }
×
3062
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
3063
  }
4,899,616✔
3064

458,974✔
3065
  return code;
3066
_return:
UNCOV
3067
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3068
  taosMemoryFreeClear(pCond->colList);
3069
  taosMemoryFreeClear(pCond->pSlotList);
3070
  return code;
3,632,213✔
3071
}
3,632,213✔
3072

3,632,213✔
3073
/**
 * Release the column list and the slot list owned by a query condition.
 * The condition structure itself is not freed.
 */
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
  taosMemoryFreeClear(pCond->pSlotList);
  taosMemoryFreeClear(pCond->colList);
}
2,147,483,647✔
3077

2,147,483,647✔
3078
int32_t convertFillType(int32_t mode) {
2,147,483,647✔
3079
  int32_t type = TSDB_FILL_NONE;
20,292,461✔
3080
  switch (mode) {
20,294,669✔
3081
    case FILL_MODE_PREV:
3082
      type = TSDB_FILL_PREV;
3083
      break;
2,147,483,647✔
3084
    case FILL_MODE_NONE:
2,147,483,647✔
3085
      type = TSDB_FILL_NONE;
3086
      break;
3087
    case FILL_MODE_NULL:
3088
      type = TSDB_FILL_NULL;
18,458,875✔
3089
      break;
18,458,875✔
3090
    case FILL_MODE_NULL_F:
18,458,875✔
3091
      type = TSDB_FILL_NULL_F;
18,458,875✔
UNCOV
3092
      break;
×
UNCOV
3093
    case FILL_MODE_NEXT:
×
3094
      type = TSDB_FILL_NEXT;
3095
      break;
3096
    case FILL_MODE_VALUE:
18,468,160✔
3097
      type = TSDB_FILL_SET_VALUE;
18,463,215✔
3098
      break;
18,455,161✔
3099
    case FILL_MODE_VALUE_F:
8,934✔
3100
      type = TSDB_FILL_SET_VALUE_F;
8,934✔
3101
      break;
3102
    case FILL_MODE_LINEAR:
3103
      type = TSDB_FILL_LINEAR;
18,446,227✔
3104
      break;
18,451,788✔
3105
    case FILL_MODE_NEAR:
3106
      type = TSDB_FILL_NEAR;
18,451,788✔
3107
      break;
18,462,517✔
3108
    default:
18,468,401✔
3109
      type = TSDB_FILL_NONE;
UNCOV
3110
  }
×
UNCOV
3111

×
3112
  return type;
3113
}
3114

18,477,335✔
3115
/**
 * Compute the first time window for timestamp ts.
 *
 * Ascending scans use the aligned window returned by getAlignQueryTimeWindow() as is. For
 * descending scans the window start is then advanced, one getNextTimeWindowStart() step at a
 * time, to the last start that does not exceed ts, and the end key is recomputed from that start.
 *
 * @param pInterval  interval definition
 * @param ts         timestamp being processed
 * @param w          [out] resulting window
 * @param ascQuery   true for ascending order
 */
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
  *w = getAlignQueryTimeWindow(pInterval, ts);
  if (ascQuery) {
    return;
  }

  // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
  for (int64_t candidate = w->skey; candidate < ts;) {  // moving towards end
    candidate = getNextTimeWindowStart(pInterval, candidate, TSDB_ORDER_ASC);
    if (candidate > ts) {
      break;
    }
    w->skey = candidate;
  }

  w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
3134

1,502,778,627✔
UNCOV
3135
/**
 * Compute the window for ts directly: the start is taosTimeTruncate(ts), the end is
 * taosTimeGetIntervalEnd() of that start.
 */
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow win = {0};
  win.skey = taosTimeTruncate(ts, pInterval);
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
  return win;
}
1,494,016,463✔
3142

1,493,859,672✔
3143
/**
 * Starting from *pWindow, step against the scan direction while the window still covers ts and
 * return the last window that did. If *pWindow itself does not cover ts it is returned unchanged.
 *
 * @param ts         timestamp that must lie inside the returned window
 * @param pWindow    starting window (not modified)
 * @param pInterval  interval definition used for stepping
 * @param order      scan order; stepping uses the opposite order
 */
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
  STimeWindow probe = *pWindow;
  STimeWindow lastCovering = probe;

  for (;;) {
    if (probe.skey > ts || probe.ekey < ts) {
      break;
    }
    lastCovering = probe;
    // get previous time window
    getNextTimeWindow(pInterval, &probe, order == TSDB_ORDER_ASC ? TSDB_ORDER_DESC : TSDB_ORDER_ASC);
  }

  return lastCovering;
}
3154

239,008✔
3155
// get the correct time window according to the handled timestamp
3156
// todo refactor
3157
/**
 * Determine the time window that timestamp ts belongs to.
 *
 * When no result row is current (cur.pageId == -1) the initial window is computed. Otherwise the
 * window of the current result row is reused if it covers ts and recomputed from ts if not. When
 * interval and sliding differ, the result is further moved to the first qualified window via
 * getFirstQualifiedTimeWindow().
 */
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
                                int32_t order) {
  STimeWindow win = {0};

  // the first window, from the previous stored value
  if (pResultRowInfo->cur.pageId == -1) {
    getInitialStartTimeWindow(pInterval, ts, &win, (order == TSDB_ORDER_ASC));
    return win;
  }

  SResultRow* pCurRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
  if (pCurRow) {
    TAOS_SET_OBJ_ALIGNED(&win, pCurRow->win);
  }

  // in case of typical time window, we can calculate time window directly.
  bool coversTs = (win.skey <= ts) && (win.ekey >= ts);
  if (!coversTs) {
    win = doCalculateTimeWindow(ts, pInterval);
  }

  if (pInterval->interval == pInterval->sliding) {
    return win;
  }

  // sliding differs from interval, so several windows may contain ts;
  // find the first qualified one.
  return getFirstQualifiedTimeWindow(ts, &win, pInterval, order);
}
1,835,319,258✔
3183

1,835,320,812✔
3184
/**
 * Return the start key of the window adjacent to the one starting at `start`.
 *
 * The interval offset is subtracted, the key is moved by one sliding step multiplied by the
 * direction factor of `order`, and the offset is added back.
 */
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);

  TSKEY withoutOffset = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
  TSKEY shifted =
      taosTimeAdd(withoutOffset, direction * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);

  return taosTimeAdd(shifted, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
}
1,928,968,050✔
3191

1,928,968,050✔
3192
/**
 * Move *tw to the adjacent window in the given order: the start comes from
 * getNextTimeWindowStart(), the end is start + interval - 1.
 */
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
  TSKEY newStart = getNextTimeWindowStart(pInterval, tw->skey, order);

  tw->skey = newStart;
  tw->ekey = taosTimeAdd(newStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
1,834,891,912✔
3196

1,834,716,071✔
3197
/** Return true when any of limit, offset, slimit or soffset differs from -1. */
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
  if (pLimitInfo->limit.limit != -1) {
    return true;
  }
  if (pLimitInfo->limit.offset != -1) {
    return true;
  }
  if (pLimitInfo->slimit.limit != -1) {
    return true;
  }
  return pLimitInfo->slimit.offset != -1;
}
1,835,016,883✔
3201

1,835,183,667✔
3202
/** Return true when the slimit or its offset differs from -1. */
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
  if (pLimitInfo->slimit.limit != -1) {
    return true;
  }
  return pLimitInfo->slimit.offset != -1;
}
3205

7,617,460✔
3206
/**
 * Initialize limit bookkeeping from the LIMIT and SLIMIT nodes.
 *
 * The limit/offset pairs are read with getLimit()/getOffset(); the remaining offsets start at the
 * configured offsets and all output counters and the current group id start at zero.
 */
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
  pLimitInfo->limit = (SLimit){.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
  pLimitInfo->slimit = (SLimit){.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};

  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->remainGroupOffset = pLimitInfo->slimit.offset;

  pLimitInfo->numOfOutputRows = 0;
  pLimitInfo->numOfOutputGroups = 0;
  pLimitInfo->currentGroupId = 0;
}
3218

2,147,483,647✔
3219
/** Reset the per-group row counter and restore the remaining row offset to the configured offset. */
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->numOfOutputRows = 0;
}
2,147,483,647✔
3223

2,147,483,647✔
3224
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
3225
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
316,535,051✔
3226
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
3227
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
3228
  }
3229
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
278,065,344✔
3230
  return TSDB_CODE_SUCCESS;
278,065,344✔
3231
}
278,155,382✔
3232

3233
/** Return the suid stored in the list's id info. */
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
  return pTableList->idInfo.suid;
}
278,140,746✔
3234

278,110,448✔
UNCOV
3235
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
×
3236
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
3237
    return NULL;
3238
  }
278,110,448✔
3239

277,959,493✔
UNCOV
3240
  return taosArrayGet(pTableList->pTableList, index);
×
UNCOV
3241
}
×
3242

3243
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
277,959,493✔
3244
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
3245
  if (startIndex >= numOfTables) {
278,131,705✔
3246
    return -1;
278,140,171✔
3247
  }
278,140,171✔
UNCOV
3248

×
UNCOV
3249
  for (int32_t i = startIndex; i < numOfTables; ++i) {
×
3250
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
3251
    if (!p) {
3252
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
1,512,699,208✔
3253
      return -1;
1,234,556,830✔
3254
    }
1,234,462,329✔
UNCOV
3255
    if (p->uid == uid) {
×
UNCOV
3256
      return i;
×
3257
    }
3258
  }
1,234,462,329✔
3259
  return -1;
66,910,212✔
3260
}
66,910,212✔
UNCOV
3261

×
UNCOV
3262
/**
 * Copy the identity stored in the list's id info to the output parameters.
 *
 * @param psuid  [out] idInfo.suid
 * @param uid    [out] idInfo.uid
 * @param type   [out] idInfo.tableType
 */
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
  *type = pTableList->idInfo.tableType;
  *uid = pTableList->idInfo.uid;
  *psuid = pTableList->idInfo.suid;
}
3267

3268
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
278,166,183✔
3269
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
278,171,446✔
3270
  if (slot == NULL) {
278,033,870✔
UNCOV
3271
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
UNCOV
3272
    return -1;
×
3273
  }
3274

3275
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
277,979,914✔
3276
  return pKeyInfo->groupId;
278,097,433✔
3277
}
277,846,224✔
3278

3279
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3280
/**
 * Append a table to the list and index it by uid.
 *
 * The uid map is created on first use. A uid that is already present is ignored and reported as
 * success. Otherwise the key info is pushed to the table array and its slot is stored in the map;
 * if the map insertion fails, the array entry is removed again so that array and map keep the
 * same number of entries.
 *
 * @param pTableList  table list to extend
 * @param uid         table uid
 * @param gid         group id recorded for the table
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       slot = -1;
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};

  if (pTableList->map == NULL) {
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
  }

  if (taosHashGet(pTableList->map, &uid, sizeof(uid)) != NULL) {
    // nothing was added, so skip the "added" trace below (slot would be meaningless there)
    qInfo("table:%" PRIu64 " already in tableIdList, ignore it", uid);
    return TSDB_CODE_SUCCESS;
  }

  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
  if (code != TSDB_CODE_SUCCESS) {
    // roll back first: the array must never hold an entry the map does not index
    taosArrayPopTailBatch(pTableList->pTableList, 1);
    // we have checked the existence of uid in hash map above
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
  }

  return code;
}
2,147,483,647✔
3315

2,147,483,647✔
3316
/**
 * Return the tables that belong to the output group at ordinalGroupIndex.
 *
 * @param pTableList         table list
 * @param ordinalGroupIndex  zero-based group ordinal
 * @param pKeyInfo           [out] first key info of the group inside the table array
 *                           (NULL for a single group with no tables)
 * @param size               [out] number of tables in the group
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an ordinal outside the group range, or
 *         the error from tableListGetSize()
 */
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
                              int32_t* size) {
  int32_t groupCount = tableListGetOutputGroups(pTableList);
  int32_t tableCount = 0;

  int32_t code = tableListGetSize(pTableList, &tableCount);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= groupCount) {
    return TSDB_CODE_INVALID_PARA;
  }

  // here handle two special cases:
  // 1. only one group exists, and 2. one table exists for each group.
  if (groupCount == 1) {
    *size = tableCount;
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
    return TSDB_CODE_SUCCESS;
  }

  if (groupCount == tableCount) {
    *size = 1;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
    return TSDB_CODE_SUCCESS;
  }

  // general case: the group spans from its recorded offset up to the next group's offset,
  // or to the end of the array for the last group
  int32_t begin = pTableList->groupOffset[ordinalGroupIndex];
  if (ordinalGroupIndex >= groupCount - 1) {
    *size = tableCount - begin;
  } else {
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - begin;
  }

  *pKeyInfo = taosArrayGet(pTableList->pTableList, begin);
  return TSDB_CODE_SUCCESS;
}
3352

3353
/** Return the number of output groups recorded in the list. */
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) {
  return pTableList->numOfOuputGroups;
}
3354

32,628,593✔
3355
// Accessor: value of the oneTableForEachGroup flag stored in the table list.
bool oneTableForEachGroup(const STableListInfo* pTableList) {
  return pTableList->oneTableForEachGroup;
}
32,620,497✔
3356

16,005✔
3357
STableListInfo* tableListCreate() {
3358
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
3359
  if (pListInfo == NULL) {
32,604,492✔
3360
    return NULL;
3361
  }
32,608,341✔
3362

2,023,091✔
3363
  pListInfo->remainGroups = NULL;
3364
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
3365
  if (pListInfo->pTableList == NULL) {
3366
    goto _error;
3367
  }
1,569,465,948✔
3368

2,147,483,647✔
3369
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,147,483,647✔
3370
  if (pListInfo->map == NULL) {
2,147,483,647✔
3371
    goto _error;
×
UNCOV
3372
  }
×
3373

3374
  pListInfo->numOfOuputGroups = 1;
2,147,483,647✔
3375
  return pListInfo;
2,147,483,647✔
UNCOV
3376

×
UNCOV
3377
_error:
×
3378
  tableListDestroy(pListInfo);
3379
  return NULL;
3380
}
3381

1,570,011,696✔
3382
// Release a table list and everything it owns: the table array, the group offset buffer,
// the uid map and the remain-groups map. Passing NULL is a no-op.
void tableListDestroy(STableListInfo* pTableListInfo) {
  if (NULL != pTableListInfo) {
    taosArrayDestroy(pTableListInfo->pTableList);
    taosMemoryFreeClear(pTableListInfo->groupOffset);

    taosHashCleanup(pTableListInfo->map);
    taosHashCleanup(pTableListInfo->remainGroups);

    // Reset the container pointers before the owning struct itself is freed.
    pTableListInfo->pTableList = NULL;
    pTableListInfo->map = NULL;

    taosMemoryFree(pTableListInfo);
  }
}
1,705,056,905✔
3396

466,748✔
3397
// Reset a table list to its freshly-created state without freeing the list object itself:
// the table array and both hash maps are emptied, the group offset buffer is released, and the
// grouping attributes go back to their defaults (one output group, not one-table-per-group).
// Passing NULL is a no-op.
void tableListClear(STableListInfo* pTableListInfo) {
  if (pTableListInfo == NULL) {
    return;
  }

  taosArrayClear(pTableListInfo->pTableList);
  taosHashClear(pTableListInfo->map);
  taosHashClear(pTableListInfo->remainGroups);

  // Free AND null the pointer: the list stays alive after a clear, and tableListDestroy() frees
  // groupOffset again, so leaving a stale pointer here would lead to a double free / use-after-free.
  taosMemoryFreeClear(pTableListInfo->groupOffset);

  pTableListInfo->numOfOuputGroups = 1;
  pTableListInfo->oneTableForEachGroup = false;
}
3409

1,707,827,139✔
3410
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
1,707,827,139✔
3411
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
1,707,893,862✔
3412
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
3413

3414
  if (pInfo1->groupId == pInfo2->groupId) {
1,707,659,674✔
3415
    return 0;
137,896,413✔
3416
  } else {
137,901,448✔
3417
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
3418
  }
3419
}
1,569,763,261✔
3420

1,569,970,191✔
3421
// Sort the table list by groupId and rebuild the per-group start offsets.
//
// After sorting, the index of the first table of every run of equal groupIds is collected into a
// temporary array; its contents are then copied into a newly allocated pTableListInfo->groupOffset
// and its length becomes pTableListInfo->numOfOuputGroups.
//
// Returns TSDB_CODE_SUCCESS, or the terrno value observed when an array/memory operation fails.
// The temporary offset array is released on every exit path.
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);

  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (!pList) {
    return terrno;
  }

  uint64_t       gid = 0;
  int32_t        start = 0;
  void*          tmp = NULL;
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
  if (!pInfo) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    goto _end;
  }
  gid = pInfo->groupId;

  // The first group always starts at index 0.
  tmp = taosArrayPush(pList, &start);
  if (!tmp) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    goto _end;
  }

  // Every change of groupId in the sorted list marks the start of a new group.
  for (int32_t i = 1; i < size; ++i) {
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
    if (!pInfo) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      goto _end;
    }
    if (pInfo->groupId != gid) {
      tmp = taosArrayPush(pList, &i);
      if (!tmp) {
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        goto _end;
      }
      gid = pInfo->groupId;
    }
  }

  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
  if (pTableListInfo->groupOffset == NULL) {
    code = terrno;
    goto _end;
  }

  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);

_end:
  // Single cleanup point: previously the early error returns leaked pList.
  taosArrayDestroy(pList);
  return code;
}
UNCOV
3471

×
UNCOV
3472
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
×
UNCOV
3473
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
×
UNCOV
3474
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
3475

×
UNCOV
3476
  bool   groupByTbname = groupbyTbname(group);
×
UNCOV
3477
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
×
UNCOV
3478
  if (!numOfTables) {
×
3479
    return code;
3480
  }
3481
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
3482
  if (group == NULL || groupByTbname) {
3483
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
1,055,239,703✔
3484
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
3485
      pTableListInfo->remainGroups =
2,147,483,647✔
3486
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,147,483,647✔
3487
      if (pTableListInfo->remainGroups == NULL) {
3488
        return terrno;
2,147,483,647✔
3489
      }
2,147,483,647✔
3490

2,147,483,647✔
3491
      for (int i = 0; i < numOfTables; i++) {
2,147,483,647✔
3492
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
2,147,483,647✔
3493
        if (!info) {
3494
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
2,147,483,647✔
3495
          return terrno;
3496
        }
2,147,483,647✔
3497
        info->groupId = groupByTbname ? info->uid : 0;
2,147,483,647✔
3498
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
2,147,483,647✔
3499
                                      &(info->uid), sizeof(info->uid));
3500
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
2,147,483,647✔
3501
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
2,147,483,647✔
3502
          return tempRes;
2,147,483,647✔
3503
        }
2,147,483,647✔
3504
      }
3505
    } else {
2,147,483,647✔
3506
      for (int32_t i = 0; i < numOfTables; i++) {
2,147,483,647✔
3507
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
3508
        if (!info) {
2,147,483,647✔
3509
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
2,147,483,647✔
3510
          return terrno;
2,147,483,647✔
3511
        }
×
UNCOV
3512
        info->groupId = groupByTbname ? info->uid : 0;
×
UNCOV
3513
        
×
3514
      }
2,147,483,647✔
3515
    }
2,147,483,647✔
3516
    if (groupIdMap && group != NULL){
×
3517
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
×
3518
    }
3519

2,147,483,647✔
3520
    pTableListInfo->oneTableForEachGroup = groupByTbname;
2,147,483,647✔
3521
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
3522
      pTableListInfo->oneTableForEachGroup = true;
3523
    }
2,147,483,647✔
3524

2,147,483,647✔
3525
    if (groupSort && groupByTbname) {
3526
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
3527
      pTableListInfo->numOfOuputGroups = numOfTables;
3528
    } else if (groupByTbname && pScanNode->groupOrderScan) {
2,147,483,647✔
3529
      pTableListInfo->numOfOuputGroups = numOfTables;
2,147,483,647✔
3530
    } else {
3531
      pTableListInfo->numOfOuputGroups = 1;
3532
    }
525,142,655✔
3533
    if (groupSort || pScanNode->groupOrderScan) {
525,142,655✔
3534
      code = sortTableGroup(pTableListInfo);
525,147,125✔
3535
    }
525,147,125✔
3536
  } else {
525,147,125✔
3537
    bool initRemainGroups = false;
3538
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
1,568,440,928✔
3539
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
1,043,237,907✔
3540
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
1,043,338,340✔
3541
          !(groupSort || pScanNode->groupOrderScan)) {
1,043,396,356✔
3542
        initRemainGroups = true;
3543
      }
1,043,418,962✔
3544
    }
3545

2,086,691,248✔
3546
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
37,512,690✔
3547
    if (code != TSDB_CODE_SUCCESS) {
3548
      return code;
1,005,920,157✔
3549
    }
1,005,817,745✔
3550

1,005,934,117✔
3551
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
×
UNCOV
3552

×
UNCOV
3553
    if (groupSort || pScanNode->groupOrderScan) {
×
3554
      code = sortTableGroup(pTableListInfo);
1,005,851,117✔
3555
    }
151,910,177✔
UNCOV
3556
  }
×
3557

×
3558
  // add all table entry in the hash map
3559
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
151,996,164✔
3560
  for (int32_t i = 0; i < size; ++i) {
151,985,352✔
3561
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
3562
    if (!p) {
3563
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
853,953,733✔
3564
      return terrno;
853,941,678✔
3565
    }
3566
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
3567
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
3568
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
525,203,021✔
3569
      return tempRes;
3570
    }
3571
  }
2,147,483,647✔
3572

2,147,483,647✔
3573
  return code;
2,147,483,647✔
3574
}
2,147,483,647✔
3575

2,147,483,647✔
3576
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
3577
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3578
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
2,147,483,647✔
3579
  int64_t     st = taosGetTimestampUs();
2,147,483,647✔
3580
  const char* idStr = GET_TASKID(pTaskInfo);
2,147,483,647✔
3581

2,147,483,647✔
3582
  if (pHandle == NULL) {
3583
    qError("invalid handle, in creating operator tree, %s", idStr);
3584
    return TSDB_CODE_INVALID_PARA;
980,796✔
3585
  }
3586

980,796✔
3587
  if (pHandle->uid != 0) {
2,991,120✔
3588
    pScanNode->uid = pHandle->uid;
2,010,324✔
3589
    pScanNode->tableType = TSDB_CHILD_TABLE;
2,010,324✔
3590
  }
2,010,324✔
UNCOV
3591
  uint8_t digest[17] = {0};
×
UNCOV
3592
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
×
UNCOV
3593
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
×
3594
  if (code != TSDB_CODE_SUCCESS) {
3595
    qError("failed to getTableList, code:%s", tstrerror(code));
3596
    return code;
980,796✔
3597
  }
3598

3599
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
980,796✔
3600

980,796✔
3601
  int64_t st1 = taosGetTimestampUs();
980,796✔
3602
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
980,796✔
3603
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
980,796✔
3604
         pTaskInfo->cost.extractListTime, idStr);
2,495,400✔
3605

1,514,604✔
3606
  if (numOfTables == 0) {
1,512,052✔
3607
    qDebug("no table qualified for query, %s", idStr);
1,512,052✔
3608
    return TSDB_CODE_SUCCESS;
3609
  }
980,796✔
3610

980,796✔
3611
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
3612
  if (code != TSDB_CODE_SUCCESS) {
980,796✔
3613
    return code;
980,796✔
UNCOV
3614
  }
×
3615

3616
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
980,796✔
3617
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
3618

3619
  return TSDB_CODE_SUCCESS;
3620
}
3621

3622
char* getStreamOpName(uint16_t opType) {
3623
  switch (opType) {
3624
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
3625
      return "stream scan";
3626
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
3627
      return "project";
3628
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
3629
      return "external window";
3630
  }
3631
  return "error name";
3632
}
3633

3634
// Log the content of a data block when the DEBUG_TRACE bit of qDebugFlag is set.
// A NULL block or a block without rows only produces a one-line note. Otherwise the block is
// rendered with dumpBlockData() and the resulting buffer is logged and freed.
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
  if ((qDebugFlag & DEBUG_TRACE) == 0) {
    return;
  }

  if (NULL == pBlock) {
    qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
    return;
  }

  if (0 == pBlock->info.rows) {
    qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
    return;
  }

  char* pDump = NULL;
  if (dumpBlockData(pBlock, flag, &pDump, taskIdStr, qId) == 0) {
    qDebugL("%" PRIx64 " %s %s", qId, __func__, pDump);
    taosMemoryFree(pDump);
  }
}
3652

3653
// Stream-oriented variant of block logging.
// NULL and empty blocks are always reported with a short message; a block with rows is dumped
// only when the DEBUG_DEBUG bit of qDebugFlag is set, tagged with "<flag> <opStr>".
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
  if (NULL == pBlock) {
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
    return;
  }

  if (0 == pBlock->info.rows) {
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
           pBlock->info.version);
    return;
  }

  if ((qDebugFlag & DEBUG_DEBUG) == 0) {
    return;
  }

  // Combine the caller's flag and the operator name into one label for the dump header.
  char label[64];
  snprintf(label, sizeof(label), "%s %s", flag, opStr);

  char* pDump = NULL;
  if (dumpBlockData(pBlock, label, &pDump, taskIdStr, 0) == 0) {
    qDebug("%s", pDump);
    taosMemoryFree(pDump);
  }
}
3674

3675
// Pick the start timestamp: the first entry of tsCols when a timestamp column is supplied,
// otherwise the start key of the window.
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (NULL != tsCols) {
    return tsCols[0];
  }
  return win->skey;
}
3676

3677
// Write window information into slots 2..4 of the int64 buffer behind pColData->pData:
//   [2] window duration (ekey - skey + delta), [3] window start key, [4] window end key + delta.
// The buffer is accessed without any size check here.
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
  int64_t* pSlots = (int64_t*)pColData->pData;

  pSlots[2] = pWin->ekey - pWin->skey + delta;
  pSlots[3] = pWin->skey;
  pSlots[4] = pWin->ekey + delta;
}
3685

3686
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
3687
                 int32_t rowIndex) {
3688
  SColumnDataAgg* pColAgg = NULL;
3689
  const char*     isNull = oldkeyBuf;
3690
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
3691

3692
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
3693
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
3694
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
3695
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
3696

3697
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
3698
      if (isNull[i] != 1) return 1;
3699
    } else {
3700
      if (isNull[i] != 0) return 1;
3701
      const char* val = colDataGetData(pColInfoData, rowIndex);
3702
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
3703
        int32_t len = getJsonValueLen(val);
3704
        if (memcmp(p, val, len) != 0) return 1;
3705
        p += len;
3706
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
3707
        if (IS_STR_DATA_BLOB(pCol->type)) {
3708
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
3709
          p += blobDataTLen(val);
3710
        } else {
3711
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
3712
          p += varDataTLen(val);
3713
        }
3714
      } else {
3715
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
3716
        p += pCol->bytes;
3717
      }
3718
    }
3719
  }
3720
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
3721
  return 0;
3722
}
3723

3724
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
3725
  uint32_t        colNum = pSortGroupCols->size;
3726
  SColumnDataAgg* pColAgg = NULL;
3727
  char*           isNull = keyBuf;
3728
  char*           p = keyBuf + sizeof(int8_t) * colNum;
3729

3730
  for (int32_t i = 0; i < colNum; ++i) {
3731
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
3732
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
3733
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
3734

3735
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
3736

3737
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
3738
      isNull[i] = 1;
3739
    } else {
3740
      isNull[i] = 0;
3741
      const char* val = colDataGetData(pColInfoData, rowIndex);
3742
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
3743
        int32_t len = getJsonValueLen(val);
3744
        memcpy(p, val, len);
3745
        p += len;
3746
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
3747
        if (IS_STR_DATA_BLOB(pCol->type)) {
3748
          blobDataCopy(p, val);
3749
          p += blobDataTLen(val);
3750
        } else {
3751
          varDataCopy(p, val);
3752
          p += varDataTLen(val);
3753
        }
3754
      } else {
3755
        memcpy(p, val, pCol->bytes);
3756
        p += pCol->bytes;
3757
      }
3758
    }
3759
  }
3760
  return (int32_t)(p - keyBuf);
3761
}
3762

3763
uint64_t calcGroupId(char* pData, int32_t len) {
3764
  T_MD5_CTX context;
3765
  tMD5Init(&context);
3766
  tMD5Update(&context, (uint8_t*)pData, len);
3767
  tMD5Final(&context);
3768

3769
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
3770
  uint64_t id = 0;
3771
  memcpy(&id, context.digest, sizeof(uint64_t));
3772
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
3773
  return id;
3774
}
3775

3776
// Collect the pExpr of every SOrderByExprNode in pSortKeys into a node list built with
// nodesListMakeAppend(). Returns the list (NULL when nothing was appended). On an append
// failure the error is logged, terrno is set to the failing code and NULL is returned.
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNodeList* pCols = NULL;
  SNode*     pNode;

  FOREACH(pNode, pSortKeys) {
    SOrderByExprNode* pOrderBy = (SOrderByExprNode*)pNode;

    int32_t code = nodesListMakeAppend(&pCols, pOrderBy->pExpr);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      terrno = code;
      return NULL;
    }
  }

  return pCols;
}
3790

3791
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
3792
  int32_t code = TSDB_CODE_SUCCESS;
3793
  int32_t lino = 0;
3794
  int32_t len = 0;
3795
  int32_t keyNum = taosArrayGetSize(keys);
3796
  for (int32_t i = 0; i < keyNum; ++i) {
3797
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
3798
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
3799
    len += pCol->bytes;
3800
  }
3801
  len += sizeof(int8_t) * keyNum;  // null flag
3802
  *pLen = len;
3803

3804
_end:
3805
  if (code != TSDB_CODE_SUCCESS) {
3806
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3807
  }
3808
  return code;
3809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc