• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4896

24 Dec 2025 07:36AM UTC coverage: 65.929% (+0.4%) from 65.513%
#4896

push

travis-ci

web-flow
enh: [TS-7591] Some code refactor and add more log. (#34022)

326 of 537 new or added lines in 4 files covered. (60.71%)

370 existing lines in 111 files now uncovered.

185828 of 281861 relevant lines covered (65.93%)

116309824.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.66
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36
#include "trpc.h"
37

38
// Walker context filled in by getColumn() while it collects the columns referenced by a tag condition.
typedef struct tagFilterAssist {
39
  SHashObj* colHash;    // keyed by column id; value is the node pointer of the first column seen with that id
40
  int32_t   index;      // next slot id to assign to a newly seen column
41
  SArray*   cInfoList;  // one SColumnInfo per distinct column, appended in slot-id order
42
  int32_t   code;       // result of nodesMakeNode() inside the walker
43
} tagFilterAssist;
44

45
// Walker context for doTranslateTagExpr(): replaces tag/tbname references with constants read from pReader.
typedef struct STransTagExprCtx {
46
  int32_t      code;     // result of nodesMakeNode() inside the walker
47
  SMetaReader* pReader;  // meta reader positioned on the table whose tag values / name are substituted
48
} STransTagExprCtx;
49

50
// Classification returned by checkTagCond().
// NOTE(review): meanings below are inferred from the names only; checkTagCond() is not visible here - confirm.
typedef enum {
51
  FILTER_NO_LOGIC = 1,  // presumably: condition without a logic (AND/OR) node
52
  FILTER_AND,           // presumably: top-level AND condition
53
  FILTER_OTHER,         // presumably: anything else
54
} FilterCondType;
55

56
static FilterCondType checkTagCond(SNode* cond);
57
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
58
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
59
                                        uint64_t suid);
60

61
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
62
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
63

64
static int64_t getLimit(const SNode* pLimit) {
977,742,489✔
65
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
977,742,489✔
66
}
67
static int64_t getOffset(const SNode* pLimit) {
977,692,774✔
68
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
977,692,774✔
69
}
70
static void releaseColInfoData(void* pCol);
71

72
// Resets a result-row info to "empty": no rows and no current page (pageId == -1).
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
  pResultRowInfo->cur.pageId = -1;
  pResultRowInfo->size = 0;
}
414,564,226✔
76

77
// Marks a result row as closed.
void closeResultRow(SResultRow* pResultRow) {
  pResultRow->closed = true;
}
6,197,465✔
78

79
// Clears the bookkeeping flags of a result row and, when entrySize is non-zero,
// zeroes the first entrySize bytes of its entry-info area.
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
  pResultRow->startInterp = false;
  pResultRow->endInterp = false;
  pResultRow->closed = false;
  pResultRow->numOfRows = 0;

  if (entrySize == 0) {
    return;
  }

  memset(pResultRow->pEntryInfo, 0, entrySize);
}
415,810,982✔
89

90
// TODO refactor: use macro
91
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
92
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
93
}
94

95
// Computes the byte size of one result row: the SResultRow header, one SResultRowEntryInfo
// per output column, and each column's intermediate buffer (resDataInfo.interBufSize).
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t total = sizeof(SResultRow) + (numOfOutput * sizeof(SResultRowEntryInfo));

  int32_t idx = 0;
  while (idx < numOfOutput) {
    total += pCtx[idx].resDataInfo.interBufSize;
    ++idx;
  }

  return total;
}
104

105
// Convert buf read from rocksdb to result row
106
/*
 * Rebuilds an in-memory result row from its serialized form.
 *
 * Serialized layout as consumed here: an SResultRow header, then for each of
 * pSup->numOfExprs expressions an int32_t length followed by that many bytes of
 * entry data, then optional trailing bytes that are appended to the output unchanged.
 *
 * When the stored version differs from FUNCTION_RESULT_INFO_VERSION and the
 * expression's function provides a decode callback, that callback converts the
 * entry; otherwise the entry bytes are copied verbatim.
 *
 * @param pSup        expression support info (function contexts and entry offsets)
 * @param inBuf       serialized buffer
 * @param inBufSize   total size of inBuf in bytes
 * @param outBuf      receives a newly allocated buffer owned by the caller; NULL on failure
 * @param outBufSize  receives the size of *outBuf
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA for NULL arguments or a truncated
 *         input; terrno on allocation failure; or the code returned by a decode callback.
 */
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
  if (inBuf == NULL || pSup == NULL || outBuf == NULL || outBufSize == NULL) {
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
    return TSDB_CODE_INVALID_PARA;
  }

  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t*        offset = pSup->rowEntryInfoOffset;
  SResultRow*     pResultRow = NULL;
  size_t          processedSize = 0;
  int32_t         code = TSDB_CODE_SUCCESS;

  *outBuf = NULL;
  *outBufSize = 0;

  // the header is read unconditionally below, so the input must at least contain it
  if (inBufSize < sizeof(SResultRow)) {
    qError("invalid input buffer size, headerSize:%zu, inBufSize:%zu", sizeof(SResultRow), inBufSize);
    return TSDB_CODE_INVALID_PARA;
  }

  // calculate the size of output buffer
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
  *outBuf = taosMemoryMalloc(*outBufSize);
  if (*outBuf == NULL) {
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
    return terrno;
  }
  pResultRow = (SResultRow*)*outBuf;
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
  inBuf += sizeof(SResultRow);
  processedSize += sizeof(SResultRow);

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    if (inBufSize - processedSize < sizeof(int32_t)) {
      qError("truncated result row buffer, processed:%zu, inBufSize:%zu", processedSize, inBufSize);
      code = TSDB_CODE_INVALID_PARA;
      goto _error;
    }

    // the length prefix may sit at any byte offset, so read it with memcpy instead of a cast
    int32_t len = 0;
    (void)memcpy(&len, inBuf, sizeof(int32_t));
    inBuf += sizeof(int32_t);
    processedSize += sizeof(int32_t);

    if (len < 0 || (size_t)len > inBufSize - processedSize) {
      qError("invalid result entry length:%d, processed:%zu, inBufSize:%zu", len, processedSize, inBufSize);
      code = TSDB_CODE_INVALID_PARA;
      goto _error;
    }

    // use the decode callback of the function that owns this entry, not the first one
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx[i].fpSet.decode) {
      code = pCtx[i].fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to decode result row, code:%d", code);
        goto _error;
      }
    } else {
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
    }
    inBuf += len;
    processedSize += len;
  }

  if (processedSize < inBufSize) {
    // stream stores extra data after result row
    size_t leftLen = inBufSize - processedSize;
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
    if (*outBuf == NULL) {
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
      return terrno;
    }
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
    inBuf += leftLen;
    processedSize += leftLen;
    *outBufSize += leftLen;
  }

  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
  return TSDB_CODE_SUCCESS;

_error:
  // do not hand a half-built row back to the caller
  taosMemoryFree(*outBuf);
  *outBuf = NULL;
  *outBufSize = 0;
  return code;
}
163

164
// Convert result row to buf for rocksdb
165
/*
 * Serializes an in-memory result row.
 *
 * Output layout: the SResultRow header (stamped with FUNCTION_RESULT_INFO_VERSION),
 * then for each of pSup->numOfExprs expressions an int32_t length followed by the
 * entry info plus its intermediate buffer, then any bytes of inBuf beyond the row
 * size copied through unchanged.
 *
 * The input row is not modified: the version is written to the serialized copy only.
 *
 * @param pSup        expression support info (function contexts and entry offsets)
 * @param inBuf       in-memory result row, optionally followed by extra data
 * @param inBufSize   size of inBuf in bytes; must be at least the result-row size
 * @param outBuf      receives a newly allocated buffer owned by the caller
 * @param outBufSize  receives the size of *outBuf
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments, or terrno on
 *         allocation failure.
 */
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
    return TSDB_CODE_INVALID_PARA;
  }

  SqlFunctionCtx*   pCtx = pSup->pCtx;
  int32_t*          offset = pSup->rowEntryInfoOffset;
  const SResultRow* pResultRow = (const SResultRow*)inBuf;
  size_t            rowSize = getResultRowSize(pCtx, pSup->numOfExprs);

  if (rowSize > inBufSize) {
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
    return TSDB_CODE_INVALID_PARA;
  }

  // calculate the size of output buffer
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
  if (rowSize < inBufSize) {
    *outBufSize += inBufSize - rowSize;
  }

  *outBuf = taosMemoryMalloc(*outBufSize);
  if (*outBuf == NULL) {
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
    return terrno;
  }

  char* pBuf = *outBuf;
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
  // stamp the current format version on the serialized header; the freshly allocated
  // buffer start is suitably aligned for SResultRow
  ((SResultRow*)pBuf)->version = FUNCTION_RESULT_INFO_VERSION;
  pBuf += sizeof(SResultRow);

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    size_t  len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
    int32_t storedLen = (int32_t)len;

    // the length prefix may land on any byte offset, so store it with memcpy
    (void)memcpy(pBuf, &storedLen, sizeof(int32_t));
    pBuf += sizeof(int32_t);
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
    pBuf += len;
  }

  if (rowSize < inBufSize) {
    // stream stores extra data after result row
    size_t leftLen = inBufSize - rowSize;
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
    pBuf += leftLen;
  }

  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
  return TSDB_CODE_SUCCESS;
}
215

216
// Array-element destructor: the element is itself a pointer, free what it points to.
static void freeEx(void* p) {
  void** ppItem = (void**)p;
  taosMemoryFree(*ppItem);
}
×
217

218
// Releases the buffer and the row array held by a group result info and resets its cursors.
// When freeItem is set, every array element is a pointer that is freed as well.
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
  taosMemoryFreeClear(pGroupResInfo->pBuf);

  if (!pGroupResInfo->freeItem) {
    taosArrayDestroy(pGroupResInfo->pRows);
  } else {
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    pGroupResInfo->freeItem = false;
  }
  pGroupResInfo->pRows = NULL;

  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
108,645,223✔
232

233
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
234
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
235
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
236

237
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
238
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
239
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
240

241
    if (pts1 == pts2) {
2,147,483,647✔
242
      return 0;
×
243
    } else {
244
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
245
    }
246
  } else {
247
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
248
  }
249
}
250

251
// Descending counterpart of resultrowComparAsc(): same ordering with the operands swapped.
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
  return resultrowComparAsc(p2, p1);
}
1,935,991,999✔
252

253
/*
 * Rebuilds pGroupResInfo from the result rows stored in pHashmap.
 *
 * Each hash key is a uint64 group id followed by the window key; each value starts with
 * an SResultRowPosition. One SResKeyPos is packed into pGroupResInfo->pBuf per hash
 * entry and a pointer to it is pushed into pGroupResInfo->pRows. When order is
 * TSDB_ORDER_ASC or TSDB_ORDER_DESC the pointers are sorted by (groupId, key).
 *
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation or array push fails.
 */
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
  }
  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  // extract the result rows information from the hash map
  int32_t size = tSimpleHashGetSize(pHashmap);

  void* pData = NULL;
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);

  size_t  keyLen = 0;
  int32_t iter = 0;
  int64_t bufLen = 0, offset = 0;

  // todo move away and record this during create window
  // first pass: total bytes needed for all packed SResKeyPos entries
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
    bufLen += keyLen + sizeof(SResultRowPosition);
  }

  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);

  // second pass: pack the entries and remember a pointer to each
  iter = 0;
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pData, &keyLen);

    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);

    p->groupId = *(uint64_t*)key;
    p->pos = *(SResultRowPosition*)pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
    // check the push result itself; pBuf is already known to be non-NULL here
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    offset += keyLen + sizeof(struct SResultRowPosition);
  }

  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn);
  }

  pGroupResInfo->index = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
313

314
// Replaces the row array of pGroupResInfo with pArrayList and rewinds both cursors.
// freeItem is set so that cleanupGroupResInfo() also frees each element of the array.
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  SArray* pPrevRows = pGroupResInfo->pRows;
  if (NULL != pPrevRows) {
    taosArrayDestroy(pPrevRows);
  }

  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->freeItem = true;
  pGroupResInfo->delIndex = 0;
  pGroupResInfo->index = 0;
}
×
324

325
// True when a row array exists and the read cursor has not yet reached its end.
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  if (NULL == pRows) {
    return false;
  }

  return pGroupResInfo->index < taosArrayGetSize(pRows);
}
332

333
// Number of rows held by the group result info; 0 when no row array is attached.
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (NULL == pRows) ? 0 : (int32_t)taosArrayGetSize(pRows);
}
340

341
/*
 * Builds an array of SBlockOrderInfo from an ORDER BY node list.
 *
 * Every sort key must be a column node; its slot id, direction and null ordering are
 * recorded. A NULL list yields an empty array.
 *
 * @return the new array, or NULL on failure. terrno is set to
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a missing or non-column sort key.
 */
SArray* createSortInfo(SNodeList* pNodeList) {
  size_t numOfCols = 0;
  if (NULL != pNodeList) {
    numOfCols = LIST_LENGTH(pNodeList);
  }

  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
  if (NULL == pList) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
    if (NULL == pSortKey) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    SColumnNode*    pColNode = (SColumnNode*)pSortKey->pExpr;
    SBlockOrderInfo bi = {0};
    bi.slotId = pColNode->slotId;
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;

    if (NULL == taosArrayPush(pList, &bi)) {
      taosArrayDestroy(pList);
      return NULL;
    }
  }

  return pList;
}
388

389
SSDataBlock* createDataBlockFromDescNode(void* p) {
598,408,544✔
390
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
598,408,544✔
391
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
598,408,544✔
392
  SSDataBlock* pBlock = NULL;
598,474,763✔
393
  int32_t      code = createDataBlock(&pBlock);
598,442,919✔
394
  if (code) {
598,368,284✔
395
    terrno = code;
×
396
    return NULL;
×
397
  }
398

399
  pBlock->info.id.blockId = pNode->dataBlockId;
598,368,284✔
400
  pBlock->info.type = STREAM_INVALID;
598,315,928✔
401
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
598,369,467✔
402
  pBlock->info.watermark = INT64_MIN;
598,456,704✔
403

404
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
405
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
406
    if (!pDescNode) {
2,147,483,647✔
407
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
408
      blockDataDestroy(pBlock);
×
409
      pBlock = NULL;
×
410
      terrno = TSDB_CODE_INVALID_PARA;
×
411
      break;
×
412
    }
413
    SColumnInfoData idata =
2,147,483,647✔
414
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
415
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
416
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
417
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
418

419
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
420
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
421
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
15,360✔
422
      blockDataDestroy(pBlock);
15,360✔
423
      pBlock = NULL;
×
424
      terrno = code;
×
425
      break;
×
426
    }
427
  }
428

429
  return pBlock;
598,527,000✔
430
}
431

432
/*
 * Prepares the two primary-key slots (pks[0], pks[1]) of a data block.
 *
 * The first match item flagged isPk decides the key type; for variable-length types a
 * zeroed buffer of the column's byte width is allocated for each slot. Nothing is done
 * when no match item is a primary key.
 *
 * @return TSDB_CODE_SUCCESS, or terrno on a lookup or allocation failure.
 */
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;

  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
    if (NULL == pItem) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    if (!pItem->isPk) {
      continue;
    }

    SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
    if (NULL == pInfoData) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    pBlockInfo->pks[0].type = pInfoData->info.type;
    pBlockInfo->pks[1].type = pInfoData->info.type;

    // only variable-length keys need a buffer; it is sized to pInfoData->info.bytes
    if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
      pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
      if (NULL == pBlockInfo->pks[0].pData) {
        return terrno;
      }

      pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
      if (NULL == pBlockInfo->pks[1].pData) {
        taosMemoryFreeClear(pBlockInfo->pks[0].pData);
        return terrno;
      }

      pBlockInfo->pks[0].nData = pInfoData->info.bytes;
      pBlockInfo->pks[1].nData = pInfoData->info.bytes;
    }

    // only the first primary-key item is considered
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}
474

475
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
431,781✔
476
  STransTagExprCtx* pCtx = pContext;
431,781✔
477
  SMetaReader*      mr = pCtx->pReader;
431,781✔
478
  bool              isTagCol = false, isTbname = false;
431,781✔
479
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
431,781✔
480
    SColumnNode* pCol = (SColumnNode*)*pNode;
123,366✔
481
    if (pCol->colType == COLUMN_TYPE_TBNAME)
123,366✔
482
      isTbname = true;
×
483
    else
484
      isTagCol = true;
123,366✔
485
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
308,415✔
486
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
487
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
488
  }
489
  if (isTagCol) {
431,781✔
490
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
123,366✔
491

492
    SValueNode* res = NULL;
123,366✔
493
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
123,366✔
494
    if (NULL == res) {
123,366✔
495
      return DEAL_RES_ERROR;
×
496
    }
497

498
    res->translate = true;
123,366✔
499
    res->node.resType = pSColumnNode->node.resType;
123,366✔
500

501
    STagVal tagVal = {0};
123,366✔
502
    tagVal.cid = pSColumnNode->colId;
123,366✔
503
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
123,366✔
504
    if (p == NULL) {
123,366✔
505
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
506
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
123,366✔
507
      int32_t len = ((const STag*)p)->len;
×
508
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
509
      if (NULL == res->datum.p) {
×
510
        return DEAL_RES_ERROR;
×
511
      }
512
      memcpy(res->datum.p, p, len);
×
513
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
123,366✔
514
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
123,366✔
515
        return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
516
      }
517

518
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
123,366✔
519
      if (NULL == res->datum.p) {
123,366✔
520
        return DEAL_RES_ERROR;
×
521
      }
522

523
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
123,366✔
524
        memcpy(blobDataVal(res->datum.p), tagVal.pData, tagVal.nData);
×
525
        blobDataSetLen(res->datum.p, tagVal.nData);
×
526
      } else {
527
        memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
123,366✔
528
        varDataSetLen(res->datum.p, tagVal.nData);
123,366✔
529
      }
530
    } else {
531
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
×
532
      if (code != TSDB_CODE_SUCCESS) {
×
533
        return DEAL_RES_ERROR;
×
534
      }
535
    }
536
    nodesDestroyNode(*pNode);
123,366✔
537
    *pNode = (SNode*)res;
123,366✔
538
  } else if (isTbname) {
308,415✔
539
    SValueNode* res = NULL;
×
540
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
541
    if (NULL == res) {
×
542
      return DEAL_RES_ERROR;
×
543
    }
544

545
    res->translate = true;
×
546
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
547

548
    int32_t len = strlen(mr->me.name);
×
549
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
550
    if (NULL == res->datum.p) {
×
551
      return DEAL_RES_ERROR;
×
552
    }
553
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
554
    varDataSetLen(res->datum.p, len);
×
555
    nodesDestroyNode(*pNode);
×
556
    *pNode = (SNode*)res;
×
557
  }
558

559
  return DEAL_RES_CONTINUE;
431,781✔
560
}
561

562
/*
 * Evaluates a tag condition against a single table.
 *
 * The condition is cloned, every tag/tbname reference in the clone is replaced by the
 * table's actual value (doTranslateTagExpr), and the resulting constant expression is
 * folded; its boolean value is stored in *pQualified.
 *
 * @param uid         table uid
 * @param pTagCond    tag condition; not modified
 * @param vnode       vnode handle passed to the meta reader
 * @param pQualified  receives the result; false on every failure path
 * @param pAPI        storage API
 * @return TSDB_CODE_SUCCESS when the condition was evaluated, or when the table entry
 *         cannot be read (then *pQualified is false); otherwise the error code of the
 *         failing step.
 */
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SMetaReader mr = {0};

  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
  if (TSDB_CODE_SUCCESS != code) {
    // an unreadable table entry is reported as "not qualified", not as an error
    pAPI->metaReaderFn.clearReader(&mr);
    *pQualified = false;

    return TSDB_CODE_SUCCESS;
  }

  SNode* pTagCondTmp = NULL;
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
  if (TSDB_CODE_SUCCESS != code) {
    *pQualified = false;
    pAPI->metaReaderFn.clearReader(&mr);
    return code;
  }
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
  pAPI->metaReaderFn.clearReader(&mr);
  if (TSDB_CODE_SUCCESS != ctx.code) {
    // propagate the walker's error; `code` still holds the successful clone result here
    *pQualified = false;
    nodesDestroyNode(pTagCondTmp);
    terrno = ctx.code;
    return ctx.code;
  }

  SNode* pNew = NULL;
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    nodesDestroyNode(pTagCondTmp);
    *pQualified = false;

    return code;
  }

  SValueNode* pValue = (SValueNode*)pNew;
  *pQualified = pValue->datum.b;

  nodesDestroyNode(pNew);
  return TSDB_CODE_SUCCESS;
}
608

609
/*
 * Expression-rewrite callback that collects the columns referenced by a tag condition.
 *
 * Column nodes are registered as-is. A function node of type FUNCTION_TYPE_TBNAME is
 * replaced by a new column node (colId -1, COLUMN_TYPE_TBNAME, varchar) and then
 * registered. The first time a column id is seen it gets the next slot id and an
 * SColumnInfo is appended to cInfoList; later occurrences reuse that slot id.
 *
 * @param pNode     node being visited; may be replaced in place
 * @param pContext  a tagFilterAssist*; its code field receives the error on failure
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR on failure.
 */
static EDealRes getColumn(SNode** pNode, void* pContext) {
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
  SColumnNode*     pSColumnNode = NULL;
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
    pSColumnNode = *(SColumnNode**)pNode;
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
      if (NULL == pSColumnNode) {
        return DEAL_RES_ERROR;
      }
      pSColumnNode->colId = -1;
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
      nodesDestroyNode(*pNode);
      *pNode = (SNode*)pSColumnNode;
    } else {
      return DEAL_RES_CONTINUE;
    }
  } else {
    return DEAL_RES_CONTINUE;
  }

  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
  if (!data) {
    // first occurrence: remember the node pointer under its column id
    int32_t tempRes =
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
      // record the failure so the caller sees it in the context, not only in the walk result
      pData->code = tempRes;
      return DEAL_RES_ERROR;
    }
    pSColumnNode->slotId = pData->index++;
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
                         .type = pSColumnNode->node.resType.type,
                         .bytes = pSColumnNode->node.resType.bytes,
                         .pk = pSColumnNode->isPk};
#if TAG_FILTER_DEBUG
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
    if (!tmp) {
      pData->code = terrno;
      return DEAL_RES_ERROR;
    }
  } else {
    // already registered: share the slot id of the first occurrence
    SColumnNode* col = *(SColumnNode**)data;
    pSColumnNode->slotId = col->slotId;
  }

  return DEAL_RES_CONTINUE;
}
660

661
/*
 * Allocates a column of type *pType with capacity for numOfRows rows and attaches it to
 * pParam as an owned column (colAlloced = true).
 *
 * @return TSDB_CODE_SUCCESS, or terrno when allocation or capacity reservation fails.
 */
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    return terrno;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pCol, numOfRows, true);
  if (TSDB_CODE_SUCCESS == code) {
    pParam->columnData = pCol;
    pParam->colAlloced = true;
    return TSDB_CODE_SUCCESS;
  }

  terrno = code;
  releaseColInfoData(pCol);
  return terrno;
}
683

684
static void releaseColInfoData(void* pCol) {
2,196,250✔
685
  if (pCol) {
2,196,250✔
686
    SColumnInfoData* col = (SColumnInfoData*)pCol;
2,196,250✔
687
    colDataDestroy(col);
2,196,250✔
688
    taosMemoryFree(col);
2,196,994✔
689
  }
690
}
2,196,586✔
691

692
void freeItem(void* p) {
191,870,062✔
693
  STUidTagInfo* pInfo = p;
191,870,062✔
694
  if (pInfo->pTagVal != NULL) {
191,870,062✔
695
    taosMemoryFree(pInfo->pTagVal);
191,452,147✔
696
  }
697
}
191,867,743✔
698

699
// One "tag column <op> constant" pair taken from a tag filter (see extractTagDataEntry()).
typedef struct {
700
  col_id_t  colId;       // id of the tag column; also the sort key used by compareTagDataEntry()
701
  SNode*    pValueNode;  // the constant operand; read as an SValueNode by buildTagDataEntryKey()
702
  int32_t   bytes;  // length defined in schema
703
} STagDataEntry;
704

705
static int compareTagDataEntry(const void* a, const void* b) {
44,764✔
706
  STagDataEntry* p1 = (STagDataEntry*)a;
44,764✔
707
  STagDataEntry* p2 = (STagDataEntry*)b;
44,764✔
708
  return compareInt16Val(&p1->colId, &p2->colId);
44,764✔
709
}
710

711
/*
 * Serializes a list of STagDataEntry into a flat key.
 *
 * For each entry the column id is written, followed (unless the value is NULL) by the
 * value bytes: fixed-width types contribute min(schema bytes, value-node bytes) bytes,
 * varchar/varbinary/nchar contribute their payload without the length header.
 *
 * @param pIdWithValue  array of STagDataEntry
 * @param keyBuf        receives a zero-initialized buffer of keyLen bytes owned by the
 *                      caller; set to NULL when the function fails
 * @param keyLen        size of the key buffer; the caller must size it for all entries
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, or
 *         TSDB_CODE_STREAM_INTERNAL_ERROR for an unsupported value type.
 */
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
  if (NULL == *keyBuf) {
    qError(
      "failed to allocate memory for tag filter optimization key, size:%d",
      keyLen);
    return terrno;
  }
  char* pStart = *keyBuf;
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
    // num type may have different bytes length, use the smaller one
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);

    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
    pStart += sizeof(col_id_t);

    if (pValueNode->isNull) {
      continue;
    }

    const void* pSrc = NULL;
    size_t      copyLen = bytes;
    switch (pValueNode->node.resType.type) {
      case TSDB_DATA_TYPE_BOOL:
        pSrc = &pValueNode->datum.b;
        break;
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_TIMESTAMP:
        pSrc = &pValueNode->datum.i;
        break;
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_UBIGINT:
        pSrc = &pValueNode->datum.u;
        break;
      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_DOUBLE:
        pSrc = &pValueNode->datum.d;
        break;
      case TSDB_DATA_TYPE_VARCHAR:
      case TSDB_DATA_TYPE_VARBINARY:
      case TSDB_DATA_TYPE_NCHAR:
        pSrc = varDataVal(pValueNode->datum.p);
        copyLen = varDataLen(pValueNode->datum.p);
        break;
      default:
        qError("unsupported tag data type %d in tag filter optimization",
          pValueNode->node.resType.type);
        // do not leak, or hand back, a partially built key
        taosMemoryFree(*keyBuf);
        *keyBuf = NULL;
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    (void)memcpy(pStart, pSrc, copyLen);
    pStart += copyLen;
  }

  return TSDB_CODE_SUCCESS;
}
776

777
static void extractTagDataEntry(
45,125✔
778
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
779
  SNode* pLeft = pOpNode->pLeft;
45,125✔
780
  SNode* pRight = pOpNode->pRight;
45,486✔
781
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
45,125✔
782
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
45,125✔
783
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
45,125✔
784
    (SValueNode*)pLeft : (SValueNode*)pRight;
45,125✔
785

786
  STagDataEntry entry = {0};
45,125✔
787
  entry.colId = pColNode->colId;
45,125✔
788
  entry.pValueNode = (SNode*)pValueNode;
45,125✔
789
  entry.bytes = pColNode->node.resType.bytes;
45,125✔
790
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
45,486✔
791
}
45,486✔
792

793
static int32_t extractTagFilterTagDataEntries(
22,382✔
794
  const SNode* pTagCond, SArray* pIdWithVal) {
795
  if (NULL == pTagCond || NULL == pIdWithVal ||
22,382✔
796
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
22,743✔
797
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
22,743✔
798
    qError("invalid parameter to extract tag filter symbol");
×
799
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
800
  }
801

802
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
22,021✔
803
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
804
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
22,382✔
805
    SNode* pChild = NULL;
22,743✔
806
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
68,229✔
807
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
45,486✔
808
    }
809
  }
810

811
  taosArraySort(pIdWithVal, compareTagDataEntry);
22,743✔
812

813
  return TSDB_CODE_SUCCESS;
22,743✔
814
}
815

816
// Computes an MD5 digest of the tag condition from its extracted tag data
// entries (see extractTagFilterTagDataEntries / buildTagDataEntryKey) and
// leaves the result in pContext.
//
// A NULL pTagCond is accepted and leaves pContext untouched.
//
// Returns TSDB_CODE_SUCCESS, terrno when the entry array cannot be allocated,
// or the error reported while extracting entries / building the key.
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (pTagCond == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  char*   payload = NULL;
  int32_t len = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
  // report the allocation failure itself instead of the generic
  // "invalid parameter" error the extractor would return for a NULL array
  QUERY_CHECK_NULL(pIdWithVal, code, lino, _end, terrno);

  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
  QUERY_CHECK_CODE(code, lino, _end);

  // key size: one column id plus the column's byte width per entry
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
    len += sizeof(col_id_t) + pEntry->bytes;
  }
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
  QUERY_CHECK_CODE(code, lino, _end);

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    // lino holds the line of the failing check; __LINE__ here would always
    // point at this log statement
    qError("%s failed at line %d since %s",
      __func__, lino, tstrerror(code));
  }
  taosArrayDestroy(pIdWithVal);
  taosMemoryFree(payload);
  return code;
}
849

850
// Serializes the tag condition with nodesNodeToMsg and stores the MD5 digest
// of the serialized bytes in pContext.
//
// A NULL pTagCond is a no-op that returns TSDB_CODE_SUCCESS. When
// serialization fails the error is logged and returned, and pContext is left
// untouched.
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  char*   serialized = NULL;
  int32_t serializedLen = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pTagCond) {
    code = nodesNodeToMsg(pTagCond, &serialized, &serializedLen);
    if (TSDB_CODE_SUCCESS == code) {
      tMD5Init(pContext);
      tMD5Update(pContext, (uint8_t*)serialized, (uint32_t)serializedLen);
      tMD5Final(pContext);

      taosMemoryFree(serialized);
    } else {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  return code;
}
876

877
// Computes the MD5 digest used as the table-group cache key and stores it in
// pContext.
//
// The digest input is the serialized form of pGroup. When filterDigest[0] is
// non-zero, the tListLen(pContext->digest) bytes starting at filterDigest + 1
// are appended to that input before hashing.
//
// Returns TSDB_CODE_SUCCESS, the error from nodesNodeToMsg, or terrno when the
// payload cannot be grown.
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  code = nodesNodeToMsg(pGroup, &payload, &len);
  QUERY_CHECK_CODE(code, lino, _end);

  if (filterDigest[0]) {
    // grow through a temporary so the original buffer is still owned by
    // `payload` (and freed at _end) if the reallocation fails
    char* extended = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
    QUERY_CHECK_NULL(extended, code, lino, _end, terrno);
    payload = extended;
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
    len += tListLen(pContext->digest);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
903

904
// Walks an expression (or every expression of a node list) with the getColumn
// rewriter and returns the SColumnInfo entries it collected.
//
// data     - an SNodeList* when isList is true, otherwise a single SNode*.
// isList   - selects how `data` is interpreted.
// pColList - optional output; on success receives ownership of the collected
//            SColumnInfo array. When NULL the array is released here.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error code
// recorded by the rewriter in the walk context.
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
  int32_t code = TSDB_CODE_SUCCESS;
  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (ctx.colHash == NULL) {
    code = terrno;
    goto end;
  }

  ctx.index = 0;
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
  if (ctx.cInfoList == NULL) {
    code = terrno;
    goto end;
  }

  if (isList) {
    SNode* pNode = NULL;
    FOREACH(pNode, (SNodeList*)data) {
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
      if (TSDB_CODE_SUCCESS != ctx.code) {
        code = ctx.code;
        goto end;
      }
      // the rewriter may have replaced the node; store it back into the list
      REPLACE_NODE(pNode);
    }
  } else {
    SNode* pNode = (SNode*)data;
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      code = ctx.code;
      goto end;
    }
  }

  if (pColList != NULL) {
    *pColList = ctx.cInfoList;
    // ownership moved to the caller; keep the cleanup below from freeing it.
    // When no output was requested the list stays owned here and is released
    // at `end` (it used to be leaked).
    ctx.cInfoList = NULL;
  }

end:
  taosHashCleanup(ctx.colHash);
  taosArrayDestroy(ctx.cInfoList);
  return code;
}
947

948
// Appends one SStreamGroupValue to gInfo describing row `i` of column pValue.
//
// - NULL cells (and JSON null values) are recorded with isNull = true.
// - JSON, variable-length and DECIMAL values are copied into a freshly
//   allocated v->data.pData buffer, with v->data.nData set to the copied
//   length.
// - Every other type is copied into v->data.val.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure,
// TSDB_CODE_QRY_JSON_IN_GROUP_ERROR when tTagIsJson() accepts the cell, or
// TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER when a variable-length value is
// longer than the column allows. On failure the reserved element (if any) is
// marked as NULL.
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
  if (v == NULL) {
    code = terrno;
    goto end;
  }
  if (colDataIsNull_s(pValue, i)) {
    v->isNull = true;
  } else {
    v->isNull = false;
    char* data = colDataGetData(pValue, i);
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(data)) {
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        goto end;
      }
      if (tTagIsJsonNull(data)) {
        v->isNull = true;
        goto end;
      }
      int32_t len = getJsonValueLen(data);
      v->data.type = pValue->info.type;
      v->data.nData = len;
      // one extra zeroed byte so the buffer can be logged as a C string
      v->data.pData = taosMemoryCalloc(1, len + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, data, len);
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
      if (varDataTLen(data) > pValue->info.bytes) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto end;
      }
      v->data.type = pValue->info.type;
      v->data.nData = varDataLen(data);
      // one extra zeroed byte so the buffer can be logged as a C string
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
      v->data.type = pValue->info.type;
      v->data.nData = pValue->info.bytes;
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      // copy into the allocated buffer; the previous code copied to
      // &v->data.pData, overwriting the pointer itself
      memcpy(v->data.pData, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, len:%d", i, pValue->info.type, pValue->info.bytes);
    } else {  // reader todo decimal
      v->data.type = pValue->info.type;
      memcpy(&v->data.val, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
    }
  }
end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // v is NULL when the reservation itself failed
    if (v != NULL) {
      v->isNull = true;
    }
  }
  return code;
}
1016

1017
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
107,094✔
1018
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1019
  int32_t      code = TSDB_CODE_SUCCESS;
107,094✔
1020
  int32_t      lino = 0;
107,094✔
1021
  SArray*      pBlockList = NULL;
107,094✔
1022
  SSDataBlock* pResBlock = NULL;
107,094✔
1023
  SArray*      groupData = NULL;
107,094✔
1024
  SArray*      pUidTagList = NULL;
107,094✔
1025
  SArray*      gInfo = NULL;
107,094✔
1026
  int32_t      tbNameIndex = 0;
107,094✔
1027

1028
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
107,094✔
1029
  if (rows == 0) {
107,094✔
1030
    return;
×
1031
  }
1032

1033
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
107,094✔
1034
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
107,094✔
1035

1036
  for (int32_t i = 0; i < rows; ++i) {
450,071✔
1037
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
342,977✔
1038
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
342,977✔
1039
    STUidTagInfo info = {.uid = pkeyInfo->uid};
342,977✔
1040
    void*        tmp = taosArrayPush(pUidTagList, &info);
342,977✔
1041
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
342,977✔
1042
  }
1043
 
1044
  if (taosArrayGetSize(pUidTagList) > 0) {
107,094✔
1045
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
107,094✔
1046
  } else {
1047
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1048
  }
1049
  if (code != TSDB_CODE_SUCCESS) {
107,094✔
1050
    goto end;
×
1051
  }
1052

1053
  SArray* pColList = NULL;
107,094✔
1054
  code = qGetColumnsFromNodeList(group, true, &pColList);
107,094✔
1055
  if (code != TSDB_CODE_SUCCESS) {
107,094✔
1056
    goto end;
×
1057
  }
1058

1059
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
279,804✔
1060
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
172,710✔
1061
    if (tmp != NULL && tmp->colId == -1) {
172,710✔
1062
      tbNameIndex = i;
106,653✔
1063
    }
1064
  }
1065
  
1066
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
106,653✔
1067
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
106,653✔
1068
  taosArrayDestroy(pColList);
107,094✔
1069
  if (pResBlock == NULL) {
107,094✔
1070
    code = terrno;
×
1071
    goto end;
×
1072
  }
1073

1074
  pBlockList = taosArrayInit(2, POINTER_BYTES);
107,094✔
1075
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
107,094✔
1076

1077
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
107,094✔
1078
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
107,094✔
1079

1080
  groupData = taosArrayInit(2, POINTER_BYTES);
107,094✔
1081
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
106,684✔
1082

1083
  SNode* pNode = NULL;
107,094✔
1084
  FOREACH(pNode, group) {
280,686✔
1085
    SScalarParam output = {0};
173,592✔
1086

1087
    switch (nodeType(pNode)) {
173,182✔
1088
      case QUERY_NODE_VALUE:
×
1089
        break;
×
1090
      case QUERY_NODE_COLUMN:
173,592✔
1091
      case QUERY_NODE_OPERATOR:
1092
      case QUERY_NODE_FUNCTION: {
1093
        SExprNode* expNode = (SExprNode*)pNode;
173,592✔
1094
        code = createResultData(&expNode->resType, rows, &output);
173,592✔
1095
        if (code != TSDB_CODE_SUCCESS) {
173,182✔
1096
          goto end;
×
1097
        }
1098
        break;
173,182✔
1099
      }
1100

1101
      default:
×
1102
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1103
        goto end;
×
1104
    }
1105

1106
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
173,182✔
1107
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
173,592✔
1108
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
173,592✔
1109
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
173,182✔
1110
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
173,182✔
1111
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
1112
      continue;
×
1113
    } else {
1114
      gTaskScalarExtra.pStreamInfo = NULL;
×
1115
      gTaskScalarExtra.pStreamRange = NULL;
×
1116
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1117
    }
1118

1119
    if (code != TSDB_CODE_SUCCESS) {
173,592✔
1120
      releaseColInfoData(output.columnData);
×
1121
      goto end;
×
1122
    }
1123

1124
    void* tmp = taosArrayPush(groupData, &output.columnData);
173,592✔
1125
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
173,592✔
1126
  }
1127

1128
  for (int i = 0; i < rows; i++) {
450,071✔
1129
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
342,977✔
1130
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
342,977✔
1131

1132
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
342,977✔
1133
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
342,977✔
1134

1135
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
858,399✔
1136
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
515,422✔
1137
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
515,422✔
1138
        if (ret != TSDB_CODE_SUCCESS) {
515,422✔
1139
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1140
          goto end;
×
1141
        }
1142
        if (j == tbNameIndex) {
515,422✔
1143
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
342,977✔
1144
          if (v != NULL){
342,977✔
1145
            v->isTbname = true;
342,977✔
1146
            v->uid = info->uid;
342,977✔
1147
          }
1148
        }
1149
    }
1150

1151
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
342,977✔
1152
    if (ret != TSDB_CODE_SUCCESS) {
342,977✔
1153
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1154
      goto end;
×
1155
    }
1156
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
342,977✔
1157
    gInfo = NULL;
342,977✔
1158
  }
1159

1160
end:
107,094✔
1161
  blockDataDestroy(pResBlock);
107,094✔
1162
  taosArrayDestroy(pBlockList);
107,094✔
1163
  taosArrayDestroyEx(pUidTagList, freeItem);
107,094✔
1164
  taosArrayDestroyP(groupData, releaseColInfoData);
106,684✔
1165
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
107,094✔
1166

1167
  if (code != TSDB_CODE_SUCCESS) {
107,094✔
1168
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1169
  }
1170
}
1171

1172
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
1,905,526✔
1173
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1174
  int32_t      code = TSDB_CODE_SUCCESS;
1,905,526✔
1175
  int32_t      lino = 0;
1,905,526✔
1176
  SArray*      pBlockList = NULL;
1,905,526✔
1177
  SSDataBlock* pResBlock = NULL;
1,905,526✔
1178
  void*        keyBuf = NULL;
1,905,526✔
1179
  SArray*      groupData = NULL;
1,905,526✔
1180
  SArray*      pUidTagList = NULL;
1,905,526✔
1181
  SArray*      tableList = NULL;
1,905,526✔
1182
  SArray*      gInfo = NULL;
1,905,526✔
1183

1184
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
1,905,526✔
1185
  if (rows == 0) {
1,905,526✔
1186
    return TSDB_CODE_SUCCESS;
×
1187
  } 
1188

1189
  T_MD5_CTX context = {0};
1,905,526✔
1190
  if (tsTagFilterCache && groupIdMap == NULL) {
1,905,526✔
1191
    SNodeListNode* listNode = NULL;
×
1192
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1193
    if (TSDB_CODE_SUCCESS != code) {
×
1194
      goto end;
×
1195
    }
1196
    listNode->pNodeList = group;
×
1197
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
1198
    QUERY_CHECK_CODE(code, lino, end);
×
1199

1200
    nodesFree(listNode);
×
1201

1202
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1203
                                             tListLen(context.digest), &tableList);
1204
    QUERY_CHECK_CODE(code, lino, end);
×
1205

1206
    if (tableList) {
×
1207
      taosArrayDestroy(pTableListInfo->pTableList);
×
1208
      pTableListInfo->pTableList = tableList;
×
1209
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1210
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
1211
      goto end;
×
1212
    }
1213
  }
1214

1215
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
1,905,526✔
1216
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
1,904,910✔
1217

1218
  for (int32_t i = 0; i < rows; ++i) {
11,038,774✔
1219
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
9,133,745✔
1220
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
9,133,864✔
1221
    STUidTagInfo info = {.uid = pkeyInfo->uid};
9,133,864✔
1222
    void*        tmp = taosArrayPush(pUidTagList, &info);
9,134,349✔
1223
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
9,134,349✔
1224
  }
1225

1226
  if (taosArrayGetSize(pUidTagList) > 0) {
1,905,029✔
1227
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
1,905,041✔
1228
  } else {
1229
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
485✔
1230
  }
1231
  if (code != TSDB_CODE_SUCCESS) {
1,904,890✔
1232
    goto end;
×
1233
  }
1234

1235
  SArray* pColList = NULL;
1,904,890✔
1236
  code = qGetColumnsFromNodeList(group, true, &pColList); 
1,904,890✔
1237
  if (code != TSDB_CODE_SUCCESS) {
1,904,741✔
1238
    goto end;
×
1239
  }
1240

1241
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1,904,741✔
1242
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
1,905,226✔
1243
  taosArrayDestroy(pColList);
1,905,041✔
1244
  if (pResBlock == NULL) {
1,905,041✔
1245
    code = terrno;
×
1246
    goto end;
×
1247
  }
1248

1249
  //  int64_t st1 = taosGetTimestampUs();
1250
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1251

1252
  pBlockList = taosArrayInit(2, POINTER_BYTES);
1,905,041✔
1253
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
1,905,375✔
1254

1255
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
1,904,890✔
1256
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
1,904,890✔
1257

1258
  groupData = taosArrayInit(2, POINTER_BYTES);
1,904,890✔
1259
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
1,905,041✔
1260

1261
  SNode* pNode = NULL;
1,905,041✔
1262
  FOREACH(pNode, group) {
3,935,501✔
1263
    SScalarParam output = {0};
2,030,945✔
1264

1265
    switch (nodeType(pNode)) {
2,030,104✔
1266
      case QUERY_NODE_VALUE:
×
1267
        break;
×
1268
      case QUERY_NODE_COLUMN:
2,023,068✔
1269
      case QUERY_NODE_OPERATOR:
1270
      case QUERY_NODE_FUNCTION: {
1271
        SExprNode* expNode = (SExprNode*)pNode;
2,023,068✔
1272
        code = createResultData(&expNode->resType, rows, &output);
2,023,068✔
1273
        if (code != TSDB_CODE_SUCCESS) {
2,023,068✔
1274
          goto end;
×
1275
        }
1276
        break;
2,023,068✔
1277
      }
1278
      case QUERY_NODE_REMOTE_VALUE: {
7,392✔
1279
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
7,392✔
1280
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
7,392✔
1281
        QUERY_CHECK_CODE(code, lino, end);
7,392✔
1282
        break;
7,392✔
1283
      }
1284
      
1285
      default:
×
1286
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1287
        goto end;
×
1288
    }
1289

1290
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
2,030,460✔
1291
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
2,006,015✔
1292
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
2,006,015✔
1293
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
2,005,379✔
1294
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
2,005,379✔
1295
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
24,445✔
1296
      continue;
7,392✔
1297
    } else {
1298
      gTaskScalarExtra.pStreamInfo = NULL;
17,538✔
1299
      gTaskScalarExtra.pStreamRange = NULL;
17,538✔
1300
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
17,538✔
1301
    }
1302

1303
    if (code != TSDB_CODE_SUCCESS) {
2,023,068✔
1304
      releaseColInfoData(output.columnData);
×
1305
      goto end;
×
1306
    }
1307

1308
    void* tmp = taosArrayPush(groupData, &output.columnData);
2,023,553✔
1309
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,023,553✔
1310
  }
1311

1312
  int32_t keyLen = 0;
1,904,348✔
1313
  SNode*  node;
1314
  FOREACH(node, group) {
3,933,630✔
1315
    SExprNode* pExpr = (SExprNode*)node;
2,030,460✔
1316
    keyLen += pExpr->resType.bytes;
2,030,460✔
1317
  }
1318

1319
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
1,904,833✔
1320
  keyLen += nullFlagSize;
1,905,318✔
1321

1322
  keyBuf = taosMemoryCalloc(1, keyLen);
1,905,318✔
1323
  if (keyBuf == NULL) {
1,905,041✔
1324
    code = terrno;
×
1325
    goto end;
×
1326
  }
1327

1328
  if (initRemainGroups) {
1,905,041✔
1329
    pTableListInfo->remainGroups =
840,135✔
1330
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
840,135✔
1331
    if (pTableListInfo->remainGroups == NULL) {
840,135✔
1332
      code = terrno;
×
1333
      goto end;
×
1334
    }
1335
  }
1336

1337
  for (int i = 0; i < rows; i++) {
11,037,071✔
1338
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
9,132,330✔
1339
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
9,132,966✔
1340

1341
    if (groupIdMap != NULL){
9,132,966✔
1342
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
205,575✔
1343
    }
1344
    
1345
    char* isNull = (char*)keyBuf;
9,132,120✔
1346
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
9,132,120✔
1347
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
18,924,772✔
1348
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
9,792,383✔
1349

1350
      if (groupIdMap != NULL && gInfo != NULL) {
9,791,415✔
1351
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
229,287✔
1352
        if (ret != TSDB_CODE_SUCCESS) {
229,287✔
1353
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1354
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1355
          gInfo = NULL;
×
1356
        }
1357
      }
1358
      
1359
      if (colDataIsNull_s(pValue, i)) {
19,583,523✔
1360
        isNull[j] = 1;
95,192✔
1361
      } else {
1362
        isNull[j] = 0;
9,696,916✔
1363
        char* data = colDataGetData(pValue, i);
9,696,226✔
1364
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
9,697,609✔
1365
          // if (tTagIsJson(data)) {
1366
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1367
          //   goto end;
1368
          // }
1369
          if (tTagIsJsonNull(data)) {
89,324✔
1370
            isNull[j] = 1;
×
1371
            continue;
×
1372
          }
1373
          int32_t len = getJsonValueLen(data);
89,324✔
1374
          memcpy(pStart, data, len);
89,324✔
1375
          pStart += len;
89,324✔
1376
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
9,608,675✔
1377
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
6,515,288✔
1378
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
149✔
1379
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1380
              goto end;
×
1381
            }
1382
            memcpy(pStart, data, blobDataTLen(data));
×
1383
            pStart += blobDataTLen(data);
×
1384
          } else {
1385
            if (varDataTLen(data) > pValue->info.bytes) {
6,514,897✔
1386
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1387
              goto end;
×
1388
            }
1389
            memcpy(pStart, data, varDataTLen(data));
6,514,897✔
1390
            pStart += varDataTLen(data);
6,514,746✔
1391
          }
1392
        } else {
1393
          memcpy(pStart, data, pValue->info.bytes);
3,092,846✔
1394
          pStart += pValue->info.bytes;
3,092,846✔
1395
        }
1396
      }
1397
    }
1398

1399
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
9,132,874✔
1400
    info->groupId = calcGroupId(keyBuf, len);
9,132,874✔
1401
    if (groupIdMap != NULL && gInfo != NULL) {
9,132,366✔
1402
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
205,575✔
1403
      if (ret != TSDB_CODE_SUCCESS) {
205,575✔
1404
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1405
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1406
      }
1407
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
205,575✔
1408
      gInfo = NULL;
205,575✔
1409
    }
1410
    if (initRemainGroups) {
9,132,366✔
1411
      // groupId ~ table uid
1412
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
4,481,132✔
1413
                         sizeof(info->uid));
1414
      if (code == TSDB_CODE_DUP_KEY) {
4,480,796✔
1415
        code = TSDB_CODE_SUCCESS;
844,814✔
1416
      }
1417
      QUERY_CHECK_CODE(code, lino, end);
4,480,796✔
1418
    }
1419
  }
1420

1421
  if (tsTagFilterCache && groupIdMap == NULL) {
1,904,741✔
1422
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
1423
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1424

1425
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1426
                                              tListLen(context.digest), tableList,
1427
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
1428
    QUERY_CHECK_CODE(code, lino, end);
×
1429
  }
1430

1431
  //  int64_t st2 = taosGetTimestampUs();
1432
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1433

1434
end:
1,904,610✔
1435
  taosMemoryFreeClear(keyBuf);
1,905,041✔
1436
  blockDataDestroy(pResBlock);
1,905,165✔
1437
  taosArrayDestroy(pBlockList);
1,904,892✔
1438
  taosArrayDestroyEx(pUidTagList, freeItem);
1,905,377✔
1439
  taosArrayDestroyP(groupData, releaseColInfoData);
1,905,041✔
1440
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
1,905,075✔
1441

1442
  if (code != TSDB_CODE_SUCCESS) {
1,904,590✔
1443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1444
  }
1445
  return code;
1,904,590✔
1446
}
1447

1448
// Comparator for arrays whose elements are `char*` names: each argument is a
// pointer to the element, i.e. a pointer to a C-string pointer. The strcmp
// result is normalized to -1, 0 or 1.
static int32_t nameComparFn(const void* p1, const void* p2) {
  const char* const* lhs = (const char* const*)p1;
  const char* const* rhs = (const char* const*)p2;

  int32_t order = strcmp(*lhs, *rhs);
  if (order > 0) {
    return 1;
  }
  if (order < 0) {
    return -1;
  }
  return 0;
}
1459

1460
static SArray* getTableNameList(const SNodeListNode* pList) {
528,312✔
1461
  int32_t    code = TSDB_CODE_SUCCESS;
528,312✔
1462
  int32_t    lino = 0;
528,312✔
1463
  int32_t    len = LIST_LENGTH(pList->pNodeList);
528,312✔
1464
  SListCell* cell = pList->pNodeList->pHead;
528,312✔
1465

1466
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
528,312✔
1467
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
528,312✔
1468

1469
  for (int i = 0; i < pList->pNodeList->length; i++) {
1,466,689✔
1470
    SValueNode* valueNode = (SValueNode*)cell->pNode;
938,377✔
1471
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
938,377✔
1472
      terrno = TSDB_CODE_INVALID_PARA;
×
1473
      taosArrayDestroy(pTbList);
×
1474
      return NULL;
×
1475
    }
1476

1477
    char* name = varDataVal(valueNode->datum.p);
938,377✔
1478
    void* tmp = taosArrayPush(pTbList, &name);
938,377✔
1479
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
938,377✔
1480
    cell = cell->pNext;
938,377✔
1481
  }
1482

1483
  size_t numOfTables = taosArrayGetSize(pTbList);
528,312✔
1484

1485
  // order the name
1486
  taosArraySort(pTbList, nameComparFn);
528,312✔
1487

1488
  // remove the duplicates
1489
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
528,312✔
1490
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
528,312✔
1491
  void* tmpTbl = taosArrayGet(pTbList, 0);
528,312✔
1492
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
528,312✔
1493
  void* tmp = taosArrayPush(pNewList, tmpTbl);
528,312✔
1494
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
528,312✔
1495

1496
  for (int32_t i = 1; i < numOfTables; ++i) {
938,377✔
1497
    char** name = taosArrayGetLast(pNewList);
410,065✔
1498
    char** nameInOldList = taosArrayGet(pTbList, i);
410,065✔
1499
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
410,065✔
1500
    if (strcmp(*name, *nameInOldList) == 0) {
410,065✔
1501
      continue;
9,900✔
1502
    }
1503

1504
    tmp = taosArrayPush(pNewList, nameInOldList);
400,165✔
1505
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
400,165✔
1506
  }
1507

1508
_end:
528,312✔
1509
  taosArrayDestroy(pTbList);
528,312✔
1510
  if (code != TSDB_CODE_SUCCESS) {
528,312✔
1511
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1512
    return NULL;
×
1513
  }
1514
  return pNewList;
528,312✔
1515
}
1516

1517
// Comparator over two uint64_t values passed by address; yields -1, 0 or 1
// for ascending order.
static int tableUidCompare(const void* a, const void* b) {
  const uint64_t lhs = *(uint64_t*)a;
  const uint64_t rhs = *(uint64_t*)b;

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}
1527

1528
static int32_t filterTableInfoCompare(const void* a, const void* b) {
18,559,529✔
1529
  STUidTagInfo* p1 = (STUidTagInfo*)a;
18,559,529✔
1530
  STUidTagInfo* p2 = (STUidTagInfo*)b;
18,559,529✔
1531

1532
  if (p1->uid == p2->uid) {
18,559,529✔
1533
    return 0;
×
1534
  }
1535

1536
  return p1->uid < p2->uid ? -1 : 1;
18,559,529✔
1537
}
1538

1539
// Classify the root of a tag condition:
//   FILTER_NO_LOGIC - a single operator node
//   FILTER_AND      - a logic condition whose type is AND
//   FILTER_OTHER    - anything else (other logic condition types, other node kinds)
static FilterCondType checkTagCond(SNode* cond) {
  switch (nodeType(cond)) {
    case QUERY_NODE_OPERATOR:
      return FILTER_NO_LOGIC;
    case QUERY_NODE_LOGIC_CONDITION:
      if (((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND) {
        return FILTER_AND;
      }
      return FILTER_OTHER;
    default:
      return FILTER_OTHER;
  }
}
1548

1549
// Try to narrow `list` using a `tbname IN (...)` condition found in `cond`.
//
// - cond is a single operator: the result of optimizeTbnameInCondImpl() is returned as is.
// - cond is an AND condition: its parameters are probed in order until one is handled
//   (impl returns 0); `list` is then sorted/de-duplicated by uid and, if a tbname condition
//   was handled, the tags of the listed uids are loaded and that call's result is returned.
// - anything else: -1.
//
// Returns 0 only when a tbname IN condition was applied; -1 (or a non-zero code) otherwise.
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
  const int32_t ntype = nodeType(cond);

  if (ntype == QUERY_NODE_OPERATOR) {
    return optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
  }

  bool isAndCond =
      (ntype == QUERY_NODE_LOGIC_CONDITION) && (((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND);
  if (!isAndCond) {
    return -1;
  }

  SNodeList* pParams = (SNodeList*)((SLogicConditionNode*)cond)->pParameterList;
  int32_t    numOfParams = LIST_LENGTH(pParams);
  if (numOfParams <= 0) {
    return -1;
  }

  // stop at the first parameter that is a usable tbname IN (...) condition
  bool       hasTbnameCond = false;
  int32_t    visited = 0;
  SListCell* pCell = pParams->pHead;
  while (visited < numOfParams && pCell != NULL) {
    if (optimizeTbnameInCondImpl(pVnode, list, pCell->pNode, pAPI, suid) == 0) {
      hasTbnameCond = true;
      break;
    }
    pCell = pCell->pNext;
    ++visited;
  }

  taosArraySort(list, filterTableInfoCompare);
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);

  if (!hasTbnameCond) {
    return -1;
  }
  return pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
}
1589

1590
// only return uid that does not contained in pExistedUidList
1591
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
15,701,736✔
1592
                                        uint64_t suid) {
1593
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
15,701,736✔
1594
    return -1;
6,426✔
1595
  }
1596

1597
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
15,695,831✔
1598
  if (pNode->opType != OP_TYPE_IN) {
15,695,831✔
1599
    return -1;
14,644,057✔
1600
  }
1601

1602
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
1,051,899✔
1603
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
528,312✔
1604
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
523,957✔
1605
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
527,988✔
1606
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
528,312✔
1607

1608
    int32_t len = LIST_LENGTH(pList->pNodeList);
528,312✔
1609
    if (len <= 0) {
528,312✔
1610
      return -1;
×
1611
    }
1612

1613
    SArray*   pTbList = getTableNameList(pList);
528,312✔
1614
    int32_t   numOfTables = taosArrayGetSize(pTbList);
528,312✔
1615
    SHashObj* uHash = NULL;
528,312✔
1616

1617
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
528,312✔
1618
    if (numOfExisted > 0) {
528,312✔
1619
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,372✔
1620
      if (!uHash) {
2,372✔
1621
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1622
        return terrno;
×
1623
      }
1624

1625
      for (int i = 0; i < numOfExisted; i++) {
2,370,814✔
1626
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
2,368,442✔
1627
        if (!pTInfo) {
2,367,256✔
1628
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1629
          return terrno;
×
1630
        }
1631
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
2,367,256✔
1632
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
2,368,442✔
1633
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1634
          return tempRes;
×
1635
        }
1636
      }
1637
    }
1638

1639
    for (int i = 0; i < numOfTables; i++) {
1,334,505✔
1640
      char* name = taosArrayGetP(pTbList, i);
865,677✔
1641

1642
      uint64_t uid = 0, csuid = 0;
865,677✔
1643
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
865,677✔
1644
        ETableType tbType = TSDB_TABLE_MAX;
475,513✔
1645
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
475,513✔
1646
            tbType == TSDB_CHILD_TABLE) {
475,513✔
1647
          if (suid != csuid) {
416,029✔
1648
            continue;
896✔
1649
          }
1650
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
415,133✔
1651
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
413,947✔
1652
            void*        tmp = taosArrayPush(pExistedUidList, &s);
413,947✔
1653
            if (!tmp) {
413,947✔
1654
              return terrno;
×
1655
            }
1656
          }
1657
        } else {
1658
          taosArrayDestroy(pTbList);
59,484✔
1659
          taosHashCleanup(uHash);
59,484✔
1660
          return -1;
59,484✔
1661
        }
1662
      } else {
1663
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1664
        terrno = 0;
390,164✔
1665
      }
1666
    }
1667

1668
    taosHashCleanup(uHash);
468,828✔
1669
    taosArrayDestroy(pTbList);
468,828✔
1670
    return 0;
468,828✔
1671
  }
1672

1673
  return -1;
523,587✔
1674
}
1675

1676
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
14,374,733✔
1677
                                        SStorageAPI* pStorageAPI) {
1678
  int32_t      code = TSDB_CODE_SUCCESS;
14,374,733✔
1679
  int32_t      lino = 0;
14,374,733✔
1680
  SSDataBlock* pResBlock = NULL;
14,374,733✔
1681
  code = createDataBlock(&pResBlock);
14,376,471✔
1682
  QUERY_CHECK_CODE(code, lino, _end);
14,375,133✔
1683

1684
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
30,044,971✔
1685
    SColumnInfoData colInfo = {0};
15,670,698✔
1686
    void*           tmp = taosArrayGet(pColList, i);
15,668,412✔
1687
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,666,642✔
1688
    colInfo.info = *(SColumnInfo*)tmp;
15,666,642✔
1689
    code = blockDataAppendColInfo(pResBlock, &colInfo);
15,664,642✔
1690
    QUERY_CHECK_CODE(code, lino, _end);
15,667,505✔
1691
  }
1692

1693
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
14,373,878✔
1694
  if (code != TSDB_CODE_SUCCESS) {
14,373,314✔
1695
    terrno = code;
×
1696
    blockDataDestroy(pResBlock);
×
1697
    return NULL;
×
1698
  }
1699

1700
  pResBlock->info.rows = numOfTables;
14,373,314✔
1701

1702
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
14,372,905✔
1703

1704
  for (int32_t i = 0; i < numOfTables; i++) {
207,216,629✔
1705
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
192,836,808✔
1706
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
192,796,971✔
1707

1708
    for (int32_t j = 0; j < numOfCols; j++) {
393,560,035✔
1709
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
200,699,156✔
1710
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,690,898✔
1711

1712
      if (pColInfo->info.colId == -1) {  // tbname
200,690,898✔
1713
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
8,299,340✔
1714
        if (p1->name != NULL) {
8,303,240✔
1715
          STR_TO_VARSTR(str, p1->name);
413,947✔
1716
        } else {  // name is not retrieved during filter
1717
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
7,888,206✔
1718
          QUERY_CHECK_CODE(code, lino, _end);
7,888,700✔
1719
        }
1720

1721
        code = colDataSetVal(pColInfo, i, str, false);
8,302,647✔
1722
        QUERY_CHECK_CODE(code, lino, _end);
8,302,647✔
1723
#if TAG_FILTER_DEBUG
1724
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1725
#endif
1726
      } else {
1727
        STagVal tagVal = {0};
192,383,939✔
1728
        tagVal.cid = pColInfo->info.colId;
192,396,277✔
1729
        if (p1->pTagVal == NULL) {
192,400,286✔
1730
          colDataSetNULL(pColInfo, i);
9,025✔
1731
        } else {
1732
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
192,391,477✔
1733

1734
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
192,411,894✔
1735
            colDataSetNULL(pColInfo, i);
4,077,749✔
1736
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
188,335,407✔
1737
            code = colDataSetVal(pColInfo, i, p, false);
706,527✔
1738
            QUERY_CHECK_CODE(code, lino, _end);
706,527✔
1739
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
303,929,788✔
1740
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
116,325,220✔
1741
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1742
            }
1743
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
116,314,352✔
1744
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
116,286,053✔
1745
            varDataSetLen(tmp, tagVal.nData);
116,286,053✔
1746
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
116,307,309✔
1747
            code = colDataSetVal(pColInfo, i, tmp, false);
116,305,768✔
1748
#if TAG_FILTER_DEBUG
1749
            qDebug("tagfilter varch:%s", tmp + 2);
1750
#endif
1751
            taosMemoryFree(tmp);
116,309,395✔
1752
            QUERY_CHECK_CODE(code, lino, _end);
116,307,992✔
1753
          } else {
1754
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
71,308,727✔
1755
            QUERY_CHECK_CODE(code, lino, _end);
71,316,046✔
1756
#if TAG_FILTER_DEBUG
1757
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1758
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1759
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1760
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1761
            }
1762
#endif
1763
          }
1764
        }
1765
      }
1766
    }
1767
  }
1768

1769
_end:
14,382,493✔
1770
  if (code != TSDB_CODE_SUCCESS) {
14,379,821✔
1771
    blockDataDestroy(pResBlock);
1,461✔
1772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1773
    terrno = code;
×
1774
    return NULL;
×
1775
  }
1776
  return pResBlock;
14,378,360✔
1777
}
1778

1779
// Collect the tables that passed the tag filter.
//
// pResultList[i] is the filter result for entry i of pUidTagList. pUidList is cleared first;
// for every entry whose result is true, a STableKeyInfo {uid, groupId = 0} is appended to
// pListInfo->pTableList and, when addUid is set, the uid is also appended to pUidList.
//
// Returns TSDB_CODE_SUCCESS, or terrno when an array access/append fails.
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
                                 bool* pResultList, bool addUid) {
  taosArrayClear(pUidList);

  STableKeyInfo keyInfo = {.uid = 0, .groupId = 0};
  const int32_t numOfRows = taosArrayGetSize(pUidTagList);

  for (int32_t row = 0; row < numOfRows; ++row) {
    if (!pResultList[row]) {
      continue;  // filtered out
    }

    STUidTagInfo* pTagInfo = (STUidTagInfo*)taosArrayGet(pUidTagList, row);
    if (pTagInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    uint64_t uid = pTagInfo->uid;
    qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[row]);

    keyInfo.uid = uid;
    if (taosArrayPush(pListInfo->pTableList, &keyInfo) == NULL) {
      return terrno;
    }

    if (addUid && taosArrayPush(pUidList, &uid) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1816

1817
// Append one STUidTagInfo {uid} (all other fields zero) to pUidTagList for every uint64_t uid
// stored in pUidList. An empty pUidList is a no-op.
//
// Returns TSDB_CODE_SUCCESS, or the terrno value of the failing array access/append.
// (Previously an append failure was logged but TSDB_CODE_SUCCESS was returned, so the caller
// went on with a truncated list.)
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
  int32_t numOfExisted = taosArrayGetSize(pUidList);

  for (int32_t i = 0; i < numOfExisted; ++i) {
    uint64_t* uid = taosArrayGet(pUidList, i);
    if (!uid) {
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    STUidTagInfo info = {.uid = *uid};
    void*        tmp = taosArrayPush(pUidTagList, &info);
    if (!tmp) {
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1839

1840
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
224,135,144✔
1841
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1842
  *listAdded = false;
224,135,144✔
1843
  if (pTagCond == NULL) {
224,153,197✔
1844
    return TSDB_CODE_SUCCESS;
210,515,128✔
1845
  }
1846

1847
  terrno = TSDB_CODE_SUCCESS;
13,638,069✔
1848

1849
  int32_t      lino = 0;
13,611,707✔
1850
  int32_t      code = TSDB_CODE_SUCCESS;
13,611,707✔
1851
  SArray*      pBlockList = NULL;
13,611,707✔
1852
  SSDataBlock* pResBlock = NULL;
13,611,707✔
1853
  SScalarParam output = {0};
13,611,910✔
1854
  SArray*      pUidTagList = NULL;
13,612,282✔
1855

1856
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
13,612,282✔
1857

1858
  //  int64_t stt = taosGetTimestampUs();
1859
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
13,611,057✔
1860
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
13,611,115✔
1861

1862
  code = copyExistedUids(pUidTagList, pUidList);
13,611,115✔
1863
  QUERY_CHECK_CODE(code, lino, end);
13,611,707✔
1864

1865
  int32_t filter = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
13,611,707✔
1866
  if (filter == 0) {  // tbname in filter is activated, do nothing and return
13,610,354✔
1867
    taosArrayClear(pUidList);
468,828✔
1868

1869
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
468,828✔
1870
    code = taosArrayEnsureCap(pUidList, numOfRows);
468,828✔
1871
    QUERY_CHECK_CODE(code, lino, end);
468,828✔
1872

1873
    for (int32_t i = 0; i < numOfRows; ++i) {
3,251,217✔
1874
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,782,389✔
1875
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,782,389✔
1876
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
2,782,389✔
1877
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,782,389✔
1878
    }
1879
    terrno = 0;
468,828✔
1880
  } else {
1881
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,141,526✔
1882

1883
    FilterCondType condType = checkTagCond(pTagCond);
13,141,526✔
1884
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) ||
22,911,204✔
1885
          taosArrayGetSize(pUidTagList) > 0) {
9,768,536✔
1886
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
3,659,146✔
1887
    } else {
1888
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
9,483,522✔
1889
    }
1890
    if (code != TSDB_CODE_SUCCESS) {
13,143,336✔
1891
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1892
      terrno = code;
×
1893
      QUERY_CHECK_CODE(code, lino, end);
×
1894
    }
1895
  }
1896

1897
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,612,164✔
1898

1899
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
13,613,746✔
1900
  if (numOfTables == 0) {
13,612,318✔
1901
    goto end;
1,906,715✔
1902
  }
1903

1904
  SArray* pColList = NULL;
11,705,603✔
1905
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
11,706,046✔
1906
  if (code != TSDB_CODE_SUCCESS) {
11,698,048✔
1907
    goto end;
×
1908
  }
1909
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
11,698,048✔
1910
  taosArrayDestroy(pColList);
11,704,978✔
1911
  if (pResBlock == NULL) {
11,704,314✔
1912
    code = terrno;
×
1913
    QUERY_CHECK_CODE(code, lino, end);
×
1914
  }
1915

1916
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1917

1918
  //  int64_t st1 = taosGetTimestampUs();
1919
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1920
  pBlockList = taosArrayInit(2, POINTER_BYTES);
11,704,314✔
1921
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
11,702,534✔
1922

1923
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
11,706,059✔
1924
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
11,706,059✔
1925

1926
  code = createResultData(&type, numOfTables, &output);
11,706,059✔
1927
  if (code != TSDB_CODE_SUCCESS) {
11,701,085✔
1928
    terrno = code;
×
1929
    QUERY_CHECK_CODE(code, lino, end);
×
1930
  }
1931

1932
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
11,701,085✔
1933
  gTaskScalarExtra.pStreamRange = NULL;
11,701,085✔
1934
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
11,701,612✔
1935
  if (code != TSDB_CODE_SUCCESS) {
11,693,132✔
1936
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
1,096✔
1937
    terrno = code;
1,096✔
1938
    QUERY_CHECK_CODE(code, lino, end);
1,096✔
1939
  }
1940

1941
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
11,692,036✔
1942
  if (code != TSDB_CODE_SUCCESS) {
11,703,173✔
1943
    terrno = code;
×
1944
    QUERY_CHECK_CODE(code, lino, end);
×
1945
  }
1946
  *listAdded = true;
11,703,173✔
1947

1948
end:
13,612,113✔
1949
  if (code != TSDB_CODE_SUCCESS) {
13,611,801✔
1950
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,096✔
1951
  }
1952
  blockDataDestroy(pResBlock);
13,611,801✔
1953
  taosArrayDestroy(pBlockList);
13,609,780✔
1954
  taosArrayDestroyEx(pUidTagList, freeItem);
13,609,505✔
1955

1956
  colDataDestroy(output.columnData);
13,611,408✔
1957
  taosMemoryFreeClear(output.columnData);
13,610,384✔
1958
  return code;
13,611,222✔
1959
}
1960

1961
typedef struct {
1962
  int32_t code;
1963
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1964
} PlaceHolderContext;
1965

1966
// Expression rewriter callback: replace a stream pseudo-column function node by a clone of
// its first parameter after fmSetStreamPseudoFuncParamVal() has filled that parameter in.
//
// pContext is a PlaceHolderContext. Nodes that are not functions, or functions that are not
// stream pseudo columns, are left untouched. On any failure DEAL_RES_ERROR is returned and
// pContext->code holds a non-success code, which is what the caller checks after the walk.
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
    return DEAL_RES_CONTINUE;
  }
  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
    return DEAL_RES_CONTINUE;
  }
  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
  if (pData->code != TSDB_CODE_SUCCESS) {
    return DEAL_RES_ERROR;
  }

  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
  if (NULL == pFirstParam) {
    // no parameter to substitute: report it instead of dereferencing NULL
    pData->code = TSDB_CODE_INTERNAL_ERROR;
    return DEAL_RES_ERROR;
  }
  ((SValueNode*)pFirstParam)->translate = true;

  SValueNode* res = NULL;
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
  if (NULL == res) {
    // make sure the caller, which only inspects the code, sees the failure
    if (TSDB_CODE_SUCCESS == pData->code) {
      pData->code = TSDB_CODE_INTERNAL_ERROR;
    }
    return DEAL_RES_ERROR;
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  return DEAL_RES_CONTINUE;
}
1991

1992
// Append the column id referenced by a binary operator node to pColIdArray.
// The left operand is used when it is a column node; otherwise the right operand is taken
// to be the column. The result of the append is not checked.
// NOTE(review): nothing here verifies that the chosen operand really is a column node -
// confirm that callers only pass `column <op> value` / `value <op> column` operators.
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
  SColumnNode* pColNode = NULL;
  if (nodeType(pOpNode->pLeft) == QUERY_NODE_COLUMN) {
    pColNode = (SColumnNode*)pOpNode->pLeft;
  } else {
    pColNode = (SColumnNode*)pOpNode->pRight;
  }

  col_id_t colId = pColNode->colId;
  (void)taosArrayPush(pColIdArray, &colId);
}
45,486✔
2001

2002
static int32_t buildTagCondKey(
22,382✔
2003
  const SNode* pTagCond, char** pTagCondKey,
2004
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
2005
  if (NULL == pTagCond ||
22,382✔
2006
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
22,382✔
2007
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
22,382✔
2008
    qError("invalid parameter to extract tag filter symbol");
×
2009
    return TSDB_CODE_INTERNAL_ERROR;
×
2010
  }
2011
  int32_t code = TSDB_CODE_SUCCESS;
22,382✔
2012
  int32_t lino = 0;
22,382✔
2013
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
22,382✔
2014

2015
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
22,743✔
2016
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2017
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
22,382✔
2018
    SNode* pChild = NULL;
22,743✔
2019
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
68,229✔
2020
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
44,764✔
2021
    }
2022
  }
2023

2024
  taosArraySort(*pTagColIds, compareUint16Val);
22,382✔
2025

2026
  // encode ordered colIds into key string, separated by ','
2027
  *tagCondKeyLen =
45,125✔
2028
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
22,743✔
2029
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
22,382✔
2030
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
22,743✔
2031
  char* pStart = *pTagCondKey;
22,382✔
2032
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
67,146✔
2033
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
44,764✔
2034
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
45,486✔
2035
    memcpy(pStart, pColId, sizeof(col_id_t));
45,486✔
2036
    pStart += sizeof(col_id_t);
45,125✔
2037
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
45,125✔
2038
      *pStart = ',';
22,382✔
2039
      pStart += 1;
22,382✔
2040
    }
2041
  }
2042

2043
_end:
22,382✔
2044
  if (TSDB_CODE_SUCCESS != code) {
22,382✔
2045
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2046
    terrno = code;
×
2047
  }
2048
  return code;
22,382✔
2049
}
2050

2051
// Expression walker callback deciding whether a tag condition only uses the node kinds the
// stable tag filter cache accepts: value nodes, column nodes, `=` operators, AND logic
// conditions and stream pseudo-column functions.
//
// pContext points to a bool. It is set to false and the walk is ended as soon as a NULL
// node or any other node kind is met; it is never set to true here.
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
  bool accepted = false;

  if (pTagCond != NULL) {
    switch (nodeType(pTagCond)) {
      case QUERY_NODE_VALUE:
      case QUERY_NODE_COLUMN:
        accepted = true;
        break;
      case QUERY_NODE_OPERATOR:
        accepted = (((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL);
        break;
      case QUERY_NODE_LOGIC_CONDITION:
        accepted = (((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND);
        break;
      case QUERY_NODE_FUNCTION:
        accepted = fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId);
        break;
      default:
        break;
    }
  }

  if (accepted) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)pContext = false;
  return DEAL_RES_END;
}
2075

2076
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
224,022,757✔
2077
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2078
  int32_t code = TSDB_CODE_SUCCESS;
224,022,757✔
2079
  int32_t lino = 0;
224,022,757✔
2080
  size_t  numOfTables = 0;
224,022,757✔
2081
  bool    listAdded = false;
224,022,757✔
2082

2083
  pListInfo->idInfo.suid = pScanNode->suid;
224,054,290✔
2084
  pListInfo->idInfo.tableType = pScanNode->tableType;
224,018,453✔
2085

2086
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
223,938,686✔
2087
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
223,924,287✔
2088

2089
  SIdxFltStatus status = SFLT_NOT_INDEX;
223,924,287✔
2090
  char*   pTagCondKey = NULL;
223,957,191✔
2091
  int32_t tagCondKeyLen;
223,997,072✔
2092
  SArray* pTagColIds = NULL;
223,995,312✔
2093
  char*   pPayload = NULL;
223,982,845✔
2094
  qTrace("getTableList called, suid:%" PRIu64
223,982,845✔
2095
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2096
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2097
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
223,982,845✔
2098
    pListInfo->idInfo.uid = pScanNode->uid;
149,791,814✔
2099
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
149,727,158✔
2100
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
149,781,819✔
2101
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
149,767,624✔
2102
    }
2103
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded, pStreamInfo);
149,820,285✔
2104
    QUERY_CHECK_CODE(code, lino, _end);
149,820,646✔
2105
  } else {
2106
    bool      isStream = (pStreamInfo != NULL);
74,268,379✔
2107
    bool      hasTagCond = (pTagCond != NULL);
74,268,379✔
2108
    bool      canCacheTagEqCondFilter = false;
74,268,379✔
2109
    T_MD5_CTX context = {0};
74,233,518✔
2110

2111
    qTrace("start to get table list by tag filter, suid:%" PRIu64
74,269,346✔
2112
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2113
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2114

2115
    bool acquired = false;
74,269,346✔
2116
    // first, check whether we can use stable tag filter cache
2117
    if (tsStableTagFilterCache && isStream && hasTagCond) {
74,209,240✔
2118
      canCacheTagEqCondFilter = true;
23,826✔
2119
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
23,826✔
2120
        (void*)&canCacheTagEqCondFilter);
2121
    }
2122
    if (canCacheTagEqCondFilter) {
74,195,702✔
2123
      qDebug("%s, stable tag filter condition can be optimized", idstr);
22,743✔
2124
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
22,743✔
2125
        SNode* tmp = NULL;
22,743✔
2126
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
22,743✔
2127
        QUERY_CHECK_CODE(code, lino, _error);
22,743✔
2128

2129
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
22,743✔
2130
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
22,743✔
2131
        if (TSDB_CODE_SUCCESS != ctx.code) {
22,743✔
2132
          nodesDestroyNode(tmp);
×
2133
          code = ctx.code;
×
2134
          goto _error;
×
2135
        }
2136
        code = genStableTagFilterDigest(tmp, &context);
22,743✔
2137
        nodesDestroyNode(tmp);
22,382✔
2138
      } else {
2139
        code = genStableTagFilterDigest(pTagCond, &context);
×
2140
      }
2141
      QUERY_CHECK_CODE(code, lino, _error);
22,743✔
2142

2143
      code = buildTagCondKey(
22,743✔
2144
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2145
      QUERY_CHECK_CODE(code, lino, _error);
22,382✔
2146
      code = pStorageAPI->metaFn.getStableCachedTableList(
22,382✔
2147
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
22,743✔
2148
        context.digest, tListLen(context.digest), pUidList, &acquired);
2149
      QUERY_CHECK_CODE(code, lino, _error);
22,743✔
2150
    } else if (tsTagFilterCache) {
74,172,959✔
2151
      // second, try to use normal tag filter cache
2152
      qDebug("%s using normal tag filter cache", idstr);
64,372✔
2153
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
66,784✔
2154
        SNode* tmp = NULL;
2,412✔
2155
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
2,412✔
2156
        QUERY_CHECK_CODE(code, lino, _error);
2,412✔
2157

2158
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
2,412✔
2159
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
2,412✔
2160
        if (TSDB_CODE_SUCCESS != ctx.code) {
2,412✔
2161
          nodesDestroyNode(tmp);
×
2162
          code = ctx.code;
×
2163
          goto _error;
×
2164
        }
2165
        code = genTagFilterDigest(tmp, &context);
2,412✔
2166
        nodesDestroyNode(tmp);
2,412✔
2167
      } else {
2168
        code = genTagFilterDigest(pTagCond, &context);
61,960✔
2169
      }
2170
      // try to retrieve the result from meta cache
2171
      QUERY_CHECK_CODE(code, lino, _error);      
64,372✔
2172
      code = pStorageAPI->metaFn.getCachedTableList(
64,372✔
2173
        pVnode, pScanNode->suid, context.digest,
64,372✔
2174
        tListLen(context.digest), pUidList, &acquired);
2175
      QUERY_CHECK_CODE(code, lino, _error);
75,929✔
2176
    }
2177
    if (acquired) {
74,206,998✔
2178
      taosArrayDestroy(pTagColIds);
60,385✔
2179
      pTagColIds = NULL;
60,746✔
2180
      
2181
      digest[0] = 1;
60,746✔
2182
      memcpy(
120,770✔
2183
        digest + 1, context.digest, tListLen(context.digest));
60,746✔
2184
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
60,385✔
2185
        " numOfTables:%d", 
2186
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2187
      goto _end;
60,746✔
2188
    } else {
2189
      qDebug("suid:%" PRIu64 
74,146,613✔
2190
        ", failed to get table uid list from cache", pScanNode->suid);
2191
    }
2192

2193
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
74,198,486✔
2194
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
60,863,310✔
2195
      QUERY_CHECK_CODE(code, lino, _error);
60,849,602✔
2196
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
60,849,602✔
2197
    } else {
2198
      // failed to find the result in the cache, let try to calculate the results
2199
      if (pTagIndexCond) {
13,335,176✔
2200
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
4,479,093✔
2201

2202
        SIndexMetaArg metaArg = {.metaEx = pVnode,
4,479,157✔
2203
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
4,479,093✔
2204
                                 .ivtIdx = pIndex,
2205
                                 .suid = pScanNode->uid};
4,479,093✔
2206

2207
        status = SFLT_NOT_INDEX;
4,479,093✔
2208
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
4,479,093✔
2209
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
4,475,320✔
2210
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,096,460✔
2211
        } else {
2212
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
3,378,860✔
2213
        }
2214
      }
2215
    }
2216
    qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
74,190,622✔
2217
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
74,179,790✔
2218
      pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
74,179,790✔
2219
      &listAdded, pStreamInfo);
2220
    QUERY_CHECK_CODE(code, lino, _error);
74,182,747✔
2221

2222
    // let's add the filter results into meta-cache
2223
    numOfTables = taosArrayGetSize(pUidList);
74,181,651✔
2224

2225
    if (canCacheTagEqCondFilter) {
74,179,783✔
2226
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
10,108✔
2227
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2228
            idstr, pScanNode->suid, (int32_t)numOfTables,
2229
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2230

2231
      code = pStorageAPI->metaFn.putStableCachedTableList(
10,108✔
2232
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2233
        context.digest, tListLen(context.digest),
2234
        pUidList, &pTagColIds);
2235
      QUERY_CHECK_CODE(code, lino, _end);
10,108✔
2236

2237
      digest[0] = 1;
10,108✔
2238
      memcpy(digest + 1, context.digest, tListLen(context.digest));
10,108✔
2239
    } else if (tsTagFilterCache) {
74,169,675✔
2240
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
16,261✔
2241
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2242
            idstr, pScanNode->suid, (int32_t)numOfTables,
2243
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2244
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
16,261✔
2245
      pPayload = taosMemoryMalloc(size);
16,261✔
2246
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
16,261✔
2247

2248
      *(int32_t*)pPayload = (int32_t)numOfTables;
16,261✔
2249
      if (numOfTables > 0) {
16,261✔
2250
        void* tmp = taosArrayGet(pUidList, 0);
13,570✔
2251
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
13,570✔
2252
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
13,570✔
2253
      }
2254

2255
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
16,261✔
2256
                                                    context.digest,
2257
                                                    tListLen(context.digest),
2258
                                                    pPayload, size, 1);
2259
      if (TSDB_CODE_SUCCESS == code) {
16,261✔
2260
        /*
2261
          data referenced by pPayload is used in lru cache,
2262
          reset pPayload to NULL to avoid being freed in _error block
2263
        */
2264
        pPayload = NULL;
15,539✔
2265
      } else {
2266
        if (TSDB_CODE_DUP_KEY == code) {
722✔
2267
          /*
2268
            another thread has already put the same key into cache,
2269
            we can just ignore this error
2270
          */
2271
          code = TSDB_CODE_SUCCESS;
722✔
2272
        }
2273
        QUERY_CHECK_CODE(code, lino, _end);
722✔
2274
      }
2275

2276

2277
      digest[0] = 1;
16,261✔
2278
      memcpy(digest + 1, context.digest, tListLen(context.digest));
16,261✔
2279
    }
2280
  }
2281

2282
_end:
224,070,569✔
2283
  if (!listAdded) {
224,080,778✔
2284
    numOfTables = taosArrayGetSize(pUidList);
212,343,461✔
2285
    for (int i = 0; i < numOfTables; i++) {
642,964,651✔
2286
      void* tmp = taosArrayGet(pUidList, i);
430,614,523✔
2287
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
430,635,602✔
2288
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
430,635,602✔
2289

2290
      void* p = taosArrayPush(pListInfo->pTableList, &info);
430,618,582✔
2291
      if (p == NULL) {
430,672,389✔
2292
        taosArrayDestroy(pUidList);
×
2293
        return terrno;
×
2294
      }
2295

2296
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
430,672,389✔
2297
    }
2298
  }
2299

2300
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
224,087,445✔
2301

2302
_error:
224,081,005✔
2303
  taosArrayDestroy(pUidList);
224,100,483✔
2304
  taosArrayDestroy(pTagColIds);
224,049,139✔
2305
  taosMemFreeClear(pTagCondKey);
224,053,803✔
2306
  taosMemFreeClear(pPayload);
224,053,803✔
2307
  if (code != TSDB_CODE_SUCCESS) {
224,053,803✔
2308
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1,096✔
2309
  }
2310
  return code;
224,039,343✔
2311
}
2312

2313
/**
 * Build the list of tables belonging to super table `suid`, applying the tag
 * conditions carried by the optional sub-plan.
 *
 * @param suid       uid of the super table; used as both suid and uid of the temporary scan node
 * @param pVnode     vnode handle, passed through to getTableList()
 * @param node       optional SSubplan*; when non-NULL its pTagCond / pTagIndexCond are used as filters
 * @param tableList  [out] on success receives the resulting table array; ownership moves to the caller
 * @param pTaskInfo  SExecTaskInfo* whose storageAPI is handed to getTableList()
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step (also logged)
 */
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pSubplan = (SSubplan*)node;
  STableListInfo* pTableListInfo = NULL;
  uint8_t         digest[17] = {0};
  SScanPhysiNode  pNode = {0};

  pNode.suid = suid;
  pNode.uid = suid;
  pNode.tableType = TSDB_SUPER_TABLE;

  pTableListInfo = tableListCreate();
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);

  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // hand the array over to the caller and detach it so that the destroy below does not free it
  *tableList = pTableListInfo->pTableList;
  pTableListInfo->pTableList = NULL;

_end:
  // release the helper object on every path; previously it leaked when getTableList() failed
  if (pTableListInfo != NULL) {
    tableListDestroy(pTableListInfo);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2338

2339
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
2340
  size_t keyLen = 0;
×
2341

2342
  SNode* node;
2343
  FOREACH(node, pGroups) {
×
2344
    SExprNode* pExpr = (SExprNode*)node;
×
2345
    keyLen += pExpr->resType.bytes;
×
2346
  }
2347

2348
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
2349
  return keyLen;
×
2350
}
2351

2352
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2353
                              SStorageAPI* pAPI) {
2354
  SMetaReader mr = {0};
×
2355

2356
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2357
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2358
    pAPI->metaReaderFn.clearReader(&mr);
×
2359
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2360
  }
2361

2362
  SNodeList* groupNew = NULL;
×
2363
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2364
  if (TSDB_CODE_SUCCESS != code) {
×
2365
    pAPI->metaReaderFn.clearReader(&mr);
×
2366
    return code;
×
2367
  }
2368

2369
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2370
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2371
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2372
    nodesDestroyList(groupNew);
×
2373
    pAPI->metaReaderFn.clearReader(&mr);
×
2374
    return code;
×
2375
  }
2376
  char* isNull = (char*)keyBuf;
×
2377
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2378

2379
  SNode*  pNode;
2380
  int32_t index = 0;
×
2381
  FOREACH(pNode, groupNew) {
×
2382
    SNode*  pNew = NULL;
×
2383
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2384
    if (TSDB_CODE_SUCCESS == code) {
×
2385
      REPLACE_NODE(pNew);
×
2386
    } else {
2387
      nodesDestroyList(groupNew);
×
2388
      pAPI->metaReaderFn.clearReader(&mr);
×
2389
      return code;
×
2390
    }
2391

2392
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2393
      nodesDestroyList(groupNew);
×
2394
      pAPI->metaReaderFn.clearReader(&mr);
×
2395
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2396
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2397
    }
2398
    SValueNode* pValue = (SValueNode*)pNew;
×
2399

2400
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2401
      isNull[index++] = 1;
×
2402
      continue;
×
2403
    } else {
2404
      isNull[index++] = 0;
×
2405
      char* data = nodesGetValueFromNode(pValue);
×
2406
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2407
        if (tTagIsJson(data)) {
×
2408
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2409
          nodesDestroyList(groupNew);
×
2410
          pAPI->metaReaderFn.clearReader(&mr);
×
2411
          return terrno;
×
2412
        }
2413
        int32_t len = getJsonValueLen(data);
×
2414
        memcpy(pStart, data, len);
×
2415
        pStart += len;
×
2416
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2417
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2418
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2419
        }
2420
        memcpy(pStart, data, varDataTLen(data));
×
2421
        pStart += varDataTLen(data);
×
2422
      } else {
2423
        memcpy(pStart, data, pValue->node.resType.bytes);
×
2424
        pStart += pValue->node.resType.bytes;
×
2425
      }
2426
    }
2427
  }
2428

2429
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
2430
  *pGroupId = calcGroupId(keyBuf, len);
×
2431

2432
  nodesDestroyList(groupNew);
×
2433
  pAPI->metaReaderFn.clearReader(&mr);
×
2434

2435
  return TSDB_CODE_SUCCESS;
×
2436
}
2437

2438
/**
 * Convert a list of column nodes into an SArray of SColumn descriptors.
 *
 * @param pNodeList  list whose entries are read as SColumnNode
 * @return a new array with one SColumn per list entry, or NULL when the list
 *         is NULL, an allocation fails, or a list entry cannot be fetched
 */
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
  if (pNodeList == NULL) {
    return NULL;
  }

  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pColumns = taosArrayInit(numOfCols, sizeof(SColumn));
  if (NULL == pColumns) {
    return NULL;
  }

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SColumnNode* pSrc = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    if (pSrc == NULL) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return NULL;
    }

    // todo extract method
    // copy slot/column ids and the result data type; all other fields stay zero
    SColumn col = {.slotId = pSrc->slotId,
                   .colId = pSrc->colId,
                   .type = pSrc->node.resType.type,
                   .bytes = pSrc->node.resType.bytes,
                   .precision = pSrc->node.resType.precision,
                   .scale = pSrc->node.resType.scale};

    if (taosArrayPush(pColumns, &col) == NULL) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return NULL;
    }
  }

  return pColumns;
}
2476

2477
/**
 * Build the column match list that maps source columns to output slots.
 *
 * One SColMatchItem is created for every target in `pNodeList` whose
 * expression is a column. The slots of `pOutputNodeList` are then scanned to
 * count the output columns and to clear `needOutput` on matched items whose
 * slot is not flagged as output.
 *
 * @param pNodeList        list of STargetNode
 * @param pOutputNodeList  output block descriptor; its pSlots are SSlotDescNode
 * @param numOfOutputCols  [out] number of output columns
 * @param type             stored into pMatchInfo->matchType
 * @param pMatchInfo       [out] receives the match list in pList on success
 * @return TSDB_CODE_SUCCESS or an error code; on failure the partially built
 *         list is destroyed and pMatchInfo->pList is left untouched
 */
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
                            int32_t type, SColMatchInfo* pMatchInfo) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pMatchInfo->matchType = type;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
  if (pList == NULL) {
    code = terrno;
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColMatchItem c = {.needOutput = true};
      c.colId = pColNode->colId;
      c.srcSlotId = pColNode->slotId;
      c.dstSlotId = pNode->slotId;
      c.isPk = pColNode->isPk;
      c.dataType = pColNode->node.resType;
      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // set the output flag for each column in SColMatchInfo, according to the
  // output flag of the corresponding slot descriptor
  *numOfOutputCols = 0;
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
  int32_t numOfItems = (int32_t)taosArrayGetSize(pList);  // pList is not modified below
  for (int32_t i = 0; i < num; ++i) {
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    // todo: add reserve flag check
    // it is a column reserved for the arithmetic expression calculation
    if (pNode->slotId >= numOfCols) {
      (*numOfOutputCols) += 1;
      continue;
    }

    // look up the match item bound to this slot; stays NULL when there is none,
    // so that an unrelated item is never modified below
    SColMatchItem* info = NULL;
    for (int32_t j = 0; j < numOfItems; ++j) {
      SColMatchItem* pItem = taosArrayGet(pList, j);
      QUERY_CHECK_NULL(pItem, code, lino, _end, terrno);
      if (pItem->dstSlotId == pNode->slotId) {
        info = pItem;
        break;
      }
    }

    if (pNode->output) {
      (*numOfOutputCols) += 1;
    } else if (info != NULL) {
      // select distinct tbname from stb where tbname='abc';
      info->needOutput = false;
    }
  }

  pMatchInfo->pList = pList;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // ownership was not transferred, release the partially built list
    taosArrayDestroy(pList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2547

2548
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
846,055,857✔
2549
                                  const char* name) {
2550
  SResSchema s = {0};
846,055,857✔
2551
  s.scale = scale;
846,087,697✔
2552
  s.type = type;
846,087,697✔
2553
  s.bytes = bytes;
846,087,697✔
2554
  s.slotId = slotId;
846,087,697✔
2555
  s.precision = precision;
846,087,697✔
2556
  tstrncpy(s.name, name, tListLen(s.name));
846,087,697✔
2557

2558
  return s;
846,087,697✔
2559
}
2560

2561
/**
 * Allocate an SColumn and fill it from the given ids and data type.
 *
 * @param blockId  stored as dataBlockId
 * @param slotId   slot id
 * @param colId    column id
 * @param pType    data type providing type, bytes, scale and precision
 * @param colType  column type
 * @return the new column (allocated with taosMemoryCalloc), or NULL when the allocation fails
 */
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
  SColumn* pColumn = taosMemoryCalloc(1, sizeof(SColumn));
  if (pColumn != NULL) {
    pColumn->dataBlockId = blockId;
    pColumn->slotId = slotId;
    pColumn->colId = colId;
    pColumn->colType = colType;

    pColumn->type = pType->type;
    pColumn->bytes = pType->bytes;
    pColumn->scale = pType->scale;
    pColumn->precision = pType->precision;
  }

  return pColumn;
}
2577

2578
/**
 * Initialise an SExprInfo from a single plan node.
 *
 * Handled node types: column, value, remote value (fetched first with
 * qFetchRemoteValue() and then treated as a value), function, operator,
 * case-when and logic condition. Any other type yields
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * For a function node without a parameter list whose name is exactly
 * "tbname", a parameter list holding one BIGINT value node is created on the
 * function node.
 *
 * @param pExp    expression info to fill; pExpr and base.pParam are allocated here
 * @param pNode   source node
 * @param slotId  slot id recorded in the result schema
 * @return TSDB_CODE_SUCCESS or an error code (also logged). On failure the
 *         allocations made so far stay attached to pExp.
 *         NOTE(review): presumably the caller releases them via destroyExprInfo(),
 *         as createExprInfo() does - confirm for other callers.
 */
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  pExp->base.numOfParams = 0;
  pExp->base.pParam = NULL;
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);

  pExp->pExpr->_function.num = 1;
  pExp->pExpr->_function.functionId = -1;

  int32_t type = nodeType(pNode);
  // it is a project query, or group by column
  if (type == QUERY_NODE_COLUMN) {
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
    SColumnNode* pColNode = (SColumnNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pColNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);

    pExp->base.pParam[0].pCol =
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);

    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
  } else if (type == QUERY_NODE_VALUE) {
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
    SValueNode* pValNode = (SValueNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pValNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (type == QUERY_NODE_REMOTE_VALUE) {
    // resolve the remote value first; afterwards the node is read as a plain value node
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
    code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
    QUERY_CHECK_CODE(code, lino, _end);

    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
    SValueNode* pValNode = (SValueNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pValNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (type == QUERY_NODE_FUNCTION) {
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;

    SDataType* pType = &pFuncNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
    tExprNode* pExprNode = pExp->pExpr;

    pExprNode->_function.functionId = pFuncNode->funcId;
    pExprNode->_function.pFunctNode = pFuncNode;
    pExprNode->_function.functionType = pFuncNode->funcType;

    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));

    pExp->base.pParamList = pFuncNode->pParameterList;
#if 1
    // todo refactor: add the parameter for tbname function
    const char* name = "tbname";
    int32_t     len = strlen(name);

    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
        pExprNode->_function.functionName[len] == 0) {
      pFuncNode->pParameterList = NULL;
      // use the function-level `code`: a shadowing local here made a failure
      // jump to _end and return TSDB_CODE_SUCCESS
      code = nodesMakeList(&pFuncNode->pParameterList);
      SValueNode* res = NULL;
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
      }
      QUERY_CHECK_CODE(code, lino, _end);
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
      if (code != TSDB_CODE_SUCCESS) {
        nodesDestroyNode((SNode*)res);
        res = NULL;
      }
      QUERY_CHECK_CODE(code, lino, _end);
    }
#endif

    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);

    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = numOfParam;

    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
      if (p1->type == QUERY_NODE_COLUMN) {
        SColumnNode* pcn = (SColumnNode*)p1;

        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
        pExp->base.pParam[j].pCol =
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
      } else if (p1->type == QUERY_NODE_VALUE) {
        SValueNode* pvn = (SValueNode*)p1;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
        QUERY_CHECK_CODE(code, lino, _end);

        SValueNode* pvn = (SValueNode*)pRemote;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
  } else if (type == QUERY_NODE_OPERATOR) {
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pOpNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_CASE_WHEN) {
    // case-when is recorded with the operator node type
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pCaseNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
    // logic conditions are recorded with the operator node type as well
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;
    SDataType* pType = &pCond->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2761

2762
/**
 * Initialise an SExprInfo from a target node: its expression is converted by
 * createExprFromOneNode() and its slot id becomes the result slot.
 *
 * @param pExp         expression info to fill
 * @param pTargetNode  target node supplying pExpr and slotId
 * @return the result of createExprFromOneNode()
 */
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
  SNode*  pSrcExpr = pTargetNode->pExpr;
  int16_t dstSlotId = pTargetNode->slotId;

  return createExprFromOneNode(pExp, pSrcExpr, dstSlotId);
}
2765

2766
/**
 * Create an array of SExprInfo, one per node of `pNodeList`. The i-th
 * expression uses slot id `i + UD_TAG_COLUMN_INDEX`.
 *
 * @param pNodeList   source nodes
 * @param numOfExprs  [out] set to the list length, also on failure
 * @return the new array, or NULL on failure. When converting a node fails,
 *         terrno is set to the error code and everything allocated so far
 *         is released.
 */
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
  *numOfExprs = LIST_LENGTH(pNodeList);
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (!pExprs) {
    return NULL;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    SExprInfo* pExp = &pExprs[i];
    SNode*     pNode = nodesListGetNode(pNodeList, i);

    // a missing list entry is an internal error; never hand NULL to createExprFromOneNode()
    int32_t code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    if (pNode != NULL) {
      code = createExprFromOneNode(pExp, pNode, i + UD_TAG_COLUMN_INDEX);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // release the per-expression allocations as well, the same way createExprInfo() does;
      // freeing only the array leaked them
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      terrno = code;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return NULL;
    }
  }

  return pExprs;
}
2786

2787
/**
 * Create the expression array for the targets in `pNodeList` followed by the
 * targets in the optional `pGroupKeys`.
 *
 * @param pNodeList   target nodes, converted first
 * @param pGroupKeys  optional group key target nodes, appended after pNodeList
 * @param pExprInfo   [out] receives the new array when at least one expression exists
 * @param numOfExprs  [out] total number of expressions
 * @return TSDB_CODE_SUCCESS or an error code; on failure everything created so far is released
 */
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
  QRY_PARAM_CHECK(pExprInfo);

  int32_t code = 0;
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
  int32_t numOfGroupKeys = 0;
  if (NULL != pGroupKeys) {
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
  }

  int32_t total = numOfFuncs + numOfGroupKeys;
  *numOfExprs = total;
  if (0 == total) {
    return code;
  }

  SExprInfo* pResult = taosMemoryCalloc(total, sizeof(SExprInfo));
  if (NULL == pResult) {
    return terrno;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    // the first numOfFuncs entries come from pNodeList, the rest from pGroupKeys
    STargetNode* pTarget = (idx < numOfFuncs) ? (STargetNode*)nodesListGetNode(pNodeList, idx)
                                              : (STargetNode*)nodesListGetNode(pGroupKeys, idx - numOfFuncs);
    if (NULL == pTarget) {
      destroyExprInfo(pResult, total);
      taosMemoryFreeClear(pResult);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    code = createExprFromTargetNode(&pResult[idx], pTarget);
    if (TSDB_CODE_SUCCESS == code) {
      continue;
    }

    destroyExprInfo(pResult, total);
    taosMemoryFreeClear(pResult);
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  *pExprInfo = pResult;
  return code;
}
2834

2835
static void deleteSubsidiareCtx(void* pData) {
×
2836
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2837
  if (pCtx->pCtx) {
×
2838
    taosMemoryFreeClear(pCtx->pCtx);
×
2839
  }
2840
}
×
2841

2842
// set the output buffer for the selectivity + tag query
2843
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
342,705,309✔
2844
  int32_t num = 0;
342,705,309✔
2845
  int32_t code = TSDB_CODE_SUCCESS;
342,705,309✔
2846
  int32_t lino = 0;
342,705,309✔
2847

2848
  SArray* pValCtxArray = NULL;
342,705,309✔
2849
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
862,514,771✔
2850
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
519,842,290✔
2851
    if (funcIdx > 0) {
519,841,144✔
2852
      if (pValCtxArray == NULL) {
1,856,184✔
2853
        // the end of the list is the select function of biggest index
2854
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,332,272✔
2855
        if (pValCtxArray == NULL) {
1,332,272✔
2856
          return terrno;
×
2857
        }
2858
      }
2859
      if (funcIdx > pValCtxArray->size) {
1,856,184✔
2860
        qError("funcIdx:%d is out of range", funcIdx);
×
2861
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2862
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2863
      }
2864
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,856,184✔
2865
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,855,678✔
2866
      if (pSubsidiary->pCtx == NULL) {
1,855,678✔
2867
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2868
        return terrno;
×
2869
      }
2870
      pSubsidiary->num = 0;
1,856,184✔
2871
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,855,678✔
2872
    }
2873
  }
2874

2875
  SqlFunctionCtx*  p = NULL;
342,672,481✔
2876
  SqlFunctionCtx** pValCtx = NULL;
342,672,481✔
2877
  if (pValCtxArray == NULL) {
342,672,481✔
2878
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
341,354,686✔
2879
    if (pValCtx == NULL) {
341,287,353✔
2880
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2881
    }
2882
  }
2883

2884
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,172,141,054✔
2885
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
829,591,078✔
2886
    if ((strcmp(pName, "_select_value") == 0)) {
829,668,406✔
2887
      if (pValCtxArray == NULL) {
5,936,480✔
2888
        pValCtx[num++] = &pCtx[i];
3,331,756✔
2889
      } else {
2890
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2,604,724✔
2891
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,604,716✔
2892
          bindFuncIndex -= 1;
2,544,502✔
2893
        }
2894
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2,604,716✔
2895
        if (pSubsidiary == NULL) {
2,605,222✔
2896
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2897
        }
2898
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2,605,222✔
2899
        (*pSubsidiary)->num++;
2,604,210✔
2900
      }
2901
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
823,731,926✔
2902
      if (pValCtxArray == NULL) {
67,502,140✔
2903
        p = &pCtx[i];
65,237,573✔
2904
      }
2905
    }
2906
  }
2907

2908
  if (p != NULL) {
342,549,976✔
2909
    p->subsidiaries.pCtx = pValCtx;
25,754,253✔
2910
    p->subsidiaries.num = num;
25,754,253✔
2911
  } else {
2912
    taosMemoryFreeClear(pValCtx);
316,795,723✔
2913
  }
2914

2915
_end:
1,291,567✔
2916
  if (code != TSDB_CODE_SUCCESS) {
342,579,973✔
2917
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2918
    taosMemoryFreeClear(pValCtx);
×
2919
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2920
  } else {
2921
    taosArrayDestroy(pValCtxArray);
342,579,973✔
2922
  }
2923
  return code;
342,671,722✔
2924
}
2925

2926
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
342,715,305✔
2927
                                     SFunctionStateStore* pStore) {
2928
  int32_t         code = TSDB_CODE_SUCCESS;
342,715,305✔
2929
  int32_t         lino = 0;
342,715,305✔
2930
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
342,715,305✔
2931
  if (pFuncCtx == NULL) {
342,593,859✔
2932
    return NULL;
×
2933
  }
2934

2935
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
342,593,859✔
2936
  if (*rowEntryInfoOffset == 0) {
342,677,397✔
2937
    taosMemoryFreeClear(pFuncCtx);
×
2938
    return NULL;
×
2939
  }
2940

2941
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,172,282,175✔
2942
    SExprInfo* pExpr = &pExprInfo[i];
829,686,379✔
2943

2944
    SExprBasicInfo* pFunct = &pExpr->base;
829,641,295✔
2945
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
829,667,780✔
2946

2947
    pCtx->functionId = -1;
829,670,697✔
2948
    pCtx->pExpr = pExpr;
829,672,940✔
2949

2950
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
829,650,840✔
2951
      SFuncExecEnv env = {0};
264,347,915✔
2952
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
264,350,982✔
2953
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
264,357,059✔
2954
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
264,350,323✔
2955

2956
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
264,361,726✔
2957
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
439,346,340✔
2958
        if (!isUdaf) {
175,020,705✔
2959
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
174,978,296✔
2960
          QUERY_CHECK_CODE(code, lino, _end);
174,947,639✔
2961
        } else {
2962
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
42,409✔
2963
          pCtx->udfName = taosStrdup(udfName);
42,409✔
2964
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
42,409✔
2965

2966
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
42,409✔
2967
          QUERY_CHECK_CODE(code, lino, _end);
42,409✔
2968
        }
2969
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
174,990,048✔
2970
        if (!tmp) {
174,994,531✔
2971
          code = terrno;
×
2972
          QUERY_CHECK_CODE(code, lino, _end);
4,154✔
2973
        }
2974
      } else {
2975
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
89,322,520✔
2976
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
9,295,527✔
2977
          QUERY_CHECK_CODE(code, lino, _end);
9,295,527✔
2978
        }      
2979
        
2980
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
89,325,061✔
2981
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
89,329,415✔
2982
          code = TSDB_CODE_SUCCESS;
26,028✔
2983
        }
2984
        QUERY_CHECK_CODE(code, lino, _end);
89,329,415✔
2985

2986
        if (pCtx->sfp.getEnv != NULL) {
89,329,415✔
2987
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
18,133,229✔
2988
          if (!tmp) {
18,132,765✔
2989
            code = terrno;
×
2990
            QUERY_CHECK_CODE(code, lino, _end);
×
2991
          }
2992
        }
2993
      }
2994
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
264,330,383✔
2995
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
565,277,884✔
2996
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
39,523,261✔
2997
      // for simple column, the result buffer needs to hold at least one element.
2998
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
565,363,700✔
2999
    }
3000

3001
    pCtx->input.numOfInputCols = pFunct->numOfParams;
829,689,085✔
3002
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
829,670,445✔
3003
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
829,691,738✔
3004
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
829,649,189✔
3005
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
829,715,409✔
3006

3007
    pCtx->pTsOutput = NULL;
829,648,780✔
3008
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
829,727,946✔
3009
    pCtx->resDataInfo.type = pFunct->resSchema.type;
829,644,446✔
3010
    pCtx->order = TSDB_ORDER_ASC;
829,715,971✔
3011
    pCtx->start.key = INT64_MIN;
829,706,779✔
3012
    pCtx->end.key = INT64_MIN;
829,692,246✔
3013
    pCtx->numOfParams = pExpr->base.numOfParams;
829,646,503✔
3014
    pCtx->param = pFunct->pParam;
829,737,826✔
3015
    pCtx->saveHandle.currentPage = -1;
829,653,952✔
3016
    pCtx->pStore = pStore;
829,715,405✔
3017
    pCtx->hasWindowOrGroup = false;
829,719,288✔
3018
    pCtx->needCleanup = false;
829,649,851✔
3019
    pCtx->skipDynDataCheck = false;
829,668,211✔
3020
  }
3021

3022
  for (int32_t i = 1; i < numOfOutput; ++i) {
862,571,368✔
3023
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,039,707,691✔
3024
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
519,874,687✔
3025
  }
3026

3027
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
342,701,323✔
3028
  QUERY_CHECK_CODE(code, lino, _end);
342,670,876✔
3029

3030
_end:
342,670,876✔
3031
  if (code != TSDB_CODE_SUCCESS) {
342,652,169✔
3032
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3033
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3034
      taosMemoryFree(pFuncCtx[i].input.pData);
×
3035
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
3036
    }
3037
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
3038
    taosMemoryFreeClear(pFuncCtx);
×
3039

3040
    terrno = code;
×
3041
    return NULL;
×
3042
  }
3043
  return pFuncCtx;
342,652,169✔
3044
}
3045

3046
// NOTE: sources columns are more than the destination SSDatablock columns.
3047
// doFilter in table scan needs every column even its output is false
3048
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
10,941,244✔
3049
  int32_t code = TSDB_CODE_SUCCESS;
10,941,244✔
3050
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
10,941,244✔
3051

3052
  int32_t i = 0, j = 0;
10,941,452✔
3053
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
101,704,386✔
3054
    SColumnInfoData* p = taosArrayGet(pCols, i);
90,763,350✔
3055
    if (!p) {
90,765,023✔
3056
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3057
      return terrno;
×
3058
    }
3059
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
90,765,023✔
3060
    if (!pmInfo) {
90,765,023✔
3061
      return terrno;
×
3062
    }
3063

3064
    if (p->info.colId == pmInfo->colId) {
90,765,023✔
3065
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
81,891,244✔
3066
      if (!pDst) {
81,891,904✔
3067
        return terrno;
×
3068
      }
3069
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
81,891,904✔
3070
      if (code != TSDB_CODE_SUCCESS) {
81,889,815✔
3071
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3072
        return code;
×
3073
      }
3074
      i++;
81,889,815✔
3075
      j++;
81,889,815✔
3076
    } else if (p->info.colId < pmInfo->colId) {
8,873,119✔
3077
      i++;
8,873,119✔
3078
    } else {
3079
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3080
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3081
    }
3082
  }
3083
  return code;
10,941,452✔
3084
}
3085

3086
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
196,289,918✔
3087
  SInterval interval = {
392,454,123✔
3088
      .interval = pTableScanNode->interval,
196,175,029✔
3089
      .sliding = pTableScanNode->sliding,
196,166,278✔
3090
      .intervalUnit = pTableScanNode->intervalUnit,
196,309,104✔
3091
      .slidingUnit = pTableScanNode->slidingUnit,
196,228,259✔
3092
      .offset = pTableScanNode->offset,
196,308,915✔
3093
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
196,301,017✔
3094
      .timeRange = pTableScanNode->scanRange,
3095
  };
3096
  calcIntervalAutoOffset(&interval);
196,145,562✔
3097

3098
  return interval;
196,200,594✔
3099
}
3100

3101
/**
 * Convert a column node into an SColumn: slot id, column id and the result type
 * (type, bytes, scale, precision) are copied, every other field is zero.
 */
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
  SColumn col = {
      .slotId = pColNode->slotId,
      .colId = pColNode->colId,
      .type = pColNode->node.resType.type,
      .bytes = pColNode->node.resType.bytes,
      .scale = pColNode->node.resType.scale,
      .precision = pColNode->node.resType.precision,
  };
  return col;
}
3112

3113

3114
/**
3115
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3116
 * @param[in] cond The range specified by WHERE condition.
3117
 * @param[in] range The range specified by RANGE clause.
3118
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3119
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3120
 * @note `cond` and `twindow` may be the same address.
3121
 */
3122
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
1,647,587✔
3123
                                 STimeWindow* extTwindows) {
3124
  int32_t     code = TSDB_CODE_SUCCESS;
1,647,587✔
3125
  int32_t     lino = 0;
1,647,587✔
3126
  STimeWindow tempWindow;
3127

3128
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
1,647,587✔
3129
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
2,995✔
3130
    return code;
2,995✔
3131
  }
3132

3133
  if (range->ekey < cond->skey) {
1,644,592✔
3134
    extTwindows[1] = *cond;
241,772✔
3135
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
241,772✔
3136
    return code;
241,772✔
3137
  }
3138

3139
  if (cond->ekey < range->skey) {
1,402,820✔
3140
    extTwindows[0] = *cond;
190,217✔
3141
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
190,217✔
3142
    return code;
190,217✔
3143
  }
3144

3145
  // Only scan data in the time range intersecion.
3146
  extTwindows[0] = extTwindows[1] = *cond;
1,212,603✔
3147
  twindow->skey = TMAX(cond->skey, range->skey);
1,212,603✔
3148
  twindow->ekey = TMIN(cond->ekey, range->ekey);
1,212,603✔
3149
  extTwindows[0].ekey = twindow->skey - 1;
1,212,603✔
3150
  extTwindows[1].skey = twindow->ekey + 1;
1,212,603✔
3151

3152
  return code;
1,212,603✔
3153
}
3154

3155
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode,
219,700,944✔
3156
                               const SReadHandle* readHandle, bool applyExtWin) {
3157
  int32_t code = 0;                             
219,700,944✔
3158
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
219,700,944✔
3159
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
219,701,873✔
3160

3161
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
219,689,687✔
3162
  if (!pCond->colList) {
219,637,718✔
3163
    return terrno;
×
3164
  }
3165
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
219,650,130✔
3166
  if (pCond->pSlotList == NULL) {
219,675,594✔
3167
    taosMemoryFreeClear(pCond->colList);
×
3168
    return terrno;
×
3169
  }
3170

3171
  // TODO: get it from stable scan node
3172
  pCond->twindows = pTableScanNode->scanRange;
219,648,000✔
3173
  pCond->suid = pTableScanNode->scan.suid;
219,707,618✔
3174
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
219,681,590✔
3175
  pCond->startVersion = -1;
219,665,848✔
3176
  pCond->endVersion = -1;
219,623,380✔
3177
  pCond->skipRollup = readHandle->skipRollup;
219,655,429✔
3178
  if (readHandle->winRangeValid) {
219,665,011✔
3179
    pCond->twindows = readHandle->winRange;
321,654✔
3180
  }
3181
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
219,623,827✔
3182
  // allowed read stt file optimization mode
3183
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
439,397,693✔
3184
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
219,687,183✔
3185

3186
  int32_t j = 0;
219,687,950✔
3187
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
1,047,187,907✔
3188
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
827,543,415✔
3189
    if (!pNode) {
827,369,375✔
3190
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3191
      return terrno;
×
3192
    }
3193
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
827,369,375✔
3194
    if (pColNode->colType == COLUMN_TYPE_TAG) {
827,561,415✔
3195
      continue;
×
3196
    }
3197

3198
    pCond->colList[j].type = pColNode->node.resType.type;
827,526,055✔
3199
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
827,581,978✔
3200
    pCond->colList[j].colId = pColNode->colId;
827,612,373✔
3201
    pCond->colList[j].pk = pColNode->isPk;
827,596,432✔
3202

3203
    pCond->pSlotList[j] = pNode->slotId;
827,500,155✔
3204
    j += 1;
827,499,957✔
3205
  }
3206

3207
  pCond->numOfCols = j;
219,704,953✔
3208

3209
  if (applyExtWin) {
219,731,815✔
3210
    if (NULL != pTableScanNode->pExtScanRange) {
196,625,896✔
3211
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
1,647,587✔
3212
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
1,647,587✔
3213
    } else if (readHandle->extWinRangeValid) {
194,855,357✔
3214
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
×
3215
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
3216
    }
3217
  }
3218
  
3219
  return code;
219,691,677✔
3220
}
3221

3222
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
20,635,575✔
3223
                                           const SReadHandle* readHandle, SArray* colArray) {
3224
  int32_t code = TSDB_CODE_SUCCESS;
20,635,575✔
3225
  int32_t lino = 0;
20,635,575✔
3226

3227
  pCond->order = TSDB_ORDER_ASC;
20,635,575✔
3228
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
20,636,725✔
3229

3230
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
20,631,550✔
3231
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
20,632,700✔
3232

3233
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
20,628,100✔
3234
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
20,631,550✔
3235

3236
  pCond->twindows = pOrgCond->twindows;
20,633,275✔
3237
  pCond->order = pOrgCond->order;
20,633,850✔
3238
  pCond->type = pOrgCond->type;
20,629,825✔
3239
  pCond->startVersion = -1;
20,632,700✔
3240
  pCond->endVersion = -1;
20,631,550✔
3241
  pCond->skipRollup = true;
20,631,550✔
3242
  pCond->notLoadData = false;
20,630,975✔
3243

3244
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
101,893,428✔
3245
    SColIdPair* pColPair = taosArrayGet(colArray, i);
81,269,353✔
3246
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
81,259,578✔
3247

3248
    bool find = false;
81,261,303✔
3249
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
499,631,909✔
3250
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
499,512,309✔
3251
        pCond->colList[i].type = pOrgCond->colList[j].type;
81,255,553✔
3252
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
81,265,328✔
3253
        pCond->colList[i].colId = pColPair->orgColId;
81,265,328✔
3254
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
81,270,503✔
3255
        pCond->pSlotList[i] = i;
81,264,753✔
3256
        find = true;
81,270,503✔
3257
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
81,270,503✔
3258
        break;
81,265,328✔
3259
      }
3260
    }
3261
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
81,260,153✔
3262
  }
3263

3264
  return code;
20,636,150✔
3265
_return:
×
3266
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3267
  taosMemoryFreeClear(pCond->colList);
×
3268
  taosMemoryFreeClear(pCond->pSlotList);
×
3269
  return code;
×
3270
}
3271

3272
/**
 * Release the slot list and the column list of a query condition and reset both fields.
 * The condition object itself is not freed.
 */
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
  taosMemoryFreeClear(pCond->pSlotList);
  taosMemoryFreeClear(pCond->colList);
}
3276

3277
int32_t convertFillType(int32_t mode) {
2,088,381✔
3278
  int32_t type = TSDB_FILL_NONE;
2,088,381✔
3279
  switch (mode) {
2,088,381✔
3280
    case FILL_MODE_PREV:
110,673✔
3281
      type = TSDB_FILL_PREV;
110,673✔
3282
      break;
110,673✔
3283
    case FILL_MODE_NONE:
×
3284
      type = TSDB_FILL_NONE;
×
3285
      break;
×
3286
    case FILL_MODE_NULL:
135,963✔
3287
      type = TSDB_FILL_NULL;
135,963✔
3288
      break;
135,963✔
3289
    case FILL_MODE_NULL_F:
16,469✔
3290
      type = TSDB_FILL_NULL_F;
16,469✔
3291
      break;
16,469✔
3292
    case FILL_MODE_NEXT:
93,797✔
3293
      type = TSDB_FILL_NEXT;
93,797✔
3294
      break;
93,797✔
3295
    case FILL_MODE_VALUE:
150,667✔
3296
      type = TSDB_FILL_SET_VALUE;
150,667✔
3297
      break;
150,667✔
3298
    case FILL_MODE_VALUE_F:
4,302✔
3299
      type = TSDB_FILL_SET_VALUE_F;
4,302✔
3300
      break;
4,302✔
3301
    case FILL_MODE_LINEAR:
156,710✔
3302
      type = TSDB_FILL_LINEAR;
156,710✔
3303
      break;
156,710✔
3304
    case FILL_MODE_NEAR:
1,419,633✔
3305
      type = TSDB_FILL_NEAR;
1,419,633✔
3306
      break;
1,419,633✔
3307
    default:
167✔
3308
      type = TSDB_FILL_NONE;
167✔
3309
  }
3310

3311
  return type;
2,088,381✔
3312
}
3313

3314
/**
 * Compute the first time window for timestamp ts.
 *
 * For an ascending query the result of getAlignQueryTimeWindow() is used as is. Otherwise the window
 * start is advanced, one sliding step at a time, to the last window start that is not greater than ts,
 * and the window end is recomputed from that start and the interval length.
 *
 * @param w  [out] receives the window
 */
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
  *w = getAlignQueryTimeWindow(pInterval, ts);
  if (ascQuery) {
    return;
  }

  // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
  int64_t candidate = w->skey;
  while (candidate < ts) {  // moving towards end
    candidate = getNextTimeWindowStart(pInterval, candidate, TSDB_ORDER_ASC);
    if (candidate > ts) {
      break;
    }
    w->skey = candidate;
  }

  w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
3333

3334
// Window for ts: start from taosTimeTruncate(), end from taosTimeGetIntervalEnd() of that start.
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow win = {0};

  int64_t start = taosTimeTruncate(ts, pInterval);
  win.skey = start;
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);

  return win;
}
3341

3342
/**
 * Starting at *pWindow, step through windows in the direction opposite to `order` for as long as the
 * window still contains ts, and return the last window that did. If *pWindow itself does not contain
 * ts it is returned unchanged.
 */
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
  // walk against the scan direction
  const int32_t stepOrder = (order == TSDB_ORDER_DESC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;

  STimeWindow cursor = *pWindow;
  STimeWindow lastHit = cursor;
  for (;;) {
    bool containsTs = (cursor.skey <= ts) && (cursor.ekey >= ts);
    if (!containsTs) {
      break;
    }
    lastHit = cursor;
    // get previous time window
    getNextTimeWindow(pInterval, &cursor, stepOrder);
  }

  return lastHit;
}
3353

3354
// get the correct time window according to the handled timestamp
3355
// todo refactor
3356
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
47,411,767✔
3357
                                int32_t order) {
3358
  STimeWindow w = {0};
47,411,767✔
3359
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
47,413,339✔
3360
    getInitialStartTimeWindow(pInterval, ts, &w, (order != TSDB_ORDER_DESC));
6,037,481✔
3361
    return w;
6,039,440✔
3362
  }
3363

3364
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
41,374,807✔
3365
  if (pRow) {
41,375,238✔
3366
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
41,375,200✔
3367
  }
3368

3369
  // in case of typical time window, we can calculate time window directly.
3370
  if (w.skey > ts || w.ekey < ts) {
41,375,711✔
3371
    w = doCalculateTimeWindow(ts, pInterval);
27,654,726✔
3372
  }
3373

3374
  if (pInterval->interval != pInterval->sliding) {
41,374,333✔
3375
    // it is an sliding window query, in which sliding value is not equalled to
3376
    // interval value, and we need to find the first qualified time window.
3377
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
1,649,109✔
3378
  }
3379

3380
  return w;
41,372,457✔
3381
}
3382

3383
/**
 * Start of the window that follows `start` in the given order: the interval offset is removed,
 * one sliding step (signed by the direction factor of `order`) is added, and the offset is re-applied.
 */
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);

  TSKEY withoutOffset = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
  TSKEY stepped =
      taosTimeAdd(withoutOffset, direction * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);

  return taosTimeAdd(stepped, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
}
3390

3391
/**
 * Move *tw to the next window in the given order: the new start comes from getNextTimeWindowStart(),
 * the new end is that start plus the interval length, minus one.
 */
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
  TSKEY nextStart = getNextTimeWindowStart(pInterval, tw->skey, order);

  tw->skey = nextStart;
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
3395

3396
// True when any of limit/offset/slimit/soffset differs from -1.
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
  bool allUnset = (pLimitInfo->limit.limit == -1) && (pLimitInfo->limit.offset == -1) &&
                  (pLimitInfo->slimit.limit == -1) && (pLimitInfo->slimit.offset == -1);
  return !allUnset;
}
3400

3401
// True when the slimit limit or the slimit offset differs from -1.
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
  bool slimitUnset = (pLimitInfo->slimit.limit == -1) && (pLimitInfo->slimit.offset == -1);
  return !slimitUnset;
}
3404

3405
/**
 * Initialize pLimitInfo from the LIMIT and SLIMIT nodes: limit/offset values are read through
 * getLimit()/getOffset(), the remaining offsets start at the configured offsets and all output
 * counters start at zero.
 */
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
  pLimitInfo->limit = (SLimit){.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
  pLimitInfo->slimit = (SLimit){.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};

  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->remainGroupOffset = pLimitInfo->slimit.offset;

  pLimitInfo->numOfOutputRows = 0;
  pLimitInfo->numOfOutputGroups = 0;
  pLimitInfo->currentGroupId = 0;
}
3417

3418
// Restart per-group accounting: restore the remaining row offset and clear the output row counter.
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->numOfOutputRows = 0;
}
3422

3423
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
502,888,938✔
3424
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
502,888,938✔
3425
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3426
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3427
  }
3428
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
502,893,414✔
3429
  return TSDB_CODE_SUCCESS;
502,914,671✔
3430
}
3431

3432
// Return the suid stored in the id info of the table list.
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
  return pTableList->idInfo.suid;
}
3433

3434
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
161,606,824✔
3435
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
161,606,824✔
3436
    return NULL;
3,668✔
3437
  }
3438

3439
  return taosArrayGet(pTableList->pTableList, index);
161,590,822✔
3440
}
3441

3442
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
9,725✔
3443
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
9,725✔
3444
  if (startIndex >= numOfTables) {
9,725✔
3445
    return -1;
×
3446
  }
3447

3448
  for (int32_t i = startIndex; i < numOfTables; ++i) {
93,024✔
3449
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
93,024✔
3450
    if (!p) {
93,024✔
3451
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3452
      return -1;
×
3453
    }
3454
    if (p->uid == uid) {
93,024✔
3455
      return i;
9,725✔
3456
    }
3457
  }
3458
  return -1;
×
3459
}
3460

3461
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
69,510✔
3462
  *psuid = pTableList->idInfo.suid;
69,510✔
3463
  *uid = pTableList->idInfo.uid;
69,510✔
3464
  *type = pTableList->idInfo.tableType;
69,510✔
3465
}
69,510✔
3466

3467
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
595,404,820✔
3468
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
595,404,820✔
3469
  if (slot == NULL) {
595,517,715✔
3470
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3471
    return -1;
×
3472
  }
3473

3474
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
595,517,715✔
3475
  if (pKeyInfo == NULL) {
595,628,068✔
3476
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3477
    return -1;
×
3478
  }
3479
  return pKeyInfo->groupId;
595,628,068✔
3480
}
3481

3482
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3483
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3484
//   int32_t code = TSDB_CODE_SUCCESS;
3485
//   int32_t lino = 0;
3486

3487
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3488
//   if (slot == NULL) {
3489
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3490
//     return 0;
3491
//   }
3492

3493
//   taosArrayRemove(pTableList->pTableList, *slot);
3494
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3495

3496
//   _end:
3497
//   if (code != TSDB_CODE_SUCCESS) {
3498
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3499
//   } else {
3500
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3501
//   }
3502

3503
//   return code;
3504
// }
3505

3506
// Append the table (uid, gid) to the table list and register uid -> array slot in the map.
//
//   pTableList - list to extend; its map is created on demand when it is still NULL
//   uid        - table uid, used as the map key
//   gid        - group id stored with the new entry
//
// Returns TSDB_CODE_SUCCESS when the table was appended or was already present (in which case the
// list is left untouched), otherwise the error code of the failing step. If registering the slot
// in the map fails, the freshly pushed array entry is removed again so array and map stay in sync.
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       slot = -1;  // stays -1 unless a new entry is actually appended
  void*         tmp = NULL;
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};

  if (pTableList->map == NULL) {
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
  }

  if (taosHashGet(pTableList->map, &uid, sizeof(uid)) != NULL) {
    // uid is unsigned, so it must be printed with PRIu64
    qInfo("table:%" PRIu64 " already in tableIdList, ignore it", uid);
    goto _end;
  }

  tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
  if (code != TSDB_CODE_SUCCESS) {
    // roll back the array entry first, whatever the reason of the failure is
    taosArrayPopTailBatch(pTableList->pTableList, 1);
    slot = -1;
    lino = __LINE__;

    // we have checked the existence of uid in hash map above, so a duplicate key is an internal error
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else if (slot >= 0) {
    // only report "added" when an entry was really appended; slot is never read uninitialized
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
  }

  return code;
}
3541

3542
// Locate the tables that belong to the ordinalGroupIndex-th output group.
//
//   pTableList        - table list to inspect
//   ordinalGroupIndex - 0-based index of the output group
//   pKeyInfo          - receives a pointer to the first STableKeyInfo of the group
//                       (NULL when the single group is empty)
//   size              - receives the number of tables in the group
//
// Returns TSDB_CODE_INVALID_PARA when the index is outside [0, number of output groups), the
// error of tableListGetSize() if that call fails, TSDB_CODE_SUCCESS otherwise.
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
                              int32_t* size) {
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
  int32_t numOfTables = 0;

  int32_t code = tableListGetSize(pTableList, &numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
    return TSDB_CODE_INVALID_PARA;
  }

  // special case 1: only one group exists, it covers the whole list
  if (totalGroups == 1) {
    *size = numOfTables;
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
    return TSDB_CODE_SUCCESS;
  }

  // special case 2: one table exists for each group, the group index is the table index
  if (totalGroups == numOfTables) {
    *size = 1;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
    return TSDB_CODE_SUCCESS;
  }

  // general case: the group spans [begin, end), where end is the start of the next group or,
  // for the last group, the end of the list
  int32_t begin = pTableList->groupOffset[ordinalGroupIndex];
  int32_t end = (ordinalGroupIndex < totalGroups - 1) ? pTableList->groupOffset[ordinalGroupIndex + 1] : numOfTables;

  *size = end - begin;
  *pKeyInfo = taosArrayGet(pTableList->pTableList, begin);
  return TSDB_CODE_SUCCESS;
}
3578

3579
// Accessor: number of output groups currently recorded in the table list.
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) {
  return pTableList->numOfOuputGroups;
}
585,445,532✔
3580

3581
// Accessor: value of the oneTableForEachGroup flag stored in the table list.
bool oneTableForEachGroup(const STableListInfo* pTableList) {
  return pTableList->oneTableForEachGroup;
}
585,535✔
3582

3583
STableListInfo* tableListCreate() {
230,765,706✔
3584
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
230,765,706✔
3585
  if (pListInfo == NULL) {
230,682,228✔
3586
    return NULL;
×
3587
  }
3588

3589
  pListInfo->remainGroups = NULL;
230,682,228✔
3590
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
230,697,043✔
3591
  if (pListInfo->pTableList == NULL) {
230,696,770✔
3592
    goto _error;
×
3593
  }
3594

3595
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
230,731,972✔
3596
  if (pListInfo->map == NULL) {
230,830,192✔
3597
    goto _error;
×
3598
  }
3599

3600
  pListInfo->numOfOuputGroups = 1;
230,823,422✔
3601
  return pListInfo;
230,829,085✔
3602

3603
_error:
×
3604
  tableListDestroy(pListInfo);
×
3605
  return NULL;
×
3606
}
3607

3608
// Release a table list together with everything it owns: the table array, the group offset
// buffer, the uid map and the remain-groups map. Passing NULL is a no-op.
void tableListDestroy(STableListInfo* pTableListInfo) {
  if (pTableListInfo != NULL) {
    taosArrayDestroy(pTableListInfo->pTableList);
    taosMemoryFreeClear(pTableListInfo->groupOffset);

    taosHashCleanup(pTableListInfo->map);
    taosHashCleanup(pTableListInfo->remainGroups);

    pTableListInfo->pTableList = NULL;
    pTableListInfo->map = NULL;

    taosMemoryFree(pTableListInfo);
  }
}
3622

3623
// Reset a table list to its empty state while keeping the object and its containers alive:
// the table array and both hash maps are emptied, the group offset buffer is released, and the
// group bookkeeping goes back to "one output group, not one table per group".
// Passing NULL is a no-op.
void tableListClear(STableListInfo* pTableListInfo) {
  if (pTableListInfo == NULL) {
    return;
  }

  taosArrayClear(pTableListInfo->pTableList);
  taosHashClear(pTableListInfo->map);
  taosHashClear(pTableListInfo->remainGroups);

  // Free AND reset the pointer: tableListDestroy() releases groupOffset again, so leaving the
  // stale address behind would turn a later destroy into a double free.
  taosMemoryFreeClear(pTableListInfo->groupOffset);

  pTableListInfo->numOfOuputGroups = 1;
  pTableListInfo->oneTableForEachGroup = false;
}
3635

3636
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
490,804,409✔
3637
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
490,804,409✔
3638
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
490,804,409✔
3639

3640
  if (pInfo1->groupId == pInfo2->groupId) {
490,804,409✔
3641
    return 0;
474,669,324✔
3642
  } else {
3643
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
16,135,913✔
3644
  }
3645
}
3646

3647
// Sort the table list by group id and rebuild the per-group offset table.
// After a successful call on a non-empty list, numOfOuputGroups holds the number of distinct
// group ids and groupOffset[k] is the index of the first table of the k-th group. For an empty
// list numOfOuputGroups is set to 0 and nothing else is touched.
// Returns TSDB_CODE_SUCCESS or the terrno of the failing array/allocation call.
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray*  pStarts = NULL;  // index of the first table of every group, in group order
  uint64_t curGid = 0;

  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
  if (numOfTables == 0) {
    pTableListInfo->numOfOuputGroups = 0;
    return code;
  }

  pStarts = taosArrayInit(4, sizeof(int32_t));
  if (pStarts == NULL) {
    code = terrno;
    goto end;
  }

  // walk the sorted list; the first table and every table whose group id differs from its
  // predecessor opens a new group
  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, i);
    if (pInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }

    if (i > 0 && pInfo->groupId == curGid) {
      continue;
    }

    if (taosArrayPush(pStarts, &i) == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }
    curGid = pInfo->groupId;
  }

  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pStarts);
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
  if (pTableListInfo->groupOffset == NULL) {
    code = terrno;
    goto end;
  }

  memcpy(pTableListInfo->groupOffset, taosArrayGet(pStarts, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);

end:
  taosArrayDestroy(pStarts);
  return code;
}
3709

3710
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
212,098,628✔
3711
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3712
  int32_t code = TSDB_CODE_SUCCESS;
212,098,628✔
3713

3714
  bool   groupByTbname = groupbyTbname(group);
212,098,628✔
3715
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
212,086,946✔
3716
  if (!numOfTables) {
212,087,819✔
3717
    return code;
6,229✔
3718
  }
3719
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
212,081,590✔
3720
  if (group == NULL || groupByTbname) {
212,042,129✔
3721
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
210,136,603✔
3722
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
180,012,469✔
3723
      pTableListInfo->remainGroups =
7,654,440✔
3724
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
7,654,440✔
3725
      if (pTableListInfo->remainGroups == NULL) {
7,654,440✔
3726
        return terrno;
×
3727
      }
3728

3729
      for (int i = 0; i < numOfTables; i++) {
26,299,162✔
3730
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
18,644,722✔
3731
        if (!info) {
18,644,424✔
3732
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3733
          return terrno;
×
3734
        }
3735
        info->groupId = groupByTbname ? info->uid : 0;
18,644,424✔
3736
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
18,644,573✔
3737
                                      &(info->uid), sizeof(info->uid));
18,644,573✔
3738
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
18,644,722✔
3739
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3740
          return tempRes;
×
3741
        }
3742
      }
3743
    } else {
3744
      for (int32_t i = 0; i < numOfTables; i++) {
685,246,790✔
3745
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
482,780,697✔
3746
        if (!info) {
482,717,509✔
3747
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3748
          return terrno;
×
3749
        }
3750
        info->groupId = groupByTbname ? info->uid : 0;
482,717,509✔
3751
        
3752
      }
3753
    }
3754
    if (groupIdMap && group != NULL){
210,120,533✔
3755
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
107,094✔
3756
    }
3757

3758
    pTableListInfo->oneTableForEachGroup = groupByTbname;
210,120,123✔
3759
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
210,179,318✔
3760
      pTableListInfo->oneTableForEachGroup = true;
92,355,814✔
3761
    }
3762

3763
    if (groupSort && groupByTbname) {
210,174,669✔
3764
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
1,464,203✔
3765
      pTableListInfo->numOfOuputGroups = numOfTables;
1,464,613✔
3766
    } else if (groupByTbname && pScanNode->groupOrderScan) {
208,710,466✔
3767
      pTableListInfo->numOfOuputGroups = numOfTables;
30,711✔
3768
    } else {
3769
      pTableListInfo->numOfOuputGroups = 1;
208,679,755✔
3770
    }
3771
    if (groupSort || pScanNode->groupOrderScan) {
210,171,393✔
3772
      code = sortTableGroup(pTableListInfo);
18,878,379✔
3773
    }
3774
  } else {
3775
    bool initRemainGroups = false;
1,905,526✔
3776
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
1,905,526✔
3777
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
1,695,013✔
3778
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
1,695,013✔
3779
          !(groupSort || pScanNode->groupOrderScan)) {
867,447✔
3780
        initRemainGroups = true;
840,135✔
3781
      }
3782
    }
3783

3784
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
1,905,526✔
3785
    if (code != TSDB_CODE_SUCCESS) {
1,904,890✔
3786
      return code;
×
3787
    }
3788

3789
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
1,904,890✔
3790

3791
    if (groupSort || pScanNode->groupOrderScan) {
1,905,041✔
3792
      code = sortTableGroup(pTableListInfo);
152,851✔
3793
    }
3794
  }
3795

3796
  // add all table entry in the hash map
3797
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
212,057,629✔
3798
  for (int32_t i = 0; i < size; ++i) {
722,620,657✔
3799
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
510,540,765✔
3800
    if (!p) {
510,402,589✔
3801
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3802
      return terrno;
×
3803
    }
3804
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
510,402,589✔
3805
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
510,551,864✔
3806
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3807
      return tempRes;
×
3808
    }
3809
  }
3810

3811
  return code;
212,123,473✔
3812
}
3813

3814
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
224,028,466✔
3815
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3816
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3817
  int64_t     st = taosGetTimestampUs();
224,018,390✔
3818
  const char* idStr = GET_TASKID(pTaskInfo);
224,018,390✔
3819

3820
  if (pHandle == NULL) {
223,873,040✔
3821
    qError("invalid handle, in creating operator tree, %s", idStr);
×
3822
    return TSDB_CODE_INVALID_PARA;
×
3823
  }
3824

3825
  if (pHandle->uid != 0) {
223,873,040✔
3826
    pScanNode->uid = pHandle->uid;
51,472✔
3827
    pScanNode->tableType = TSDB_CHILD_TABLE;
51,472✔
3828
  }
3829
  uint8_t digest[17] = {0};
224,034,679✔
3830
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
223,996,640✔
3831
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
224,016,694✔
3832
  if (code != TSDB_CODE_SUCCESS) {
224,041,951✔
3833
    qError("failed to getTableList, code:%s", tstrerror(code));
1,096✔
3834
    return code;
1,096✔
3835
  }
3836

3837
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
224,040,855✔
3838

3839
  int64_t st1 = taosGetTimestampUs();
224,059,367✔
3840
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
224,059,367✔
3841
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
224,085,211✔
3842
         pTaskInfo->cost.extractListTime, idStr);
3843

3844
  if (numOfTables == 0) {
224,054,439✔
3845
    qDebug("no table qualified for query, %s", idStr);
12,096,674✔
3846
    return TSDB_CODE_SUCCESS;
12,096,674✔
3847
  }
3848

3849
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
211,957,765✔
3850
  if (code != TSDB_CODE_SUCCESS) {
211,993,268✔
3851
    return code;
×
3852
  }
3853

3854
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
211,994,255✔
3855
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
211,978,371✔
3856

3857
  return TSDB_CODE_SUCCESS;
211,941,824✔
3858
}
3859

3860
char* getStreamOpName(uint16_t opType) {
10,024,089✔
3861
  switch (opType) {
10,024,089✔
3862
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
3863
      return "stream scan";
×
3864
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
9,806,078✔
3865
      return "project";
9,806,078✔
3866
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
218,011✔
3867
      return "external window";
218,011✔
3868
  }
3869
  return "error name";
×
3870
}
3871

3872
// Dump a data block to the debug log. Nothing is done unless the DEBUG_TRACE bit of qDebugFlag
// is set. A NULL block and a block without rows are reported with a one-line message; otherwise
// the block content is rendered by dumpBlockData() and logged, and the rendered buffer is freed.
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  if (!pBlock) {
    qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
    return;
  }

  char* pDump = NULL;
  if (dumpBlockData(pBlock, flag, &pDump, taskIdStr, qId) == 0) {
    qDebugL("%" PRIx64 " %s %s", qId, __func__, pDump);
    taosMemoryFree(pDump);
  }
}
3890

3891
// Stream flavour of the block dump. A NULL block or a block without rows is always reported
// with a one-line debug message; the full content is dumped only when the DEBUG_TRACE bit of
// qDebugFlag is set, using "<flag> <opStr>" as the dump label.
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
  if (!pBlock) {
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
           pBlock->info.version);
    return;
  }

  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  char label[64];
  snprintf(label, sizeof(label), "%s %s", flag, opStr);

  char* pDump = NULL;
  if (dumpBlockData(pBlock, label, &pDump, taskIdStr, 0) == 0) {
    qDebug("%s", pDump);
    taosMemoryFree(pDump);
  }
}
3912

3913
// Return the first timestamp of tsCols, or the window start key when no timestamp column is given.
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (tsCols != NULL) {
    return tsCols[0];
  }
  return win->skey;
}
12,460,062✔
3914

3915
// Store the description of a time window into the int64 slots 2..4 of pColData->pData:
//   [2] window duration: |ekey - skey| + delta
//   [3] window start key
//   [4] window end key + delta
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
  int64_t* slots = (int64_t*)pColData->pData;

  int64_t span;
  if (pWin->ekey > pWin->skey) {
    span = pWin->ekey - pWin->skey + delta;
  } else {
    span = pWin->skey - pWin->ekey + delta;
  }

  slots[2] = span;
  slots[3] = pWin->skey;
  slots[4] = pWin->ekey + delta;
}
2,147,483,647✔
3923

3924
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
976,484,607✔
3925
                 int32_t rowIndex) {
3926
  SColumnDataAgg* pColAgg = NULL;
976,484,607✔
3927
  const char*     isNull = oldkeyBuf;
976,484,607✔
3928
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
976,484,607✔
3929

3930
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
3931
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
1,514,386,099✔
3932
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
1,515,513,588✔
3933
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
1,515,558,424✔
3934

3935
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
3936
      if (isNull[i] != 1) return 1;
103,250,481✔
3937
    } else {
3938
      if (isNull[i] != 0) return 1;
1,412,457,575✔
3939
      const char* val = colDataGetData(pColInfoData, rowIndex);
1,411,672,514✔
3940
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
1,411,735,264✔
3941
        int32_t len = getJsonValueLen(val);
×
3942
        if (memcmp(p, val, len) != 0) return 1;
×
3943
        p += len;
×
3944
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
1,411,193,417✔
3945
        if (IS_STR_DATA_BLOB(pCol->type)) {
479,465,168✔
UNCOV
3946
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
3947
          p += blobDataTLen(val);
×
3948
        } else {
3949
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
478,638,001✔
3950
          p += varDataTLen(val);
471,508,180✔
3951
        }
3952
      } else {
3953
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
931,931,225✔
3954
        p += pCol->bytes;
915,608,814✔
3955
      }
3956
    }
3957
  }
3958
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
951,735,040✔
3959
  return 0;
951,721,670✔
3960
}
3961

3962
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
25,332,684✔
3963
  uint32_t        colNum = pSortGroupCols->size;
25,332,684✔
3964
  SColumnDataAgg* pColAgg = NULL;
25,336,674✔
3965
  char*           isNull = keyBuf;
25,336,674✔
3966
  char*           p = keyBuf + sizeof(int8_t) * colNum;
25,336,674✔
3967

3968
  for (int32_t i = 0; i < colNum; ++i) {
75,690,868✔
3969
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
50,345,374✔
3970
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
50,359,024✔
3971
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
50,364,484✔
3972

3973
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
50,369,314✔
3974

3975
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
100,718,048✔
3976
      isNull[i] = 1;
1,811,044✔
3977
    } else {
3978
      isNull[i] = 0;
48,550,920✔
3979
      const char* val = colDataGetData(pColInfoData, rowIndex);
48,543,360✔
3980
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
48,553,020✔
3981
        int32_t len = getJsonValueLen(val);
×
3982
        memcpy(p, val, len);
×
3983
        p += len;
×
3984
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
48,550,710✔
3985
        if (IS_STR_DATA_BLOB(pCol->type)) {
7,297,468✔
3986
          blobDataCopy(p, val);
×
3987
          p += blobDataTLen(val);
×
3988
        } else {
3989
          varDataCopy(p, val);
7,301,668✔
3990
          p += varDataTLen(val);
7,298,098✔
3991
        }
3992
      } else {
3993
        memcpy(p, val, pCol->bytes);
41,251,982✔
3994
        p += pCol->bytes;
41,252,822✔
3995
      }
3996
    }
3997
  }
3998
  return (int32_t)(p - keyBuf);
25,345,494✔
3999
}
4000

4001
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
4002
  T_MD5_CTX context;
2,147,483,647✔
4003
  tMD5Init(&context);
2,147,483,647✔
4004
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4005
  tMD5Final(&context);
2,147,483,647✔
4006

4007
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4008
  uint64_t id = 0;
2,147,483,647✔
4009
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4010
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4011
  return id;
2,147,483,647✔
4012
}
4013

4014
// Collect the expression node (pExpr) of every order-by entry in pSortKeys into a new node list.
// Returns the new list (NULL when pSortKeys yields no entry). If appending fails, the error is
// logged, terrno is set to the failure code and NULL is returned.
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNode*     pNode;
  SNodeList* pCols = NULL;

  FOREACH(pNode, pSortKeys) {
    SNode*  pExpr = ((SOrderByExprNode*)pNode)->pExpr;
    int32_t code = nodesListMakeAppend(&pCols, pExpr);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    terrno = code;
    return NULL;
  }

  return pCols;
}
4028

4029
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
43,112✔
4030
  int32_t code = TSDB_CODE_SUCCESS;
43,112✔
4031
  int32_t lino = 0;
43,112✔
4032
  int32_t len = 0;
43,112✔
4033
  int32_t keyNum = taosArrayGetSize(keys);
43,112✔
4034
  for (int32_t i = 0; i < keyNum; ++i) {
109,880✔
4035
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
66,768✔
4036
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
66,768✔
4037
    len += pCol->bytes;
66,768✔
4038
  }
4039
  len += sizeof(int8_t) * keyNum;  // null flag
43,112✔
4040
  *pLen = len;
43,112✔
4041

4042
_end:
43,112✔
4043
  if (code != TSDB_CODE_SUCCESS) {
43,112✔
4044
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4045
  }
4046
  return code;
43,112✔
4047
}
4048

4049
// Translate the "msg" field of an analytic server error response into an error code.
//   - message mentions "white noise" or "white-noise"         -> TSDB_CODE_ANA_WN_DATA
//   - message contains "[Errno 111] Connection refused"       -> TSDB_CODE_ANA_ALGO_NOT_LOAD
//   - anything else, a missing "msg" field or a NULL pJson    -> TSDB_CODE_ANA_ANODE_RETURN_ERROR
// NOTE(review): the message is copied into a fixed 1024-byte buffer by tjsonGetStringValue();
// whether that call bounds the copy is not visible here — confirm.
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  if (pJson == NULL) {
    return code;
  }

  char pMsg[1024] = {0};
  if (tjsonGetStringValue(pJson, "msg", pMsg) != 0) {
    qError("%s failed to extract msg from server, unknown error", pId);
    return code;
  }

  qError("%s failed to exec imputation, msg:%s", pId, pMsg);

  if (strstr(pMsg, "white noise") != NULL || strstr(pMsg, "white-noise") != NULL) {
    code = TSDB_CODE_ANA_WN_DATA;
  } else if (strstr(pMsg, "[Errno 111] Connection refused") != NULL) {
    code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
  }

  return code;
}
4073

4074

4075
// Create a data block with a single column whose type, bytes, scale and precision are taken from
// the result type of the remote value node.
// On success *ppBlock receives the new block and 0 is returned. If the block or its column array
// cannot be allocated, terrno is returned and *ppBlock is left untouched; if appending the column
// fails, the block is destroyed, *ppBlock is set to NULL and the error code is returned.
int32_t createBlockFromRemoteValueNode(SSDataBlock** ppBlock, SRemoteValueNode* pRemote) {
  SValueNode* pVal = (SValueNode*)pRemote;
  int32_t     code = 0;

  SSDataBlock* pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNewBlock == NULL) {
    return terrno;
  }

  pNewBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  if (pNewBlock->pDataBlock == NULL) {
    code = terrno;
    taosMemoryFree(pNewBlock);
    return code;
  }

  SColumnInfoData colInfo = createColumnInfoData(pVal->node.resType.type, pVal->node.resType.bytes, 0);
  colInfo.info.scale = pVal->node.resType.scale;
  colInfo.info.precision = pVal->node.resType.precision;

  code = blockDataAppendColInfo(pNewBlock, &colInfo);
  if (code == TSDB_CODE_SUCCESS) {
    *ppBlock = pNewBlock;
    return code;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  blockDataDestroy(pNewBlock);
  *ppBlock = NULL;
  return code;
}
4107

4108

4109
// Decode the single data block carried by a retrieve response into pb.
// The payload starts with two int32 values, the compressed length and the raw length, followed
// by the block data. When the response is flagged as compressed and the compressed length is
// smaller than the raw length, the data is decompressed into a temporary buffer before decoding.
//
// Returns TSDB_CODE_SUCCESS, or an error code when the temporary buffer cannot be allocated, the
// lengths are inconsistent, decompression yields an unexpected size, or decoding fails. On any
// failure pb is destroyed. The temporary decompression buffer is released on every path.
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   decompBuf = NULL;
  char*   pStart = pRetrieveRsp->data;
  int32_t compLen = 0;
  int32_t rawLen = 0;

  if (pRetrieveRsp->compressed) {  // decompress the data
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
  }

  compLen = *(int32_t*)pStart;
  pStart += sizeof(int32_t);

  rawLen = *(int32_t*)pStart;
  pStart += sizeof(int32_t);
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    pStart = decompBuf;
  }

  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
  if (code != 0) {
    lino = __LINE__;
    // NOTE(review): this frees the response but only clears the local copy of the pointer;
    // confirm that the caller does not release the same message again.
    taosMemoryFreeClear(pRetrieveRsp);
    goto _end;
  }

_end:
  // the block has been decoded (or decoding was abandoned), so the scratch buffer is no longer
  // needed; previously it was never released
  taosMemoryFree(decompBuf);

  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pb);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
4151

4152
// Turn a remote value node into a resolved value node using the single cell of pBlock.
// The block must have exactly one column and at most one row, otherwise
// TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS is returned. A NULL cell marks the value as NULL;
// otherwise the cell is handed to nodesSetValueNodeValueExt(). When that call reports that the
// data must not be freed (needFree == false), the column's pData is detached from the block.
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SRemoteValueNode* pRes, SSDataBlock* pBlock) {
  int32_t code = 0;
  bool    needFree = true;
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  bool validBlock = (NULL != pBlock->pDataBlock) && (1 == numOfCols) && (pBlock->info.rows <= 1);
  if (!validBlock) {
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
      ctx->idStr, pBlock->pDataBlock, numOfCols, pBlock->info.rows);
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
  }

  // the node now carries a concrete, translated value
  pRes->val.node.type = QUERY_NODE_VALUE;
  pRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
  pRes->val.translate = true;

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (!colDataIsNull_s(pCol, 0)) {
    code = nodesSetValueNodeValueExt(&pRes->val, colDataGetData(pCol, 0), &needFree);
  } else {
    pRes->val.isNull = true;
  }

  if (!needFree) {
    pCol->pData = NULL;
  }

  return code;
}
4179

4180
/*
 * Completion callback for a scalar sub-query fetch request.
 *
 * param : the SScalarFetchParam attached to the request; it names the owning
 *         sub-job context, the sub-query index and the result node to fill.
 * pMsg  : response buffer; pMsg->pEpSet and pMsg->pData are released here.
 * code  : status reported for the request.
 *
 * The outcome is recorded in the sub-job context's `code` field and, on
 * success, the result node is stored in `subResValues[subQIdx]`. The context's
 * `ready` semaphore is posted once the response has been processed.
 *
 * Returns the incoming `code` when the context has already been detached from
 * the parameter, otherwise the result of posting the semaphore.
 */
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
  SScalarFetchParam* pFetch = (SScalarFetchParam*)param;
  STaskSubJobCtx*    pJob = pFetch->pSubJobCtx;
  SSDataBlock*       pBlock = NULL;

  taosMemoryFreeClear(pMsg->pEpSet);

  if (pJob == NULL) {
    qWarn("scl fetch ctx not exists since it may have been released");
    goto _exit;
  }

  qDebug("%s subQIdx %d got rsp, code:%d, rsp:%p", pJob->idStr, pFetch->subQIdx, code, pMsg->pData);

  // the request is no longer outstanding: detach the parameter from the context
  taosWLockLatch(&pJob->lock);
  pJob->param = NULL;
  taosWUnLockLatch(&pJob->lock);

  if (pJob->transporterId > 0) {
    int32_t freeCode = asyncFreeConnById(pJob->rpcHandle, pJob->transporterId);
    if (0 != freeCode) {
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", pJob->idStr, tstrerror(freeCode), pFetch->subQIdx);
    }
    pJob->transporterId = -1;
  }

  // a successful status without a payload is treated as an invalid message
  if (code == 0 && pMsg->pData == NULL) {
    qError("%s invalid rsp msg, msgType:%d, len:%d", pJob->idStr, pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  if (code != TSDB_CODE_SUCCESS) {
    pJob->code = rpcCvtErrCode(code);
    if (pJob->code == code) {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", pJob->idStr, pFetch->subQIdx, tstrerror(code));
    } else {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", pJob->idStr, pFetch->subQIdx,
             tstrerror(code), tstrerror(pJob->code));
    }
  } else {
    SRetrieveTableRsp* pRsp = pMsg->pData;

    // convert the header fields in place before they are inspected
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
      // only a completed response with at most one row in at most one block is accepted
      qError("%s invalid scl fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
        pJob->idStr, pFetch->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
      pJob->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
    } else if (pRsp->numOfRows == 0) {
      // no row returned: the result node becomes a NULL value node
      SRemoteValueNode* pNullRes = (SRemoteValueNode*)pFetch->pRes;
      pNullRes->val.node.type = QUERY_NODE_VALUE;
      pNullRes->val.isNull = true;
      pNullRes->val.translate = true;
      pNullRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
      taosArraySet(pJob->subResValues, pFetch->subQIdx, &pFetch->pRes);
    } else {
      qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , pJob->idStr, pFetch->subQIdx, pRsp->numOfRows);

      // each step runs only while the context's code is still success
      pJob->code = createBlockFromRemoteValueNode(&pBlock, pFetch->pRes);
      if (pJob->code == TSDB_CODE_SUCCESS) {
        pJob->code = blockDataEnsureCapacity(pBlock, 1);
      }
      if (pJob->code == TSDB_CODE_SUCCESS) {
        pJob->code = extractSingleRspBlock(pRsp, pBlock);
      }
      if (pJob->code == TSDB_CODE_SUCCESS) {
        pJob->code = setValueFromResBlock(pJob, pFetch->pRes, pBlock);
      }
      if (pJob->code == TSDB_CODE_SUCCESS) {
        taosArraySet(pJob->subResValues, pFetch->subQIdx, &pFetch->pRes);
      }
    }
  }

  // signal that the response has been processed
  code = tsem_post(&pFetch->pSubJobCtx->ready);
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
  }

_exit:

  taosMemoryFree(pMsg->pData);
  blockDataDestroy(pBlock);

  return code;
}
4269

4270

4271
/*
 * Sends a fetch request for scalar sub-query `subQIdx` to its downstream
 * source and blocks until remoteFetchCallBack signals `ctx->ready`.
 *
 * ctx     : sub-job context holding the endpoints, rpc handle, lock and the
 *           `ready` semaphore.
 * subQIdx : index into ctx->subEndPoints selecting the downstream source.
 * pRes    : result node handed to the callback through the fetch parameter.
 *
 * Returns TSDB_CODE_SUCCESS, or: a serialization/allocation error, the code
 * already stored in ctx->code if one was set before sending, the send error,
 * the task code if the task was killed while waiting, or the code the
 * callback stored in ctx->code.
 */
int32_t fetchRemoteValueImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SRemoteValueNode* pRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SDownstreamSourceNode* pSource = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);

  SResFetchReq req = {0};
  req.header.vgId = pSource->addr.nodeId;
  req.sId = pSource->sId;
  req.clientId = pSource->clientId;
  req.taskId = pSource->taskId;
  req.queryId = ctx->queryId;
  req.execId = pSource->execId;

  // first pass computes the encoded size, second pass fills the buffer
  int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, false);
  if (msgSize < 0) {
    return msgSize;
  }

  void* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  msgSize = tSerializeSResFetchReq(msg, msgSize, &req, false);
  if (msgSize < 0) {
    taosMemoryFree(msg);
    return msgSize;
  }

  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
         ", execId:%d",
         ctx->idStr, pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
         pSource->taskId, pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    code = terrno;  // capture before the free/log calls below can touch terrno
    taosMemoryFreeClear(msg);
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
    return code;
  }

  SScalarFetchParam* param = taosMemoryMalloc(sizeof(SScalarFetchParam));
  if (NULL == param) {
    code = terrno;  // capture before the free/log calls below can touch terrno
    taosMemoryFreeClear(msg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
    return code;
  }

  // fully initialise the parameter before it becomes visible through ctx->param
  param->subQIdx = subQIdx;
  param->pRes = pRes;
  param->pSubJobCtx = ctx;

  taosWLockLatch(&ctx->lock);

  if (ctx->code) {
    code = ctx->code;
    // release the latch before jumping: _end takes it again, and jumping with
    // it held would make this thread wait on itself
    taosWUnLockLatch(&ctx->lock);

    qError("task has been killed, error:%s", tstrerror(code));
    // nothing was handed to the rpc layer yet, so every buffer is still ours
    taosMemoryFree(param);
    taosMemoryFree(msg);
    taosMemoryFree(pMsgSendInfo);
    goto _end;
  }

  ctx->param = param;

  taosWUnLockLatch(&ctx->lock);

  pMsgSendInfo->param = param;
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = remoteFetchCallBack;
  pMsgSendInfo->requestId = ctx->queryId;

  code = asyncSendMsgToServer(ctx->rpcHandle, &pSource->addr.epSet, &ctx->transporterId, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  // the wait result itself is superseded below by the task code or ctx->code
  code = qSemWait(ctx->pTaskInfo, &ctx->ready);
  if (isTaskKilled(ctx->pTaskInfo)) {
    code = getTaskCode(ctx->pTaskInfo);
  } else {
    code = ctx->code;
  }

_end:

  // whatever happened, this request no longer owns an outstanding parameter
  taosWLockLatch(&ctx->lock);
  ctx->param = NULL;
  taosWUnLockLatch(&ctx->lock);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }
  return code;
}
4367

4368

4369
/*
 * Resolves the value of scalar sub-query `subQIdx` into `pRes`.
 *
 * pCtx    : the STaskSubJobCtx of the owning task (passed as void*).
 * subQIdx : sub-query index; must be within ctx->subEndPoints.
 * pRes    : remote value node to fill.
 *
 * If no result is stored yet in ctx->subResValues[subQIdx], the value is
 * fetched from the remote source and `pRes` is recorded in that slot.
 * Otherwise the stored value is copied into `pRes` and its node type is set
 * to QUERY_NODE_VALUE.
 *
 * Returns 0 on success, TSDB_CODE_QRY_SUBQ_NOT_FOUND when the index is out of
 * range or has no result slot, or the error from the fetch/copy.
 */
int32_t qFetchRemoteValue(void* pCtx, int32_t subQIdx, SRemoteValueNode* pRes) {
  STaskSubJobCtx* ctx = (STaskSubJobCtx*)pCtx;
  int32_t         code = 0, lino = 0;
  int32_t         subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);

  // reject negative indexes as well as indexes past the end
  if (subQIdx < 0 || subQIdx >= subEndPoinsNum) {
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  SValueNode** ppRes = taosArrayGet(ctx->subResValues, subQIdx);
  if (NULL == ppRes) {
    // subResValues is a separate array from subEndPoints; do not dereference
    // a missing slot
    TAOS_CHECK_EXIT(TSDB_CODE_QRY_SUBQ_NOT_FOUND);
  }

  if (NULL == *ppRes) {
    TAOS_CHECK_EXIT(fetchRemoteValueImpl(ctx, subQIdx, pRes));
    *ppRes = (SValueNode*)pRes;
  } else {
    TAOS_CHECK_EXIT(valueNodeCopy(*ppRes, &pRes->val));
    pRes->val.node.type = QUERY_NODE_VALUE;
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }

  return code;
}
4395

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc