• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.45
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36
#include "trpc.h"
37
#include "filter.h"
38

39
typedef struct tagFilterAssist {
40
  SHashObj* colHash;
41
  int32_t   index;
42
  SArray*   cInfoList;
43
  int32_t   code;
44
} tagFilterAssist;
45

46
typedef struct STransTagExprCtx {
47
  int32_t      code;
48
  SMetaReader* pReader;
49
} STransTagExprCtx;
50

51
typedef enum {
52
  FILTER_NO_LOGIC = 1,
53
  FILTER_AND,
54
  FILTER_OTHER,
55
} FilterCondType;
56

57
static FilterCondType checkTagCond(SNode* cond);
58
static bool optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
59
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
60
                                        uint64_t suid);
61

62
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
63
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
64

65
static int64_t getLimit(const SNode* pLimit) {
1,093,734,844✔
66
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
1,093,734,844✔
67
}
68
static int64_t getOffset(const SNode* pLimit) {
1,093,641,744✔
69
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
1,093,641,744✔
70
}
71
static void releaseColInfoData(void* pCol);
72
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes);
73

74
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
387,356,364✔
75
  pResultRowInfo->size = 0;
387,356,364✔
76
  pResultRowInfo->cur.pageId = -1;
387,413,584✔
77
}
387,431,992✔
78

79
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
5,248,031✔
80

81
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
904,164,898✔
82
  pResultRow->numOfRows = 0;
904,164,898✔
83
  pResultRow->closed = false;
904,233,903✔
84
  pResultRow->endInterp = false;
904,231,292✔
85
  pResultRow->startInterp = false;
904,236,141✔
86

87
  if (entrySize > 0) {
904,239,871✔
88
    memset(pResultRow->pEntryInfo, 0, entrySize);
904,240,031✔
89
  }
90
}
904,096,212✔
91

92
// TODO refactor: use macro
93
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
94
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
95
}
96

97
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
231,442,884✔
98
  int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
231,442,884✔
99

100
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,070,483,265✔
101
    rowSize += pCtx[i].resDataInfo.interBufSize;
839,106,064✔
102
  }
103

104
  return rowSize;
231,377,201✔
105
}
106

107
// Convert buf read from rocksdb to result row
108
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
109
  if (inBuf == NULL || pSup == NULL) {
×
110
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
UNCOV
111
    return TSDB_CODE_INVALID_PARA;
×
112
  }
113
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
114
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
115
  SResultRow*     pResultRow = NULL;
×
116
  size_t          processedSize = 0;
×
UNCOV
117
  int32_t         code = TSDB_CODE_SUCCESS;
×
118

119
  // calculate the size of output buffer
120
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
121
  *outBuf = taosMemoryMalloc(*outBufSize);
×
122
  if (*outBuf == NULL) {
×
123
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
UNCOV
124
    return terrno;
×
125
  }
126
  pResultRow = (SResultRow*)*outBuf;
×
127
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
128
  inBuf += sizeof(SResultRow);
×
UNCOV
129
  processedSize += sizeof(SResultRow);
×
130

131
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
132
    int32_t len = *(int32_t*)inBuf;
×
133
    inBuf += sizeof(int32_t);
×
134
    processedSize += sizeof(int32_t);
×
135
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
136
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
137
      if (code != TSDB_CODE_SUCCESS) {
×
138
        qError("failed to decode result row, code:%d", code);
×
UNCOV
139
        return code;
×
140
      }
141
    } else {
UNCOV
142
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
143
    }
144
    inBuf += len;
×
UNCOV
145
    processedSize += len;
×
146
  }
147

UNCOV
148
  if (processedSize < inBufSize) {
×
149
    // stream stores extra data after result row
150
    size_t leftLen = inBufSize - processedSize;
×
151
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
152
    if (*outBuf == NULL) {
×
153
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
UNCOV
154
      return terrno;
×
155
    }
156
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
157
    inBuf += leftLen;
×
158
    processedSize += leftLen;
×
UNCOV
159
    *outBufSize += leftLen;
×
160
  }
161

162
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
UNCOV
163
  return TSDB_CODE_SUCCESS;
×
164
}
165

166
// Convert result row to buf for rocksdb
167
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
168
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
169
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
UNCOV
170
    return TSDB_CODE_INVALID_PARA;
×
171
  }
172

173
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
174
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
175
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
UNCOV
176
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
177

178
  if (rowSize > inBufSize) {
×
179
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
UNCOV
180
    return TSDB_CODE_INVALID_PARA;
×
181
  }
182

183
  // calculate the size of output buffer
184
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
185
  if (rowSize < inBufSize) {
×
UNCOV
186
    *outBufSize += inBufSize - rowSize;
×
187
  }
188

189
  *outBuf = taosMemoryMalloc(*outBufSize);
×
190
  if (*outBuf == NULL) {
×
191
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
UNCOV
192
    return terrno;
×
193
  }
194

195
  char* pBuf = *outBuf;
×
196
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
197
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
198
  pBuf += sizeof(SResultRow);
×
199
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
200
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
201
    *(int32_t*)pBuf = (int32_t)len;
×
202
    pBuf += sizeof(int32_t);
×
203
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
UNCOV
204
    pBuf += len;
×
205
  }
206

UNCOV
207
  if (rowSize < inBufSize) {
×
208
    // stream stores extra data after result row
209
    size_t leftLen = inBufSize - rowSize;
×
210
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
UNCOV
211
    pBuf += leftLen;
×
212
  }
213

214
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
UNCOV
215
  return TSDB_CODE_SUCCESS;
×
216
}
217

UNCOV
218
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
×
219

220
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
123,008,357✔
221
  taosMemoryFreeClear(pGroupResInfo->pBuf);
123,008,357✔
222
  if (pGroupResInfo->freeItem) {
123,002,076✔
223
    //    taosArrayDestroy(pGroupResInfo->pRows);
224
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
×
225
    pGroupResInfo->freeItem = false;
×
UNCOV
226
    pGroupResInfo->pRows = NULL;
×
227
  } else {
228
    taosArrayDestroy(pGroupResInfo->pRows);
122,991,919✔
229
    pGroupResInfo->pRows = NULL;
122,986,681✔
230
  }
231
  pGroupResInfo->index = 0;
122,991,204✔
232
  pGroupResInfo->delIndex = 0;
122,988,898✔
233
}
123,015,089✔
234

235
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
236
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
237
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
238

239
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
240
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
241
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
242

243
    if (pts1 == pts2) {
2,147,483,647✔
UNCOV
244
      return 0;
×
245
    } else {
246
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
247
    }
248
  } else {
249
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
250
  }
251
}
252

253
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
2,147,483,647✔
254

255
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
86,106,257✔
256
  int32_t code = TSDB_CODE_SUCCESS;
86,106,257✔
257
  int32_t lino = 0;
86,106,257✔
258
  if (pGroupResInfo->pRows != NULL) {
86,106,257✔
259
    taosArrayDestroy(pGroupResInfo->pRows);
4,654,840✔
260
  }
261
  if (pGroupResInfo->pBuf) {
86,111,374✔
262
    taosMemoryFree(pGroupResInfo->pBuf);
4,654,442✔
263
    pGroupResInfo->pBuf = NULL;
4,653,646✔
264
  }
265

266
  // extract the result rows information from the hash map
267
  int32_t size = tSimpleHashGetSize(pHashmap);
86,101,566✔
268

269
  void* pData = NULL;
86,099,532✔
270
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
86,099,532✔
271
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
86,094,163✔
272

273
  size_t  keyLen = 0;
86,099,728✔
274
  int32_t iter = 0;
86,099,728✔
275
  int64_t bufLen = 0, offset = 0;
86,094,854✔
276

277
  // todo move away and record this during create window
278
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
279
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
280
    bufLen += keyLen + sizeof(SResultRowPosition);
2,147,483,647✔
281
  }
282

283
  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
86,010,880✔
284
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
86,105,411✔
285

286
  iter = 0;
86,093,464✔
287
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
288
    void* key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
289

290
    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);
2,147,483,647✔
291

292
    p->groupId = *(uint64_t*)key;
2,147,483,647✔
293
    p->pos = *(SResultRowPosition*)pData;
2,147,483,647✔
294
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
295
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
2,147,483,647✔
296
    QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
2,147,483,647✔
297

298
    offset += keyLen + sizeof(struct SResultRowPosition);
2,147,483,647✔
299
  }
300

301
  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
85,979,335✔
302
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
7,802,247✔
303
    size = POINTER_BYTES;
7,802,247✔
304
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
7,802,247✔
305
  }
306

307
  pGroupResInfo->index = 0;
85,981,649✔
308

309
_end:
85,955,371✔
310
  if (code != TSDB_CODE_SUCCESS) {
86,111,701✔
UNCOV
311
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
312
  }
313
  return code;
86,111,189✔
314
}
315

316
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
×
317
  if (pGroupResInfo->pRows != NULL) {
×
UNCOV
318
    taosArrayDestroy(pGroupResInfo->pRows);
×
319
  }
320

321
  pGroupResInfo->freeItem = true;
×
322
  pGroupResInfo->pRows = pArrayList;
×
323
  pGroupResInfo->index = 0;
×
324
  pGroupResInfo->delIndex = 0;
×
UNCOV
325
}
×
326

327
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
341,641,946✔
328
  if (pGroupResInfo->pRows == NULL) {
341,641,946✔
UNCOV
329
    return false;
×
330
  }
331

332
  return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
341,646,417✔
333
}
334

335
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
182,992,309✔
336
  if (pGroupResInfo->pRows == 0) {
182,992,309✔
UNCOV
337
    return 0;
×
338
  }
339

340
  return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
183,000,286✔
341
}
342

343
SArray* createSortInfo(SNodeList* pNodeList) {
57,964,672✔
344
  size_t numOfCols = 0;
57,964,672✔
345

346
  if (pNodeList != NULL) {
57,964,672✔
347
    numOfCols = LIST_LENGTH(pNodeList);
57,725,753✔
348
  } else {
349
    numOfCols = 0;
239,003✔
350
  }
351

352
  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
57,959,469✔
353
  if (pList == NULL) {
57,931,563✔
UNCOV
354
    return pList;
×
355
  }
356

357
  for (int32_t i = 0; i < numOfCols; ++i) {
131,026,510✔
358
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
73,074,149✔
359
    if (!pSortKey) {
73,091,848✔
360
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
4,616✔
361
      taosArrayDestroy(pList);
4,616✔
362
      pList = NULL;
×
363
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
364
      break;
×
365
    }
366
    SBlockOrderInfo bi = {0};
73,087,232✔
367
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
73,089,195✔
368
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
73,081,731✔
369

370
    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
73,060,360✔
371
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
×
372
      taosArrayDestroy(pList);
×
373
      pList = NULL;
×
374
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
375
      break;
×
376
    }
377
    
378
    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
73,038,505✔
379
    bi.slotId = pColNode->slotId;
73,088,817✔
380
    void* tmp = taosArrayPush(pList, &bi);
73,096,858✔
381
    if (!tmp) {
73,096,858✔
382
      taosArrayDestroy(pList);
×
383
      pList = NULL;
×
UNCOV
384
      break;
×
385
    }
386
  }
387

388
  return pList;
57,953,276✔
389
}
390

391
SSDataBlock* createDataBlockFromDescNode(void* p) {
660,422,750✔
392
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
660,422,750✔
393
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
660,422,750✔
394
  SSDataBlock* pBlock = NULL;
660,750,263✔
395
  int32_t      code = createDataBlock(&pBlock);
660,707,869✔
396
  if (code) {
660,483,758✔
397
    terrno = code;
×
UNCOV
398
    return NULL;
×
399
  }
400

401
  pBlock->info.id.blockId = pNode->dataBlockId;
660,569,884✔
402
  pBlock->info.type = STREAM_INVALID;
660,581,570✔
403
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
660,598,785✔
404
  pBlock->info.watermark = INT64_MIN;
660,843,938✔
405

406
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
407
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
408
    if (!pDescNode) {
2,147,483,647✔
409
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
410
      blockDataDestroy(pBlock);
×
411
      pBlock = NULL;
×
412
      terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
413
      break;
×
414
    }
415
    SColumnInfoData idata =
2,147,483,647✔
416
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
417
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
418
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
419
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
420

421
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
422
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
423
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
39,816✔
424
      blockDataDestroy(pBlock);
39,816✔
425
      pBlock = NULL;
×
426
      terrno = code;
×
UNCOV
427
      break;
×
428
    }
429
  }
430

431
  return pBlock;
661,269,986✔
432
}
433

434
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
209,887,433✔
435
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
209,887,433✔
436

437
  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
986,842,454✔
438
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
794,471,136✔
439
    if (!pItem) {
794,285,107✔
440
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
58,526✔
441
      return terrno;
58,526✔
442
    }
443

444
    if (pItem->isPk) {
794,226,581✔
445
      SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
17,837,695✔
446
      if (!pInfoData) {
17,083,843✔
447
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
448
        return terrno;
×
449
      }
450
      pBlockInfo->pks[0].type = pInfoData->info.type;
17,083,843✔
451
      pBlockInfo->pks[1].type = pInfoData->info.type;
17,108,363✔
452

453
      // allocate enough buffer size, which is pInfoData->info.bytes
454
      if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
17,106,023✔
455
        pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
5,360,207✔
456
        if (pBlockInfo->pks[0].pData == NULL) {
5,356,537✔
UNCOV
457
          return terrno;
×
458
        }
459

460
        pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
5,357,739✔
461
        if (pBlockInfo->pks[1].pData == NULL) {
5,359,005✔
462
          taosMemoryFreeClear(pBlockInfo->pks[0].pData);
×
UNCOV
463
          return terrno;
×
464
        }
465

466
        pBlockInfo->pks[0].nData = pInfoData->info.bytes;
5,358,420✔
467
        pBlockInfo->pks[1].nData = pInfoData->info.bytes;
5,359,558✔
468
      }
469

470
      break;
17,112,746✔
471
    }
472
  }
473

474
  return TSDB_CODE_SUCCESS;
209,423,404✔
475
}
476

477
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
2,386,391✔
478
  STransTagExprCtx* pCtx = pContext;
2,386,391✔
479
  SMetaReader*      mr = pCtx->pReader;
2,386,391✔
480
  bool              isTagCol = false, isTbname = false;
2,386,391✔
481
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
2,386,391✔
482
    SColumnNode* pCol = (SColumnNode*)*pNode;
681,826✔
483
    if (pCol->colType == COLUMN_TYPE_TBNAME)
681,826✔
UNCOV
484
      isTbname = true;
×
485
    else
486
      isTagCol = true;
681,826✔
487
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
1,704,565✔
488
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
UNCOV
489
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
490
  }
491
  if (isTagCol) {
2,386,391✔
492
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
681,826✔
493

494
    SValueNode* res = NULL;
681,826✔
495
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
681,826✔
496
    if (NULL == res) {
681,826✔
UNCOV
497
      return DEAL_RES_ERROR;
×
498
    }
499

500
    res->translate = true;
681,826✔
501
    res->node.resType = pSColumnNode->node.resType;
681,826✔
502

503
    STagVal tagVal = {0};
681,826✔
504
    tagVal.cid = pSColumnNode->colId;
681,826✔
505
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
681,826✔
506
    if (p == NULL) {
681,826✔
UNCOV
507
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
508
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
681,826✔
509
      int32_t len = ((const STag*)p)->len;
×
510
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
511
      if (NULL == res->datum.p) {
×
UNCOV
512
        return DEAL_RES_ERROR;
×
513
      }
UNCOV
514
      memcpy(res->datum.p, p, len);
×
515
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
681,826✔
516
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
681,826✔
UNCOV
517
        return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
518
      }
519

520
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
681,826✔
521
      if (NULL == res->datum.p) {
681,826✔
UNCOV
522
        return DEAL_RES_ERROR;
×
523
      }
524

525
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
681,826✔
526
        memcpy(blobDataVal(res->datum.p), tagVal.pData, tagVal.nData);
×
UNCOV
527
        blobDataSetLen(res->datum.p, tagVal.nData);
×
528
      } else {
529
        memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
681,826✔
530
        varDataSetLen(res->datum.p, tagVal.nData);
681,826✔
531
      }
532
    } else {
533
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
×
534
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
535
        return DEAL_RES_ERROR;
×
536
      }
537
    }
538
    nodesDestroyNode(*pNode);
681,826✔
539
    *pNode = (SNode*)res;
681,826✔
540
  } else if (isTbname) {
1,704,565✔
541
    SValueNode* res = NULL;
×
542
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
543
    if (NULL == res) {
×
UNCOV
544
      return DEAL_RES_ERROR;
×
545
    }
546

547
    res->translate = true;
×
UNCOV
548
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
549

550
    int32_t len = strlen(mr->me.name);
×
551
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
552
    if (NULL == res->datum.p) {
×
UNCOV
553
      return DEAL_RES_ERROR;
×
554
    }
555
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
556
    varDataSetLen(res->datum.p, len);
×
557
    nodesDestroyNode(*pNode);
×
UNCOV
558
    *pNode = (SNode*)res;
×
559
  }
560

561
  return DEAL_RES_CONTINUE;
2,386,391✔
562
}
563

564
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
340,913✔
565
  int32_t     code = TSDB_CODE_SUCCESS;
340,913✔
566
  SMetaReader mr = {0};
340,913✔
567

568
  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
340,913✔
569
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
340,913✔
570
  if (TSDB_CODE_SUCCESS != code) {
340,913✔
571
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
572
    *pQualified = false;
×
573

UNCOV
574
    return TSDB_CODE_SUCCESS;
×
575
  }
576

577
  SNode* pTagCondTmp = NULL;
340,913✔
578
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
340,913✔
579
  if (TSDB_CODE_SUCCESS != code) {
340,913✔
580
    *pQualified = false;
×
581
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
582
    return code;
×
583
  }
584
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
340,913✔
585
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
340,913✔
586
  pAPI->metaReaderFn.clearReader(&mr);
340,913✔
587
  if (TSDB_CODE_SUCCESS != ctx.code) {
340,913✔
588
    *pQualified = false;
×
589
    nodesDestroyNode(pTagCondTmp);
×
590
    terrno = code;
×
UNCOV
591
    return code;
×
592
  }
593

594
  SNode* pNew = NULL;
340,913✔
595
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
340,913✔
596
  if (TSDB_CODE_SUCCESS != code) {
340,913✔
597
    terrno = code;
×
598
    nodesDestroyNode(pTagCondTmp);
×
UNCOV
599
    *pQualified = false;
×
600

UNCOV
601
    return code;
×
602
  }
603

604
  SValueNode* pValue = (SValueNode*)pNew;
340,913✔
605
  *pQualified = pValue->datum.b;
340,913✔
606

607
  nodesDestroyNode(pNew);
340,913✔
608
  return TSDB_CODE_SUCCESS;
340,913✔
609
}
610

611
static EDealRes getColumn(SNode** pNode, void* pContext) {
56,247,269✔
612
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
56,247,269✔
613
  SColumnNode*     pSColumnNode = NULL;
56,247,269✔
614
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
56,256,148✔
615
    pSColumnNode = *(SColumnNode**)pNode;
18,977,282✔
616
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
37,297,007✔
617
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
714,770✔
618
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
713,795✔
619
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
669,269✔
620
      if (NULL == pSColumnNode) {
670,648✔
UNCOV
621
        return DEAL_RES_ERROR;
×
622
      }
623
      pSColumnNode->colId = -1;
670,648✔
624
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
670,648✔
625
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
670,648✔
626
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
670,648✔
627
      nodesDestroyNode(*pNode);
670,648✔
628
      *pNode = (SNode*)pSColumnNode;
670,648✔
629
    } else {
630
      return DEAL_RES_CONTINUE;
44,122✔
631
    }
632
  } else {
633
    return DEAL_RES_CONTINUE;
36,578,213✔
634
  }
635

636
  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
19,653,075✔
637
  if (!data) {
19,628,019✔
638
    int32_t tempRes =
639
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
16,901,486✔
640
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
16,922,724✔
UNCOV
641
      return DEAL_RES_ERROR;
×
642
    }
643
    pSColumnNode->slotId = pData->index++;
16,922,724✔
644
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
16,920,257✔
645
                         .type = pSColumnNode->node.resType.type,
16,904,802✔
646
                         .bytes = pSColumnNode->node.resType.bytes,
16,915,492✔
647
                         .pk = pSColumnNode->isPk};
16,899,193✔
648
#if TAG_FILTER_DEBUG
649
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
650
#endif
651
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
16,917,493✔
652
    if (!tmp) {
16,926,555✔
UNCOV
653
      return DEAL_RES_ERROR;
×
654
    }
655
  } else {
656
    SColumnNode* col = *(SColumnNode**)data;
2,726,533✔
657
    pSColumnNode->slotId = col->slotId;
2,726,533✔
658
  }
659

660
  return DEAL_RES_CONTINUE;
19,638,522✔
661
}
662

663
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
15,733,380✔
664
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
15,733,380✔
665
  if (pColumnData == NULL) {
15,724,630✔
UNCOV
666
    return terrno;
×
667
  }
668

669
  pColumnData->info.type = pType->type;
15,724,630✔
670
  pColumnData->info.bytes = pType->bytes;
15,728,213✔
671
  pColumnData->info.scale = pType->scale;
15,734,219✔
672
  pColumnData->info.precision = pType->precision;
15,718,498✔
673

674
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
15,732,901✔
675
  if (code != TSDB_CODE_SUCCESS) {
15,705,370✔
676
    terrno = code;
×
677
    releaseColInfoData(pColumnData);
×
UNCOV
678
    return terrno;
×
679
  }
680

681
  pParam->columnData = pColumnData;
15,705,370✔
682
  pParam->colAlloced = true;
15,723,301✔
683
  return TSDB_CODE_SUCCESS;
15,702,949✔
684
}
685

686
static void releaseColInfoData(void* pCol) {
4,197,087✔
687
  if (pCol) {
4,197,087✔
688
    SColumnInfoData* col = (SColumnInfoData*)pCol;
4,197,087✔
689
    colDataDestroy(col);
4,197,087✔
690
    taosMemoryFree(col);
4,197,087✔
691
  }
692
}
4,196,634✔
693

694
void freeItem(void* p) {
186,374,565✔
695
  STUidTagInfo* pInfo = p;
186,374,565✔
696
  if (pInfo->pTagVal != NULL) {
186,374,565✔
697
    taosMemoryFree(pInfo->pTagVal);
186,042,844✔
698
  }
699
}
186,360,637✔
700

701
typedef struct {
702
  col_id_t  colId;
703
  SNode*    pValueNode;
704
  int32_t   bytes;  // length defined in schema
705
} STagDataEntry;
706

707
static int compareTagDataEntry(const void* a, const void* b) {
20,790✔
708
  STagDataEntry* p1 = (STagDataEntry*)a;
20,790✔
709
  STagDataEntry* p2 = (STagDataEntry*)b;
20,790✔
710
  return compareInt16Val(&p1->colId, &p2->colId);
20,790✔
711
}
712

713
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
10,395✔
714
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
10,395✔
715
  if (NULL == *keyBuf) {
10,395✔
UNCOV
716
    qError(
×
717
      "failed to allocate memory for tag filter optimization key, size:%d",
718
      keyLen);
UNCOV
719
    return terrno;
×
720
  }
721
  char* pStart = *keyBuf;
10,395✔
722
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
31,185✔
723
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
20,790✔
724
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
20,790✔
725
    // num type may have different bytes length, use the smaller one
726
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);
20,790✔
727

728
    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
20,790✔
729
    pStart += sizeof(col_id_t);
20,790✔
730

731
    if (!pValueNode->isNull) {
20,790✔
732
      switch (pValueNode->node.resType.type) {
18,315✔
733
        case TSDB_DATA_TYPE_BOOL:
2,145✔
734
          (void)memcpy(
2,145✔
735
            pStart, &pValueNode->datum.b, bytes);
2,145✔
736
          pStart += bytes;
2,145✔
737
          break;
2,145✔
738
        case TSDB_DATA_TYPE_TINYINT:
9,075✔
739
        case TSDB_DATA_TYPE_SMALLINT:
740
        case TSDB_DATA_TYPE_INT:
741
        case TSDB_DATA_TYPE_BIGINT:
742
        case TSDB_DATA_TYPE_TIMESTAMP:
743
          (void)memcpy(
9,075✔
744
            pStart, &pValueNode->datum.i, bytes);
9,075✔
745
          pStart += bytes;
9,075✔
746
          break;
9,075✔
UNCOV
747
        case TSDB_DATA_TYPE_UTINYINT:
×
748
        case TSDB_DATA_TYPE_USMALLINT:
749
        case TSDB_DATA_TYPE_UINT:
750
        case TSDB_DATA_TYPE_UBIGINT:
751
          (void)memcpy(
×
752
            pStart, &pValueNode->datum.u, bytes);
×
753
          pStart += bytes;
×
UNCOV
754
          break;
×
755
        case TSDB_DATA_TYPE_FLOAT:
2,475✔
756
        case TSDB_DATA_TYPE_DOUBLE:
757
          (void)memcpy(
2,475✔
758
            pStart, &pValueNode->datum.d, bytes);
2,475✔
759
          pStart += bytes;
2,475✔
760
          break;
2,475✔
761
        case TSDB_DATA_TYPE_VARCHAR:
4,620✔
762
        case TSDB_DATA_TYPE_VARBINARY:
763
        case TSDB_DATA_TYPE_NCHAR:
764
          (void)memcpy(pStart,
4,620✔
765
            varDataVal(pValueNode->datum.p), varDataLen(pValueNode->datum.p));
4,620✔
766
          pStart += varDataLen(pValueNode->datum.p);
4,620✔
767
          break;
4,620✔
768
        default:
×
UNCOV
769
          qError("unsupported tag data type %d in tag filter optimization",
×
770
            pValueNode->node.resType.type);
UNCOV
771
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
772
      }
773
    }
774
  }
775

776
  return TSDB_CODE_SUCCESS;
10,395✔
777
}
778

779
static void extractTagDataEntry(
20,790✔
780
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
781
  SNode* pLeft = pOpNode->pLeft;
20,790✔
782
  SNode* pRight = pOpNode->pRight;
20,790✔
783
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
20,790✔
784
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
20,625✔
785
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
20,625✔
786
    (SValueNode*)pLeft : (SValueNode*)pRight;
20,790✔
787

788
  STagDataEntry entry = {0};
20,790✔
789
  entry.colId = pColNode->colId;
20,790✔
790
  entry.pValueNode = (SNode*)pValueNode;
20,790✔
791
  entry.bytes = pColNode->node.resType.bytes;
20,790✔
792
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
20,790✔
793
}
20,790✔
794

795
static int32_t extractTagFilterTagDataEntries(
10,395✔
796
  const SNode* pTagCond, SArray* pIdWithVal) {
797
  if (NULL == pTagCond || NULL == pIdWithVal ||
10,395✔
798
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
10,395✔
799
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
10,395✔
800
    qError("invalid parameter to extract tag filter symbol");
×
UNCOV
801
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
802
  }
803

804
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
10,395✔
UNCOV
805
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
806
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
10,395✔
807
    SNode* pChild = NULL;
10,395✔
808
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
31,185✔
809
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
20,790✔
810
    }
811
  }
812

813
  taosArraySort(pIdWithVal, compareTagDataEntry);
10,230✔
814

815
  return TSDB_CODE_SUCCESS;
10,395✔
816
}
817

818
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
10,395✔
819
  if (pTagCond == NULL) {
10,395✔
UNCOV
820
    return TSDB_CODE_SUCCESS;
×
821
  }
822

823
  char*   payload = NULL;
10,395✔
824
  int32_t len = 0;
10,395✔
825
  int32_t code = TSDB_CODE_SUCCESS;
10,395✔
826
  int32_t lino = 0;
10,395✔
827

828
  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
10,395✔
829
  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
10,395✔
830
  QUERY_CHECK_CODE(code, lino, _end);
10,395✔
831
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
31,185✔
832
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
20,790✔
833
    len += sizeof(col_id_t) + pEntry->bytes;
20,790✔
834
  }
835
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
10,230✔
836
  QUERY_CHECK_CODE(code, lino, _end);
10,395✔
837

838
  tMD5Init(pContext);
10,395✔
839
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
10,395✔
840
  tMD5Final(pContext);
10,395✔
841

842
_end:
10,395✔
843
  if (TSDB_CODE_SUCCESS != code) {
10,395✔
UNCOV
844
    qError("%s failed at line %d since %s",
×
845
      __func__, __LINE__, tstrerror(code));
846
  }
847
  taosArrayDestroy(pIdWithVal);
10,395✔
848
  taosMemoryFree(payload);
10,395✔
849
  return code;
10,395✔
850
}
851

852
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
29,838✔
853
  if (pTagCond == NULL) {
29,838✔
854
    return TSDB_CODE_SUCCESS;
28,113✔
855
  }
856

857
  char*   payload = NULL;
1,725✔
858
  int32_t len = 0;
1,725✔
859
  int32_t code = nodesNodeToMsg(pTagCond, &payload, &len);
1,725✔
860
  if (code != TSDB_CODE_SUCCESS) {
1,725✔
861
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
862
    return code;
×
863
  }
864

865
  tMD5Init(pContext);
1,725✔
866
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
1,725✔
867
  tMD5Final(pContext);
1,725✔
868

869
  // void* tmp = NULL;
870
  // uint32_t size = 0;
871
  // (void)taosAscii2Hex((const char*)pContext->digest, 16, &tmp, &size);
872
  // qInfo("tag filter digest payload: %s", tmp);
873
  // taosMemoryFree(tmp);
874

875
  taosMemoryFree(payload);
1,725✔
876
  return TSDB_CODE_SUCCESS;
1,725✔
877
}
878

879
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
×
880
  int32_t code = TSDB_CODE_SUCCESS;
×
881
  int32_t lino = 0;
×
882
  char*   payload = NULL;
×
883
  int32_t len = 0;
×
884
  code = nodesNodeToMsg(pGroup, &payload, &len);
×
UNCOV
885
  QUERY_CHECK_CODE(code, lino, _end);
×
886

887
  if (filterDigest[0]) {
×
888
    payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
×
889
    QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
×
890
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
×
UNCOV
891
    len += tListLen(pContext->digest);
×
892
  }
893

894
  tMD5Init(pContext);
×
895
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
UNCOV
896
  tMD5Final(pContext);
×
897

898
_end:
×
899
  taosMemoryFree(payload);
×
900
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
901
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
902
  }
UNCOV
903
  return code;
×
904
}
905

906
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
15,566,160✔
907
  int32_t code = TSDB_CODE_SUCCESS;
15,566,160✔
908
  tagFilterAssist ctx = {0};
15,566,160✔
909
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
15,566,636✔
910
  if (ctx.colHash == NULL) {
15,566,675✔
911
    code = terrno;
×
UNCOV
912
    goto end;
×
913
  }
914

915
  ctx.index = 0;
15,566,675✔
916
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
15,566,675✔
917
  if (ctx.cInfoList == NULL) {
15,566,018✔
918
    code = terrno;
5,326✔
UNCOV
919
    goto end;
×
920
  }
921

922
  if (isList) {
15,560,692✔
923
    SNode* pNode = NULL;
4,021,872✔
924
    FOREACH(pNode, (SNodeList*)data) {
8,219,451✔
925
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
4,197,579✔
926
      if (TSDB_CODE_SUCCESS != ctx.code) {
4,197,579✔
927
        code = ctx.code;
×
UNCOV
928
        goto end;
×
929
      }
930
      REPLACE_NODE(pNode);
4,197,579✔
931
    }
932
  } else {
933
    SNode* pNode = (SNode*)data;
11,538,820✔
934
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
11,539,627✔
935
    if (TSDB_CODE_SUCCESS != ctx.code) {
11,537,153✔
936
      code = ctx.code;
×
UNCOV
937
      goto end;
×
938
    }
939
  }
940
  
941
  if (pColList != NULL) *pColList = ctx.cInfoList;
15,540,694✔
942
  ctx.cInfoList = NULL;
15,553,147✔
943

944
end:
15,573,528✔
945
  taosHashCleanup(ctx.colHash);
15,545,171✔
946
  taosArrayDestroy(ctx.cInfoList);
15,522,940✔
947
  return code;
15,530,454✔
948
}
949

950
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
428,474✔
951
  int32_t code = TSDB_CODE_SUCCESS;
428,474✔
952
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
428,474✔
953
  if (v == NULL) {
428,294✔
954
    code = terrno;
×
UNCOV
955
    goto end;
×
956
  }
957
  if (colDataIsNull_s(pValue, i)) {
856,768✔
958
    v->isNull = true;
6,600✔
959
  } else {
960
    v->isNull = false;
421,874✔
961
    char* data = colDataGetData(pValue, i);
421,874✔
962
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
421,694✔
963
      if (tTagIsJson(data)) {
×
964
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
UNCOV
965
        goto end;
×
966
      }
967
      if (tTagIsJsonNull(data)) {
×
968
        v->isNull = true;
×
UNCOV
969
        goto end;
×
970
      }
971
      int32_t len = getJsonValueLen(data);
×
972
      v->data.type = pValue->info.type;
×
973
      v->data.nData = len;
×
974
      v->data.pData = taosMemoryCalloc(1, len + 1);
×
975
      if (v->data.pData == NULL) {
×
976
        code = terrno;
×
UNCOV
977
        goto end;
×
978
      }
979
      memcpy(v->data.pData, data, len);
×
UNCOV
980
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
×
981
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
421,694✔
982
      if (varDataTLen(data) > pValue->info.bytes) {
310,383✔
983
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
UNCOV
984
        goto end;
×
985
      }
986
      v->data.type = pValue->info.type;
310,203✔
987
      v->data.nData = varDataLen(data);
310,203✔
988
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
310,203✔
989
      if (v->data.pData == NULL) {
310,419✔
990
        code = terrno;
×
UNCOV
991
        goto end;
×
992
      }
993
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
310,419✔
994
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
310,419✔
995
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
111,491✔
996
      v->data.type = pValue->info.type;
×
997
      v->data.nData = pValue->info.bytes;
×
998
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
×
999
      if (v->data.pData == NULL) {
×
1000
        code = terrno;
×
UNCOV
1001
        goto end;
×
1002
      }
1003
      memcpy(&v->data.pData, data, pValue->info.bytes);
×
UNCOV
1004
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
×
1005
    } else {  // reader todo decimal
1006
      v->data.type = pValue->info.type;
111,671✔
1007
      memcpy(&v->data.val, data, pValue->info.bytes);
111,491✔
1008
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
111,671✔
1009
    }
1010
  }
1011
end:
41,322✔
1012
  if (code != TSDB_CODE_SUCCESS) {
428,690✔
1013
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1014
    v->isNull = true;
×
1015
  }
1016
  return code;
428,690✔
1017
}
1018

1019
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
67,011✔
1020
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1021
  int32_t      code = TSDB_CODE_SUCCESS;
67,011✔
1022
  int32_t      lino = 0;
67,011✔
1023
  SArray*      pBlockList = NULL;
67,011✔
1024
  SSDataBlock* pResBlock = NULL;
67,011✔
1025
  SArray*      groupData = NULL;
67,011✔
1026
  SArray*      pUidTagList = NULL;
67,011✔
1027
  SArray*      gInfo = NULL;
67,011✔
1028
  int32_t      tbNameIndex = 0;
67,011✔
1029

1030
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
67,011✔
1031
  if (rows == 0) {
67,011✔
UNCOV
1032
    return;
×
1033
  }
1034

1035
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
67,011✔
1036
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
67,011✔
1037

1038
  for (int32_t i = 0; i < rows; ++i) {
302,533✔
1039
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
235,522✔
1040
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
235,522✔
1041
    STUidTagInfo info = {.uid = pkeyInfo->uid};
235,522✔
1042
    void*        tmp = taosArrayPush(pUidTagList, &info);
235,522✔
1043
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
235,522✔
1044
  }
1045
 
1046
  if (taosArrayGetSize(pUidTagList) > 0) {
67,011✔
1047
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
67,011✔
1048
  } else {
UNCOV
1049
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1050
  }
1051
  if (code != TSDB_CODE_SUCCESS) {
67,011✔
UNCOV
1052
    goto end;
×
1053
  }
1054

1055
  SArray* pColList = NULL;
67,011✔
1056
  code = qGetColumnsFromNodeList(group, true, &pColList);
67,011✔
1057
  if (code != TSDB_CODE_SUCCESS) {
67,011✔
UNCOV
1058
    goto end;
×
1059
  }
1060

1061
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
175,056✔
1062
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
108,045✔
1063
    if (tmp != NULL && tmp->colId == -1) {
108,045✔
1064
      tbNameIndex = i;
67,011✔
1065
    }
1066
  }
1067
  
1068
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
67,011✔
1069
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
67,011✔
1070
  taosArrayDestroy(pColList);
67,011✔
1071
  if (pResBlock == NULL) {
67,011✔
1072
    code = terrno;
246✔
1073
    goto end;
246✔
1074
  }
1075

1076
  pBlockList = taosArrayInit(2, POINTER_BYTES);
66,765✔
1077
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
66,765✔
1078

1079
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
66,765✔
1080
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
66,765✔
1081

1082
  groupData = taosArrayInit(2, POINTER_BYTES);
66,765✔
1083
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
66,765✔
1084

1085
  SNode* pNode = NULL;
66,765✔
1086
  FOREACH(pNode, group) {
174,318✔
1087
    SScalarParam output = {0};
107,553✔
1088

1089
    switch (nodeType(pNode)) {
107,553✔
1090
      case QUERY_NODE_VALUE:
×
UNCOV
1091
        break;
×
1092
      case QUERY_NODE_COLUMN:
107,553✔
1093
      case QUERY_NODE_OPERATOR:
1094
      case QUERY_NODE_FUNCTION: {
1095
        SExprNode* expNode = (SExprNode*)pNode;
107,553✔
1096
        code = createResultData(&expNode->resType, rows, &output);
107,553✔
1097
        if (code != TSDB_CODE_SUCCESS) {
107,553✔
UNCOV
1098
          goto end;
×
1099
        }
1100
        break;
107,553✔
1101
      }
1102

1103
      default:
×
1104
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1105
        goto end;
×
1106
    }
1107

1108
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
107,553✔
1109
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
107,553✔
1110
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
107,553✔
1111
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
107,553✔
1112
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
107,553✔
1113
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
UNCOV
1114
      continue;
×
1115
    } else {
1116
      gTaskScalarExtra.pStreamInfo = NULL;
×
1117
      gTaskScalarExtra.pStreamRange = NULL;
×
UNCOV
1118
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1119
    }
1120

1121
    if (code != TSDB_CODE_SUCCESS) {
107,553✔
1122
      releaseColInfoData(output.columnData);
×
UNCOV
1123
      goto end;
×
1124
    }
1125

1126
    void* tmp = taosArrayPush(groupData, &output.columnData);
107,553✔
1127
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
107,553✔
1128
  }
1129

1130
  for (int i = 0; i < rows; i++) {
302,041✔
1131
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
235,276✔
1132
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
235,276✔
1133

1134
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
235,276✔
1135
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
235,276✔
1136

1137
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
589,447✔
1138
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
354,171✔
1139
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
354,171✔
1140
        if (ret != TSDB_CODE_SUCCESS) {
354,171✔
1141
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1142
          goto end;
×
1143
        }
1144
        if (j == tbNameIndex) {
354,171✔
1145
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
235,276✔
1146
          if (v != NULL){
235,276✔
1147
            v->isTbname = true;
235,276✔
1148
            v->uid = info->uid;
235,276✔
1149
          }
1150
        }
1151
    }
1152

1153
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
235,276✔
1154
    if (ret != TSDB_CODE_SUCCESS) {
235,276✔
1155
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1156
      goto end;
×
1157
    }
1158
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
235,276✔
1159
    gInfo = NULL;
235,276✔
1160
  }
1161

1162
end:
67,011✔
1163
  blockDataDestroy(pResBlock);
67,011✔
1164
  taosArrayDestroy(pBlockList);
67,011✔
1165
  taosArrayDestroyEx(pUidTagList, freeItem);
67,011✔
1166
  taosArrayDestroyP(groupData, releaseColInfoData);
67,011✔
1167
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
67,011✔
1168

1169
  if (code != TSDB_CODE_SUCCESS) {
67,011✔
1170
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
246✔
1171
  }
1172
}
1173

1174
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
3,954,408✔
1175
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1176
  int32_t      code = TSDB_CODE_SUCCESS;
3,954,408✔
1177
  int32_t      lino = 0;
3,954,408✔
1178
  SArray*      pBlockList = NULL;
3,954,408✔
1179
  SSDataBlock* pResBlock = NULL;
3,954,408✔
1180
  void*        keyBuf = NULL;
3,954,861✔
1181
  SArray*      groupData = NULL;
3,954,861✔
1182
  SArray*      pUidTagList = NULL;
3,954,861✔
1183
  SArray*      tableList = NULL;
3,954,861✔
1184
  SArray*      gInfo = NULL;
3,954,861✔
1185

1186
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
3,954,861✔
1187
  if (rows == 0) {
3,954,861✔
UNCOV
1188
    return TSDB_CODE_SUCCESS;
×
1189
  } 
1190

1191
  T_MD5_CTX context = {0};
3,954,861✔
1192
  if (tsTagFilterCache && groupIdMap == NULL) {
3,954,861✔
1193
    SNodeListNode* listNode = NULL;
×
1194
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1195
    if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1196
      goto end;
×
1197
    }
1198
    listNode->pNodeList = group;
×
1199
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
UNCOV
1200
    QUERY_CHECK_CODE(code, lino, end);
×
1201

UNCOV
1202
    nodesFree(listNode);
×
1203

UNCOV
1204
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1205
                                             tListLen(context.digest), &tableList);
UNCOV
1206
    QUERY_CHECK_CODE(code, lino, end);
×
1207

1208
    if (tableList) {
×
1209
      taosArrayDestroy(pTableListInfo->pTableList);
×
1210
      pTableListInfo->pTableList = tableList;
×
UNCOV
1211
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1212
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
UNCOV
1213
      goto end;
×
1214
    }
1215
  }
1216

1217
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
3,954,861✔
1218
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
3,954,861✔
1219

1220
  for (int32_t i = 0; i < rows; ++i) {
26,231,271✔
1221
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
22,275,908✔
1222
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
22,275,908✔
1223
    STUidTagInfo info = {.uid = pkeyInfo->uid};
22,275,908✔
1224
    void*        tmp = taosArrayPush(pUidTagList, &info);
22,276,863✔
1225
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
22,276,863✔
1226
  }
1227

1228
  if (taosArrayGetSize(pUidTagList) > 0) {
3,955,363✔
1229
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
3,954,861✔
1230
  } else {
UNCOV
1231
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1232
  }
1233
  if (code != TSDB_CODE_SUCCESS) {
3,954,861✔
UNCOV
1234
    goto end;
×
1235
  }
1236

1237
  SArray* pColList = NULL;
3,954,861✔
1238
  code = qGetColumnsFromNodeList(group, true, &pColList); 
3,954,861✔
1239
  if (code != TSDB_CODE_SUCCESS) {
3,954,359✔
UNCOV
1240
    goto end;
×
1241
  }
1242

1243
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
3,954,359✔
1244
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
3,954,359✔
1245
  taosArrayDestroy(pColList);
3,954,861✔
1246
  if (pResBlock == NULL) {
3,954,162✔
1247
    code = terrno;
×
UNCOV
1248
    goto end;
×
1249
  }
1250

1251
  //  int64_t st1 = taosGetTimestampUs();
1252
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1253

1254
  pBlockList = taosArrayInit(2, POINTER_BYTES);
3,954,162✔
1255
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
3,954,696✔
1256

1257
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
3,954,861✔
1258
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,954,861✔
1259

1260
  groupData = taosArrayInit(2, POINTER_BYTES);
3,954,861✔
1261
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
3,954,861✔
1262

1263
  SNode* pNode = NULL;
3,954,861✔
1264
  FOREACH(pNode, group) {
8,042,487✔
1265
    SScalarParam output = {0};
4,089,354✔
1266

1267
    switch (nodeType(pNode)) {
4,089,354✔
1268
      case QUERY_NODE_VALUE:
×
UNCOV
1269
        break;
×
1270
      case QUERY_NODE_COLUMN:
4,089,000✔
1271
      case QUERY_NODE_OPERATOR:
1272
      case QUERY_NODE_FUNCTION: {
1273
        SExprNode* expNode = (SExprNode*)pNode;
4,089,000✔
1274
        code = createResultData(&expNode->resType, rows, &output);
4,089,000✔
1275
        if (code != TSDB_CODE_SUCCESS) {
4,089,138✔
UNCOV
1276
          goto end;
×
1277
        }
1278
        break;
4,089,138✔
1279
      }
UNCOV
1280
      case QUERY_NODE_REMOTE_VALUE: {
×
UNCOV
1281
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
×
UNCOV
1282
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
×
UNCOV
1283
        QUERY_CHECK_CODE(code, lino, end);
×
UNCOV
1284
        break;
×
1285
      }
1286
      
1287
      default:
×
1288
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1289
        goto end;
×
1290
    }
1291

1292
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
4,089,138✔
1293
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
4,071,348✔
1294
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
4,071,348✔
1295
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
4,071,111✔
1296
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
4,071,111✔
1297
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
17,970✔
UNCOV
1298
      continue;
×
1299
    } else {
1300
      gTaskScalarExtra.pStreamInfo = NULL;
17,790✔
1301
      gTaskScalarExtra.pStreamRange = NULL;
17,790✔
1302
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
17,790✔
1303
    }
1304

1305
    if (code != TSDB_CODE_SUCCESS) {
4,088,581✔
1306
      releaseColInfoData(output.columnData);
×
UNCOV
1307
      goto end;
×
1308
    }
1309

1310
    void* tmp = taosArrayPush(groupData, &output.columnData);
4,088,079✔
1311
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,088,079✔
1312
  }
1313

1314
  int32_t keyLen = 0;
3,953,906✔
1315
  SNode*  node;
1316
  FOREACH(node, group) {
8,042,326✔
1317
    SExprNode* pExpr = (SExprNode*)node;
4,087,969✔
1318
    keyLen += pExpr->resType.bytes;
4,087,969✔
1319
  }
1320

1321
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
3,952,309✔
1322
  keyLen += nullFlagSize;
3,952,309✔
1323

1324
  keyBuf = taosMemoryCalloc(1, keyLen);
3,952,309✔
1325
  if (keyBuf == NULL) {
3,953,906✔
1326
    code = terrno;
×
UNCOV
1327
    goto end;
×
1328
  }
1329

1330
  if (initRemainGroups) {
3,953,906✔
1331
    pTableListInfo->remainGroups =
1,921,117✔
1332
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,920,664✔
1333
    if (pTableListInfo->remainGroups == NULL) {
1,921,117✔
1334
      code = terrno;
×
UNCOV
1335
      goto end;
×
1336
    }
1337
  }
1338

1339
  for (int i = 0; i < rows; i++) {
26,228,615✔
1340
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
22,273,374✔
1341
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
22,271,437✔
1342

1343
    if (groupIdMap != NULL){
22,271,437✔
1344
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
62,419✔
1345
    }
1346
    
1347
    char* isNull = (char*)keyBuf;
22,272,955✔
1348
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
22,272,955✔
1349
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
45,629,400✔
1350
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
23,354,740✔
1351

1352
      if (groupIdMap != NULL && gInfo != NULL) {
23,354,307✔
1353
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
74,303✔
1354
        if (ret != TSDB_CODE_SUCCESS) {
74,519✔
1355
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1356
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
UNCOV
1357
          gInfo = NULL;
×
1358
        }
1359
      }
1360
      
1361
      if (colDataIsNull_s(pValue, i)) {
46,711,000✔
1362
        isNull[j] = 1;
96,625✔
1363
      } else {
1364
        isNull[j] = 0;
23,259,852✔
1365
        char* data = colDataGetData(pValue, i);
23,259,399✔
1366
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
23,259,493✔
1367
          // if (tTagIsJson(data)) {
1368
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1369
          //   goto end;
1370
          // }
1371
          if (tTagIsJsonNull(data)) {
73,187✔
1372
            isNull[j] = 1;
×
UNCOV
1373
            continue;
×
1374
          }
1375
          int32_t len = getJsonValueLen(data);
73,187✔
1376
          memcpy(pStart, data, len);
73,187✔
1377
          pStart += len;
73,187✔
1378
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
23,186,393✔
1379
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
20,331,569✔
1380
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
1,506✔
1381
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
UNCOV
1382
              goto end;
×
1383
            }
1384
            memcpy(pStart, data, blobDataTLen(data));
×
UNCOV
1385
            pStart += blobDataTLen(data);
×
1386
          } else {
1387
            if (varDataTLen(data) > pValue->info.bytes) {
20,329,492✔
1388
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
UNCOV
1389
              goto end;
×
1390
            }
1391
            memcpy(pStart, data, varDataTLen(data));
20,330,949✔
1392
            pStart += varDataTLen(data);
20,330,949✔
1393
          }
1394
        } else {
1395
          memcpy(pStart, data, pValue->info.bytes);
2,858,173✔
1396
          pStart += pValue->info.bytes;
2,857,720✔
1397
        }
1398
      }
1399
    }
1400

1401
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
22,271,563✔
1402
    info->groupId = calcGroupId(keyBuf, len);
22,271,563✔
1403
    if (groupIdMap != NULL && gInfo != NULL) {
22,275,908✔
1404
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
62,599✔
1405
      if (ret != TSDB_CODE_SUCCESS) {
62,599✔
1406
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1407
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1408
      }
1409
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
62,599✔
1410
      gInfo = NULL;
62,599✔
1411
    }
1412
    if (initRemainGroups) {
22,275,908✔
1413
      // groupId ~ table uid
1414
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
10,961,004✔
1415
                         sizeof(info->uid));
1416
      if (code == TSDB_CODE_DUP_KEY) {
10,959,352✔
1417
        code = TSDB_CODE_SUCCESS;
741,539✔
1418
      }
1419
      QUERY_CHECK_CODE(code, lino, end);
10,959,352✔
1420
    }
1421
  }
1422

1423
  if (tsTagFilterCache && groupIdMap == NULL) {
3,955,241✔
1424
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
UNCOV
1425
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1426

UNCOV
1427
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1428
                                              tListLen(context.digest), tableList,
1429
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
UNCOV
1430
    QUERY_CHECK_CODE(code, lino, end);
×
1431
  }
1432

1433
  //  int64_t st2 = taosGetTimestampUs();
1434
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1435

1436
end:
3,954,858✔
1437
  taosMemoryFreeClear(keyBuf);
3,954,861✔
1438
  blockDataDestroy(pResBlock);
3,954,327✔
1439
  taosArrayDestroy(pBlockList);
3,953,743✔
1440
  taosArrayDestroyEx(pUidTagList, freeItem);
3,953,209✔
1441
  taosArrayDestroyP(groupData, releaseColInfoData);
3,954,327✔
1442
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
3,953,258✔
1443

1444
  if (code != TSDB_CODE_SUCCESS) {
3,953,874✔
UNCOV
1445
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1446
  }
1447
  return code;
3,953,258✔
1448
}
1449

1450
static int32_t nameComparFn(const void* p1, const void* p2) {
1,580,170✔
1451
  const char* pName1 = *(const char**)p1;
1,580,170✔
1452
  const char* pName2 = *(const char**)p2;
1,580,170✔
1453

1454
  int32_t ret = strcmp(pName1, pName2);
1,580,594✔
1455
  if (ret == 0) {
1,580,594✔
1456
    return 0;
15,684✔
1457
  } else {
1458
    return (ret > 0) ? 1 : -1;
1,564,910✔
1459
  }
1460
}
1461

1462
static SArray* getTableNameList(const SNodeListNode* pList) {
817,369✔
1463
  int32_t    code = TSDB_CODE_SUCCESS;
817,369✔
1464
  int32_t    lino = 0;
817,369✔
1465
  int32_t    len = LIST_LENGTH(pList->pNodeList);
817,369✔
1466
  SListCell* cell = pList->pNodeList->pHead;
817,369✔
1467

1468
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
817,369✔
1469
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
817,369✔
1470

1471
  for (int i = 0; i < pList->pNodeList->length; i++) {
2,371,112✔
1472
    SValueNode* valueNode = (SValueNode*)cell->pNode;
1,553,345✔
1473
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
1,553,743✔
1474
      terrno = TSDB_CODE_INVALID_PARA;
×
1475
      taosArrayDestroy(pTbList);
×
UNCOV
1476
      return NULL;
×
1477
    }
1478

1479
    char* name = varDataVal(valueNode->datum.p);
1,553,743✔
1480
    void* tmp = taosArrayPush(pTbList, &name);
1,553,743✔
1481
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,553,743✔
1482
    cell = cell->pNext;
1,553,743✔
1483
  }
1484

1485
  size_t numOfTables = taosArrayGetSize(pTbList);
817,369✔
1486

1487
  // order the name
1488
  taosArraySort(pTbList, nameComparFn);
817,369✔
1489

1490
  // remove the duplicates
1491
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
816,945✔
1492
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
817,369✔
1493
  void* tmpTbl = taosArrayGet(pTbList, 0);
817,369✔
1494
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
816,945✔
1495
  void* tmp = taosArrayPush(pNewList, tmpTbl);
817,369✔
1496
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
817,369✔
1497

1498
  for (int32_t i = 1; i < numOfTables; ++i) {
1,553,743✔
1499
    char** name = taosArrayGetLast(pNewList);
735,950✔
1500
    char** nameInOldList = taosArrayGet(pTbList, i);
735,526✔
1501
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
735,950✔
1502
    if (strcmp(*name, *nameInOldList) == 0) {
735,950✔
1503
      continue;
8,456✔
1504
    }
1505

1506
    tmp = taosArrayPush(pNewList, nameInOldList);
727,918✔
1507
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
727,918✔
1508
  }
1509

1510
_end:
817,793✔
1511
  taosArrayDestroy(pTbList);
817,793✔
1512
  if (code != TSDB_CODE_SUCCESS) {
816,945✔
1513
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1514
    return NULL;
×
1515
  }
1516
  return pNewList;
816,945✔
1517
}
1518

1519
static int tableUidCompare(const void* a, const void* b) {
×
1520
  uint64_t u1 = *(uint64_t*)a;
×
UNCOV
1521
  uint64_t u2 = *(uint64_t*)b;
×
1522

1523
  if (u1 == u2) {
×
UNCOV
1524
    return 0;
×
1525
  }
1526

UNCOV
1527
  return u1 < u2 ? -1 : 1;
×
1528
}
1529

1530
static int32_t filterTableInfoCompare(const void* a, const void* b) {
14,887,711✔
1531
  STUidTagInfo* p1 = (STUidTagInfo*)a;
14,887,711✔
1532
  STUidTagInfo* p2 = (STUidTagInfo*)b;
14,887,711✔
1533

1534
  if (p1->uid == p2->uid) {
14,887,711✔
UNCOV
1535
    return 0;
×
1536
  }
1537

1538
  return p1->uid < p2->uid ? -1 : 1;
14,829,711✔
1539
}
1540

1541
static FilterCondType checkTagCond(SNode* cond) {
13,340,503✔
1542
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
13,340,503✔
1543
    return FILTER_NO_LOGIC;
9,691,960✔
1544
  }
1545
  if (nodeType(cond) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND) {
3,650,372✔
1546
    return FILTER_AND;
3,069,490✔
1547
  }
1548
  return FILTER_OTHER;
570,360✔
1549
}
1550

1551
static bool optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
14,066,426✔
1552
  int32_t code = 0;
14,066,426✔
1553
  int32_t ntype = nodeType(cond);
14,066,426✔
1554

1555
  if (ntype == QUERY_NODE_OPERATOR) {
14,070,170✔
1556
    code = optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
10,413,372✔
1557
    return code == 0;
10,401,188✔
1558
  }
1559
  if (ntype != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
3,656,798✔
1560
    return false;
573,360✔
1561
  }
1562

1563
  bool                 hasTbnameCond = false;
3,083,024✔
1564
  SLogicConditionNode* pNode = (SLogicConditionNode*)cond;
3,083,024✔
1565
  SNodeList*           pList = (SNodeList*)pNode->pParameterList;
3,083,024✔
1566

1567
  int32_t len = LIST_LENGTH(pList);
3,080,102✔
1568
  if (len <= 0) {
3,080,943✔
UNCOV
1569
    return false;
×
1570
  }
1571

1572
  SListCell* cell = pList->pHead;
3,080,943✔
1573
  for (int i = 0; i < len; i++) {
9,606,638✔
1574
    if (cell == NULL) break;
6,524,622✔
1575
    if (optimizeTbnameInCondImpl(pVnode, list, cell->pNode, pAPI, suid) == 0) {
6,524,622✔
1576
      hasTbnameCond = true;
7,500✔
1577
      break;
7,500✔
1578
    }
1579
    cell = cell->pNext;
6,521,644✔
1580
  }
1581

1582
  taosArraySort(list, filterTableInfoCompare);
3,089,516✔
1583
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);
3,081,048✔
1584

1585
  if (hasTbnameCond) {
3,080,885✔
1586
    code = pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
7,500✔
1587
    return code == 0;
7,500✔
1588
  }
1589

1590
  return false;
3,073,385✔
1591
}
1592

1593
// only return uid that does not contained in pExistedUidList
1594
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
16,944,324✔
1595
                                        uint64_t suid) {
1596
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
16,944,324✔
1597
    return -1;
5,038✔
1598
  }
1599

1600
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
16,938,250✔
1601
  if (pNode->opType != OP_TYPE_IN) {
16,938,250✔
1602
    return -1;
14,728,910✔
1603
  }
1604

1605
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
2,212,104✔
1606
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
819,361✔
1607
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
1,392,863✔
1608
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
818,147✔
1609
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
817,369✔
1610

1611
    int32_t len = LIST_LENGTH(pList->pNodeList);
817,369✔
1612
    if (len <= 0) {
817,369✔
UNCOV
1613
      return -1;
×
1614
    }
1615

1616
    SArray*   pTbList = getTableNameList(pList);
817,369✔
1617
    int32_t   numOfTables = taosArrayGetSize(pTbList);
816,945✔
1618
    SHashObj* uHash = NULL;
816,945✔
1619

1620
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
816,945✔
1621
    if (numOfExisted > 0) {
816,945✔
1622
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,000✔
1623
      if (!uHash) {
2,000✔
1624
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1625
        return terrno;
×
1626
      }
1627

1628
      for (int i = 0; i < numOfExisted; i++) {
1,982,500✔
1629
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
1,981,500✔
1630
        if (!pTInfo) {
1,980,000✔
1631
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1632
          return terrno;
×
1633
        }
1634
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
1,980,000✔
1635
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
1,980,500✔
1636
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
1637
          return tempRes;
×
1638
        }
1639
      }
1640
    }
1641

1642
    for (int i = 0; i < numOfTables; i++) {
2,181,764✔
1643
      char* name = taosArrayGetP(pTbList, i);
1,453,515✔
1644

1645
      uint64_t uid = 0, csuid = 0;
1,453,515✔
1646
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
1,453,541✔
1647
        ETableType tbType = TSDB_TABLE_MAX;
442,046✔
1648
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
442,046✔
1649
            tbType == TSDB_CHILD_TABLE) {
442,470✔
1650
          if (suid != csuid) {
353,350✔
1651
            continue;
2,000✔
1652
          }
1653
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
351,350✔
1654
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
350,350✔
1655
            void*        tmp = taosArrayPush(pExistedUidList, &s);
350,350✔
1656
            if (!tmp) {
350,350✔
UNCOV
1657
              return terrno;
×
1658
            }
1659
          }
1660
        } else {
1661
          taosArrayDestroy(pTbList);
89,120✔
1662
          taosHashCleanup(uHash);
89,120✔
1663
          return -1;
89,120✔
1664
        }
1665
      } else {
1666
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1667
        terrno = 0;
1,011,469✔
1668
      }
1669
    }
1670

1671
    taosHashCleanup(uHash);
728,249✔
1672
    taosArrayDestroy(pTbList);
728,249✔
1673
    return 0;
728,249✔
1674
  }
1675

1676
  return -1;
1,395,225✔
1677
}
1678

1679
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
17,113,141✔
1680
                                        SStorageAPI* pStorageAPI) {
1681
  int32_t      code = TSDB_CODE_SUCCESS;
17,113,141✔
1682
  int32_t      lino = 0;
17,113,141✔
1683
  SSDataBlock* pResBlock = NULL;
17,113,141✔
1684
  code = createDataBlock(&pResBlock);
17,122,890✔
1685
  QUERY_CHECK_CODE(code, lino, _end);
17,111,442✔
1686

1687
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
35,593,034✔
1688
    SColumnInfoData colInfo = {0};
18,480,118✔
1689
    void*           tmp = taosArrayGet(pColList, i);
18,476,617✔
1690
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
18,463,297✔
1691
    colInfo.info = *(SColumnInfo*)tmp;
18,463,297✔
1692
    code = blockDataAppendColInfo(pResBlock, &colInfo);
18,452,658✔
1693
    QUERY_CHECK_CODE(code, lino, _end);
18,480,285✔
1694
  }
1695

1696
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
17,111,581✔
1697
  if (code != TSDB_CODE_SUCCESS) {
17,094,839✔
1698
    terrno = code;
×
1699
    blockDataDestroy(pResBlock);
×
UNCOV
1700
    return NULL;
×
1701
  }
1702

1703
  pResBlock->info.rows = numOfTables;
17,094,839✔
1704

1705
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
17,114,391✔
1706

1707
  for (int32_t i = 0; i < numOfTables; i++) {
205,686,268✔
1708
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
188,550,658✔
1709
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
188,473,234✔
1710

1711
    for (int32_t j = 0; j < numOfCols; j++) {
385,555,540✔
1712
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
197,000,260✔
1713
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
196,939,748✔
1714

1715
      if (pColInfo->info.colId == -1) {  // tbname
196,939,748✔
1716
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
7,227,713✔
1717
        if (p1->name != NULL) {
7,228,344✔
1718
          STR_TO_VARSTR(str, p1->name);
349,925✔
1719
        } else {  // name is not retrieved during filter
1720
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
6,883,918✔
1721
          QUERY_CHECK_CODE(code, lino, _end);
6,883,044✔
1722
        }
1723

1724
        code = colDataSetVal(pColInfo, i, str, false);
7,232,299✔
1725
        QUERY_CHECK_CODE(code, lino, _end);
7,229,903✔
1726
#if TAG_FILTER_DEBUG
1727
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1728
#endif
1729
      } else {
1730
        STagVal tagVal = {0};
189,616,425✔
1731
        tagVal.cid = pColInfo->info.colId;
189,625,752✔
1732
        if (p1->pTagVal == NULL) {
189,745,918✔
1733
          colDataSetNULL(pColInfo, i);
4,125✔
1734
        } else {
1735
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
189,764,689✔
1736

1737
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
189,808,548✔
1738
            colDataSetNULL(pColInfo, i);
2,121,050✔
1739
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
187,765,782✔
1740
            code = colDataSetVal(pColInfo, i, p, false);
578,611✔
1741
            QUERY_CHECK_CODE(code, lino, _end);
578,611✔
1742
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
314,279,416✔
1743
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
127,344,381✔
UNCOV
1744
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1745
            }
1746
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
127,316,564✔
1747
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
127,256,760✔
1748
            varDataSetLen(tmp, tagVal.nData);
127,256,760✔
1749
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
127,163,771✔
1750
            code = colDataSetVal(pColInfo, i, tmp, false);
127,165,578✔
1751
#if TAG_FILTER_DEBUG
1752
            qDebug("tagfilter varch:%s", tmp + 2);
1753
#endif
1754
            taosMemoryFree(tmp);
127,279,902✔
1755
            QUERY_CHECK_CODE(code, lino, _end);
127,103,773✔
1756
          } else {
1757
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
59,904,194✔
1758
            QUERY_CHECK_CODE(code, lino, _end);
59,926,821✔
1759
#if TAG_FILTER_DEBUG
1760
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1761
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1762
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1763
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1764
            }
1765
#endif
1766
          }
1767
        }
1768
      }
1769
    }
1770
  }
1771

1772
_end:
17,096,692✔
1773
  if (code != TSDB_CODE_SUCCESS) {
17,136,203✔
1774
    blockDataDestroy(pResBlock);
5,004✔
1775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
246✔
1776
    terrno = code;
246✔
1777
    return NULL;
246✔
1778
  }
1779
  return pResBlock;
17,131,199✔
1780
}
1781

1782
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
11,498,104✔
1783
                                 bool* pResultList, bool addUid) {
1784
  taosArrayClear(pUidList);
11,498,104✔
1785

1786
  STableKeyInfo info = {.uid = 0, .groupId = 0};
11,500,403✔
1787
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);
11,506,109✔
1788
  for (int32_t i = 0; i < numOfTables; ++i) {
175,436,071✔
1789
    if (pResultList[i]) {
163,898,215✔
1790
      STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
74,728,900✔
1791
      if (!tmpTag) {
74,726,513✔
1792
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1793
        return terrno;
×
1794
      }
1795
      uint64_t uid = tmpTag->uid;
74,726,513✔
1796
      qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
74,726,811✔
1797

1798
      info.uid = uid;
74,774,903✔
1799
      //qInfo("doSetQualifiedUid row:%d added to pTableList", i);
1800
      void* p = taosArrayPush(pListInfo->pTableList, &info);
74,774,903✔
1801
      if (p == NULL) {
74,767,722✔
UNCOV
1802
        return terrno;
×
1803
      }
1804

1805
      if (addUid) {
74,767,722✔
1806
        //qInfo("doSetQualifiedUid row:%d added to pUidList", i);
1807
        void* tmp = taosArrayPush(pUidList, &uid);
9,783✔
1808
        if (tmp == NULL) {
9,783✔
UNCOV
1809
          return terrno;
×
1810
        }
1811
      }
1812
    } else {
1813
      //qInfo("doSetQualifiedUid row:%d failed", i);
1814
    }
1815
  }
1816

1817
  return TSDB_CODE_SUCCESS;
11,537,856✔
1818
}
1819

1820
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
14,067,918✔
1821
  int32_t code = TSDB_CODE_SUCCESS;
14,067,918✔
1822
  int32_t numOfExisted = taosArrayGetSize(pUidList);
14,067,918✔
1823
  if (numOfExisted == 0) {
14,062,618✔
1824
    return code;
11,549,576✔
1825
  }
1826

1827
  for (int32_t i = 0; i < numOfExisted; ++i) {
28,525,584✔
1828
    uint64_t* uid = taosArrayGet(pUidList, i);
26,012,042✔
1829
    if (!uid) {
26,017,186✔
1830
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1831
      return terrno;
×
1832
    }
1833
    STUidTagInfo info = {.uid = *uid};
26,017,186✔
1834
    void*        tmp = taosArrayPush(pUidTagList, &info);
26,011,542✔
1835
    if (!tmp) {
26,011,542✔
1836
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1837
      return code;
×
1838
    }
1839
  }
1840
  return code;
2,513,542✔
1841
}
1842

1843
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
99,791,865✔
1844
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1845
  *listAdded = false;
99,791,865✔
1846
  if (pTagCond == NULL) {
99,797,765✔
1847
    return TSDB_CODE_SUCCESS;
85,721,730✔
1848
  }
1849

1850
  terrno = TSDB_CODE_SUCCESS;
14,076,782✔
1851

1852
  int32_t      lino = 0;
14,064,317✔
1853
  int32_t      code = TSDB_CODE_SUCCESS;
14,064,317✔
1854
  SArray*      pBlockList = NULL;
14,064,317✔
1855
  SSDataBlock* pResBlock = NULL;
14,064,317✔
1856
  SScalarParam output = {0};
14,056,815✔
1857
  SArray*      pUidTagList = NULL;
14,057,961✔
1858

1859
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
14,057,961✔
1860

1861
  //  int64_t stt = taosGetTimestampUs();
1862
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
14,053,152✔
1863
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
14,056,143✔
1864

1865
  code = copyExistedUids(pUidTagList, pUidList);
14,056,143✔
1866
  QUERY_CHECK_CODE(code, lino, end);
14,054,938✔
1867

1868
  // Narrow down the scope of the tablelist set if there is tbname in condition and And Logical operator
1869
  bool narrowed = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
14,054,938✔
1870
  if (narrowed) {  // tbname in filter is activated, do nothing and return
14,051,988✔
1871
    taosArrayClear(pUidList);
728,249✔
1872

1873
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
728,249✔
1874
    code = taosArrayEnsureCap(pUidList, numOfRows);
728,249✔
1875
    QUERY_CHECK_CODE(code, lino, end);
728,249✔
1876

1877
    for (int32_t i = 0; i < numOfRows; ++i) {
3,075,599✔
1878
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,347,350✔
1879
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,347,350✔
1880
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
2,347,350✔
1881
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,347,350✔
1882
    }
1883
    terrno = 0;
728,249✔
1884
  } else {
1885
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,323,739✔
1886

1887
    FilterCondType condType = checkTagCond(pTagCond);
13,326,770✔
1888
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) || // (super table) use tagIndex and operator is and
13,336,298✔
1889
        (status == SFLT_NOT_INDEX && taosArrayGetSize(pUidTagList) > 0)) {                       // (child table with tagCond)
10,491,064✔
1890
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
3,096,089✔
1891
    } else {
1892
      taosArrayClear(pUidTagList);        // clear tablelist if using tagIndex and or condition
10,238,414✔
1893
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
10,239,586✔
1894
    }
1895
    if (code != TSDB_CODE_SUCCESS) {
13,331,833✔
1896
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1897
      terrno = code;
×
UNCOV
1898
      QUERY_CHECK_CODE(code, lino, end);
×
1899
    }
1900
  }
1901

1902
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
14,060,082✔
1903

1904
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
14,066,775✔
1905
  if (numOfTables == 0) {
14,069,476✔
1906
    goto end;
2,524,774✔
1907
  }
1908

1909
  SArray* pColList = NULL;
11,544,702✔
1910
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
11,544,287✔
1911
  if (code != TSDB_CODE_SUCCESS) {
11,506,291✔
UNCOV
1912
    goto end;
×
1913
  }
1914
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
11,506,291✔
1915
  taosArrayDestroy(pColList);
11,530,904✔
1916
  if (pResBlock == NULL) {
11,522,782✔
1917
    code = terrno;
×
UNCOV
1918
    QUERY_CHECK_CODE(code, lino, end);
×
1919
  }
1920

1921
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1922

1923
  //  int64_t st1 = taosGetTimestampUs();
1924
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1925
  pBlockList = taosArrayInit(2, POINTER_BYTES);
11,522,782✔
1926
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
11,530,585✔
1927

1928
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
11,541,283✔
1929
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
11,541,283✔
1930

1931
  code = createResultData(&type, numOfTables, &output);
11,541,283✔
1932
  if (code != TSDB_CODE_SUCCESS) {
11,518,891✔
1933
    terrno = code;
×
UNCOV
1934
    QUERY_CHECK_CODE(code, lino, end);
×
1935
  }
1936

1937
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
11,518,891✔
1938
  gTaskScalarExtra.pStreamRange = NULL;
11,518,891✔
1939
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
11,494,120✔
1940
  if (code != TSDB_CODE_SUCCESS) {
11,498,468✔
1941
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
898✔
1942
    terrno = code;
898✔
1943
    QUERY_CHECK_CODE(code, lino, end);
898✔
1944
  }
1945

1946
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
11,497,570✔
1947
  if (code != TSDB_CODE_SUCCESS) {
11,538,609✔
1948
    terrno = code;
×
UNCOV
1949
    QUERY_CHECK_CODE(code, lino, end);
×
1950
  }
1951
  *listAdded = true;
11,538,609✔
1952

1953
end:
14,061,594✔
1954
  if (code != TSDB_CODE_SUCCESS) {
14,068,327✔
1955
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
898✔
1956
  }
1957
  blockDataDestroy(pResBlock);
14,068,327✔
1958
  taosArrayDestroy(pBlockList);
14,025,468✔
1959
  taosArrayDestroyEx(pUidTagList, freeItem);
14,018,530✔
1960

1961
  colDataDestroy(output.columnData);
14,056,195✔
1962
  taosMemoryFreeClear(output.columnData);
14,053,432✔
1963
  return code;
14,048,427✔
1964
}
1965

1966
typedef struct {
1967
  int32_t code;
1968
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1969
} PlaceHolderContext;
1970

1971
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
76,464✔
1972
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
76,464✔
1973
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
76,464✔
1974
    return DEAL_RES_CONTINUE;
63,024✔
1975
  }
1976
  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
13,440✔
1977
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
13,440✔
1978
    return DEAL_RES_CONTINUE;
492✔
1979
  }
1980
  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
12,948✔
1981
  if (pData->code != TSDB_CODE_SUCCESS) {
12,948✔
UNCOV
1982
    return DEAL_RES_ERROR;
×
1983
  }
1984
  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
12,948✔
1985
  ((SValueNode*)pFirstParam)->translate = true;
12,783✔
1986
  SValueNode* res = NULL;
12,783✔
1987
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
12,948✔
1988
  if (NULL == res) {
12,948✔
UNCOV
1989
    return DEAL_RES_ERROR;
×
1990
  }
1991
  nodesDestroyNode(*pNode);
12,948✔
1992
  *pNode = (SNode*)res;
12,948✔
1993

1994
  return DEAL_RES_CONTINUE;
12,948✔
1995
}
1996

1997
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
20,790✔
1998
  SNode* pLeft = pOpNode->pLeft;
20,790✔
1999
  SNode* pRight = pOpNode->pRight;
20,790✔
2000
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
20,790✔
2001
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
20,790✔
2002

2003
  col_id_t colId = pColNode->colId;
20,790✔
2004
  void* _tmp = taosArrayPush(pColIdArray, &colId);
20,790✔
2005
}
20,790✔
2006

2007
static int32_t buildTagCondKey(
10,395✔
2008
  const SNode* pTagCond, char** pTagCondKey,
2009
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
2010
  if (NULL == pTagCond ||
10,395✔
2011
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
10,395✔
2012
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
10,395✔
2013
    qError("invalid parameter to extract tag filter symbol");
×
UNCOV
2014
    return TSDB_CODE_INTERNAL_ERROR;
×
2015
  }
2016
  int32_t code = TSDB_CODE_SUCCESS;
10,395✔
2017
  int32_t lino = 0;
10,395✔
2018
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
10,395✔
2019

2020
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
10,395✔
UNCOV
2021
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2022
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
10,395✔
2023
    SNode* pChild = NULL;
10,395✔
2024
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
31,185✔
2025
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
20,790✔
2026
    }
2027
  }
2028

2029
  taosArraySort(*pTagColIds, compareUint16Val);
10,395✔
2030

2031
  // encode ordered colIds into key string, separated by ','
2032
  *tagCondKeyLen =
20,790✔
2033
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
10,395✔
2034
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
10,395✔
2035
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
10,395✔
2036
  char* pStart = *pTagCondKey;
10,395✔
2037
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
31,185✔
2038
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
20,790✔
2039
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
20,790✔
2040
    memcpy(pStart, pColId, sizeof(col_id_t));
20,790✔
2041
    pStart += sizeof(col_id_t);
20,790✔
2042
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
20,790✔
2043
      *pStart = ',';
10,395✔
2044
      pStart += 1;
10,395✔
2045
    }
2046
  }
2047

2048
_end:
10,395✔
2049
  if (TSDB_CODE_SUCCESS != code) {
10,395✔
2050
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2051
    terrno = code;
×
2052
  }
2053
  return code;
10,395✔
2054
}
2055

2056
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
96,690✔
2057
  if (NULL == pTagCond) {
96,690✔
2058
    *(bool*)pContext = false;
×
UNCOV
2059
    return DEAL_RES_END;
×
2060
  }
2061
  if (nodeType(pTagCond) == QUERY_NODE_VALUE ||
96,690✔
2062
    nodeType(pTagCond) == QUERY_NODE_COLUMN) {
64,185✔
2063
    return DEAL_RES_CONTINUE;
53,295✔
2064
  }
2065
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR &&
43,395✔
2066
    ((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL) {
21,285✔
2067
    return DEAL_RES_CONTINUE;
20,790✔
2068
  }
2069
  if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION &&
22,605✔
2070
    ((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND) {
10,395✔
2071
    return DEAL_RES_CONTINUE;
10,395✔
2072
  }
2073
  if (nodeType(pTagCond) == QUERY_NODE_FUNCTION &&
23,925✔
2074
    fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId)) {
11,715✔
2075
    return DEAL_RES_CONTINUE;
11,715✔
2076
  }
2077
  *(bool*)pContext = false;
495✔
2078
  return DEAL_RES_END;
495✔
2079
}
2080

2081
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
214,291,237✔
2082
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2083
  int32_t code = TSDB_CODE_SUCCESS;
214,291,237✔
2084
  int32_t lino = 0;
214,291,237✔
2085
  size_t  numOfTables = 0;
214,291,237✔
2086
  bool    listAdded = false;
214,291,237✔
2087

2088
  pListInfo->idInfo.suid = pScanNode->suid;
214,302,434✔
2089
  pListInfo->idInfo.tableType = pScanNode->tableType;
214,267,615✔
2090

2091
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
214,051,416✔
2092
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
214,048,671✔
2093

2094
  char*   pTagCondKey = NULL;
214,048,671✔
2095
  int32_t tagCondKeyLen;
175,723,373✔
2096
  SArray* pTagColIds = NULL;
214,298,193✔
2097
  char*   pPayload = NULL;
214,306,509✔
2098
  qTrace("getTableList called, suid:%" PRIu64
214,306,509✔
2099
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2100
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2101
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
214,306,509✔
2102
    pListInfo->idInfo.uid = pScanNode->uid;
85,899,124✔
2103
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
85,857,324✔
2104
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
85,914,278✔
2105
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
85,914,469✔
2106
    }
2107
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, false, &listAdded, pStreamInfo);
85,960,550✔
2108
    QUERY_CHECK_CODE(code, lino, _end);
85,946,526✔
2109
  } else {
2110
    bool      isStream = (pStreamInfo != NULL);
128,542,994✔
2111
    bool      hasTagCond = (pTagCond != NULL);
128,542,994✔
2112
    bool      canCacheTagEqCondFilter = false;
128,542,994✔
2113
    T_MD5_CTX context = {0};
128,431,369✔
2114

2115
    qTrace("start to get table list by tag filter, suid:%" PRIu64
128,536,801✔
2116
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2117
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2118

2119
    bool acquired = false;
128,536,801✔
2120
    // first, check whether we can use stable tag filter cache
2121
    if (tsStableTagFilterCache && isStream && hasTagCond) {
128,413,671✔
2122
      canCacheTagEqCondFilter = true;
10,890✔
2123
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
10,890✔
2124
        (void*)&canCacheTagEqCondFilter);
2125
    }
2126
    if (canCacheTagEqCondFilter) {
128,200,915✔
2127
      qDebug("%s, stable tag filter condition can be optimized", idstr);
10,395✔
2128
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
10,395✔
2129
        SNode* tmp = NULL;
10,395✔
2130
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
10,395✔
2131
        QUERY_CHECK_CODE(code, lino, _error);
10,395✔
2132

2133
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
10,395✔
2134
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
10,395✔
2135
        if (TSDB_CODE_SUCCESS != ctx.code) {
10,395✔
2136
          nodesDestroyNode(tmp);
×
2137
          code = ctx.code;
×
UNCOV
2138
          goto _error;
×
2139
        }
2140
        code = genStableTagFilterDigest(tmp, &context);
10,395✔
2141
        nodesDestroyNode(tmp);
10,395✔
2142
      } else {
UNCOV
2143
        code = genStableTagFilterDigest(pTagCond, &context);
×
2144
      }
2145
      QUERY_CHECK_CODE(code, lino, _error);
10,395✔
2146

2147
      code = buildTagCondKey(
10,395✔
2148
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2149
      QUERY_CHECK_CODE(code, lino, _error);
10,395✔
2150
      code = pStorageAPI->metaFn.getStableCachedTableList(
10,395✔
2151
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
10,395✔
2152
        context.digest, tListLen(context.digest), pUidList, &acquired);
2153
      QUERY_CHECK_CODE(code, lino, _error);
10,395✔
2154
    } else if (tsTagFilterCache) {
128,190,520✔
2155
      // second, try to use normal tag filter cache
2156
      qDebug("%s using normal tag filter cache", idstr);
29,838✔
2157
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
31,071✔
2158
        SNode* tmp = NULL;
1,233✔
2159
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
1,233✔
2160
        QUERY_CHECK_CODE(code, lino, _error);
1,233✔
2161

2162
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
1,233✔
2163
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
1,233✔
2164
        if (TSDB_CODE_SUCCESS != ctx.code) {
1,233✔
2165
          nodesDestroyNode(tmp);
×
2166
          code = ctx.code;
×
UNCOV
2167
          goto _error;
×
2168
        }
2169
        code = genTagFilterDigest(tmp, &context);
1,233✔
2170
        nodesDestroyNode(tmp);
1,233✔
2171
      } else {
2172
        code = genTagFilterDigest(pTagCond, &context);
28,605✔
2173
      }
2174
      // try to retrieve the result from meta cache
2175
      QUERY_CHECK_CODE(code, lino, _error);      
29,838✔
2176
      code = pStorageAPI->metaFn.getCachedTableList(
29,838✔
2177
        pVnode, pScanNode->suid, context.digest,
29,838✔
2178
        tListLen(context.digest), pUidList, &acquired);
2179
      QUERY_CHECK_CODE(code, lino, _error);
180,400✔
2180
    }
2181
    if (acquired) {
128,351,477✔
2182
      taosArrayDestroy(pTagColIds);
28,119✔
2183
      pTagColIds = NULL;
28,119✔
2184
      
2185
      digest[0] = 1;
28,119✔
2186
      memcpy(
56,238✔
2187
        digest + 1, context.digest, tListLen(context.digest));
28,119✔
2188
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
28,119✔
2189
        " numOfTables:%d", 
2190
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2191
      goto _end;
28,119✔
2192
    } else {
2193
      qDebug("suid:%" PRIu64 
128,323,358✔
2194
        ", failed to get table uid list from cache", pScanNode->suid);
2195
    }
2196

2197
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
128,487,596✔
2198
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
114,687,392✔
2199
      QUERY_CHECK_CODE(code, lino, _error);
114,605,112✔
2200
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
114,605,112✔
2201
    } else {
2202
      SIdxFltStatus status = SFLT_NOT_INDEX;
13,800,204✔
2203
      if (pTagIndexCond) {
13,810,355✔
2204
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
3,853,458✔
2205

2206
        SIndexMetaArg metaArg = {.metaEx = pVnode,
3,853,484✔
2207
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
3,853,458✔
2208
                                 .ivtIdx = pIndex,
2209
                                 .suid = pScanNode->uid};
3,853,105✔
2210
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
3,853,105✔
2211
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
3,837,608✔
2212
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
996,283✔
2213
        } else {
2214
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,841,325✔
2215
        }
2216
      }
2217
      qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
13,810,355✔
2218
      code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
13,814,701✔
2219
        pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
13,814,701✔
2220
        &listAdded, pStreamInfo);
2221
      QUERY_CHECK_CODE(code, lino, _error);
13,789,593✔
2222
    }
2223
    // let's add the filter results into meta-cache
2224
    numOfTables = taosArrayGetSize(pUidList);
128,396,760✔
2225

2226
    if (canCacheTagEqCondFilter) {
128,414,916✔
2227
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
15,502✔
2228
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2229
            idstr, pScanNode->suid, (int32_t)numOfTables,
2230
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2231

2232
      code = pStorageAPI->metaFn.putStableCachedTableList(
15,502✔
2233
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2234
        context.digest, tListLen(context.digest),
2235
        pUidList, &pTagColIds);
2236
      QUERY_CHECK_CODE(code, lino, _end);
4,620✔
2237

2238
      digest[0] = 1;
4,620✔
2239
      memcpy(digest + 1, context.digest, tListLen(context.digest));
4,620✔
2240
    } else if (tsTagFilterCache) {
128,399,414✔
2241
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
7,494✔
2242
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2243
            idstr, pScanNode->suid, (int32_t)numOfTables,
2244
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2245
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
7,494✔
2246
      pPayload = taosMemoryMalloc(size);
7,494✔
2247
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
7,494✔
2248

2249
      *(int32_t*)pPayload = (int32_t)numOfTables;
7,494✔
2250
      if (numOfTables > 0) {
7,494✔
2251
        void* tmp = taosArrayGet(pUidList, 0);
6,177✔
2252
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
6,177✔
2253
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
6,177✔
2254
      }
2255

2256
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
7,494✔
2257
                                                    context.digest,
2258
                                                    tListLen(context.digest),
2259
                                                    pPayload, size, 1);
2260
      if (TSDB_CODE_SUCCESS == code) {
7,494✔
2261
        /*
2262
          data referenced by pPayload is used in lru cache,
2263
          reset pPayload to NULL to avoid being freed in _error block
2264
        */
2265
        pPayload = NULL;
7,494✔
2266
      } else {
UNCOV
2267
        if (TSDB_CODE_DUP_KEY == code) {
×
2268
          /*
2269
            another thread has already put the same key into cache,
2270
            we can just ignore this error
2271
          */
UNCOV
2272
          code = TSDB_CODE_SUCCESS;
×
2273
        }
UNCOV
2274
        QUERY_CHECK_CODE(code, lino, _end);
×
2275
      }
2276

2277

2278
      digest[0] = 1;
7,494✔
2279
      memcpy(digest + 1, context.digest, tListLen(context.digest));
7,494✔
2280
    }
2281
  }
2282

2283
_end:
214,290,865✔
2284
  if (!listAdded) {
214,479,642✔
2285
    numOfTables = taosArrayGetSize(pUidList);
202,868,419✔
2286
    for (int i = 0; i < numOfTables; i++) {
748,458,195✔
2287
      void* tmp = taosArrayGet(pUidList, i);
545,574,396✔
2288
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
545,582,103✔
2289
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
545,582,103✔
2290

2291
      void* p = taosArrayPush(pListInfo->pTableList, &info);
545,593,280✔
2292
      if (p == NULL) {
545,734,162✔
2293
        taosArrayDestroy(pUidList);
×
UNCOV
2294
        return terrno;
×
2295
      }
2296

2297
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
545,734,162✔
2298
    }
2299
  }
2300

2301
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
214,495,022✔
2302

2303
_error:
214,487,576✔
2304
  taosArrayDestroy(pUidList);
214,543,019✔
2305
  taosArrayDestroy(pTagColIds);
214,394,958✔
2306
  taosMemFreeClear(pTagCondKey);
214,450,031✔
2307
  taosMemFreeClear(pPayload);
214,450,031✔
2308
  if (code != TSDB_CODE_SUCCESS) {
214,450,031✔
2309
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
898✔
2310
  }
2311
  return code;
214,377,844✔
2312
}
2313

2314
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
20,628✔
2315
  int32_t        code = TSDB_CODE_SUCCESS;
20,628✔
2316
  int32_t        lino = 0;
20,628✔
2317
  SSubplan*      pSubplan = (SSubplan*)node;
20,628✔
2318
  SScanPhysiNode pNode = {0};
20,628✔
2319
  pNode.suid = suid;
20,628✔
2320
  pNode.uid = suid;
20,628✔
2321
  pNode.tableType = TSDB_SUPER_TABLE;
20,628✔
2322

2323
  STableListInfo* pTableListInfo = tableListCreate();
20,628✔
2324
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
20,628✔
2325
  uint8_t digest[17] = {0};
20,628✔
2326
  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
20,628✔
2327
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
2328
  QUERY_CHECK_CODE(code, lino, _end);
20,628✔
2329
  *tableList = pTableListInfo->pTableList;
20,628✔
2330
  pTableListInfo->pTableList = NULL;
20,628✔
2331
  tableListDestroy(pTableListInfo);
20,628✔
2332

2333
_end:
20,628✔
2334
  if (code != TSDB_CODE_SUCCESS) {
20,628✔
UNCOV
2335
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2336
  }
2337
  return code;
20,628✔
2338
}
2339

2340
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
UNCOV
2341
  size_t keyLen = 0;
×
2342

2343
  SNode* node;
2344
  FOREACH(node, pGroups) {
×
2345
    SExprNode* pExpr = (SExprNode*)node;
×
UNCOV
2346
    keyLen += pExpr->resType.bytes;
×
2347
  }
2348

2349
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
UNCOV
2350
  return keyLen;
×
2351
}
2352

UNCOV
2353
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2354
                              SStorageAPI* pAPI) {
UNCOV
2355
  SMetaReader mr = {0};
×
2356

2357
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2358
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2359
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2360
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2361
  }
2362

2363
  SNodeList* groupNew = NULL;
×
2364
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2365
  if (TSDB_CODE_SUCCESS != code) {
×
2366
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2367
    return code;
×
2368
  }
2369

2370
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2371
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2372
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2373
    nodesDestroyList(groupNew);
×
2374
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2375
    return code;
×
2376
  }
2377
  char* isNull = (char*)keyBuf;
×
UNCOV
2378
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2379

2380
  SNode*  pNode;
2381
  int32_t index = 0;
×
2382
  FOREACH(pNode, groupNew) {
×
2383
    SNode*  pNew = NULL;
×
2384
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2385
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
2386
      REPLACE_NODE(pNew);
×
2387
    } else {
2388
      nodesDestroyList(groupNew);
×
2389
      pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2390
      return code;
×
2391
    }
2392

2393
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2394
      nodesDestroyList(groupNew);
×
2395
      pAPI->metaReaderFn.clearReader(&mr);
×
2396
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
2397
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2398
    }
UNCOV
2399
    SValueNode* pValue = (SValueNode*)pNew;
×
2400

2401
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2402
      isNull[index++] = 1;
×
UNCOV
2403
      continue;
×
2404
    } else {
2405
      isNull[index++] = 0;
×
2406
      char* data = nodesGetValueFromNode(pValue);
×
2407
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2408
        if (tTagIsJson(data)) {
×
2409
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2410
          nodesDestroyList(groupNew);
×
2411
          pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2412
          return terrno;
×
2413
        }
2414
        int32_t len = getJsonValueLen(data);
×
2415
        memcpy(pStart, data, len);
×
2416
        pStart += len;
×
2417
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2418
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
UNCOV
2419
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2420
        }
2421
        memcpy(pStart, data, varDataTLen(data));
×
UNCOV
2422
        pStart += varDataTLen(data);
×
2423
      } else {
2424
        memcpy(pStart, data, pValue->node.resType.bytes);
×
UNCOV
2425
        pStart += pValue->node.resType.bytes;
×
2426
      }
2427
    }
2428
  }
2429

2430
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
UNCOV
2431
  *pGroupId = calcGroupId(keyBuf, len);
×
2432

2433
  nodesDestroyList(groupNew);
×
UNCOV
2434
  pAPI->metaReaderFn.clearReader(&mr);
×
2435

UNCOV
2436
  return TSDB_CODE_SUCCESS;
×
2437
}
2438

2439
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
5,547,838✔
2440
  if (!pNodeList) {
5,547,838✔
UNCOV
2441
    return NULL;
×
2442
  }
2443

2444
  size_t  numOfCols = LIST_LENGTH(pNodeList);
5,547,838✔
2445
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
5,546,929✔
2446
  if (pList == NULL) {
5,550,165✔
UNCOV
2447
    return NULL;
×
2448
  }
2449

2450
  for (int32_t i = 0; i < numOfCols; ++i) {
12,545,103✔
2451
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
6,990,468✔
2452
    if (!pColNode) {
7,002,158✔
2453
      taosArrayDestroy(pList);
×
2454
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
2455
      return NULL;
×
2456
    }
2457

2458
    // todo extract method
2459
    SColumn c = {0};
7,002,158✔
2460
    c.slotId = pColNode->slotId;
6,999,816✔
2461
    c.colId = pColNode->colId;
6,999,454✔
2462
    c.type = pColNode->node.resType.type;
7,001,387✔
2463
    c.bytes = pColNode->node.resType.bytes;
6,999,103✔
2464
    c.precision = pColNode->node.resType.precision;
6,997,336✔
2465
    c.scale = pColNode->node.resType.scale;
6,999,816✔
2466

2467
    void* tmp = taosArrayPush(pList, &c);
6,995,610✔
2468
    if (!tmp) {
6,995,610✔
2469
      taosArrayDestroy(pList);
×
2470
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2471
      return NULL;
×
2472
    }
2473
  }
2474

2475
  return pList;
5,554,635✔
2476
}
2477

2478
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
289,102,722✔
2479
                            int32_t type, SColMatchInfo* pMatchInfo) {
2480
  size_t  numOfCols = LIST_LENGTH(pNodeList);
289,102,722✔
2481
  int32_t code = TSDB_CODE_SUCCESS;
289,097,476✔
2482
  int32_t lino = 0;
289,097,476✔
2483

2484
  pMatchInfo->matchType = type;
289,097,476✔
2485

2486
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
289,101,193✔
2487
  if (pList == NULL) {
288,987,970✔
2488
    code = terrno;
×
UNCOV
2489
    return code;
×
2490
  }
2491

2492
  for (int32_t i = 0; i < numOfCols; ++i) {
1,423,937,834✔
2493
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
1,134,718,631✔
2494
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,134,978,601✔
2495
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
1,134,985,571✔
2496
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
1,123,432,935✔
2497

2498
      SColMatchItem c = {.needOutput = true};
1,123,493,279✔
2499
      c.colId = pColNode->colId;
1,123,452,453✔
2500
      c.srcSlotId = pColNode->slotId;
1,123,313,289✔
2501
      c.dstSlotId = pNode->slotId;
1,123,265,470✔
2502
      c.isPk = pColNode->isPk;
1,123,481,702✔
2503
      c.dataType = pColNode->node.resType;
1,123,371,284✔
2504
      void* tmp = taosArrayPush(pList, &c);
1,123,428,905✔
2505
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,123,428,905✔
2506
    }
2507
  }
2508

2509
  // set the output flag for each column in SColMatchInfo, according to the
2510
  *numOfOutputCols = 0;
289,219,203✔
2511
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
289,278,396✔
2512
  for (int32_t i = 0; i < num; ++i) {
1,625,553,157✔
2513
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
1,336,303,647✔
2514
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,336,510,331✔
2515

2516
    // todo: add reserve flag check
2517
    // it is a column reserved for the arithmetic expression calculation
2518
    if (pNode->slotId >= numOfCols) {
1,336,499,176✔
2519
      (*numOfOutputCols) += 1;
201,731,330✔
2520
      continue;
201,732,627✔
2521
    }
2522

2523
    SColMatchItem* info = NULL;
1,134,830,501✔
2524
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
2,147,483,647✔
2525
      info = taosArrayGet(pList, j);
2,147,483,647✔
2526
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
2,147,483,647✔
2527
      if (info->dstSlotId == pNode->slotId) {
2,147,483,647✔
2528
        break;
1,122,364,718✔
2529
      }
2530
    }
2531

2532
    if (pNode->output) {
85,632,022✔
2533
      (*numOfOutputCols) += 1;
1,123,494,819✔
2534
    } else if (info != NULL) {
11,159,781✔
2535
      // select distinct tbname from stb where tbname='abc';
2536
      info->needOutput = false;
11,183,569✔
2537
    }
2538
  }
2539

2540
  pMatchInfo->pList = pList;
289,249,510✔
2541

2542
_end:
289,273,479✔
2543
  if (code != TSDB_CODE_SUCCESS) {
289,273,479✔
UNCOV
2544
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2545
  }
2546
  return code;
289,081,366✔
2547
}
2548

2549
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
1,117,222,802✔
2550
                                  const char* name) {
2551
  SResSchema s = {0};
1,117,222,802✔
2552
  s.scale = scale;
1,117,259,399✔
2553
  s.type = type;
1,117,259,399✔
2554
  s.bytes = bytes;
1,117,259,399✔
2555
  s.slotId = slotId;
1,117,259,399✔
2556
  s.precision = precision;
1,117,259,399✔
2557
  tstrncpy(s.name, name, tListLen(s.name));
1,117,259,399✔
2558

2559
  return s;
1,117,259,399✔
2560
}
2561

2562
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
1,076,571,929✔
2563
  SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
1,076,571,929✔
2564
  if (pCol == NULL) {
1,075,729,469✔
UNCOV
2565
    return NULL;
×
2566
  }
2567

2568
  pCol->slotId = slotId;
1,075,729,469✔
2569
  pCol->colId = colId;
1,075,797,193✔
2570
  pCol->bytes = pType->bytes;
1,075,911,782✔
2571
  pCol->type = pType->type;
1,076,086,182✔
2572
  pCol->scale = pType->scale;
1,076,411,091✔
2573
  pCol->precision = pType->precision;
1,076,351,607✔
2574
  pCol->dataBlockId = blockId;
1,076,675,036✔
2575
  pCol->colType = colType;
1,076,601,571✔
2576
  return pCol;
1,076,633,471✔
2577
}
2578

2579
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
1,120,957,621✔
2580
  int32_t code = TSDB_CODE_SUCCESS;
1,120,957,621✔
2581
  int32_t lino = 0;
1,120,957,621✔
2582
  pExp->base.numOfParams = 0;
1,120,957,621✔
2583
  pExp->base.pParam = NULL;
1,120,988,845✔
2584
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
1,120,929,635✔
2585
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
1,119,838,701✔
2586

2587
  pExp->pExpr->_function.num = 1;
1,120,063,253✔
2588
  pExp->pExpr->_function.functionId = -1;
1,120,310,524✔
2589

2590
  int32_t type = nodeType(pNode);
1,120,460,655✔
2591
  // it is a project query, or group by column
2592
  if (type == QUERY_NODE_COLUMN) {
1,120,764,573✔
2593
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
611,698,591✔
2594
    SColumnNode* pColNode = (SColumnNode*)pNode;
611,754,484✔
2595

2596
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
611,754,484✔
2597
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
611,559,494✔
2598

2599
    pExp->base.numOfParams = 1;
611,551,030✔
2600

2601
    SDataType* pType = &pColNode->node.resType;
611,577,975✔
2602
    pExp->base.resSchema =
2603
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
611,653,203✔
2604

2605
    pExp->base.pParam[0].pCol =
1,223,271,774✔
2606
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
1,203,234,109✔
2607
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
611,712,922✔
2608

2609
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
611,485,307✔
2610
  } else if (type == QUERY_NODE_VALUE) {
509,065,982✔
2611
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
20,025,926✔
2612
    SValueNode* pValNode = (SValueNode*)pNode;
20,023,592✔
2613

2614
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
20,023,592✔
2615
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
19,995,901✔
2616

2617
    pExp->base.numOfParams = 1;
20,021,232✔
2618

2619
    SDataType* pType = &pValNode->node.resType;
20,024,813✔
2620
    pExp->base.resSchema =
2621
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
20,005,150✔
2622
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
20,001,032✔
2623
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
19,987,563✔
2624
    QUERY_CHECK_CODE(code, lino, _end);
19,992,626✔
2625
  } else if (type == QUERY_NODE_REMOTE_VALUE) {
489,040,056✔
2626
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
10,282,949✔
2627
    code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
10,282,949✔
2628
    QUERY_CHECK_CODE(code, lino, _end);
10,296,243✔
2629

2630
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
6,713,713✔
2631
    SValueNode* pValNode = (SValueNode*)pNode;
6,713,713✔
2632

2633
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
6,713,713✔
2634
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
6,714,527✔
2635

2636
    pExp->base.numOfParams = 1;
6,714,527✔
2637

2638
    SDataType* pType = &pValNode->node.resType;
6,714,527✔
2639
    pExp->base.resSchema =
2640
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
6,714,527✔
2641
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
6,713,874✔
2642
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
6,713,874✔
2643
    QUERY_CHECK_CODE(code, lino, _end);
6,707,770✔
2644
  } else if (type == QUERY_NODE_FUNCTION) {
478,757,107✔
2645
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
450,142,494✔
2646
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
450,149,013✔
2647

2648
    SDataType* pType = &pFuncNode->node.resType;
450,149,013✔
2649
    pExp->base.resSchema =
2650
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
450,061,389✔
2651
    tExprNode* pExprNode = pExp->pExpr;
450,081,856✔
2652

2653
    pExprNode->_function.functionId = pFuncNode->funcId;
450,052,637✔
2654
    pExprNode->_function.pFunctNode = pFuncNode;
450,082,695✔
2655
    pExprNode->_function.functionType = pFuncNode->funcType;
450,220,701✔
2656

2657
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
450,080,883✔
2658

2659
    pExp->base.pParamList = pFuncNode->pParameterList;
450,129,653✔
2660
#if 1
2661
    // todo refactor: add the parameter for tbname function
2662
    const char* name = "tbname";
450,220,503✔
2663
    int32_t     len = strlen(name);
450,220,503✔
2664

2665
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
450,220,503✔
2666
        pExprNode->_function.functionName[len] == 0) {
30,755,190✔
2667
      pFuncNode->pParameterList = NULL;
30,716,508✔
2668
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
30,763,815✔
2669
      SValueNode* res = NULL;
30,738,547✔
2670
      if (TSDB_CODE_SUCCESS == code) {
30,740,053✔
2671
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
30,738,569✔
2672
      }
2673
      QUERY_CHECK_CODE(code, lino, _end);
30,775,413✔
2674
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
30,775,413✔
2675
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
30,741,940✔
2676
      if (code != TSDB_CODE_SUCCESS) {
30,759,705✔
2677
        nodesDestroyNode((SNode*)res);
×
UNCOV
2678
        res = NULL;
×
2679
      }
2680
      QUERY_CHECK_CODE(code, lino, _end);
30,759,705✔
2681
    }
2682
#endif
2683

2684
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
450,179,257✔
2685

2686
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
450,237,091✔
2687
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
449,914,360✔
2688
    pExp->base.numOfParams = numOfParam;
449,914,313✔
2689

2690
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
1,092,113,234✔
2691
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
642,375,359✔
2692
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
642,451,139✔
2693
      if (p1->type == QUERY_NODE_COLUMN) {
642,443,413✔
2694
        SColumnNode* pcn = (SColumnNode*)p1;
464,871,790✔
2695

2696
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
464,871,790✔
2697
        pExp->base.pParam[j].pCol =
929,512,955✔
2698
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
916,151,174✔
2699
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
464,807,683✔
2700
      } else if (p1->type == QUERY_NODE_VALUE) {
177,606,762✔
2701
        SValueNode* pvn = (SValueNode*)p1;
122,305,331✔
2702
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
122,305,331✔
2703
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
122,259,431✔
2704
        QUERY_CHECK_CODE(code, lino, _end);
122,213,986✔
2705
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
55,425,920✔
2706
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
514,625✔
2707
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, p1);
514,625✔
2708
        QUERY_CHECK_CODE(code, lino, _end);
515,537✔
2709

2710
        SValueNode* pvn = (SValueNode*)pRemote;
343,242✔
2711
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
343,242✔
2712
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
343,242✔
2713
        QUERY_CHECK_CODE(code, lino, _end);
303,181✔
2714
      }
2715
    }
2716
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
449,737,875✔
2717
  } else if (type == QUERY_NODE_OPERATOR) {
28,614,613✔
2718
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
26,343,333✔
2719
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
26,343,805✔
2720

2721
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
26,343,805✔
2722
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
26,303,128✔
2723
    pExp->base.numOfParams = 1;
26,302,931✔
2724

2725
    SDataType* pType = &pOpNode->node.resType;
26,307,951✔
2726
    pExp->base.resSchema =
2727
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
26,291,113✔
2728
    pExp->pExpr->_optrRoot.pRootNode = pNode;
26,320,155✔
2729
  } else if (type == QUERY_NODE_CASE_WHEN) {
2,271,611✔
2730
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
2,345,676✔
2731
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
2,345,067✔
2732

2733
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
2,345,067✔
2734
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2,344,635✔
2735
    pExp->base.numOfParams = 1;
2,344,694✔
2736

2737
    SDataType* pType = &pCaseNode->node.resType;
2,344,635✔
2738
    pExp->base.resSchema =
2739
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
2,346,049✔
2740
    pExp->pExpr->_optrRoot.pRootNode = pNode;
2,344,635✔
UNCOV
2741
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
×
2742
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
2,994✔
2743
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
2,994✔
2744
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
2,994✔
2745
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
2,994✔
2746
    pExp->base.numOfParams = 1;
2,994✔
2747
    SDataType* pType = &pCond->node.resType;
2,994✔
2748
    pExp->base.resSchema =
2749
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
2,994✔
2750
    pExp->pExpr->_optrRoot.pRootNode = pNode;
2,994✔
2751
  } else {
2752
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
2753
    QUERY_CHECK_CODE(code, lino, _end);
×
2754
  }
2755
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
1,116,779,752✔
2756
_end:
1,120,647,875✔
2757
  if (code != TSDB_CODE_SUCCESS) {
1,120,647,875✔
2758
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3,754,825✔
2759
  }
2760
  return code;
1,120,726,316✔
2761
}
2762

2763
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
1,120,818,392✔
2764
  return createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
1,120,818,392✔
2765
}
2766

2767
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
×
2768
  *numOfExprs = LIST_LENGTH(pNodeList);
×
2769
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
×
2770
  if (!pExprs) {
×
UNCOV
2771
    return NULL;
×
2772
  }
2773

2774
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
×
2775
    SExprInfo* pExp = &pExprs[i];
×
2776
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
×
2777
    if (code != TSDB_CODE_SUCCESS) {
×
2778
      taosMemoryFreeClear(pExprs);
×
2779
      terrno = code;
×
2780
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
2781
      return NULL;
×
2782
    }
2783
  }
2784

UNCOV
2785
  return pExprs;
×
2786
}
2787

2788
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
398,962,513✔
2789
  QRY_PARAM_CHECK(pExprInfo);
398,962,513✔
2790

2791
  int32_t code = 0;
399,054,150✔
2792
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
399,054,150✔
2793
  int32_t numOfGroupKeys = 0;
398,773,833✔
2794
  if (pGroupKeys != NULL) {
398,773,833✔
2795
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
35,529,129✔
2796
  }
2797

2798
  *numOfExprs = numOfFuncs + numOfGroupKeys;
398,776,942✔
2799
  if (*numOfExprs == 0) {
399,014,710✔
2800
    return code;
42,947,570✔
2801
  }
2802

2803
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
356,151,431✔
2804
  if (pExprs == NULL) {
355,556,015✔
UNCOV
2805
    return terrno;
×
2806
  }
2807

2808
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
1,472,071,195✔
2809
    STargetNode* pTargetNode = NULL;
1,120,039,072✔
2810
    if (i < numOfFuncs) {
1,120,039,072✔
2811
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
1,053,104,671✔
2812
    } else {
2813
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
66,934,401✔
2814
    }
2815
    if (!pTargetNode) {
1,120,573,999✔
2816
      destroyExprInfo(pExprs, *numOfExprs);
×
2817
      taosMemoryFreeClear(pExprs);
×
2818
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
2819
      return terrno;
×
2820
    }
2821

2822
    SExprInfo* pExp = &pExprs[i];
1,120,573,999✔
2823
    code = createExprFromTargetNode(pExp, pTargetNode);
1,120,712,780✔
2824
    if (code != TSDB_CODE_SUCCESS) {
1,120,271,689✔
2825
      destroyExprInfo(pExprs, *numOfExprs);
3,756,509✔
2826
      taosMemoryFreeClear(pExprs);
3,754,825✔
2827
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
3,754,825✔
2828
      return code;
3,754,825✔
2829
    }
2830
  }
2831

2832
  *pExprInfo = pExprs;
352,239,345✔
2833
  return code;
352,198,210✔
2834
}
2835

2836
static void deleteSubsidiareCtx(void* pData) {
×
2837
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2838
  if (pCtx->pCtx) {
×
UNCOV
2839
    taosMemoryFreeClear(pCtx->pCtx);
×
2840
  }
UNCOV
2841
}
×
2842

2843
// set the output buffer for the selectivity + tag query
2844
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
381,667,207✔
2845
  int32_t num = 0;
381,667,207✔
2846
  int32_t code = TSDB_CODE_SUCCESS;
381,667,207✔
2847
  int32_t lino = 0;
381,667,207✔
2848

2849
  SArray* pValCtxArray = NULL;
381,667,207✔
2850
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
1,137,433,771✔
2851
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
756,037,570✔
2852
    if (funcIdx > 0) {
755,958,147✔
2853
      if (pValCtxArray == NULL) {
1,473,221✔
2854
        // the end of the list is the select function of biggest index
2855
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,058,293✔
2856
        if (pValCtxArray == NULL) {
1,058,293✔
UNCOV
2857
          return terrno;
×
2858
        }
2859
      }
2860
      if (funcIdx > pValCtxArray->size) {
1,473,221✔
2861
        qError("funcIdx:%d is out of range", funcIdx);
×
2862
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
UNCOV
2863
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2864
      }
2865
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,472,015✔
2866
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,473,221✔
2867
      if (pSubsidiary->pCtx == NULL) {
1,474,025✔
2868
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
UNCOV
2869
        return terrno;
×
2870
      }
2871
      pSubsidiary->num = 0;
1,472,015✔
2872
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,472,015✔
2873
    }
2874
  }
2875

2876
  SqlFunctionCtx*  p = NULL;
381,396,201✔
2877
  SqlFunctionCtx** pValCtx = NULL;
381,396,201✔
2878
  if (pValCtxArray == NULL) {
381,396,201✔
2879
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
380,381,754✔
2880
    if (pValCtx == NULL) {
380,489,856✔
UNCOV
2881
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2882
    }
2883
  }
2884

2885
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,483,196,006✔
2886
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
1,101,661,655✔
2887
    if ((strcmp(pName, "_select_value") == 0)) {
1,101,914,099✔
2888
      if (pValCtxArray == NULL) {
7,052,922✔
2889
        pValCtx[num++] = &pCtx[i];
4,984,131✔
2890
      } else {
2891
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2,068,791✔
2892
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,067,585✔
2893
          bindFuncIndex -= 1;
2,018,943✔
2894
        }
2895
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2,067,585✔
2896
        if (pSubsidiary == NULL) {
2,067,183✔
UNCOV
2897
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2898
        }
2899
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2,067,183✔
2900
        (*pSubsidiary)->num++;
2,067,183✔
2901
      }
2902
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
1,094,861,177✔
2903
      if (pValCtxArray == NULL) {
109,241,645✔
2904
        p = &pCtx[i];
107,445,589✔
2905
      }
2906
    }
2907
  }
2908

2909
  if (p != NULL) {
381,534,351✔
2910
    p->subsidiaries.pCtx = pValCtx;
32,500,307✔
2911
    p->subsidiaries.num = num;
32,499,107✔
2912
  } else {
2913
    taosMemoryFreeClear(pValCtx);
349,034,044✔
2914
  }
2915

2916
_end:
1,102,912✔
2917
  if (code != TSDB_CODE_SUCCESS) {
381,370,018✔
2918
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2919
    taosMemoryFreeClear(pValCtx);
×
UNCOV
2920
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2921
  } else {
2922
    taosArrayDestroy(pValCtxArray);
381,370,018✔
2923
  }
2924
  return code;
381,510,862✔
2925
}
2926

2927
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
381,641,633✔
2928
                                     SFunctionStateStore* pStore) {
2929
  int32_t         code = TSDB_CODE_SUCCESS;
381,641,633✔
2930
  int32_t         lino = 0;
381,641,633✔
2931
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
381,641,633✔
2932
  if (pFuncCtx == NULL) {
380,987,620✔
UNCOV
2933
    return NULL;
×
2934
  }
2935

2936
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
380,987,620✔
2937
  if (*rowEntryInfoOffset == 0) {
381,593,428✔
2938
    taosMemoryFreeClear(pFuncCtx);
×
UNCOV
2939
    return NULL;
×
2940
  }
2941

2942
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,483,434,519✔
2943
    SExprInfo* pExpr = &pExprInfo[i];
1,101,930,685✔
2944

2945
    SExprBasicInfo* pFunct = &pExpr->base;
1,101,770,515✔
2946
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
1,101,878,493✔
2947

2948
    pCtx->functionId = -1;
1,101,843,767✔
2949
    pCtx->pExpr = pExpr;
1,101,936,899✔
2950

2951
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
1,101,827,159✔
2952
      SFuncExecEnv env = {0};
448,951,375✔
2953
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
448,988,752✔
2954
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
449,016,107✔
2955
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
448,934,061✔
2956

2957
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
449,064,034✔
2958
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
752,429,510✔
2959
        if (!isUdaf) {
303,798,566✔
2960
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
303,701,344✔
2961
          QUERY_CHECK_CODE(code, lino, _end);
303,581,721✔
2962
        } else {
2963
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
97,222✔
2964
          pCtx->udfName = taosStrdup(udfName);
97,222✔
2965
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
97,222✔
2966

2967
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
97,222✔
2968
          QUERY_CHECK_CODE(code, lino, _end);
97,222✔
2969
        }
2970
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
303,678,943✔
2971
        if (!tmp) {
303,548,021✔
UNCOV
2972
          code = terrno;
×
2973
          QUERY_CHECK_CODE(code, lino, _end);
19,211✔
2974
        }
2975
      } else {
2976
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
144,993,291✔
2977
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
5,781,876✔
2978
          QUERY_CHECK_CODE(code, lino, _end);
5,781,934✔
2979
        }      
2980
        
2981
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
145,074,048✔
2982
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
145,054,052✔
2983
          code = TSDB_CODE_SUCCESS;
112,035✔
2984
        }
2985
        QUERY_CHECK_CODE(code, lino, _end);
145,054,052✔
2986

2987
        if (pCtx->sfp.getEnv != NULL) {
145,054,052✔
2988
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
16,631,473✔
2989
          if (!tmp) {
16,636,140✔
2990
            code = terrno;
368✔
UNCOV
2991
            QUERY_CHECK_CODE(code, lino, _end);
×
2992
          }
2993
        }
2994
      }
2995
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
448,682,014✔
2996
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
652,844,951✔
2997
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
26,701,269✔
2998
      // for simple column, the result buffer needs to hold at least one element.
2999
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
653,151,054✔
3000
    }
3001

3002
    pCtx->input.numOfInputCols = pFunct->numOfParams;
1,102,034,431✔
3003
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,101,652,886✔
3004
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
1,102,118,399✔
3005
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,101,687,733✔
3006
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
1,102,084,299✔
3007

3008
    pCtx->pTsOutput = NULL;
1,101,736,555✔
3009
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
1,102,138,180✔
3010
    pCtx->resDataInfo.type = pFunct->resSchema.type;
1,102,023,074✔
3011
    pCtx->order = TSDB_ORDER_ASC;
1,102,106,675✔
3012
    pCtx->start.key = INT64_MIN;
1,102,230,851✔
3013
    pCtx->end.key = INT64_MIN;
1,102,040,492✔
3014
    pCtx->numOfParams = pExpr->base.numOfParams;
1,102,030,385✔
3015
    pCtx->param = pFunct->pParam;
1,102,300,726✔
3016
    pCtx->saveHandle.currentPage = -1;
1,101,984,063✔
3017
    pCtx->pStore = pStore;
1,102,234,519✔
3018
    pCtx->hasWindowOrGroup = false;
1,102,298,170✔
3019
    pCtx->needCleanup = false;
1,102,093,516✔
3020
    pCtx->skipDynDataCheck = false;
1,101,953,662✔
3021
  }
3022

3023
  for (int32_t i = 1; i < numOfOutput; ++i) {
1,137,916,315✔
3024
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,504,043,890✔
3025
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
756,208,255✔
3026
  }
3027

3028
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
381,684,273✔
3029
  QUERY_CHECK_CODE(code, lino, _end);
381,531,503✔
3030

3031
_end:
381,531,503✔
3032
  if (code != TSDB_CODE_SUCCESS) {
381,377,491✔
3033
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3034
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3035
      taosMemoryFree(pFuncCtx[i].input.pData);
×
UNCOV
3036
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
3037
    }
3038
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
UNCOV
3039
    taosMemoryFreeClear(pFuncCtx);
×
3040

3041
    terrno = code;
×
UNCOV
3042
    return NULL;
×
3043
  }
3044
  return pFuncCtx;
381,377,491✔
3045
}
3046

3047
// NOTE: sources columns are more than the destination SSDatablock columns.
3048
// doFilter in table scan needs every column even its output is false
3049
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
13,913,365✔
3050
  int32_t code = TSDB_CODE_SUCCESS;
13,913,365✔
3051
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
13,913,365✔
3052

3053
  int32_t i = 0, j = 0;
13,912,863✔
3054
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
109,952,181✔
3055
    SColumnInfoData* p = taosArrayGet(pCols, i);
96,038,378✔
3056
    if (!p) {
96,035,549✔
3057
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3058
      return terrno;
×
3059
    }
3060
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
96,035,549✔
3061
    if (!pmInfo) {
96,037,310✔
UNCOV
3062
      return terrno;
×
3063
    }
3064

3065
    if (p->info.colId == pmInfo->colId) {
96,037,310✔
3066
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
89,962,569✔
3067
      if (!pDst) {
89,960,779✔
UNCOV
3068
        return terrno;
×
3069
      }
3070
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
89,960,779✔
3071
      if (code != TSDB_CODE_SUCCESS) {
89,962,140✔
3072
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3073
        return code;
×
3074
      }
3075
      i++;
89,962,140✔
3076
      j++;
89,962,140✔
3077
    } else if (p->info.colId < pmInfo->colId) {
6,076,687✔
3078
      i++;
6,077,178✔
3079
    } else {
3080
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
3081
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3082
    }
3083
  }
3084
  return code;
13,912,809✔
3085
}
3086

3087
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
183,086,363✔
3088
  SInterval interval = {
330,467,050✔
3089
      .interval = pTableScanNode->interval,
182,849,494✔
3090
      .sliding = pTableScanNode->sliding,
182,819,014✔
3091
      .intervalUnit = pTableScanNode->intervalUnit,
183,142,395✔
3092
      .slidingUnit = pTableScanNode->slidingUnit,
182,949,848✔
3093
      .offset = pTableScanNode->offset,
183,108,847✔
3094
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
183,067,460✔
3095
      .timeRange = pTableScanNode->scanRange,
3096
  };
3097
  calcIntervalAutoOffset(&interval);
182,822,923✔
3098

3099
  return interval;
182,711,977✔
3100
}
3101

3102
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
70,565,084✔
3103
  SColumn c = {0};
70,565,084✔
3104

3105
  c.slotId = pColNode->slotId;
70,565,084✔
3106
  c.colId = pColNode->colId;
70,562,351✔
3107
  c.type = pColNode->node.resType.type;
70,561,560✔
3108
  c.bytes = pColNode->node.resType.bytes;
70,548,247✔
3109
  c.scale = pColNode->node.resType.scale;
70,560,867✔
3110
  c.precision = pColNode->node.resType.precision;
70,565,790✔
3111
  return c;
70,540,005✔
3112
}
3113

3114

3115
/**
3116
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3117
 * @param[in] cond The range specified by WHERE condition.
3118
 * @param[in] range The range specified by RANGE clause.
3119
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3120
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3121
 * @note `cond` and `twindow` may be the same address.
3122
 */
3123
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
934,049✔
3124
                                 STimeWindow* extTwindows) {
3125
  int32_t     code = TSDB_CODE_SUCCESS;
934,049✔
3126
  int32_t     lino = 0;
934,049✔
3127
  STimeWindow tempWindow;
3128

3129
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
934,049✔
3130
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
391✔
UNCOV
3131
    return code;
×
3132
  }
3133

3134
  if (range->ekey < cond->skey) {
934,049✔
3135
    extTwindows[1] = *cond;
136,068✔
3136
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
136,068✔
3137
    return code;
136,068✔
3138
  }
3139

3140
  if (cond->ekey < range->skey) {
797,981✔
3141
    extTwindows[0] = *cond;
101,269✔
3142
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
101,269✔
3143
    return code;
101,269✔
3144
  }
3145

3146
  // Only scan data in the time range intersecion.
3147
  extTwindows[0] = extTwindows[1] = *cond;
696,712✔
3148
  twindow->skey = TMAX(cond->skey, range->skey);
696,712✔
3149
  twindow->ekey = TMIN(cond->ekey, range->ekey);
696,712✔
3150
  extTwindows[0].ekey = twindow->skey - 1;
697,103✔
3151
  extTwindows[1].skey = twindow->ekey + 1;
696,712✔
3152

3153
  return code;
696,712✔
3154
}
3155

3156
static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
5,976✔
3157
  SNode*  pNew = NULL;
5,976✔
3158
  int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew);
5,976✔
3159
  if (TSDB_CODE_SUCCESS == code) {
5,976✔
3160
    *pPrimaryKeyCond = pNew;
5,976✔
3161
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
5,976✔
3162
      code = filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, isStrict, NULL);
5,976✔
3163
    }
3164
  }
3165
  return code;
5,976✔
3166
}
3167

3168
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* pTableScanNode,
212,531,104✔
3169
                               const SReadHandle* readHandle, bool applyExtWin) {
3170
  int32_t code = 0;                             
212,531,104✔
3171
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
212,531,104✔
3172
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
212,506,419✔
3173

3174
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
212,549,571✔
3175
  if (!pCond->colList) {
212,525,645✔
UNCOV
3176
    return terrno;
×
3177
  }
3178
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
212,404,639✔
3179
  if (pCond->pSlotList == NULL) {
212,499,215✔
3180
    taosMemoryFreeClear(pCond->colList);
×
UNCOV
3181
    return terrno;
×
3182
  }
3183

3184
  // TODO: get it from stable scan node
3185
  pCond->twindows = pTableScanNode->scanRange;
212,127,502✔
3186
  pCond->suid = pTableScanNode->scan.suid;
212,496,012✔
3187
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
212,288,683✔
3188
  pCond->startVersion = -1;
212,443,040✔
3189
  pCond->endVersion = -1;
212,504,753✔
3190
  pCond->skipRollup = readHandle->skipRollup;
212,222,129✔
3191
  if (readHandle->winRangeValid) {
212,454,647✔
3192
    pCond->twindows = readHandle->winRange;
295,995✔
3193
  }
3194
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
212,585,904✔
3195
  // allowed read stt file optimization mode
3196
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
426,272,335✔
3197
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
212,479,300✔
3198

3199
  int32_t j = 0;
212,415,965✔
3200
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
1,057,412,661✔
3201
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
845,271,936✔
3202
    if (!pNode) {
844,662,049✔
3203
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
105,429✔
3204
      return terrno;
105,429✔
3205
    }
3206
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
844,556,620✔
3207
    if (pColNode->colType == COLUMN_TYPE_TAG) {
845,116,344✔
UNCOV
3208
      continue;
×
3209
    }
3210

3211
    pCond->colList[j].type = pColNode->node.resType.type;
845,079,117✔
3212
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
845,029,370✔
3213
    pCond->colList[j].colId = pColNode->colId;
845,073,412✔
3214
    pCond->colList[j].pk = pColNode->isPk;
845,008,381✔
3215

3216
    pCond->pSlotList[j] = pNode->slotId;
845,211,266✔
3217
    j += 1;
844,996,696✔
3218
  }
3219

3220
  pCond->numOfCols = j;
212,504,394✔
3221

3222
  if (applyExtWin) {
212,528,290✔
3223
    if (NULL != pTableScanNode->pExtScanRange) {
183,515,090✔
3224
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
934,440✔
3225
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
934,440✔
3226
    } else if (readHandle->extWinRangeValid) {
182,357,625✔
UNCOV
3227
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
×
UNCOV
3228
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
3229
    }
3230
  }
3231

3232
  if (pTableScanNode->pPrimaryCond) {
212,512,828✔
3233
    bool isStrict = false;
5,976✔
3234
    code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
5,976✔
3235
    if (code || !isStrict) {
5,976✔
3236
      code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
1,992✔
3237
    }
3238
  }
3239

3240
  return code;
212,471,529✔
3241
}
3242

3243
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
44,275,666✔
3244
                                           const SReadHandle* readHandle, SArray* colArray) {
3245
  int32_t code = TSDB_CODE_SUCCESS;
44,275,666✔
3246
  int32_t lino = 0;
44,275,666✔
3247

3248
  pCond->order = TSDB_ORDER_ASC;
44,275,666✔
3249
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
44,284,105✔
3250

3251
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
44,276,052✔
3252
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
44,269,099✔
3253

3254
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
44,265,588✔
3255
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
44,275,586✔
3256

3257
  pCond->twindows = pOrgCond->twindows;
44,266,294✔
3258
  pCond->order = pOrgCond->order;
44,281,740✔
3259
  pCond->type = pOrgCond->type;
44,275,211✔
3260
  pCond->startVersion = -1;
44,265,544✔
3261
  pCond->endVersion = -1;
44,265,394✔
3262
  pCond->skipRollup = true;
44,270,706✔
3263
  pCond->notLoadData = false;
44,273,446✔
3264

3265
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
213,829,936✔
3266
    SColIdPair* pColPair = taosArrayGet(colArray, i);
169,570,719✔
3267
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
169,569,600✔
3268

3269
    bool find = false;
169,564,119✔
3270
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
1,021,936,430✔
3271
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
1,021,726,995✔
3272
        pCond->colList[i].type = pOrgCond->colList[j].type;
169,584,113✔
3273
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
169,585,713✔
3274
        pCond->colList[i].colId = pColPair->orgColId;
169,582,792✔
3275
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
169,592,771✔
3276
        pCond->pSlotList[i] = i;
169,607,122✔
3277
        find = true;
169,578,122✔
3278
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
169,578,122✔
3279
        break;
169,560,972✔
3280
      }
3281
    }
3282
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
169,559,218✔
3283
  }
3284

3285
  return code;
44,287,326✔
3286
_return:
×
3287
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3288
  taosMemoryFreeClear(pCond->colList);
×
3289
  taosMemoryFreeClear(pCond->pSlotList);
×
UNCOV
3290
  return code;
×
3291
}
3292

3293
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
486,308,472✔
3294
  taosMemoryFreeClear(pCond->colList);
486,308,472✔
3295
  taosMemoryFreeClear(pCond->pSlotList);
486,300,518✔
3296
}
486,254,794✔
3297

3298
int32_t convertFillType(int32_t mode) {
1,510,365✔
3299
  int32_t type = TSDB_FILL_NONE;
1,510,365✔
3300
  switch (mode) {
1,510,365✔
3301
    case FILL_MODE_PREV:
114,160✔
3302
      type = TSDB_FILL_PREV;
114,160✔
3303
      break;
114,160✔
3304
    case FILL_MODE_NONE:
×
3305
      type = TSDB_FILL_NONE;
×
UNCOV
3306
      break;
×
3307
    case FILL_MODE_NULL:
161,886✔
3308
      type = TSDB_FILL_NULL;
161,886✔
3309
      break;
161,886✔
3310
    case FILL_MODE_NULL_F:
27,195✔
3311
      type = TSDB_FILL_NULL_F;
27,195✔
3312
      break;
27,195✔
3313
    case FILL_MODE_NEXT:
99,641✔
3314
      type = TSDB_FILL_NEXT;
99,641✔
3315
      break;
99,641✔
3316
    case FILL_MODE_VALUE:
140,149✔
3317
      type = TSDB_FILL_SET_VALUE;
140,149✔
3318
      break;
140,149✔
3319
    case FILL_MODE_VALUE_F:
10,804✔
3320
      type = TSDB_FILL_SET_VALUE_F;
10,804✔
3321
      break;
10,804✔
3322
    case FILL_MODE_LINEAR:
182,350✔
3323
      type = TSDB_FILL_LINEAR;
182,350✔
3324
      break;
182,350✔
3325
    case FILL_MODE_NEAR:
774,180✔
3326
      type = TSDB_FILL_NEAR;
774,180✔
3327
      break;
774,180✔
UNCOV
3328
    default:
×
UNCOV
3329
      type = TSDB_FILL_NONE;
×
3330
  }
3331

3332
  return type;
1,510,365✔
3333
}
3334

3335
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
2,147,483,647✔
3336
  if (ascQuery) {
2,147,483,647✔
3337
    *w = getAlignQueryTimeWindow(pInterval, ts);
2,147,483,647✔
3338
  } else {
3339
    // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
3340
    *w = getAlignQueryTimeWindow(pInterval, ts);
10,010✔
3341

3342
    int64_t key = w->skey;
200,554✔
3343
    while (key < ts) {  // moving towards end
215,802✔
3344
      key = getNextTimeWindowStart(pInterval, key, TSDB_ORDER_ASC);
98,490✔
3345
      if (key > ts) {
97,400✔
3346
        break;
82,152✔
3347
      }
3348

3349
      w->skey = key;
15,248✔
3350
    }
3351
    w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
199,464✔
3352
  }
3353
}
2,147,483,647✔
3354

3355
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
70,520,203✔
3356
  STimeWindow w = {0};
70,520,203✔
3357

3358
  w.skey = taosTimeTruncate(ts, pInterval);
70,520,203✔
3359
  w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
70,507,808✔
3360
  return w;
70,531,271✔
3361
}
3362

3363
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
2,164,846✔
3364
  STimeWindow win = *pWindow;
2,164,846✔
3365
  STimeWindow save = win;
2,164,846✔
3366
  while (win.skey <= ts && win.ekey >= ts) {
12,965,466✔
3367
    save = win;
10,800,620✔
3368
    // get previous time window
3369
    getNextTimeWindow(pInterval, &win, order == TSDB_ORDER_DESC ? TSDB_ORDER_ASC : TSDB_ORDER_DESC);
10,800,620✔
3370
  }
3371

3372
  return save;
2,164,846✔
3373
}
3374

3375
// get the correct time window according to the handled timestamp
3376
// todo refactor
3377
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
111,578,286✔
3378
                                int32_t order) {
3379
  STimeWindow w = {0};
111,578,286✔
3380
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
111,594,274✔
3381
    getInitialStartTimeWindow(pInterval, ts, &w, (order != TSDB_ORDER_DESC));
4,228,524✔
3382
    return w;
4,225,253✔
3383
  }
3384

3385
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
107,362,460✔
3386
  if (pRow) {
107,358,447✔
3387
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
107,363,752✔
3388
  }
3389

3390
  // in case of typical time window, we can calculate time window directly.
3391
  if (w.skey > ts || w.ekey < ts) {
107,370,395✔
3392
    w = doCalculateTimeWindow(ts, pInterval);
70,523,363✔
3393
  }
3394

3395
  if (pInterval->interval != pInterval->sliding) {
107,377,850✔
3396
    // it is an sliding window query, in which sliding value is not equalled to
3397
    // interval value, and we need to find the first qualified time window.
3398
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
2,164,846✔
3399
  }
3400

3401
  return w;
107,364,107✔
3402
}
3403

3404
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
2,147,483,647✔
3405
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
3406
  TSKEY   nextStart = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3407
  nextStart = taosTimeAdd(nextStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
2,147,483,647✔
3408
  nextStart = taosTimeAdd(nextStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3409
  return nextStart;
2,147,483,647✔
3410
}
3411

3412
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
2,147,483,647✔
3413
  tw->skey = getNextTimeWindowStart(pInterval, tw->skey, order);
2,147,483,647✔
3414
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
2,147,483,647✔
3415
}
2,147,483,647✔
3416

3417
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
415,699,421✔
3418
  return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
828,095,505✔
3419
          pLimitInfo->slimit.offset != -1);
412,394,539✔
3420
}
3421

3422
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
×
UNCOV
3423
  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
×
3424
}
3425

3426
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
547,168,037✔
3427
  SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
547,168,037✔
3428
  SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
546,861,151✔
3429

3430
  pLimitInfo->limit = limit;
546,853,925✔
3431
  pLimitInfo->slimit = slimit;
546,896,442✔
3432
  pLimitInfo->remainOffset = limit.offset;
546,887,498✔
3433
  pLimitInfo->remainGroupOffset = slimit.offset;
546,865,016✔
3434
  pLimitInfo->numOfOutputRows = 0;
546,890,612✔
3435
  pLimitInfo->numOfOutputGroups = 0;
547,135,286✔
3436
  pLimitInfo->currentGroupId = 0;
547,086,807✔
3437
}
546,998,707✔
3438

3439
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
81,612,387✔
3440
  pLimitInfo->numOfOutputRows = 0;
81,612,387✔
3441
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
81,644,980✔
3442
}
81,550,826✔
3443

3444
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
555,151,651✔
3445
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
555,151,651✔
3446
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
3447
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3448
  }
3449
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
555,058,704✔
3450
  return TSDB_CODE_SUCCESS;
554,965,548✔
3451
}
3452

3453
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
3,147,083✔
3454

3455
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
207,429,076✔
3456
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
207,429,076✔
3457
    return NULL;
1,980✔
3458
  }
3459

3460
  return taosArrayGet(pTableList->pTableList, index);
207,279,162✔
3461
}
3462

3463
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
44,934✔
3464
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
44,934✔
3465
  if (startIndex >= numOfTables) {
44,934✔
UNCOV
3466
    return -1;
×
3467
  }
3468

3469
  for (int32_t i = startIndex; i < numOfTables; ++i) {
493,906✔
3470
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
493,906✔
3471
    if (!p) {
493,906✔
3472
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3473
      return -1;
×
3474
    }
3475
    if (p->uid == uid) {
493,906✔
3476
      return i;
44,934✔
3477
    }
3478
  }
UNCOV
3479
  return -1;
×
3480
}
3481

3482
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
391,985✔
3483
  *psuid = pTableList->idInfo.suid;
391,985✔
3484
  *uid = pTableList->idInfo.uid;
391,985✔
3485
  *type = pTableList->idInfo.tableType;
391,985✔
3486
}
391,985✔
3487

3488
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
723,710,279✔
3489
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
723,710,279✔
3490
  if (slot == NULL) {
724,149,214✔
3491
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
UNCOV
3492
    return -1;
×
3493
  }
3494

3495
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
724,149,214✔
3496
  if (pKeyInfo == NULL) {
724,184,802✔
3497
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
UNCOV
3498
    return -1;
×
3499
  }
3500
  return pKeyInfo->groupId;
724,204,446✔
3501
}
3502

3503
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3504
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3505
//   int32_t code = TSDB_CODE_SUCCESS;
3506
//   int32_t lino = 0;
3507

3508
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3509
//   if (slot == NULL) {
3510
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3511
//     return 0;
3512
//   }
3513

3514
//   taosArrayRemove(pTableList->pTableList, *slot);
3515
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3516

3517
//   _end:
3518
//   if (code != TSDB_CODE_SUCCESS) {
3519
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3520
//   } else {
3521
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3522
//   }
3523

3524
//   return code;
3525
// }
3526

3527
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
1,170,079✔
3528
  int32_t code = TSDB_CODE_SUCCESS;
1,170,079✔
3529
  int32_t lino = 0;
1,170,079✔
3530
  if (pTableList->map == NULL) {
1,170,079✔
3531
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
UNCOV
3532
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
×
3533
  }
3534

3535
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
1,172,717✔
3536
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
1,171,267✔
3537
  if (p != NULL) {
1,170,077✔
3538
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
910✔
3539
    goto _end;
910✔
3540
  }
3541

3542
  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
1,169,167✔
3543
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,168,870✔
3544

3545
  int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
1,168,870✔
3546
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
1,170,355✔
3547
  if (code != TSDB_CODE_SUCCESS) {
1,173,327✔
3548
    // we have checked the existence of uid in hash map above
3549
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
3550
    taosArrayPopTailBatch(pTableList->pTableList, 1);  // let's pop the last element in the array list
×
3551
  }
3552

3553
_end:
1,174,237✔
3554
  if (code != TSDB_CODE_SUCCESS) {
1,170,898✔
UNCOV
3555
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3556
  } else {
3557
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
1,170,898✔
3558
  }
3559

3560
  return code;
1,173,868✔
3561
}
3562

3563
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
199,649,588✔
3564
                              int32_t* size) {
3565
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
199,649,588✔
3566
  int32_t numOfTables = 0;
199,661,577✔
3567
  int32_t code = tableListGetSize(pTableList, &numOfTables);
199,700,371✔
3568
  if (code != TSDB_CODE_SUCCESS) {
199,736,577✔
3569
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3570
    return code;
×
3571
  }
3572

3573
  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
199,736,577✔
UNCOV
3574
    return TSDB_CODE_INVALID_PARA;
×
3575
  }
3576

3577
  // here handle two special cases:
3578
  // 1. only one group exists, and 2. one table exists for each group.
3579
  if (totalGroups == 1) {
199,736,577✔
3580
    *size = numOfTables;
199,299,714✔
3581
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
199,247,386✔
3582
    return TSDB_CODE_SUCCESS;
199,271,849✔
3583
  } else if (totalGroups == numOfTables) {
481,347✔
3584
    *size = 1;
452,455✔
3585
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
452,455✔
3586
    return TSDB_CODE_SUCCESS;
452,455✔
3587
  }
3588

3589
  int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
29,502✔
3590
  if (ordinalGroupIndex < totalGroups - 1) {
55,127✔
3591
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
41,465✔
3592
  } else {
3593
    *size = numOfTables - offset;
13,662✔
3594
  }
3595

3596
  *pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
55,127✔
3597
  return TSDB_CODE_SUCCESS;
55,127✔
3598
}
3599

3600
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
549,651,527✔
3601

3602
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
584,152✔
3603

3604
STableListInfo* tableListCreate() {
230,992,310✔
3605
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
230,992,310✔
3606
  if (pListInfo == NULL) {
230,784,731✔
UNCOV
3607
    return NULL;
×
3608
  }
3609

3610
  pListInfo->remainGroups = NULL;
230,784,731✔
3611
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
230,819,759✔
3612
  if (pListInfo->pTableList == NULL) {
231,041,184✔
UNCOV
3613
    goto _error;
×
3614
  }
3615

3616
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
231,018,122✔
3617
  if (pListInfo->map == NULL) {
231,247,534✔
UNCOV
3618
    goto _error;
×
3619
  }
3620

3621
  pListInfo->numOfOuputGroups = 1;
231,245,531✔
3622
  return pListInfo;
231,246,347✔
3623

3624
_error:
×
3625
  tableListDestroy(pListInfo);
×
UNCOV
3626
  return NULL;
×
3627
}
3628

3629
void tableListDestroy(STableListInfo* pTableListInfo) {
241,056,694✔
3630
  if (pTableListInfo == NULL) {
241,056,694✔
3631
    return;
9,952,243✔
3632
  }
3633

3634
  taosArrayDestroy(pTableListInfo->pTableList);
231,104,451✔
3635
  taosMemoryFreeClear(pTableListInfo->groupOffset);
231,067,476✔
3636

3637
  taosHashCleanup(pTableListInfo->map);
231,071,970✔
3638
  taosHashCleanup(pTableListInfo->remainGroups);
231,192,633✔
3639
  pTableListInfo->pTableList = NULL;
231,194,498✔
3640
  pTableListInfo->map = NULL;
231,205,920✔
3641
  taosMemoryFree(pTableListInfo);
231,198,348✔
3642
}
3643

3644
void tableListClear(STableListInfo* pTableListInfo) {
767,645✔
3645
  if (pTableListInfo == NULL) {
767,645✔
UNCOV
3646
    return;
×
3647
  }
3648

3649
  taosArrayClear(pTableListInfo->pTableList);
767,645✔
3650
  taosHashClear(pTableListInfo->map);
769,617✔
3651
  taosHashClear(pTableListInfo->remainGroups);
770,615✔
3652
  taosMemoryFree(pTableListInfo->groupOffset);
770,912✔
3653
  pTableListInfo->numOfOuputGroups = 1;
770,912✔
3654
  pTableListInfo->oneTableForEachGroup = false;
770,615✔
3655
}
3656

3657
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
534,561,159✔
3658
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
534,561,159✔
3659
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
534,561,159✔
3660

3661
  if (pInfo1->groupId == pInfo2->groupId) {
534,561,159✔
3662
    return 0;
501,322,576✔
3663
  } else {
3664
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
33,247,080✔
3665
  }
3666
}
3667

3668
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
28,791,212✔
3669
  int32_t code = TSDB_CODE_SUCCESS;
28,791,212✔
3670
  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
28,791,212✔
3671
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
28,826,271✔
3672
  if (size == 0) {
28,802,471✔
3673
    pTableListInfo->numOfOuputGroups = 0;
×
UNCOV
3674
    return code;
×
3675
  }
3676

3677
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
28,802,471✔
3678
  if (!pList) {
28,802,384✔
3679
    code = terrno;
×
UNCOV
3680
    goto end;
×
3681
  }
3682

3683
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
28,802,384✔
3684
  if (pInfo == NULL) {
28,784,102✔
3685
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3686
    code = terrno;
×
UNCOV
3687
    goto end;
×
3688
  }
3689
  uint64_t gid = pInfo->groupId;
28,784,102✔
3690

3691
  int32_t start = 0;
28,799,239✔
3692
  void*   tmp = taosArrayPush(pList, &start);
28,802,360✔
3693
  if (tmp == NULL) {
28,802,360✔
3694
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3695
    code = terrno;
×
UNCOV
3696
    goto end;
×
3697
  }
3698

3699
  for (int32_t i = 1; i < size; ++i) {
149,924,770✔
3700
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
121,136,895✔
3701
    if (pInfo == NULL) {
121,117,160✔
3702
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
164✔
3703
      code = terrno;
164✔
UNCOV
3704
      goto end;
×
3705
    }
3706
    if (pInfo->groupId != gid) {
121,116,996✔
3707
      tmp = taosArrayPush(pList, &i);
6,807,831✔
3708
      if (tmp == NULL) {
6,807,831✔
3709
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3710
        code = terrno;
×
UNCOV
3711
        goto end;
×
3712
      }
3713
      gid = pInfo->groupId;
6,807,831✔
3714
    }
3715
  }
3716

3717
  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
28,813,007✔
3718
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
28,819,906✔
3719
  if (pTableListInfo->groupOffset == NULL) {
28,782,458✔
3720
    code = terrno;
×
UNCOV
3721
    goto end;
×
3722
  }
3723

3724
  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
28,743,494✔
3725

3726
end:
28,787,058✔
3727
  taosArrayDestroy(pList);
28,789,257✔
3728
  return code;
28,713,931✔
3729
}
3730

3731
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
198,194,062✔
3732
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3733
  int32_t code = TSDB_CODE_SUCCESS;
198,194,062✔
3734

3735
  bool   groupByTbname = groupbyTbname(group);
198,194,062✔
3736
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
198,132,323✔
3737
  if (!numOfTables) {
198,127,551✔
3738
    return code;
3,261✔
3739
  }
3740
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
198,124,290✔
3741
  if (group == NULL || groupByTbname) {
198,033,658✔
3742
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
194,078,797✔
3743
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
150,076,774✔
3744
      pTableListInfo->remainGroups =
14,632,169✔
3745
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
14,631,958✔
3746
      if (pTableListInfo->remainGroups == NULL) {
14,632,169✔
UNCOV
3747
        return terrno;
×
3748
      }
3749

3750
      for (int i = 0; i < numOfTables; i++) {
68,234,572✔
3751
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
53,601,646✔
3752
        if (!info) {
53,604,741✔
3753
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3754
          return terrno;
×
3755
        }
3756
        info->groupId = groupByTbname ? info->uid : 0;
53,604,741✔
3757
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
53,603,943✔
3758
                                      &(info->uid), sizeof(info->uid));
53,604,288✔
3759
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
53,603,786✔
3760
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
3761
          return tempRes;
×
3762
        }
3763
      }
3764
    } else {
3765
      for (int32_t i = 0; i < numOfTables; i++) {
724,229,332✔
3766
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
544,980,453✔
3767
        if (!info) {
544,951,374✔
3768
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
55,151✔
3769
          return terrno;
55,151✔
3770
        }
3771
        info->groupId = groupByTbname ? info->uid : 0;
544,896,223✔
3772
        
3773
      }
3774
    }
3775
    if (groupIdMap && group != NULL){
193,881,805✔
3776
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
67,011✔
3777
    }
3778

3779
    pTableListInfo->oneTableForEachGroup = groupByTbname;
193,905,589✔
3780
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
194,083,536✔
3781
      pTableListInfo->oneTableForEachGroup = true;
45,751,760✔
3782
    }
3783

3784
    if (groupSort && groupByTbname) {
194,098,172✔
3785
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
2,176,178✔
3786
      pTableListInfo->numOfOuputGroups = numOfTables;
2,178,337✔
3787
    } else if (groupByTbname && pScanNode->groupOrderScan) {
191,921,994✔
3788
      pTableListInfo->numOfOuputGroups = numOfTables;
29,076✔
3789
    } else {
3790
      pTableListInfo->numOfOuputGroups = 1;
191,891,046✔
3791
    }
3792
    if (groupSort || pScanNode->groupOrderScan) {
194,174,742✔
3793
      code = sortTableGroup(pTableListInfo);
28,715,961✔
3794
    }
3795
  } else {
3796
    bool initRemainGroups = false;
3,954,861✔
3797
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
3,954,861✔
3798
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
3,871,649✔
3799
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
3,871,649✔
3800
          !(groupSort || pScanNode->groupOrderScan)) {
1,944,421✔
3801
        initRemainGroups = true;
1,921,117✔
3802
      }
3803
    }
3804

3805
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
3,954,861✔
3806
    if (code != TSDB_CODE_SUCCESS) {
3,953,792✔
UNCOV
3807
      return code;
×
3808
    }
3809

3810
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
3,953,792✔
3811

3812
    if (groupSort || pScanNode->groupOrderScan) {
3,954,408✔
3813
      code = sortTableGroup(pTableListInfo);
128,973✔
3814
    }
3815
  }
3816

3817
  // add all table entry in the hash map
3818
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
198,061,826✔
3819
  for (int32_t i = 0; i < size; ++i) {
819,216,572✔
3820
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
620,944,906✔
3821
    if (!p) {
620,684,773✔
3822
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3823
      return terrno;
×
3824
    }
3825
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
620,684,773✔
3826
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
621,201,065✔
3827
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
3828
      return tempRes;
×
3829
    }
3830
  }
3831

3832
  return code;
198,373,588✔
3833
}
3834

3835
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
214,300,808✔
3836
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3837
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3838
  int64_t     st = taosGetTimestampUs();
214,319,867✔
3839
  const char* idStr = GET_TASKID(pTaskInfo);
214,319,867✔
3840

3841
  if (pHandle == NULL) {
214,033,583✔
3842
    qError("invalid handle, in creating operator tree, %s", idStr);
×
UNCOV
3843
    return TSDB_CODE_INVALID_PARA;
×
3844
  }
3845

3846
  if (pHandle->uid != 0) {
214,033,583✔
3847
    pScanNode->uid = pHandle->uid;
94,652✔
3848
    pScanNode->tableType = TSDB_CHILD_TABLE;
94,652✔
3849
  }
3850
  uint8_t digest[17] = {0};
214,287,935✔
3851
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
214,284,751✔
3852
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
214,308,408✔
3853
  if (code != TSDB_CODE_SUCCESS) {
214,459,126✔
3854
    qError("failed to getTableList, code:%s", tstrerror(code));
898✔
3855
    return code;
898✔
3856
  }
3857

3858
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
214,458,228✔
3859

3860
  int64_t st1 = taosGetTimestampUs();
214,438,679✔
3861
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
214,438,679✔
3862
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
214,397,771✔
3863
         pTaskInfo->cost.extractListTime, idStr);
3864

3865
  if (numOfTables == 0) {
214,369,210✔
3866
    qDebug("no table qualified for query, %s", idStr);
16,261,001✔
3867
    return TSDB_CODE_SUCCESS;
16,260,343✔
3868
  }
3869

3870
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
198,108,209✔
3871
  if (code != TSDB_CODE_SUCCESS) {
198,331,117✔
UNCOV
3872
    return code;
×
3873
  }
3874

3875
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
198,337,029✔
3876
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
198,322,589✔
3877

3878
  return TSDB_CODE_SUCCESS;
198,317,332✔
3879
}
3880

3881
char* getStreamOpName(uint16_t opType) {
5,853,324✔
3882
  switch (opType) {
5,853,324✔
3883
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
UNCOV
3884
      return "stream scan";
×
3885
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
5,691,850✔
3886
      return "project";
5,691,850✔
3887
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
161,474✔
3888
      return "external window";
161,474✔
3889
  }
UNCOV
3890
  return "error name";
×
3891
}
3892

3893
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
653,743,598✔
3894
  if (qDebugFlag & DEBUG_TRACE) {
653,743,598✔
3895
    if (!pBlock) {
20,022✔
3896
      qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
3,621✔
3897
      return;
3,621✔
3898
    } else if (pBlock->info.rows == 0) {
16,401✔
3899
      qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
×
UNCOV
3900
      return;
×
3901
    }
3902
    
3903
    char*   pBuf = NULL;
16,401✔
3904
    int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr, qId);
16,401✔
3905
    if (code == 0) {
16,401✔
3906
      qDebugL("%" PRIx64 " %s %s", qId, __func__, pBuf);
16,401✔
3907
      taosMemoryFree(pBuf);
16,401✔
3908
    }
3909
  }
3910
}
3911

3912
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
×
3913
  if (!pBlock) {
×
3914
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
×
3915
    return;
×
3916
  } else if (pBlock->info.rows == 0) {
×
UNCOV
3917
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
×
3918
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
3919
           pBlock->info.version);
UNCOV
3920
    return;
×
3921
  }
3922
  if (qDebugFlag & DEBUG_TRACE) {
×
3923
    char* pBuf = NULL;
×
3924
    char  flagBuf[64];
×
3925
    snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
×
3926
    int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr, 0);
×
3927
    if (code == 0) {
×
3928
      qDebug("%s", pBuf);
×
UNCOV
3929
      taosMemoryFree(pBuf);
×
3930
    }
3931
  }
3932
}
3933

3934
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
14,972,840✔
3935

3936
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
2,147,483,647✔
3937
  int64_t* ts = (int64_t*)pColData->pData;
2,147,483,647✔
3938

3939
  int64_t duration = pWin->ekey > pWin->skey ? pWin->ekey - pWin->skey + delta : pWin->skey - pWin->ekey + delta;
2,147,483,647✔
3940
  ts[2] = duration;            // set the duration
2,147,483,647✔
3941
  ts[3] = pWin->skey;          // window start key
2,147,483,647✔
3942
  ts[4] = pWin->ekey + delta;  // window end key
2,147,483,647✔
3943
}
2,147,483,647✔
3944

3945
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
2,147,483,647✔
3946
                 int32_t rowIndex) {
3947
  SColumnDataAgg* pColAgg = NULL;
2,147,483,647✔
3948
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3949
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3950

3951
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
3952
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
3953
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
3954
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
3955

3956
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
3957
      if (isNull[i] != 1) return 1;
264,505,341✔
3958
    } else {
3959
      if (isNull[i] != 0) return 1;
2,147,483,647✔
3960
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
3961
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3962
        int32_t len = getJsonValueLen(val);
×
3963
        if (memcmp(p, val, len) != 0) return 1;
×
UNCOV
3964
        p += len;
×
3965
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,147,483,647✔
3966
        if (IS_STR_DATA_BLOB(pCol->type)) {
1,221,986,073✔
3967
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
UNCOV
3968
          p += blobDataTLen(val);
×
3969
        } else {
3970
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
1,222,808,721✔
3971
          p += varDataTLen(val);
1,222,245,642✔
3972
        }
3973
      } else {
3974
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
2,147,483,647✔
3975
        p += pCol->bytes;
2,147,483,647✔
3976
      }
3977
    }
3978
  }
3979
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
2,147,483,647✔
3980
  return 0;
2,147,483,647✔
3981
}
3982

3983
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
1,615,398✔
3984
  uint32_t        colNum = pSortGroupCols->size;
1,615,398✔
3985
  SColumnDataAgg* pColAgg = NULL;
1,615,398✔
3986
  char*           isNull = keyBuf;
1,615,398✔
3987
  char*           p = keyBuf + sizeof(int8_t) * colNum;
1,615,398✔
3988

3989
  for (int32_t i = 0; i < colNum; ++i) {
4,422,186✔
3990
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,806,788✔
3991
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,806,788✔
3992
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
2,806,788✔
3993

3994
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,806,788✔
3995

3996
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
5,613,576✔
3997
      isNull[i] = 1;
135,900✔
3998
    } else {
3999
      isNull[i] = 0;
2,670,888✔
4000
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,670,888✔
4001
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,670,888✔
4002
        int32_t len = getJsonValueLen(val);
×
4003
        memcpy(p, val, len);
×
UNCOV
4004
        p += len;
×
4005
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,670,888✔
4006
        if (IS_STR_DATA_BLOB(pCol->type)) {
998,412✔
4007
          blobDataCopy(p, val);
×
UNCOV
4008
          p += blobDataTLen(val);
×
4009
        } else {
4010
          varDataCopy(p, val);
998,412✔
4011
          p += varDataTLen(val);
998,412✔
4012
        }
4013
      } else {
4014
        memcpy(p, val, pCol->bytes);
1,672,476✔
4015
        p += pCol->bytes;
1,672,476✔
4016
      }
4017
    }
4018
  }
4019
  return (int32_t)(p - keyBuf);
1,615,398✔
4020
}
4021

4022
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
4023
  T_MD5_CTX context;
2,147,483,647✔
4024
  tMD5Init(&context);
2,147,483,647✔
4025
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4026
  tMD5Final(&context);
2,147,483,647✔
4027

4028
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4029
  uint64_t id = 0;
2,147,483,647✔
4030
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4031
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4032
  return id;
2,147,483,647✔
4033
}
4034

4035
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
106,455✔
4036
  SNode*     node;
4037
  SNodeList* ret = NULL;
106,455✔
4038
  FOREACH(node, pSortKeys) {
325,254✔
4039
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
218,799✔
4040
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
218,799✔
4041
    if (code != TSDB_CODE_SUCCESS) {
218,799✔
4042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4043
      terrno = code;
×
UNCOV
4044
      return NULL;
×
4045
    }
4046
  }
4047
  return ret;
106,908✔
4048
}
4049

4050
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
106,455✔
4051
  int32_t code = TSDB_CODE_SUCCESS;
106,455✔
4052
  int32_t lino = 0;
106,455✔
4053
  int32_t len = 0;
106,455✔
4054
  int32_t keyNum = taosArrayGetSize(keys);
106,455✔
4055
  for (int32_t i = 0; i < keyNum; ++i) {
269,988✔
4056
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
163,533✔
4057
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
163,986✔
4058
    len += pCol->bytes;
163,986✔
4059
  }
4060
  len += sizeof(int8_t) * keyNum;  // null flag
106,455✔
4061
  *pLen = len;
106,455✔
4062

4063
_end:
106,455✔
4064
  if (code != TSDB_CODE_SUCCESS) {
106,455✔
UNCOV
4065
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4066
  }
4067
  return code;
106,455✔
4068
}
4069

4070
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
×
4071
  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
4072
  if (pJson == NULL) {
×
UNCOV
4073
    return code;
×
4074
  }
4075

4076
  char    pMsg[1024] = {0};
×
UNCOV
4077
  int32_t ret = tjsonGetStringValue(pJson, "msg", pMsg);
×
4078

4079
  if (ret == 0) {
×
4080
    qError("%s failed to exec imputation, msg:%s", pId, pMsg);
×
4081
    if (strstr(pMsg, "white noise") != NULL) {
×
4082
      code = TSDB_CODE_ANA_WN_DATA;
×
4083
    } else if (strstr(pMsg, "white-noise") != NULL) {
×
4084
      code = TSDB_CODE_ANA_WN_DATA;
×
4085
    } else if (strstr(pMsg, "[Errno 111] Connection refused") != NULL) {
×
UNCOV
4086
      code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
4087
    }
4088
  } else {
UNCOV
4089
    qError("%s failed to extract msg from server, unknown error", pId);
×
4090
  }
4091

UNCOV
4092
  return code;
×
4093
}
4094

4095

4096
int32_t createExprSubQResBlock(SSDataBlock** ppBlock, SDataType* pResType) {
12,289,893✔
4097
  int32_t code = 0;
12,289,893✔
4098
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
12,289,893✔
4099
  if (pBlock == NULL) {
12,288,420✔
4100
    return terrno;
×
4101
  }
4102

4103
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
12,288,420✔
4104
  if (pBlock->pDataBlock == NULL) {
12,290,519✔
4105
    code = terrno;
×
4106
    taosMemoryFree(pBlock);
×
4107
    return code;
×
4108
  }
4109

4110
  SColumnInfoData idata =
1,585,316✔
4111
      createColumnInfoData(pResType->type, pResType->bytes, 0);
12,290,519✔
4112
  idata.info.scale = pResType->scale;
12,292,535✔
4113
  idata.info.precision = pResType->precision;
12,292,535✔
4114

4115
  code = blockDataAppendColInfo(pBlock, &idata);
12,292,535✔
4116
  if (code != TSDB_CODE_SUCCESS) {
12,289,808✔
4117
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4118
    blockDataDestroy(pBlock);
×
4119
    *ppBlock = NULL;
×
4120
    return code;
×
4121
  }
4122

4123
  *ppBlock = pBlock;
12,289,808✔
4124

4125
  return code;
12,290,221✔
4126
}
4127

4128

4129
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
12,290,595✔
4130
  int32_t            code = TSDB_CODE_SUCCESS;
12,290,595✔
4131
  int32_t            lino = 0;
12,290,595✔
4132
  void*              decompBuf = NULL;
12,290,595✔
4133

4134
  char* pNextStart = pRetrieveRsp->data;
12,290,595✔
4135
  char* pStart = pNextStart;
12,291,008✔
4136

4137
  int32_t index = 0;
12,291,008✔
4138

4139
  if (pRetrieveRsp->compressed) {  // decompress the data
12,291,008✔
4140
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
4141
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
×
4142
  }
4143

4144
  int32_t compLen = *(int32_t*)pStart;
12,289,682✔
4145
  pStart += sizeof(int32_t);
12,289,682✔
4146

4147
  int32_t rawLen = *(int32_t*)pStart;
12,289,682✔
4148
  pStart += sizeof(int32_t);
12,289,682✔
4149
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
12,289,682✔
4150

4151
  pNextStart = pStart + compLen;
12,289,682✔
4152
  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
12,290,095✔
4153
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
4154
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
4155
    pStart = decompBuf;
×
4156
  }
4157

4158
  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
12,289,682✔
4159
  if (code != 0) {
12,288,900✔
4160
    taosMemoryFreeClear(pRetrieveRsp);
×
4161
    goto _end;
×
4162
  }
4163

4164
_end:
12,288,900✔
4165
  if (code != TSDB_CODE_SUCCESS) {
12,288,900✔
4166
    blockDataDestroy(pb);
×
4167
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4168
  }
4169
  return code;
12,285,607✔
4170
}
4171

4172
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SRemoteValueNode* pRes, SSDataBlock* pBlock) {
9,329,555✔
4173
  int32_t code = 0;
9,329,555✔
4174
  bool needFree = true;
9,329,555✔
4175
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);
9,329,555✔
4176
  if (NULL == pBlock->pDataBlock || 1 != colNum || pBlock->info.rows > 1) {
9,331,979✔
4177
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
2,376✔
4178
      ctx->idStr, pBlock->pDataBlock, colNum, pBlock->info.rows);
4179
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
×
4180
  }
4181
  
4182
  pRes->val.node.type = QUERY_NODE_VALUE;
9,329,603✔
4183
  pRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
9,329,603✔
4184
  pRes->val.translate = true;
9,329,603✔
4185
  
4186
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
9,329,603✔
4187
  if (colDataIsNull_s(pCol, 0)) {
9,327,933✔
4188
    pRes->val.isNull = true;
462,589✔
4189
  } else {
4190
    code = nodesSetValueNodeValueExt(&pRes->val, colDataGetData(pCol, 0), &needFree);
8,865,344✔
4191
  }
4192

4193
  if (!needFree) {
9,326,462✔
UNCOV
4194
    pCol->pData = NULL;
×
4195
  }
4196

4197
  return code;
9,326,462✔
4198
}
4199

4200
void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp) {
12,070,143✔
4201
  SSDataBlock* pResBlock = NULL;
12,070,143✔
4202

4203
  qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
12,070,143✔
4204

4205
  if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
12,070,143✔
4206
    qError("%s invalid scl fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
476✔
4207
      ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
4208
    ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
413✔
4209

4210
    return;
2,607,575✔
4211
  }
4212

4213
  if (0 == pRsp->numOfRows) {
12,070,080✔
4214
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pParam->pRes;
2,737,263✔
4215
    pRemote->val.node.type = QUERY_NODE_VALUE;
2,737,263✔
4216
    pRemote->val.isNull = true;
2,737,263✔
4217
    pRemote->val.translate = true;
2,737,263✔
4218
    pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
2,737,263✔
4219
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
2,737,263✔
4220

4221
    return;
2,737,257✔
4222
  }
4223
  
4224
  ctx->code = createExprSubQResBlock(&pResBlock, &((SRemoteValueNode*)pParam->pRes)->val.node.resType);
9,332,817✔
4225
  if (TSDB_CODE_SUCCESS == ctx->code) {
9,331,602✔
4226
    ctx->code = blockDataEnsureCapacity(pResBlock, 1);
9,332,018✔
4227
  }
4228
  if (TSDB_CODE_SUCCESS == ctx->code) {
9,333,464✔
4229
    ctx->code = extractSingleRspBlock(pRsp, pResBlock);
9,333,183✔
4230
  }
4231
  if (TSDB_CODE_SUCCESS == ctx->code) {
9,327,682✔
4232
    ctx->code = setValueFromResBlock(ctx, (SRemoteValueNode*)pParam->pRes, pResBlock);
9,329,591✔
4233
  }
4234
  if (TSDB_CODE_SUCCESS == ctx->code) {
9,324,695✔
4235
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
9,326,812✔
4236
  }
4237

4238
  blockDataDestroy(pResBlock);  
9,327,335✔
4239
}
4240

4241

4242
int32_t updateValueListFromResBlock(STaskSubJobCtx* ctx, SRemoteValueListNode* pRes, SSDataBlock* pBlock) {
2,956,452✔
4243
  int32_t code = 0, lino = 0;
2,956,452✔
4244
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);
2,956,452✔
4245
  if (NULL == pBlock->pDataBlock || 1 != colNum) {
2,958,352✔
4246
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d", ctx->idStr, pBlock->pDataBlock, colNum);
325✔
UNCOV
4247
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4248
  }
4249

4250
  pRes->hasValue = true;
2,958,028✔
4251
  
4252
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
2,958,028✔
4253
  TAOS_CHECK_EXIT(scalarBuildRemoteListHash(pRes, pCol, pBlock->info.rows));
2,957,178✔
4254

4255
_exit:
2,958,420✔
4256

4257
  if (code) {
2,958,420✔
UNCOV
4258
    qError("%s %s failed with error: %s", ctx->idStr, __func__, tstrerror(code));
×
4259
  }
4260
  
4261
  return code;
2,957,394✔
4262
}
4263

4264

4265

4266
void handleRemoteValueListRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
3,089,445✔
4267
  SSDataBlock* pResBlock = NULL;
3,089,445✔
4268
  SRemoteValueListNode* pRemote = (SRemoteValueListNode*)pParam->pRes;
3,089,445✔
4269

4270
  qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
3,089,445✔
4271

4272
  if (pRsp->numOfRows > 0) {
3,090,338✔
4273
    ctx->code = createExprSubQResBlock(&pResBlock, &((SExprNode*)pParam->pRes)->resType);
2,959,062✔
4274
    if (TSDB_CODE_SUCCESS == ctx->code) {
2,958,387✔
4275
      ctx->code = blockDataEnsureCapacity(pResBlock, pRsp->numOfRows);
2,959,160✔
4276
    }
4277
    if (TSDB_CODE_SUCCESS == ctx->code) {
2,958,220✔
4278
      ctx->code = extractSingleRspBlock(pRsp, pResBlock);
2,959,220✔
4279
    }
4280
    if (TSDB_CODE_SUCCESS == ctx->code) {
2,955,467✔
4281
      ctx->code = updateValueListFromResBlock(ctx, pRemote, pResBlock);
2,957,306✔
4282
    }
4283
    if (TSDB_CODE_SUCCESS == ctx->code && pRsp->completed) {
2,956,047✔
4284
      pRemote->flag &= (~VALUELIST_FLAG_VAL_UNSET);
2,082,876✔
4285
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
2,082,876✔
4286
    }
4287

4288
    blockDataDestroy(pResBlock);  
2,954,827✔
4289
  } else if (0 == pRsp->numOfRows && pRsp->completed) {
131,276✔
4290
    if (!pRemote->hasValue) {
131,355✔
4291
      ctx->code = scalarBuildRemoteListHash(pRemote, NULL, 0);
131,355✔
4292
    }
4293
    if (TSDB_CODE_SUCCESS == ctx->code) {    
131,355✔
4294
      pRemote->flag &= (~VALUELIST_FLAG_VAL_UNSET);
131,355✔
4295
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
131,355✔
4296
    }
4297
  }
4298

4299
  *fetchDone = (TSDB_CODE_SUCCESS != ctx->code || pRsp->completed) ? true : false;
3,089,911✔
4300

4301
  if (!(*fetchDone)) {
3,089,911✔
4302
    ctx->code = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes);
876,034✔
4303
    if (ctx->code) {
875,461✔
UNCOV
4304
      *fetchDone = true;
×
4305
    }
4306
  }
4307
}
3,089,338✔
4308

4309

4310
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
22,381,495✔
4311
  SScalarFetchParam* pParam = (SScalarFetchParam*)param;
22,381,495✔
4312
  STaskSubJobCtx* ctx = pParam->pSubJobCtx;
22,381,495✔
4313
  
4314
  taosMemoryFreeClear(pMsg->pEpSet);
22,381,082✔
4315

4316
  if (NULL == ctx) {
22,381,082✔
4317
    qWarn("scl fetch ctx not exists since it may have been released");
49,225✔
4318
    goto _exit;
49,225✔
4319
  }
4320

4321
  qDebug("%s subQIdx %d got rsp, blockIdx:%" PRId64 ", code:%d, rsp:%p", ctx->idStr, pParam->subQIdx, ctx->blockIdx, code, pMsg->pData);
22,331,857✔
4322

4323
  taosWLockLatch(&ctx->lock);
22,331,857✔
4324
  ctx->param = NULL;
22,335,150✔
4325
  taosWUnLockLatch(&ctx->lock);
22,335,150✔
4326

4327
  if (ctx->transporterId > 0) {
22,335,105✔
4328
    int32_t ret = asyncFreeConnById(ctx->rpcHandle, ctx->transporterId);
22,336,510✔
4329
    if (ret != 0) {
22,335,008✔
4330
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", ctx->idStr, tstrerror(ret), pParam->subQIdx);
×
4331
    }
4332
    ctx->transporterId = -1;
22,331,352✔
4333
  }
4334

4335
  if (0 == code && NULL == pMsg->pData) {
22,329,947✔
4336
    qError("%s invalid rsp msg, msgType:%d, len:%d", ctx->idStr, pMsg->msgType, pMsg->len);
×
4337
    code = TSDB_CODE_QRY_INVALID_MSG;
×
4338
  }
4339

4340
  if (code == TSDB_CODE_SUCCESS) {
22,329,947✔
4341
    SRetrieveTableRsp* pRsp = pMsg->pData;
15,158,492✔
4342
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
15,158,492✔
4343
    pRsp->compLen = htonl(pRsp->compLen);
15,159,294✔
4344
    pRsp->payloadLen = htonl(pRsp->payloadLen);
15,159,294✔
4345
    pRsp->numOfCols = htonl(pRsp->numOfCols);
15,159,294✔
4346
    pRsp->useconds = htobe64(pRsp->useconds);
15,159,294✔
4347
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
15,161,795✔
4348

4349
    qDebug("%s subQIdx %d blockIdx:%" PRIu64 " rsp detail, numOfBlocks:%d, numOfRows:%" PRId64 ", completed:%d", 
15,161,795✔
4350
      ctx->idStr, pParam->subQIdx, ctx->blockIdx, pRsp->numOfBlocks, pRsp->numOfRows, pRsp->completed);
4351

4352
    ctx->blockIdx++;
15,161,874✔
4353

4354
    switch (nodeType(pParam->pRes)) {
15,161,874✔
4355
      case QUERY_NODE_REMOTE_VALUE:
12,071,134✔
4356
        handleRemoteValueRes(pParam, ctx, pRsp);
12,071,134✔
4357
        break;
12,070,109✔
4358
      case QUERY_NODE_REMOTE_VALUE_LIST: {
3,090,740✔
4359
        bool fetchDone = false;
3,090,740✔
4360
        handleRemoteValueListRes(pParam, ctx, pRsp, &fetchDone);
3,090,740✔
4361
        qDebug("%s subQIdx %d handle remote value list finished, fetchDone:%d", ctx->idStr, pParam->subQIdx, fetchDone);
3,088,927✔
4362
        if (!fetchDone) {
3,089,684✔
4363
          goto _exit;
875,546✔
4364
        }
4365
        break;
2,214,138✔
4366
      }  
UNCOV
4367
      default:
×
UNCOV
4368
        qError("%s invalid scl fetch res node %d, subQIdx:%d", ctx->idStr, nodeType(pParam->pRes), pParam->subQIdx);
×
UNCOV
4369
        ctx->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
4370
        break;
×
4371
    }
4372
  } else {
4373
    ctx->code = rpcCvtErrCode(code);
7,171,455✔
4374
    if (ctx->code != code) {
7,171,136✔
UNCOV
4375
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", ctx->idStr, pParam->subQIdx,
×
4376
             tstrerror(code), tstrerror(ctx->code));
4377
    } else {
4378
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", ctx->idStr, pParam->subQIdx, tstrerror(code));
7,170,723✔
4379
    }
4380
  }
4381

4382
  qDebug("%s subQIdx %d sem_post subQ ready", ctx->idStr, pParam->subQIdx);
21,456,870✔
4383
  
4384
  code = tsem_post(&pParam->pSubJobCtx->ready);
21,456,870✔
4385
  if (code != TSDB_CODE_SUCCESS) {
21,458,459✔
UNCOV
4386
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
×
4387
  }
4388

4389
_exit:
21,458,459✔
4390

4391
  taosMemoryFree(pMsg->pData);
22,383,643✔
4392

4393
  return code;
22,385,038✔
4394
}
4395

4396

4397
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
22,332,816✔
4398
  int32_t          code = TSDB_CODE_SUCCESS;
22,332,816✔
4399
  int32_t          lino = 0;
22,332,816✔
4400
  SDownstreamSourceNode* pSource = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);
22,332,816✔
4401

4402
  SResFetchReq req = {0};
22,314,634✔
4403
  req.header.vgId = pSource->addr.nodeId;
22,313,808✔
4404
  req.sId = pSource->sId;
22,313,808✔
4405
  req.clientId = pSource->clientId;
22,316,699✔
4406
  req.taskId = pSource->taskId;
22,315,784✔
4407
  req.srcTaskId = ctx->taskId;
22,318,262✔
4408
  req.blockIdx = ctx->blockIdx;
22,314,634✔
4409
  req.queryId = ctx->queryId;
22,312,480✔
4410
  req.execId = pSource->execId;
22,312,893✔
4411

4412
  int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, false);
22,313,808✔
4413
  if (msgSize < 0) {
22,331,328✔
UNCOV
4414
    return msgSize;
×
4415
  }
4416

4417
  void* msg = taosMemoryCalloc(1, msgSize);
22,331,328✔
4418
  if (NULL == msg) {
22,343,402✔
UNCOV
4419
    return terrno;
×
4420
  }
4421

4422
  msgSize = tSerializeSResFetchReq(msg, msgSize, &req, false);
22,343,402✔
4423
  if (msgSize < 0) {
22,387,257✔
UNCOV
4424
    taosMemoryFree(msg);
×
UNCOV
4425
    return msgSize;
×
4426
  }
4427

4428
  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
22,387,257✔
4429
         ", execId:%d, blockIdx:%" PRId64,
4430
         ctx->idStr, pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
4431
         pSource->taskId, pSource->execId, req.blockIdx);
4432

4433
  // send the fetch remote task result reques
4434
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
22,387,257✔
4435
  if (NULL == pMsgSendInfo) {
22,326,784✔
UNCOV
4436
    taosMemoryFreeClear(msg);
×
UNCOV
4437
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
×
UNCOV
4438
    return terrno;
×
4439
  }
4440

4441
  SScalarFetchParam* param = taosMemoryMalloc(sizeof(SScalarFetchParam));
22,326,784✔
4442
  if (NULL == param) {
22,338,278✔
UNCOV
4443
    taosMemoryFreeClear(msg);
×
UNCOV
4444
    taosMemoryFreeClear(pMsgSendInfo);
×
UNCOV
4445
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
×
UNCOV
4446
    return terrno;
×
4447
  }
4448

4449
  taosWLockLatch(&ctx->lock);
22,338,278✔
4450
  
4451
  if (ctx->code) {
22,358,411✔
4452
    qError("task has been killed, error:%s", tstrerror(ctx->code));
688✔
4453
    taosMemoryFree(param);
688✔
4454
    taosMemoryFreeClear(msg);
688✔
4455
    taosMemoryFreeClear(pMsgSendInfo);
688✔
4456
    code = ctx->code;
688✔
4457
    taosWUnLockLatch(&ctx->lock);
688✔
4458
    goto _end;
688✔
4459
  } else {
4460
    ctx->param = param;
22,355,658✔
4461
  }
4462
  
4463
  taosWUnLockLatch(&ctx->lock);
22,357,723✔
4464

4465
  param->subQIdx = subQIdx;
22,354,852✔
4466
  param->pRes = pRes;
22,354,852✔
4467
  param->pSubJobCtx = ctx;
22,355,678✔
4468

4469
  pMsgSendInfo->param = param;
22,355,678✔
4470
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
22,354,026✔
4471
  pMsgSendInfo->msgInfo.pData = msg;
22,352,787✔
4472
  pMsgSendInfo->msgInfo.len = msgSize;
22,351,961✔
4473
  pMsgSendInfo->msgType = pSource->fetchMsgType;
22,355,265✔
4474
  pMsgSendInfo->fp = remoteFetchCallBack;
22,345,766✔
4475
  pMsgSendInfo->requestId = ctx->queryId;
22,352,787✔
4476

4477
  code = asyncSendMsgToServer(ctx->rpcHandle, &pSource->addr.epSet, &ctx->transporterId, pMsgSendInfo);
22,353,613✔
4478
  QUERY_CHECK_CODE(code, lino, _end);
22,388,656✔
4479
      
4480
_end:
22,388,656✔
4481

4482
  if (code != TSDB_CODE_SUCCESS) {
22,389,344✔
4483
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
688✔
4484
  }
4485
  
4486
  return code;
22,391,385✔
4487
}
4488

4489
int32_t fetchRemoteNodeImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
21,464,032✔
4490
  int32_t          code = TSDB_CODE_SUCCESS;
21,464,032✔
4491
  int32_t          lino = 0;
21,464,032✔
4492

4493
  ctx->blockIdx = 0;
21,464,032✔
4494

4495
  code = sendFetchRemoteNodeReq(ctx, subQIdx, pRes);
21,466,097✔
4496
  QUERY_CHECK_CODE(code, lino, _end);
21,514,424✔
4497

4498
  code = qSemWait(ctx->pTaskInfo, &ctx->ready);
21,513,736✔
4499
  if (isTaskKilled(ctx->pTaskInfo)) {
21,526,564✔
4500
    code = getTaskCode(ctx->pTaskInfo);
88,067✔
4501
  } else {
4502
    code = ctx->code;
21,438,535✔
4503
  }
4504
      
4505
_end:
21,527,290✔
4506

4507
  taosWLockLatch(&ctx->lock);
21,527,290✔
4508
  ctx->param = NULL;
21,527,779✔
4509
  taosWUnLockLatch(&ctx->lock);
21,527,779✔
4510

4511
  if (code != TSDB_CODE_SUCCESS) {
21,525,810✔
4512
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
7,240,844✔
4513
  }
4514
  return code;
21,526,593✔
4515
}
4516

4517
int32_t remoteNodeCopy(SNode* pSrc, SNode* pDst) {
401,249✔
4518
  int32_t code = 0, lino = 0;
401,249✔
4519
  
4520
  switch (nodeType(pSrc)) {
401,249✔
4521
    case QUERY_NODE_VALUE:
328,209✔
4522
      TAOS_CHECK_EXIT(valueNodeCopy((SValueNode*)pSrc, &((SRemoteValueNode*)pDst)->val));
328,209✔
4523
      ((SRemoteValueNode*)pDst)->val.node.type = QUERY_NODE_VALUE;
328,209✔
4524
      break;
328,209✔
4525
    case QUERY_NODE_REMOTE_VALUE_LIST: {
73,040✔
4526
      SRemoteValueListNode* pDstNode = (SRemoteValueListNode*)pDst;
73,040✔
4527
      memcpy(pDst, pSrc, sizeof(SRemoteValueListNode));
73,040✔
4528
      pDstNode->hashAllocated = false;      
73,040✔
4529
      break;
73,040✔
4530
    } 
UNCOV
4531
    default:
×
UNCOV
4532
      break;
×
4533
  }
4534

4535
_exit:
401,249✔
4536

4537
  if (code) {
401,249✔
UNCOV
4538
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4539
  }
4540

4541
  return code;
401,249✔
4542
}
4543

4544
int32_t qFetchRemoteNode(void* pCtx, int32_t subQIdx, SNode* pRes) {
21,872,235✔
4545
  STaskSubJobCtx*  ctx = (STaskSubJobCtx*)pCtx;
21,872,235✔
4546
  int32_t code = 0, lino = 0;
21,872,235✔
4547
  int32_t       subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);
21,872,235✔
4548
  if (subQIdx >= subEndPoinsNum) {
21,896,879✔
UNCOV
4549
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
×
UNCOV
4550
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
×
4551
  }
4552

4553
  SNode** ppRes = taosArrayGet(ctx->subResNodes, subQIdx);
21,896,879✔
4554
  if (NULL == *ppRes) {
21,864,435✔
4555
    TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
21,469,071✔
4556
    *ppRes = pRes;
14,284,135✔
4557
  } else {
4558
    TAOS_CHECK_EXIT(remoteNodeCopy(*ppRes, pRes));
397,842✔
4559
  }
4560

4561
_exit:
401,249✔
4562

4563
  if (code) {
21,926,916✔
4564
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
7,241,532✔
4565
  } else {
4566
    qDebug("%s %s subQIdx %d succeed", ctx->idStr, __func__, subQIdx);
14,685,384✔
4567
  }
4568

4569
  return code;
21,927,397✔
4570
}
4571

4572

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc