• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4904

29 Dec 2025 12:28PM UTC coverage: 65.734% (+0.004%) from 65.73%
#4904

push

travis-ci

web-flow
Merge d019577a3 into 31126ab2b

15 of 16 new or added lines in 2 files covered. (93.75%)

268 existing lines in 3 files now uncovered.

192952 of 293535 relevant lines covered (65.73%)

118519910.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.78
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36
#include "trpc.h"
37
#include "filter.h"
38

39
typedef struct tagFilterAssist {
40
  SHashObj* colHash;
41
  int32_t   index;
42
  SArray*   cInfoList;
43
  int32_t   code;
44
} tagFilterAssist;
45

46
typedef struct STransTagExprCtx {
47
  int32_t      code;
48
  SMetaReader* pReader;
49
} STransTagExprCtx;
50

51
// Classification returned by checkTagCond().
// NOTE(review): the names suggest "no logic operator", "AND only" and
// "anything else" - confirm against checkTagCond's definition.
typedef enum {
  FILTER_NO_LOGIC = 1,
  FILTER_AND = 2,
  FILTER_OTHER = 3,
} FilterCondType;
56

57
static FilterCondType checkTagCond(SNode* cond);
58
static bool optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
59
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
60
                                        uint64_t suid);
61

62
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
63
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
64

65
static int64_t getLimit(const SNode* pLimit) {
1,188,322,048✔
66
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
1,188,322,048✔
67
}
68
static int64_t getOffset(const SNode* pLimit) {
1,187,916,511✔
69
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
1,187,916,511✔
70
}
71
static void releaseColInfoData(void* pCol);
72

73
// Resets a result-row info to its empty state: no rows and no current page.
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
  pResultRowInfo->cur.pageId = -1;
  pResultRowInfo->size = 0;
}
77

78
// Marks the result row as closed.
void closeResultRow(SResultRow* pResultRow) {
  pResultRow->closed = true;
}
79

80
// Clears the bookkeeping flags of a result row and, when entrySize is
// non-zero, zeroes the first entrySize bytes of its entry-info area.
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
  pResultRow->startInterp = false;
  pResultRow->endInterp = false;
  pResultRow->closed = false;
  pResultRow->numOfRows = 0;

  if (entrySize == 0) {
    return;
  }
  memset(pResultRow->pEntryInfo, 0, entrySize);
}
90

91
// TODO refactor: use macro
92
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
93
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
94
}
95

96
// Computes the number of bytes one result row occupies: the SResultRow header,
// one SResultRowEntryInfo per output expression, and each expression's
// intermediate buffer (resDataInfo.interBufSize).
//
// The sum is accumulated in size_t, matching the return type, so the
// size_t arithmetic is no longer narrowed through an int32_t.
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  size_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    rowSize += pCtx[i].resDataInfo.interBufSize;
  }

  return rowSize;
}
105

106
// Convert buf read from rocksdb to result row
107
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
108
  if (inBuf == NULL || pSup == NULL) {
×
109
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
110
    return TSDB_CODE_INVALID_PARA;
×
111
  }
112
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
113
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
114
  SResultRow*     pResultRow = NULL;
×
115
  size_t          processedSize = 0;
×
116
  int32_t         code = TSDB_CODE_SUCCESS;
×
117

118
  // calculate the size of output buffer
119
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
120
  *outBuf = taosMemoryMalloc(*outBufSize);
×
121
  if (*outBuf == NULL) {
×
122
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
123
    return terrno;
×
124
  }
125
  pResultRow = (SResultRow*)*outBuf;
×
126
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
127
  inBuf += sizeof(SResultRow);
×
128
  processedSize += sizeof(SResultRow);
×
129

130
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
131
    int32_t len = *(int32_t*)inBuf;
×
132
    inBuf += sizeof(int32_t);
×
133
    processedSize += sizeof(int32_t);
×
134
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
135
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
136
      if (code != TSDB_CODE_SUCCESS) {
×
137
        qError("failed to decode result row, code:%d", code);
×
138
        return code;
×
139
      }
140
    } else {
141
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
142
    }
143
    inBuf += len;
×
144
    processedSize += len;
×
145
  }
146

147
  if (processedSize < inBufSize) {
×
148
    // stream stores extra data after result row
149
    size_t leftLen = inBufSize - processedSize;
×
150
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
151
    if (*outBuf == NULL) {
×
152
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
153
      return terrno;
×
154
    }
155
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
156
    inBuf += leftLen;
×
157
    processedSize += leftLen;
×
158
    *outBufSize += leftLen;
×
159
  }
160

161
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
162
  return TSDB_CODE_SUCCESS;
×
163
}
164

165
// Convert result row to buf for rocksdb
166
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
167
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
168
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
169
    return TSDB_CODE_INVALID_PARA;
×
170
  }
171

172
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
173
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
174
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
175
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
176

177
  if (rowSize > inBufSize) {
×
178
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181

182
  // calculate the size of output buffer
183
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
184
  if (rowSize < inBufSize) {
×
185
    *outBufSize += inBufSize - rowSize;
×
186
  }
187

188
  *outBuf = taosMemoryMalloc(*outBufSize);
×
189
  if (*outBuf == NULL) {
×
190
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
191
    return terrno;
×
192
  }
193

194
  char* pBuf = *outBuf;
×
195
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
196
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
197
  pBuf += sizeof(SResultRow);
×
198
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
199
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
200
    *(int32_t*)pBuf = (int32_t)len;
×
201
    pBuf += sizeof(int32_t);
×
202
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
203
    pBuf += len;
×
204
  }
205

206
  if (rowSize < inBufSize) {
×
207
    // stream stores extra data after result row
208
    size_t leftLen = inBufSize - rowSize;
×
209
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
210
    pBuf += leftLen;
×
211
  }
212

213
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
214
  return TSDB_CODE_SUCCESS;
×
215
}
216

217
// Array-element destructor: p points at a stored pointer; frees the pointee.
static void freeEx(void* p) {
  void* pItem = *(void**)p;
  taosMemoryFree(pItem);
}
218

219
// Releases the buffer and row array owned by a group result info and resets
// its cursors. When freeItem is set the array elements are freed as well.
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
  taosMemoryFreeClear(pGroupResInfo->pBuf);

  if (!pGroupResInfo->freeItem) {
    taosArrayDestroy(pGroupResInfo->pRows);
  } else {
    // the array owns its elements: release each of them too
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    pGroupResInfo->freeItem = false;
  }
  pGroupResInfo->pRows = NULL;

  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
233

234
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
235
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
236
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
237

238
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
239
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
240
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
241

242
    if (pts1 == pts2) {
2,147,483,647✔
243
      return 0;
×
244
    } else {
245
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
246
    }
247
  } else {
248
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
249
  }
250
}
251

252
// Descending comparator: the ascending comparator with its operands swapped.
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
  return resultrowComparAsc(p2, p1);
}
253

254
// Builds the list of result rows to emit from a hash map of result positions.
//
// Every hash entry becomes one SResKeyPos stored in pGroupResInfo->pBuf
// (group id, row position, and the remainder of the hash key); pointers to
// those records are pushed into pGroupResInfo->pRows. When order is
// TSDB_ORDER_ASC or TSDB_ORDER_DESC the pointer array is sorted with the
// matching comparator. Any previous pRows/pBuf are released first.
//
// Returns TSDB_CODE_SUCCESS, or the failing code (logged) when an allocation
// or array push fails.
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
  }
  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  // extract the result rows information from the hash map
  int32_t size = tSimpleHashGetSize(pHashmap);

  void* pData = NULL;
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);

  size_t  keyLen = 0;
  int32_t iter = 0;
  int64_t bufLen = 0, offset = 0;

  // todo move away and record this during create window
  // first pass: total bytes needed for all SResKeyPos records
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
    bufLen += keyLen + sizeof(SResultRowPosition);
  }

  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);

  // second pass: fill the records and collect pointers to them
  iter = 0;
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pData, &keyLen);

    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);

    // the hash key starts with the 64-bit group id; the rest is the row key
    p->groupId = *(uint64_t*)key;
    p->pos = *(SResultRowPosition*)pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
    // check the push result itself (the original re-checked pBuf here)
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    offset += keyLen + sizeof(struct SResultRowPosition);
  }

  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
    size = POINTER_BYTES;
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
  }

  pGroupResInfo->index = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
314

315
// Installs pArrayList as the row list of pGroupResInfo and marks its elements
// as owned (freeItem), resetting both cursors. A previously installed list is
// destroyed first.
// NOTE(review): the previous list is destroyed with taosArrayDestroy even when
// freeItem was already set, so its elements are not freed here - confirm that
// is intended.
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  SArray* pPrevRows = pGroupResInfo->pRows;
  if (NULL != pPrevRows) {
    taosArrayDestroy(pPrevRows);
  }

  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->freeItem = true;
  pGroupResInfo->delIndex = 0;
  pGroupResInfo->index = 0;
}
325

326
// True when a row list exists and the read cursor has not reached its end.
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows != NULL) && (pGroupResInfo->index < taosArrayGetSize(pRows));
}
333

334
// Number of rows in the group result list; 0 when no list is attached.
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows != NULL) ? (int32_t)taosArrayGetSize(pRows) : 0;
}
341

342
// Translates an ORDER BY node list into an array of SBlockOrderInfo
// (order, null placement, slot id), one per sort key.
//
// Returns the new array (empty for a NULL list). Returns NULL when the array
// cannot be created, a list node is missing, a sort expression is not a plain
// column, or an element cannot be appended; in the missing-node and
// non-column cases terrno is set to TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
SArray* createSortInfo(SNodeList* pNodeList) {
  size_t numOfCols = 0;
  if (NULL != pNodeList) {
    numOfCols = LIST_LENGTH(pNodeList);
  }

  SArray* pSortList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
  if (NULL == pSortList) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
    if (NULL == pSortKey) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      taosArrayDestroy(pSortList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    // only plain column references can be turned into a slot id
    if (QUERY_NODE_COLUMN != nodeType(pSortKey->pExpr)) {
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
      taosArrayDestroy(pSortList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    SBlockOrderInfo orderInfo = {0};
    orderInfo.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
    orderInfo.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
    orderInfo.slotId = ((SColumnNode*)pSortKey->pExpr)->slotId;

    if (NULL == taosArrayPush(pSortList, &orderInfo)) {
      taosArrayDestroy(pSortList);
      return NULL;
    }
  }

  return pSortList;
}
389

390
SSDataBlock* createDataBlockFromDescNode(void* p) {
766,935,678✔
391
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
766,935,678✔
392
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
766,935,678✔
393
  SSDataBlock* pBlock = NULL;
767,031,095✔
394
  int32_t      code = createDataBlock(&pBlock);
767,038,865✔
395
  if (code) {
766,776,035✔
396
    terrno = code;
×
397
    return NULL;
×
398
  }
399

400
  pBlock->info.id.blockId = pNode->dataBlockId;
766,776,035✔
401
  pBlock->info.type = STREAM_INVALID;
766,748,861✔
402
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
767,065,016✔
403
  pBlock->info.watermark = INT64_MIN;
766,883,250✔
404

405
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
406
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
407
    if (!pDescNode) {
2,147,483,647✔
408
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
409
      blockDataDestroy(pBlock);
×
410
      pBlock = NULL;
×
411
      terrno = TSDB_CODE_INVALID_PARA;
×
412
      break;
×
413
    }
414
    SColumnInfoData idata =
2,147,483,647✔
415
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
416
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
417
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
418
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
419

420
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
421
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
422
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
472,954✔
423
      blockDataDestroy(pBlock);
472,954✔
424
      pBlock = NULL;
×
425
      terrno = code;
×
426
      break;
×
427
    }
428
  }
429

430
  return pBlock;
767,663,765✔
431
}
432

433
// Prepares the two primary-key slots (pks[0], pks[1]) of a data block from the
// first matched column flagged isPk: both slots take the column's type and,
// for variable-length types, each gets a zeroed buffer of the column's byte
// width. Does nothing when no matched column is a primary key.
//
// Returns TSDB_CODE_SUCCESS, or terrno when an array lookup or an allocation
// fails.
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
  SDataBlockInfo* pInfo = &pDataBlock->info;
  size_t          numOfItems = taosArrayGetSize(pMatchInfo->pList);

  for (int32_t idx = 0; idx < numOfItems; ++idx) {
    SColMatchItem* pMatch = taosArrayGet(pMatchInfo->pList, idx);
    if (NULL == pMatch) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    if (!pMatch->isPk) {
      continue;
    }

    SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pMatch->dstSlotId);
    if (NULL == pPkCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    pInfo->pks[0].type = pPkCol->info.type;
    pInfo->pks[1].type = pPkCol->info.type;

    // allocate enough buffer size, which is pPkCol->info.bytes
    if (IS_VAR_DATA_TYPE(pMatch->dataType.type)) {
      pInfo->pks[0].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[0].pData) {
        return terrno;
      }

      pInfo->pks[1].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[1].pData) {
        taosMemoryFreeClear(pInfo->pks[0].pData);
        return terrno;
      }

      pInfo->pks[0].nData = pPkCol->info.bytes;
      pInfo->pks[1].nData = pPkCol->info.bytes;
    }

    // only the first primary-key column is considered
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}
475

476
// Expression-walker callback that replaces tag columns and tbname references
// with constant value nodes taken from the table held by the meta reader.
//
// pNode     in/out: node under inspection; replaced (old node destroyed) when
//           it is a tag column, a tbname column, or the tbname function
// pContext  STransTagExprCtx*; pReader supplies the table entry, code receives
//           the failure reason
//
// Returns DEAL_RES_CONTINUE on success or when the node is left untouched,
// DEAL_RES_ERROR on failure. On every failure path pCtx->code is set and the
// partially built value node is destroyed.
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
  STransTagExprCtx* pCtx = pContext;
  SMetaReader*      mr = pCtx->pReader;
  bool              isTagCol = false, isTbname = false;

  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
    SColumnNode* pCol = (SColumnNode*)*pNode;
    if (pCol->colType == COLUMN_TYPE_TBNAME) {
      isTbname = true;
    } else {
      isTagCol = true;
    }
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) {
      isTbname = true;
    }
  }

  if (!isTagCol && !isTbname) {
    return DEAL_RES_CONTINUE;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  SValueNode* res = NULL;
  pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
  if (NULL == res) {
    return DEAL_RES_ERROR;
  }
  res->translate = true;

  if (isTagCol) {
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
    res->node.resType = pSColumnNode->node.resType;

    STagVal tagVal = {0};
    tagVal.cid = pSColumnNode->colId;
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
    if (p == NULL) {
      // the table has no value for this tag
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
      int32_t len = ((const STag*)p)->len;
      res->datum.p = taosMemoryCalloc(len + 1, 1);
      if (NULL == res->datum.p) {
        code = terrno;
        goto _fail;
      }
      memcpy(res->datum.p, p, len);
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
        // report through the context; an error code is not a valid EDealRes
        code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
        goto _fail;
      }

      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        code = terrno;
        goto _fail;
      }
      memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
      varDataSetLen(res->datum.p, tagVal.nData);
    } else {
      code = nodesSetValueNodeValue(res, &(tagVal.i64));
      if (code != TSDB_CODE_SUCCESS) {
        goto _fail;
      }
    }
  } else {
    // tbname: substitute the table name as a var-length string
    res->node.resType = ((SExprNode*)(*pNode))->resType;

    int32_t len = strlen(mr->me.name);
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
    if (NULL == res->datum.p) {
      code = terrno;
      goto _fail;
    }
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
    varDataSetLen(res->datum.p, len);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;
  return DEAL_RES_CONTINUE;

_fail:
  nodesDestroyNode((SNode*)res);
  pCtx->code = code;
  return DEAL_RES_ERROR;
}
562

563
// Evaluates a tag condition against a single table.
//
// The condition is cloned, its tag/tbname references are replaced with the
// table's actual values (doTranslateTagExpr), and the resulting constant
// expression is folded to a boolean.
//
// uid         table uid to test
// pTagCond    tag condition; not modified
// vnode       vnode handle passed to the meta reader
// pQualified  out: result of the condition; false on any failure
// pAPI        storage API providing the meta reader functions
//
// Returns TSDB_CODE_SUCCESS when the condition was evaluated, and also when
// the table entry cannot be loaded (then *pQualified is false). Returns the
// failing code when cloning, translating or constant folding fails.
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SMetaReader mr = {0};

  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
  if (TSDB_CODE_SUCCESS != code) {
    // table entry not available: treated as "not qualified", not as an error
    pAPI->metaReaderFn.clearReader(&mr);
    *pQualified = false;

    return TSDB_CODE_SUCCESS;
  }

  SNode* pTagCondTmp = NULL;
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
  if (TSDB_CODE_SUCCESS != code) {
    *pQualified = false;
    pAPI->metaReaderFn.clearReader(&mr);
    return code;
  }
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
  pAPI->metaReaderFn.clearReader(&mr);
  if (TSDB_CODE_SUCCESS != ctx.code) {
    // report the walker's failure; `code` still holds the clone's success here
    *pQualified = false;
    nodesDestroyNode(pTagCondTmp);
    terrno = ctx.code;
    return ctx.code;
  }

  SNode* pNew = NULL;
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    nodesDestroyNode(pTagCondTmp);
    *pQualified = false;

    return code;
  }

  // NOTE(review): assumes the folded result is a value node - confirm
  SValueNode* pValue = (SValueNode*)pNew;
  *pQualified = pValue->datum.b;

  nodesDestroyNode(pNew);
  return TSDB_CODE_SUCCESS;
}
609

610
// Expression-walker callback that assigns slot ids to the columns referenced
// by a tag filter and records their column info.
//
// Column nodes are used as they are; a tbname() function node is replaced by a
// synthetic column node (colId -1, COLUMN_TYPE_TBNAME, varchar). The first
// time a column id is seen it gets the next slot id, is remembered in
// pData->colHash, and an SColumnInfo is appended to pData->cInfoList; later
// occurrences reuse the remembered slot id.
//
// pContext  tagFilterAssist*; its `code` receives the failure reason
//
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with pData->code set.
static EDealRes getColumn(SNode** pNode, void* pContext) {
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
  SColumnNode*     pSColumnNode = NULL;
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
    pSColumnNode = *(SColumnNode**)pNode;
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
      if (NULL == pSColumnNode) {
        return DEAL_RES_ERROR;
      }
      pSColumnNode->colId = -1;
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
      nodesDestroyNode(*pNode);
      *pNode = (SNode*)pSColumnNode;
    } else {
      return DEAL_RES_CONTINUE;
    }
  } else {
    return DEAL_RES_CONTINUE;
  }

  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
  if (!data) {
    // remember the node pointer itself (the value stored is *pNode)
    int32_t tempRes =
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
      // record why the walk is aborted so the caller can see the failure
      pData->code = tempRes;
      return DEAL_RES_ERROR;
    }
    pSColumnNode->slotId = pData->index++;
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
                         .type = pSColumnNode->node.resType.type,
                         .bytes = pSColumnNode->node.resType.bytes,
                         .pk = pSColumnNode->isPk};
#if TAG_FILTER_DEBUG
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
    if (!tmp) {
      pData->code = terrno;
      return DEAL_RES_ERROR;
    }
  } else {
    SColumnNode* col = *(SColumnNode**)data;
    pSColumnNode->slotId = col->slotId;
  }

  return DEAL_RES_CONTINUE;
}
661

662
// Allocates a column of the given data type with capacity for numOfRows rows
// and attaches it to pParam (columnData, colAlloced = true).
//
// Returns TSDB_CODE_SUCCESS, or terrno when the allocation or the capacity
// reservation fails (the column is released in the latter case).
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pResCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pResCol) {
    return terrno;
  }

  pResCol->info.type = pType->type;
  pResCol->info.bytes = pType->bytes;
  pResCol->info.scale = pType->scale;
  pResCol->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pResCol, numOfRows, true);
  if (TSDB_CODE_SUCCESS == code) {
    pParam->columnData = pResCol;
    pParam->colAlloced = true;
    return TSDB_CODE_SUCCESS;
  }

  terrno = code;
  releaseColInfoData(pResCol);
  return terrno;
}
684

685
static void releaseColInfoData(void* pCol) {
4,006,474✔
686
  if (pCol) {
4,006,474✔
687
    SColumnInfoData* col = (SColumnInfoData*)pCol;
4,006,474✔
688
    colDataDestroy(col);
4,006,474✔
689
    taosMemoryFree(col);
4,006,251✔
690
  }
691
}
4,005,894✔
692

693
void freeItem(void* p) {
183,399,659✔
694
  STUidTagInfo* pInfo = p;
183,399,659✔
695
  if (pInfo->pTagVal != NULL) {
183,399,659✔
696
    taosMemoryFree(pInfo->pTagVal);
183,057,283✔
697
  }
698
}
183,406,236✔
699

700
typedef struct {
701
  col_id_t  colId;
702
  SNode*    pValueNode;
703
  int32_t   bytes;  // length defined in schema
704
} STagDataEntry;
705

706
static int compareTagDataEntry(const void* a, const void* b) {
26,852✔
707
  STagDataEntry* p1 = (STagDataEntry*)a;
26,852✔
708
  STagDataEntry* p2 = (STagDataEntry*)b;
26,852✔
709
  return compareInt16Val(&p1->colId, &p2->colId);
26,852✔
710
}
711

712
// Serializes a list of STagDataEntry into a flat key: for every entry the
// column id followed (unless the value is NULL) by the value bytes.
//
// pIdWithValue  array of STagDataEntry
// keyBuf        out: newly allocated, zero-initialised buffer of keyLen bytes;
//               set to NULL when an error is returned
// keyLen        size of the key, computed by the caller
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure,
// TSDB_CODE_STREAM_INTERNAL_ERROR for an unsupported value type, or
// TSDB_CODE_INVALID_PARA when the entries do not fit into keyLen bytes.
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
  if (NULL == *keyBuf) {
    qError(
      "failed to allocate memory for tag filter optimization key, size:%d",
      keyLen);
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  char*   pStart = *keyBuf;
  char*   pEnd = *keyBuf + keyLen;
  int32_t numOfEntries = (int32_t)taosArrayGetSize(pIdWithValue);

  for (int32_t i = 0; i < numOfEntries; ++i) {
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
    // num type may have different bytes length, use the smaller one
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);

    const void* pSrc = NULL;
    int32_t     srcLen = 0;
    float       f32 = 0;

    if (!pValueNode->isNull) {
      switch (pValueNode->node.resType.type) {
        case TSDB_DATA_TYPE_BOOL:
          pSrc = &pValueNode->datum.b;
          srcLen = bytes;
          break;
        case TSDB_DATA_TYPE_TINYINT:
        case TSDB_DATA_TYPE_SMALLINT:
        case TSDB_DATA_TYPE_INT:
        case TSDB_DATA_TYPE_BIGINT:
        case TSDB_DATA_TYPE_TIMESTAMP:
          pSrc = &pValueNode->datum.i;
          srcLen = bytes;
          break;
        case TSDB_DATA_TYPE_UTINYINT:
        case TSDB_DATA_TYPE_USMALLINT:
        case TSDB_DATA_TYPE_UINT:
        case TSDB_DATA_TYPE_UBIGINT:
          pSrc = &pValueNode->datum.u;
          srcLen = bytes;
          break;
        case TSDB_DATA_TYPE_FLOAT:
        case TSDB_DATA_TYPE_DOUBLE:
          if (bytes == (int32_t)sizeof(float)) {
            // A 4-byte key cannot be a prefix of the double: its low-order half
            // is (almost) all zero for float-precision values, so distinct
            // values would share a key. Narrow to float and use its bytes.
            f32 = (float)pValueNode->datum.d;
            pSrc = &f32;
          } else {
            pSrc = &pValueNode->datum.d;
          }
          srcLen = bytes;
          break;
        case TSDB_DATA_TYPE_VARCHAR:
        case TSDB_DATA_TYPE_VARBINARY:
        case TSDB_DATA_TYPE_NCHAR:
          pSrc = varDataVal(pValueNode->datum.p);
          srcLen = varDataLen(pValueNode->datum.p);
          break;
        default:
          qError("unsupported tag data type %d in tag filter optimization",
            pValueNode->node.resType.type);
          code = TSDB_CODE_STREAM_INTERNAL_ERROR;
          goto _end;
      }
    }

    // never write past the buffer the caller sized
    if (srcLen < 0 || (int64_t)(pEnd - pStart) < (int64_t)sizeof(col_id_t) + srcLen) {
      qError("invalid parameter to extract tag filter symbol");
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
    pStart += sizeof(col_id_t);
    if (srcLen > 0) {
      (void)memcpy(pStart, pSrc, srcLen);
      pStart += srcLen;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // do not leak (or hand back) a partially built key
    taosMemoryFreeClear(*keyBuf);
  }
  return code;
}
777

778
static void extractTagDataEntry(
26,852✔
779
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
780
  SNode* pLeft = pOpNode->pLeft;
26,852✔
781
  SNode* pRight = pOpNode->pRight;
26,852✔
782
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
26,852✔
783
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
26,852✔
784
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
26,852✔
785
    (SValueNode*)pLeft : (SValueNode*)pRight;
26,852✔
786

787
  STagDataEntry entry = {0};
26,852✔
788
  entry.colId = pColNode->colId;
26,852✔
789
  entry.pValueNode = (SNode*)pValueNode;
26,852✔
790
  entry.bytes = pColNode->node.resType.bytes;
26,852✔
791
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
26,852✔
792
}
26,852✔
793

794
static int32_t extractTagFilterTagDataEntries(
13,426✔
795
  const SNode* pTagCond, SArray* pIdWithVal) {
796
  if (NULL == pTagCond || NULL == pIdWithVal ||
13,426✔
797
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
13,426✔
798
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
13,426✔
799
    qError("invalid parameter to extract tag filter symbol");
×
800
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
801
  }
802

803
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
13,426✔
804
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
805
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
13,426✔
806
    SNode* pChild = NULL;
13,426✔
807
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
40,278✔
808
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
26,852✔
809
    }
810
  }
811

812
  taosArraySort(pIdWithVal, compareTagDataEntry);
13,426✔
813

814
  return TSDB_CODE_SUCCESS;
13,426✔
815
}
816

817
/*
 * Compute an MD5 digest of a tag filter from its sorted (column id, value)
 * entries rather than from the serialized node tree.
 *
 * pTagCond  tag condition; NULL is accepted and leaves pContext untouched.
 * pContext  MD5 context that receives the digest.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (pTagCond == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  char*   payload = NULL;
  int32_t len = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
  // Report the allocation failure itself instead of letting the callee turn
  // the NULL array into a generic "invalid parameter" error.
  QUERY_CHECK_NULL(pIdWithVal, code, lino, _end, terrno);

  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
  QUERY_CHECK_CODE(code, lino, _end);

  // Key size: one column id plus the column's declared byte width per entry.
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
    len += sizeof(col_id_t) + pEntry->bytes;
  }
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
  QUERY_CHECK_CODE(code, lino, _end);

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    // lino holds the line of the failing check; __LINE__ here would always
    // point at this log statement.
    qError("%s failed at line %d since %s",
      __func__, lino, tstrerror(code));
  }
  taosArrayDestroy(pIdWithVal);
  taosMemoryFree(payload);
  return code;
}
850

851
/*
 * Compute the MD5 digest of a tag condition from its serialized form
 * (nodesNodeToMsg). A NULL condition is accepted and leaves pContext
 * untouched. Returns TSDB_CODE_SUCCESS or the serialization error.
 */
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   pMsg = NULL;
  int32_t msgLen = 0;

  if (NULL != pTagCond) {
    code = nodesNodeToMsg(pTagCond, &pMsg, &msgLen);
    if (TSDB_CODE_SUCCESS == code) {
      tMD5Init(pContext);
      tMD5Update(pContext, (uint8_t*)pMsg, (uint32_t)msgLen);
      tMD5Final(pContext);

      taosMemoryFree(pMsg);
    } else {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  return code;
}
877

878
/*
 * Compute the MD5 digest that identifies a table-group result.
 *
 * pGroup        node describing the grouping expressions; it is serialized
 *               with nodesNodeToMsg.
 * filterDigest  byte 0 is a flag; when it is non-zero the following
 *               tListLen(pContext->digest) bytes are appended to the payload
 *               before hashing.
 * pContext      MD5 context that receives the digest.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  code = nodesNodeToMsg(pGroup, &payload, &len);
  QUERY_CHECK_CODE(code, lino, _end);

  if (filterDigest[0]) {
    // Keep the original pointer until the realloc is known to have succeeded,
    // so the buffer is still freed at _end when it fails.
    // NOTE(review): assumes taosMemoryRealloc follows realloc semantics and
    // leaves the old block valid on failure - confirm.
    char* extended = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
    QUERY_CHECK_NULL(extended, code, lino, _end, terrno);
    payload = extended;
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
    len += tListLen(pContext->digest);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
904

905
/*
 * Walk an expression (or a list of expressions) with the getColumn rewriter
 * and collect the resulting SColumnInfo entries.
 *
 * data      an SNodeList* when isList is true, otherwise a single SNode*.
 * isList    selects how data is interpreted.
 * pColList  optional output; on success it receives the collected array and
 *           the caller becomes responsible for destroying it. It is left
 *           untouched on failure. When NULL, the array is destroyed here.
 *
 * Returns TSDB_CODE_SUCCESS or the error recorded during initialization or
 * by the rewriter.
 */
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
  int32_t code = TSDB_CODE_SUCCESS;
  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (ctx.colHash == NULL) {
    code = terrno;
    goto end;
  }

  ctx.index = 0;
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
  if (ctx.cInfoList == NULL) {
    code = terrno;
    goto end;
  }

  if (isList) {
    SNode* pNode = NULL;
    FOREACH(pNode, (SNodeList*)data) {
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
      if (TSDB_CODE_SUCCESS != ctx.code) {
        code = ctx.code;
        goto end;
      }
      // The rewriter may have replaced the node; store it back into the list.
      REPLACE_NODE(pNode);
    }
  } else {
    SNode* pNode = (SNode*)data;
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      code = ctx.code;
      goto end;
    }
  }

  // Hand the array over only when the caller asked for it; otherwise keep the
  // pointer so the cleanup below releases it instead of leaking it.
  if (pColList != NULL) {
    *pColList = ctx.cInfoList;
    ctx.cInfoList = NULL;
  }

end:
  taosHashCleanup(ctx.colHash);
  taosArrayDestroy(ctx.cInfoList);
  return code;
}
948

949
/*
 * Append one SStreamGroupValue to gInfo describing row i of column pValue.
 *
 * - NULL cells (and JSON null tags) produce an entry with isNull = true.
 * - JSON, variable-length and DECIMAL values are copied into a freshly
 *   allocated v->data.pData buffer.
 * - Every other type is copied into v->data.val.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. When the failure happens after
 * the slot was reserved, the slot stays in gInfo and is marked as NULL.
 */
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
  if (v == NULL) {
    code = terrno;
    goto end;
  }
  if (colDataIsNull_s(pValue, i)) {
    v->isNull = true;
  } else {
    v->isNull = false;
    char* data = colDataGetData(pValue, i);
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(data)) {
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        goto end;
      }
      if (tTagIsJsonNull(data)) {
        v->isNull = true;
        goto end;
      }
      int32_t len = getJsonValueLen(data);
      v->data.type = pValue->info.type;
      v->data.nData = len;
      // One extra zeroed byte so the buffer can be logged with %s.
      v->data.pData = taosMemoryCalloc(1, len + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, data, len);
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
      if (varDataTLen(data) > pValue->info.bytes) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto end;
      }
      v->data.type = pValue->info.type;
      v->data.nData = varDataLen(data);
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
      v->data.type = pValue->info.type;
      v->data.nData = pValue->info.bytes;
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      // Copy into the allocated buffer. Writing to &v->data.pData would
      // overwrite the pointer itself and lose the allocation.
      memcpy(v->data.pData, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add decimal data type:%d, len:%d", i, pValue->info.type, pValue->info.bytes);
    } else {  // reader todo decimal
      v->data.type = pValue->info.type;
      memcpy(&v->data.val, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
    }
  }
end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // v is NULL when reserving the slot itself failed.
    if (v != NULL) {
      v->isNull = true;
    }
  }
  return code;
}
1017

1018
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
61,086✔
1019
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1020
  int32_t      code = TSDB_CODE_SUCCESS;
61,086✔
1021
  int32_t      lino = 0;
61,086✔
1022
  SArray*      pBlockList = NULL;
61,086✔
1023
  SSDataBlock* pResBlock = NULL;
61,086✔
1024
  SArray*      groupData = NULL;
61,086✔
1025
  SArray*      pUidTagList = NULL;
61,086✔
1026
  SArray*      gInfo = NULL;
61,086✔
1027
  int32_t      tbNameIndex = 0;
61,086✔
1028

1029
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
61,086✔
1030
  if (rows == 0) {
61,086✔
1031
    return;
×
1032
  }
1033

1034
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
61,086✔
1035
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
61,086✔
1036

1037
  for (int32_t i = 0; i < rows; ++i) {
281,712✔
1038
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
220,626✔
1039
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
220,626✔
1040
    STUidTagInfo info = {.uid = pkeyInfo->uid};
220,626✔
1041
    void*        tmp = taosArrayPush(pUidTagList, &info);
220,626✔
1042
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
220,626✔
1043
  }
1044
 
1045
  if (taosArrayGetSize(pUidTagList) > 0) {
61,086✔
1046
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
61,086✔
1047
  } else {
1048
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1049
  }
1050
  if (code != TSDB_CODE_SUCCESS) {
61,086✔
1051
    goto end;
×
1052
  }
1053

1054
  SArray* pColList = NULL;
61,086✔
1055
  code = qGetColumnsFromNodeList(group, true, &pColList);
61,086✔
1056
  if (code != TSDB_CODE_SUCCESS) {
61,086✔
1057
    goto end;
×
1058
  }
1059

1060
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
155,521✔
1061
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
94,435✔
1062
    if (tmp != NULL && tmp->colId == -1) {
94,435✔
1063
      tbNameIndex = i;
61,086✔
1064
    }
1065
  }
1066
  
1067
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
61,086✔
1068
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
61,086✔
1069
  taosArrayDestroy(pColList);
61,086✔
1070
  if (pResBlock == NULL) {
61,086✔
1071
    code = terrno;
×
1072
    goto end;
×
1073
  }
1074

1075
  pBlockList = taosArrayInit(2, POINTER_BYTES);
61,086✔
1076
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
61,086✔
1077

1078
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
61,086✔
1079
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
61,086✔
1080

1081
  groupData = taosArrayInit(2, POINTER_BYTES);
61,086✔
1082
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
61,086✔
1083

1084
  SNode* pNode = NULL;
61,086✔
1085
  FOREACH(pNode, group) {
155,521✔
1086
    SScalarParam output = {0};
94,230✔
1087

1088
    switch (nodeType(pNode)) {
94,230✔
1089
      case QUERY_NODE_VALUE:
×
1090
        break;
×
1091
      case QUERY_NODE_COLUMN:
94,435✔
1092
      case QUERY_NODE_OPERATOR:
1093
      case QUERY_NODE_FUNCTION: {
1094
        SExprNode* expNode = (SExprNode*)pNode;
94,435✔
1095
        code = createResultData(&expNode->resType, rows, &output);
94,435✔
1096
        if (code != TSDB_CODE_SUCCESS) {
94,230✔
1097
          goto end;
×
1098
        }
1099
        break;
94,230✔
1100
      }
1101

1102
      default:
×
1103
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1104
        goto end;
×
1105
    }
1106

1107
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
94,230✔
1108
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
94,435✔
1109
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
94,435✔
1110
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
94,230✔
1111
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
94,230✔
1112
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
1113
      continue;
×
1114
    } else {
1115
      gTaskScalarExtra.pStreamInfo = NULL;
×
1116
      gTaskScalarExtra.pStreamRange = NULL;
×
1117
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1118
    }
1119

1120
    if (code != TSDB_CODE_SUCCESS) {
94,230✔
1121
      releaseColInfoData(output.columnData);
×
1122
      goto end;
×
1123
    }
1124

1125
    void* tmp = taosArrayPush(groupData, &output.columnData);
94,435✔
1126
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
94,435✔
1127
  }
1128

1129
  for (int i = 0; i < rows; i++) {
281,712✔
1130
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
220,421✔
1131
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
220,421✔
1132

1133
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
220,421✔
1134
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
220,421✔
1135

1136
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
544,569✔
1137
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
323,943✔
1138
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
324,148✔
1139
        if (ret != TSDB_CODE_SUCCESS) {
324,148✔
1140
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1141
          goto end;
×
1142
        }
1143
        if (j == tbNameIndex) {
324,148✔
1144
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
220,626✔
1145
          if (v != NULL){
220,626✔
1146
            v->isTbname = true;
220,626✔
1147
            v->uid = info->uid;
220,626✔
1148
          }
1149
        }
1150
    }
1151

1152
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
220,626✔
1153
    if (ret != TSDB_CODE_SUCCESS) {
220,626✔
1154
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1155
      goto end;
×
1156
    }
1157
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
220,626✔
1158
    gInfo = NULL;
220,626✔
1159
  }
1160

1161
end:
61,291✔
1162
  blockDataDestroy(pResBlock);
61,086✔
1163
  taosArrayDestroy(pBlockList);
61,086✔
1164
  taosArrayDestroyEx(pUidTagList, freeItem);
61,086✔
1165
  taosArrayDestroyP(groupData, releaseColInfoData);
60,881✔
1166
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
60,881✔
1167

1168
  if (code != TSDB_CODE_SUCCESS) {
61,086✔
1169
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1170
  }
1171
}
1172

1173
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
3,786,720✔
1174
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1175
  int32_t      code = TSDB_CODE_SUCCESS;
3,786,720✔
1176
  int32_t      lino = 0;
3,786,720✔
1177
  SArray*      pBlockList = NULL;
3,786,720✔
1178
  SSDataBlock* pResBlock = NULL;
3,786,720✔
1179
  void*        keyBuf = NULL;
3,787,131✔
1180
  SArray*      groupData = NULL;
3,787,131✔
1181
  SArray*      pUidTagList = NULL;
3,787,131✔
1182
  SArray*      tableList = NULL;
3,787,131✔
1183
  SArray*      gInfo = NULL;
3,787,659✔
1184

1185
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
3,787,131✔
1186
  if (rows == 0) {
3,787,131✔
1187
    return TSDB_CODE_SUCCESS;
×
1188
  } 
1189

1190
  T_MD5_CTX context = {0};
3,787,131✔
1191
  if (tsTagFilterCache && groupIdMap == NULL) {
3,787,131✔
1192
    SNodeListNode* listNode = NULL;
×
1193
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1194
    if (TSDB_CODE_SUCCESS != code) {
×
1195
      goto end;
×
1196
    }
1197
    listNode->pNodeList = group;
×
1198
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
1199
    QUERY_CHECK_CODE(code, lino, end);
×
1200

1201
    nodesFree(listNode);
×
1202

1203
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1204
                                             tListLen(context.digest), &tableList);
1205
    QUERY_CHECK_CODE(code, lino, end);
×
1206

1207
    if (tableList) {
×
1208
      taosArrayDestroy(pTableListInfo->pTableList);
×
1209
      pTableListInfo->pTableList = tableList;
×
1210
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1211
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
1212
      goto end;
×
1213
    }
1214
  }
1215

1216
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
3,787,131✔
1217
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
3,787,131✔
1218

1219
  for (int32_t i = 0; i < rows; ++i) {
25,101,453✔
1220
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
21,313,182✔
1221
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
21,313,794✔
1222
    STUidTagInfo info = {.uid = pkeyInfo->uid};
21,313,794✔
1223
    void*        tmp = taosArrayPush(pUidTagList, &info);
21,314,322✔
1224
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
21,314,322✔
1225
  }
1226

1227
  if (taosArrayGetSize(pUidTagList) > 0) {
3,788,271✔
1228
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
3,787,659✔
1229
  } else {
1230
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1231
  }
1232
  if (code != TSDB_CODE_SUCCESS) {
3,787,248✔
1233
    goto end;
×
1234
  }
1235

1236
  SArray* pColList = NULL;
3,787,248✔
1237
  code = qGetColumnsFromNodeList(group, true, &pColList); 
3,787,659✔
1238
  if (code != TSDB_CODE_SUCCESS) {
3,787,248✔
1239
    goto end;
×
1240
  }
1241

1242
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
3,787,248✔
1243
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
3,787,184✔
1244
  taosArrayDestroy(pColList);
3,787,184✔
1245
  if (pResBlock == NULL) {
3,787,184✔
1246
    code = terrno;
×
1247
    goto end;
×
1248
  }
1249

1250
  //  int64_t st1 = taosGetTimestampUs();
1251
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1252

1253
  pBlockList = taosArrayInit(2, POINTER_BYTES);
3,787,184✔
1254
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
3,787,659✔
1255

1256
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
3,787,659✔
1257
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,787,659✔
1258

1259
  groupData = taosArrayInit(2, POINTER_BYTES);
3,787,659✔
1260
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
3,786,843✔
1261

1262
  SNode* pNode = NULL;
3,786,843✔
1263
  FOREACH(pNode, group) {
7,703,124✔
1264
    SScalarParam output = {0};
3,919,143✔
1265

1266
    switch (nodeType(pNode)) {
3,918,703✔
1267
      case QUERY_NODE_VALUE:
×
1268
        break;
×
1269
      case QUERY_NODE_COLUMN:
3,913,183✔
1270
      case QUERY_NODE_OPERATOR:
1271
      case QUERY_NODE_FUNCTION: {
1272
        SExprNode* expNode = (SExprNode*)pNode;
3,913,183✔
1273
        code = createResultData(&expNode->resType, rows, &output);
3,913,183✔
1274
        if (code != TSDB_CODE_SUCCESS) {
3,913,183✔
1275
          goto end;
×
1276
        }
1277
        break;
3,913,183✔
1278
      }
1279
      case QUERY_NODE_REMOTE_VALUE: {
6,336✔
1280
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
6,336✔
1281
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
6,336✔
1282
        QUERY_CHECK_CODE(code, lino, end);
6,336✔
1283
        break;
6,336✔
1284
      }
1285
      
1286
      default:
×
1287
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1288
        goto end;
×
1289
    }
1290

1291
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
3,919,519✔
1292
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
3,888,578✔
1293
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
3,888,578✔
1294
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
3,888,613✔
1295
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
3,888,613✔
1296
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
30,530✔
1297
      continue;
6,336✔
1298
    } else {
1299
      gTaskScalarExtra.pStreamInfo = NULL;
24,194✔
1300
      gTaskScalarExtra.pStreamRange = NULL;
24,194✔
1301
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
24,194✔
1302
    }
1303

1304
    if (code != TSDB_CODE_SUCCESS) {
3,911,632✔
1305
      releaseColInfoData(output.columnData);
×
1306
      goto end;
×
1307
    }
1308

1309
    void* tmp = taosArrayPush(groupData, &output.columnData);
3,911,021✔
1310
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,911,021✔
1311
  }
1312

1313
  int32_t keyLen = 0;
3,785,089✔
1314
  SNode*  node;
1315
  FOREACH(node, group) {
7,700,206✔
1316
    SExprNode* pExpr = (SExprNode*)node;
3,917,234✔
1317
    keyLen += pExpr->resType.bytes;
3,917,234✔
1318
  }
1319

1320
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
3,784,013✔
1321
  keyLen += nullFlagSize;
3,784,389✔
1322

1323
  keyBuf = taosMemoryCalloc(1, keyLen);
3,784,389✔
1324
  if (keyBuf == NULL) {
3,783,815✔
1325
    code = terrno;
×
1326
    goto end;
×
1327
  }
1328

1329
  if (initRemainGroups) {
3,783,815✔
1330
    pTableListInfo->remainGroups =
1,834,096✔
1331
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,831,721✔
1332
    if (pTableListInfo->remainGroups == NULL) {
1,834,096✔
1333
      code = terrno;
×
1334
      goto end;
×
1335
    }
1336
  }
1337

1338
  for (int i = 0; i < rows; i++) {
25,093,428✔
1339
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
21,306,367✔
1340
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
21,308,699✔
1341

1342
    if (groupIdMap != NULL){
21,308,699✔
1343
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
59,089✔
1344
    }
1345
    
1346
    char* isNull = (char*)keyBuf;
21,309,690✔
1347
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
21,309,690✔
1348
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
43,670,642✔
1349
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
22,362,533✔
1350

1351
      if (groupIdMap != NULL && gInfo != NULL) {
22,362,066✔
1352
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
70,224✔
1353
        if (ret != TSDB_CODE_SUCCESS) {
70,414✔
1354
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1355
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1356
          gInfo = NULL;
×
1357
        }
1358
      }
1359
      
1360
      if (colDataIsNull_s(pValue, i)) {
44,723,306✔
1361
        isNull[j] = 1;
95,619✔
1362
      } else {
1363
        isNull[j] = 0;
22,265,431✔
1364
        char* data = colDataGetData(pValue, i);
22,264,676✔
1365
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
22,266,957✔
1366
          // if (tTagIsJson(data)) {
1367
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1368
          //   goto end;
1369
          // }
1370
          if (tTagIsJsonNull(data)) {
73,187✔
1371
            isNull[j] = 1;
×
1372
            continue;
×
1373
          }
1374
          int32_t len = getJsonValueLen(data);
73,187✔
1375
          memcpy(pStart, data, len);
73,187✔
1376
          pStart += len;
73,187✔
1377
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
22,190,203✔
1378
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
19,334,554✔
1379
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
3,365✔
1380
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1381
              goto end;
×
1382
            }
1383
            memcpy(pStart, data, blobDataTLen(data));
×
1384
            pStart += blobDataTLen(data);
×
1385
          } else {
1386
            if (varDataTLen(data) > pValue->info.bytes) {
19,334,512✔
1387
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1388
              goto end;
×
1389
            }
1390
            memcpy(pStart, data, varDataTLen(data));
19,335,457✔
1391
            pStart += varDataTLen(data);
19,336,885✔
1392
          }
1393
        } else {
1394
          memcpy(pStart, data, pValue->info.bytes);
2,856,758✔
1395
          pStart += pValue->info.bytes;
2,856,969✔
1396
        }
1397
      }
1398
    }
1399

1400
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
21,305,135✔
1401
    info->groupId = calcGroupId(keyBuf, len);
21,305,135✔
1402
    if (groupIdMap != NULL && gInfo != NULL) {
21,308,252✔
1403
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
59,089✔
1404
      if (ret != TSDB_CODE_SUCCESS) {
59,089✔
1405
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1406
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1407
      }
1408
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
59,089✔
1409
      gInfo = NULL;
59,089✔
1410
    }
1411
    if (initRemainGroups) {
21,308,252✔
1412
      // groupId ~ table uid
1413
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
10,449,507✔
1414
                         sizeof(info->uid));
1415
      if (code == TSDB_CODE_DUP_KEY) {
10,447,502✔
1416
        code = TSDB_CODE_SUCCESS;
742,674✔
1417
      }
1418
      QUERY_CHECK_CODE(code, lino, end);
10,447,502✔
1419
    }
1420
  }
1421

1422
  if (tsTagFilterCache && groupIdMap == NULL) {
3,787,061✔
1423
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
1424
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1425

1426
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1427
                                              tListLen(context.digest), tableList,
1428
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
1429
    QUERY_CHECK_CODE(code, lino, end);
×
1430
  }
1431

1432
  //  int64_t st2 = taosGetTimestampUs();
1433
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1434

1435
end:
3,787,061✔
1436
  taosMemoryFreeClear(keyBuf);
3,787,283✔
1437
  blockDataDestroy(pResBlock);
3,787,044✔
1438
  taosArrayDestroy(pBlockList);
3,786,566✔
1439
  taosArrayDestroyEx(pUidTagList, freeItem);
3,785,275✔
1440
  taosArrayDestroyP(groupData, releaseColInfoData);
3,787,159✔
1441
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
3,786,380✔
1442

1443
  if (code != TSDB_CODE_SUCCESS) {
3,786,380✔
1444
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1445
  }
1446
  return code;
3,786,748✔
1447
}
1448

1449
/*
 * Sort comparator for arrays whose elements are C-string pointers.
 * Normalizes the strcmp result to exactly -1, 0 or 1.
 */
static int32_t nameComparFn(const void* p1, const void* p2) {
  const char* const* ppLeft = (const char* const*)p1;
  const char* const* ppRight = (const char* const*)p2;

  int32_t order = strcmp(*ppLeft, *ppRight);
  return (order > 0) - (order < 0);
}
1460

1461
static SArray* getTableNameList(const SNodeListNode* pList) {
358,763✔
1462
  int32_t    code = TSDB_CODE_SUCCESS;
358,763✔
1463
  int32_t    lino = 0;
358,763✔
1464
  int32_t    len = LIST_LENGTH(pList->pNodeList);
358,763✔
1465
  SListCell* cell = pList->pNodeList->pHead;
358,763✔
1466

1467
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
358,763✔
1468
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
358,763✔
1469

1470
  for (int i = 0; i < pList->pNodeList->length; i++) {
987,550✔
1471
    SValueNode* valueNode = (SValueNode*)cell->pNode;
628,787✔
1472
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
628,787✔
1473
      terrno = TSDB_CODE_INVALID_PARA;
×
1474
      taosArrayDestroy(pTbList);
×
1475
      return NULL;
×
1476
    }
1477

1478
    char* name = varDataVal(valueNode->datum.p);
628,787✔
1479
    void* tmp = taosArrayPush(pTbList, &name);
628,787✔
1480
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
628,787✔
1481
    cell = cell->pNext;
628,787✔
1482
  }
1483

1484
  size_t numOfTables = taosArrayGetSize(pTbList);
358,763✔
1485

1486
  // order the name
1487
  taosArraySort(pTbList, nameComparFn);
358,763✔
1488

1489
  // remove the duplicates
1490
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
358,763✔
1491
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
358,763✔
1492
  void* tmpTbl = taosArrayGet(pTbList, 0);
358,763✔
1493
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
358,763✔
1494
  void* tmp = taosArrayPush(pNewList, tmpTbl);
358,763✔
1495
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
358,763✔
1496

1497
  for (int32_t i = 1; i < numOfTables; ++i) {
628,787✔
1498
    char** name = taosArrayGetLast(pNewList);
270,024✔
1499
    char** nameInOldList = taosArrayGet(pTbList, i);
270,024✔
1500
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
270,024✔
1501
    if (strcmp(*name, *nameInOldList) == 0) {
270,024✔
1502
      continue;
8,498✔
1503
    }
1504

1505
    tmp = taosArrayPush(pNewList, nameInOldList);
261,526✔
1506
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
261,526✔
1507
  }
1508

1509
_end:
358,763✔
1510
  taosArrayDestroy(pTbList);
358,763✔
1511
  if (code != TSDB_CODE_SUCCESS) {
358,763✔
1512
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1513
    return NULL;
×
1514
  }
1515
  return pNewList;
358,763✔
1516
}
1517

1518
/*
 * Sort comparator for uint64_t table uids: returns -1, 0 or 1 for
 * ascending order.
 */
static int tableUidCompare(const void* a, const void* b) {
  const uint64_t lhs = *(const uint64_t*)a;
  const uint64_t rhs = *(const uint64_t*)b;

  return (lhs > rhs) - (lhs < rhs);
}
1528

1529
static int32_t filterTableInfoCompare(const void* a, const void* b) {
16,315,454✔
1530
  STUidTagInfo* p1 = (STUidTagInfo*)a;
16,315,454✔
1531
  STUidTagInfo* p2 = (STUidTagInfo*)b;
16,315,454✔
1532

1533
  if (p1->uid == p2->uid) {
16,315,454✔
1534
    return 0;
×
1535
  }
1536

1537
  return p1->uid < p2->uid ? -1 : 1;
16,315,982✔
1538
}
1539

1540
// Classify the root node of a tag condition:
//   - a single operator node          -> FILTER_NO_LOGIC
//   - a logic condition of type AND   -> FILTER_AND
//   - anything else                   -> FILTER_OTHER
static FilterCondType checkTagCond(SNode* cond) {
  switch (nodeType(cond)) {
    case QUERY_NODE_OPERATOR:
      return FILTER_NO_LOGIC;
    case QUERY_NODE_LOGIC_CONDITION:
      return (((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND) ? FILTER_AND : FILTER_OTHER;
    default:
      return FILTER_OTHER;
  }
}
1549

1550
// Try to narrow the candidate table list with a "tbname IN (...)" condition.
//
// cond may be a single operator, or an AND logic condition whose parameters are scanned in
// order until one of them is accepted by optimizeTbnameInCondImpl(). For the AND form the
// list is sorted and de-duplicated by uid, and the tag values of the remaining tables are
// loaded through getTableTagsByUid().
//
// Returns true only when a tbname condition was applied (and, for the AND form, the tags
// were loaded successfully); false in every other case.
static bool optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
  const int32_t ntype = nodeType(cond);

  // single operator: narrowed only if the operator itself is a tbname IN condition
  if (ntype == QUERY_NODE_OPERATOR) {
    return optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid) == 0;
  }

  const bool isAndCond =
      (ntype == QUERY_NODE_LOGIC_CONDITION) && (((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND);
  if (!isAndCond) {
    return false;
  }

  SNodeList*    pParams = (SNodeList*)((SLogicConditionNode*)cond)->pParameterList;
  const int32_t numOfParams = LIST_LENGTH(pParams);
  if (numOfParams <= 0) {
    return false;
  }

  // stop at the first parameter that is handled as a tbname condition
  bool    foundTbname = false;
  int32_t visited = 0;
  for (SListCell* pCell = pParams->pHead; visited < numOfParams && pCell != NULL; pCell = pCell->pNext, ++visited) {
    if (optimizeTbnameInCondImpl(pVnode, list, pCell->pNode, pAPI, suid) == 0) {
      foundTbname = true;
      break;
    }
  }

  taosArraySort(list, filterTableInfoCompare);
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);

  if (!foundTbname) {
    return false;
  }

  return pAPI->metaFn.getTableTagsByUid(pVnode, suid, list) == 0;
}
1591

1592
// only return uid that does not contained in pExistedUidList
15,037,765✔
1593
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
1594
                                        uint64_t suid) {
15,037,765✔
1595
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
5,022✔
1596
    return -1;
1597
  }
1598

15,034,897✔
1599
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
15,034,897✔
1600
  if (pNode->opType != OP_TYPE_IN) {
14,172,129✔
1601
    return -1;
1602
  }
1603

863,862✔
1604
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
358,763✔
1605
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
504,281✔
1606
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
359,244✔
1607
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
358,763✔
1608
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
1609

358,763✔
1610
    int32_t len = LIST_LENGTH(pList->pNodeList);
358,763✔
UNCOV
1611
    if (len <= 0) {
×
1612
      return -1;
1613
    }
1614

358,763✔
1615
    SArray*   pTbList = getTableNameList(pList);
358,763✔
1616
    int32_t   numOfTables = taosArrayGetSize(pTbList);
358,763✔
1617
    SHashObj* uHash = NULL;
1618

358,763✔
1619
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
358,763✔
1620
    if (numOfExisted > 0) {
2,010✔
1621
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,010✔
UNCOV
1622
      if (!uHash) {
×
1623
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1624
        return terrno;
1625
      }
1626

2,003,495✔
1627
      for (int i = 0; i < numOfExisted; i++) {
2,000,985✔
1628
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
2,001,485✔
UNCOV
1629
        if (!pTInfo) {
×
1630
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1631
          return terrno;
1632
        }
2,001,485✔
1633
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
2,001,485✔
UNCOV
1634
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
×
1635
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1636
          return tempRes;
1637
        }
1638
      }
1639
    }
1640

964,300✔
1641
    for (int i = 0; i < numOfTables; i++) {
611,791✔
1642
      char* name = taosArrayGetP(pTbList, i);
1643

611,791✔
1644
      uint64_t uid = 0, csuid = 0;
611,791✔
1645
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
359,968✔
1646
        ETableType tbType = TSDB_TABLE_MAX;
359,968✔
1647
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
359,968✔
1648
            tbType == TSDB_CHILD_TABLE) {
353,714✔
1649
          if (suid != csuid) {
2,000✔
1650
            continue;
1651
          }
351,714✔
1652
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
350,709✔
1653
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
350,709✔
1654
            void*        tmp = taosArrayPush(pExistedUidList, &s);
350,709✔
UNCOV
1655
            if (!tmp) {
×
1656
              return terrno;
1657
            }
1658
          }
1659
        } else {
6,254✔
1660
          taosArrayDestroy(pTbList);
6,254✔
1661
          taosHashCleanup(uHash);
6,254✔
1662
          return -1;
1663
        }
1664
      } else {
1665
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
251,823✔
1666
        terrno = 0;
1667
      }
1668
    }
1669

352,509✔
1670
    taosHashCleanup(uHash);
352,509✔
1671
    taosArrayDestroy(pTbList);
352,509✔
1672
    return 0;
1673
  }
1674

504,149✔
1675
  return -1;
1676
}
1677

16,267,600✔
1678
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
1679
                                        SStorageAPI* pStorageAPI) {
16,267,600✔
1680
  int32_t      code = TSDB_CODE_SUCCESS;
16,267,600✔
1681
  int32_t      lino = 0;
16,267,600✔
1682
  SSDataBlock* pResBlock = NULL;
16,269,672✔
1683
  code = createDataBlock(&pResBlock);
16,268,935✔
1684
  QUERY_CHECK_CODE(code, lino, _end);
1685

33,663,145✔
1686
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
17,389,768✔
1687
    SColumnInfoData colInfo = {0};
17,388,961✔
1688
    void*           tmp = taosArrayGet(pColList, i);
17,383,025✔
1689
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,383,025✔
1690
    colInfo.info = *(SColumnInfo*)tmp;
17,380,723✔
1691
    code = blockDataAppendColInfo(pResBlock, &colInfo);
17,393,187✔
1692
    QUERY_CHECK_CODE(code, lino, _end);
1693
  }
1694

16,273,082✔
1695
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
16,264,741✔
UNCOV
1696
  if (code != TSDB_CODE_SUCCESS) {
×
1697
    terrno = code;
×
1698
    blockDataDestroy(pResBlock);
×
1699
    return NULL;
1700
  }
1701

16,264,741✔
1702
  pResBlock->info.rows = numOfTables;
1703

16,271,674✔
1704
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
1705

201,533,762✔
1706
  for (int32_t i = 0; i < numOfTables; i++) {
185,254,021✔
1707
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
185,698,873✔
1708
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
1709

379,426,607✔
1710
    for (int32_t j = 0; j < numOfCols; j++) {
193,546,667✔
1711
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
193,473,708✔
1712
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1713

193,473,708✔
1714
      if (pColInfo->info.colId == -1) {  // tbname
6,813,061✔
1715
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
6,800,441✔
1716
        if (p1->name != NULL) {
350,709✔
1717
          STR_TO_VARSTR(str, p1->name);
1718
        } else {  // name is not retrieved during filter
6,454,067✔
1719
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
6,427,332✔
1720
          QUERY_CHECK_CODE(code, lino, _end);
1721
        }
1722

6,778,041✔
1723
        code = colDataSetVal(pColInfo, i, str, false);
6,807,011✔
1724
        QUERY_CHECK_CODE(code, lino, _end);
1725
#if TAG_FILTER_DEBUG
1726
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1727
#endif
1728
      } else {
186,696,537✔
1729
        STagVal tagVal = {0};
186,706,754✔
1730
        tagVal.cid = pColInfo->info.colId;
186,697,206✔
1731
        if (p1->pTagVal == NULL) {
3,425✔
1732
          colDataSetNULL(pColInfo, i);
1733
        } else {
186,665,480✔
1734
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
1735

186,789,501✔
1736
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
2,475,027✔
1737
            colDataSetNULL(pColInfo, i);
184,304,116✔
1738
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
578,605✔
1739
            code = colDataSetVal(pColInfo, i, p, false);
578,605✔
1740
            QUERY_CHECK_CODE(code, lino, _end);
308,654,422✔
1741
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
124,805,941✔
UNCOV
1742
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
×
1743
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
1744
            }
124,822,439✔
1745
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
124,911,263✔
1746
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
124,911,263✔
1747
            varDataSetLen(tmp, tagVal.nData);
124,922,741✔
1748
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
124,922,434✔
1749
            code = colDataSetVal(pColInfo, i, tmp, false);
1750
#if TAG_FILTER_DEBUG
1751
            qDebug("tagfilter varch:%s", tmp + 2);
1752
#endif
124,914,532✔
1753
            taosMemoryFree(tmp);
124,967,676✔
1754
            QUERY_CHECK_CODE(code, lino, _end);
1755
          } else {
58,871,517✔
1756
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
58,934,818✔
1757
            QUERY_CHECK_CODE(code, lino, _end);
1758
#if TAG_FILTER_DEBUG
1759
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1760
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1761
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1762
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1763
            }
1764
#endif
1765
          }
1766
        }
1767
      }
1768
    }
1769
  }
1770

16,302,414✔
1771
_end:
16,279,741✔
1772
  if (code != TSDB_CODE_SUCCESS) {
5,000✔
UNCOV
1773
    blockDataDestroy(pResBlock);
×
1774
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1775
    terrno = code;
×
1776
    return NULL;
1777
  }
16,274,741✔
1778
  return pResBlock;
1779
}
1780

10,798,124✔
1781
// Collect the tables that passed the tag filter.
//
// pResultList holds one flag per entry of pUidTagList. pUidList is cleared first; then, for
// every entry whose flag is set, a STableKeyInfo with that uid (groupId 0) is appended to
// pListInfo->pTableList and, when addUid is true, the uid is appended to pUidList as well.
//
// Returns TSDB_CODE_SUCCESS, or terrno when an array access or append fails.
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
                                 bool* pResultList, bool addUid) {
  taosArrayClear(pUidList);

  STableKeyInfo keyInfo = {.uid = 0, .groupId = 0};
  const int32_t numOfRows = taosArrayGetSize(pUidTagList);

  for (int32_t row = 0; row < numOfRows; ++row) {
    if (!pResultList[row]) {
      continue;  // filtered out
    }

    STUidTagInfo* pTagInfo = (STUidTagInfo*)taosArrayGet(pUidTagList, row);
    if (pTagInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    uint64_t uid = pTagInfo->uid;
    qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[row]);

    keyInfo.uid = uid;
    if (taosArrayPush(pListInfo->pTableList, &keyInfo) == NULL) {
      return terrno;
    }

    if (addUid && taosArrayPush(pUidList, &uid) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1818

12,386,957✔
1819
// Append one STUidTagInfo (uid set, all other fields zero) to pUidTagList for every uid in
// pUidList. An empty pUidList is a no-op.
//
// Returns TSDB_CODE_SUCCESS, or terrno when reading from pUidList or appending to
// pUidTagList fails.
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
  int32_t numOfExisted = taosArrayGetSize(pUidList);

  for (int32_t i = 0; i < numOfExisted; ++i) {
    uint64_t* uid = taosArrayGet(pUidList, i);
    if (!uid) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    STUidTagInfo info = {.uid = *uid};
    void*        tmp = taosArrayPush(pUidTagList, &info);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      // report the failure to the caller instead of returning success with a partial copy
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1841

264,898,315✔
1842
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
1843
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
264,898,315✔
1844
  *listAdded = false;
265,006,790✔
1845
  if (pTagCond == NULL) {
252,418,723✔
1846
    return TSDB_CODE_SUCCESS;
1847
  }
1848

12,588,067✔
1849
  terrno = TSDB_CODE_SUCCESS;
1850

12,386,665✔
1851
  int32_t      lino = 0;
12,386,665✔
1852
  int32_t      code = TSDB_CODE_SUCCESS;
12,386,665✔
1853
  SArray*      pBlockList = NULL;
12,386,665✔
1854
  SSDataBlock* pResBlock = NULL;
12,385,356✔
1855
  SScalarParam output = {0};
12,385,019✔
1856
  SArray*      pUidTagList = NULL;
1857

12,385,019✔
1858
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
1859

1860
  //  int64_t stt = taosGetTimestampUs();
12,384,351✔
1861
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
12,382,636✔
1862
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
1863

12,382,636✔
1864
  code = copyExistedUids(pUidTagList, pUidList);
12,383,705✔
1865
  QUERY_CHECK_CODE(code, lino, end);
1866

12,383,705✔
1867
  // Narrow down the scope of the tablelist set if there is tbname in condition and And Logical operator
12,384,576✔
1868
  bool narrowed = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
352,509✔
1869
  if (narrowed) {  // tbname in filter is activated, do nothing and return
1870
    taosArrayClear(pUidList);
352,509✔
1871

352,509✔
1872
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
352,509✔
1873
    code = taosArrayEnsureCap(pUidList, numOfRows);
1874
    QUERY_CHECK_CODE(code, lino, end);
2,710,203✔
1875

2,357,694✔
1876
    for (int32_t i = 0; i < numOfRows; ++i) {
2,357,694✔
1877
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,357,694✔
1878
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,357,694✔
1879
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
1880
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
352,509✔
1881
    }
1882
    terrno = 0;
12,032,067✔
1883
  } else {
1884
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
12,032,067✔
1885

21,230,968✔
1886
    FilterCondType condType = checkTagCond(pTagCond);
9,199,540✔
1887
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) || // (super table) use tagIndex and operator is and
3,090,152✔
1888
        (status == SFLT_NOT_INDEX && taosArrayGetSize(pUidTagList) > 0)) {                       // (child table with tagCond)
1889
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
8,941,276✔
1890
    } else {
1891
      taosArrayClear(pUidTagList);        // clear tablelist if using tagIndex and or condition
12,033,066✔
UNCOV
1892
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
×
UNCOV
1893
    }
×
UNCOV
1894
    if (code != TSDB_CODE_SUCCESS) {
×
1895
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
1896
      terrno = code;
1897
      QUERY_CHECK_CODE(code, lino, end);
1898
    }
12,385,575✔
1899
  }
1900

12,388,180✔
1901
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
12,388,240✔
1902

1,564,874✔
1903
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
1904
  if (numOfTables == 0) {
1905
    goto end;
10,823,366✔
1906
  }
10,824,377✔
1907

10,816,350✔
UNCOV
1908
  SArray* pColList = NULL;
×
1909
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
1910
  if (code != TSDB_CODE_SUCCESS) {
10,816,350✔
1911
    goto end;
10,819,151✔
1912
  }
10,818,936✔
UNCOV
1913
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
×
UNCOV
1914
  taosArrayDestroy(pColList);
×
1915
  if (pResBlock == NULL) {
1916
    code = terrno;
1917
    QUERY_CHECK_CODE(code, lino, end);
1918
  }
1919

1920
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1921

10,818,936✔
1922
  //  int64_t st1 = taosGetTimestampUs();
10,821,893✔
1923
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1924
  pBlockList = taosArrayInit(2, POINTER_BYTES);
10,818,814✔
1925
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
10,818,814✔
1926

1927
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
10,818,814✔
1928
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
10,811,986✔
UNCOV
1929

×
UNCOV
1930
  code = createResultData(&type, numOfTables, &output);
×
1931
  if (code != TSDB_CODE_SUCCESS) {
1932
    terrno = code;
1933
    QUERY_CHECK_CODE(code, lino, end);
10,811,986✔
1934
  }
10,811,986✔
1935

10,809,177✔
1936
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
10,805,109✔
1937
  gTaskScalarExtra.pStreamRange = NULL;
898✔
1938
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
898✔
1939
  if (code != TSDB_CODE_SUCCESS) {
898✔
1940
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
1941
    terrno = code;
1942
    QUERY_CHECK_CODE(code, lino, end);
10,804,211✔
1943
  }
10,819,370✔
UNCOV
1944

×
UNCOV
1945
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
×
1946
  if (code != TSDB_CODE_SUCCESS) {
1947
    terrno = code;
10,819,370✔
1948
    QUERY_CHECK_CODE(code, lino, end);
1949
  }
12,385,545✔
1950
  *listAdded = true;
12,385,625✔
1951

898✔
1952
end:
1953
  if (code != TSDB_CODE_SUCCESS) {
12,385,625✔
1954
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
12,373,346✔
1955
  }
12,377,603✔
1956
  blockDataDestroy(pResBlock);
1957
  taosArrayDestroy(pBlockList);
12,383,086✔
1958
  taosArrayDestroyEx(pUidTagList, freeItem);
12,390,742✔
1959

12,378,634✔
1960
  colDataDestroy(output.columnData);
1961
  taosMemoryFreeClear(output.columnData);
1962
  return code;
1963
}
1964

1965
typedef struct {
1966
  int32_t code;
1967
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
97,384✔
1968
} PlaceHolderContext;
97,384✔
1969

97,384✔
1970
// Rewrite callback: replace a stream pseudo-column function node by a value node.
//
// Nodes that are not functions, or functions for which fmIsStreamPesudoColVal() is false, are
// left untouched. Otherwise the function's parameters are filled from the stream runtime info,
// its first parameter is cloned, and the clone takes the place of the function node (which is
// destroyed).
//
// pContext must point to a PlaceHolderContext. Whenever DEAL_RES_ERROR is returned, its `code`
// field holds a non-success error code, so callers that only inspect `code` see the failure.
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
  if (pData->code != TSDB_CODE_SUCCESS) {
    return DEAL_RES_ERROR;
  }

  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
  if (NULL == pFirstParam) {
    // a pseudo-column function without parameters cannot be replaced by a value
    pData->code = TSDB_CODE_INTERNAL_ERROR;
    return DEAL_RES_ERROR;
  }
  ((SValueNode*)pFirstParam)->translate = true;

  SValueNode* res = NULL;
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
  if (pData->code != TSDB_CODE_SUCCESS || NULL == res) {
    if (NULL != res) {
      nodesDestroyNode((SNode*)res);
    }
    if (pData->code == TSDB_CODE_SUCCESS) {
      // never report an error to the walker while leaving a success code in the context
      pData->code = TSDB_CODE_INTERNAL_ERROR;
    }
    return DEAL_RES_ERROR;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  return DEAL_RES_CONTINUE;
}
26,852✔
1995

26,852✔
1996
// Append the column id referenced by a binary operator node to pColIdArray.
// The left operand is used when it is a column node; otherwise the right operand is taken
// and read as a column node without further checks.
// NOTE(review): the result of taosArrayPush() is ignored, so a failed append goes unnoticed here.
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
  SColumnNode* pColNode = (SColumnNode*)pOpNode->pRight;
  if (nodeType(pOpNode->pLeft) == QUERY_NODE_COLUMN) {
    pColNode = (SColumnNode*)pOpNode->pLeft;
  }

  col_id_t colId = pColNode->colId;
  (void)taosArrayPush(pColIdArray, &colId);
}
2005

2006
static int32_t buildTagCondKey(
13,426✔
2007
  const SNode* pTagCond, char** pTagCondKey,
13,426✔
2008
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
13,426✔
UNCOV
2009
  if (NULL == pTagCond ||
×
UNCOV
2010
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
×
2011
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
2012
    qError("invalid parameter to extract tag filter symbol");
13,426✔
2013
    return TSDB_CODE_INTERNAL_ERROR;
13,426✔
2014
  }
13,426✔
2015
  int32_t code = TSDB_CODE_SUCCESS;
2016
  int32_t lino = 0;
13,426✔
UNCOV
2017
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
×
2018

13,426✔
2019
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
13,426✔
2020
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
40,278✔
2021
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
26,852✔
2022
    SNode* pChild = NULL;
2023
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
2024
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
2025
    }
13,426✔
2026
  }
2027

2028
  taosArraySort(*pTagColIds, compareUint16Val);
26,852✔
2029

13,426✔
2030
  // encode ordered colIds into key string, separated by ','
13,426✔
2031
  *tagCondKeyLen =
13,426✔
2032
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
13,426✔
2033
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
40,278✔
2034
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
26,852✔
2035
  char* pStart = *pTagCondKey;
26,852✔
2036
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
26,852✔
2037
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
26,852✔
2038
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
26,852✔
2039
    memcpy(pStart, pColId, sizeof(col_id_t));
13,426✔
2040
    pStart += sizeof(col_id_t);
13,426✔
2041
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
2042
      *pStart = ',';
2043
      pStart += 1;
2044
    }
13,426✔
2045
  }
13,426✔
UNCOV
2046

×
UNCOV
2047
_end:
×
2048
  if (TSDB_CODE_SUCCESS != code) {
2049
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
13,426✔
2050
    terrno = code;
2051
  }
2052
  return code;
123,437✔
2053
}
123,437✔
UNCOV
2054

×
UNCOV
2055
// Walker callback deciding whether a tag condition consists only of supported node kinds.
//
// Accepted nodes (the walk continues): value nodes, column nodes, operators of type
// OP_TYPE_EQUAL, logic conditions of type AND, and functions for which
// fmIsStreamPesudoColVal() is true. Any other node, including NULL, stores false into the
// bool that pContext points to and ends the walk.
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
  bool accepted = false;

  if (NULL != pTagCond) {
    switch (nodeType(pTagCond)) {
      case QUERY_NODE_VALUE:
      case QUERY_NODE_COLUMN:
        accepted = true;
        break;
      case QUERY_NODE_OPERATOR:
        accepted = (((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL);
        break;
      case QUERY_NODE_LOGIC_CONDITION:
        accepted = (((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND);
        break;
      case QUERY_NODE_FUNCTION:
        accepted = fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId);
        break;
      default:
        break;
    }
  }

  if (accepted) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)pContext = false;
  return DEAL_RES_END;
}
2079

264,735,690✔
2080
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
264,735,690✔
2081
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
264,735,690✔
2082
  int32_t code = TSDB_CODE_SUCCESS;
264,735,690✔
2083
  int32_t lino = 0;
2084
  size_t  numOfTables = 0;
264,869,117✔
2085
  bool    listAdded = false;
264,565,459✔
2086

2087
  pListInfo->idInfo.suid = pScanNode->suid;
264,780,465✔
2088
  pListInfo->idInfo.tableType = pScanNode->tableType;
264,327,767✔
2089

2090
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
264,327,767✔
2091
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
264,360,470✔
2092

264,419,302✔
2093
  char*   pTagCondKey = NULL;
264,600,800✔
2094
  int32_t tagCondKeyLen;
264,709,635✔
2095
  SArray* pTagColIds = NULL;
264,709,635✔
2096
  char*   pPayload = NULL;
2097
  qTrace("getTableList called, suid:%" PRIu64
2098
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
264,709,635✔
2099
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
141,471,607✔
2100
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
141,499,986✔
2101
    pListInfo->idInfo.uid = pScanNode->uid;
141,515,053✔
2102
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
141,521,570✔
2103
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
2104
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
141,562,947✔
2105
    }
141,552,166✔
2106
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, false, &listAdded, pStreamInfo);
2107
    QUERY_CHECK_CODE(code, lino, _end);
123,367,119✔
2108
  } else {
123,367,119✔
2109
    bool      isStream = (pStreamInfo != NULL);
123,367,119✔
2110
    bool      hasTagCond = (pTagCond != NULL);
123,262,047✔
2111
    bool      canCacheTagEqCondFilter = false;
2112
    T_MD5_CTX context = {0};
123,515,769✔
2113

2114
    qTrace("start to get table list by tag filter, suid:%" PRIu64
2115
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2116
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
123,515,769✔
2117

2118
    bool acquired = false;
123,240,216✔
2119
    // first, check whether we can use stable tag filter cache
13,837✔
2120
    if (tsStableTagFilterCache && isStream && hasTagCond) {
13,837✔
2121
      canCacheTagEqCondFilter = true;
2122
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
2123
        (void*)&canCacheTagEqCondFilter);
122,552,479✔
2124
    }
13,426✔
2125
    if (canCacheTagEqCondFilter) {
13,426✔
2126
      qDebug("%s, stable tag filter condition can be optimized", idstr);
13,426✔
2127
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
13,426✔
2128
        SNode* tmp = NULL;
13,426✔
2129
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
2130
        QUERY_CHECK_CODE(code, lino, _error);
13,426✔
2131

13,426✔
2132
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
13,426✔
UNCOV
2133
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
×
UNCOV
2134
        if (TSDB_CODE_SUCCESS != ctx.code) {
×
2135
          nodesDestroyNode(tmp);
×
2136
          code = ctx.code;
2137
          goto _error;
13,426✔
2138
        }
13,426✔
2139
        code = genStableTagFilterDigest(tmp, &context);
UNCOV
2140
        nodesDestroyNode(tmp);
×
2141
      } else {
2142
        code = genStableTagFilterDigest(pTagCond, &context);
13,426✔
2143
      }
2144
      QUERY_CHECK_CODE(code, lino, _error);
13,426✔
2145

2146
      code = buildTagCondKey(
13,426✔
2147
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
13,426✔
2148
      QUERY_CHECK_CODE(code, lino, _error);
13,426✔
2149
      code = pStorageAPI->metaFn.getStableCachedTableList(
2150
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
13,426✔
2151
        context.digest, tListLen(context.digest), pUidList, &acquired);
122,539,053✔
2152
      QUERY_CHECK_CODE(code, lino, _error);
2153
    } else if (tsTagFilterCache) {
26,910✔
2154
      // second, try to use normal tag filter cache
28,044✔
2155
      qDebug("%s using normal tag filter cache", idstr);
1,134✔
2156
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
1,134✔
2157
        SNode* tmp = NULL;
1,134✔
2158
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
2159
        QUERY_CHECK_CODE(code, lino, _error);
1,134✔
2160

1,134✔
2161
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
1,134✔
UNCOV
2162
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
×
UNCOV
2163
        if (TSDB_CODE_SUCCESS != ctx.code) {
×
2164
          nodesDestroyNode(tmp);
×
2165
          code = ctx.code;
2166
          goto _error;
1,134✔
2167
        }
1,134✔
2168
        code = genTagFilterDigest(tmp, &context);
2169
        nodesDestroyNode(tmp);
25,776✔
2170
      } else {
2171
        code = genTagFilterDigest(pTagCond, &context);
2172
      }
26,910✔
2173
      // try to retrieve the result from meta cache
26,910✔
2174
      QUERY_CHECK_CODE(code, lino, _error);      
26,910✔
2175
      code = pStorageAPI->metaFn.getCachedTableList(
2176
        pVnode, pScanNode->suid, context.digest,
461,515✔
2177
        tListLen(context.digest), pUidList, &acquired);
2178
      QUERY_CHECK_CODE(code, lino, _error);
122,986,905✔
2179
    }
29,947✔
2180
    if (acquired) {
29,947✔
2181
      taosArrayDestroy(pTagColIds);
2182
      pTagColIds = NULL;
29,947✔
2183
      
59,894✔
2184
      digest[0] = 1;
29,947✔
2185
      memcpy(
29,947✔
2186
        digest + 1, context.digest, tListLen(context.digest));
2187
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
2188
        " numOfTables:%d", 
29,947✔
2189
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2190
      goto _end;
122,956,958✔
2191
    } else {
2192
      qDebug("suid:%" PRIu64 
2193
        ", failed to get table uid list from cache", pScanNode->suid);
2194
    }
123,218,810✔
2195

111,086,386✔
2196
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
111,230,286✔
2197
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
111,230,286✔
2198
      QUERY_CHECK_CODE(code, lino, _error);
2199
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
2200
    } else {
12,132,424✔
2201
      SIdxFltStatus status = SFLT_NOT_INDEX;
3,879,111✔
2202
      if (pTagIndexCond) {
2203
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
3,879,139✔
2204

3,879,111✔
2205
        SIndexMetaArg metaArg = {.metaEx = pVnode,
2206
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
3,878,987✔
2207
                                 .ivtIdx = pIndex,
2208
                                 .suid = pScanNode->uid};
3,878,987✔
2209
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
3,878,987✔
2210
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
3,868,601✔
2211
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,037,501✔
2212
        } else {
2213
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,831,100✔
2214
        }
2215
      }
2216
      qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
2217
      code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
123,343,063✔
2218
        pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
123,572,332✔
2219
        &listAdded, pStreamInfo);
123,572,332✔
2220
      QUERY_CHECK_CODE(code, lino, _error);
2221
    }
123,393,999✔
2222
    // let's add the filter results into meta-cache
2223
    numOfTables = taosArrayGetSize(pUidList);
2224

123,393,101✔
2225
    if (canCacheTagEqCondFilter) {
2226
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
123,423,852✔
2227
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
3,836✔
2228
            idstr, pScanNode->suid, (int32_t)numOfTables,
2229
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2230

2231
      code = pStorageAPI->metaFn.putStableCachedTableList(
2232
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
3,836✔
2233
        context.digest, tListLen(context.digest),
2234
        pUidList, &pTagColIds);
2235
      QUERY_CHECK_CODE(code, lino, _end);
2236

3,836✔
2237
      digest[0] = 1;
2238
      memcpy(digest + 1, context.digest, tListLen(context.digest));
3,836✔
2239
    } else if (tsTagFilterCache) {
3,836✔
2240
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
123,420,016✔
2241
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
6,553✔
2242
            idstr, pScanNode->suid, (int32_t)numOfTables,
2243
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2244
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
2245
      pPayload = taosMemoryMalloc(size);
6,553✔
2246
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
6,553✔
2247

6,553✔
2248
      *(int32_t*)pPayload = (int32_t)numOfTables;
2249
      if (numOfTables > 0) {
6,553✔
2250
        void* tmp = taosArrayGet(pUidList, 0);
6,553✔
2251
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
5,386✔
2252
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
5,386✔
2253
      }
5,386✔
2254

2255
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
2256
                                                    context.digest,
6,553✔
2257
                                                    tListLen(context.digest),
2258
                                                    pPayload, size, 1);
2259
      if (TSDB_CODE_SUCCESS == code) {
2260
        /*
6,553✔
2261
          data referenced by pPayload is used in lru cache,
2262
          reset pPayload to NULL to avoid being freed in _error block
2263
        */
2264
        pPayload = NULL;
2265
      } else {
6,553✔
2266
        if (TSDB_CODE_DUP_KEY == code) {
UNCOV
2267
          /*
×
2268
            another thread has already put the same key into cache,
2269
            we can just ignore this error
2270
          */
2271
          code = TSDB_CODE_SUCCESS;
UNCOV
2272
        }
×
2273
        QUERY_CHECK_CODE(code, lino, _end);
UNCOV
2274
      }
×
2275

2276

2277
      digest[0] = 1;
2278
      memcpy(digest + 1, context.digest, tListLen(context.digest));
6,553✔
2279
    }
6,553✔
2280
  }
2281

2282
_end:
2283
  if (!listAdded) {
264,780,302✔
2284
    numOfTables = taosArrayGetSize(pUidList);
265,188,688✔
2285
    for (int i = 0; i < numOfTables; i++) {
253,933,768✔
2286
      void* tmp = taosArrayGet(pUidList, i);
833,336,870✔
2287
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
579,488,544✔
2288
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
579,670,802✔
2289

579,670,802✔
2290
      void* p = taosArrayPush(pListInfo->pTableList, &info);
2291
      if (p == NULL) {
579,620,538✔
2292
        taosArrayDestroy(pUidList);
579,634,694✔
2293
        return terrno;
×
UNCOV
2294
      }
×
2295

2296
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
2297
    }
579,634,694✔
2298
  }
2299

2300
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
2301

265,103,246✔
2302
_error:
2303
  taosArrayDestroy(pUidList);
265,094,990✔
2304
  taosArrayDestroy(pTagColIds);
265,163,208✔
2305
  taosMemFreeClear(pTagCondKey);
264,514,948✔
2306
  taosMemFreeClear(pPayload);
264,615,905✔
2307
  if (code != TSDB_CODE_SUCCESS) {
264,615,905✔
2308
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
264,615,905✔
2309
  }
898✔
2310
  return code;
2311
}
264,759,571✔
2312

2313
/*
 * Collect the child tables of a super table that satisfy the tag conditions of a sub-plan.
 *
 * suid       uid of the super table; it is used as both `suid` and `uid` of the temporary scan node
 * pVnode     vnode handle forwarded to getTableList()
 * node       optional SSubplan*; when not NULL its pTagCond / pTagIndexCond are used as filters
 * tableList  [out] receives the resulting table array on success; ownership passes to the caller
 * pTaskInfo  SExecTaskInfo* whose storageAPI is used to access the meta data
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the error code of the failing step.
 * The temporary STableListInfo is released on every path, including failures.
 */
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SSubplan*      pSubplan = (SSubplan*)node;
  SScanPhysiNode pNode = {0};
  pNode.suid = suid;
  pNode.uid = suid;
  pNode.tableType = TSDB_SUPER_TABLE;

  STableListInfo* pTableListInfo = tableListCreate();
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
  uint8_t digest[17] = {0};
  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // hand the array over to the caller and detach it, so the destroy below does not free it
  *tableList = pTableListInfo->pTableList;
  pTableListInfo->pTableList = NULL;

_end:
  // release the temporary list info on both the success and the failure path
  if (pTableListInfo != NULL) {
    tableListDestroy(pTableListInfo);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
18,144✔
2338

2339
size_t getTableTagsBufLen(const SNodeList* pGroups) {
2340
  size_t keyLen = 0;
×
UNCOV
2341

×
2342
  SNode* node;
2343
  FOREACH(node, pGroups) {
2344
    SExprNode* pExpr = (SExprNode*)node;
×
2345
    keyLen += pExpr->resType.bytes;
×
UNCOV
2346
  }
×
2347

2348
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
2349
  return keyLen;
×
UNCOV
2350
}
×
2351

2352
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
UNCOV
2353
                              SStorageAPI* pAPI) {
×
2354
  SMetaReader mr = {0};
UNCOV
2355

×
2356
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
2357
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2358
    pAPI->metaReaderFn.clearReader(&mr);
×
2359
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
UNCOV
2360
  }
×
2361

2362
  SNodeList* groupNew = NULL;
2363
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2364
  if (TSDB_CODE_SUCCESS != code) {
×
2365
    pAPI->metaReaderFn.clearReader(&mr);
×
2366
    return code;
×
UNCOV
2367
  }
×
2368

2369
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
2370
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2371
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2372
    nodesDestroyList(groupNew);
×
2373
    pAPI->metaReaderFn.clearReader(&mr);
×
2374
    return code;
×
UNCOV
2375
  }
×
2376
  char* isNull = (char*)keyBuf;
2377
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
UNCOV
2378

×
2379
  SNode*  pNode;
2380
  int32_t index = 0;
2381
  FOREACH(pNode, groupNew) {
×
2382
    SNode*  pNew = NULL;
×
2383
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2384
    if (TSDB_CODE_SUCCESS == code) {
×
2385
      REPLACE_NODE(pNew);
×
UNCOV
2386
    } else {
×
2387
      nodesDestroyList(groupNew);
2388
      pAPI->metaReaderFn.clearReader(&mr);
×
2389
      return code;
×
UNCOV
2390
    }
×
2391

2392
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
2393
      nodesDestroyList(groupNew);
×
2394
      pAPI->metaReaderFn.clearReader(&mr);
×
2395
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2396
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
2397
    }
×
2398
    SValueNode* pValue = (SValueNode*)pNew;
UNCOV
2399

×
2400
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
2401
      isNull[index++] = 1;
×
2402
      continue;
×
UNCOV
2403
    } else {
×
2404
      isNull[index++] = 0;
2405
      char* data = nodesGetValueFromNode(pValue);
×
2406
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2407
        if (tTagIsJson(data)) {
×
2408
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2409
          nodesDestroyList(groupNew);
×
2410
          pAPI->metaReaderFn.clearReader(&mr);
×
2411
          return terrno;
×
UNCOV
2412
        }
×
2413
        int32_t len = getJsonValueLen(data);
2414
        memcpy(pStart, data, len);
×
2415
        pStart += len;
×
2416
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2417
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2418
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
UNCOV
2419
        }
×
2420
        memcpy(pStart, data, varDataTLen(data));
2421
        pStart += varDataTLen(data);
×
UNCOV
2422
      } else {
×
2423
        memcpy(pStart, data, pValue->node.resType.bytes);
2424
        pStart += pValue->node.resType.bytes;
×
UNCOV
2425
      }
×
2426
    }
2427
  }
2428

2429
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
2430
  *pGroupId = calcGroupId(keyBuf, len);
×
UNCOV
2431

×
2432
  nodesDestroyList(groupNew);
2433
  pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2434

×
2435
  return TSDB_CODE_SUCCESS;
UNCOV
2436
}
×
2437

2438
/*
 * Convert a list of column nodes into a newly allocated array of SColumn.
 *
 * Returns NULL when the list is NULL, when the array cannot be allocated, when a list entry is
 * missing, or when appending to the array fails. The partially built array is destroyed on failure.
 */
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
  if (pNodeList == NULL) {
    return NULL;
  }

  size_t  total = LIST_LENGTH(pNodeList);
  SArray* pColumns = taosArrayInit(total, sizeof(SColumn));
  if (pColumns == NULL) {
    return NULL;
  }

  int32_t idx = 0;
  while (idx < total) {
    SColumnNode* pSrc = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    if (pSrc == NULL) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return NULL;
    }

    // todo extract method
    SColumn col = {.slotId = pSrc->slotId,
                   .colId = pSrc->colId,
                   .type = pSrc->node.resType.type,
                   .bytes = pSrc->node.resType.bytes,
                   .precision = pSrc->node.resType.precision,
                   .scale = pSrc->node.resType.scale};

    if (taosArrayPush(pColumns, &col) == NULL) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return NULL;
    }

    ++idx;
  }

  return pColumns;
}
8,781,120✔
2476

2477
/*
 * Build the column match information for a scan from its target list and output descriptor.
 *
 * pNodeList        list of STargetNode; only targets whose expression is a column node yield an item
 * pOutputNodeList  output block descriptor; its slots decide which matched columns need output
 * numOfOutputCols  [out] number of slots counted as output columns
 * type             stored in pMatchInfo->matchType
 * pMatchInfo       [out] receives the array of SColMatchItem in pList on success
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the error code is returned, the partially
 * built array is destroyed and pMatchInfo->pList is left untouched.
 */
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
                            int32_t type, SColMatchInfo* pMatchInfo) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pMatchInfo->matchType = type;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
  if (pList == NULL) {
    code = terrno;
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColMatchItem c = {.needOutput = true};
      c.colId = pColNode->colId;
      c.srcSlotId = pColNode->slotId;
      c.dstSlotId = pNode->slotId;
      c.isPk = pColNode->isPk;
      c.dataType = pColNode->node.resType;
      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // set the output flag for each column in SColMatchInfo, according to the output slot descriptors
  *numOfOutputCols = 0;
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
  for (int32_t i = 0; i < num; ++i) {
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    // todo: add reserve flag check
    // it is a column reserved for the arithmetic expression calculation
    if (pNode->slotId >= numOfCols) {
      (*numOfOutputCols) += 1;
      continue;
    }

    // NOTE(review): when no item matches the slot id, `info` is left pointing at the last item of
    // the list and that item may be flagged below - confirm whether this is intended.
    SColMatchItem* info = NULL;
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
      info = taosArrayGet(pList, j);
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
      if (info->dstSlotId == pNode->slotId) {
        break;
      }
    }

    if (pNode->output) {
      (*numOfOutputCols) += 1;
    } else if (info != NULL) {
      // select distinct tbname from stb where tbname='abc';
      info->needOutput = false;
    }
  }

  // success: ownership of the array moves to pMatchInfo
  pMatchInfo->pList = pList;
  pList = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // the array was never published to pMatchInfo, release it here to avoid a leak
    taosArrayDestroy(pList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
342,732,142✔
2547

2548
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
2549
                                  const char* name) {
998,141,921✔
2550
  SResSchema s = {0};
2551
  s.scale = scale;
998,141,921✔
2552
  s.type = type;
998,257,384✔
2553
  s.bytes = bytes;
998,257,384✔
2554
  s.slotId = slotId;
998,257,384✔
2555
  s.precision = precision;
998,257,384✔
2556
  tstrncpy(s.name, name, tListLen(s.name));
998,257,384✔
2557

998,257,384✔
2558
  return s;
2559
}
998,257,384✔
2560

2561
/*
 * Allocate a zero-initialised SColumn and fill it from the given ids, data type and column type.
 * Returns NULL when the allocation fails; the caller owns the returned column.
 */
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
  SColumn* pResult = taosMemoryCalloc(1, sizeof(SColumn));
  if (pResult != NULL) {
    // location of the column
    pResult->dataBlockId = blockId;
    pResult->slotId = slotId;
    pResult->colId = colId;
    pResult->colType = colType;

    // data type description
    pResult->type = pType->type;
    pResult->bytes = pType->bytes;
    pResult->scale = pType->scale;
    pResult->precision = pType->precision;
  }
  return pResult;
}
897,552,519✔
2577

2578
/*
 * Initialise an expression info from a single plan node.
 *
 * pExp    expression info to fill; pExp->pExpr and pExp->base.pParam are allocated here
 * pNode   source node; column, value, remote value, function, operator, case-when and
 *         logic-condition nodes are supported
 * slotId  slot id stored in the result schema
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the error code is returned and whatever has
 * been allocated so far stays attached to pExp; nothing is released in this function.
 * Any other node type yields TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 */
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  pExp->base.numOfParams = 0;
  pExp->base.pParam = NULL;
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);

  pExp->pExpr->_function.num = 1;
  pExp->pExpr->_function.functionId = -1;

  int32_t type = nodeType(pNode);
  // it is a project query, or group by column
  if (type == QUERY_NODE_COLUMN) {
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
    SColumnNode* pColNode = (SColumnNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pColNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);

    pExp->base.pParam[0].pCol =
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);

    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
  } else if (type == QUERY_NODE_VALUE || type == QUERY_NODE_REMOTE_VALUE) {
    if (type == QUERY_NODE_REMOTE_VALUE) {
      // a remote value has to be fetched first; afterwards the node is handled as a value node
      SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
      code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
    SValueNode* pValNode = (SValueNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);

    pExp->base.numOfParams = 1;

    SDataType* pType = &pValNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (type == QUERY_NODE_FUNCTION) {
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;

    SDataType* pType = &pFuncNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
    tExprNode* pExprNode = pExp->pExpr;

    pExprNode->_function.functionId = pFuncNode->funcId;
    pExprNode->_function.pFunctNode = pFuncNode;
    pExprNode->_function.functionType = pFuncNode->funcType;

    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));

    pExp->base.pParamList = pFuncNode->pParameterList;
#if 1
    // todo refactor: add the parameter for tbname function
    const char* name = "tbname";
    int32_t     len = strlen(name);

    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
        pExprNode->_function.functionName[len] == 0) {
      pFuncNode->pParameterList = NULL;
      // use the function level `code` here: a block local variable would hide it, and the jump
      // to _end would then report success although one of the steps below failed
      code = nodesMakeList(&pFuncNode->pParameterList);
      SValueNode* res = NULL;
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
      }
      QUERY_CHECK_CODE(code, lino, _end);
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
      if (code != TSDB_CODE_SUCCESS) {
        nodesDestroyNode((SNode*)res);
        res = NULL;
      }
      QUERY_CHECK_CODE(code, lino, _end);
    }
#endif

    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);

    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = numOfParam;

    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
      if (p1->type == QUERY_NODE_COLUMN) {
        SColumnNode* pcn = (SColumnNode*)p1;

        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
        pExp->base.pParam[j].pCol =
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
      } else if (p1->type == QUERY_NODE_VALUE) {
        SValueNode* pvn = (SValueNode*)p1;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
        code = qFetchRemoteValue(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pRemote);
        QUERY_CHECK_CODE(code, lino, _end);

        SValueNode* pvn = (SValueNode*)pRemote;
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
  } else if (type == QUERY_NODE_OPERATOR) {
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pOpNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_CASE_WHEN) {
    // case-when expressions are recorded with the operator node type
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;

    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;

    SDataType* pType = &pCaseNode->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
    // logic conditions are recorded with the operator node type as well
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
    pExp->base.numOfParams = 1;
    SDataType* pType = &pCond->node.resType;
    pExp->base.resSchema =
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
    pExp->pExpr->_optrRoot.pRootNode = pNode;
  } else {
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1,004,818,556✔
2761

2762
/*
 * Initialise an expression info from a target node, using the expression and the slot id of
 * the target. The result of createExprFromOneNode() is returned unchanged.
 */
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
  SNode*  pSrcExpr = pTargetNode->pExpr;
  int16_t dstSlot = pTargetNode->slotId;
  return createExprFromOneNode(pExp, pSrcExpr, dstSlot);
}
1,004,697,768✔
2765

2766
/*
 * Create an array of expression infos, one per node of the list. The slot id of the i-th
 * expression is i + UD_TAG_COLUMN_INDEX.
 *
 * pNodeList   list of expression nodes
 * numOfExprs  [out] receives the length of the list
 *
 * Returns the new array, owned by the caller, or NULL on failure. When an expression cannot be
 * created, terrno is set to the error code and the expressions built so far are destroyed
 * together with the array.
 */
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
  *numOfExprs = LIST_LENGTH(pNodeList);
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (!pExprs) {
    return NULL;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    SExprInfo* pExp = &pExprs[i];
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
    if (code != TSDB_CODE_SUCCESS) {
      // release what the expressions own before the array itself, as createExprInfo() does
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      terrno = code;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return NULL;
    }
  }

  return pExprs;
}
×
2786

2787
/*
 * Create the expression infos for a list of function targets followed by the optional group keys.
 *
 * pNodeList    targets of the functions; they fill the first entries of the result
 * pGroupKeys   optional targets of the group keys; they are appended after the functions
 * pExprInfo    [out] receives the new array on success
 * numOfExprs   [out] total number of expressions; when it is 0 no array is allocated
 *
 * Returns the success code, terrno when the allocation fails or a target is missing, or the
 * error code of the failing expression. On failure the array is destroyed.
 */
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
  QRY_PARAM_CHECK(pExprInfo);

  int32_t code = 0;
  int32_t funcCount = LIST_LENGTH(pNodeList);
  int32_t groupKeyCount = (pGroupKeys != NULL) ? LIST_LENGTH(pGroupKeys) : 0;

  *numOfExprs = funcCount + groupKeyCount;
  if (*numOfExprs == 0) {
    return code;
  }

  SExprInfo* pResult = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (pResult == NULL) {
    return terrno;
  }

  for (int32_t pos = 0; pos < (*numOfExprs); ++pos) {
    // the functions come first, the group keys follow
    STargetNode* pTarget = (pos < funcCount) ? (STargetNode*)nodesListGetNode(pNodeList, pos)
                                             : (STargetNode*)nodesListGetNode(pGroupKeys, pos - funcCount);
    if (pTarget == NULL) {
      destroyExprInfo(pResult, *numOfExprs);
      taosMemoryFreeClear(pResult);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    code = createExprFromTargetNode(&pResult[pos], pTarget);
    if (code != TSDB_CODE_SUCCESS) {
      destroyExprInfo(pResult, *numOfExprs);
      taosMemoryFreeClear(pResult);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  *pExprInfo = pResult;
  return code;
}
401,048,426✔
2834

2835
static void deleteSubsidiareCtx(void* pData) {
2836
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2837
  if (pCtx->pCtx) {
×
2838
    taosMemoryFreeClear(pCtx->pCtx);
×
UNCOV
2839
  }
×
2840
}
UNCOV
2841

×
2842
// set the output buffer for the selectivity + tag query
2843
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
2844
  int32_t num = 0;
437,363,730✔
2845
  int32_t code = TSDB_CODE_SUCCESS;
437,363,730✔
2846
  int32_t lino = 0;
437,363,730✔
2847

437,363,730✔
2848
  SArray* pValCtxArray = NULL;
2849
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
437,363,730✔
2850
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
1,028,127,735✔
2851
    if (funcIdx > 0) {
590,906,250✔
2852
      if (pValCtxArray == NULL) {
590,847,599✔
2853
        // the end of the list is the select function of biggest index
1,445,091✔
2854
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
2855
        if (pValCtxArray == NULL) {
1,037,499✔
2856
          return terrno;
1,037,105✔
UNCOV
2857
        }
×
2858
      }
2859
      if (funcIdx > pValCtxArray->size) {
2860
        qError("funcIdx:%d is out of range", funcIdx);
1,444,697✔
2861
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2862
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
2863
      }
×
2864
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
2865
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,445,091✔
2866
      if (pSubsidiary->pCtx == NULL) {
1,445,091✔
2867
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
1,443,909✔
2868
        return terrno;
×
UNCOV
2869
      }
×
2870
      pSubsidiary->num = 0;
2871
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,445,091✔
2872
    }
1,444,697✔
2873
  }
2874

2875
  SqlFunctionCtx*  p = NULL;
2876
  SqlFunctionCtx** pValCtx = NULL;
437,221,485✔
2877
  if (pValCtxArray == NULL) {
437,221,485✔
2878
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
437,221,485✔
2879
    if (pValCtx == NULL) {
436,229,705✔
2880
      QUERY_CHECK_CODE(terrno, lino, _end);
436,121,352✔
UNCOV
2881
    }
×
2882
  }
2883

2884
  for (int32_t i = 0; i < numOfOutput; ++i) {
2885
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
1,419,280,499✔
2886
    if ((strcmp(pName, "_select_value") == 0)) {
982,210,893✔
2887
      if (pValCtxArray == NULL) {
982,089,398✔
2888
        pValCtx[num++] = &pCtx[i];
7,659,371✔
2889
      } else {
5,630,506✔
2890
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2891
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,028,865✔
2892
          bindFuncIndex -= 1;
2,028,513✔
2893
        }
1,981,233✔
2894
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2895
        if (pSubsidiary == NULL) {
2,028,513✔
2896
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
2,028,119✔
UNCOV
2897
        }
×
2898
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2899
        (*pSubsidiary)->num++;
2,028,119✔
2900
      }
2,028,119✔
2901
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
2902
      if (pValCtxArray == NULL) {
974,430,027✔
2903
        p = &pCtx[i];
63,698,029✔
2904
      }
61,938,465✔
2905
    }
2906
  }
2907

2908
  if (p != NULL) {
2909
    p->subsidiaries.pCtx = pValCtx;
437,069,606✔
2910
    p->subsidiaries.num = num;
28,412,408✔
2911
  } else {
28,404,026✔
2912
    taosMemoryFreeClear(pValCtx);
2913
  }
408,657,198✔
2914

2915
_end:
2916
  if (code != TSDB_CODE_SUCCESS) {
871,641✔
2917
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
437,086,333✔
2918
    taosMemoryFreeClear(pValCtx);
×
2919
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2920
  } else {
×
2921
    taosArrayDestroy(pValCtxArray);
2922
  }
437,086,333✔
2923
  return code;
2924
}
437,126,228✔
2925

2926
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
2927
                                     SFunctionStateStore* pStore) {
437,409,230✔
2928
  int32_t         code = TSDB_CODE_SUCCESS;
2929
  int32_t         lino = 0;
437,409,230✔
2930
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
437,409,230✔
2931
  if (pFuncCtx == NULL) {
437,409,230✔
2932
    return NULL;
436,761,080✔
UNCOV
2933
  }
×
2934

2935
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
2936
  if (*rowEntryInfoOffset == 0) {
436,761,080✔
2937
    taosMemoryFreeClear(pFuncCtx);
437,495,904✔
2938
    return NULL;
×
UNCOV
2939
  }
×
2940

2941
  for (int32_t i = 0; i < numOfOutput; ++i) {
2942
    SExprInfo* pExpr = &pExprInfo[i];
1,419,669,023✔
2943

982,482,419✔
2944
    SExprBasicInfo* pFunct = &pExpr->base;
2945
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
982,230,560✔
2946

982,320,009✔
2947
    pCtx->functionId = -1;
2948
    pCtx->pExpr = pExpr;
982,321,140✔
2949

982,468,594✔
2950
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
2951
      SFuncExecEnv env = {0};
982,409,839✔
2952
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
368,491,150✔
2953
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
368,521,714✔
2954
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
368,510,890✔
2955

368,507,721✔
2956
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
2957
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
368,618,097✔
2958
        if (!isUdaf) {
610,148,650✔
2959
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
241,650,389✔
2960
          QUERY_CHECK_CODE(code, lino, _end);
241,617,328✔
2961
        } else {
241,590,465✔
2962
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
2963
          pCtx->udfName = taosStrdup(udfName);
33,061✔
2964
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
33,061✔
2965

33,061✔
2966
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
2967
          QUERY_CHECK_CODE(code, lino, _end);
33,061✔
2968
        }
33,061✔
2969
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
2970
        if (!tmp) {
241,623,526✔
2971
          code = terrno;
241,607,758✔
UNCOV
2972
          QUERY_CHECK_CODE(code, lino, _end);
×
2973
        }
14,912✔
2974
      } else {
2975
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
2976
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
126,822,793✔
2977
          QUERY_CHECK_CODE(code, lino, _end);
5,825,761✔
2978
        }      
5,826,180✔
2979
        
2980
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
2981
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
126,838,792✔
2982
          code = TSDB_CODE_SUCCESS;
126,834,927✔
2983
        }
53,892✔
2984
        QUERY_CHECK_CODE(code, lino, _end);
2985

126,834,927✔
2986
        if (pCtx->sfp.getEnv != NULL) {
2987
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
126,834,927✔
2988
          if (!tmp) {
19,001,139✔
2989
            code = terrno;
19,007,626✔
2990
            QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
2991
          }
×
2992
        }
2993
      }
2994
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
2995
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
368,472,069✔
2996
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
613,693,506✔
2997
      // for simple column, the result buffer needs to hold at least one element.
44,402,905✔
2998
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
2999
    }
614,011,006✔
3000

3001
    pCtx->input.numOfInputCols = pFunct->numOfParams;
3002
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
982,471,071✔
3003
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
982,370,460✔
3004
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
982,385,745✔
3005
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
982,473,254✔
3006

982,400,431✔
3007
    pCtx->pTsOutput = NULL;
3008
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
982,491,525✔
3009
    pCtx->resDataInfo.type = pFunct->resSchema.type;
982,558,607✔
3010
    pCtx->order = TSDB_ORDER_ASC;
982,649,923✔
3011
    pCtx->start.key = INT64_MIN;
982,563,892✔
3012
    pCtx->end.key = INT64_MIN;
982,425,769✔
3013
    pCtx->numOfParams = pExpr->base.numOfParams;
982,532,532✔
3014
    pCtx->param = pFunct->pParam;
982,662,322✔
3015
    pCtx->saveHandle.currentPage = -1;
982,454,077✔
3016
    pCtx->pStore = pStore;
982,782,668✔
3017
    pCtx->hasWindowOrGroup = false;
982,520,454✔
3018
    pCtx->needCleanup = false;
982,550,858✔
3019
    pCtx->skipDynDataCheck = false;
982,068,743✔
3020
  }
982,438,704✔
3021

3022
  for (int32_t i = 1; i < numOfOutput; ++i) {
3023
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,028,331,750✔
3024
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
1,182,025,387✔
3025
  }
591,009,465✔
3026

3027
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
3028
  QUERY_CHECK_CODE(code, lino, _end);
437,287,837✔
3029

437,120,505✔
3030
_end:
3031
  if (code != TSDB_CODE_SUCCESS) {
437,120,505✔
3032
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
437,105,651✔
3033
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3034
      taosMemoryFree(pFuncCtx[i].input.pData);
×
3035
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
UNCOV
3036
    }
×
3037
    taosMemoryFreeClear(*rowEntryInfoOffset);
3038
    taosMemoryFreeClear(pFuncCtx);
×
UNCOV
3039

×
3040
    terrno = code;
3041
    return NULL;
×
UNCOV
3042
  }
×
3043
  return pFuncCtx;
3044
}
437,105,651✔
3045

3046
// NOTE: sources columns are more than the destination SSDatablock columns.
3047
// doFilter in table scan needs every column even its output is false
3048
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
3049
  int32_t code = TSDB_CODE_SUCCESS;
9,897,459✔
3050
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
9,897,459✔
3051

9,897,459✔
3052
  int32_t i = 0, j = 0;
3053
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
9,897,941✔
3054
    SColumnInfoData* p = taosArrayGet(pCols, i);
70,229,367✔
3055
    if (!p) {
60,333,836✔
3056
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
60,332,660✔
3057
      return terrno;
×
UNCOV
3058
    }
×
3059
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
3060
    if (!pmInfo) {
60,332,660✔
3061
      return terrno;
60,333,007✔
UNCOV
3062
    }
×
3063

3064
    if (p->info.colId == pmInfo->colId) {
3065
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
60,333,007✔
3066
      if (!pDst) {
54,303,975✔
3067
        return terrno;
54,304,205✔
UNCOV
3068
      }
×
3069
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
3070
      if (code != TSDB_CODE_SUCCESS) {
54,304,205✔
3071
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
54,302,671✔
3072
        return code;
×
UNCOV
3073
      }
×
3074
      i++;
3075
      j++;
54,302,671✔
3076
    } else if (p->info.colId < pmInfo->colId) {
54,302,671✔
3077
      i++;
6,029,372✔
3078
    } else {
6,028,755✔
3079
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
3080
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
3081
    }
×
3082
  }
3083
  return code;
3084
}
9,898,076✔
3085

3086
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
3087
  SInterval interval = {
235,736,216✔
3088
      .interval = pTableScanNode->interval,
471,448,598✔
3089
      .sliding = pTableScanNode->sliding,
235,479,161✔
3090
      .intervalUnit = pTableScanNode->intervalUnit,
235,480,962✔
3091
      .slidingUnit = pTableScanNode->slidingUnit,
235,875,193✔
3092
      .offset = pTableScanNode->offset,
235,565,675✔
3093
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
235,171,289✔
3094
      .timeRange = pTableScanNode->scanRange,
235,097,377✔
3095
  };
3096
  calcIntervalAutoOffset(&interval);
3097

235,862,826✔
3098
  return interval;
3099
}
234,947,886✔
3100

3101
// Build an SColumn from a column node: slot id, column id and the result type description
// (type, bytes, scale, precision). All other SColumn fields are zero.
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
  SColumn column = {
      .slotId = pColNode->slotId,
      .colId = pColNode->colId,
      .type = pColNode->node.resType.type,
      .bytes = pColNode->node.resType.bytes,
      .scale = pColNode->node.resType.scale,
      .precision = pColNode->node.resType.precision,
  };
  return column;
}
68,719,722✔
3112

3113

3114
/**
3115
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3116
 * @param[in] cond The range specified by WHERE condition.
3117
 * @param[in] range The range specified by RANGE clause.
3118
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3119
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3120
 * @note `cond` and `twindow` may be the same address.
3121
 */
3122
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
3123
                                 STimeWindow* extTwindows) {
1,990,581✔
3124
  int32_t     code = TSDB_CODE_SUCCESS;
3125
  int32_t     lino = 0;
1,990,581✔
3126
  STimeWindow tempWindow;
1,990,581✔
3127

3128
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
3129
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
1,990,581✔
3130
    return code;
2,485✔
3131
  }
2,485✔
3132

3133
  if (range->ekey < cond->skey) {
3134
    extTwindows[1] = *cond;
1,988,096✔
3135
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
315,857✔
3136
    return code;
315,857✔
3137
  }
315,857✔
3138

3139
  if (cond->ekey < range->skey) {
3140
    extTwindows[0] = *cond;
1,672,239✔
3141
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
205,536✔
3142
    return code;
205,536✔
3143
  }
205,536✔
3144

3145
  // Only scan data in the time range intersecion.
3146
  extTwindows[0] = extTwindows[1] = *cond;
3147
  twindow->skey = TMAX(cond->skey, range->skey);
1,466,703✔
3148
  twindow->ekey = TMIN(cond->ekey, range->ekey);
1,466,703✔
3149
  extTwindows[0].ekey = twindow->skey - 1;
1,466,703✔
3150
  extTwindows[1].skey = twindow->ekey + 1;
1,466,703✔
3151

1,466,703✔
3152
  return code;
3153
}
1,466,703✔
3154

3155
// Run scalarCalculateRemoteConstants() on the primary key condition and store the resulting node back into
// *pPrimaryKeyCond. Unless that node is a plain value node, the time range (and its strictness flag) is
// then extracted with filterGetTimeRange().
static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
  SNode*  pNew = NULL;
  int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  *pPrimaryKeyCond = pNew;
  if (nodeType(pNew) == QUERY_NODE_VALUE) {
    // pTimeRange and isStrict are left untouched in this case
    return code;
  }

  return filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, isStrict, NULL);
}
10,560✔
3166

3167
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* pTableScanNode,
3168
                               const SReadHandle* readHandle, bool applyExtWin) {
264,410,449✔
3169
  int32_t code = 0;                             
3170
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
264,410,449✔
3171
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
264,410,449✔
3172

263,986,674✔
3173
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
3174
  if (!pCond->colList) {
264,212,545✔
3175
    return terrno;
264,224,422✔
UNCOV
3176
  }
×
3177
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
3178
  if (pCond->pSlotList == NULL) {
264,147,068✔
3179
    taosMemoryFreeClear(pCond->colList);
264,299,746✔
3180
    return terrno;
×
UNCOV
3181
  }
×
3182

3183
  // TODO: get it from stable scan node
3184
  pCond->twindows = pTableScanNode->scanRange;
3185
  pCond->suid = pTableScanNode->scan.suid;
263,978,570✔
3186
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
264,576,769✔
3187
  pCond->startVersion = -1;
264,358,817✔
3188
  pCond->endVersion = -1;
264,298,431✔
3189
  pCond->skipRollup = readHandle->skipRollup;
263,847,646✔
3190
  if (readHandle->winRangeValid) {
264,294,546✔
3191
    pCond->twindows = readHandle->winRange;
263,988,246✔
3192
  }
255,030✔
3193
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
3194
  // allowed read stt file optimization mode
264,218,958✔
3195
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
3196
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
528,903,041✔
3197

264,327,555✔
3198
  int32_t j = 0;
3199
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
264,268,086✔
3200
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
1,127,102,539✔
3201
    if (!pNode) {
863,232,279✔
3202
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
861,874,133✔
3203
      return terrno;
×
UNCOV
3204
    }
×
3205
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
3206
    if (pColNode->colType == COLUMN_TYPE_TAG) {
861,874,133✔
3207
      continue;
862,736,894✔
UNCOV
3208
    }
×
3209

3210
    pCond->colList[j].type = pColNode->node.resType.type;
3211
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
862,585,766✔
3212
    pCond->colList[j].colId = pColNode->colId;
863,107,686✔
3213
    pCond->colList[j].pk = pColNode->isPk;
863,170,792✔
3214

863,294,991✔
3215
    pCond->pSlotList[j] = pNode->slotId;
3216
    j += 1;
863,059,130✔
3217
  }
862,834,453✔
3218

3219
  pCond->numOfCols = j;
3220

264,376,420✔
3221
  if (applyExtWin) {
3222
    if (NULL != pTableScanNode->pExtScanRange) {
264,540,814✔
3223
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
236,157,501✔
3224
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
1,990,581✔
3225
    } else if (readHandle->extWinRangeValid) {
1,990,581✔
3226
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
233,440,995✔
3227
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
×
UNCOV
3228
    }
×
3229
  }
3230

3231
  if (pTableScanNode->pPrimaryCond) {
3232
    bool isStrict = false;
264,291,708✔
3233
    code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
10,560✔
3234
    if (code || !isStrict) {
10,560✔
3235
      code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
10,560✔
UNCOV
3236
    }
×
3237
  }
3238

3239
  return code;
3240
}
264,352,548✔
3241

3242
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
3243
                                           const SReadHandle* readHandle, SArray* colArray) {
1,480,484✔
3244
  int32_t code = TSDB_CODE_SUCCESS;
3245
  int32_t lino = 0;
1,480,484✔
3246

1,480,484✔
3247
  pCond->order = TSDB_ORDER_ASC;
3248
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
1,480,484✔
3249

1,480,484✔
3250
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
3251
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
1,480,484✔
3252

1,480,484✔
3253
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
3254
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
1,480,484✔
3255

1,480,484✔
3256
  pCond->twindows = pOrgCond->twindows;
3257
  pCond->order = pOrgCond->order;
1,480,484✔
3258
  pCond->type = pOrgCond->type;
1,480,484✔
3259
  pCond->startVersion = -1;
1,480,484✔
3260
  pCond->endVersion = -1;
1,480,484✔
3261
  pCond->skipRollup = true;
1,480,484✔
3262
  pCond->notLoadData = false;
1,480,484✔
3263

1,480,484✔
3264
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
3265
    SColIdPair* pColPair = taosArrayGet(colArray, i);
6,587,326✔
3266
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
5,106,842✔
3267

5,106,842✔
3268
    bool find = false;
3269
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
5,106,842✔
3270
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
27,465,461✔
3271
        pCond->colList[i].type = pOrgCond->colList[j].type;
27,465,461✔
3272
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
5,106,842✔
3273
        pCond->colList[i].colId = pColPair->orgColId;
5,106,842✔
3274
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
5,106,842✔
3275
        pCond->pSlotList[i] = i;
5,106,842✔
3276
        find = true;
5,106,842✔
3277
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
5,106,842✔
3278
        break;
5,106,842✔
3279
      }
5,106,842✔
3280
    }
3281
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
3282
  }
5,106,842✔
3283

3284
  return code;
3285
_return:
1,480,484✔
3286
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3287
  taosMemoryFreeClear(pCond->colList);
×
3288
  taosMemoryFreeClear(pCond->pSlotList);
×
3289
  return code;
×
UNCOV
3290
}
×
3291

3292
// Release the two arrays owned by a query condition (slot list and column list) and clear the pointers.
// The SQueryTableDataCond object itself is not freed.
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
  taosMemoryFreeClear(pCond->pSlotList);
  taosMemoryFreeClear(pCond->colList);
}
544,901,058✔
3296

544,837,452✔
3297
int32_t convertFillType(int32_t mode) {
3298
  int32_t type = TSDB_FILL_NONE;
2,566,694✔
3299
  switch (mode) {
2,566,694✔
3300
    case FILL_MODE_PREV:
2,566,694✔
3301
      type = TSDB_FILL_PREV;
126,768✔
3302
      break;
126,768✔
3303
    case FILL_MODE_NONE:
126,768✔
3304
      type = TSDB_FILL_NONE;
×
3305
      break;
×
UNCOV
3306
    case FILL_MODE_NULL:
×
3307
      type = TSDB_FILL_NULL;
156,862✔
3308
      break;
156,862✔
3309
    case FILL_MODE_NULL_F:
156,862✔
3310
      type = TSDB_FILL_NULL_F;
36,907✔
3311
      break;
36,907✔
3312
    case FILL_MODE_NEXT:
36,907✔
3313
      type = TSDB_FILL_NEXT;
126,675✔
3314
      break;
126,675✔
3315
    case FILL_MODE_VALUE:
126,675✔
3316
      type = TSDB_FILL_SET_VALUE;
145,343✔
3317
      break;
145,343✔
3318
    case FILL_MODE_VALUE_F:
145,343✔
3319
      type = TSDB_FILL_SET_VALUE_F;
10,826✔
3320
      break;
10,826✔
3321
    case FILL_MODE_LINEAR:
10,826✔
3322
      type = TSDB_FILL_LINEAR;
191,341✔
3323
      break;
191,341✔
3324
    case FILL_MODE_NEAR:
191,341✔
3325
      type = TSDB_FILL_NEAR;
1,771,972✔
3326
      break;
1,771,972✔
3327
    default:
1,771,972✔
3328
      type = TSDB_FILL_NONE;
×
UNCOV
3329
  }
×
3330

3331
  return type;
3332
}
2,566,694✔
3333

3334
// Compute the first time window for timestamp `ts`.
// Ascending queries use the aligned window returned by getAlignQueryTimeWindow() directly.
// Otherwise the window start is advanced one sliding step at a time for as long as the next start does
// not pass `ts`, and the window end is then recomputed from that start and the interval length.
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
  *w = getAlignQueryTimeWindow(pInterval, ts);
  if (ascQuery) {
    return;
  }

  // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
  int64_t candidate = w->skey;
  while (candidate < ts) {  // moving towards end
    candidate = getNextTimeWindowStart(pInterval, candidate, TSDB_ORDER_ASC);
    if (candidate > ts) {
      break;
    }

    w->skey = candidate;
  }

  w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
3353

2,147,483,647✔
3354
// Compute the window for `ts` directly: the start is `ts` truncated by taosTimeTruncate(), the end is
// what taosTimeGetIntervalEnd() reports for that start.
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
  TSKEY       start = taosTimeTruncate(ts, pInterval);
  STimeWindow win = {.skey = start, .ekey = taosTimeGetIntervalEnd(start, pInterval)};
  return win;
}
64,123,460✔
3361

3362
// Starting from *pWindow, keep stepping against the scan direction while the window still contains `ts`,
// and return the last window that did. If *pWindow does not contain `ts`, it is returned unchanged.
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
  // step in the direction opposite to `order`, i.e. towards the previous time window
  int32_t reverseOrder = (order == TSDB_ORDER_DESC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;

  STimeWindow probe = *pWindow;
  STimeWindow qualified = probe;
  while (probe.skey <= ts && ts <= probe.ekey) {
    qualified = probe;
    getNextTimeWindow(pInterval, &probe, reverseOrder);
  }

  return qualified;
}
1,126,613✔
3373

3374
// get the correct time window according to the handled timestamp
3375
// todo refactor
3376
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
3377
                                int32_t order) {
103,517,781✔
3378
  STimeWindow w = {0};
3379
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
103,517,781✔
3380
    getInitialStartTimeWindow(pInterval, ts, &w, (order != TSDB_ORDER_DESC));
103,522,992✔
3381
    return w;
7,029,825✔
3382
  }
7,056,559✔
3383

3384
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
3385
  if (pRow) {
96,463,397✔
3386
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
96,464,941✔
3387
  }
96,467,123✔
3388

3389
  // in case of typical time window, we can calculate time window directly.
3390
  if (w.skey > ts || w.ekey < ts) {
3391
    w = doCalculateTimeWindow(ts, pInterval);
96,465,865✔
3392
  }
64,121,684✔
3393

3394
  if (pInterval->interval != pInterval->sliding) {
3395
    // it is an sliding window query, in which sliding value is not equalled to
96,467,641✔
3396
    // interval value, and we need to find the first qualified time window.
3397
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
3398
  }
1,126,613✔
3399

3400
  return w;
3401
}
96,465,196✔
3402

3403
// Compute the start of the window adjacent to the one starting at `start`: the interval offset is removed,
// one sliding step is applied in the direction given by `order`, then the offset is added back.
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);

  TSKEY withoutOffset = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
  TSKEY shifted =
      taosTimeAdd(withoutOffset, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);

  return taosTimeAdd(shifted, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
}
2,147,483,647✔
3410

3411
// Move *tw to the adjacent window in direction `order`: the start comes from getNextTimeWindowStart(),
// the end is one tick before start + interval.
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
  TSKEY nextStart = getNextTimeWindowStart(pInterval, tw->skey, order);

  tw->skey = nextStart;
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
2,147,483,647✔
3415

2,147,483,647✔
3416
// True when any of limit / offset / slimit / soffset differs from -1.
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
  bool allUnset = (pLimitInfo->limit.limit == -1) && (pLimitInfo->limit.offset == -1) &&
                  (pLimitInfo->slimit.limit == -1) && (pLimitInfo->slimit.offset == -1);
  return !allUnset;
}
402,756,677✔
3420

3421
// True when the slimit limit or the slimit offset differs from -1.
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
  bool bothUnset = (pLimitInfo->slimit.limit == -1) && (pLimitInfo->slimit.offset == -1);
  return !bothUnset;
}
×
3424

3425
// Initialize the limit bookkeeping from the LIMIT node (pLimit) and the SLIMIT node (pSLimit): the
// limit/offset pairs are read with getLimit()/getOffset(), the remaining offsets start at the configured
// offsets, and all output counters start at zero.
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
  SLimit rowLimit = {0};
  rowLimit.limit = getLimit(pLimit);
  rowLimit.offset = getOffset(pLimit);

  SLimit groupLimit = {0};
  groupLimit.limit = getLimit(pSLimit);
  groupLimit.offset = getOffset(pSLimit);

  pLimitInfo->limit = rowLimit;
  pLimitInfo->remainOffset = rowLimit.offset;

  pLimitInfo->slimit = groupLimit;
  pLimitInfo->remainGroupOffset = groupLimit.offset;

  pLimitInfo->numOfOutputRows = 0;
  pLimitInfo->numOfOutputGroups = 0;
  pLimitInfo->currentGroupId = 0;
}
594,239,097✔
3437

594,603,127✔
3438
// Reset the per-group state: the remaining offset goes back to the configured offset and the output row
// counter goes back to zero. Group-level counters are left untouched.
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->numOfOutputRows = 0;
}
68,591,433✔
3442

68,575,762✔
3443
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
3444
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
618,448,139✔
3445
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
618,448,139✔
3446
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
UNCOV
3447
  }
×
3448
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
3449
  return TSDB_CODE_SUCCESS;
618,520,986✔
3450
}
618,534,101✔
3451

3452
// Return the suid recorded in the table list's id info.
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
  return pTableList->idInfo.suid;
}
3453

4,710,054✔
3454
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
3455
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
198,403,205✔
3456
    return NULL;
198,403,205✔
3457
  }
1,974✔
3458

3459
  return taosArrayGet(pTableList->pTableList, index);
3460
}
198,375,102✔
3461

3462
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
3463
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
42,993✔
3464
  if (startIndex >= numOfTables) {
42,993✔
3465
    return -1;
42,993✔
UNCOV
3466
  }
×
3467

3468
  for (int32_t i = startIndex; i < numOfTables; ++i) {
3469
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
400,948✔
3470
    if (!p) {
400,948✔
3471
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
400,948✔
3472
      return -1;
×
UNCOV
3473
    }
×
3474
    if (p->uid == uid) {
3475
      return i;
400,948✔
3476
    }
42,993✔
3477
  }
3478
  return -1;
UNCOV
3479
}
×
3480

3481
// Copy the id info of the table list (suid, uid and table type) into the three output parameters.
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
  *type = pTableList->idInfo.tableType;
  *uid = pTableList->idInfo.uid;
  *psuid = pTableList->idInfo.suid;
}
341,739✔
3486

341,739✔
3487
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
3488
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
752,646,754✔
3489
  if (slot == NULL) {
752,646,754✔
3490
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
752,887,988✔
3491
    return -1;
×
UNCOV
3492
  }
×
3493

3494
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
3495
  if (pKeyInfo == NULL) {
752,887,988✔
3496
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
753,256,030✔
3497
    return -1;
×
UNCOV
3498
  }
×
3499
  return pKeyInfo->groupId;
3500
}
753,256,030✔
3501

3502
// TODO: handle the group offset info and fix this function; as written it would break the rule of group output
3503
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3504
//   int32_t code = TSDB_CODE_SUCCESS;
3505
//   int32_t lino = 0;
3506

3507
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3508
//   if (slot == NULL) {
3509
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3510
//     return 0;
3511
//   }
3512

3513
//   taosArrayRemove(pTableList->pTableList, *slot);
3514
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3515

3516
//   _end:
3517
//   if (code != TSDB_CODE_SUCCESS) {
3518
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3519
//   } else {
3520
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3521
//   }
3522

3523
//   return code;
3524
// }
3525

3526
// Append the table (uid, gid) to the table list and register its slot in the uid -> slot hash map.
//   pTableList: list to modify; its hash map is created lazily when it does not exist yet
//   uid:        table uid, used as the hash key
//   gid:        group id stored together with the uid
// Returns TSDB_CODE_SUCCESS when the table has been added or is already present, otherwise an error code.
// On failure of the hash insertion the element pushed into the array is removed again, so the array and the
// map stay consistent.
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t slot = -1;  // declared up front so that no jump to _end can bypass its initialization

  if (pTableList->map == NULL) {
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
  }

  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
  if (p != NULL) {
    // nothing is added, so do not fall through to the "added into table list" log below
    qInfo("table:%" PRIu64 " already in tableIdList, ignore it", uid);
    return code;
  }

  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
  if (code != TSDB_CODE_SUCCESS) {
    // roll back the array first, no matter which error occurred
    taosArrayPopTailBatch(pTableList->pTableList, 1);

    // we have checked the existence of uid in hash map above, a duplicated key is an internal error
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    lino = __LINE__;
    goto _end;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
  }

  return code;
}
922,187✔
3561

3562
// Return the tables that belong to the output group with the given ordinal index.
//   ordinalGroupIndex: index in [0, number of output groups)
//   pKeyInfo:          receives a pointer to the first table entry of the group inside the table array
//   size:              receives the number of tables of the group
// Returns TSDB_CODE_INVALID_PARA when the index is out of range, the error of tableListGetSize() when that
// call fails, TSDB_CODE_SUCCESS otherwise.
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
                              int32_t* size) {
  int32_t numOfGroups = tableListGetOutputGroups(pTableList);
  int32_t tableCount = 0;

  int32_t code = tableListGetSize(pTableList, &tableCount);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= numOfGroups) {
    return TSDB_CODE_INVALID_PARA;
  }

  // special case 1: a single group owns every table of the list
  if (numOfGroups == 1) {
    *size = tableCount;
    *pKeyInfo = (tableCount == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
    return TSDB_CODE_SUCCESS;
  }

  // special case 2: each group holds exactly one table, the group index is the table index
  if (numOfGroups == tableCount) {
    *size = 1;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
    return TSDB_CODE_SUCCESS;
  }

  // general case: the group spans [begin, end) where end is the next group's offset or the end of the list
  int32_t begin = pTableList->groupOffset[ordinalGroupIndex];
  int32_t end = (ordinalGroupIndex < numOfGroups - 1) ? pTableList->groupOffset[ordinalGroupIndex + 1] : tableCount;

  *size = end - begin;
  *pKeyInfo = taosArrayGet(pTableList->pTableList, begin);
  return TSDB_CODE_SUCCESS;
}
53,670✔
3598

3599
// Accessor: number of output groups currently recorded in the table list.
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) {
  return pTableList->numOfOuputGroups;
}
3600

672,597,646✔
3601
// Accessor: value of the oneTableForEachGroup flag of the table list.
bool oneTableForEachGroup(const STableListInfo* pTableList) {
  return pTableList->oneTableForEachGroup;
}
3602

1,340,603✔
3603
STableListInfo* tableListCreate() {
3604
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
275,094,725✔
3605
  if (pListInfo == NULL) {
275,094,725✔
3606
    return NULL;
274,623,021✔
UNCOV
3607
  }
×
3608

3609
  pListInfo->remainGroups = NULL;
3610
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
274,623,021✔
3611
  if (pListInfo->pTableList == NULL) {
274,690,090✔
3612
    goto _error;
274,840,544✔
UNCOV
3613
  }
×
3614

3615
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3616
  if (pListInfo->map == NULL) {
274,983,882✔
3617
    goto _error;
275,463,316✔
UNCOV
3618
  }
×
3619

3620
  pListInfo->numOfOuputGroups = 1;
3621
  return pListInfo;
275,447,958✔
3622

275,438,411✔
3623
_error:
3624
  tableListDestroy(pListInfo);
×
3625
  return NULL;
×
UNCOV
3626
}
×
3627

3628
// Release a table list together with everything it owns: the table array, the group offset buffer and
// both hash maps. A NULL argument is accepted and ignored.
void tableListDestroy(STableListInfo* pTableListInfo) {
  if (pTableListInfo != NULL) {
    taosArrayDestroy(pTableListInfo->pTableList);
    taosMemoryFreeClear(pTableListInfo->groupOffset);

    taosHashCleanup(pTableListInfo->map);
    taosHashCleanup(pTableListInfo->remainGroups);

    // reset the released members before the container itself goes away
    pTableListInfo->pTableList = NULL;
    pTableListInfo->map = NULL;

    taosMemoryFree(pTableListInfo);
  }
}
275,245,453✔
3642

3643
// Reset a table list to its empty state without destroying it: the table array and both hash maps are
// emptied, the group offset buffer is released and the grouping attributes return to their defaults.
// A NULL argument is accepted and ignored.
void tableListClear(STableListInfo* pTableListInfo) {
  if (pTableListInfo == NULL) {
    return;
  }

  taosArrayClear(pTableListInfo->pTableList);
  taosHashClear(pTableListInfo->map);
  taosHashClear(pTableListInfo->remainGroups);

  // release AND reset the pointer: the list stays alive after this call, so a stale pointer would be freed
  // a second time by tableListDestroy() or dereferenced by tableListGetGroupList()
  taosMemoryFreeClear(pTableListInfo->groupOffset);

  pTableListInfo->numOfOuputGroups = 1;
  pTableListInfo->oneTableForEachGroup = false;
}
726,007✔
3655

3656
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
3657
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
544,362,982✔
3658
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
544,362,982✔
3659

544,362,982✔
3660
  if (pInfo1->groupId == pInfo2->groupId) {
3661
    return 0;
544,362,982✔
3662
  } else {
513,113,087✔
3663
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
3664
  }
31,248,854✔
3665
}
3666

3667
// Sort the table list by group id and rebuild the group offset table.
// After a successful call numOfOuputGroups is the number of distinct group ids and groupOffset[k] is the
// index, inside the sorted table array, of the first table of the k-th group.
// For an empty list numOfOuputGroups is set to 0 and groupOffset is left untouched.
// Returns TSDB_CODE_SUCCESS or the error (terrno) of the failed array/memory operation.
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
  if (numOfTables == 0) {
    pTableListInfo->numOfOuputGroups = 0;
    return code;
  }

  // start index of every group, collected while scanning the sorted list
  SArray* pOffsets = taosArrayInit(4, sizeof(int32_t));
  if (pOffsets == NULL) {
    code = terrno;
    goto end;
  }

  uint64_t gid = 0;
  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, i);
    if (pInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }

    // the very first table always opens a group, later ones only when the group id changes
    if (i != 0 && pInfo->groupId == gid) {
      continue;
    }

    void* tmp = taosArrayPush(pOffsets, &i);
    if (tmp == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }
    gid = pInfo->groupId;
  }

  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pOffsets);
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
  if (pTableListInfo->groupOffset == NULL) {
    code = terrno;
    goto end;
  }

  memcpy(pTableListInfo->groupOffset, taosArrayGet(pOffsets, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);

end:
  taosArrayDestroy(pOffsets);
  return code;
}
28,555,369✔
3729

3730
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
3731
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
250,860,457✔
3732
  int32_t code = TSDB_CODE_SUCCESS;
3733

250,860,457✔
3734
  bool   groupByTbname = groupbyTbname(group);
3735
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
250,860,457✔
3736
  if (!numOfTables) {
250,985,415✔
3737
    return code;
250,819,021✔
3738
  }
3,238✔
3739
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
3740
  if (group == NULL || groupByTbname) {
250,815,783✔
3741
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
250,508,879✔
3742
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
246,721,748✔
3743
      pTableListInfo->remainGroups =
208,708,970✔
3744
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
15,470,777✔
3745
      if (pTableListInfo->remainGroups == NULL) {
15,470,777✔
3746
        return terrno;
15,470,777✔
UNCOV
3747
      }
×
3748

3749
      for (int i = 0; i < numOfTables; i++) {
3750
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
68,976,846✔
3751
        if (!info) {
53,505,763✔
3752
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
53,507,324✔
3753
          return terrno;
×
UNCOV
3754
        }
×
3755
        info->groupId = groupByTbname ? info->uid : 0;
3756
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
53,507,324✔
3757
                                      &(info->uid), sizeof(info->uid));
53,508,274✔
3758
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
53,507,799✔
3759
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
53,506,544✔
3760
          return tempRes;
×
UNCOV
3761
        }
×
3762
      }
3763
    } else {
3764
      for (int32_t i = 0; i < numOfTables; i++) {
3765
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
808,379,410✔
3766
        if (!info) {
577,417,812✔
3767
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
576,644,780✔
3768
          return terrno;
×
UNCOV
3769
        }
×
3770
        info->groupId = groupByTbname ? info->uid : 0;
3771
        
576,644,780✔
3772
      }
3773
    }
3774
    if (groupIdMap && group != NULL){
3775
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
246,432,681✔
3776
    }
61,086✔
3777

3778
    pTableListInfo->oneTableForEachGroup = groupByTbname;
3779
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
246,432,476✔
3780
      pTableListInfo->oneTableForEachGroup = true;
247,063,808✔
3781
    }
82,347,501✔
3782

3783
    if (groupSort && groupByTbname) {
3784
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
247,047,700✔
3785
      pTableListInfo->numOfOuputGroups = numOfTables;
2,103,977✔
3786
    } else if (groupByTbname && pScanNode->groupOrderScan) {
2,105,325✔
3787
      pTableListInfo->numOfOuputGroups = numOfTables;
244,943,723✔
3788
    } else {
29,858✔
3789
      pTableListInfo->numOfOuputGroups = 1;
3790
    }
244,917,641✔
3791
    if (groupSort || pScanNode->groupOrderScan) {
3792
      code = sortTableGroup(pTableListInfo);
247,035,710✔
3793
    }
28,504,495✔
3794
  } else {
3795
    bool initRemainGroups = false;
3796
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
3,787,131✔
3797
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
3,787,131✔
3798
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
3,708,297✔
3799
          !(groupSort || pScanNode->groupOrderScan)) {
3,708,297✔
3800
        initRemainGroups = true;
1,857,164✔
3801
      }
1,834,096✔
3802
    }
3803

3804
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
3805
    if (code != TSDB_CODE_SUCCESS) {
3,787,131✔
3806
      return code;
3,787,248✔
UNCOV
3807
    }
×
3808

3809
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
3810

3,787,248✔
3811
    if (groupSort || pScanNode->groupOrderScan) {
3812
      code = sortTableGroup(pTableListInfo);
3,787,659✔
3813
    }
126,751✔
3814
  }
3815

3816
  // add all table entry in the hash map
3817
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
3818
  for (int32_t i = 0; i < size; ++i) {
250,772,656✔
3819
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
903,732,148✔
3820
    if (!p) {
652,851,298✔
3821
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
652,035,492✔
3822
      return terrno;
×
UNCOV
3823
    }
×
3824
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
3825
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
652,035,492✔
3826
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
653,039,809✔
3827
      return tempRes;
×
UNCOV
3828
    }
×
3829
  }
3830

3831
  return code;
3832
}
251,233,690✔
3833

3834
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
3835
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
264,756,053✔
3836
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3837
  int64_t     st = taosGetTimestampUs();
3838
  const char* idStr = GET_TASKID(pTaskInfo);
264,744,338✔
3839

264,744,338✔
3840
  if (pHandle == NULL) {
3841
    qError("invalid handle, in creating operator tree, %s", idStr);
264,088,028✔
3842
    return TSDB_CODE_INVALID_PARA;
×
UNCOV
3843
  }
×
3844

3845
  if (pHandle->uid != 0) {
3846
    pScanNode->uid = pHandle->uid;
264,088,028✔
3847
    pScanNode->tableType = TSDB_CHILD_TABLE;
46,520✔
3848
  }
46,520✔
3849
  uint8_t digest[17] = {0};
3850
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
264,636,499✔
3851
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
264,669,215✔
3852
  if (code != TSDB_CODE_SUCCESS) {
264,712,943✔
3853
    qError("failed to getTableList, code:%s", tstrerror(code));
264,557,384✔
3854
    return code;
898✔
3855
  }
898✔
3856

3857
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
3858

264,556,486✔
3859
  int64_t st1 = taosGetTimestampUs();
3860
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
264,865,001✔
3861
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
264,865,001✔
3862
         pTaskInfo->cost.extractListTime, idStr);
265,105,567✔
3863

3864
  if (numOfTables == 0) {
3865
    qDebug("no table qualified for query, %s", idStr);
264,676,808✔
3866
    return TSDB_CODE_SUCCESS;
13,975,938✔
3867
  }
13,975,124✔
3868

3869
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
3870
  if (code != TSDB_CODE_SUCCESS) {
250,700,870✔
3871
    return code;
251,187,826✔
UNCOV
3872
  }
×
3873

3874
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
3875
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
251,185,991✔
3876

251,176,134✔
3877
  return TSDB_CODE_SUCCESS;
3878
}
251,132,998✔
3879

3880
char* getStreamOpName(uint16_t opType) {
3881
  switch (opType) {
5,863,711✔
3882
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
5,863,711✔
3883
      return "stream scan";
×
UNCOV
3884
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
×
3885
      return "project";
5,700,660✔
3886
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
5,700,660✔
3887
      return "external window";
163,051✔
3888
  }
163,051✔
3889
  return "error name";
UNCOV
3890
}
×
3891

3892
// Dump the content of a data block into the debug log. Nothing happens unless the trace bit of qDebugFlag
// is set. A NULL block or a block without rows only produces a one line note.
//   flag, taskIdStr, qId: identification that is forwarded to the dump and the log line
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  if (!pBlock) {
    qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
    return;
  }

  char* pDump = NULL;
  if (dumpBlockData(pBlock, flag, &pDump, taskIdStr, qId) == 0) {
    qDebugL("%" PRIx64 " %s %s", qId, __func__, pDump);
    taosMemoryFree(pDump);
  }
}
3910

3911
// Stream flavoured variant of printDataBlock(): a NULL or empty block is always reported in the debug log,
// the full dump of a non-empty block is only written when the trace bit of qDebugFlag is set.
//   flag, opStr: combined into "<flag> <opStr>" (truncated to 63 characters) for the dump header
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
  if (!pBlock) {
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
           pBlock->info.version);
    return;
  }

  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  char  label[64];
  char* pDump = NULL;
  snprintf(label, sizeof(label), "%s %s", flag, opStr);

  if (dumpBlockData(pBlock, label, &pDump, taskIdStr, 0) == 0) {
    qDebug("%s", pDump);
    taosMemoryFree(pDump);
  }
}
3932

3933
// Return the first timestamp of tsCols, or the start key of the window when no timestamp column is given.
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (tsCols != NULL) {
    return tsCols[0];
  }

  return win->skey;
}
3934

14,576,732✔
3935
// Store the attributes of a time window into the int64 cells of the column buffer:
//   cell 2: |ekey - skey| + delta (the duration)
//   cell 3: skey                  (window start key)
//   cell 4: ekey + delta          (window end key)
// The buffer must provide at least five int64 cells; cells 0 and 1 are not touched.
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
  int64_t* pCells = (int64_t*)pColData->pData;

  if (pWin->ekey > pWin->skey) {
    pCells[2] = pWin->ekey - pWin->skey + delta;
  } else {
    pCells[2] = pWin->skey - pWin->ekey + delta;
  }

  pCells[3] = pWin->skey;
  pCells[4] = pWin->ekey + delta;
}
2,147,483,647✔
3943

2,147,483,647✔
3944
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
3945
                 int32_t rowIndex) {
2,147,483,647✔
3946
  SColumnDataAgg* pColAgg = NULL;
3947
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3948
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3949

2,147,483,647✔
3950
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
3951
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
3952
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
3953
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
3954

2,147,483,647✔
3955
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
3956
      if (isNull[i] != 1) return 1;
2,147,483,647✔
3957
    } else {
258,867,400✔
3958
      if (isNull[i] != 0) return 1;
3959
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
3960
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
3961
        int32_t len = getJsonValueLen(val);
2,147,483,647✔
3962
        if (memcmp(p, val, len) != 0) return 1;
×
3963
        p += len;
×
UNCOV
3964
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
×
3965
        if (IS_STR_DATA_BLOB(pCol->type)) {
2,147,483,647✔
3966
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
1,194,978,400✔
3967
          p += blobDataTLen(val);
×
UNCOV
3968
        } else {
×
3969
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
3970
          p += varDataTLen(val);
1,191,003,880✔
3971
        }
1,190,462,680✔
3972
      } else {
3973
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
3974
        p += pCol->bytes;
2,147,483,647✔
3975
      }
2,147,483,647✔
3976
    }
3977
  }
3978
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
3979
  return 0;
2,147,483,647✔
3980
}
2,147,483,647✔
3981

3982
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
3983
  uint32_t        colNum = pSortGroupCols->size;
1,571,680✔
3984
  SColumnDataAgg* pColAgg = NULL;
1,571,680✔
3985
  char*           isNull = keyBuf;
1,571,680✔
3986
  char*           p = keyBuf + sizeof(int8_t) * colNum;
1,571,680✔
3987

1,571,680✔
3988
  for (int32_t i = 0; i < colNum; ++i) {
3989
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
4,300,560✔
3990
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,728,880✔
3991
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
2,728,880✔
3992

2,728,880✔
3993
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
3994

2,728,880✔
3995
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
3996
      isNull[i] = 1;
5,457,760✔
3997
    } else {
132,000✔
3998
      isNull[i] = 0;
3999
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,596,880✔
4000
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,596,880✔
4001
        int32_t len = getJsonValueLen(val);
2,596,880✔
4002
        memcpy(p, val, len);
×
4003
        p += len;
×
UNCOV
4004
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
×
4005
        if (IS_STR_DATA_BLOB(pCol->type)) {
2,596,880✔
4006
          blobDataCopy(p, val);
970,200✔
4007
          p += blobDataTLen(val);
×
UNCOV
4008
        } else {
×
4009
          varDataCopy(p, val);
4010
          p += varDataTLen(val);
970,200✔
4011
        }
970,200✔
4012
      } else {
4013
        memcpy(p, val, pCol->bytes);
4014
        p += pCol->bytes;
1,626,680✔
4015
      }
1,626,680✔
4016
    }
4017
  }
4018
  return (int32_t)(p - keyBuf);
4019
}
1,571,680✔
4020

4021
uint64_t calcGroupId(char* pData, int32_t len) {
4022
  T_MD5_CTX context;
2,147,483,647✔
4023
  tMD5Init(&context);
2,147,483,647✔
4024
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4025
  tMD5Final(&context);
2,147,483,647✔
4026

2,147,483,647✔
4027
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4028
  uint64_t id = 0;
4029
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4030
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4031
  return id;
2,147,483,647✔
4032
}
2,147,483,647✔
4033

4034
// Collect the expression node (pExpr) of every sort key of pSortKeys into a newly created node list.
// The expressions are appended as they are, they are not cloned here.
// Returns the list (NULL when the loop appends nothing), or NULL with terrno set when an append fails.
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNodeList* pCols = NULL;
  SNode*     pNode;

  FOREACH(pNode, pSortKeys) {
    int32_t code = nodesListMakeAppend(&pCols, ((SOrderByExprNode*)pNode)->pExpr);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    terrno = code;
    return NULL;
  }

  return pCols;
}
103,840✔
4048

4049
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
4050
  int32_t code = TSDB_CODE_SUCCESS;
103,840✔
4051
  int32_t lino = 0;
103,840✔
4052
  int32_t len = 0;
103,840✔
4053
  int32_t keyNum = taosArrayGetSize(keys);
103,840✔
4054
  for (int32_t i = 0; i < keyNum; ++i) {
103,840✔
4055
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
264,000✔
4056
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
160,160✔
4057
    len += pCol->bytes;
160,160✔
4058
  }
160,160✔
4059
  len += sizeof(int8_t) * keyNum;  // null flag
4060
  *pLen = len;
103,840✔
4061

103,840✔
4062
_end:
4063
  if (code != TSDB_CODE_SUCCESS) {
103,840✔
4064
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
103,840✔
UNCOV
4065
  }
×
4066
  return code;
4067
}
103,840✔
4068

4069
// Translate the "msg" field of an error response of the analytic server into an error code.
//   pJson: parsed response; NULL yields the generic error code
//   pId:   id string used as prefix of the log lines
// Returns TSDB_CODE_ANA_WN_DATA for white noise messages, TSDB_CODE_ANA_ALGO_NOT_LOAD when the message
// reports a refused connection, TSDB_CODE_ANA_ANODE_RETURN_ERROR in every other case.
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
  if (pJson == NULL) {
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  char pMsg[1024] = {0};
  if (tjsonGetStringValue(pJson, "msg", pMsg) != 0) {
    qError("%s failed to extract msg from server, unknown error", pId);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  qError("%s failed to exec imputation, msg:%s", pId, pMsg);

  // both spellings of the white noise message map to the same code
  if (strstr(pMsg, "white noise") != NULL || strstr(pMsg, "white-noise") != NULL) {
    return TSDB_CODE_ANA_WN_DATA;
  }

  if (strstr(pMsg, "[Errno 111] Connection refused") != NULL) {
    return TSDB_CODE_ANA_ALGO_NOT_LOAD;
  }

  return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
}
×
4093

4094

4095
// Create a data block with a single column whose type, bytes, scale and precision are taken from the
// result type of the remote value node.
//   ppBlock: receives the new block on success; it is set to NULL when appending the column fails and is
//            left untouched when one of the allocations fails
// Returns 0 on success, otherwise terrno of the failed allocation or the error of blockDataAppendColInfo().
int32_t createBlockFromRemoteValueNode(SSDataBlock** ppBlock, SRemoteValueNode* pRemote) {
  SValueNode* pValue = (SValueNode*)pRemote;

  SSDataBlock* pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNewBlock == NULL) {
    return terrno;
  }

  pNewBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  if (pNewBlock->pDataBlock == NULL) {
    int32_t allocCode = terrno;  // fetch the error before the free below can touch terrno
    taosMemoryFree(pNewBlock);
    return allocCode;
  }

  SColumnInfoData colInfo = createColumnInfoData(pValue->node.resType.type, pValue->node.resType.bytes, 0);
  colInfo.info.scale = pValue->node.resType.scale;
  colInfo.info.precision = pValue->node.resType.precision;

  int32_t code = blockDataAppendColInfo(pNewBlock, &colInfo);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    blockDataDestroy(pNewBlock);
    *ppBlock = NULL;
    return code;
  }

  *ppBlock = pNewBlock;
  return code;
}
25,398,626✔
4127

4128

4129
// Decode the single data block carried by a retrieve response into pb.
// Payload layout as read here: int32 compressed length, int32 raw length, then the block data. When the
// response is flagged as compressed and the compressed length is smaller than the raw length, the data is
// decompressed into a temporary buffer before it is decoded.
//   pRetrieveRsp: response to decode; NOTE(review): it is freed here when decoding fails - confirm that
//                 no caller releases it again on that path
//   pb:           destination block; it is destroyed when this function fails
// Returns TSDB_CODE_SUCCESS, terrno when the temporary buffer cannot be allocated,
// TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for inconsistent lengths, or the error of the decoder.
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   decompBuf = NULL;
  char*   pStart = pRetrieveRsp->data;

  if (pRetrieveRsp->compressed) {  // decompress the data
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
  }

  int32_t compLen = *(int32_t*)pStart;
  pStart += sizeof(int32_t);

  int32_t rawLen = *(int32_t*)pStart;
  pStart += sizeof(int32_t);
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    pStart = decompBuf;
  }

  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
  if (code != 0) {
    lino = __LINE__;
    taosMemoryFreeClear(pRetrieveRsp);
    goto _end;
  }

_end:
  // the temporary buffer is local to this call and has to be released on every path
  taosMemoryFreeClear(decompBuf);

  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pb);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
25,395,686✔
4171

4172
// Turn a remote value node into a plain value node from the result block of its scalar sub query.
// The block must have a column array with exactly one column and at most one row, otherwise
// TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS is returned and pRes is left untouched.
// The value of row 0 is handed to nodesSetValueNodeValueExt(); when that call clears needFree, the data
// pointer of the column is reset to NULL.
// Returns 0, or the error of nodesSetValueNodeValueExt().
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SRemoteValueNode* pRes, SSDataBlock* pBlock) {
  int32_t code = 0;
  bool    needFree = true;
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);

  bool validShape = (NULL != pBlock->pDataBlock) && (1 == colNum) && (pBlock->info.rows <= 1);
  if (!validShape) {
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
      ctx->idStr, pBlock->pDataBlock, colNum, pBlock->info.rows);
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
  }

  // from here on the node is a regular, translated value node
  pRes->val.node.type = QUERY_NODE_VALUE;
  pRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
  pRes->val.translate = true;

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
  if (!colDataIsNull_s(pCol, 0)) {
    code = nodesSetValueNodeValueExt(&pRes->val, colDataGetData(pCol, 0), &needFree);
  } else {
    pRes->val.isNull = true;
  }

  if (!needFree) {
    pCol->pData = NULL;
  }

  return code;
}
25,389,889✔
4199

4200
/*
 * Completion callback installed by fetchRemoteValueImpl() for a scalar
 * sub-query fetch request.
 *
 * param : the SScalarFetchParam that was attached to the request.
 * pMsg  : the response buffer; pMsg->pEpSet and pMsg->pData are released here.
 * code  : result code reported for the request.
 *
 * The outcome of the fetch is recorded in the sub-job ctx (ctx->code and, on
 * success, the slot `subQIdx` of ctx->subResValues), after which ctx->ready is
 * posted to wake the waiter.
 *
 * Returns the incoming `code` when the ctx has already been detached from the
 * param, otherwise the result of tsem_post().
 */
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
  SScalarFetchParam* pFetch = (SScalarFetchParam*)param;
  STaskSubJobCtx*    pJob = pFetch->pSubJobCtx;
  SSDataBlock*       pBlock = NULL;

  taosMemoryFreeClear(pMsg->pEpSet);

  if (pJob == NULL) {
    qWarn("scl fetch ctx not exists since it may have been released");
    goto _cleanup;
  }

  qDebug("%s subQIdx %d got rsp, code:%d, rsp:%p", pJob->idStr, pFetch->subQIdx, code, pMsg->pData);

  // the request is finished: unpublish the in-flight param from the ctx
  taosWLockLatch(&pJob->lock);
  pJob->param = NULL;
  taosWUnLockLatch(&pJob->lock);

  if (pJob->transporterId > 0) {
    int32_t freeCode = asyncFreeConnById(pJob->rpcHandle, pJob->transporterId);
    if (freeCode != 0) {
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", pJob->idStr, tstrerror(freeCode), pFetch->subQIdx);
    }
    pJob->transporterId = -1;
  }

  // a "successful" response without a payload is treated as an invalid message
  if (code == 0 && pMsg->pData == NULL) {
    qError("%s invalid rsp msg, msgType:%d, len:%d", pJob->idStr, pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  if (code != TSDB_CODE_SUCCESS) {
    pJob->code = rpcCvtErrCode(code);
    if (pJob->code == code) {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", pJob->idStr, pFetch->subQIdx, tstrerror(code));
    } else {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", pJob->idStr, pFetch->subQIdx,
             tstrerror(code), tstrerror(pJob->code));
    }
  } else {
    SRetrieveTableRsp* pRsp = pMsg->pData;

    // byte-swap the header fields of the response in place
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    // a scalar sub-query result must be complete and hold at most one row in at most one block
    bool wellFormed = (pRsp->numOfRows <= 1) && (pRsp->numOfBlocks <= 1) && pRsp->completed;

    if (!wellFormed) {
      qError("%s invalid scl fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
        pJob->idStr, pFetch->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
      pJob->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
    } else if (pRsp->numOfRows != 0) {
      qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , pJob->idStr, pFetch->subQIdx, pRsp->numOfRows);

      // build block -> reserve one row -> decode payload -> copy value into pRes;
      // each step stores its status in the ctx and the chain stops at the first failure
      if (TSDB_CODE_SUCCESS == (pJob->code = createBlockFromRemoteValueNode(&pBlock, pFetch->pRes)) &&
          TSDB_CODE_SUCCESS == (pJob->code = blockDataEnsureCapacity(pBlock, 1)) &&
          TSDB_CODE_SUCCESS == (pJob->code = extractSingleRspBlock(pRsp, pBlock)) &&
          TSDB_CODE_SUCCESS == (pJob->code = setValueFromResBlock(pJob, pFetch->pRes, pBlock))) {
        taosArraySet(pJob->subResValues, pFetch->subQIdx, &pFetch->pRes);
      }
    } else {
      // empty result set: turn the remote node into a NULL value node
      SRemoteValueNode* pNullRes = (SRemoteValueNode*)pFetch->pRes;
      pNullRes->val.node.type = QUERY_NODE_VALUE;
      pNullRes->val.isNull = true;
      pNullRes->val.translate = true;
      pNullRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
      taosArraySet(pJob->subResValues, pFetch->subQIdx, &pFetch->pRes);
    }
  }

  // wake up the waiter on ctx->ready
  code = tsem_post(&pFetch->pSubJobCtx->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
  }

_cleanup:

  taosMemoryFree(pMsg->pData);
  blockDataDestroy(pBlock);

  return code;
}
49,053,987✔
4289

4290

4291
/*
 * Send a fetch request for scalar sub-query `subQIdx` to its downstream
 * endpoint and wait on ctx->ready until remoteFetchCallBack() has processed
 * the response.
 *
 * ctx     : sub-job context holding the endpoints, rpc handle and result state.
 * subQIdx : index into ctx->subEndPoints.
 * pRes    : remote value node handed to the callback through the fetch param.
 *
 * Returns TSDB_CODE_SUCCESS on success; otherwise a serialization/allocation
 * error, the code already stored in ctx->code, the send error, or the task
 * code when the task was killed while waiting.
 */
int32_t fetchRemoteValueImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SRemoteValueNode* pRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SDownstreamSourceNode* pSource = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);
  if (NULL == pSource) {
    // defensive: do not dereference a missing endpoint
    qError("%s no endpoint found for subQIdx %d", ctx->idStr, subQIdx);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  SResFetchReq req = {0};
  req.header.vgId = pSource->addr.nodeId;
  req.sId = pSource->sId;
  req.clientId = pSource->clientId;
  req.taskId = pSource->taskId;
  req.queryId = ctx->queryId;
  req.execId = pSource->execId;

  // first pass with a NULL buffer only computes the serialized size
  int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, false);
  if (msgSize < 0) {
    return msgSize;
  }

  void* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return terrno;
  }

  msgSize = tSerializeSResFetchReq(msg, msgSize, &req, false);
  if (msgSize < 0) {
    taosMemoryFree(msg);
    return msgSize;
  }

  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
         ", execId:%d",
         ctx->idStr, pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
         pSource->taskId, pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(msg);
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
    return terrno;
  }

  SScalarFetchParam* param = taosMemoryMalloc(sizeof(SScalarFetchParam));
  if (NULL == param) {
    taosMemoryFreeClear(msg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
    return terrno;
  }

  // fill the param before it becomes reachable through ctx->param, so that
  // nobody can observe it with uninitialized fields
  param->subQIdx = subQIdx;
  param->pRes = pRes;
  param->pSubJobCtx = ctx;

  taosWLockLatch(&ctx->lock);

  if (ctx->code) {
    qError("task has been killed, error:%s", tstrerror(ctx->code));
    code = ctx->code;
    taosWUnLockLatch(&ctx->lock);
    lino = __LINE__;

    // nothing was handed to the transport yet: all three buffers are still
    // owned here and must be released (msg and pMsgSendInfo used to leak)
    taosMemoryFree(param);
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFree(msg);
    goto _end;
  }

  ctx->param = param;

  taosWUnLockLatch(&ctx->lock);

  pMsgSendInfo->param = param;
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = remoteFetchCallBack;
  pMsgSendInfo->requestId = ctx->queryId;

  // NOTE(review): from here on pMsgSendInfo (and through it msg and param) is
  // presumed to be owned by the transport layer, including on send failure;
  // this function never frees them after the call -- confirm against
  // asyncSendMsgToServer.
  code = asyncSendMsgToServer(ctx->rpcHandle, &pSource->addr.epSet, &ctx->transporterId, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  // block until the callback posts ctx->ready; the wait's own return value is
  // superseded below by the task code or the code the callback stored in ctx
  code = qSemWait(ctx->pTaskInfo, &ctx->ready);
  if (isTaskKilled(ctx->pTaskInfo)) {
    code = getTaskCode(ctx->pTaskInfo);
  } else {
    code = ctx->code;
  }

_end:

  taosWLockLatch(&ctx->lock);
  ctx->param = NULL;
  taosWUnLockLatch(&ctx->lock);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }
  return code;
}
49,072,825✔
4388

4389

4390
/*
 * Resolve the value of scalar sub-query `subQIdx` into `pRes`.
 *
 * pCtx    : the STaskSubJobCtx of the owning task (passed as void*).
 * subQIdx : index of the sub-query; must be within ctx->subEndPoints.
 * pRes    : remote value node that receives the result.
 *
 * If the slot `subQIdx` of ctx->subResValues is still empty, the value is
 * fetched from the remote endpoint and `pRes` is stored in the slot; otherwise
 * the stored value is copied into `pRes`.
 *
 * Returns 0 on success, TSDB_CODE_QRY_SUBQ_NOT_FOUND for an index without a
 * matching endpoint or result slot, or the error of the fetch/copy.
 */
int32_t qFetchRemoteValue(void* pCtx, int32_t subQIdx, SRemoteValueNode* pRes) {
  STaskSubJobCtx* ctx = (STaskSubJobCtx*)pCtx;
  int32_t         code = 0, lino = 0;
  int32_t         subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);

  // reject negative indexes as well as indexes past the end
  if (subQIdx < 0 || subQIdx >= subEndPoinsNum) {
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  // subResValues is a separate array from subEndPoints; make sure the slot
  // really exists before dereferencing it
  SValueNode** ppRes = taosArrayGet(ctx->subResValues, subQIdx);
  if (NULL == ppRes) {
    qError("%s no result slot for subQIdx %d", ctx->idStr, subQIdx);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  if (NULL == *ppRes) {
    // no value stored yet: fetch it and remember the node in the slot
    TAOS_CHECK_EXIT(fetchRemoteValueImpl(ctx, subQIdx, pRes));
    *ppRes = (SValueNode*)pRes;
  } else {
    // already resolved: copy the stored value into the caller's node
    TAOS_CHECK_EXIT(valueNodeCopy(*ppRes, &pRes->val));
    pRes->val.node.type = QUERY_NODE_VALUE;
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }

  return code;
}
50,381,994✔
4416

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc