• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.58
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "tutil.h"
35
#include "tjson.h"
36
#include "trpc.h"
37
#include "filter.h"
38

39
typedef struct tagFilterAssist {
40
  SHashObj* colHash;
41
  int32_t   index;
42
  SArray*   cInfoList;
43
  int32_t   code;
44
} tagFilterAssist;
45

46
typedef struct STransTagExprCtx {
47
  int32_t      code;
48
  SMetaReader* pReader;
49
} STransTagExprCtx;
50

51
typedef enum {
52
  FILTER_NO_LOGIC = 1,
53
  FILTER_AND,
54
  FILTER_OTHER,
55
} FilterCondType;
56

57
static FilterCondType checkTagCond(SNode* cond);
58
static bool optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI);
59
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
60
                                        uint64_t suid);
61

62
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
63
                            STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo);
64

65
static int64_t getLimit(const SNode* pLimit) {
934,084,728✔
66
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
934,084,728✔
67
}
68
static int64_t getOffset(const SNode* pLimit) {
934,080,416✔
69
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
934,080,416✔
70
}
71
static void releaseColInfoData(void* pCol);
72
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes);
73

74
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
324,229,523✔
75
  pResultRowInfo->size = 0;
324,229,523✔
76
  pResultRowInfo->cur.pageId = -1;
324,270,825✔
77
}
324,322,160✔
78

79
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
5,268,729✔
80

81
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
879,664,478✔
82
  pResultRow->numOfRows = 0;
879,664,478✔
83
  pResultRow->nOrigRows = 0;
879,710,717✔
84
  pResultRow->closed = false;
879,709,289✔
85
  pResultRow->endInterp = false;
879,710,543✔
86
  pResultRow->startInterp = false;
879,712,110✔
87

88
  if (entrySize > 0) {
879,712,110✔
89
    memset(pResultRow->pEntryInfo, 0, entrySize);
879,713,259✔
90
  }
91
}
879,713,850✔
92

93
// TODO refactor: use macro
94
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
95
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
96
}
97

98
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
197,406,148✔
99
  int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
197,406,148✔
100

101
  for (int32_t i = 0; i < numOfOutput; ++i) {
987,734,651✔
102
    rowSize += pCtx[i].resDataInfo.interBufSize;
790,373,145✔
103
  }
104

105
  return rowSize;
197,361,506✔
106
}
107

108
// Convert buf read from rocksdb to result row
UNCOV
109
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
110
  if (inBuf == NULL || pSup == NULL) {
×
111
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
112
    return TSDB_CODE_INVALID_PARA;
×
113
  }
UNCOV
114
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
115
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
116
  SResultRow*     pResultRow = NULL;
×
117
  size_t          processedSize = 0;
×
118
  int32_t         code = TSDB_CODE_SUCCESS;
×
119

120
  // calculate the size of output buffer
UNCOV
121
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
122
  *outBuf = taosMemoryMalloc(*outBufSize);
×
123
  if (*outBuf == NULL) {
×
124
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
125
    return terrno;
×
126
  }
UNCOV
127
  pResultRow = (SResultRow*)*outBuf;
×
128
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
129
  inBuf += sizeof(SResultRow);
×
130
  processedSize += sizeof(SResultRow);
×
131

UNCOV
132
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
133
    int32_t len = *(int32_t*)inBuf;
×
134
    inBuf += sizeof(int32_t);
×
135
    processedSize += sizeof(int32_t);
×
136
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
137
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
138
      if (code != TSDB_CODE_SUCCESS) {
×
139
        qError("failed to decode result row, code:%d", code);
×
140
        return code;
×
141
      }
142
    } else {
UNCOV
143
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
144
    }
UNCOV
145
    inBuf += len;
×
146
    processedSize += len;
×
147
  }
148

UNCOV
149
  if (processedSize < inBufSize) {
×
150
    // stream stores extra data after result row
UNCOV
151
    size_t leftLen = inBufSize - processedSize;
×
152
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
153
    if (*outBuf == NULL) {
×
154
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
155
      return terrno;
×
156
    }
UNCOV
157
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
158
    inBuf += leftLen;
×
159
    processedSize += leftLen;
×
160
    *outBufSize += leftLen;
×
161
  }
162

UNCOV
163
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
164
  return TSDB_CODE_SUCCESS;
×
165
}
166

167
// Convert result row to buf for rocksdb
UNCOV
168
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
169
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
170
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173

UNCOV
174
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
175
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
176
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
177
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
178

UNCOV
179
  if (rowSize > inBufSize) {
×
180
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
181
    return TSDB_CODE_INVALID_PARA;
×
182
  }
183

184
  // calculate the size of output buffer
UNCOV
185
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
186
  if (rowSize < inBufSize) {
×
187
    *outBufSize += inBufSize - rowSize;
×
188
  }
189

UNCOV
190
  *outBuf = taosMemoryMalloc(*outBufSize);
×
191
  if (*outBuf == NULL) {
×
192
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
193
    return terrno;
×
194
  }
195

UNCOV
196
  char* pBuf = *outBuf;
×
197
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
198
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
199
  pBuf += sizeof(SResultRow);
×
200
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
201
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
202
    *(int32_t*)pBuf = (int32_t)len;
×
203
    pBuf += sizeof(int32_t);
×
204
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
205
    pBuf += len;
×
206
  }
207

UNCOV
208
  if (rowSize < inBufSize) {
×
209
    // stream stores extra data after result row
UNCOV
210
    size_t leftLen = inBufSize - rowSize;
×
211
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
212
    pBuf += leftLen;
×
213
  }
214

UNCOV
215
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
216
  return TSDB_CODE_SUCCESS;
×
217
}
218

UNCOV
219
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
×
220

221
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
107,546,850✔
222
  taosMemoryFreeClear(pGroupResInfo->pBuf);
107,546,850✔
223
  if (pGroupResInfo->freeItem) {
107,543,155✔
224
    //    taosArrayDestroy(pGroupResInfo->pRows);
UNCOV
225
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
×
226
    pGroupResInfo->freeItem = false;
×
227
    pGroupResInfo->pRows = NULL;
×
228
  } else {
229
    taosArrayDestroy(pGroupResInfo->pRows);
107,541,096✔
230
    pGroupResInfo->pRows = NULL;
107,527,352✔
231
  }
232
  pGroupResInfo->index = 0;
107,531,669✔
233
  pGroupResInfo->delIndex = 0;
107,533,138✔
234
}
107,538,282✔
235

236
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
237
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
238
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
239

240
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
241
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
242
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
243

244
    if (pts1 == pts2) {
2,147,483,647✔
UNCOV
245
      return 0;
×
246
    } else {
247
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
248
    }
249
  } else {
250
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
251
  }
252
}
253

254
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
2,147,483,647✔
255

256
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
78,795,314✔
257
  int32_t code = TSDB_CODE_SUCCESS;
78,795,314✔
258
  int32_t lino = 0;
78,795,314✔
259
  if (pGroupResInfo->pRows != NULL) {
78,795,314✔
260
    taosArrayDestroy(pGroupResInfo->pRows);
5,106,574✔
261
  }
262
  if (pGroupResInfo->pBuf) {
78,800,458✔
263
    taosMemoryFree(pGroupResInfo->pBuf);
5,108,050✔
264
    pGroupResInfo->pBuf = NULL;
5,105,590✔
265
  }
266

267
  // extract the result rows information from the hash map
268
  int32_t size = tSimpleHashGetSize(pHashmap);
78,791,142✔
269

270
  void* pData = NULL;
78,791,127✔
271
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
78,791,127✔
272
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
78,784,735✔
273

274
  size_t  keyLen = 0;
78,784,591✔
275
  int32_t iter = 0;
78,787,547✔
276
  int64_t bufLen = 0, offset = 0;
78,780,436✔
277

278
  // todo move away and record this during create window
279
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
280
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
281
    bufLen += keyLen + sizeof(SResultRowPosition);
2,147,483,647✔
282
  }
283

284
  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
78,773,379✔
285
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
78,793,665✔
286

287
  iter = 0;
78,791,735✔
288
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
2,147,483,647✔
289
    void* key = tSimpleHashGetKey(pData, &keyLen);
2,147,483,647✔
290

291
    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);
2,147,483,647✔
292

293
    p->groupId = *(uint64_t*)key;
2,147,483,647✔
294
    p->pos = *(SResultRowPosition*)pData;
2,147,483,647✔
295
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
2,147,483,647✔
296
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
2,147,483,647✔
297
    QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
2,147,483,647✔
298

299
    offset += keyLen + sizeof(struct SResultRowPosition);
2,147,483,647✔
300
  }
301

302
  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
78,780,216✔
303
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
4,704,850✔
304
    size = POINTER_BYTES;
4,704,850✔
305
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
4,704,850✔
306
  }
307

308
  pGroupResInfo->index = 0;
78,780,619✔
309

310
_end:
78,794,382✔
311
  if (code != TSDB_CODE_SUCCESS) {
78,798,674✔
UNCOV
312
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
313
  }
314
  return code;
78,798,674✔
315
}
316

UNCOV
317
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
×
318
  if (pGroupResInfo->pRows != NULL) {
×
319
    taosArrayDestroy(pGroupResInfo->pRows);
×
320
  }
321

UNCOV
322
  pGroupResInfo->freeItem = true;
×
323
  pGroupResInfo->pRows = pArrayList;
×
324
  pGroupResInfo->index = 0;
×
325
  pGroupResInfo->delIndex = 0;
×
326
}
×
327

328
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
319,233,371✔
329
  if (pGroupResInfo->pRows == NULL) {
319,233,371✔
UNCOV
330
    return false;
×
331
  }
332

333
  return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
319,237,476✔
334
}
335

336
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
168,908,919✔
337
  if (pGroupResInfo->pRows == 0) {
168,908,919✔
UNCOV
338
    return 0;
×
339
  }
340

341
  return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
168,910,326✔
342
}
343

344
SArray* createSortInfo(SNodeList* pNodeList) {
48,214,660✔
345
  size_t numOfCols = 0;
48,214,660✔
346

347
  if (pNodeList != NULL) {
48,214,660✔
348
    numOfCols = LIST_LENGTH(pNodeList);
48,175,590✔
349
  } else {
350
    numOfCols = 0;
39,153✔
351
  }
352

353
  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
48,215,229✔
354
  if (pList == NULL) {
48,204,439✔
UNCOV
355
    return pList;
×
356
  }
357

358
  for (int32_t i = 0; i < numOfCols; ++i) {
110,015,616✔
359
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
61,803,006✔
360
    if (!pSortKey) {
61,808,362✔
UNCOV
361
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
362
      taosArrayDestroy(pList);
×
363
      pList = NULL;
×
364
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
365
      break;
×
366
    }
367
    SBlockOrderInfo bi = {0};
61,808,362✔
368
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
61,804,485✔
369
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
61,805,567✔
370

371
    if (nodeType(pSortKey->pExpr) != QUERY_NODE_COLUMN) {
61,800,251✔
UNCOV
372
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
×
373
      taosArrayDestroy(pList);
×
374
      pList = NULL;
×
375
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
376
      break;
×
377
    }
378
    
379
    SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
61,787,766✔
380
    bi.slotId = pColNode->slotId;
61,808,276✔
381
    void* tmp = taosArrayPush(pList, &bi);
61,811,658✔
382
    if (!tmp) {
61,811,658✔
UNCOV
383
      taosArrayDestroy(pList);
×
384
      pList = NULL;
×
385
      break;
×
386
    }
387
  }
388

389
  return pList;
48,212,739✔
390
}
391

392
SSDataBlock* createDataBlockFromDescNode(void* p) {
565,406,711✔
393
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
565,406,711✔
394
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
565,406,711✔
395
  SSDataBlock* pBlock = NULL;
565,567,556✔
396
  int32_t      code = createDataBlock(&pBlock);
565,558,730✔
397
  if (code) {
565,403,422✔
UNCOV
398
    terrno = code;
×
399
    return NULL;
×
400
  }
401

402
  pBlock->info.id.blockId = pNode->dataBlockId;
565,403,422✔
403
  pBlock->info.type = STREAM_INVALID;
565,357,436✔
404
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
565,413,499✔
405
  pBlock->info.watermark = INT64_MIN;
565,582,920✔
406

407
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
408
    SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
2,147,483,647✔
409
    if (!pDescNode) {
2,147,483,647✔
UNCOV
410
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
411
      blockDataDestroy(pBlock);
×
412
      pBlock = NULL;
×
413
      terrno = TSDB_CODE_INVALID_PARA;
×
414
      break;
×
415
    }
416
    SColumnInfoData idata =
2,147,483,647✔
417
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
418
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
419
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
420
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
421

422
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
423
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
424
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
30✔
425
      blockDataDestroy(pBlock);
30✔
UNCOV
426
      pBlock = NULL;
×
427
      terrno = code;
×
428
      break;
×
429
    }
430
  }
431

432
  return pBlock;
565,772,524✔
433
}
434

435
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
177,508,571✔
436
  SDataBlockInfo* pBlockInfo = &pDataBlock->info;
177,508,571✔
437

438
  for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
899,687,294✔
439
    SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
738,772,134✔
440
    if (!pItem) {
738,575,496✔
UNCOV
441
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
442
      return terrno;
×
443
    }
444

445
    if (pItem->isPk) {
738,575,496✔
446
      SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
16,748,500✔
447
      if (!pInfoData) {
16,339,489✔
UNCOV
448
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
449
        return terrno;
×
450
      }
451
      pBlockInfo->pks[0].type = pInfoData->info.type;
16,339,489✔
452
      pBlockInfo->pks[1].type = pInfoData->info.type;
16,356,039✔
453

454
      // allocate enough buffer size, which is pInfoData->info.bytes
455
      if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
16,350,021✔
456
        pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
5,139,263✔
457
        if (pBlockInfo->pks[0].pData == NULL) {
5,127,221✔
UNCOV
458
          return terrno;
×
459
        }
460

461
        pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
5,128,433✔
462
        if (pBlockInfo->pks[1].pData == NULL) {
5,125,417✔
UNCOV
463
          taosMemoryFreeClear(pBlockInfo->pks[0].pData);
×
464
          return terrno;
×
465
        }
466

467
        pBlockInfo->pks[0].nData = pInfoData->info.bytes;
5,130,941✔
468
        pBlockInfo->pks[1].nData = pInfoData->info.bytes;
5,132,801✔
469
      }
470

471
      break;
16,360,005✔
472
    }
473
  }
474

475
  return TSDB_CODE_SUCCESS;
177,326,516✔
476
}
477

478
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
2,350,908✔
479
  STransTagExprCtx* pCtx = pContext;
2,350,908✔
480
  SMetaReader*      mr = pCtx->pReader;
2,350,908✔
481
  bool              isTagCol = false, isTbname = false;
2,350,908✔
482
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
2,350,908✔
483
    SColumnNode* pCol = (SColumnNode*)*pNode;
671,688✔
484
    if (pCol->colType == COLUMN_TYPE_TBNAME)
671,688✔
UNCOV
485
      isTbname = true;
×
486
    else
487
      isTagCol = true;
671,688✔
488
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
1,679,220✔
UNCOV
489
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
×
490
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
×
491
  }
492
  if (isTagCol) {
2,350,908✔
493
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
671,688✔
494

495
    SValueNode* res = NULL;
671,688✔
496
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
671,688✔
497
    if (NULL == res) {
671,688✔
UNCOV
498
      return DEAL_RES_ERROR;
×
499
    }
500

501
    res->translate = true;
671,688✔
502
    res->node.resType = pSColumnNode->node.resType;
671,688✔
503

504
    STagVal tagVal = {0};
671,688✔
505
    tagVal.cid = pSColumnNode->colId;
671,688✔
506
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
671,688✔
507
    if (p == NULL) {
671,688✔
UNCOV
508
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
×
509
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
671,688✔
UNCOV
510
      int32_t len = ((const STag*)p)->len;
×
511
      res->datum.p = taosMemoryCalloc(len + 1, 1);
×
512
      if (NULL == res->datum.p) {
×
513
        return DEAL_RES_ERROR;
×
514
      }
UNCOV
515
      memcpy(res->datum.p, p, len);
×
516
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
671,688✔
517
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
671,688✔
UNCOV
518
        return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
519
      }
520

521
      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
671,688✔
522
      if (NULL == res->datum.p) {
671,688✔
UNCOV
523
        return DEAL_RES_ERROR;
×
524
      }
525

526
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
671,688✔
UNCOV
527
        memcpy(blobDataVal(res->datum.p), tagVal.pData, tagVal.nData);
×
528
        blobDataSetLen(res->datum.p, tagVal.nData);
×
529
      } else {
530
        memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
671,688✔
531
        varDataSetLen(res->datum.p, tagVal.nData);
671,688✔
532
      }
533
    } else {
UNCOV
534
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
×
535
      if (code != TSDB_CODE_SUCCESS) {
×
536
        return DEAL_RES_ERROR;
×
537
      }
538
    }
539
    nodesDestroyNode(*pNode);
671,688✔
540
    *pNode = (SNode*)res;
671,688✔
541
  } else if (isTbname) {
1,679,220✔
UNCOV
542
    SValueNode* res = NULL;
×
543
    pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
×
544
    if (NULL == res) {
×
545
      return DEAL_RES_ERROR;
×
546
    }
547

UNCOV
548
    res->translate = true;
×
549
    res->node.resType = ((SExprNode*)(*pNode))->resType;
×
550

UNCOV
551
    int32_t len = strlen(mr->me.name);
×
552
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
×
553
    if (NULL == res->datum.p) {
×
554
      return DEAL_RES_ERROR;
×
555
    }
UNCOV
556
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
×
557
    varDataSetLen(res->datum.p, len);
×
558
    nodesDestroyNode(*pNode);
×
559
    *pNode = (SNode*)res;
×
560
  }
561

562
  return DEAL_RES_CONTINUE;
2,350,908✔
563
}
564

565
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
335,844✔
566
  int32_t     code = TSDB_CODE_SUCCESS;
335,844✔
567
  SMetaReader mr = {0};
335,844✔
568

569
  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
335,844✔
570
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
335,844✔
571
  if (TSDB_CODE_SUCCESS != code) {
335,844✔
UNCOV
572
    pAPI->metaReaderFn.clearReader(&mr);
×
573
    *pQualified = false;
×
574

UNCOV
575
    return TSDB_CODE_SUCCESS;
×
576
  }
577

578
  SNode* pTagCondTmp = NULL;
335,844✔
579
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
335,844✔
580
  if (TSDB_CODE_SUCCESS != code) {
335,844✔
UNCOV
581
    *pQualified = false;
×
582
    pAPI->metaReaderFn.clearReader(&mr);
×
583
    return code;
×
584
  }
585
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
335,844✔
586
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
335,844✔
587
  pAPI->metaReaderFn.clearReader(&mr);
335,844✔
588
  if (TSDB_CODE_SUCCESS != ctx.code) {
335,844✔
UNCOV
589
    *pQualified = false;
×
590
    nodesDestroyNode(pTagCondTmp);
×
591
    terrno = code;
×
592
    return code;
×
593
  }
594

595
  SNode* pNew = NULL;
335,844✔
596
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
335,844✔
597
  if (TSDB_CODE_SUCCESS != code) {
335,844✔
UNCOV
598
    terrno = code;
×
599
    nodesDestroyNode(pTagCondTmp);
×
600
    *pQualified = false;
×
601

UNCOV
602
    return code;
×
603
  }
604

605
  SValueNode* pValue = (SValueNode*)pNew;
335,844✔
606
  *pQualified = pValue->datum.b;
335,844✔
607

608
  nodesDestroyNode(pNew);
335,844✔
609
  return TSDB_CODE_SUCCESS;
335,844✔
610
}
611

612
static EDealRes getColumn(SNode** pNode, void* pContext) {
55,638,956✔
613
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
55,638,956✔
614
  SColumnNode*     pSColumnNode = NULL;
55,638,956✔
615
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
55,645,006✔
616
    pSColumnNode = *(SColumnNode**)pNode;
18,752,519✔
617
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
36,905,744✔
618
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
738,336✔
619
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
738,124✔
620
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
693,335✔
621
      if (NULL == pSColumnNode) {
693,547✔
UNCOV
622
        return DEAL_RES_ERROR;
×
623
      }
624
      pSColumnNode->colId = -1;
693,547✔
625
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
693,198✔
626
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
693,198✔
627
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
693,547✔
628
      nodesDestroyNode(*pNode);
693,547✔
629
      *pNode = (SNode*)pSColumnNode;
693,195✔
630
    } else {
631
      return DEAL_RES_CONTINUE;
44,789✔
632
    }
633
  } else {
634
    return DEAL_RES_CONTINUE;
36,163,906✔
635
  }
636

637
  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
19,442,647✔
638
  if (!data) {
19,439,566✔
639
    int32_t tempRes =
640
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
16,815,081✔
641
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
16,817,553✔
UNCOV
642
      return DEAL_RES_ERROR;
×
643
    }
644
    pSColumnNode->slotId = pData->index++;
16,817,553✔
645
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
16,816,694✔
646
                         .type = pSColumnNode->node.resType.type,
16,806,063✔
647
                         .bytes = pSColumnNode->node.resType.bytes,
16,816,669✔
648
                         .pk = pSColumnNode->isPk};
16,811,114✔
649
#if TAG_FILTER_DEBUG
650
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
651
#endif
652
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
16,814,163✔
653
    if (!tmp) {
16,821,508✔
UNCOV
654
      return DEAL_RES_ERROR;
×
655
    }
656
  } else {
657
    SColumnNode* col = *(SColumnNode**)data;
2,624,485✔
658
    pSColumnNode->slotId = col->slotId;
2,623,996✔
659
  }
660

661
  return DEAL_RES_CONTINUE;
19,438,141✔
662
}
663

664
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
15,631,705✔
665
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
15,631,705✔
666
  if (pColumnData == NULL) {
15,626,827✔
UNCOV
667
    return terrno;
×
668
  }
669

670
  pColumnData->info.type = pType->type;
15,626,827✔
671
  pColumnData->info.bytes = pType->bytes;
15,628,966✔
672
  pColumnData->info.scale = pType->scale;
15,627,348✔
673
  pColumnData->info.precision = pType->precision;
15,620,990✔
674

675
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
15,628,930✔
676
  if (code != TSDB_CODE_SUCCESS) {
15,616,952✔
UNCOV
677
    terrno = code;
×
678
    releaseColInfoData(pColumnData);
×
679
    return terrno;
×
680
  }
681

682
  pParam->columnData = pColumnData;
15,616,952✔
683
  pParam->colAlloced = true;
15,627,102✔
684
  return TSDB_CODE_SUCCESS;
15,616,255✔
685
}
686

687
static void releaseColInfoData(void* pCol) {
4,210,421✔
688
  if (pCol) {
4,210,421✔
689
    SColumnInfoData* col = (SColumnInfoData*)pCol;
4,210,421✔
690
    colDataDestroy(col);
4,210,421✔
691
    taosMemoryFree(col);
4,209,860✔
692
  }
693
}
4,210,097✔
694

695
void freeItem(void* p) {
187,134,429✔
696
  STUidTagInfo* pInfo = p;
187,134,429✔
697
  if (pInfo->pTagVal != NULL) {
187,134,429✔
698
    taosMemoryFree(pInfo->pTagVal);
186,787,900✔
699
  }
700
}
187,123,491✔
701

702
typedef struct {
703
  col_id_t  colId;
704
  SNode*    pValueNode;
705
  int32_t   bytes;  // length defined in schema
706
} STagDataEntry;
707

708
static int compareTagDataEntry(const void* a, const void* b) {
20,916✔
709
  STagDataEntry* p1 = (STagDataEntry*)a;
20,916✔
710
  STagDataEntry* p2 = (STagDataEntry*)b;
20,916✔
711
  return compareInt16Val(&p1->colId, &p2->colId);
20,916✔
712
}
713

714
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
10,458✔
715
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
10,458✔
716
  if (NULL == *keyBuf) {
10,458✔
UNCOV
717
    qError(
×
718
      "failed to allocate memory for tag filter optimization key, size:%d",
719
      keyLen);
UNCOV
720
    return terrno;
×
721
  }
722
  char* pStart = *keyBuf;
10,458✔
723
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
31,374✔
724
    STagDataEntry* entry      = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
20,916✔
725
    SValueNode*    pValueNode = (SValueNode*)entry->pValueNode;
20,916✔
726
    // num type may have different bytes length, use the smaller one
727
    int32_t        bytes = TMIN(entry->bytes, pValueNode->node.resType.bytes);
20,916✔
728

729
    (void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
20,916✔
730
    pStart += sizeof(col_id_t);
20,916✔
731

732
    if (!pValueNode->isNull) {
20,916✔
733
      switch (pValueNode->node.resType.type) {
18,426✔
734
        case TSDB_DATA_TYPE_BOOL:
2,158✔
735
          (void)memcpy(
2,158✔
736
            pStart, &pValueNode->datum.b, bytes);
2,158✔
737
          pStart += bytes;
2,158✔
738
          break;
2,158✔
739
        case TSDB_DATA_TYPE_TINYINT:
9,130✔
740
        case TSDB_DATA_TYPE_SMALLINT:
741
        case TSDB_DATA_TYPE_INT:
742
        case TSDB_DATA_TYPE_BIGINT:
743
        case TSDB_DATA_TYPE_TIMESTAMP:
744
          (void)memcpy(
9,130✔
745
            pStart, &pValueNode->datum.i, bytes);
9,130✔
746
          pStart += bytes;
9,130✔
747
          break;
9,130✔
UNCOV
748
        case TSDB_DATA_TYPE_UTINYINT:
×
749
        case TSDB_DATA_TYPE_USMALLINT:
750
        case TSDB_DATA_TYPE_UINT:
751
        case TSDB_DATA_TYPE_UBIGINT:
UNCOV
752
          (void)memcpy(
×
753
            pStart, &pValueNode->datum.u, bytes);
×
754
          pStart += bytes;
×
755
          break;
×
756
        case TSDB_DATA_TYPE_FLOAT:
2,490✔
757
        case TSDB_DATA_TYPE_DOUBLE:
758
          (void)memcpy(
2,490✔
759
            pStart, &pValueNode->datum.d, bytes);
2,490✔
760
          pStart += bytes;
2,490✔
761
          break;
2,490✔
762
        case TSDB_DATA_TYPE_VARCHAR:
4,648✔
763
        case TSDB_DATA_TYPE_VARBINARY:
764
        case TSDB_DATA_TYPE_NCHAR:
765
          (void)memcpy(pStart,
4,648✔
766
            varDataVal(pValueNode->datum.p), varDataLen(pValueNode->datum.p));
4,648✔
767
          pStart += varDataLen(pValueNode->datum.p);
4,648✔
768
          break;
4,648✔
UNCOV
769
        default:
×
770
          qError("unsupported tag data type %d in tag filter optimization",
×
771
            pValueNode->node.resType.type);
UNCOV
772
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
773
      }
774
    }
775
  }
776

777
  return TSDB_CODE_SUCCESS;
10,458✔
778
}
779

780
static void extractTagDataEntry(
20,916✔
781
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
782
  SNode* pLeft = pOpNode->pLeft;
20,916✔
783
  SNode* pRight = pOpNode->pRight;
20,916✔
784
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
20,916✔
785
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
20,916✔
786
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
20,916✔
787
    (SValueNode*)pLeft : (SValueNode*)pRight;
20,916✔
788

789
  STagDataEntry entry = {0};
20,916✔
790
  entry.colId = pColNode->colId;
20,916✔
791
  entry.pValueNode = (SNode*)pValueNode;
20,916✔
792
  entry.bytes = pColNode->node.resType.bytes;
20,916✔
793
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
20,916✔
794
}
20,916✔
795

796
static int32_t extractTagFilterTagDataEntries(
10,458✔
797
  const SNode* pTagCond, SArray* pIdWithVal) {
798
  if (NULL == pTagCond || NULL == pIdWithVal ||
10,458✔
799
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
10,458✔
800
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
10,458✔
UNCOV
801
    qError("invalid parameter to extract tag filter symbol");
×
802
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
803
  }
804

805
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
10,458✔
UNCOV
806
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
807
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
10,458✔
808
    SNode* pChild = NULL;
10,458✔
809
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
31,374✔
810
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
20,916✔
811
    }
812
  }
813

814
  taosArraySort(pIdWithVal, compareTagDataEntry);
10,458✔
815

816
  return TSDB_CODE_SUCCESS;
10,458✔
817
}
818

819
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
10,458✔
820
  if (pTagCond == NULL) {
10,458✔
UNCOV
821
    return TSDB_CODE_SUCCESS;
×
822
  }
823

824
  char*   payload = NULL;
10,458✔
825
  int32_t len = 0;
10,458✔
826
  int32_t code = TSDB_CODE_SUCCESS;
10,458✔
827
  int32_t lino = 0;
10,458✔
828

829
  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
10,458✔
830
  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
10,458✔
831
  QUERY_CHECK_CODE(code, lino, _end);
10,458✔
832
  for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
31,374✔
833
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
20,916✔
834
    len += sizeof(col_id_t) + pEntry->bytes;
20,916✔
835
  }
836
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
10,458✔
837
  QUERY_CHECK_CODE(code, lino, _end);
10,458✔
838

839
  tMD5Init(pContext);
10,458✔
840
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
10,458✔
841
  tMD5Final(pContext);
10,458✔
842

843
_end:
10,458✔
844
  if (TSDB_CODE_SUCCESS != code) {
10,458✔
UNCOV
845
    qError("%s failed at line %d since %s",
×
846
      __func__, __LINE__, tstrerror(code));
847
  }
848
  taosArrayDestroy(pIdWithVal);
10,458✔
849
  taosMemoryFree(payload);
10,458✔
850
  return code;
10,458✔
851
}
852

853
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
30,176✔
854
  if (pTagCond == NULL) {
30,176✔
855
    return TSDB_CODE_SUCCESS;
28,443✔
856
  }
857

858
  char*   payload = NULL;
1,733✔
859
  int32_t len = 0;
1,733✔
860
  int32_t code = nodesNodeToMsg(pTagCond, &payload, &len);
1,733✔
861
  if (code != TSDB_CODE_SUCCESS) {
1,733✔
UNCOV
862
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
863
    return code;
×
864
  }
865

866
  tMD5Init(pContext);
1,733✔
867
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
1,733✔
868
  tMD5Final(pContext);
1,733✔
869

870
  // void* tmp = NULL;
871
  // uint32_t size = 0;
872
  // (void)taosAscii2Hex((const char*)pContext->digest, 16, &tmp, &size);
873
  // qInfo("tag filter digest payload: %s", tmp);
874
  // taosMemoryFree(tmp);
875

876
  taosMemoryFree(payload);
1,733✔
877
  return TSDB_CODE_SUCCESS;
1,733✔
878
}
879

UNCOV
880
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
×
881
  int32_t code = TSDB_CODE_SUCCESS;
×
882
  int32_t lino = 0;
×
883
  char*   payload = NULL;
×
884
  int32_t len = 0;
×
885
  code = nodesNodeToMsg(pGroup, &payload, &len);
×
886
  QUERY_CHECK_CODE(code, lino, _end);
×
887

UNCOV
888
  if (filterDigest[0]) {
×
889
    payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
×
890
    QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
×
891
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
×
892
    len += tListLen(pContext->digest);
×
893
  }
894

UNCOV
895
  tMD5Init(pContext);
×
896
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
×
897
  tMD5Final(pContext);
×
898

UNCOV
899
_end:
×
900
  taosMemoryFree(payload);
×
901
  if (code != TSDB_CODE_SUCCESS) {
×
902
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
903
  }
UNCOV
904
  return code;
×
905
}
906

907
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
15,461,402✔
908
  int32_t code = TSDB_CODE_SUCCESS;
15,461,402✔
909
  tagFilterAssist ctx = {0};
15,461,402✔
910
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
15,462,451✔
911
  if (ctx.colHash == NULL) {
15,463,494✔
UNCOV
912
    code = terrno;
×
913
    goto end;
×
914
  }
915

916
  ctx.index = 0;
15,463,494✔
917
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
15,463,494✔
918
  if (ctx.cInfoList == NULL) {
15,462,698✔
919
    code = terrno;
4,040✔
UNCOV
920
    goto end;
×
921
  }
922

923
  if (isList) {
15,458,658✔
924
    SNode* pNode = NULL;
4,036,063✔
925
    FOREACH(pNode, (SNodeList*)data) {
8,246,503✔
926
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
4,211,219✔
927
      if (TSDB_CODE_SUCCESS != ctx.code) {
4,210,652✔
UNCOV
928
        code = ctx.code;
×
929
        goto end;
×
930
      }
931
      REPLACE_NODE(pNode);
4,210,652✔
932
    }
933
  } else {
934
    SNode* pNode = (SNode*)data;
11,422,595✔
935
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
11,422,595✔
936
    if (TSDB_CODE_SUCCESS != ctx.code) {
11,423,927✔
UNCOV
937
      code = ctx.code;
×
938
      goto end;
×
939
    }
940
  }
941
  
942
  if (pColList != NULL) *pColList = ctx.cInfoList;
15,446,640✔
943
  ctx.cInfoList = NULL;
15,454,091✔
944

945
end:
15,468,661✔
946
  taosHashCleanup(ctx.colHash);
15,457,861✔
947
  taosArrayDestroy(ctx.cInfoList);
15,442,271✔
948
  return code;
15,444,459✔
949
}
950

951
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
427,084✔
952
  int32_t code = TSDB_CODE_SUCCESS;
427,084✔
953
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
427,084✔
954
  if (v == NULL) {
426,877✔
UNCOV
955
    code = terrno;
×
956
    goto end;
×
957
  }
958
  if (colDataIsNull_s(pValue, i)) {
853,961✔
959
    v->isNull = true;
6,474✔
960
  } else {
961
    v->isNull = false;
420,610✔
962
    char* data = colDataGetData(pValue, i);
420,444✔
963
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
420,444✔
UNCOV
964
      if (tTagIsJson(data)) {
×
965
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
966
        goto end;
×
967
      }
UNCOV
968
      if (tTagIsJsonNull(data)) {
×
969
        v->isNull = true;
×
970
        goto end;
×
971
      }
UNCOV
972
      int32_t len = getJsonValueLen(data);
×
973
      v->data.type = pValue->info.type;
×
974
      v->data.nData = len;
×
975
      v->data.pData = taosMemoryCalloc(1, len + 1);
×
976
      if (v->data.pData == NULL) {
×
977
        code = terrno;
×
978
        goto end;
×
979
      }
UNCOV
980
      memcpy(v->data.pData, data, len);
×
981
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
×
982
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
420,237✔
983
      if (varDataTLen(data) > pValue->info.bytes) {
308,062✔
UNCOV
984
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
985
        goto end;
×
986
      }
987
      v->data.type = pValue->info.type;
308,269✔
988
      v->data.nData = varDataLen(data);
308,269✔
989
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
308,269✔
990
      if (v->data.pData == NULL) {
308,269✔
UNCOV
991
        code = terrno;
×
992
        goto end;
×
993
      }
994
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
308,269✔
995
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
308,269✔
996
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
112,175✔
UNCOV
997
      v->data.type = pValue->info.type;
×
998
      v->data.nData = pValue->info.bytes;
×
999
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
×
1000
      if (v->data.pData == NULL) {
×
1001
        code = terrno;
×
1002
        goto end;
×
1003
      }
UNCOV
1004
      memcpy(&v->data.pData, data, pValue->info.bytes);
×
1005
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
×
1006
    } else {  // reader todo decimal
1007
      v->data.type = pValue->info.type;
112,175✔
1008
      memcpy(&v->data.val, data, pValue->info.bytes);
112,175✔
1009
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
112,175✔
1010
    }
1011
  }
1012
end:
41,802✔
1013
  if (code != TSDB_CODE_SUCCESS) {
426,918✔
UNCOV
1014
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1015
    v->isNull = true;
×
1016
  }
1017
  return code;
426,918✔
1018
}
1019

1020
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
66,638✔
1021
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1022
  int32_t      code = TSDB_CODE_SUCCESS;
66,638✔
1023
  int32_t      lino = 0;
66,638✔
1024
  SArray*      pBlockList = NULL;
66,638✔
1025
  SSDataBlock* pResBlock = NULL;
66,638✔
1026
  SArray*      groupData = NULL;
66,847✔
1027
  SArray*      pUidTagList = NULL;
66,847✔
1028
  SArray*      gInfo = NULL;
66,847✔
1029
  int32_t      tbNameIndex = 0;
66,847✔
1030

1031
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
66,847✔
1032
  if (rows == 0) {
66,847✔
UNCOV
1033
    return;
×
1034
  }
1035

1036
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
66,847✔
1037
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
66,847✔
1038

1039
  for (int32_t i = 0; i < rows; ++i) {
300,468✔
1040
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
233,621✔
1041
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
233,621✔
1042
    STUidTagInfo info = {.uid = pkeyInfo->uid};
233,621✔
1043
    void*        tmp = taosArrayPush(pUidTagList, &info);
233,621✔
1044
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
233,621✔
1045
  }
1046
 
1047
  if (taosArrayGetSize(pUidTagList) > 0) {
66,847✔
1048
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
66,847✔
1049
  } else {
UNCOV
1050
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1051
  }
1052
  if (code != TSDB_CODE_SUCCESS) {
66,847✔
UNCOV
1053
    goto end;
×
1054
  }
1055

1056
  SArray* pColList = NULL;
66,847✔
1057
  code = qGetColumnsFromNodeList(group, true, &pColList);
66,847✔
1058
  if (code != TSDB_CODE_SUCCESS) {
66,847✔
UNCOV
1059
    goto end;
×
1060
  }
1061

1062
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
175,074✔
1063
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
108,227✔
1064
    if (tmp != NULL && tmp->colId == -1) {
108,227✔
1065
      tbNameIndex = i;
66,847✔
1066
    }
1067
  }
1068
  
1069
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
66,628✔
1070
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
66,847✔
1071
  taosArrayDestroy(pColList);
66,847✔
1072
  if (pResBlock == NULL) {
66,847✔
1073
    code = terrno;
247✔
1074
    goto end;
247✔
1075
  }
1076

1077
  pBlockList = taosArrayInit(2, POINTER_BYTES);
66,600✔
1078
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
66,600✔
1079

1080
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
66,600✔
1081
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
66,600✔
1082

1083
  groupData = taosArrayInit(2, POINTER_BYTES);
66,600✔
1084
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
66,600✔
1085

1086
  SNode* pNode = NULL;
66,600✔
1087
  FOREACH(pNode, group) {
174,333✔
1088
    SScalarParam output = {0};
107,733✔
1089

1090
    switch (nodeType(pNode)) {
107,733✔
UNCOV
1091
      case QUERY_NODE_VALUE:
×
1092
        break;
×
1093
      case QUERY_NODE_COLUMN:
107,733✔
1094
      case QUERY_NODE_OPERATOR:
1095
      case QUERY_NODE_FUNCTION: {
1096
        SExprNode* expNode = (SExprNode*)pNode;
107,733✔
1097
        code = createResultData(&expNode->resType, rows, &output);
107,733✔
1098
        if (code != TSDB_CODE_SUCCESS) {
107,733✔
UNCOV
1099
          goto end;
×
1100
        }
1101
        break;
107,733✔
1102
      }
1103

UNCOV
1104
      default:
×
1105
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1106
        goto end;
×
1107
    }
1108

1109
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
107,733✔
1110
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
107,733✔
1111
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
107,733✔
1112
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
107,733✔
1113
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
107,733✔
UNCOV
1114
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
1115
      continue;
×
1116
    } else {
UNCOV
1117
      gTaskScalarExtra.pStreamInfo = NULL;
×
1118
      gTaskScalarExtra.pStreamRange = NULL;
×
1119
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1120
    }
1121

1122
    if (code != TSDB_CODE_SUCCESS) {
107,733✔
UNCOV
1123
      releaseColInfoData(output.columnData);
×
1124
      goto end;
×
1125
    }
1126

1127
    void* tmp = taosArrayPush(groupData, &output.columnData);
107,733✔
1128
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
107,733✔
1129
  }
1130

1131
  for (int i = 0; i < rows; i++) {
299,974✔
1132
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
233,374✔
1133
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
233,167✔
1134

1135
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
233,167✔
1136
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
233,374✔
1137

1138
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
585,444✔
1139
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
352,070✔
1140
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
352,070✔
1141
        if (ret != TSDB_CODE_SUCCESS) {
352,070✔
UNCOV
1142
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1143
          goto end;
×
1144
        }
1145
        if (j == tbNameIndex) {
352,070✔
1146
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
233,374✔
1147
          if (v != NULL){
233,374✔
1148
            v->isTbname = true;
233,374✔
1149
            v->uid = info->uid;
233,374✔
1150
          }
1151
        }
1152
    }
1153

1154
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
233,374✔
1155
    if (ret != TSDB_CODE_SUCCESS) {
233,374✔
UNCOV
1156
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1157
      goto end;
×
1158
    }
1159
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
233,374✔
1160
    gInfo = NULL;
233,374✔
1161
  }
1162

1163
end:
66,847✔
1164
  blockDataDestroy(pResBlock);
66,847✔
1165
  taosArrayDestroy(pBlockList);
66,847✔
1166
  taosArrayDestroyEx(pUidTagList, freeItem);
66,847✔
1167
  taosArrayDestroyP(groupData, releaseColInfoData);
66,847✔
1168
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
66,847✔
1169

1170
  if (code != TSDB_CODE_SUCCESS) {
66,847✔
1171
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
247✔
1172
  }
1173
}
1174

1175
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
3,968,779✔
1176
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1177
  int32_t      code = TSDB_CODE_SUCCESS;
3,968,779✔
1178
  int32_t      lino = 0;
3,968,779✔
1179
  SArray*      pBlockList = NULL;
3,968,779✔
1180
  SSDataBlock* pResBlock = NULL;
3,968,779✔
1181
  void*        keyBuf = NULL;
3,968,779✔
1182
  SArray*      groupData = NULL;
3,968,779✔
1183
  SArray*      pUidTagList = NULL;
3,968,779✔
1184
  SArray*      tableList = NULL;
3,968,779✔
1185
  SArray*      gInfo = NULL;
3,968,779✔
1186

1187
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
3,968,779✔
1188
  if (rows == 0) {
3,968,779✔
UNCOV
1189
    return TSDB_CODE_SUCCESS;
×
1190
  } 
1191

1192
  T_MD5_CTX context = {0};
3,968,779✔
1193
  if (tsTagFilterCache && groupIdMap == NULL) {
3,969,216✔
UNCOV
1194
    SNodeListNode* listNode = NULL;
×
1195
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1196
    if (TSDB_CODE_SUCCESS != code) {
×
1197
      goto end;
×
1198
    }
UNCOV
1199
    listNode->pNodeList = group;
×
1200
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
1201
    QUERY_CHECK_CODE(code, lino, end);
×
1202

UNCOV
1203
    nodesFree(listNode);
×
1204

UNCOV
1205
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1206
                                             tListLen(context.digest), &tableList);
UNCOV
1207
    QUERY_CHECK_CODE(code, lino, end);
×
1208

UNCOV
1209
    if (tableList) {
×
1210
      taosArrayDestroy(pTableListInfo->pTableList);
×
1211
      pTableListInfo->pTableList = tableList;
×
1212
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1213
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
UNCOV
1214
      goto end;
×
1215
    }
1216
  }
1217

1218
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
3,969,216✔
1219
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
3,969,216✔
1220

1221
  for (int32_t i = 0; i < rows; ++i) {
26,278,938✔
1222
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
22,304,820✔
1223
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
22,308,852✔
1224
    STUidTagInfo info = {.uid = pkeyInfo->uid};
22,308,852✔
1225
    void*        tmp = taosArrayPush(pUidTagList, &info);
22,309,722✔
1226
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
22,309,722✔
1227
  }
1228

1229
  if (taosArrayGetSize(pUidTagList) > 0) {
3,974,118✔
1230
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
3,969,216✔
1231
  } else {
UNCOV
1232
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1233
  }
1234
  if (code != TSDB_CODE_SUCCESS) {
3,969,216✔
UNCOV
1235
    goto end;
×
1236
  }
1237

1238
  SArray* pColList = NULL;
3,969,216✔
1239
  code = qGetColumnsFromNodeList(group, true, &pColList); 
3,969,216✔
1240
  if (code != TSDB_CODE_SUCCESS) {
3,969,216✔
UNCOV
1241
    goto end;
×
1242
  }
1243

1244
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
3,969,216✔
1245
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
3,969,216✔
1246
  taosArrayDestroy(pColList);
3,968,712✔
1247
  if (pResBlock == NULL) {
3,968,475✔
UNCOV
1248
    code = terrno;
×
1249
    goto end;
×
1250
  }
1251

1252
  //  int64_t st1 = taosGetTimestampUs();
1253
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1254

1255
  pBlockList = taosArrayInit(2, POINTER_BYTES);
3,968,475✔
1256
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
3,969,216✔
1257

1258
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
3,969,216✔
1259
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
3,969,216✔
1260

1261
  groupData = taosArrayInit(2, POINTER_BYTES);
3,969,216✔
1262
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
3,968,821✔
1263

1264
  SNode* pNode = NULL;
3,968,475✔
1265
  FOREACH(pNode, group) {
8,071,509✔
1266
    SScalarParam output = {0};
4,102,251✔
1267

1268
    switch (nodeType(pNode)) {
4,102,251✔
UNCOV
1269
      case QUERY_NODE_VALUE:
×
1270
        break;
×
1271
      case QUERY_NODE_COLUMN:
4,103,034✔
1272
      case QUERY_NODE_OPERATOR:
1273
      case QUERY_NODE_FUNCTION: {
1274
        SExprNode* expNode = (SExprNode*)pNode;
4,103,034✔
1275
        code = createResultData(&expNode->resType, rows, &output);
4,103,034✔
1276
        if (code != TSDB_CODE_SUCCESS) {
4,101,905✔
UNCOV
1277
          goto end;
×
1278
        }
1279
        break;
4,101,905✔
1280
      }
UNCOV
1281
      case QUERY_NODE_REMOTE_VALUE: {
×
1282
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
×
1283
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
×
1284
        QUERY_CHECK_CODE(code, lino, end);
×
1285
        break;
×
1286
      }
1287
      
UNCOV
1288
      default:
×
1289
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1290
        goto end;
×
1291
    }
1292

1293
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
4,101,905✔
1294
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
4,084,871✔
1295
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
4,084,871✔
1296
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
4,081,966✔
1297
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
4,081,966✔
1298
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
17,107✔
UNCOV
1299
      continue;
×
1300
    } else {
1301
      gTaskScalarExtra.pStreamInfo = NULL;
18,163✔
1302
      gTaskScalarExtra.pStreamRange = NULL;
18,163✔
1303
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
18,163✔
1304
    }
1305

1306
    if (code != TSDB_CODE_SUCCESS) {
4,102,522✔
UNCOV
1307
      releaseColInfoData(output.columnData);
×
1308
      goto end;
×
1309
    }
1310

1311
    void* tmp = taosArrayPush(groupData, &output.columnData);
4,103,034✔
1312
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,103,034✔
1313
  }
1314

1315
  int32_t keyLen = 0;
3,967,643✔
1316
  SNode*  node;
1317
  FOREACH(node, group) {
8,066,282✔
1318
    SExprNode* pExpr = (SExprNode*)node;
4,101,631✔
1319
    keyLen += pExpr->resType.bytes;
4,101,631✔
1320
  }
1321

1322
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
3,966,536✔
1323
  keyLen += nullFlagSize;
3,966,536✔
1324

1325
  keyBuf = taosMemoryCalloc(1, keyLen);
3,966,536✔
1326
  if (keyBuf == NULL) {
3,965,533✔
UNCOV
1327
    code = terrno;
×
1328
    goto end;
×
1329
  }
1330

1331
  if (initRemainGroups) {
3,965,533✔
1332
    pTableListInfo->remainGroups =
1,916,350✔
1333
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,915,913✔
1334
    if (pTableListInfo->remainGroups == NULL) {
1,916,350✔
UNCOV
1335
      code = terrno;
×
1336
      goto end;
×
1337
    }
1338
  }
1339

1340
  for (int i = 0; i < rows; i++) {
26,275,350✔
1341
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
22,306,197✔
1342
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
22,306,580✔
1343

1344
    if (groupIdMap != NULL){
22,306,580✔
1345
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
62,765✔
1346
    }
1347
    
1348
    char* isNull = (char*)keyBuf;
22,307,205✔
1349
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
22,307,205✔
1350
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
45,680,875✔
1351
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
23,377,229✔
1352

1353
      if (groupIdMap != NULL && gInfo != NULL) {
23,376,007✔
1354
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
75,014✔
1355
        if (ret != TSDB_CODE_SUCCESS) {
74,848✔
UNCOV
1356
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1357
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1358
          gInfo = NULL;
×
1359
        }
1360
      }
1361
      
1362
      if (colDataIsNull_s(pValue, i)) {
46,752,626✔
1363
        isNull[j] = 1;
96,966✔
1364
      } else {
1365
        isNull[j] = 0;
23,279,819✔
1366
        char* data = colDataGetData(pValue, i);
23,281,064✔
1367
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
23,280,809✔
1368
          // if (tTagIsJson(data)) {
1369
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1370
          //   goto end;
1371
          // }
1372
          if (tTagIsJsonNull(data)) {
74,491✔
UNCOV
1373
            isNull[j] = 1;
×
1374
            continue;
×
1375
          }
1376
          int32_t len = getJsonValueLen(data);
74,491✔
1377
          memcpy(pStart, data, len);
74,491✔
1378
          pStart += len;
74,491✔
1379
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
23,204,046✔
1380
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
20,308,284✔
UNCOV
1381
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
×
UNCOV
1382
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1383
              goto end;
×
1384
            }
UNCOV
1385
            memcpy(pStart, data, blobDataTLen(data));
×
1386
            pStart += blobDataTLen(data);
×
1387
          } else {
1388
            if (varDataTLen(data) > pValue->info.bytes) {
20,311,048✔
UNCOV
1389
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
1390
              goto end;
×
1391
            }
1392
            memcpy(pStart, data, varDataTLen(data));
20,309,807✔
1393
            pStart += varDataTLen(data);
20,310,548✔
1394
          }
1395
        } else {
1396
          memcpy(pStart, data, pValue->info.bytes);
2,896,582✔
1397
          pStart += pValue->info.bytes;
2,895,750✔
1398
        }
1399
      }
1400
    }
1401

1402
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
22,296,733✔
1403
    info->groupId = calcGroupId(keyBuf, len);
22,296,733✔
1404
    if (groupIdMap != NULL && gInfo != NULL) {
22,306,928✔
1405
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
62,931✔
1406
      if (ret != TSDB_CODE_SUCCESS) {
62,931✔
UNCOV
1407
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
1408
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1409
      }
1410
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
62,931✔
1411
      gInfo = NULL;
62,931✔
1412
    }
1413
    if (initRemainGroups) {
22,306,928✔
1414
      // groupId ~ table uid
1415
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
10,930,035✔
1416
                         sizeof(info->uid));
1417
      if (code == TSDB_CODE_DUP_KEY) {
10,930,148✔
1418
        code = TSDB_CODE_SUCCESS;
753,080✔
1419
      }
1420
      QUERY_CHECK_CODE(code, lino, end);
10,930,148✔
1421
    }
1422
  }
1423

1424
  if (tsTagFilterCache && groupIdMap == NULL) {
3,969,153✔
UNCOV
1425
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
1426
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1427

UNCOV
1428
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1429
                                              tListLen(context.digest), tableList,
UNCOV
1430
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
1431
    QUERY_CHECK_CODE(code, lino, end);
×
1432
  }
1433

1434
  //  int64_t st2 = taosGetTimestampUs();
1435
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1436

1437
end:
3,968,716✔
1438
  taosMemoryFreeClear(keyBuf);
3,968,475✔
1439
  blockDataDestroy(pResBlock);
3,968,475✔
1440
  taosArrayDestroy(pBlockList);
3,967,538✔
1441
  taosArrayDestroyEx(pUidTagList, freeItem);
3,966,597✔
1442
  taosArrayDestroyP(groupData, releaseColInfoData);
3,969,216✔
1443
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
3,968,546✔
1444

1445
  if (code != TSDB_CODE_SUCCESS) {
3,968,546✔
UNCOV
1446
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1447
  }
1448
  return code;
3,967,914✔
1449
}
1450

1451
static int32_t nameComparFn(const void* p1, const void* p2) {
1,753,721✔
1452
  const char* pName1 = *(const char**)p1;
1,753,721✔
1453
  const char* pName2 = *(const char**)p2;
1,753,721✔
1454

1455
  int32_t ret = strcmp(pName1, pName2);
1,752,871✔
1456
  if (ret == 0) {
1,752,871✔
1457
    return 0;
15,864✔
1458
  } else {
1459
    return (ret > 0) ? 1 : -1;
1,737,007✔
1460
  }
1461
}
1462

1463
static SArray* getTableNameList(const SNodeListNode* pList) {
909,699✔
1464
  int32_t    code = TSDB_CODE_SUCCESS;
909,699✔
1465
  int32_t    lino = 0;
909,699✔
1466
  int32_t    len = LIST_LENGTH(pList->pNodeList);
909,699✔
1467
  SListCell* cell = pList->pNodeList->pHead;
909,699✔
1468

1469
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
909,712✔
1470
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
910,204✔
1471

1472
  for (int i = 0; i < pList->pNodeList->length; i++) {
2,645,208✔
1473
    SValueNode* valueNode = (SValueNode*)cell->pNode;
1,734,150✔
1474
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
1,734,499✔
UNCOV
1475
      terrno = TSDB_CODE_INVALID_PARA;
×
1476
      taosArrayDestroy(pTbList);
×
1477
      return NULL;
×
1478
    }
1479

1480
    char* name = varDataVal(valueNode->datum.p);
1,734,512✔
1481
    void* tmp = taosArrayPush(pTbList, &name);
1,735,004✔
1482
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,735,004✔
1483
    cell = cell->pNext;
1,735,004✔
1484
  }
1485

1486
  size_t numOfTables = taosArrayGetSize(pTbList);
910,204✔
1487

1488
  // order the name
1489
  taosArraySort(pTbList, nameComparFn);
910,204✔
1490

1491
  // remove the duplicates
1492
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
910,204✔
1493
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
908,782✔
1494
  void* tmpTbl = taosArrayGet(pTbList, 0);
908,782✔
1495
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
909,699✔
1496
  void* tmp = taosArrayPush(pNewList, tmpTbl);
909,779✔
1497
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
909,779✔
1498

1499
  for (int32_t i = 1; i < numOfTables; ++i) {
1,733,872✔
1500
    char** name = taosArrayGetLast(pNewList);
823,937✔
1501
    char** nameInOldList = taosArrayGet(pTbList, i);
823,937✔
1502
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
824,442✔
1503
    if (strcmp(*name, *nameInOldList) == 0) {
824,442✔
1504
      continue;
8,558✔
1505
    }
1506

1507
    tmp = taosArrayPush(pNewList, nameInOldList);
815,535✔
1508
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
815,535✔
1509
  }
1510

1511
_end:
909,935✔
1512
  taosArrayDestroy(pTbList);
909,935✔
1513
  if (code != TSDB_CODE_SUCCESS) {
909,207✔
UNCOV
1514
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1515
    return NULL;
×
1516
  }
1517
  return pNewList;
909,699✔
1518
}
1519

UNCOV
1520
static int tableUidCompare(const void* a, const void* b) {
×
1521
  uint64_t u1 = *(uint64_t*)a;
×
1522
  uint64_t u2 = *(uint64_t*)b;
×
1523

UNCOV
1524
  if (u1 == u2) {
×
1525
    return 0;
×
1526
  }
1527

UNCOV
1528
  return u1 < u2 ? -1 : 1;
×
1529
}
1530

1531
static int32_t filterTableInfoCompare(const void* a, const void* b) {
16,399,801✔
1532
  STUidTagInfo* p1 = (STUidTagInfo*)a;
16,399,801✔
1533
  STUidTagInfo* p2 = (STUidTagInfo*)b;
16,399,801✔
1534

1535
  if (p1->uid == p2->uid) {
16,399,801✔
UNCOV
1536
    return 0;
×
1537
  }
1538

1539
  return p1->uid < p2->uid ? -1 : 1;
16,399,801✔
1540
}
1541

1542
static FilterCondType checkTagCond(SNode* cond) {
13,360,870✔
1543
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
13,360,870✔
1544
    return FILTER_NO_LOGIC;
9,821,047✔
1545
  }
1546
  if (nodeType(cond) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND) {
3,540,438✔
1547
    return FILTER_AND;
3,050,970✔
1548
  }
1549
  return FILTER_OTHER;
484,599✔
1550
}
1551

1552
static bool optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
14,171,409✔
1553
  int32_t code = 0;
14,171,409✔
1554
  int32_t ntype = nodeType(cond);
14,171,409✔
1555

1556
  if (ntype == QUERY_NODE_OPERATOR) {
14,174,225✔
1557
    code = optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
10,626,994✔
1558
    return code == 0;
10,623,023✔
1559
  }
1560
  if (ntype != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
3,547,231✔
1561
    return false;
483,756✔
1562
  }
1563

1564
  bool                 hasTbnameCond = false;
3,061,407✔
1565
  SLogicConditionNode* pNode = (SLogicConditionNode*)cond;
3,061,407✔
1566
  SNodeList*           pList = (SNodeList*)pNode->pParameterList;
3,061,407✔
1567

1568
  int32_t len = LIST_LENGTH(pList);
3,058,246✔
1569
  if (len <= 0) {
3,059,057✔
UNCOV
1570
    return false;
×
1571
  }
1572

1573
  SListCell* cell = pList->pHead;
3,059,057✔
1574
  for (int i = 0; i < len; i++) {
9,555,149✔
1575
    if (cell == NULL) break;
6,495,448✔
1576
    if (optimizeTbnameInCondImpl(pVnode, list, cell->pNode, pAPI, suid) == 0) {
6,495,448✔
1577
      hasTbnameCond = true;
7,564✔
1578
      break;
7,564✔
1579
    }
1580
    cell = cell->pNext;
6,492,378✔
1581
  }
1582

1583
  taosArraySort(list, filterTableInfoCompare);
3,067,265✔
1584
  taosArrayRemoveDuplicate(list, filterTableInfoCompare, NULL);
3,061,759✔
1585

1586
  if (hasTbnameCond) {
3,061,075✔
1587
    code = pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
7,564✔
1588
    return code == 0;
7,564✔
1589
  }
1590

1591
  return false;
3,053,511✔
1592
}
1593

1594
// only return uid that does not contained in pExistedUidList
1595
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
17,127,883✔
1596
                                        uint64_t suid) {
1597
  if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
17,127,883✔
1598
    return -1;
5,072✔
1599
  }
1600

1601
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
17,123,431✔
1602
  if (pNode->opType != OP_TYPE_IN) {
17,123,431✔
1603
    return -1;
14,625,844✔
1604
  }
1605

1606
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
2,496,945✔
1607
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
912,268✔
1608
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
1,586,477✔
1609
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
910,781✔
1610
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
910,204✔
1611

1612
    int32_t len = LIST_LENGTH(pList->pNodeList);
910,204✔
1613
    if (len <= 0) {
910,204✔
UNCOV
1614
      return -1;
×
1615
    }
1616

1617
    SArray*   pTbList = getTableNameList(pList);
910,204✔
1618
    int32_t   numOfTables = taosArrayGetSize(pTbList);
909,779✔
1619
    SHashObj* uHash = NULL;
909,779✔
1620

1621
    size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
909,779✔
1622
    if (numOfExisted > 0) {
909,274✔
1623
      uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,018✔
1624
      if (!uHash) {
2,018✔
UNCOV
1625
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1626
        return terrno;
×
1627
      }
1628

1629
      for (int i = 0; i < numOfExisted; i++) {
2,011,447✔
1630
        STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
2,009,429✔
1631
        if (!pTInfo) {
2,008,925✔
UNCOV
1632
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1633
          return terrno;
×
1634
        }
1635
        int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
2,008,925✔
1636
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
2,009,429✔
UNCOV
1637
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
1638
          return tempRes;
×
1639
        }
1640
      }
1641
    }
1642

1643
    for (int i = 0; i < numOfTables; i++) {
2,436,982✔
1644
      char* name = taosArrayGetP(pTbList, i);
1,624,798✔
1645

1646
      uint64_t uid = 0, csuid = 0;
1,625,234✔
1647
      if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) == 0) {
1,625,234✔
1648
        ETableType tbType = TSDB_TABLE_MAX;
456,857✔
1649
        if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
456,857✔
1650
            tbType == TSDB_CHILD_TABLE) {
456,857✔
1651
          if (suid != csuid) {
358,837✔
1652
            continue;
2,016✔
1653
          }
1654
          if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
356,821✔
1655
            STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
356,248✔
1656
            void*        tmp = taosArrayPush(pExistedUidList, &s);
356,248✔
1657
            if (!tmp) {
356,248✔
UNCOV
1658
              return terrno;
×
1659
            }
1660
          }
1661
        } else {
1662
          taosArrayDestroy(pTbList);
98,020✔
1663
          taosHashCleanup(uHash);
98,020✔
1664
          return -1;
98,020✔
1665
        }
1666
      } else {
1667
        //        qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
1668
        terrno = 0;
1,168,871✔
1669
      }
1670
    }
1671

1672
    taosHashCleanup(uHash);
812,184✔
1673
    taosArrayDestroy(pTbList);
812,184✔
1674
    return 0;
811,692✔
1675
  }
1676

1677
  return -1;
1,587,246✔
1678
}
1679

1680
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
17,087,170✔
1681
                                        SStorageAPI* pStorageAPI) {
1682
  int32_t      code = TSDB_CODE_SUCCESS;
17,087,170✔
1683
  int32_t      lino = 0;
17,087,170✔
1684
  SSDataBlock* pResBlock = NULL;
17,087,170✔
1685
  code = createDataBlock(&pResBlock);
17,091,125✔
1686
  QUERY_CHECK_CODE(code, lino, _end);
17,088,533✔
1687

1688
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
35,539,842✔
1689
    SColumnInfoData colInfo = {0};
18,455,257✔
1690
    void*           tmp = taosArrayGet(pColList, i);
18,452,222✔
1691
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
18,449,176✔
1692
    colInfo.info = *(SColumnInfo*)tmp;
18,449,176✔
1693
    code = blockDataAppendColInfo(pResBlock, &colInfo);
18,440,969✔
1694
    QUERY_CHECK_CODE(code, lino, _end);
18,448,761✔
1695
  }
1696

1697
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
17,084,020✔
1698
  if (code != TSDB_CODE_SUCCESS) {
17,092,827✔
UNCOV
1699
    terrno = code;
×
1700
    blockDataDestroy(pResBlock);
×
1701
    return NULL;
×
1702
  }
1703

1704
  pResBlock->info.rows = numOfTables;
17,092,827✔
1705

1706
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
17,095,299✔
1707

1708
  for (int32_t i = 0; i < numOfTables; i++) {
206,658,641✔
1709
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
189,557,039✔
1710
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
189,569,752✔
1711

1712
    for (int32_t j = 0; j < numOfCols; j++) {
387,703,925✔
1713
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
197,960,601✔
1714
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
197,869,955✔
1715

1716
      if (pColInfo->info.colId == -1) {  // tbname
197,869,955✔
1717
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
7,341,680✔
1718
        if (p1->name != NULL) {
7,343,585✔
1719
          STR_TO_VARSTR(str, p1->name);
356,248✔
1720
        } else {  // name is not retrieved during filter
1721
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
6,989,406✔
1722
          QUERY_CHECK_CODE(code, lino, _end);
6,989,728✔
1723
        }
1724

1725
        code = colDataSetVal(pColInfo, i, str, false);
7,345,729✔
1726
        QUERY_CHECK_CODE(code, lino, _end);
7,344,598✔
1727
#if TAG_FILTER_DEBUG
1728
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1729
#endif
1730
      } else {
1731
        STagVal tagVal = {0};
190,561,980✔
1732
        tagVal.cid = pColInfo->info.colId;
190,573,468✔
1733
        if (p1->pTagVal == NULL) {
190,581,563✔
1734
          colDataSetNULL(pColInfo, i);
4,150✔
1735
        } else {
1736
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
190,554,545✔
1737

1738
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
190,722,617✔
1739
            colDataSetNULL(pColInfo, i);
2,066,260✔
1740
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
188,655,378✔
1741
            code = colDataSetVal(pColInfo, i, p, false);
588,892✔
1742
            QUERY_CHECK_CODE(code, lino, _end);
588,892✔
1743
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
315,988,583✔
1744
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
127,905,117✔
UNCOV
1745
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1746
            }
1747
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
127,878,734✔
1748
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
127,891,237✔
1749
            varDataSetLen(tmp, tagVal.nData);
127,891,237✔
1750
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
127,907,633✔
1751
            code = colDataSetVal(pColInfo, i, tmp, false);
127,891,370✔
1752
#if TAG_FILTER_DEBUG
1753
            qDebug("tagfilter varch:%s", tmp + 2);
1754
#endif
1755
            taosMemoryFree(tmp);
127,912,724✔
1756
            QUERY_CHECK_CODE(code, lino, _end);
127,927,475✔
1757
          } else {
1758
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
60,193,080✔
1759
            QUERY_CHECK_CODE(code, lino, _end);
60,220,892✔
1760
#if TAG_FILTER_DEBUG
1761
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1762
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1763
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1764
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1765
            }
1766
#endif
1767
          }
1768
        }
1769
      }
1770
    }
1771
  }
1772

1773
_end:
17,108,480✔
1774
  if (code != TSDB_CODE_SUCCESS) {
17,102,318✔
1775
    blockDataDestroy(pResBlock);
3,965✔
1776
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
247✔
1777
    terrno = code;
247✔
1778
    return NULL;
247✔
1779
  }
1780
  return pResBlock;
17,098,353✔
1781
}
1782

1783
static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList,
11,397,313✔
1784
                                 bool* pResultList, bool addUid) {
1785
  taosArrayClear(pUidList);
11,397,313✔
1786

1787
  STableKeyInfo info = {.uid = 0, .groupId = 0};
11,404,854✔
1788
  int32_t       numOfTables = taosArrayGetSize(pUidTagList);
11,407,159✔
1789
  for (int32_t i = 0; i < numOfTables; ++i) {
176,008,132✔
1790
    if (pResultList[i]) {
164,582,399✔
1791
      STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
75,372,784✔
1792
      if (!tmpTag) {
75,375,051✔
UNCOV
1793
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1794
        return terrno;
×
1795
      }
1796
      uint64_t uid = tmpTag->uid;
75,375,051✔
1797
      qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
75,361,573✔
1798

1799
      info.uid = uid;
75,379,337✔
1800
      //qInfo("doSetQualifiedUid row:%d added to pTableList", i);
1801
      void* p = taosArrayPush(pListInfo->pTableList, &info);
75,379,337✔
1802
      if (p == NULL) {
75,384,913✔
UNCOV
1803
        return terrno;
×
1804
      }
1805

1806
      if (addUid) {
75,384,913✔
1807
        //qInfo("doSetQualifiedUid row:%d added to pUidList", i);
1808
        void* tmp = taosArrayPush(pUidList, &uid);
10,183✔
1809
        if (tmp == NULL) {
10,183✔
UNCOV
1810
          return terrno;
×
1811
        }
1812
      }
1813
    } else {
1814
      //qInfo("doSetQualifiedUid row:%d failed", i);
1815
    }
1816
  }
1817

1818
  return TSDB_CODE_SUCCESS;
11,425,733✔
1819
}
1820

1821
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
14,172,527✔
1822
  int32_t code = TSDB_CODE_SUCCESS;
14,172,527✔
1823
  int32_t numOfExisted = taosArrayGetSize(pUidList);
14,172,527✔
1824
  if (numOfExisted == 0) {
14,174,287✔
1825
    return code;
11,633,336✔
1826
  }
1827

1828
  for (int32_t i = 0; i < numOfExisted; ++i) {
29,010,892✔
1829
    uint64_t* uid = taosArrayGet(pUidList, i);
26,469,579✔
1830
    if (!uid) {
26,469,579✔
UNCOV
1831
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1832
      return terrno;
×
1833
    }
1834
    STUidTagInfo info = {.uid = *uid};
26,469,579✔
1835
    void*        tmp = taosArrayPush(pUidTagList, &info);
26,469,941✔
1836
    if (!tmp) {
26,469,941✔
UNCOV
1837
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1838
      return code;
×
1839
    }
1840
  }
1841
  return code;
2,541,313✔
1842
}
1843

1844
int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode,
84,387,616✔
1845
                                 SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo) {
1846
  *listAdded = false;
84,387,616✔
1847
  if (pTagCond == NULL) {
84,391,294✔
1848
    return TSDB_CODE_SUCCESS;
70,215,201✔
1849
  }
1850

1851
  terrno = TSDB_CODE_SUCCESS;
14,176,093✔
1852

1853
  int32_t      lino = 0;
14,170,329✔
1854
  int32_t      code = TSDB_CODE_SUCCESS;
14,170,329✔
1855
  SArray*      pBlockList = NULL;
14,170,329✔
1856
  SSDataBlock* pResBlock = NULL;
14,170,329✔
1857
  SScalarParam output = {0};
14,166,347✔
1858
  SArray*      pUidTagList = NULL;
14,165,212✔
1859

1860
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
14,165,212✔
1861

1862
  //  int64_t stt = taosGetTimestampUs();
1863
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
14,164,510✔
1864
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
14,167,602✔
1865

1866
  code = copyExistedUids(pUidTagList, pUidList);
14,167,602✔
1867
  QUERY_CHECK_CODE(code, lino, end);
14,166,475✔
1868

1869
  // Narrow down the scope of the tablelist set if there is tbname in condition and And Logical operator
1870
  bool narrowed = optimizeTbnameInCond(pVnode, pListInfo->idInfo.suid, pUidTagList, pTagCond, pAPI);
14,166,475✔
1871
  if (narrowed) {  // tbname in filter is activated, do nothing and return
14,169,134✔
1872
    taosArrayClear(pUidList);
812,184✔
1873

1874
    int32_t numOfRows = taosArrayGetSize(pUidTagList);
811,759✔
1875
    code = taosArrayEnsureCap(pUidList, numOfRows);
811,759✔
1876
    QUERY_CHECK_CODE(code, lino, end);
811,759✔
1877

1878
    for (int32_t i = 0; i < numOfRows; ++i) {
3,182,980✔
1879
      STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
2,371,221✔
1880
      QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
2,371,221✔
1881
      void* tmp = taosArrayPush(pUidList, &pInfo->uid);
2,371,221✔
1882
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
2,371,221✔
1883
    }
1884
    terrno = 0;
811,759✔
1885
  } else {
1886
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,356,950✔
1887

1888
    FilterCondType condType = checkTagCond(pTagCond);
13,358,303✔
1889
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) || // (super table) use tagIndex and operator is and
13,355,837✔
1890
        (status == SFLT_NOT_INDEX && taosArrayGetSize(pUidTagList) > 0)) {                       // (child table with tagCond)
10,481,365✔
1891
      code = pAPI->metaFn.getTableTagsByUid(pVnode, pListInfo->idInfo.suid, pUidTagList);
3,126,302✔
1892
    } else {
1893
      taosArrayClear(pUidTagList);        // clear tablelist if using tagIndex and or condition
10,228,400✔
1894
      code = pAPI->metaFn.getTableTags(pVnode, pListInfo->idInfo.suid, pUidTagList);
10,231,350✔
1895
    }
1896
    if (code != TSDB_CODE_SUCCESS) {
13,359,917✔
UNCOV
1897
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
×
1898
      terrno = code;
×
1899
      QUERY_CHECK_CODE(code, lino, end);
×
1900
    }
1901
  }
1902

1903
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
14,171,609✔
1904

1905
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
14,176,259✔
1906
  if (numOfTables == 0) {
14,174,476✔
1907
    goto end;
2,748,579✔
1908
  }
1909

1910
  SArray* pColList = NULL;
11,425,897✔
1911
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
11,426,046✔
1912
  if (code != TSDB_CODE_SUCCESS) {
11,402,742✔
UNCOV
1913
    goto end;
×
1914
  }
1915
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
11,402,742✔
1916
  taosArrayDestroy(pColList);
11,422,477✔
1917
  if (pResBlock == NULL) {
11,420,891✔
UNCOV
1918
    code = terrno;
×
1919
    QUERY_CHECK_CODE(code, lino, end);
×
1920
  }
1921

1922
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1923

1924
  //  int64_t st1 = taosGetTimestampUs();
1925
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1926
  pBlockList = taosArrayInit(2, POINTER_BYTES);
11,420,891✔
1927
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
11,420,089✔
1928

1929
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
11,425,722✔
1930
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
11,425,722✔
1931

1932
  code = createResultData(&type, numOfTables, &output);
11,425,722✔
1933
  if (code != TSDB_CODE_SUCCESS) {
11,414,450✔
UNCOV
1934
    terrno = code;
×
1935
    QUERY_CHECK_CODE(code, lino, end);
×
1936
  }
1937

1938
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
11,414,450✔
1939
  gTaskScalarExtra.pStreamRange = NULL;
11,414,450✔
1940
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
11,399,348✔
1941
  if (code != TSDB_CODE_SUCCESS) {
11,399,000✔
1942
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
914✔
1943
    terrno = code;
914✔
1944
    QUERY_CHECK_CODE(code, lino, end);
914✔
1945
  }
1946

1947
  code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid);
11,398,086✔
1948
  if (code != TSDB_CODE_SUCCESS) {
11,425,946✔
UNCOV
1949
    terrno = code;
×
1950
    QUERY_CHECK_CODE(code, lino, end);
×
1951
  }
1952
  *listAdded = true;
11,425,946✔
1953

1954
end:
14,172,285✔
1955
  if (code != TSDB_CODE_SUCCESS) {
14,172,440✔
1956
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
914✔
1957
  }
1958
  blockDataDestroy(pResBlock);
14,172,440✔
1959
  taosArrayDestroy(pBlockList);
14,157,899✔
1960
  taosArrayDestroyEx(pUidTagList, freeItem);
14,150,313✔
1961

1962
  colDataDestroy(output.columnData);
14,166,237✔
1963
  taosMemoryFreeClear(output.columnData);
14,169,211✔
1964
  return code;
14,166,731✔
1965
}
1966

1967
typedef struct {
1968
  int32_t code;
1969
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
1970
} PlaceHolderContext;
1971

1972
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
76,923✔
1973
  PlaceHolderContext* pData = (PlaceHolderContext*)pContext;
76,923✔
1974
  if (QUERY_NODE_FUNCTION != nodeType((*pNode))) {
76,923✔
1975
    return DEAL_RES_CONTINUE;
63,404✔
1976
  }
1977
  SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
13,519✔
1978
  if (!fmIsStreamPesudoColVal(pFuncNode->funcId)) {
13,519✔
1979
    return DEAL_RES_CONTINUE;
494✔
1980
  }
1981
  pData->code = fmSetStreamPseudoFuncParamVal(pFuncNode->funcId, pFuncNode->pParameterList, pData->pStreamRuntimeInfo);
13,025✔
1982
  if (pData->code != TSDB_CODE_SUCCESS) {
13,025✔
UNCOV
1983
    return DEAL_RES_ERROR;
×
1984
  }
1985
  SNode* pFirstParam = nodesListGetNode(pFuncNode->pParameterList, 0);
13,025✔
1986
  ((SValueNode*)pFirstParam)->translate = true;
13,025✔
1987
  SValueNode* res = NULL;
13,025✔
1988
  pData->code = nodesCloneNode(pFirstParam, (SNode**)&res);
13,025✔
1989
  if (NULL == res) {
13,025✔
UNCOV
1990
    return DEAL_RES_ERROR;
×
1991
  }
1992
  nodesDestroyNode(*pNode);
13,025✔
1993
  *pNode = (SNode*)res;
13,025✔
1994

1995
  return DEAL_RES_CONTINUE;
13,025✔
1996
}
1997

1998
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
20,916✔
1999
  SNode* pLeft = pOpNode->pLeft;
20,916✔
2000
  SNode* pRight = pOpNode->pRight;
20,916✔
2001
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
20,916✔
2002
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
20,916✔
2003

2004
  col_id_t colId = pColNode->colId;
20,916✔
2005
  void* _tmp = taosArrayPush(pColIdArray, &colId);
20,916✔
2006
}
20,916✔
2007

2008
static int32_t buildTagCondKey(
10,458✔
2009
  const SNode* pTagCond, char** pTagCondKey,
2010
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
2011
  if (NULL == pTagCond ||
10,458✔
2012
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
10,458✔
2013
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
10,458✔
UNCOV
2014
    qError("invalid parameter to extract tag filter symbol");
×
2015
    return TSDB_CODE_INTERNAL_ERROR;
×
2016
  }
2017
  int32_t code = TSDB_CODE_SUCCESS;
10,458✔
2018
  int32_t lino = 0;
10,458✔
2019
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
10,458✔
2020

2021
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
10,458✔
UNCOV
2022
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2023
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
10,458✔
2024
    SNode* pChild = NULL;
10,458✔
2025
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
31,374✔
2026
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
20,916✔
2027
    }
2028
  }
2029

2030
  taosArraySort(*pTagColIds, compareUint16Val);
10,458✔
2031

2032
  // encode ordered colIds into key string, separated by ','
2033
  *tagCondKeyLen =
20,916✔
2034
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
10,458✔
2035
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
10,458✔
2036
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
10,458✔
2037
  char* pStart = *pTagCondKey;
10,458✔
2038
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
31,374✔
2039
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
20,916✔
2040
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
20,916✔
2041
    memcpy(pStart, pColId, sizeof(col_id_t));
20,916✔
2042
    pStart += sizeof(col_id_t);
20,916✔
2043
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
20,916✔
2044
      *pStart = ',';
10,458✔
2045
      pStart += 1;
10,458✔
2046
    }
2047
  }
2048

2049
_end:
10,458✔
2050
  if (TSDB_CODE_SUCCESS != code) {
10,458✔
UNCOV
2051
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2052
    terrno = code;
×
2053
  }
2054
  return code;
10,458✔
2055
}
2056

2057
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
97,276✔
2058
  if (NULL == pTagCond) {
97,276✔
UNCOV
2059
    *(bool*)pContext = false;
×
2060
    return DEAL_RES_END;
×
2061
  }
2062
  if (nodeType(pTagCond) == QUERY_NODE_VALUE ||
97,276✔
2063
    nodeType(pTagCond) == QUERY_NODE_COLUMN) {
64,574✔
2064
    return DEAL_RES_CONTINUE;
53,618✔
2065
  }
2066
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR &&
43,658✔
2067
    ((SOperatorNode*)pTagCond)->opType == OP_TYPE_EQUAL) {
21,414✔
2068
    return DEAL_RES_CONTINUE;
20,916✔
2069
  }
2070
  if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION &&
22,742✔
2071
    ((SLogicConditionNode*)pTagCond)->condType == LOGIC_COND_TYPE_AND) {
10,458✔
2072
    return DEAL_RES_CONTINUE;
10,458✔
2073
  }
2074
  if (nodeType(pTagCond) == QUERY_NODE_FUNCTION &&
24,070✔
2075
    fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId)) {
11,786✔
2076
    return DEAL_RES_CONTINUE;
11,786✔
2077
  }
2078
  *(bool*)pContext = false;
498✔
2079
  return DEAL_RES_END;
498✔
2080
}
2081

2082
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
181,679,155✔
2083
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2084
  int32_t code = TSDB_CODE_SUCCESS;
181,679,155✔
2085
  int32_t lino = 0;
181,679,155✔
2086
  size_t  numOfTables = 0;
181,679,155✔
2087
  bool    listAdded = false;
181,679,155✔
2088

2089
  pListInfo->idInfo.suid = pScanNode->suid;
181,712,491✔
2090
  pListInfo->idInfo.tableType = pScanNode->tableType;
181,718,352✔
2091

2092
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
181,623,477✔
2093
  QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
181,558,478✔
2094

2095
  char*   pTagCondKey = NULL;
181,558,478✔
2096
  int32_t tagCondKeyLen;
181,522,529✔
2097
  SArray* pTagColIds = NULL;
181,583,969✔
2098
  char*   pPayload = NULL;
181,597,163✔
2099
  qTrace("getTableList called, suid:%" PRIu64
181,597,163✔
2100
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2101
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2102
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
181,599,759✔
2103
    pListInfo->idInfo.uid = pScanNode->uid;
70,418,467✔
2104
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
70,398,956✔
2105
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
70,397,676✔
2106
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
70,395,026✔
2107
    }
2108
    code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, false, &listAdded, pStreamInfo);
70,440,135✔
2109
    QUERY_CHECK_CODE(code, lino, _end);
70,436,705✔
2110
  } else {
2111
    bool      isStream = (pStreamInfo != NULL);
111,319,631✔
2112
    bool      hasTagCond = (pTagCond != NULL);
111,319,631✔
2113
    bool      canCacheTagEqCondFilter = false;
111,319,631✔
2114
    T_MD5_CTX context = {0};
111,255,332✔
2115

2116
    qTrace("start to get table list by tag filter, suid:%" PRIu64
111,352,644✔
2117
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2118
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2119

2120
    bool acquired = false;
111,352,644✔
2121
    // first, check whether we can use stable tag filter cache
2122
    if (tsStableTagFilterCache && isStream && hasTagCond) {
111,255,997✔
2123
      canCacheTagEqCondFilter = true;
10,956✔
2124
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
10,956✔
2125
        (void*)&canCacheTagEqCondFilter);
2126
    }
2127
    if (canCacheTagEqCondFilter) {
111,160,198✔
2128
      qDebug("%s, stable tag filter condition can be optimized", idstr);
10,458✔
2129
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
10,458✔
2130
        SNode* tmp = NULL;
10,458✔
2131
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
10,458✔
2132
        QUERY_CHECK_CODE(code, lino, _error);
10,458✔
2133

2134
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
10,458✔
2135
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
10,458✔
2136
        if (TSDB_CODE_SUCCESS != ctx.code) {
10,458✔
UNCOV
2137
          nodesDestroyNode(tmp);
×
2138
          code = ctx.code;
×
2139
          goto _error;
×
2140
        }
2141
        code = genStableTagFilterDigest(tmp, &context);
10,458✔
2142
        nodesDestroyNode(tmp);
10,458✔
2143
      } else {
UNCOV
2144
        code = genStableTagFilterDigest(pTagCond, &context);
×
2145
      }
2146
      QUERY_CHECK_CODE(code, lino, _error);
10,458✔
2147

2148
      code = buildTagCondKey(
10,458✔
2149
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2150
      QUERY_CHECK_CODE(code, lino, _error);
10,458✔
2151
      code = pStorageAPI->metaFn.getStableCachedTableList(
10,458✔
2152
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
10,458✔
2153
        context.digest, tListLen(context.digest), pUidList, &acquired);
2154
      QUERY_CHECK_CODE(code, lino, _error);
10,458✔
2155
    } else if (tsTagFilterCache) {
111,149,740✔
2156
      // second, try to use normal tag filter cache
2157
      qDebug("%s using normal tag filter cache", idstr);
30,176✔
2158
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
31,415✔
2159
        SNode* tmp = NULL;
1,239✔
2160
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
1,239✔
2161
        QUERY_CHECK_CODE(code, lino, _error);
1,239✔
2162

2163
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
1,239✔
2164
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
1,239✔
2165
        if (TSDB_CODE_SUCCESS != ctx.code) {
1,239✔
UNCOV
2166
          nodesDestroyNode(tmp);
×
2167
          code = ctx.code;
×
2168
          goto _error;
×
2169
        }
2170
        code = genTagFilterDigest(tmp, &context);
1,239✔
2171
        nodesDestroyNode(tmp);
1,239✔
2172
      } else {
2173
        code = genTagFilterDigest(pTagCond, &context);
28,937✔
2174
      }
2175
      // try to retrieve the result from meta cache
2176
      QUERY_CHECK_CODE(code, lino, _error);      
30,176✔
2177
      code = pStorageAPI->metaFn.getCachedTableList(
30,176✔
2178
        pVnode, pScanNode->suid, context.digest,
30,176✔
2179
        tListLen(context.digest), pUidList, &acquired);
2180
      QUERY_CHECK_CODE(code, lino, _error);
2✔
2181
    }
2182
    if (acquired) {
111,076,575✔
2183
      taosArrayDestroy(pTagColIds);
28,119✔
2184
      pTagColIds = NULL;
28,119✔
2185
      
2186
      digest[0] = 1;
28,119✔
2187
      memcpy(
56,238✔
2188
        digest + 1, context.digest, tListLen(context.digest));
28,119✔
2189
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache,"
28,119✔
2190
        " numOfTables:%d", 
2191
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2192
      goto _end;
28,119✔
2193
    } else {
2194
      qDebug("suid:%" PRIu64 
111,048,456✔
2195
        ", failed to get table uid list from cache", pScanNode->suid);
2196
    }
2197

2198
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
111,334,393✔
2199
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
97,419,983✔
2200
      QUERY_CHECK_CODE(code, lino, _error);
97,373,838✔
2201
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
97,373,838✔
2202
    } else {
2203
      SIdxFltStatus status = SFLT_NOT_INDEX;
13,914,410✔
2204
      if (pTagIndexCond) {
13,915,587✔
2205
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
3,942,153✔
2206

2207
        SIndexMetaArg metaArg = {.metaEx = pVnode,
3,942,177✔
2208
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
3,942,153✔
2209
                                 .ivtIdx = pIndex,
2210
                                 .suid = pScanNode->uid};
3,942,153✔
2211
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
3,942,153✔
2212
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
3,934,893✔
2213
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,060,204✔
2214
        } else {
2215
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,874,689✔
2216
        }
2217
      }
2218
      qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
13,915,587✔
2219
      code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status,
13,916,161✔
2220
        pStorageAPI, tsTagFilterCache || tsStableTagFilterCache,
13,916,161✔
2221
        &listAdded, pStreamInfo);
2222
      QUERY_CHECK_CODE(code, lino, _error);
13,906,414✔
2223
    }
2224
    // let's add the filter results into meta-cache
2225
    numOfTables = taosArrayGetSize(pUidList);
111,281,625✔
2226

2227
    if (canCacheTagEqCondFilter) {
111,291,465✔
2228
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
4,648✔
2229
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2230
            idstr, pScanNode->suid, (int32_t)numOfTables,
2231
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2232

2233
      code = pStorageAPI->metaFn.putStableCachedTableList(
4,648✔
2234
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2235
        context.digest, tListLen(context.digest),
2236
        pUidList, &pTagColIds);
2237
      QUERY_CHECK_CODE(code, lino, _end);
4,648✔
2238

2239
      digest[0] = 1;
4,648✔
2240
      memcpy(digest + 1, context.digest, tListLen(context.digest));
4,648✔
2241
    } else if (tsTagFilterCache) {
111,286,817✔
2242
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
7,867✔
2243
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2244
            idstr, pScanNode->suid, (int32_t)numOfTables,
2245
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2246
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
7,867✔
2247
      pPayload = taosMemoryMalloc(size);
7,867✔
2248
      QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
7,867✔
2249

2250
      *(int32_t*)pPayload = (int32_t)numOfTables;
7,867✔
2251
      if (numOfTables > 0) {
7,867✔
2252
        void* tmp = taosArrayGet(pUidList, 0);
6,377✔
2253
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
6,377✔
2254
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
6,377✔
2255
      }
2256

2257
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
7,867✔
2258
                                                    context.digest,
2259
                                                    tListLen(context.digest),
2260
                                                    pPayload, size, 1);
2261
      if (TSDB_CODE_SUCCESS == code) {
7,867✔
2262
        /*
2263
          data referenced by pPayload is used in lru cache,
2264
          reset pPayload to NULL to avoid being freed in _error block
2265
        */
2266
        pPayload = NULL;
7,701✔
2267
      } else {
2268
        if (TSDB_CODE_DUP_KEY == code) {
166✔
2269
          /*
2270
            another thread has already put the same key into cache,
2271
            we can just ignore this error
2272
          */
2273
          code = TSDB_CODE_SUCCESS;
166✔
2274
        }
2275
        QUERY_CHECK_CODE(code, lino, _end);
166✔
2276
      }
2277

2278

2279
      digest[0] = 1;
7,867✔
2280
      memcpy(digest + 1, context.digest, tListLen(context.digest));
7,867✔
2281
    }
2282
  }
2283

2284
_end:
181,732,585✔
2285
  if (!listAdded) {
181,769,565✔
2286
    numOfTables = taosArrayGetSize(pUidList);
170,332,829✔
2287
    for (int i = 0; i < numOfTables; i++) {
673,555,502✔
2288
      void* tmp = taosArrayGet(pUidList, i);
503,179,418✔
2289
      QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
503,226,272✔
2290
      STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
503,226,272✔
2291

2292
      void* p = taosArrayPush(pListInfo->pTableList, &info);
503,207,731✔
2293
      if (p == NULL) {
503,275,341✔
UNCOV
2294
        taosArrayDestroy(pUidList);
×
2295
        return terrno;
×
2296
      }
2297

2298
      qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr);
503,275,341✔
2299
    }
2300
  }
2301

2302
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
181,812,820✔
2303

2304
_error:
181,816,589✔
2305
  taosArrayDestroy(pUidList);
181,829,043✔
2306
  taosArrayDestroy(pTagColIds);
181,744,423✔
2307
  taosMemFreeClear(pTagCondKey);
181,750,094✔
2308
  taosMemFreeClear(pPayload);
181,750,094✔
2309
  if (code != TSDB_CODE_SUCCESS) {
181,750,094✔
2310
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
914✔
2311
  }
2312
  return code;
181,735,702✔
2313
}
2314

2315
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
20,178✔
2316
  int32_t        code = TSDB_CODE_SUCCESS;
20,178✔
2317
  int32_t        lino = 0;
20,178✔
2318
  SSubplan*      pSubplan = (SSubplan*)node;
20,178✔
2319
  SScanPhysiNode pNode = {0};
20,178✔
2320
  pNode.suid = suid;
20,178✔
2321
  pNode.uid = suid;
20,178✔
2322
  pNode.tableType = TSDB_SUPER_TABLE;
20,178✔
2323

2324
  STableListInfo* pTableListInfo = tableListCreate();
20,178✔
2325
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno);
20,178✔
2326
  uint8_t digest[17] = {0};
20,178✔
2327
  code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
20,178✔
2328
                      pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
2329
  QUERY_CHECK_CODE(code, lino, _end);
20,178✔
2330
  *tableList = pTableListInfo->pTableList;
20,178✔
2331
  pTableListInfo->pTableList = NULL;
20,178✔
2332
  tableListDestroy(pTableListInfo);
20,178✔
2333

2334
_end:
20,178✔
2335
  if (code != TSDB_CODE_SUCCESS) {
20,178✔
UNCOV
2336
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2337
  }
2338
  return code;
20,178✔
2339
}
2340

UNCOV
2341
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
2342
  size_t keyLen = 0;
×
2343

2344
  SNode* node;
UNCOV
2345
  FOREACH(node, pGroups) {
×
2346
    SExprNode* pExpr = (SExprNode*)node;
×
2347
    keyLen += pExpr->resType.bytes;
×
2348
  }
2349

UNCOV
2350
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
2351
  return keyLen;
×
2352
}
2353

UNCOV
2354
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2355
                              SStorageAPI* pAPI) {
UNCOV
2356
  SMetaReader mr = {0};
×
2357

UNCOV
2358
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2359
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2360
    pAPI->metaReaderFn.clearReader(&mr);
×
2361
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2362
  }
2363

UNCOV
2364
  SNodeList* groupNew = NULL;
×
2365
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2366
  if (TSDB_CODE_SUCCESS != code) {
×
2367
    pAPI->metaReaderFn.clearReader(&mr);
×
2368
    return code;
×
2369
  }
2370

UNCOV
2371
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2372
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2373
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2374
    nodesDestroyList(groupNew);
×
2375
    pAPI->metaReaderFn.clearReader(&mr);
×
2376
    return code;
×
2377
  }
UNCOV
2378
  char* isNull = (char*)keyBuf;
×
2379
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2380

2381
  SNode*  pNode;
UNCOV
2382
  int32_t index = 0;
×
2383
  FOREACH(pNode, groupNew) {
×
2384
    SNode*  pNew = NULL;
×
2385
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2386
    if (TSDB_CODE_SUCCESS == code) {
×
2387
      REPLACE_NODE(pNew);
×
2388
    } else {
UNCOV
2389
      nodesDestroyList(groupNew);
×
2390
      pAPI->metaReaderFn.clearReader(&mr);
×
2391
      return code;
×
2392
    }
2393

UNCOV
2394
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2395
      nodesDestroyList(groupNew);
×
2396
      pAPI->metaReaderFn.clearReader(&mr);
×
2397
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2398
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2399
    }
UNCOV
2400
    SValueNode* pValue = (SValueNode*)pNew;
×
2401

UNCOV
2402
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2403
      isNull[index++] = 1;
×
2404
      continue;
×
2405
    } else {
UNCOV
2406
      isNull[index++] = 0;
×
2407
      char* data = nodesGetValueFromNode(pValue);
×
2408
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2409
        if (tTagIsJson(data)) {
×
2410
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2411
          nodesDestroyList(groupNew);
×
2412
          pAPI->metaReaderFn.clearReader(&mr);
×
2413
          return terrno;
×
2414
        }
UNCOV
2415
        int32_t len = getJsonValueLen(data);
×
2416
        memcpy(pStart, data, len);
×
2417
        pStart += len;
×
2418
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2419
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
2420
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2421
        }
UNCOV
2422
        memcpy(pStart, data, varDataTLen(data));
×
2423
        pStart += varDataTLen(data);
×
2424
      } else {
UNCOV
2425
        memcpy(pStart, data, pValue->node.resType.bytes);
×
2426
        pStart += pValue->node.resType.bytes;
×
2427
      }
2428
    }
2429
  }
2430

UNCOV
2431
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
2432
  *pGroupId = calcGroupId(keyBuf, len);
×
2433

UNCOV
2434
  nodesDestroyList(groupNew);
×
2435
  pAPI->metaReaderFn.clearReader(&mr);
×
2436

UNCOV
2437
  return TSDB_CODE_SUCCESS;
×
2438
}
2439

2440
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
3,275,523✔
2441
  if (!pNodeList) {
3,275,523✔
UNCOV
2442
    return NULL;
×
2443
  }
2444

2445
  size_t  numOfCols = LIST_LENGTH(pNodeList);
3,275,523✔
2446
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
3,275,535✔
2447
  if (pList == NULL) {
3,270,964✔
UNCOV
2448
    return NULL;
×
2449
  }
2450

2451
  for (int32_t i = 0; i < numOfCols; ++i) {
7,430,528✔
2452
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
4,160,174✔
2453
    if (!pColNode) {
4,163,187✔
UNCOV
2454
      taosArrayDestroy(pList);
×
2455
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
2456
      return NULL;
×
2457
    }
2458

2459
    // todo extract method
2460
    SColumn c = {0};
4,163,187✔
2461
    c.slotId = pColNode->slotId;
4,162,325✔
2462
    c.colId = pColNode->colId;
4,161,536✔
2463
    c.type = pColNode->node.resType.type;
4,162,884✔
2464
    c.bytes = pColNode->node.resType.bytes;
4,160,727✔
2465
    c.precision = pColNode->node.resType.precision;
4,160,692✔
2466
    c.scale = pColNode->node.resType.scale;
4,162,073✔
2467

2468
    void* tmp = taosArrayPush(pList, &c);
4,161,453✔
2469
    if (!tmp) {
4,161,453✔
UNCOV
2470
      taosArrayDestroy(pList);
×
2471
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2472
      return NULL;
×
2473
    }
2474
  }
2475

2476
  return pList;
3,270,354✔
2477
}
2478

2479
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
246,627,901✔
2480
                            int32_t type, SColMatchInfo* pMatchInfo) {
2481
  size_t  numOfCols = LIST_LENGTH(pNodeList);
246,627,901✔
2482
  int32_t code = TSDB_CODE_SUCCESS;
246,629,330✔
2483
  int32_t lino = 0;
246,629,330✔
2484

2485
  pMatchInfo->matchType = type;
246,629,330✔
2486

2487
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
246,644,717✔
2488
  if (pList == NULL) {
246,531,763✔
UNCOV
2489
    code = terrno;
×
2490
    return code;
×
2491
  }
2492

2493
  for (int32_t i = 0; i < numOfCols; ++i) {
1,317,240,157✔
2494
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
1,070,636,309✔
2495
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,070,764,865✔
2496
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
1,070,764,865✔
2497
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
1,058,979,568✔
2498

2499
      SColMatchItem c = {.needOutput = true};
1,059,021,795✔
2500
      c.colId = pColNode->colId;
1,059,000,242✔
2501
      c.srcSlotId = pColNode->slotId;
1,058,954,022✔
2502
      c.dstSlotId = pNode->slotId;
1,058,935,273✔
2503
      c.isPk = pColNode->isPk;
1,059,037,478✔
2504
      c.dataType = pColNode->node.resType;
1,058,985,030✔
2505
      void* tmp = taosArrayPush(pList, &c);
1,058,984,417✔
2506
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,058,984,417✔
2507
    }
2508
  }
2509

2510
  // set the output flag for each column in SColMatchInfo, according to the
2511
  *numOfOutputCols = 0;
246,603,848✔
2512
  int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
246,696,306✔
2513
  for (int32_t i = 0; i < num; ++i) {
1,511,311,649✔
2514
    SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
1,264,681,323✔
2515
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
1,264,815,560✔
2516

2517
    // todo: add reserve flag check
2518
    // it is a column reserved for the arithmetic expression calculation
2519
    if (pNode->slotId >= numOfCols) {
1,264,815,560✔
2520
      (*numOfOutputCols) += 1;
194,126,778✔
2521
      continue;
194,130,535✔
2522
    }
2523

2524
    SColMatchItem* info = NULL;
1,070,739,136✔
2525
    for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
2,147,483,647✔
2526
      info = taosArrayGet(pList, j);
2,147,483,647✔
2527
      QUERY_CHECK_NULL(info, code, lino, _end, terrno);
2,147,483,647✔
2528
      if (info->dstSlotId == pNode->slotId) {
2,147,483,647✔
2529
        break;
1,058,092,609✔
2530
      }
2531
    }
2532

2533
    if (pNode->output) {
13,637,705✔
2534
      (*numOfOutputCols) += 1;
1,061,289,603✔
2535
    } else if (info != NULL) {
9,323,670✔
2536
      // select distinct tbname from stb where tbname='abc';
2537
      info->needOutput = false;
9,333,647✔
2538
    }
2539
  }
2540

2541
  pMatchInfo->pList = pList;
246,630,326✔
2542

2543
_end:
246,678,781✔
2544
  if (code != TSDB_CODE_SUCCESS) {
246,678,781✔
UNCOV
2545
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2546
  }
2547
  return code;
246,632,184✔
2548
}
2549

2550
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
1,070,248,236✔
2551
                                  const char* name) {
2552
  SResSchema s = {0};
1,070,248,236✔
2553
  s.scale = scale;
1,070,352,597✔
2554
  s.type = type;
1,070,352,597✔
2555
  s.bytes = bytes;
1,070,352,597✔
2556
  s.slotId = slotId;
1,070,352,597✔
2557
  s.precision = precision;
1,070,352,597✔
2558
  tstrncpy(s.name, name, tListLen(s.name));
1,070,352,597✔
2559

2560
  return s;
1,070,352,597✔
2561
}
2562

2563
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
1,044,185,446✔
2564
  SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
1,044,185,446✔
2565
  if (pCol == NULL) {
1,043,455,448✔
UNCOV
2566
    return NULL;
×
2567
  }
2568

2569
  pCol->slotId = slotId;
1,043,455,448✔
2570
  pCol->colId = colId;
1,043,511,850✔
2571
  pCol->bytes = pType->bytes;
1,043,594,023✔
2572
  pCol->type = pType->type;
1,043,638,610✔
2573
  pCol->scale = pType->scale;
1,043,960,350✔
2574
  pCol->precision = pType->precision;
1,043,851,873✔
2575
  pCol->dataBlockId = blockId;
1,044,221,480✔
2576
  pCol->colType = colType;
1,044,186,208✔
2577
  return pCol;
1,044,178,395✔
2578
}
2579

2580
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
1,070,711,207✔
2581
  int32_t code = TSDB_CODE_SUCCESS;
1,070,711,207✔
2582
  int32_t lino = 0;
1,070,711,207✔
2583
  pExp->base.numOfParams = 0;
1,070,711,207✔
2584
  pExp->base.pParam = NULL;
1,070,789,134✔
2585
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
1,070,692,302✔
2586
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
1,069,838,459✔
2587

2588
  pExp->pExpr->_function.num = 1;
1,069,971,432✔
2589
  pExp->pExpr->_function.functionId = -1;
1,070,093,087✔
2590

2591
  int32_t type = nodeType(pNode);
1,070,176,912✔
2592
  // it is a project query, or group by column
2593
  if (type == QUERY_NODE_COLUMN) {
1,070,482,944✔
2594
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
589,970,970✔
2595
    SColumnNode* pColNode = (SColumnNode*)pNode;
590,031,201✔
2596

2597
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
590,031,201✔
2598
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
589,917,635✔
2599

2600
    pExp->base.numOfParams = 1;
589,927,528✔
2601

2602
    SDataType* pType = &pColNode->node.resType;
589,921,959✔
2603
    pExp->base.resSchema =
2604
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
589,993,035✔
2605

2606
    pExp->base.pParam[0].pCol =
1,179,980,411✔
2607
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
1,179,961,577✔
2608
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
590,012,334✔
2609

2610
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
589,860,149✔
2611
  } else if (type == QUERY_NODE_VALUE) {
480,511,974✔
2612
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
16,723,535✔
2613
    SValueNode* pValNode = (SValueNode*)pNode;
16,721,453✔
2614

2615
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
16,721,453✔
2616
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
16,712,094✔
2617

2618
    pExp->base.numOfParams = 1;
16,716,915✔
2619

2620
    SDataType* pType = &pValNode->node.resType;
16,716,202✔
2621
    pExp->base.resSchema =
2622
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
16,713,384✔
2623
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
16,721,182✔
2624
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
16,717,424✔
2625
    QUERY_CHECK_CODE(code, lino, _end);
16,719,881✔
2626
  } else if (type == QUERY_NODE_REMOTE_VALUE) {
463,788,439✔
2627
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
929,800✔
2628
    code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
929,800✔
2629
    QUERY_CHECK_CODE(code, lino, _end);
931,590✔
2630

2631
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
655,572✔
2632
    SValueNode* pValNode = (SValueNode*)pNode;
655,572✔
2633

2634
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
655,572✔
2635
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
655,572✔
2636

2637
    pExp->base.numOfParams = 1;
655,572✔
2638

2639
    SDataType* pType = &pValNode->node.resType;
655,572✔
2640
    pExp->base.resSchema =
2641
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
655,572✔
2642
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
655,572✔
2643
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
655,572✔
2644
    QUERY_CHECK_CODE(code, lino, _end);
655,572✔
2645
  } else if (type == QUERY_NODE_FUNCTION) {
462,858,639✔
2646
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
440,130,235✔
2647
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
440,126,485✔
2648

2649
    SDataType* pType = &pFuncNode->node.resType;
440,126,485✔
2650
    pExp->base.resSchema =
2651
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
440,172,465✔
2652
    tExprNode* pExprNode = pExp->pExpr;
440,161,289✔
2653

2654
    pExprNode->_function.functionId = pFuncNode->funcId;
440,123,638✔
2655
    pExprNode->_function.pFunctNode = pFuncNode;
440,194,035✔
2656
    pExprNode->_function.functionType = pFuncNode->funcType;
440,247,451✔
2657

2658
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
440,156,600✔
2659

2660
    pExp->base.pParamList = pFuncNode->pParameterList;
440,194,937✔
2661
#if 1
2662
    // todo refactor: add the parameter for tbname function
2663
    const char* name = "tbname";
440,239,063✔
2664
    int32_t     len = strlen(name);
440,239,063✔
2665

2666
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
440,239,063✔
2667
        pExprNode->_function.functionName[len] == 0) {
30,695,591✔
2668
      pFuncNode->pParameterList = NULL;
30,658,848✔
2669
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
30,696,554✔
2670
      SValueNode* res = NULL;
30,708,265✔
2671
      if (TSDB_CODE_SUCCESS == code) {
30,709,339✔
2672
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
30,709,358✔
2673
      }
2674
      QUERY_CHECK_CODE(code, lino, _end);
30,722,130✔
2675
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
30,722,130✔
2676
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
30,689,269✔
2677
      if (code != TSDB_CODE_SUCCESS) {
30,701,554✔
UNCOV
2678
        nodesDestroyNode((SNode*)res);
×
2679
        res = NULL;
×
2680
      }
2681
      QUERY_CHECK_CODE(code, lino, _end);
30,701,554✔
2682
    }
2683
#endif
2684

2685
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
440,305,836✔
2686

2687
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
440,262,565✔
2688
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
440,041,481✔
2689
    pExp->base.numOfParams = numOfParam;
440,007,239✔
2690

2691
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
1,065,746,263✔
2692
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
625,739,061✔
2693
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
625,726,059✔
2694
      if (p1->type == QUERY_NODE_COLUMN) {
625,726,059✔
2695
        SColumnNode* pcn = (SColumnNode*)p1;
454,145,363✔
2696

2697
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
454,145,363✔
2698
        pExp->base.pParam[j].pCol =
908,206,866✔
2699
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
908,277,642✔
2700
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
454,106,151✔
2701
      } else if (p1->type == QUERY_NODE_VALUE) {
171,645,684✔
2702
        SValueNode* pvn = (SValueNode*)p1;
118,298,884✔
2703
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
118,298,884✔
2704
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
118,263,069✔
2705
        QUERY_CHECK_CODE(code, lino, _end);
118,234,240✔
2706
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
53,456,861✔
2707
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
47,972✔
2708
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, p1);
47,972✔
2709
        QUERY_CHECK_CODE(code, lino, _end);
47,972✔
2710

2711
        SValueNode* pvn = (SValueNode*)pRemote;
34,726✔
2712
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
34,726✔
2713
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
34,726✔
UNCOV
2714
        QUERY_CHECK_CODE(code, lino, _end);
×
2715
      }
2716
    }
2717
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
440,007,202✔
2718
  } else if (type == QUERY_NODE_OPERATOR) {
22,728,404✔
2719
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
21,586,607✔
2720
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
21,580,362✔
2721

2722
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
21,580,362✔
2723
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
21,558,468✔
2724
    pExp->base.numOfParams = 1;
21,551,299✔
2725

2726
    SDataType* pType = &pOpNode->node.resType;
21,560,600✔
2727
    pExp->base.resSchema =
2728
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
21,542,267✔
2729
    pExp->pExpr->_optrRoot.pRootNode = pNode;
21,571,227✔
2730
  } else if (type == QUERY_NODE_CASE_WHEN) {
1,142,771✔
2731
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
1,136,697✔
2732
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
1,136,265✔
2733

2734
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
1,136,265✔
2735
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
1,136,697✔
2736
    pExp->base.numOfParams = 1;
1,136,265✔
2737

2738
    SDataType* pType = &pCaseNode->node.resType;
1,136,697✔
2739
    pExp->base.resSchema =
2740
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
1,136,697✔
2741
    pExp->pExpr->_optrRoot.pRootNode = pNode;
1,136,265✔
2742
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
6,288✔
2743
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
3,038✔
2744
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
3,038✔
2745
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
3,038✔
2746
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
3,038✔
2747
    pExp->base.numOfParams = 1;
3,038✔
2748
    SDataType* pType = &pCond->node.resType;
3,038✔
2749
    pExp->base.resSchema =
2750
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
3,038✔
2751
    pExp->pExpr->_optrRoot.pRootNode = pNode;
3,038✔
2752
  } else {
2753
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
3,435✔
2754
    QUERY_CHECK_CODE(code, lino, _end);
3,435✔
2755
  }
2756
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
1,070,177,875✔
2757
_end:
1,070,470,423✔
2758
  if (code != TSDB_CODE_SUCCESS) {
1,070,470,423✔
2759
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
288,906✔
2760
  }
2761
  return code;
1,070,537,079✔
2762
}
2763

2764
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
1,070,576,198✔
2765
  return createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
1,070,576,198✔
2766
}
2767

UNCOV
2768
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
×
2769
  *numOfExprs = LIST_LENGTH(pNodeList);
×
2770
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
×
2771
  if (!pExprs) {
×
2772
    return NULL;
×
2773
  }
2774

UNCOV
2775
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
×
2776
    SExprInfo* pExp = &pExprs[i];
×
2777
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
×
2778
    if (code != TSDB_CODE_SUCCESS) {
×
2779
      taosMemoryFreeClear(pExprs);
×
2780
      terrno = code;
×
2781
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2782
      return NULL;
×
2783
    }
2784
  }
2785

UNCOV
2786
  return pExprs;
×
2787
}
2788

2789
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
347,385,171✔
2790
  QRY_PARAM_CHECK(pExprInfo);
347,385,171✔
2791

2792
  int32_t code = 0;
347,453,203✔
2793
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
347,453,203✔
2794
  int32_t numOfGroupKeys = 0;
347,400,938✔
2795
  if (pGroupKeys != NULL) {
347,400,938✔
2796
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
29,661,374✔
2797
  }
2798

2799
  *numOfExprs = numOfFuncs + numOfGroupKeys;
347,399,705✔
2800
  if (*numOfExprs == 0) {
347,471,672✔
2801
    return code;
35,430,401✔
2802
  }
2803

2804
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
312,120,690✔
2805
  if (pExprs == NULL) {
311,742,744✔
UNCOV
2806
    return terrno;
×
2807
  }
2808

2809
  for (int32_t i = 0; i < (*numOfExprs); ++i) {
1,381,711,440✔
2810
    STargetNode* pTargetNode = NULL;
1,070,109,322✔
2811
    if (i < numOfFuncs) {
1,070,109,322✔
2812
      pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
1,011,509,217✔
2813
    } else {
2814
      pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
58,600,105✔
2815
    }
2816
    if (!pTargetNode) {
1,070,466,975✔
UNCOV
2817
      destroyExprInfo(pExprs, *numOfExprs);
×
2818
      taosMemoryFreeClear(pExprs);
×
2819
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
2820
      return terrno;
×
2821
    }
2822

2823
    SExprInfo* pExp = &pExprs[i];
1,070,466,975✔
2824
    code = createExprFromTargetNode(pExp, pTargetNode);
1,070,485,536✔
2825
    if (code != TSDB_CODE_SUCCESS) {
1,070,257,960✔
2826
      destroyExprInfo(pExprs, *numOfExprs);
289,264✔
2827
      taosMemoryFreeClear(pExprs);
289,264✔
2828
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
289,264✔
2829
      return code;
289,264✔
2830
    }
2831
  }
2832

2833
  *pExprInfo = pExprs;
311,727,927✔
2834
  return code;
311,773,731✔
2835
}
2836

UNCOV
2837
static void deleteSubsidiareCtx(void* pData) {
×
2838
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2839
  if (pCtx->pCtx) {
×
2840
    taosMemoryFreeClear(pCtx->pCtx);
×
2841
  }
UNCOV
2842
}
×
2843

2844
// set the output buffer for the selectivity + tag query
2845
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
336,940,517✔
2846
  int32_t num = 0;
336,940,517✔
2847
  int32_t code = TSDB_CODE_SUCCESS;
336,940,517✔
2848
  int32_t lino = 0;
336,940,517✔
2849

2850
  SArray* pValCtxArray = NULL;
336,940,517✔
2851
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
1,086,444,215✔
2852
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
749,671,869✔
2853
    if (funcIdx > 0) {
749,722,281✔
2854
      if (pValCtxArray == NULL) {
1,480,254✔
2855
        // the end of the list is the select function of biggest index
2856
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,061,071✔
2857
        if (pValCtxArray == NULL) {
1,060,668✔
UNCOV
2858
          return terrno;
×
2859
        }
2860
      }
2861
      if (funcIdx > pValCtxArray->size) {
1,479,851✔
UNCOV
2862
        qError("funcIdx:%d is out of range", funcIdx);
×
2863
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2864
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2865
      }
2866
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,479,448✔
2867
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,478,642✔
2868
      if (pSubsidiary->pCtx == NULL) {
1,478,642✔
UNCOV
2869
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2870
        return terrno;
×
2871
      }
2872
      pSubsidiary->num = 0;
1,476,627✔
2873
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,476,627✔
2874
    }
2875
  }
2876

2877
  SqlFunctionCtx*  p = NULL;
336,772,346✔
2878
  SqlFunctionCtx** pValCtx = NULL;
336,772,346✔
2879
  if (pValCtxArray == NULL) {
336,772,346✔
2880
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
335,780,808✔
2881
    if (pValCtx == NULL) {
335,787,814✔
UNCOV
2882
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2883
    }
2884
  }
2885

2886
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,394,067,927✔
2887
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
1,057,129,677✔
2888
    if ((strcmp(pName, "_select_value") == 0)) {
1,057,316,106✔
2889
      if (pValCtxArray == NULL) {
7,022,626✔
2890
        pValCtx[num++] = &pCtx[i];
4,944,897✔
2891
      } else {
2892
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2,077,764✔
2893
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,078,444✔
2894
          bindFuncIndex -= 1;
2,030,487✔
2895
        }
2896
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2,078,444✔
2897
        if (pSubsidiary == NULL) {
2,078,041✔
UNCOV
2898
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2899
        }
2900
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2,078,041✔
2901
        (*pSubsidiary)->num++;
2,077,235✔
2902
      }
2903
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
1,050,293,480✔
2904
      if (pValCtxArray == NULL) {
107,992,028✔
2905
        p = &pCtx[i];
106,183,682✔
2906
      }
2907
    }
2908
  }
2909

2910
  if (p != NULL) {
336,938,250✔
2911
    p->subsidiaries.pCtx = pValCtx;
30,146,022✔
2912
    p->subsidiaries.num = num;
30,145,587✔
2913
  } else {
2914
    taosMemoryFreeClear(pValCtx);
306,792,228✔
2915
  }
2916

2917
_end:
1,120,474✔
2918
  if (code != TSDB_CODE_SUCCESS) {
336,788,150✔
UNCOV
2919
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2920
    taosMemoryFreeClear(pValCtx);
×
2921
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2922
  } else {
2923
    taosArrayDestroy(pValCtxArray);
336,788,150✔
2924
  }
2925
  return code;
336,864,961✔
2926
}
2927

2928
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
336,959,598✔
2929
                                     SFunctionStateStore* pStore) {
2930
  int32_t         code = TSDB_CODE_SUCCESS;
336,959,598✔
2931
  int32_t         lino = 0;
336,959,598✔
2932
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
336,959,598✔
2933
  if (pFuncCtx == NULL) {
336,583,530✔
UNCOV
2934
    return NULL;
×
2935
  }
2936

2937
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
336,583,530✔
2938
  if (*rowEntryInfoOffset == 0) {
336,926,378✔
UNCOV
2939
    taosMemoryFreeClear(pFuncCtx);
×
2940
    return NULL;
×
2941
  }
2942

2943
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,394,284,093✔
2944
    SExprInfo* pExpr = &pExprInfo[i];
1,057,384,654✔
2945

2946
    SExprBasicInfo* pFunct = &pExpr->base;
1,057,266,096✔
2947
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
1,057,343,742✔
2948

2949
    pCtx->functionId = -1;
1,057,360,490✔
2950
    pCtx->pExpr = pExpr;
1,057,427,226✔
2951

2952
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
1,057,315,443✔
2953
      SFuncExecEnv env = {0};
439,241,729✔
2954
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
439,261,820✔
2955
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
439,254,642✔
2956
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
439,192,950✔
2957

2958
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
439,206,617✔
2959
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
727,650,192✔
2960
        if (!isUdaf) {
288,684,467✔
2961
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
288,585,492✔
2962
          QUERY_CHECK_CODE(code, lino, _end);
288,439,135✔
2963
        } else {
2964
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
98,975✔
2965
          pCtx->udfName = taosStrdup(udfName);
98,975✔
2966
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
98,975✔
2967

2968
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
98,975✔
2969
          QUERY_CHECK_CODE(code, lino, _end);
98,975✔
2970
        }
2971
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
288,538,110✔
2972
        if (!tmp) {
288,548,817✔
UNCOV
2973
          code = terrno;
×
2974
          QUERY_CHECK_CODE(code, lino, _end);
×
2975
        }
2976
      } else {
2977
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
150,480,944✔
2978
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
6,144,590✔
2979
          QUERY_CHECK_CODE(code, lino, _end);
6,144,799✔
2980
        }      
2981
        
2982
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
150,528,949✔
2983
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
150,510,280✔
2984
          code = TSDB_CODE_SUCCESS;
114,105✔
2985
        }
2986
        QUERY_CHECK_CODE(code, lino, _end);
150,510,280✔
2987

2988
        if (pCtx->sfp.getEnv != NULL) {
150,510,280✔
2989
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
23,910,123✔
2990
          if (!tmp) {
23,910,061✔
UNCOV
2991
            code = terrno;
×
2992
            QUERY_CHECK_CODE(code, lino, _end);
×
2993
          }
2994
        }
2995
      }
2996
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
439,042,142✔
2997
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
618,068,371✔
2998
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
17,367,653✔
2999
      // for simple column, the result buffer needs to hold at least one element.
3000
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
618,263,036✔
3001
    }
3002

3003
    pCtx->input.numOfInputCols = pFunct->numOfParams;
1,057,415,029✔
3004
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,057,312,010✔
3005
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
1,057,413,868✔
3006
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,057,291,300✔
3007
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
1,057,445,601✔
3008

3009
    pCtx->pTsOutput = NULL;
1,057,328,527✔
3010
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
1,057,503,255✔
3011
    pCtx->resDataInfo.type = pFunct->resSchema.type;
1,057,348,897✔
3012
    pCtx->order = TSDB_ORDER_ASC;
1,057,406,957✔
3013
    pCtx->start.key = INT64_MIN;
1,057,531,757✔
3014
    pCtx->end.key = INT64_MIN;
1,057,402,771✔
3015
    pCtx->numOfParams = pExpr->base.numOfParams;
1,057,398,249✔
3016
    pCtx->param = pFunct->pParam;
1,057,616,097✔
3017
    pCtx->saveHandle.currentPage = -1;
1,057,450,937✔
3018
    pCtx->pStore = pStore;
1,057,484,295✔
3019
    pCtx->hasWindowOrGroup = false;
1,057,541,718✔
3020
    pCtx->needCleanup = false;
1,057,424,470✔
3021
    pCtx->skipDynDataCheck = false;
1,057,250,129✔
3022
  }
3023

3024
  for (int32_t i = 1; i < numOfOutput; ++i) {
1,086,773,428✔
3025
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,499,597,819✔
3026
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
749,853,278✔
3027
  }
3028

3029
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
336,934,938✔
3030
  QUERY_CHECK_CODE(code, lino, _end);
336,880,937✔
3031

3032
_end:
336,880,937✔
3033
  if (code != TSDB_CODE_SUCCESS) {
336,803,504✔
UNCOV
3034
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3035
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3036
      taosMemoryFree(pFuncCtx[i].input.pData);
×
3037
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
3038
    }
UNCOV
3039
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
3040
    taosMemoryFreeClear(pFuncCtx);
×
3041

UNCOV
3042
    terrno = code;
×
3043
    return NULL;
×
3044
  }
3045
  return pFuncCtx;
336,803,504✔
3046
}
3047

3048
// NOTE: sources columns are more than the destination SSDatablock columns.
3049
// doFilter in table scan needs every column even its output is false
3050
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
14,057,976✔
3051
  int32_t code = TSDB_CODE_SUCCESS;
14,057,976✔
3052
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
14,057,976✔
3053

3054
  int32_t i = 0, j = 0;
14,060,566✔
3055
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
111,927,513✔
3056
    SColumnInfoData* p = taosArrayGet(pCols, i);
97,868,553✔
3057
    if (!p) {
97,868,034✔
UNCOV
3058
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3059
      return terrno;
×
3060
    }
3061
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
97,868,034✔
3062
    if (!pmInfo) {
97,866,087✔
UNCOV
3063
      return terrno;
×
3064
    }
3065

3066
    if (p->info.colId == pmInfo->colId) {
97,866,087✔
3067
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
91,906,321✔
3068
      if (!pDst) {
91,904,395✔
UNCOV
3069
        return terrno;
×
3070
      }
3071
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
91,904,395✔
3072
      if (code != TSDB_CODE_SUCCESS) {
91,904,970✔
UNCOV
3073
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3074
        return code;
×
3075
      }
3076
      i++;
91,904,970✔
3077
      j++;
91,904,970✔
3078
    } else if (p->info.colId < pmInfo->colId) {
5,961,977✔
3079
      i++;
5,961,977✔
3080
    } else {
UNCOV
3081
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3082
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3083
    }
3084
  }
3085
  return code;
14,059,584✔
3086
}
3087

3088
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
152,114,111✔
3089
  SInterval interval = {
304,012,502✔
3090
      .interval = pTableScanNode->interval,
151,970,532✔
3091
      .sliding = pTableScanNode->sliding,
151,909,452✔
3092
      .intervalUnit = pTableScanNode->intervalUnit,
152,166,824✔
3093
      .slidingUnit = pTableScanNode->slidingUnit,
151,887,377✔
3094
      .offset = pTableScanNode->offset,
152,165,146✔
3095
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
152,156,667✔
3096
      .timeRange = pTableScanNode->scanRange,
3097
  };
3098
  calcIntervalAutoOffset(&interval);
151,888,688✔
3099

3100
  return interval;
151,985,818✔
3101
}
3102

3103
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
62,835,926✔
3104
  SColumn c = {0};
62,835,926✔
3105

3106
  c.slotId = pColNode->slotId;
62,835,926✔
3107
  c.colId = pColNode->colId;
62,834,350✔
3108
  c.type = pColNode->node.resType.type;
62,842,646✔
3109
  c.bytes = pColNode->node.resType.bytes;
62,838,639✔
3110
  c.scale = pColNode->node.resType.scale;
62,834,142✔
3111
  c.precision = pColNode->node.resType.precision;
62,839,595✔
3112
  return c;
62,820,129✔
3113
}
3114

3115

3116
/**
3117
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3118
 * @param[in] cond The range specified by WHERE condition.
3119
 * @param[in] range The range specified by RANGE clause.
3120
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3121
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3122
 * @note `cond` and `twindow` may be the same address.
3123
 */
3124
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
3,066,841✔
3125
                                 STimeWindow* extTwindows) {
3126
  int32_t     code = TSDB_CODE_SUCCESS;
3,066,841✔
3127
  int32_t     lino = 0;
3,066,841✔
3128
  STimeWindow tempWindow;
3129

3130
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
3,066,841✔
3131
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
4,745✔
3132
    return code;
4,745✔
3133
  }
3134

3135
  if (range->ekey < cond->skey) {
3,062,096✔
3136
    extTwindows[1] = *cond;
468,999✔
3137
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
468,999✔
3138
    return code;
468,999✔
3139
  }
3140

3141
  if (cond->ekey < range->skey) {
2,593,097✔
3142
    extTwindows[0] = *cond;
349,865✔
3143
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
349,865✔
3144
    return code;
349,865✔
3145
  }
3146

3147
  // Only scan data in the time range intersecion.
3148
  extTwindows[0] = extTwindows[1] = *cond;
2,243,704✔
3149
  twindow->skey = TMAX(cond->skey, range->skey);
2,243,704✔
3150
  twindow->ekey = TMIN(cond->ekey, range->ekey);
2,243,704✔
3151
  extTwindows[0].ekey = twindow->skey - 1;
2,243,704✔
3152
  extTwindows[1].skey = twindow->ekey + 1;
2,243,704✔
3153

3154
  return code;
2,243,704✔
3155
}
3156

3157
static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
6,192✔
3158
  SNode*  pNew = NULL;
6,192✔
3159
  int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pNew);
6,192✔
3160
  if (TSDB_CODE_SUCCESS == code) {
6,192✔
3161
    *pPrimaryKeyCond = pNew;
6,192✔
3162
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
6,192✔
3163
      code = filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, isStrict, NULL);
6,192✔
3164
    }
3165
  }
3166
  return code;
6,192✔
3167
}
3168

3169
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* pTableScanNode,
180,092,013✔
3170
                               const SReadHandle* readHandle, bool applyExtWin) {
3171
  int32_t code = 0;                             
180,092,013✔
3172
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
180,092,013✔
3173
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
180,055,900✔
3174

3175
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
180,130,722✔
3176
  if (!pCond->colList) {
180,070,352✔
UNCOV
3177
    return terrno;
×
3178
  }
3179
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
179,983,583✔
3180
  if (pCond->pSlotList == NULL) {
180,055,351✔
UNCOV
3181
    taosMemoryFreeClear(pCond->colList);
×
3182
    return terrno;
×
3183
  }
3184

3185
  // TODO: get it from stable scan node
3186
  pCond->twindows = pTableScanNode->scanRange;
179,928,167✔
3187
  pCond->suid = pTableScanNode->scan.suid;
180,131,294✔
3188
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
179,903,285✔
3189
  pCond->startVersion = -1;
180,047,996✔
3190
  pCond->endVersion = -1;
180,113,726✔
3191
  pCond->skipRollup = readHandle->skipRollup;
179,865,838✔
3192
  if (readHandle->winRangeValid) {
180,000,904✔
3193
    pCond->twindows = readHandle->winRange;
248,289✔
3194
  }
3195
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
180,107,076✔
3196
  // allowed read stt file optimization mode
3197
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
360,226,676✔
3198
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
180,049,637✔
3199

3200
  int32_t j = 0;
180,038,815✔
3201
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
965,984,727✔
3202
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
786,053,952✔
3203
    if (!pNode) {
785,638,848✔
UNCOV
3204
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3205
      return terrno;
×
3206
    }
3207
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
785,638,848✔
3208
    if (pColNode->colType == COLUMN_TYPE_TAG) {
786,016,328✔
UNCOV
3209
      continue;
×
3210
    }
3211

3212
    pCond->colList[j].type = pColNode->node.resType.type;
786,004,743✔
3213
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
785,979,831✔
3214
    pCond->colList[j].colId = pColNode->colId;
785,782,571✔
3215
    pCond->colList[j].pk = pColNode->isPk;
785,968,831✔
3216

3217
    pCond->pSlotList[j] = pNode->slotId;
786,088,039✔
3218
    j += 1;
785,945,912✔
3219
  }
3220

3221
  pCond->numOfCols = j;
180,128,279✔
3222

3223
  if (applyExtWin) {
180,138,629✔
3224
    if (NULL != pTableScanNode->pExtScanRange) {
152,487,482✔
3225
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
3,002,781✔
3226
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
3,002,309✔
3227
    } else if (readHandle->extWinRangeValid) {
149,294,660✔
3228
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
64,532✔
3229
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
64,532✔
3230
    }
3231
  }
3232

3233
  if (pTableScanNode->pPrimaryCond) {
180,022,911✔
3234
    bool isStrict = false;
6,192✔
3235
    code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
6,192✔
3236
    if (code || !isStrict) {
6,192✔
3237
      code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
2,064✔
3238
    }
3239
  }
3240

3241
  return code;
180,033,405✔
3242
}
3243

3244
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
44,797,398✔
3245
                                           const SReadHandle* readHandle, SArray* colArray) {
3246
  int32_t code = TSDB_CODE_SUCCESS;
44,797,398✔
3247
  int32_t lino = 0;
44,797,398✔
3248

3249
  pCond->order = TSDB_ORDER_ASC;
44,797,398✔
3250
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
44,803,532✔
3251

3252
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
44,790,443✔
3253
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
44,782,959✔
3254

3255
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
44,776,524✔
3256
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
44,787,334✔
3257

3258
  pCond->twindows = pOrgCond->twindows;
44,761,295✔
3259
  pCond->order = pOrgCond->order;
44,795,015✔
3260
  pCond->type = pOrgCond->type;
44,792,942✔
3261
  pCond->startVersion = -1;
44,793,086✔
3262
  pCond->endVersion = -1;
44,772,353✔
3263
  pCond->skipRollup = true;
44,794,829✔
3264
  pCond->notLoadData = false;
44,782,467✔
3265

3266
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
218,271,340✔
3267
    SColIdPair* pColPair = taosArrayGet(colArray, i);
173,507,453✔
3268
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
173,506,238✔
3269

3270
    bool find = false;
173,495,087✔
3271
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
1,054,839,382✔
3272
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
1,054,512,058✔
3273
        pCond->colList[i].type = pOrgCond->colList[j].type;
173,533,329✔
3274
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
173,552,372✔
3275
        pCond->colList[i].colId = pColPair->orgColId;
173,555,971✔
3276
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
173,554,936✔
3277
        pCond->pSlotList[i] = i;
173,566,178✔
3278
        find = true;
173,545,607✔
3279
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
173,545,607✔
3280
        break;
173,517,477✔
3281
      }
3282
    }
3283
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
173,507,520✔
3284
  }
3285

3286
  return code;
44,810,022✔
UNCOV
3287
_return:
×
3288
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3289
  taosMemoryFreeClear(pCond->colList);
×
3290
  taosMemoryFreeClear(pCond->pSlotList);
×
3291
  return code;
×
3292
}
3293

3294
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
421,640,915✔
3295
  taosMemoryFreeClear(pCond->colList);
421,640,915✔
3296
  taosMemoryFreeClear(pCond->pSlotList);
421,648,902✔
3297
}
421,605,389✔
3298

3299
int32_t convertFillType(int32_t mode) {
3,629,303✔
3300
  int32_t type = TSDB_FILL_NONE;
3,629,303✔
3301
  switch (mode) {
3,629,303✔
3302
    case FILL_MODE_PREV:
168,500✔
3303
      type = TSDB_FILL_PREV;
168,500✔
3304
      break;
168,500✔
UNCOV
3305
    case FILL_MODE_NONE:
×
3306
      type = TSDB_FILL_NONE;
×
3307
      break;
×
3308
    case FILL_MODE_NULL:
168,756✔
3309
      type = TSDB_FILL_NULL;
168,756✔
3310
      break;
168,756✔
3311
    case FILL_MODE_NULL_F:
24,689✔
3312
      type = TSDB_FILL_NULL_F;
24,689✔
3313
      break;
24,689✔
3314
    case FILL_MODE_NEXT:
179,356✔
3315
      type = TSDB_FILL_NEXT;
179,356✔
3316
      break;
179,356✔
3317
    case FILL_MODE_VALUE:
154,027✔
3318
      type = TSDB_FILL_SET_VALUE;
154,027✔
3319
      break;
154,027✔
3320
    case FILL_MODE_VALUE_F:
10,994✔
3321
      type = TSDB_FILL_SET_VALUE_F;
10,994✔
3322
      break;
10,994✔
3323
    case FILL_MODE_LINEAR:
214,021✔
3324
      type = TSDB_FILL_LINEAR;
214,021✔
3325
      break;
214,021✔
3326
    case FILL_MODE_NEAR:
2,708,960✔
3327
      type = TSDB_FILL_NEAR;
2,708,960✔
3328
      break;
2,708,960✔
UNCOV
3329
    default:
×
3330
      type = TSDB_FILL_NONE;
×
3331
  }
3332

3333
  return type;
3,629,303✔
3334
}
3335

3336
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
2,147,483,647✔
3337
  if (ascQuery) {
2,147,483,647✔
3338
    *w = getAlignQueryTimeWindow(pInterval, ts);
2,147,483,647✔
3339
  } else {
3340
    // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
3341
    *w = getAlignQueryTimeWindow(pInterval, ts);
11,349✔
3342

3343
    int64_t key = w->skey;
201,482✔
3344
    while (key < ts) {  // moving towards end
217,035✔
3345
      key = getNextTimeWindowStart(pInterval, key, TSDB_ORDER_ASC);
99,945✔
3346
      if (key > ts) {
100,321✔
3347
        break;
84,768✔
3348
      }
3349

3350
      w->skey = key;
15,553✔
3351
    }
3352
    w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
201,858✔
3353
  }
3354
}
2,147,483,647✔
3355

3356
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
69,271,454✔
3357
  STimeWindow w = {0};
69,271,454✔
3358

3359
  w.skey = taosTimeTruncate(ts, pInterval);
69,271,454✔
3360
  w.ekey = taosTimeGetIntervalEnd(w.skey, pInterval);
69,270,116✔
3361
  return w;
69,271,891✔
3362
}
3363

3364
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
1,778,443✔
3365
  STimeWindow win = *pWindow;
1,778,443✔
3366
  STimeWindow save = win;
1,778,443✔
3367
  while (win.skey <= ts && win.ekey >= ts) {
10,523,994✔
3368
    save = win;
8,745,551✔
3369
    // get previous time window
3370
    getNextTimeWindow(pInterval, &win, order == TSDB_ORDER_DESC ? TSDB_ORDER_ASC : TSDB_ORDER_DESC);
8,745,551✔
3371
  }
3372

3373
  return save;
1,778,443✔
3374
}
3375

3376
// get the correct time window according to the handled timestamp
3377
// todo refactor
3378
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
106,763,629✔
3379
                                int32_t order) {
3380
  STimeWindow w = {0};
106,763,629✔
3381
  if (pResultRowInfo->cur.pageId == -1) {  // the first window, from the previous stored value
106,764,866✔
3382
    getInitialStartTimeWindow(pInterval, ts, &w, (order != TSDB_ORDER_DESC));
2,481,952✔
3383
    return w;
2,483,809✔
3384
  }
3385

3386
  SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
104,281,559✔
3387
  if (pRow) {
104,280,685✔
3388
    TAOS_SET_OBJ_ALIGNED(&w, pRow->win);
104,281,559✔
3389
  }
3390

3391
  // in case of typical time window, we can calculate time window directly.
3392
  if (w.skey > ts || w.ekey < ts) {
104,283,364✔
3393
    w = doCalculateTimeWindow(ts, pInterval);
69,271,891✔
3394
  }
3395

3396
  if (pInterval->interval != pInterval->sliding) {
104,283,364✔
3397
    // it is an sliding window query, in which sliding value is not equalled to
3398
    // interval value, and we need to find the first qualified time window.
3399
    w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
1,778,443✔
3400
  }
3401

3402
  return w;
104,278,970✔
3403
}
3404

3405
TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order) {
2,147,483,647✔
3406
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
3407
  TSKEY   nextStart = taosTimeAdd(start, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3408
  nextStart = taosTimeAdd(nextStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
2,147,483,647✔
3409
  nextStart = taosTimeAdd(nextStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL);
2,147,483,647✔
3410
  return nextStart;
2,147,483,647✔
3411
}
3412

3413
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
2,147,483,647✔
3414
  tw->skey = getNextTimeWindowStart(pInterval, tw->skey, order);
2,147,483,647✔
3415
  tw->ekey = taosTimeAdd(tw->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
2,147,483,647✔
3416
}
2,147,483,647✔
3417

3418
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
376,358,816✔
3419
  return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
748,609,090✔
3420
          pLimitInfo->slimit.offset != -1);
372,249,314✔
3421
}
3422

UNCOV
3423
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
×
3424
  return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
×
3425
}
3426

3427
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
467,197,195✔
3428
  SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
467,197,195✔
3429
  SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
466,991,590✔
3430

3431
  pLimitInfo->limit = limit;
466,995,001✔
3432
  pLimitInfo->slimit = slimit;
467,028,035✔
3433
  pLimitInfo->remainOffset = limit.offset;
467,035,190✔
3434
  pLimitInfo->remainGroupOffset = slimit.offset;
467,023,459✔
3435
  pLimitInfo->numOfOutputRows = 0;
467,004,965✔
3436
  pLimitInfo->numOfOutputGroups = 0;
467,178,507✔
3437
  pLimitInfo->currentGroupId = 0;
467,162,342✔
3438
}
467,184,680✔
3439

3440
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
86,405,693✔
3441
  pLimitInfo->numOfOutputRows = 0;
86,405,693✔
3442
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
86,432,478✔
3443
}
86,379,032✔
3444

3445
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
497,448,529✔
3446
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
497,448,529✔
UNCOV
3447
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
3448
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3449
  }
3450
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
497,401,451✔
3451
  return TSDB_CODE_SUCCESS;
497,389,697✔
3452
}
3453

3454
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
3,104,505✔
3455

3456
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
202,430,940✔
3457
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
202,430,940✔
3458
    return NULL;
2,019✔
3459
  }
3460

3461
  return taosArrayGet(pTableList->pTableList, index);
202,380,111✔
3462
}
3463

3464
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
47,365✔
3465
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
47,365✔
3466
  if (startIndex >= numOfTables) {
47,365✔
UNCOV
3467
    return -1;
×
3468
  }
3469

3470
  for (int32_t i = startIndex; i < numOfTables; ++i) {
577,643✔
3471
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
577,643✔
3472
    if (!p) {
577,643✔
UNCOV
3473
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3474
      return -1;
×
3475
    }
3476
    if (p->uid == uid) {
577,643✔
3477
      return i;
47,365✔
3478
    }
3479
  }
UNCOV
3480
  return -1;
×
3481
}
3482

3483
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
383,345✔
3484
  *psuid = pTableList->idInfo.suid;
383,345✔
3485
  *uid = pTableList->idInfo.uid;
383,345✔
3486
  *type = pTableList->idInfo.tableType;
383,345✔
3487
}
383,345✔
3488

3489
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
747,737,424✔
3490
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
747,737,424✔
3491
  if (slot == NULL) {
748,049,572✔
UNCOV
3492
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3493
    return -1;
×
3494
  }
3495

3496
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
748,049,572✔
3497
  if (pKeyInfo == NULL) {
748,030,353✔
UNCOV
3498
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
3499
    return -1;
×
3500
  }
3501
  return pKeyInfo->groupId;
748,030,353✔
3502
}
3503

3504
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3505
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3506
//   int32_t code = TSDB_CODE_SUCCESS;
3507
//   int32_t lino = 0;
3508

3509
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3510
//   if (slot == NULL) {
3511
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3512
//     return 0;
3513
//   }
3514

3515
//   taosArrayRemove(pTableList->pTableList, *slot);
3516
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3517

3518
//   _end:
3519
//   if (code != TSDB_CODE_SUCCESS) {
3520
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3521
//   } else {
3522
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3523
//   }
3524

3525
//   return code;
3526
// }
3527

3528
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
1,105,899✔
3529
  int32_t code = TSDB_CODE_SUCCESS;
1,105,899✔
3530
  int32_t lino = 0;
1,105,899✔
3531
  if (pTableList->map == NULL) {
1,105,899✔
UNCOV
3532
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
3533
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
×
3534
  }
3535

3536
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
1,107,668✔
3537
  void*         p = taosHashGet(pTableList->map, &uid, sizeof(uid));
1,107,375✔
3538
  if (p != NULL) {
1,106,203✔
3539
    qInfo("table:%" PRId64 " already in tableIdList, ignore it", uid);
636✔
3540
    goto _end;
636✔
3541
  }
3542

3543
  void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
1,105,567✔
3544
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,106,728✔
3545

3546
  int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
1,106,728✔
3547
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
1,106,435✔
3548
  if (code != TSDB_CODE_SUCCESS) {
1,107,043✔
3549
    // we have checked the existence of uid in hash map above
UNCOV
3550
    QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3551
    taosArrayPopTailBatch(pTableList->pTableList, 1);  // let's pop the last element in the array list
×
3552
  }
3553

3554
_end:
1,107,679✔
3555
  if (code != TSDB_CODE_SUCCESS) {
1,106,485✔
UNCOV
3556
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3557
  } else {
3558
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
1,106,485✔
3559
  }
3560

3561
  return code;
1,107,668✔
3562
}
3563

3564
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
172,111,980✔
3565
                              int32_t* size) {
3566
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
172,111,980✔
3567
  int32_t numOfTables = 0;
172,088,953✔
3568
  int32_t code = tableListGetSize(pTableList, &numOfTables);
172,106,042✔
3569
  if (code != TSDB_CODE_SUCCESS) {
172,097,697✔
UNCOV
3570
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3571
    return code;
×
3572
  }
3573

3574
  if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
172,097,697✔
UNCOV
3575
    return TSDB_CODE_INVALID_PARA;
×
3576
  }
3577

3578
  // here handle two special cases:
3579
  // 1. only one group exists, and 2. one table exists for each group.
3580
  if (totalGroups == 1) {
172,097,697✔
3581
    *size = numOfTables;
171,609,893✔
3582
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
171,585,684✔
3583
    return TSDB_CODE_SUCCESS;
171,624,833✔
3584
  } else if (totalGroups == numOfTables) {
487,804✔
3585
    *size = 1;
459,705✔
3586
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
459,705✔
3587
    return TSDB_CODE_SUCCESS;
459,236✔
3588
  }
3589

3590
  int32_t offset = pTableList->groupOffset[ordinalGroupIndex];
30,366✔
3591
  if (ordinalGroupIndex < totalGroups - 1) {
54,823✔
3592
    *size = pTableList->groupOffset[ordinalGroupIndex + 1] - offset;
41,209✔
3593
  } else {
3594
    *size = numOfTables - offset;
13,614✔
3595
  }
3596

3597
  *pKeyInfo = taosArrayGet(pTableList->pTableList, offset);
54,823✔
3598
  return TSDB_CODE_SUCCESS;
54,823✔
3599
}
3600

3601
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
466,707,974✔
3602

3603
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
568,890✔
3604

3605
STableListInfo* tableListCreate() {
197,944,070✔
3606
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
197,944,070✔
3607
  if (pListInfo == NULL) {
197,777,814✔
UNCOV
3608
    return NULL;
×
3609
  }
3610

3611
  pListInfo->remainGroups = NULL;
197,777,814✔
3612
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
197,808,895✔
3613
  if (pListInfo->pTableList == NULL) {
197,872,982✔
UNCOV
3614
    goto _error;
×
3615
  }
3616

3617
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
197,913,480✔
3618
  if (pListInfo->map == NULL) {
198,067,651✔
UNCOV
3619
    goto _error;
×
3620
  }
3621

3622
  pListInfo->numOfOuputGroups = 1;
198,068,123✔
3623
  return pListInfo;
198,066,183✔
3624

UNCOV
3625
_error:
×
3626
  tableListDestroy(pListInfo);
×
3627
  return NULL;
×
3628
}
3629

3630
void tableListDestroy(STableListInfo* pTableListInfo) {
208,116,131✔
3631
  if (pTableListInfo == NULL) {
208,116,131✔
3632
    return;
10,126,088✔
3633
  }
3634

3635
  taosArrayDestroy(pTableListInfo->pTableList);
197,990,043✔
3636
  taosMemoryFreeClear(pTableListInfo->groupOffset);
197,937,181✔
3637

3638
  taosHashCleanup(pTableListInfo->map);
197,946,203✔
3639
  taosHashCleanup(pTableListInfo->remainGroups);
198,019,185✔
3640
  pTableListInfo->pTableList = NULL;
198,019,055✔
3641
  pTableListInfo->map = NULL;
198,040,822✔
3642
  taosMemoryFree(pTableListInfo);
198,031,433✔
3643
}
3644

3645
void tableListClear(STableListInfo* pTableListInfo) {
755,629✔
3646
  if (pTableListInfo == NULL) {
755,629✔
UNCOV
3647
    return;
×
3648
  }
3649

3650
  taosArrayClear(pTableListInfo->pTableList);
755,629✔
3651
  taosHashClear(pTableListInfo->map);
755,900✔
3652
  taosHashClear(pTableListInfo->remainGroups);
756,508✔
3653
  taosMemoryFree(pTableListInfo->groupOffset);
756,508✔
3654
  pTableListInfo->numOfOuputGroups = 1;
756,508✔
3655
  pTableListInfo->oneTableForEachGroup = false;
756,508✔
3656
}
3657

3658
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
507,373,105✔
3659
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
507,373,105✔
3660
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
507,373,105✔
3661

3662
  if (pInfo1->groupId == pInfo2->groupId) {
507,373,105✔
3663
    return 0;
473,997,039✔
3664
  } else {
3665
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
33,377,637✔
3666
  }
3667
}
3668

3669
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
27,964,023✔
3670
  int32_t code = TSDB_CODE_SUCCESS;
27,964,023✔
3671
  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
27,964,023✔
3672
  int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
27,984,560✔
3673
  if (size == 0) {
27,975,912✔
UNCOV
3674
    pTableListInfo->numOfOuputGroups = 0;
×
3675
    return code;
×
3676
  }
3677

3678
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
27,975,912✔
3679
  if (!pList) {
27,964,918✔
UNCOV
3680
    code = terrno;
×
3681
    goto end;
×
3682
  }
3683

3684
  STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
27,964,918✔
3685
  if (pInfo == NULL) {
27,962,271✔
UNCOV
3686
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3687
    code = terrno;
×
3688
    goto end;
×
3689
  }
3690
  uint64_t gid = pInfo->groupId;
27,962,271✔
3691

3692
  int32_t start = 0;
27,971,218✔
3693
  void*   tmp = taosArrayPush(pList, &start);
27,977,242✔
3694
  if (tmp == NULL) {
27,977,242✔
UNCOV
3695
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3696
    code = terrno;
×
3697
    goto end;
×
3698
  }
3699

3700
  for (int32_t i = 1; i < size; ++i) {
146,530,969✔
3701
    pInfo = taosArrayGet(pTableListInfo->pTableList, i);
118,563,054✔
3702
    if (pInfo == NULL) {
118,548,308✔
UNCOV
3703
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3704
      code = terrno;
×
3705
      goto end;
×
3706
    }
3707
    if (pInfo->groupId != gid) {
118,548,308✔
3708
      tmp = taosArrayPush(pList, &i);
6,837,494✔
3709
      if (tmp == NULL) {
6,837,494✔
UNCOV
3710
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3711
        code = terrno;
×
3712
        goto end;
×
3713
      }
3714
      gid = pInfo->groupId;
6,837,494✔
3715
    }
3716
  }
3717

3718
  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
27,979,508✔
3719
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
27,976,775✔
3720
  if (pTableListInfo->groupOffset == NULL) {
27,950,465✔
UNCOV
3721
    code = terrno;
×
3722
    goto end;
×
3723
  }
3724

3725
  memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
27,950,183✔
3726

3727
end:
27,956,203✔
3728
  taosArrayDestroy(pList);
27,957,854✔
3729
  return code;
27,923,022✔
3730
}
3731

3732
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
165,445,613✔
3733
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3734
  int32_t code = TSDB_CODE_SUCCESS;
165,445,613✔
3735

3736
  bool   groupByTbname = groupbyTbname(group);
165,445,613✔
3737
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
165,362,515✔
3738
  if (!numOfTables) {
165,373,738✔
3739
    return code;
3,341✔
3740
  }
3741
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
165,370,397✔
3742
  if (group == NULL || groupByTbname) {
165,403,248✔
3743
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
161,434,032✔
3744
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
119,426,610✔
3745
      pTableListInfo->remainGroups =
13,929,060✔
3746
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
13,929,060✔
3747
      if (pTableListInfo->remainGroups == NULL) {
13,929,060✔
UNCOV
3748
        return terrno;
×
3749
      }
3750

3751
      for (int i = 0; i < numOfTables; i++) {
66,812,552✔
3752
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
52,881,992✔
3753
        if (!info) {
52,882,992✔
UNCOV
3754
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3755
          return terrno;
×
3756
        }
3757
        info->groupId = groupByTbname ? info->uid : 0;
52,882,992✔
3758
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
52,882,492✔
3759
                                      &(info->uid), sizeof(info->uid));
52,882,992✔
3760
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
52,883,492✔
UNCOV
3761
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3762
          return tempRes;
×
3763
        }
3764
      }
3765
    } else {
3766
      for (int32_t i = 0; i < numOfTables; i++) {
650,784,120✔
3767
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
503,375,347✔
3768
        if (!info) {
503,359,575✔
UNCOV
3769
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3770
          return terrno;
×
3771
        }
3772
        info->groupId = groupByTbname ? info->uid : 0;
503,359,575✔
3773
        
3774
      }
3775
    }
3776
    if (groupIdMap && group != NULL){
161,339,333✔
3777
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
66,847✔
3778
    }
3779

3780
    pTableListInfo->oneTableForEachGroup = groupByTbname;
161,339,124✔
3781
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
161,418,965✔
3782
      pTableListInfo->oneTableForEachGroup = true;
35,696,224✔
3783
    }
3784

3785
    if (groupSort && groupByTbname) {
161,423,977✔
3786
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
2,192,173✔
3787
      pTableListInfo->numOfOuputGroups = numOfTables;
2,192,521✔
3788
    } else if (groupByTbname && pScanNode->groupOrderScan) {
159,231,804✔
3789
      pTableListInfo->numOfOuputGroups = numOfTables;
31,230✔
3790
    } else {
3791
      pTableListInfo->numOfOuputGroups = 1;
159,201,916✔
3792
    }
3793
    if (groupSort || pScanNode->groupOrderScan) {
161,489,849✔
3794
      code = sortTableGroup(pTableListInfo);
27,841,932✔
3795
    }
3796
  } else {
3797
    bool initRemainGroups = false;
3,969,216✔
3798
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
3,969,216✔
3799
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
3,880,680✔
3800
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
3,880,680✔
3801
          !(groupSort || pScanNode->groupOrderScan)) {
1,939,790✔
3802
        initRemainGroups = true;
1,916,350✔
3803
      }
3804
    }
3805

3806
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
3,969,216✔
3807
    if (code != TSDB_CODE_SUCCESS) {
3,968,716✔
UNCOV
3808
      return code;
×
3809
    }
3810

3811
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
3,968,716✔
3812

3813
    if (groupSort || pScanNode->groupOrderScan) {
3,968,716✔
3814
      code = sortTableGroup(pTableListInfo);
134,198✔
3815
    }
3816
  }
3817

3818
  // add all table entry in the hash map
3819
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
165,386,874✔
3820
  for (int32_t i = 0; i < size; ++i) {
743,936,069✔
3821
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
578,529,490✔
3822
    if (!p) {
578,315,000✔
UNCOV
3823
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
3824
      return terrno;
×
3825
    }
3826
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
578,315,000✔
3827
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
578,548,438✔
UNCOV
3828
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
3829
      return tempRes;
×
3830
    }
3831
  }
3832

3833
  return code;
165,484,366✔
3834
}
3835

3836
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
181,675,622✔
3837
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3838
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3839
  int64_t     st = taosGetTimestampUs();
181,678,119✔
3840
  const char* idStr = GET_TASKID(pTaskInfo);
181,678,119✔
3841

3842
  if (pHandle == NULL) {
181,443,123✔
UNCOV
3843
    qError("invalid handle, in creating operator tree, %s", idStr);
×
3844
    return TSDB_CODE_INVALID_PARA;
×
3845
  }
3846

3847
  if (pHandle->uid != 0) {
181,443,123✔
3848
    pScanNode->uid = pHandle->uid;
44,968✔
3849
    pScanNode->tableType = TSDB_CHILD_TABLE;
44,968✔
3850
  }
3851
  uint8_t digest[17] = {0};
181,648,792✔
3852
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
181,581,274✔
3853
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
181,661,759✔
3854
  if (code != TSDB_CODE_SUCCESS) {
181,771,766✔
3855
    qError("failed to getTableList, code:%s", tstrerror(code));
914✔
3856
    return code;
914✔
3857
  }
3858

3859
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
181,770,852✔
3860

3861
  int64_t st1 = taosGetTimestampUs();
181,769,913✔
3862
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
181,769,913✔
3863
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
181,728,103✔
3864
         pTaskInfo->cost.extractListTime, idStr);
3865

3866
  if (numOfTables == 0) {
181,742,639✔
3867
    qDebug("no table qualified for query, %s", idStr);
16,355,808✔
3868
    return TSDB_CODE_SUCCESS;
16,355,808✔
3869
  }
3870

3871
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
165,386,831✔
3872
  if (code != TSDB_CODE_SUCCESS) {
165,454,334✔
UNCOV
3873
    return code;
×
3874
  }
3875

3876
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
165,462,294✔
3877
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
165,440,156✔
3878

3879
  return TSDB_CODE_SUCCESS;
165,451,645✔
3880
}
3881

3882
char* getStreamOpName(uint16_t opType) {
5,647,409✔
3883
  switch (opType) {
5,647,409✔
UNCOV
3884
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
3885
      return "stream scan";
×
3886
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
5,481,338✔
3887
      return "project";
5,481,338✔
3888
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
166,071✔
3889
      return "external window";
166,071✔
3890
  }
UNCOV
3891
  return "error name";
×
3892
}
3893

3894
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
707,950,619✔
3895
  if (qDebugFlag & DEBUG_TRACE) {
707,950,619✔
3896
    if (!pBlock) {
1,127,672✔
3897
      qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
3,638✔
3898
      return;
3,638✔
3899
    } else if (pBlock->info.rows == 0) {
1,124,034✔
3900
      qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
736✔
3901
      return;
736✔
3902
    }
3903
    
3904
    char*   pBuf = NULL;
1,123,298✔
3905
    int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr, qId);
1,123,298✔
3906
    if (code == 0) {
1,123,298✔
3907
      qDebugL("%" PRIx64 " %s %s", qId, __func__, pBuf);
1,123,298✔
3908
      taosMemoryFree(pBuf);
1,123,298✔
3909
    }
3910
  }
3911
}
3912

UNCOV
3913
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
×
3914
  if (!pBlock) {
×
3915
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
×
3916
    return;
×
3917
  } else if (pBlock->info.rows == 0) {
×
3918
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
×
3919
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
3920
           pBlock->info.version);
UNCOV
3921
    return;
×
3922
  }
UNCOV
3923
  if (qDebugFlag & DEBUG_TRACE) {
×
3924
    char* pBuf = NULL;
×
3925
    char  flagBuf[64];
×
3926
    snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
×
3927
    int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr, 0);
×
3928
    if (code == 0) {
×
3929
      qDebug("%s", pBuf);
×
3930
      taosMemoryFree(pBuf);
×
3931
    }
3932
  }
3933
}
3934

3935
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
13,087,333✔
3936

3937
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
2,147,483,647✔
3938
  int64_t* ts = (int64_t*)pColData->pData;
2,147,483,647✔
3939

3940
  int64_t duration = pWin->ekey > pWin->skey ? pWin->ekey - pWin->skey + delta : pWin->skey - pWin->ekey + delta;
2,147,483,647✔
3941
  ts[2] = duration;            // set the duration
2,147,483,647✔
3942
  ts[3] = pWin->skey;          // window start key
2,147,483,647✔
3943
  ts[4] = pWin->ekey + delta;  // window end key
2,147,483,647✔
3944
}
2,147,483,647✔
3945

3946
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
2,147,483,647✔
3947
                 int32_t rowIndex) {
3948
  SColumnDataAgg* pColAgg = NULL;
2,147,483,647✔
3949
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3950
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3951

3952
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
3953
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
3954
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
3955
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
3956

3957
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
3958
      if (isNull[i] != 1) return 1;
255,326,427✔
3959
    } else {
3960
      if (isNull[i] != 0) return 1;
2,147,483,647✔
3961
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
3962
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
UNCOV
3963
        int32_t len = getJsonValueLen(val);
×
3964
        if (memcmp(p, val, len) != 0) return 1;
×
3965
        p += len;
×
3966
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,147,483,647✔
3967
        if (IS_STR_DATA_BLOB(pCol->type)) {
1,180,262,710✔
UNCOV
3968
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
UNCOV
3969
          p += blobDataTLen(val);
×
3970
        } else {
3971
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
1,180,854,408✔
3972
          p += varDataTLen(val);
1,180,323,453✔
3973
        }
3974
      } else {
3975
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
2,147,483,647✔
3976
        p += pCol->bytes;
2,147,483,647✔
3977
      }
3978
    }
3979
  }
3980
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
2,147,483,647✔
3981
  return 0;
2,147,483,647✔
3982
}
3983

3984
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
1,558,342✔
3985
  uint32_t        colNum = pSortGroupCols->size;
1,558,342✔
3986
  SColumnDataAgg* pColAgg = NULL;
1,558,342✔
3987
  char*           isNull = keyBuf;
1,558,342✔
3988
  char*           p = keyBuf + sizeof(int8_t) * colNum;
1,558,342✔
3989

3990
  for (int32_t i = 0; i < colNum; ++i) {
4,265,994✔
3991
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,707,652✔
3992
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,707,652✔
3993
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
2,707,652✔
3994

3995
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,707,652✔
3996

3997
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
5,415,304✔
3998
      isNull[i] = 1;
131,100✔
3999
    } else {
4000
      isNull[i] = 0;
2,576,552✔
4001
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,576,552✔
4002
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,576,552✔
UNCOV
4003
        int32_t len = getJsonValueLen(val);
×
4004
        memcpy(p, val, len);
×
4005
        p += len;
×
4006
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,576,552✔
4007
        if (IS_STR_DATA_BLOB(pCol->type)) {
962,711✔
UNCOV
4008
          blobDataCopy(p, val);
×
4009
          p += blobDataTLen(val);
×
4010
        } else {
4011
          varDataCopy(p, val);
962,711✔
4012
          p += varDataTLen(val);
962,711✔
4013
        }
4014
      } else {
4015
        memcpy(p, val, pCol->bytes);
1,613,841✔
4016
        p += pCol->bytes;
1,613,841✔
4017
      }
4018
    }
4019
  }
4020
  return (int32_t)(p - keyBuf);
1,558,342✔
4021
}
4022

4023
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
4024
  T_MD5_CTX context;
2,147,483,647✔
4025
  tMD5Init(&context);
2,147,483,647✔
4026
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4027
  tMD5Final(&context);
2,147,483,647✔
4028

4029
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4030
  uint64_t id = 0;
2,147,483,647✔
4031
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4032
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4033
  return id;
2,147,483,647✔
4034
}
4035

4036
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
103,132✔
4037
  SNode*     node;
4038
  SNodeList* ret = NULL;
103,132✔
4039
  FOREACH(node, pSortKeys) {
314,640✔
4040
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
211,508✔
4041
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
211,508✔
4042
    if (code != TSDB_CODE_SUCCESS) {
211,508✔
UNCOV
4043
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4044
      terrno = code;
×
4045
      return NULL;
×
4046
    }
4047
  }
4048
  return ret;
103,132✔
4049
}
4050

4051
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
103,132✔
4052
  int32_t code = TSDB_CODE_SUCCESS;
103,132✔
4053
  int32_t lino = 0;
103,132✔
4054
  int32_t len = 0;
103,132✔
4055
  int32_t keyNum = taosArrayGetSize(keys);
103,132✔
4056
  for (int32_t i = 0; i < keyNum; ++i) {
262,200✔
4057
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
159,068✔
4058
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
159,068✔
4059
    len += pCol->bytes;
159,068✔
4060
  }
4061
  len += sizeof(int8_t) * keyNum;  // null flag
103,132✔
4062
  *pLen = len;
103,132✔
4063

4064
_end:
103,132✔
4065
  if (code != TSDB_CODE_SUCCESS) {
103,132✔
UNCOV
4066
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4067
  }
4068
  return code;
103,132✔
4069
}
4070

UNCOV
4071
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId) {
×
4072
  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
4073
  if (pJson == NULL) {
×
4074
    return code;
×
4075
  }
4076

UNCOV
4077
  char    pMsg[1024] = {0};
×
4078
  int32_t ret = tjsonGetStringValue(pJson, "msg", pMsg);
×
4079

UNCOV
4080
  if (ret == 0) {
×
4081
    qError("%s failed to exec imputation, msg:%s", pId, pMsg);
×
4082
    if (strstr(pMsg, "white noise") != NULL) {
×
4083
      code = TSDB_CODE_ANA_WN_DATA;
×
4084
    } else if (strstr(pMsg, "white-noise") != NULL) {
×
4085
      code = TSDB_CODE_ANA_WN_DATA;
×
4086
    } else if (strstr(pMsg, "[Errno 111] Connection refused") != NULL) {
×
4087
      code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
4088
    }
4089
  } else {
UNCOV
4090
    qError("%s failed to extract msg from server, unknown error", pId);
×
4091
  }
4092

UNCOV
4093
  return code;
×
4094
}
4095

4096

4097
int32_t createExprSubQResBlock(SSDataBlock** ppBlock, SDataType* pResType) {
1,396,060✔
4098
  int32_t code = 0;
1,396,060✔
4099
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
1,396,060✔
4100
  if (pBlock == NULL) {
1,395,344✔
UNCOV
4101
    return terrno;
×
4102
  }
4103

4104
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
1,395,344✔
4105
  if (pBlock->pDataBlock == NULL) {
1,396,060✔
UNCOV
4106
    code = terrno;
×
4107
    taosMemoryFree(pBlock);
×
4108
    return code;
×
4109
  }
4110

4111
  SColumnInfoData idata =
1,396,060✔
4112
      createColumnInfoData(pResType->type, pResType->bytes, 0);
1,396,060✔
4113
  idata.info.scale = pResType->scale;
1,396,060✔
4114
  idata.info.precision = pResType->precision;
1,396,060✔
4115

4116
  code = blockDataAppendColInfo(pBlock, &idata);
1,396,060✔
4117
  if (code != TSDB_CODE_SUCCESS) {
1,396,060✔
UNCOV
4118
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
4119
    blockDataDestroy(pBlock);
×
4120
    *ppBlock = NULL;
×
4121
    return code;
×
4122
  }
4123

4124
  *ppBlock = pBlock;
1,396,060✔
4125

4126
  return code;
1,396,060✔
4127
}
4128

4129

4130
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
1,396,060✔
4131
  int32_t            code = TSDB_CODE_SUCCESS;
1,396,060✔
4132
  int32_t            lino = 0;
1,396,060✔
4133
  void*              decompBuf = NULL;
1,396,060✔
4134

4135
  char* pNextStart = pRetrieveRsp->data;
1,396,060✔
4136
  char* pStart = pNextStart;
1,395,702✔
4137

4138
  int32_t index = 0;
1,396,060✔
4139

4140
  if (pRetrieveRsp->compressed) {  // decompress the data
1,396,060✔
UNCOV
4141
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
4142
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
×
4143
  }
4144

4145
  int32_t compLen = *(int32_t*)pStart;
1,396,060✔
4146
  pStart += sizeof(int32_t);
1,396,060✔
4147

4148
  int32_t rawLen = *(int32_t*)pStart;
1,396,060✔
4149
  pStart += sizeof(int32_t);
1,396,060✔
4150
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
1,396,060✔
4151

4152
  pNextStart = pStart + compLen;
1,396,060✔
4153
  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
1,395,702✔
UNCOV
4154
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
4155
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
4156
    pStart = decompBuf;
×
4157
  }
4158

4159
  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
1,396,060✔
4160
  if (code != 0) {
1,395,702✔
UNCOV
4161
    taosMemoryFreeClear(pRetrieveRsp);
×
4162
    goto _end;
×
4163
  }
4164

4165
_end:
1,395,702✔
4166
  if (code != TSDB_CODE_SUCCESS) {
1,396,060✔
UNCOV
4167
    blockDataDestroy(pb);
×
4168
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4169
  }
4170
  return code;
1,396,060✔
4171
}
4172

4173
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SRemoteValueNode* pRes, SSDataBlock* pBlock) {
1,204,872✔
4174
  int32_t code = 0;
1,204,872✔
4175
  bool needFree = true;
1,204,872✔
4176
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);
1,204,872✔
4177
  if (NULL == pBlock->pDataBlock || 1 != colNum || pBlock->info.rows > 1) {
1,204,872✔
4178
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
358✔
4179
      ctx->idStr, pBlock->pDataBlock, colNum, pBlock->info.rows);
UNCOV
4180
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
×
4181
  }
4182
  
4183
  pRes->val.node.type = QUERY_NODE_VALUE;
1,204,514✔
4184
  pRes->val.flag &= (~VALUE_FLAG_VAL_UNSET);
1,204,872✔
4185
  pRes->val.translate = true;
1,204,872✔
4186
  
4187
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
1,204,514✔
4188
  if (colDataIsNull_s(pCol, 0)) {
1,204,872✔
4189
    pRes->val.isNull = true;
74,106✔
4190
  } else {
4191
    code = nodesSetValueNodeValueExt(&pRes->val, colDataGetData(pCol, 0), &needFree);
1,130,766✔
4192
  }
4193

4194
  if (!needFree) {
1,204,514✔
UNCOV
4195
    pCol->pData = NULL;
×
4196
  }
4197

4198
  return code;
1,204,514✔
4199
}
4200

4201
void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp) {
1,317,284✔
4202
  SSDataBlock* pResBlock = NULL;
1,317,284✔
4203

4204
  qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
1,317,284✔
4205

4206
  if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
1,317,284✔
UNCOV
4207
    qError("%s invalid scl fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
×
4208
      ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
UNCOV
4209
    ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
×
4210

UNCOV
4211
    return;
×
4212
  }
4213

4214
  if (0 == pRsp->numOfRows) {
1,317,284✔
4215
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pParam->pRes;
112,412✔
4216
    pRemote->val.node.type = QUERY_NODE_VALUE;
112,412✔
4217
    pRemote->val.isNull = true;
112,412✔
4218
    pRemote->val.translate = true;
112,412✔
4219
    pRemote->val.flag &= (~VALUE_FLAG_VAL_UNSET);
112,412✔
4220
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
112,412✔
4221

4222
    return;
112,412✔
4223
  }
4224
  
4225
  ctx->code = createExprSubQResBlock(&pResBlock, &((SRemoteValueNode*)pParam->pRes)->val.node.resType);
1,204,872✔
4226
  if (TSDB_CODE_SUCCESS == ctx->code) {
1,204,872✔
4227
    ctx->code = blockDataEnsureCapacity(pResBlock, 1);
1,204,872✔
4228
  }
4229
  if (TSDB_CODE_SUCCESS == ctx->code) {
1,204,872✔
4230
    ctx->code = extractSingleRspBlock(pRsp, pResBlock);
1,204,872✔
4231
  }
4232
  if (TSDB_CODE_SUCCESS == ctx->code) {
1,204,872✔
4233
    ctx->code = setValueFromResBlock(ctx, (SRemoteValueNode*)pParam->pRes, pResBlock);
1,204,514✔
4234
  }
4235
  if (TSDB_CODE_SUCCESS == ctx->code) {
1,204,872✔
4236
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
1,204,514✔
4237
  }
4238

4239
  blockDataDestroy(pResBlock);  
1,204,514✔
4240
}
4241

4242

4243
int32_t updateValueListFromResBlock(STaskSubJobCtx* ctx, SRemoteValueListNode* pRes, SSDataBlock* pBlock) {
191,188✔
4244
  int32_t code = 0, lino = 0;
191,188✔
4245
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);
191,188✔
4246
  if (NULL == pBlock->pDataBlock || 1 != colNum) {
190,830✔
UNCOV
4247
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d", ctx->idStr, pBlock->pDataBlock, colNum);
×
4248
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4249
  }
4250

4251
  pRes->hasValue = true;
191,188✔
4252
  
4253
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
191,188✔
4254
  TAOS_CHECK_EXIT(scalarBuildRemoteListHash(pRes, pCol, pBlock->info.rows));
190,830✔
4255

4256
_exit:
191,188✔
4257

4258
  if (code) {
191,188✔
UNCOV
4259
    qError("%s %s failed with error: %s", ctx->idStr, __func__, tstrerror(code));
×
4260
  }
4261
  
4262
  return code;
191,188✔
4263
}
4264

4265

4266

4267
void handleRemoteValueListRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
191,188✔
4268
  SSDataBlock* pResBlock = NULL;
191,188✔
4269
  SRemoteValueListNode* pRemote = (SRemoteValueListNode*)pParam->pRes;
191,188✔
4270

4271
  qDebug("%s scl fetch rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);
191,188✔
4272

4273
  if (pRsp->numOfRows > 0) {
191,188✔
4274
    ctx->code = createExprSubQResBlock(&pResBlock, &((SExprNode*)pParam->pRes)->resType);
190,830✔
4275
    if (TSDB_CODE_SUCCESS == ctx->code) {
191,188✔
4276
      ctx->code = blockDataEnsureCapacity(pResBlock, pRsp->numOfRows);
191,188✔
4277
    }
4278
    if (TSDB_CODE_SUCCESS == ctx->code) {
190,830✔
4279
      ctx->code = extractSingleRspBlock(pRsp, pResBlock);
191,188✔
4280
    }
4281
    if (TSDB_CODE_SUCCESS == ctx->code) {
191,188✔
4282
      ctx->code = updateValueListFromResBlock(ctx, pRemote, pResBlock);
190,830✔
4283
    }
4284
    if (TSDB_CODE_SUCCESS == ctx->code && pRsp->completed) {
191,546✔
4285
      pRemote->flag &= (~VALUELIST_FLAG_VAL_UNSET);
143,432✔
4286
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
143,432✔
4287
    }
4288

4289
    blockDataDestroy(pResBlock);  
191,188✔
UNCOV
4290
  } else if (0 == pRsp->numOfRows && pRsp->completed) {
×
4291
    if (!pRemote->hasValue) {
×
4292
      ctx->code = scalarBuildRemoteListHash(pRemote, NULL, 0);
×
4293
    }
UNCOV
4294
    if (TSDB_CODE_SUCCESS == ctx->code) {    
×
4295
      pRemote->flag &= (~VALUELIST_FLAG_VAL_UNSET);
×
4296
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
×
4297
    }
4298
  }
4299

4300
  *fetchDone = (TSDB_CODE_SUCCESS != ctx->code || pRsp->completed) ? true : false;
191,188✔
4301

4302
  if (!(*fetchDone)) {
191,188✔
4303
    int32_t code = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes);
47,756✔
4304
    if (TSDB_CODE_SUCCESS != code) {
47,756✔
UNCOV
4305
      ctx->code = code;
×
4306
      *fetchDone = true;
×
4307
    }
4308
  }
4309
}
191,188✔
4310

4311

4312
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
2,076,976✔
4313
  SScalarFetchParam* pParam = (SScalarFetchParam*)param;
2,076,976✔
4314
  STaskSubJobCtx* ctx = pParam->pSubJobCtx;
2,076,976✔
4315
  char idStr[64];
2,077,334✔
4316
  
4317
  taosMemoryFreeClear(pMsg->pEpSet);
2,076,976✔
4318

4319
  if (NULL == ctx) {
2,077,334✔
4320
    qWarn("scl fetch ctx not exists since it may have been released");
2,864✔
4321
    goto _exit;
2,864✔
4322
  }
4323

4324
  if (qDebugFlag & DEBUG_DEBUG) {
2,074,470✔
4325
    strncpy(idStr, ctx->idStr, sizeof(idStr));
134,468✔
4326
  }
4327
  
4328
  qDebug("%s subQIdx %d got rsp, blockIdx:%" PRId64 ", code:%d, rsp:%p", ctx->idStr, pParam->subQIdx, ctx->blockIdx, code, pMsg->pData);
2,074,470✔
4329

4330
  taosWLockLatch(&ctx->lock);
2,074,470✔
4331
  ctx->param = NULL;
2,074,470✔
4332
  taosWUnLockLatch(&ctx->lock);
2,074,470✔
4333

4334
  if (ctx->transporterId > 0) {
2,074,112✔
4335
    int32_t ret = asyncFreeConnById(ctx->rpcHandle, ctx->transporterId);
2,074,112✔
4336
    if (ret != 0) {
2,074,470✔
UNCOV
4337
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", ctx->idStr, tstrerror(ret), pParam->subQIdx);
×
4338
    }
4339
    ctx->transporterId = -1;
2,074,470✔
4340
  }
4341

4342
  if (0 == code && NULL == pMsg->pData) {
2,074,112✔
UNCOV
4343
    qError("%s invalid rsp msg, msgType:%d, len:%d", ctx->idStr, pMsg->msgType, pMsg->len);
×
4344
    code = TSDB_CODE_QRY_INVALID_MSG;
×
4345
  }
4346

4347
  if (code == TSDB_CODE_SUCCESS) {
2,074,112✔
4348
    SRetrieveTableRsp* pRsp = pMsg->pData;
1,508,472✔
4349
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
1,508,472✔
4350
    pRsp->compLen = htonl(pRsp->compLen);
1,508,472✔
4351
    pRsp->payloadLen = htonl(pRsp->payloadLen);
1,508,472✔
4352
    pRsp->numOfCols = htonl(pRsp->numOfCols);
1,508,472✔
4353
    pRsp->useconds = htobe64(pRsp->useconds);
1,508,472✔
4354
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
1,508,472✔
4355

4356
    qDebug("%s subQIdx %d blockIdx:%" PRIu64 " rsp detail, numOfBlocks:%d, numOfRows:%" PRId64 ", completed:%d", 
1,508,472✔
4357
      ctx->idStr, pParam->subQIdx, ctx->blockIdx, pRsp->numOfBlocks, pRsp->numOfRows, pRsp->completed);
4358

4359
    ctx->blockIdx++;
1,508,472✔
4360

4361
    switch (nodeType(pParam->pRes)) {
1,508,472✔
4362
      case QUERY_NODE_REMOTE_VALUE:
1,316,926✔
4363
        handleRemoteValueRes(pParam, ctx, pRsp);
1,316,926✔
4364
        break;
1,316,568✔
4365
      case QUERY_NODE_REMOTE_VALUE_LIST: {
191,188✔
4366
        bool fetchDone = false;
191,188✔
4367
        handleRemoteValueListRes(pParam, ctx, pRsp, &fetchDone);
191,188✔
4368
        qDebug("%s subQIdx %d handle remote value list finished, fetchDone:%d", idStr, pParam->subQIdx, fetchDone);
191,188✔
4369
        if (!fetchDone) {
191,188✔
4370
          goto _exit;
47,756✔
4371
        }
4372
        break;
143,432✔
4373
      }  
UNCOV
4374
      default:
×
4375
        qError("%s invalid scl fetch res node %d, subQIdx:%d", ctx->idStr, nodeType(pParam->pRes), pParam->subQIdx);
×
4376
        ctx->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
4377
        break;
×
4378
    }
4379
  } else {
4380
    ctx->code = rpcCvtErrCode(code);
565,640✔
4381
    if (ctx->code != code) {
565,640✔
UNCOV
4382
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", ctx->idStr, pParam->subQIdx,
×
4383
             tstrerror(code), tstrerror(ctx->code));
4384
    } else {
4385
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", ctx->idStr, pParam->subQIdx, tstrerror(code));
565,640✔
4386
    }
4387
  }
4388

4389
  qDebug("%s subQIdx %d sem_post subQ ready", ctx->idStr, pParam->subQIdx);
2,026,356✔
4390
  
4391
  code = tsem_post(&pParam->pSubJobCtx->ready);
2,026,356✔
4392
  if (code != TSDB_CODE_SUCCESS) {
2,026,714✔
UNCOV
4393
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
×
4394
  }
4395

4396
_exit:
2,076,976✔
4397

4398
  taosMemoryFree(pMsg->pData);
2,077,334✔
4399

4400
  return code;
2,076,976✔
4401
}
4402

4403

4404
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
2,076,976✔
4405
  int32_t          code = TSDB_CODE_SUCCESS;
2,076,976✔
4406
  int32_t          lino = 0;
2,076,976✔
4407
  SDownstreamSourceNode* pSource = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);
2,076,976✔
4408

4409
  SResFetchReq req = {0};
2,075,544✔
4410
  req.header.vgId = pSource->addr.nodeId;
2,075,544✔
4411
  req.sId = pSource->sId;
2,073,038✔
4412
  req.clientId = pSource->clientId;
2,075,544✔
4413
  req.taskId = pSource->taskId;
2,073,396✔
4414
  req.srcTaskId = ctx->taskId;
2,067,668✔
4415
  req.blockIdx = ctx->blockIdx;
2,069,100✔
4416
  req.queryId = ctx->queryId;
2,074,470✔
4417
  req.execId = pSource->execId;
2,070,532✔
4418

4419
  int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, false);
2,066,594✔
4420
  if (msgSize < 0) {
2,073,396✔
UNCOV
4421
    return msgSize;
×
4422
  }
4423

4424
  void* msg = taosMemoryCalloc(1, msgSize);
2,073,396✔
4425
  if (NULL == msg) {
2,067,868✔
UNCOV
4426
    return terrno;
×
4427
  }
4428

4429
  msgSize = tSerializeSResFetchReq(msg, msgSize, &req, false);
2,067,868✔
4430
  if (msgSize < 0) {
2,074,112✔
UNCOV
4431
    taosMemoryFree(msg);
×
4432
    return msgSize;
×
4433
  }
4434

4435
  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
2,074,112✔
4436
         ", execId:%d, blockIdx:%" PRId64,
4437
         ctx->idStr, pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
4438
         pSource->taskId, pSource->execId, req.blockIdx);
4439

4440
  // send the fetch remote task result reques
4441
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,076,618✔
4442
  if (NULL == pMsgSendInfo) {
2,075,186✔
UNCOV
4443
    taosMemoryFreeClear(msg);
×
4444
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
×
4445
    return terrno;
×
4446
  }
4447

4448
  SScalarFetchParam* param = taosMemoryMalloc(sizeof(SScalarFetchParam));
2,075,186✔
4449
  if (NULL == param) {
2,070,174✔
UNCOV
4450
    taosMemoryFreeClear(msg);
×
4451
    taosMemoryFreeClear(pMsgSendInfo);
×
4452
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
×
4453
    return terrno;
×
4454
  }
4455

4456
  taosWLockLatch(&ctx->lock);
2,070,174✔
4457
  
4458
  if (ctx->code) {
2,079,482✔
UNCOV
4459
    qError("task has been killed, error:%s", tstrerror(ctx->code));
×
4460
    taosMemoryFree(param);
×
4461
    taosMemoryFreeClear(msg);
×
4462
    taosMemoryFreeClear(pMsgSendInfo);
×
4463
    code = ctx->code;
×
4464
    taosWUnLockLatch(&ctx->lock);
×
4465
    goto _end;
×
4466
  } else {
4467
    ctx->param = param;
2,072,680✔
4468
  }
4469
  
4470
  taosWUnLockLatch(&ctx->lock);
2,075,902✔
4471

4472
  param->subQIdx = subQIdx;
2,074,828✔
4473
  param->pRes = pRes;
2,076,618✔
4474
  param->pSubJobCtx = ctx;
2,068,742✔
4475

4476
  pMsgSendInfo->param = param;
2,075,544✔
4477
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
2,065,878✔
4478
  pMsgSendInfo->msgInfo.pData = msg;
2,077,334✔
4479
  pMsgSendInfo->msgInfo.len = msgSize;
2,071,606✔
4480
  pMsgSendInfo->msgType = pSource->fetchMsgType;
2,071,606✔
4481
  pMsgSendInfo->fp = remoteFetchCallBack;
2,078,766✔
4482
  pMsgSendInfo->requestId = ctx->queryId;
2,074,828✔
4483

4484
  code = asyncSendMsgToServer(ctx->rpcHandle, &pSource->addr.epSet, &ctx->transporterId, pMsgSendInfo);
2,071,248✔
4485
  QUERY_CHECK_CODE(code, lino, _end);
2,078,766✔
4486
      
4487
_end:
2,078,766✔
4488

4489
  if (code != TSDB_CODE_SUCCESS) {
2,078,766✔
UNCOV
4490
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
×
4491
  }
4492
  
4493
  return code;
2,079,124✔
4494
}
4495

4496
int32_t fetchRemoteNodeImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
2,028,862✔
4497
  int32_t          code = TSDB_CODE_SUCCESS;
2,028,862✔
4498
  int32_t          lino = 0;
2,028,862✔
4499

4500
  ctx->blockIdx = 0;
2,028,862✔
4501

4502
  code = sendFetchRemoteNodeReq(ctx, subQIdx, pRes);
2,029,936✔
4503
  QUERY_CHECK_CODE(code, lino, _end);
2,031,010✔
4504

4505
  code = qSemWait(ctx->pTaskInfo, &ctx->ready);
2,031,010✔
4506
  if (isTaskKilled(ctx->pTaskInfo)) {
2,032,084✔
4507
    code = getTaskCode(ctx->pTaskInfo);
8,950✔
4508
  } else {
4509
    code = ctx->code;
2,023,134✔
4510
  }
4511
      
4512
_end:
2,032,084✔
4513

4514
  taosWLockLatch(&ctx->lock);
2,032,084✔
4515
  ctx->param = NULL;
2,032,084✔
4516
  taosWUnLockLatch(&ctx->lock);
2,032,084✔
4517

4518
  if (code != TSDB_CODE_SUCCESS) {
2,032,084✔
4519
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
571,368✔
4520
  }
4521
  return code;
2,032,084✔
4522
}
4523

4524
int32_t remoteNodeCopy(SNode* pSrc, SNode* pDst) {
37,948✔
4525
  int32_t code = 0, lino = 0;
37,948✔
4526
  
4527
  switch (nodeType(pSrc)) {
37,948✔
4528
    case QUERY_NODE_VALUE:
37,948✔
4529
      TAOS_CHECK_EXIT(valueNodeCopy((SValueNode*)pSrc, &((SRemoteValueNode*)pDst)->val));
37,948✔
4530
      ((SRemoteValueNode*)pDst)->val.node.type = QUERY_NODE_VALUE;
37,948✔
4531
      break;
37,948✔
UNCOV
4532
    case QUERY_NODE_REMOTE_VALUE_LIST: {
×
4533
      SRemoteValueListNode* pDstNode = (SRemoteValueListNode*)pDst;
×
4534
      memcpy(pDst, pSrc, sizeof(SRemoteValueListNode));
×
4535
      pDstNode->hashAllocated = false;      
×
4536
      break;
×
4537
    } 
UNCOV
4538
    default:
×
4539
      break;
×
4540
  }
4541

4542
_exit:
37,948✔
4543

4544
  if (code) {
37,948✔
UNCOV
4545
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4546
  }
4547

4548
  return code;
37,948✔
4549
}
4550

4551
int32_t qFetchRemoteNode(void* pCtx, int32_t subQIdx, SNode* pRes) {
2,066,810✔
4552
  STaskSubJobCtx*  ctx = (STaskSubJobCtx*)pCtx;
2,066,810✔
4553
  int32_t code = 0, lino = 0;
2,066,810✔
4554
  int32_t       subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);
2,066,810✔
4555
  if (subQIdx >= subEndPoinsNum) {
2,060,724✔
UNCOV
4556
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
×
4557
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
×
4558
  }
4559

4560
  SNode** ppRes = taosArrayGet(ctx->subResNodes, subQIdx);
2,060,724✔
4561
  if (NULL == *ppRes) {
2,065,378✔
4562
    TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
2,029,220✔
4563
    *ppRes = pRes;
1,460,716✔
4564
  } else {
4565
    TAOS_CHECK_EXIT(remoteNodeCopy(*ppRes, pRes));
37,948✔
4566
  }
4567

4568
_exit:
37,948✔
4569

4570
  if (code) {
2,070,032✔
4571
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
571,368✔
4572
  } else {
4573
    qDebug("%s %s subQIdx %d succeed", ctx->idStr, __func__, subQIdx);
1,498,664✔
4574
  }
4575

4576
  return code;
2,070,032✔
4577
}
4578

4579

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc