• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4976

06 Mar 2026 09:48AM UTC coverage: 68.446% (+0.08%) from 68.37%
#4976

push

travis-ci

web-flow
feat(TDgpt): support multiple input data columns for anomaly detection. (#34606)

0 of 93 new or added lines in 9 files covered. (0.0%)

5718 existing lines in 144 files now uncovered.

211146 of 308486 relevant lines covered (68.45%)

136170362.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.45
/source/libs/executor/src/executil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "functionMgt.h"
18
#include "index.h"
19
#include "os.h"
20
#include "query.h"
21
#include "querynodes.h"
22
#include "taoserror.h"
23
#include "tarray.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttime.h"
29

30
#include "executil.h"
31
#include "executorInt.h"
32
#include "querytask.h"
33
#include "storageapi.h"
34
#include "ttypes.h"
35
#include "tutil.h"
36
#include "tjson.h"
37
#include "trpc.h"
38
#include "filter.h"
39
#include "operator.h"
40
#include "tref.h"
41

42
// Walker context filled in by getColumn() while it visits the nodes of a tag condition.
typedef struct tagFilterAssist {
43
  SHashObj* colHash;    // colId -> SNode* of the first column node seen with that id
44
  int32_t   index;      // next slot id to hand out to a newly seen column
45
  SArray*   cInfoList;  // SColumnInfo entries, one per distinct column, in slot order
46
  int32_t   code;       // result of nodesMakeNode() when a tbname function is replaced by a column node
47
} tagFilterAssist;
48

49
// Walker context for doTranslateTagExpr().
typedef struct STransTagExprCtx {
50
  int32_t      code;     // result of nodesMakeNode() for the replacement value node
51
  SMetaReader* pReader;  // meta reader whose current entry supplies the tag values and table name
52
} STransTagExprCtx;
53

54
// Classification of a filter condition. Not referenced in the part of the file reviewed here;
// presumably: a condition without logic operator, a pure AND condition, anything else — TODO confirm.
typedef enum {
55
  FILTER_NO_LOGIC = 1,
56
  FILTER_AND,
57
  FILTER_OTHER,
58
} FilterCondType;
59

60
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI,
61
                                        uint64_t suid);
62

63
static int64_t getLimit(const SNode* pLimit) {
1,266,377,013✔
64
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->limit) ? -1 : ((SLimitNode*)pLimit)->limit->datum.i;
1,266,377,013✔
65
}
66
static int64_t getOffset(const SNode* pLimit) {
1,266,282,658✔
67
  return (NULL == pLimit || NULL == ((SLimitNode*)pLimit)->offset) ? -1 : ((SLimitNode*)pLimit)->offset->datum.i;
1,266,282,658✔
68
}
69
static void releaseColInfoData(void* pCol);
70
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes);
71

72
// Puts the result-row info into its initial state: size 0 and current page id -1.
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
  pResultRowInfo->cur.pageId = -1;
  pResultRowInfo->size = 0;
}
476,689,449✔
76

77
// Flags the given result row as closed.
void closeResultRow(SResultRow* pResultRow) {
  pResultRow->closed = true;
}
5,380,328✔
78

79
// Clears the row counters and state flags of a result row and, when entrySize is non-zero,
// zeroes the first entrySize bytes of its entry-info area.
void resetResultRow(SResultRow* pResultRow, size_t entrySize) {
  pResultRow->startInterp = false;
  pResultRow->endInterp = false;
  pResultRow->closed = false;
  pResultRow->nOrigRows = 0;
  pResultRow->numOfRows = 0;

  if (entrySize == 0) {
    return;
  }
  memset(pResultRow->pEntryInfo, 0, entrySize);
}
899,409,096✔
90

91
// TODO refactor: use macro
92
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
2,147,483,647✔
93
  return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
2,147,483,647✔
94
}
95

96
// Size in bytes of one result row: the SResultRow header, one SResultRowEntryInfo per output,
// plus each output function's intermediate buffer (resDataInfo.interBufSize).
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t total = sizeof(SResultRow) + (numOfOutput * sizeof(SResultRowEntryInfo));

  int32_t idx = 0;
  while (idx < numOfOutput) {
    total += pCtx[idx].resDataInfo.interBufSize;
    ++idx;
  }

  return total;
}
105

106
// Convert buf read from rocksdb to result row
107
int32_t getResultRowFromBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
108
  if (inBuf == NULL || pSup == NULL) {
×
109
    qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
×
UNCOV
110
    return TSDB_CODE_INVALID_PARA;
×
111
  }
112
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
113
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
114
  SResultRow*     pResultRow = NULL;
×
115
  size_t          processedSize = 0;
×
UNCOV
116
  int32_t         code = TSDB_CODE_SUCCESS;
×
117

118
  // calculate the size of output buffer
119
  *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
120
  *outBuf = taosMemoryMalloc(*outBufSize);
×
121
  if (*outBuf == NULL) {
×
122
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
UNCOV
123
    return terrno;
×
124
  }
125
  pResultRow = (SResultRow*)*outBuf;
×
126
  (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
×
127
  inBuf += sizeof(SResultRow);
×
UNCOV
128
  processedSize += sizeof(SResultRow);
×
129

130
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
131
    int32_t len = *(int32_t*)inBuf;
×
132
    inBuf += sizeof(int32_t);
×
133
    processedSize += sizeof(int32_t);
×
134
    if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) {
×
135
      code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
×
136
      if (code != TSDB_CODE_SUCCESS) {
×
137
        qError("failed to decode result row, code:%d", code);
×
UNCOV
138
        return code;
×
139
      }
140
    } else {
UNCOV
141
      (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
×
142
    }
143
    inBuf += len;
×
UNCOV
144
    processedSize += len;
×
145
  }
146

UNCOV
147
  if (processedSize < inBufSize) {
×
148
    // stream stores extra data after result row
149
    size_t leftLen = inBufSize - processedSize;
×
150
    TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
×
151
    if (*outBuf == NULL) {
×
152
      qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
×
UNCOV
153
      return terrno;
×
154
    }
155
    (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
×
156
    inBuf += leftLen;
×
157
    processedSize += leftLen;
×
UNCOV
158
    *outBufSize += leftLen;
×
159
  }
160

161
  qTrace("[StreamInternal] get result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
UNCOV
162
  return TSDB_CODE_SUCCESS;
×
163
}
164

165
// Convert result row to buf for rocksdb
166
int32_t putResultRowToBuf(SExprSupp* pSup, const char* inBuf, size_t inBufSize, char** outBuf, size_t* outBufSize) {
×
167
  if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
×
168
    qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
×
UNCOV
169
    return TSDB_CODE_INVALID_PARA;
×
170
  }
171

172
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
173
  int32_t*        offset = pSup->rowEntryInfoOffset;
×
174
  SResultRow*     pResultRow = (SResultRow*)inBuf;
×
UNCOV
175
  size_t          rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
×
176

177
  if (rowSize > inBufSize) {
×
178
    qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
×
UNCOV
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181

182
  // calculate the size of output buffer
183
  *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
×
184
  if (rowSize < inBufSize) {
×
UNCOV
185
    *outBufSize += inBufSize - rowSize;
×
186
  }
187

188
  *outBuf = taosMemoryMalloc(*outBufSize);
×
189
  if (*outBuf == NULL) {
×
190
    qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
×
UNCOV
191
    return terrno;
×
192
  }
193

194
  char* pBuf = *outBuf;
×
195
  pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
×
196
  (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
×
197
  pBuf += sizeof(SResultRow);
×
198
  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
×
199
    size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
×
200
    *(int32_t*)pBuf = (int32_t)len;
×
201
    pBuf += sizeof(int32_t);
×
202
    (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
×
UNCOV
203
    pBuf += len;
×
204
  }
205

UNCOV
206
  if (rowSize < inBufSize) {
×
207
    // stream stores extra data after result row
208
    size_t leftLen = inBufSize - rowSize;
×
209
    (void)memcpy(pBuf, inBuf + rowSize, leftLen);
×
UNCOV
210
    pBuf += leftLen;
×
211
  }
212

213
  qTrace("[StreamInternal] put result inBufSize:%zu, outBufSize:%zu", inBufSize, *outBufSize);
×
UNCOV
214
  return TSDB_CODE_SUCCESS;
×
215
}
216

UNCOV
217
// Element destructor for arrays of pointers: p points at the stored pointer, which is freed.
static void freeEx(void* p) {
  void** ppItem = (void**)p;
  taosMemoryFree(*ppItem);
}
×
218

219
// Releases the buffer and the row array held by the group result info and resets its cursors.
// When freeItem is set, every array element is freed through freeEx and the flag is cleared.
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
  taosMemoryFreeClear(pGroupResInfo->pBuf);

  if (!pGroupResInfo->freeItem) {
    taosArrayDestroy(pGroupResInfo->pRows);
  } else {
    taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
    pGroupResInfo->freeItem = false;
  }
  pGroupResInfo->pRows = NULL;

  pGroupResInfo->index = 0;
  pGroupResInfo->delIndex = 0;
}
133,964,928✔
233

234
int32_t resultrowComparAsc(const void* p1, const void* p2) {
2,147,483,647✔
235
  SResKeyPos* pp1 = *(SResKeyPos**)p1;
2,147,483,647✔
236
  SResKeyPos* pp2 = *(SResKeyPos**)p2;
2,147,483,647✔
237

238
  if (pp1->groupId == pp2->groupId) {
2,147,483,647✔
239
    int64_t pts1 = *(int64_t*)pp1->key;
2,147,483,647✔
240
    int64_t pts2 = *(int64_t*)pp2->key;
2,147,483,647✔
241

242
    if (pts1 == pts2) {
2,147,483,647✔
UNCOV
243
      return 0;
×
244
    } else {
245
      return pts1 < pts2 ? -1 : 1;
2,147,483,647✔
246
    }
247
  } else {
248
    return pp1->groupId < pp2->groupId ? -1 : 1;
2,147,483,647✔
249
  }
250
}
251

252
// Descending comparator: the ascending comparator with its arguments swapped.
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
  int32_t reversed = resultrowComparAsc(p2, p1);
  return reversed;
}
2,147,483,647✔
253

254
// Rebuilds pGroupResInfo->pRows / pBuf from the entries of pHashmap.
//
// Every hash entry (key = uint64 group id followed by the remaining key bytes, value = SResultRowPosition)
// is flattened into one SResKeyPos inside a single buffer (pBuf); pRows holds a pointer to each of them.
// When order is TSDB_ORDER_ASC or TSDB_ORDER_DESC the pointers are sorted with the matching comparator.
//
// @return TSDB_CODE_SUCCESS, or terrno when the array, the buffer or an array push cannot be allocated.
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
    pGroupResInfo->pRows = NULL;
  }
  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  // extract the result rows information from the hash map
  int32_t size = tSimpleHashGetSize(pHashmap);

  void* pData = NULL;
  pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
  QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);

  size_t  keyLen = 0;
  int32_t iter = 0;
  int64_t bufLen = 0, offset = 0;

  // first pass: total bytes needed for all flattened entries
  // todo move away and record this during create window
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    /*void* key = */ (void)tSimpleHashGetKey(pData, &keyLen);
    bufLen += keyLen + sizeof(SResultRowPosition);
  }

  pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
  QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);

  // second pass: restart the iteration and fill the buffer
  iter = 0;
  pData = NULL;
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pData, &keyLen);

    SResKeyPos* p = (SResKeyPos*)(pGroupResInfo->pBuf + offset);

    p->groupId = *(uint64_t*)key;
    p->pos = *(SResultRowPosition*)pData;
    memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));

    // check the push result itself; pBuf is known to be non-NULL at this point
    void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    offset += keyLen + sizeof(struct SResultRowPosition);
  }

  if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
    __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
    taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn);
  }

  pGroupResInfo->index = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
314

315
// Installs pArrayList as the row array of the group result info (destroying any previous array),
// marks its items as owned (freeItem = true) and rewinds both cursors.
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  SArray* pPrevRows = pGroupResInfo->pRows;
  if (pPrevRows != NULL) {
    taosArrayDestroy(pPrevRows);
  }

  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->freeItem = true;
  pGroupResInfo->delIndex = 0;
  pGroupResInfo->index = 0;
}
×
325

326
// True when a row array exists and the read cursor has not yet reached its end.
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows != NULL) && (pGroupResInfo->index < taosArrayGetSize(pRows));
}
333

334
// Number of entries in the row array, or 0 when there is no array.
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
  SArray* pRows = pGroupResInfo->pRows;
  return (pRows == 0) ? 0 : (int32_t)taosArrayGetSize(pRows);
}
341

342
// Builds an array of SBlockOrderInfo, one per ORDER BY expression in pNodeList (empty array for a
// NULL list). Each expression must be a column node; its slotId, sort order and null ordering are
// recorded. Returns NULL on failure; for a missing node or a non-column expression terrno is set to
// TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
SArray* createSortInfo(SNodeList* pNodeList) {
  size_t numOfCols = (pNodeList != NULL) ? LIST_LENGTH(pNodeList) : 0;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
    if (pSortKey == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    SBlockOrderInfo bi = {0};
    bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
    bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);

    if (QUERY_NODE_COLUMN != nodeType(pSortKey->pExpr)) {
      qError("invalid order by expr type:%d", nodeType(pSortKey->pExpr));
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      return NULL;
    }

    bi.slotId = ((SColumnNode*)pSortKey->pExpr)->slotId;

    if (taosArrayPush(pList, &bi) == NULL) {
      taosArrayDestroy(pList);
      return NULL;
    }
  }

  return pList;
}
389

390
SSDataBlock* createDataBlockFromDescNode(void* p) {
764,779,556✔
391
  SDataBlockDescNode* pNode = (SDataBlockDescNode*)p;
764,779,556✔
392
  int32_t      numOfCols = LIST_LENGTH(pNode->pSlots);
764,779,556✔
393
  SSDataBlock* pBlock = NULL;
764,965,701✔
394
  int32_t      code = TSDB_CODE_SUCCESS;
764,973,401✔
395
  int32_t      lino = 0;
764,973,401✔
396

397
  code = createDataBlock(&pBlock);
764,973,401✔
398
  QUERY_CHECK_CODE(code, lino, _return);
764,762,052✔
399

400
  pBlock->info.id.blockId = pNode->dataBlockId;
764,762,052✔
401
  pBlock->info.type = STREAM_INVALID;
764,839,385✔
402
  pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
764,867,540✔
403
  pBlock->info.watermark = INT64_MIN;
764,776,569✔
404

405
  int32_t i = 0;
764,939,327✔
406
  SNode*  node = NULL;
764,939,327✔
407
  FOREACH(node, pNode->pSlots) {
2,147,483,647✔
408
    SSlotDescNode* pDescNode = (SSlotDescNode*)node;
2,147,483,647✔
409
    SColumnInfoData idata =
2,147,483,647✔
410
        createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
2,147,483,647✔
411
    idata.info.scale = pDescNode->dataType.scale;
2,147,483,647✔
412
    idata.info.precision = pDescNode->dataType.precision;
2,147,483,647✔
413
    idata.info.noData = pDescNode->reserve;
2,147,483,647✔
414

415
    code = blockDataAppendColInfo(pBlock, &idata);
2,147,483,647✔
416
    QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
417
    ++i;
2,147,483,647✔
418
  }
419

420
  if (pBlock != NULL && i != numOfCols) {
764,981,942✔
421
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
422
    QUERY_CHECK_CODE(code, lino, _return);
×
423
  }
424

425
  return pBlock;
764,981,942✔
426
_return:
68✔
427
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
428
  blockDataDestroy(pBlock);
×
429
  terrno = code;
×
UNCOV
430
  return NULL;
×
431
}
432

433
// Looks for the first match item flagged isPk and prepares the block's two pks slots for it:
// both get the column's type and, for variable-length types, a zeroed buffer of info.bytes bytes.
// Returns TSDB_CODE_SUCCESS (also when no such item exists) or terrno on lookup/allocation failure.
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  for (int32_t idx = 0; idx < taosArrayGetSize(pMatchInfo->pList); ++idx) {
    SColMatchItem* pMatch = taosArrayGet(pMatchInfo->pList, idx);
    if (pMatch == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    if (!pMatch->isPk) {
      continue;
    }

    SColumnInfoData* pPkCol = taosArrayGet(pDataBlock->pDataBlock, pMatch->dstSlotId);
    if (pPkCol == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    pInfo->pks[0].type = pPkCol->info.type;
    pInfo->pks[1].type = pPkCol->info.type;

    // allocate enough buffer size, which is pPkCol->info.bytes
    if (IS_VAR_DATA_TYPE(pMatch->dataType.type)) {
      pInfo->pks[0].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[0].pData) {
        return terrno;
      }

      pInfo->pks[1].pData = taosMemoryCalloc(1, pPkCol->info.bytes);
      if (NULL == pInfo->pks[1].pData) {
        taosMemoryFreeClear(pInfo->pks[0].pData);
        return terrno;
      }

      pInfo->pks[0].nData = pPkCol->info.bytes;
      pInfo->pks[1].nData = pPkCol->info.bytes;
    }

    // only the first primary-key item is considered
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}
475

476
// Expression-rewrite callback: replaces tag column nodes and tbname column/function nodes by constant
// value nodes taken from the meta entry currently loaded in the context's reader.
//
// @param pNode     in/out: node being visited; replaced (and the old node destroyed) on rewrite
// @param pContext  STransTagExprCtx*; `code` receives the error code whenever DEAL_RES_ERROR is returned
// @return DEAL_RES_CONTINUE for untouched or successfully rewritten nodes, DEAL_RES_ERROR on failure.
//         On failure the original node is left in place and the partially built value node is destroyed.
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
  STransTagExprCtx* pCtx = pContext;
  SMetaReader*      mr = pCtx->pReader;
  bool              isTagCol = false, isTbname = false;
  if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
    SColumnNode* pCol = (SColumnNode*)*pNode;
    if (pCol->colType == COLUMN_TYPE_TBNAME)
      isTbname = true;
    else
      isTagCol = true;
  } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
    SFunctionNode* pFunc = (SFunctionNode*)*pNode;
    if (pFunc->funcType == FUNCTION_TYPE_TBNAME) isTbname = true;
  }

  if (!isTagCol && !isTbname) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode* res = NULL;
  pCtx->code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
  if (NULL == res) {
    return DEAL_RES_ERROR;
  }
  res->translate = true;

  if (isTagCol) {
    SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
    res->node.resType = pSColumnNode->node.resType;

    STagVal tagVal = {0};
    tagVal.cid = pSColumnNode->colId;
    const char* p = mr->pAPI->extractTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
    if (p == NULL) {
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
    } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
      int32_t len = ((const STag*)p)->len;
      res->datum.p = taosMemoryCalloc(len + 1, 1);
      if (NULL == res->datum.p) {
        pCtx->code = terrno;
        goto _error;
      }
      memcpy(res->datum.p, p, len);
    } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
      if (IS_STR_DATA_BLOB(pSColumnNode->node.resType.type)) {
        // report through the context; the return value of this callback is an EDealRes, not an error code
        pCtx->code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
        goto _error;
      }

      res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        pCtx->code = terrno;
        goto _error;
      }
      memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
      varDataSetLen(res->datum.p, tagVal.nData);
    } else {
      int32_t code = nodesSetValueNodeValue(res, &(tagVal.i64));
      if (code != TSDB_CODE_SUCCESS) {
        pCtx->code = code;
        goto _error;
      }
    }
  } else {
    // tbname: the value is the table name of the current meta entry, stored as a var-length string
    res->node.resType = ((SExprNode*)(*pNode))->resType;

    int32_t len = strlen(mr->me.name);
    res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
    if (NULL == res->datum.p) {
      pCtx->code = terrno;
      goto _error;
    }
    memcpy(varDataVal(res->datum.p), mr->me.name, len);
    varDataSetLen(res->datum.p, len);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;
  return DEAL_RES_CONTINUE;

_error:
  // res was never attached to the tree, so it has to be released here
  nodesDestroyNode((SNode*)res);
  return DEAL_RES_ERROR;
}
562

563
// Evaluates a tag condition against the table identified by uid.
//
// The condition is cloned, its tag/tbname references are replaced by the table's actual values
// (doTranslateTagExpr), and the resulting constant expression is folded with scalarCalculateConstants.
//
// @param uid         table uid looked up through the meta reader
// @param pTagCond    tag condition; not modified (a clone is rewritten)
// @param vnode       vnode handle passed to the meta reader
// @param pQualified  out: boolean result of the condition; false on every failure path
// @param pAPI        storage API providing the meta reader functions
// @return TSDB_CODE_SUCCESS when the condition was evaluated, and also when the entry lookup for uid
//         fails (*pQualified is false in that case); otherwise the error code of the failing step.
int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SMetaReader mr = {0};

  *pQualified = false;

  pAPI->metaReaderFn.initReader(&mr, vnode, META_READER_LOCK, &pAPI->metaFn);
  code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid);
  if (TSDB_CODE_SUCCESS != code) {
    // a failed lookup is reported as "not qualified", not as an error
    pAPI->metaReaderFn.clearReader(&mr);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pTagCondTmp = NULL;
  code = nodesCloneNode(pTagCond, &pTagCondTmp);
  if (TSDB_CODE_SUCCESS != code) {
    pAPI->metaReaderFn.clearReader(&mr);
    return code;
  }

  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
  nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &ctx);
  pAPI->metaReaderFn.clearReader(&mr);
  if (TSDB_CODE_SUCCESS != ctx.code) {
    // the failure is recorded in the walker context; `code` is still TSDB_CODE_SUCCESS here
    nodesDestroyNode(pTagCondTmp);
    terrno = ctx.code;
    return ctx.code;
  }

  SNode* pNew = NULL;
  code = scalarCalculateConstants(pTagCondTmp, &pNew);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    nodesDestroyNode(pTagCondTmp);
    return code;
  }

  SValueNode* pValue = (SValueNode*)pNew;
  *pQualified = pValue->datum.b;

  nodesDestroyNode(pNew);
  return TSDB_CODE_SUCCESS;
}
609

610
// Expression-rewrite callback that collects the distinct columns referenced by a tag condition.
//
// Column nodes are used as they are; tbname function nodes are replaced by a synthetic column node
// (colId -1, COLUMN_TYPE_TBNAME, varchar). The first time a colId is seen it is registered in
// pData->colHash, gets the next slot id (pData->index) and an SColumnInfo is appended to
// pData->cInfoList; later occurrences reuse the slot id of the registered node.
//
// @param pNode     in/out: visited node, replaced when a tbname function is converted
// @param pContext  tagFilterAssist*; `code` is set whenever DEAL_RES_ERROR is returned
// @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR on allocation/hash/array failure.
static EDealRes getColumn(SNode** pNode, void* pContext) {
  tagFilterAssist* pData = (tagFilterAssist*)pContext;
  SColumnNode*     pSColumnNode = NULL;
  if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
    pSColumnNode = *(SColumnNode**)pNode;
  } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
    SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
    if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
      pData->code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pSColumnNode);
      if (NULL == pSColumnNode) {
        return DEAL_RES_ERROR;
      }
      pSColumnNode->colId = -1;
      pSColumnNode->colType = COLUMN_TYPE_TBNAME;
      pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
      pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
      nodesDestroyNode(*pNode);
      *pNode = (SNode*)pSColumnNode;
    } else {
      return DEAL_RES_CONTINUE;
    }
  } else {
    return DEAL_RES_CONTINUE;
  }

  void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
  if (!data) {
    int32_t tempRes =
        taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
      // record the failure so that it is visible through the context as well
      pData->code = tempRes;
      return DEAL_RES_ERROR;
    }
    pSColumnNode->slotId = pData->index++;
    SColumnInfo cInfo = {.colId = pSColumnNode->colId,
                         .type = pSColumnNode->node.resType.type,
                         .bytes = pSColumnNode->node.resType.bytes,
                         .pk = pSColumnNode->isPk};
#if TAG_FILTER_DEBUG
    qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
#endif
    void* tmp = taosArrayPush(pData->cInfoList, &cInfo);
    if (!tmp) {
      pData->code = terrno;
      return DEAL_RES_ERROR;
    }
  } else {
    SColumnNode* col = *(SColumnNode**)data;
    pSColumnNode->slotId = col->slotId;
  }

  return DEAL_RES_CONTINUE;
}
661

662
// Allocates a column of the given data type with capacity for numOfRows rows and hands it to pParam
// (columnData set, colAlloced = true). Returns terrno on failure; the column is released in that case.
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    return terrno;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pCol, numOfRows, true);
  if (TSDB_CODE_SUCCESS == code) {
    pParam->columnData = pCol;
    pParam->colAlloced = true;
    return TSDB_CODE_SUCCESS;
  }

  terrno = code;
  releaseColInfoData(pCol);
  return terrno;
}
684

685
static void releaseColInfoData(void* pCol) {
4,703,172✔
686
  if (pCol) {
4,703,172✔
687
    SColumnInfoData* col = (SColumnInfoData*)pCol;
4,703,058✔
688
    colDataDestroy(col);
4,703,058✔
689
    taosMemoryFree(col);
4,701,627✔
690
  }
691
}
4,703,058✔
692

693
void freeItem(void* p) {
190,758,058✔
694
  STUidTagInfo* pInfo = p;
190,758,058✔
695
  if (pInfo->pTagVal != NULL) {
190,758,058✔
696
    taosMemoryFree(pInfo->pTagVal);
190,241,156✔
697
  }
698
}
190,742,585✔
699

700
// One "tag column <op> value" pair taken from a tag filter (filled in by extractTagDataEntry).
typedef struct {
701
  col_id_t  colId;       // id of the column node of the operator
702
  SNode*    pValueNode;  // the value node of the operator (an SValueNode)
703
  int32_t   bytes;  // length defined in schema
704
} STagDataEntry;
705

706
static int compareTagDataEntry(const void* a, const void* b) {
27,864✔
707
  STagDataEntry* p1 = (STagDataEntry*)a;
27,864✔
708
  STagDataEntry* p2 = (STagDataEntry*)b;
27,864✔
709
  return compareInt16Val(&p1->colId, &p2->colId);
27,864✔
710
}
711

712
// Serializes the entries of pIdWithValue into a newly allocated, zero-initialized key of keyLen bytes.
// For each entry the colId is written, followed (unless the value is NULL) by the value bytes:
// fixed-size types contribute min(schema bytes, value bytes) bytes of the datum, var-length types
// contribute their payload without the length header.
// Returns terrno on allocation failure and TSDB_CODE_STREAM_INTERNAL_ERROR for an unsupported type;
// in the latter case *keyBuf stays allocated.
// NOTE(review): writes are not checked against keyLen — the caller must size the key accordingly.
static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t keyLen) {
  *keyBuf = (char*)taosMemoryCalloc(1, keyLen);
  if (NULL == *keyBuf) {
    qError("failed to allocate memory for tag filter optimization key, size:%d", keyLen);
    return terrno;
  }

  char* pCursor = *keyBuf;
  for (int32_t idx = 0; idx < taosArrayGetSize(pIdWithValue); ++idx) {
    STagDataEntry* pEntry = (STagDataEntry*)taosArrayGet(pIdWithValue, idx);
    SValueNode*    pVal = (SValueNode*)pEntry->pValueNode;
    // num type may have different bytes length, use the smaller one
    int32_t fixedLen = TMIN(pEntry->bytes, pVal->node.resType.bytes);

    (void)memcpy(pCursor, &pEntry->colId, sizeof(col_id_t));
    pCursor += sizeof(col_id_t);

    if (pVal->isNull) {
      continue;
    }

    // pick the source bytes per type, then copy once
    const void* pSrc = NULL;
    int32_t     copyLen = fixedLen;
    switch (pVal->node.resType.type) {
      case TSDB_DATA_TYPE_BOOL:
        pSrc = &pVal->datum.b;
        break;
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_TIMESTAMP:
        pSrc = &pVal->datum.i;
        break;
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_UBIGINT:
        pSrc = &pVal->datum.u;
        break;
      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_DOUBLE:
        pSrc = &pVal->datum.d;
        break;
      case TSDB_DATA_TYPE_VARCHAR:
      case TSDB_DATA_TYPE_VARBINARY:
      case TSDB_DATA_TYPE_NCHAR:
        pSrc = varDataVal(pVal->datum.p);
        copyLen = varDataLen(pVal->datum.p);
        break;
      default:
        qError("unsupported tag data type %d in tag filter optimization", pVal->node.resType.type);
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    (void)memcpy(pCursor, pSrc, copyLen);
    pCursor += copyLen;
  }

  return TSDB_CODE_SUCCESS;
}
777

778
static void extractTagDataEntry(
27,864✔
779
  SOperatorNode* pOpNode, SArray* pIdWithValue) {
780
  SNode* pLeft = pOpNode->pLeft;
27,864✔
781
  SNode* pRight = pOpNode->pRight;
27,864✔
782
  SColumnNode* pColNode = nodeType(pLeft) == QUERY_NODE_COLUMN ?
27,864✔
783
    (SColumnNode*)pLeft : (SColumnNode*)pRight;
27,864✔
784
  SValueNode* pValueNode = nodeType(pLeft) == QUERY_NODE_VALUE ?
27,864✔
785
    (SValueNode*)pLeft : (SValueNode*)pRight;
27,864✔
786

787
  STagDataEntry entry = {0};
27,864✔
788
  entry.colId = pColNode->colId;
27,864✔
789
  entry.pValueNode = (SNode*)pValueNode;
27,864✔
790
  entry.bytes = pColNode->node.resType.bytes;
27,864✔
791
  void* _tmp = taosArrayPush(pIdWithValue, &entry);
27,864✔
792
}
27,864✔
793

794
static int32_t extractTagFilterTagDataEntries(
13,932✔
795
  const SNode* pTagCond, SArray* pIdWithVal) {
796
  if (NULL == pTagCond || NULL == pIdWithVal ||
13,932✔
797
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
13,932✔
798
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
13,932✔
799
    qError("invalid parameter to extract tag filter symbol");
×
UNCOV
800
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
801
  }
802

803
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
13,932✔
UNCOV
804
    extractTagDataEntry((SOperatorNode*)pTagCond, pIdWithVal);
×
805
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
13,932✔
806
    SNode* pChild = NULL;
13,932✔
807
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
41,796✔
808
      extractTagDataEntry((SOperatorNode*)pChild, pIdWithVal);
27,864✔
809
    }
810
  }
811

812
  taosArraySort(pIdWithVal, compareTagDataEntry);
13,932✔
813

814
  return TSDB_CODE_SUCCESS;
13,932✔
815
}
816

817
// Compute an MD5 digest of a tag condition from its sorted (column id, value)
// entries and leave the result in pContext.
// A NULL pTagCond is a no-op that returns TSDB_CODE_SUCCESS without touching
// pContext. Returns the first error hit while collecting entries or building
// the key; TSDB_CODE_SUCCESS otherwise.
static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (pTagCond == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  char*   payload = NULL;
  int32_t len = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* pIdWithVal = taosArrayInit(TARRAY_MIN_SIZE, sizeof(STagDataEntry));
  // Report allocation failure as such instead of letting the extractor reject
  // the NULL array as an "invalid parameter".
  QUERY_CHECK_NULL(pIdWithVal, code, lino, _end, terrno);

  code = extractTagFilterTagDataEntries(pTagCond, pIdWithVal);
  QUERY_CHECK_CODE(code, lino, _end);

  // Key size: one column id plus the column width for every entry.
  int32_t numOfEntries = (int32_t)taosArrayGetSize(pIdWithVal);
  for (int32_t i = 0; i < numOfEntries; ++i) {
    STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
    QUERY_CHECK_NULL(pEntry, code, lino, _end, terrno);
    len += sizeof(col_id_t) + pEntry->bytes;
  }
  code = buildTagDataEntryKey(pIdWithVal, &payload, len);
  QUERY_CHECK_CODE(code, lino, _end);

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    // lino holds the line of the failing check, not the line of this log call.
    qError("%s failed at line %d since %s",
      __func__, lino, tstrerror(code));
  }
  taosArrayDestroy(pIdWithVal);
  taosMemoryFree(payload);
  return code;
}
850

851
// Serialize the tag condition with nodesNodeToMsg and store the MD5 digest of
// the serialized bytes in pContext.
// A NULL condition returns TSDB_CODE_SUCCESS and leaves pContext untouched;
// a serialization failure is logged and its code returned.
static int32_t genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
  if (NULL == pTagCond) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t msgLen = 0;
  char*   pMsg = NULL;
  int32_t code = nodesNodeToMsg(pTagCond, &pMsg, &msgLen);
  if (TSDB_CODE_SUCCESS == code) {
    tMD5Init(pContext);
    tMD5Update(pContext, (uint8_t*)pMsg, (uint32_t)msgLen);
    tMD5Final(pContext);

    taosMemoryFree(pMsg);
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  return code;
}
877

878
// Compute the MD5 digest used as the table-group cache key and leave it in
// pContext.
// The digest input is the serialized form of pGroup; when filterDigest[0] is
// non-zero, the tListLen(pContext->digest) bytes starting at filterDigest + 1
// are appended to it first.
// Returns TSDB_CODE_SUCCESS, the serialization error, or terrno when the
// payload cannot be grown.
static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   payload = NULL;
  int32_t len = 0;
  code = nodesNodeToMsg(pGroup, &payload, &len);
  QUERY_CHECK_CODE(code, lino, _end);

  if (filterDigest[0]) {
    // Keep the original pointer until the realloc is known to have succeeded,
    // so the buffer is still released at _end on failure.
    char* extended = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
    QUERY_CHECK_NULL(extended, code, lino, _end, terrno);
    payload = extended;
    memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
    len += tListLen(pContext->digest);
  }

  tMD5Init(pContext);
  tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
  tMD5Final(pContext);

_end:
  taosMemoryFree(payload);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
904

905
// Walk an expression (or every expression of a node list) with the getColumn
// rewriter and collect the SColumnInfo records it produces.
//   data     - an SNodeList* when isList is true, otherwise a single SNode*
//   isList   - selects how `data` is interpreted
//   pColList - optional output; on success receives the collected array and
//              the caller becomes responsible for destroying it
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the error the
// rewriter recorded in the traversal context.
int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList) {
  int32_t code = TSDB_CODE_SUCCESS;
  tagFilterAssist ctx = {0};
  ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (ctx.colHash == NULL) {
    code = terrno;
    goto end;
  }

  ctx.index = 0;
  ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
  if (ctx.cInfoList == NULL) {
    code = terrno;
    goto end;
  }

  if (isList) {
    SNode* pNode = NULL;
    FOREACH(pNode, (SNodeList*)data) {
      nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
      if (TSDB_CODE_SUCCESS != ctx.code) {
        code = ctx.code;
        goto end;
      }
      // The rewriter may have replaced the node; store it back into the list.
      REPLACE_NODE(pNode);
    }
  } else {
    SNode* pNode = (SNode*)data;
    nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      code = ctx.code;
      goto end;
    }
  }

  // Give up ownership only when there is a caller to take it; otherwise the
  // array stays in ctx and is released below instead of being leaked.
  if (pColList != NULL) {
    *pColList = ctx.cInfoList;
    ctx.cInfoList = NULL;
  }

end:
  taosHashCleanup(ctx.colHash);
  taosArrayDestroy(ctx.cInfoList);
  return code;
}
948

949
// Append one SStreamGroupValue to gInfo describing row `i` of the group column
// pValue.
//   - NULL cells (and JSON null tags) are recorded with isNull = true.
//   - JSON, variable-length and DECIMAL values are copied into a freshly
//     allocated buffer stored in v->data.pData.
//   - All other types are copied into v->data.val.
// Returns TSDB_CODE_SUCCESS; terrno on allocation failure;
// TSDB_CODE_QRY_JSON_IN_GROUP_ERROR when tTagIsJson() reports the value as
// JSON; TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER when a variable-length value is
// longer than the column width. On failure the reserved element (if any) is
// left in gInfo marked as NULL.
static int32_t buildGroupInfo(SColumnInfoData* pValue, int32_t i, SArray* gInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamGroupValue* v = taosArrayReserve(gInfo, 1);
  if (v == NULL) {
    code = terrno;
    goto end;
  }
  if (colDataIsNull_s(pValue, i)) {
    v->isNull = true;
  } else {
    v->isNull = false;
    char* data = colDataGetData(pValue, i);
    if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(data)) {
        code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        goto end;
      }
      if (tTagIsJsonNull(data)) {
        v->isNull = true;
        goto end;
      }
      int32_t len = getJsonValueLen(data);
      v->data.type = pValue->info.type;
      v->data.nData = len;
      // One extra zeroed byte keeps the buffer printable as a C string below.
      v->data.pData = taosMemoryCalloc(1, len + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, data, len);
      qDebug("buildGroupInfo:%d add json data len:%d, data:%s", i, len, (char*)v->data.pData);
    } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
      if (varDataTLen(data) > pValue->info.bytes) {
        code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
        goto end;
      }
      v->data.type = pValue->info.type;
      v->data.nData = varDataLen(data);
      v->data.pData = taosMemoryCalloc(1, varDataLen(data) + 1);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      memcpy(v->data.pData, varDataVal(data), varDataLen(data));
      qDebug("buildGroupInfo:%d add var data type:%d, len:%d, data:%s", i, pValue->info.type, varDataLen(data), (char*)v->data.pData);
    } else if (pValue->info.type == TSDB_DATA_TYPE_DECIMAL) {  // reader todo decimal
      v->data.type = pValue->info.type;
      v->data.nData = pValue->info.bytes;
      v->data.pData = taosMemoryCalloc(1, pValue->info.bytes);
      if (v->data.pData == NULL) {
        code = terrno;
        goto end;
      }
      // Copy into the allocated buffer, not over the pointer field itself.
      memcpy(v->data.pData, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add decimal data type:%d, len:%d", i, pValue->info.type, pValue->info.bytes);
    } else {  // reader todo decimal
      v->data.type = pValue->info.type;
      memcpy(&v->data.val, data, pValue->info.bytes);
      qDebug("buildGroupInfo:%d add data type:%d, data:%"PRId64, i, pValue->info.type, v->data.val);
    }
  }
end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    // v is NULL when the reservation itself failed.
    if (v != NULL) {
      v->isNull = true;
    }
  }
  return code;
}
1017

1018
static void getColInfoResultForGroupbyForStream(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo,
76,703✔
1019
                                   SStorageAPI* pAPI, SHashObj* groupIdMap) {
1020
  int32_t      code = TSDB_CODE_SUCCESS;
76,703✔
1021
  int32_t      lino = 0;
76,703✔
1022
  SArray*      pBlockList = NULL;
76,703✔
1023
  SSDataBlock* pResBlock = NULL;
76,703✔
1024
  SArray*      groupData = NULL;
76,925✔
1025
  SArray*      pUidTagList = NULL;
76,925✔
1026
  SArray*      gInfo = NULL;
76,925✔
1027
  int32_t      tbNameIndex = 0;
76,925✔
1028

1029
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
76,925✔
1030
  if (rows == 0) {
76,925✔
UNCOV
1031
    return;
×
1032
  }
1033

1034
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
76,925✔
1035
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
76,925✔
1036

1037
  for (int32_t i = 0; i < rows; ++i) {
334,436✔
1038
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
257,511✔
1039
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
257,511✔
1040
    STUidTagInfo info = {.uid = pkeyInfo->uid};
257,511✔
1041
    void*        tmp = taosArrayPush(pUidTagList, &info);
257,511✔
1042
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
257,511✔
1043
  }
1044
 
1045
  if (taosArrayGetSize(pUidTagList) > 0) {
76,925✔
1046
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
76,925✔
1047
  } else {
UNCOV
1048
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1049
  }
1050
  if (code != TSDB_CODE_SUCCESS) {
76,925✔
UNCOV
1051
    goto end;
×
1052
  }
1053

1054
  SArray* pColList = NULL;
76,925✔
1055
  code = qGetColumnsFromNodeList(group, true, &pColList);
76,925✔
1056
  if (code != TSDB_CODE_SUCCESS) {
76,925✔
UNCOV
1057
    goto end;
×
1058
  }
1059

1060
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
229,282✔
1061
    SColumnInfo* tmp = (SColumnInfo*)taosArrayGet(pColList, i);
152,357✔
1062
    if (tmp != NULL && tmp->colId == -1) {
152,357✔
1063
      tbNameIndex = i;
76,703✔
1064
    }
1065
  }
1066
  
1067
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
76,925✔
1068
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
76,925✔
1069
  taosArrayDestroy(pColList);
76,925✔
1070
  if (pResBlock == NULL) {
76,925✔
1071
    code = terrno;
258✔
1072
    goto end;
258✔
1073
  }
1074

1075
  pBlockList = taosArrayInit(2, POINTER_BYTES);
76,667✔
1076
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
76,667✔
1077

1078
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
76,667✔
1079
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
76,667✔
1080

1081
  groupData = taosArrayInit(2, POINTER_BYTES);
76,667✔
1082
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
76,667✔
1083

1084
  SNode* pNode = NULL;
76,667✔
1085
  FOREACH(pNode, group) {
228,508✔
1086
    SScalarParam output = {0};
151,841✔
1087

1088
    switch (nodeType(pNode)) {
151,841✔
1089
      case QUERY_NODE_VALUE:
×
UNCOV
1090
        break;
×
1091
      case QUERY_NODE_COLUMN:
151,841✔
1092
      case QUERY_NODE_OPERATOR:
1093
      case QUERY_NODE_FUNCTION: {
1094
        SExprNode* expNode = (SExprNode*)pNode;
151,841✔
1095
        code = createResultData(&expNode->resType, rows, &output);
151,841✔
1096
        if (code != TSDB_CODE_SUCCESS) {
151,841✔
UNCOV
1097
          goto end;
×
1098
        }
1099
        break;
151,841✔
1100
      }
1101

1102
      default:
×
1103
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1104
        goto end;
×
1105
    }
1106

1107
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
151,841✔
1108
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
151,841✔
1109
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
151,841✔
1110
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
151,841✔
1111
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
151,841✔
1112
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
UNCOV
1113
      continue;
×
1114
    } else {
1115
      gTaskScalarExtra.pStreamInfo = NULL;
×
1116
      gTaskScalarExtra.pStreamRange = NULL;
×
UNCOV
1117
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
×
1118
    }
1119

1120
    if (code != TSDB_CODE_SUCCESS) {
151,841✔
1121
      releaseColInfoData(output.columnData);
×
UNCOV
1122
      goto end;
×
1123
    }
1124

1125
    void* tmp = taosArrayPush(groupData, &output.columnData);
151,841✔
1126
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
151,841✔
1127
  }
1128

1129
  for (int i = 0; i < rows; i++) {
333,920✔
1130
    gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
257,253✔
1131
    QUERY_CHECK_NULL(gInfo, code, lino, end, terrno);
257,253✔
1132

1133
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
257,253✔
1134
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
257,253✔
1135

1136
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
718,747✔
1137
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
461,494✔
1138
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
461,494✔
1139
        if (ret != TSDB_CODE_SUCCESS) {
461,494✔
1140
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1141
          goto end;
×
1142
        }
1143
        if (j == tbNameIndex) {
461,494✔
1144
          SStreamGroupValue* v = taosArrayGetLast(gInfo);
257,253✔
1145
          if (v != NULL){
257,253✔
1146
            v->isTbname = true;
257,253✔
1147
            v->uid = info->uid;
257,253✔
1148
          }
1149
        }
1150
    }
1151

1152
    int32_t ret = taosHashPut(groupIdMap, &info->uid, sizeof(info->uid), &gInfo, POINTER_BYTES);
257,253✔
1153
    if (ret != TSDB_CODE_SUCCESS) {
257,253✔
1154
      qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1155
      goto end;
×
1156
    }
1157
    qDebug("put groupid to map gid:%" PRIu64, info->uid);
257,253✔
1158
    gInfo = NULL;
257,253✔
1159
  }
1160

1161
end:
76,925✔
1162
  blockDataDestroy(pResBlock);
76,925✔
1163
  taosArrayDestroy(pBlockList);
76,925✔
1164
  taosArrayDestroyEx(pUidTagList, freeItem);
76,925✔
1165
  taosArrayDestroyP(groupData, releaseColInfoData);
76,925✔
1166
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
76,925✔
1167

1168
  if (code != TSDB_CODE_SUCCESS) {
76,925✔
1169
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
258✔
1170
  }
1171
}
1172

1173
int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInfo* pTableListInfo, uint8_t* digest,
4,226,129✔
1174
                                   SStorageAPI* pAPI, bool initRemainGroups, SHashObj* groupIdMap) {
1175
  int32_t      code = TSDB_CODE_SUCCESS;
4,226,129✔
1176
  int32_t      lino = 0;
4,226,129✔
1177
  SArray*      pBlockList = NULL;
4,226,129✔
1178
  SSDataBlock* pResBlock = NULL;
4,226,129✔
1179
  void*        keyBuf = NULL;
4,226,129✔
1180
  SArray*      groupData = NULL;
4,226,129✔
1181
  SArray*      pUidTagList = NULL;
4,226,129✔
1182
  SArray*      tableList = NULL;
4,226,129✔
1183
  SArray*      gInfo = NULL;
4,226,129✔
1184

1185
  int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
4,226,129✔
1186
  if (rows == 0) {
4,226,129✔
UNCOV
1187
    return TSDB_CODE_SUCCESS;
×
1188
  } 
1189

1190
  T_MD5_CTX context = {0};
4,226,129✔
1191
  if (tsTagFilterCache && groupIdMap == NULL) {
4,226,129✔
1192
    SNodeListNode* listNode = NULL;
×
1193
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&listNode);
×
1194
    if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1195
      goto end;
×
1196
    }
1197
    listNode->pNodeList = group;
×
1198
    code = genTbGroupDigest((SNode*)listNode, digest, &context);
×
UNCOV
1199
    QUERY_CHECK_CODE(code, lino, end);
×
1200

UNCOV
1201
    nodesFree(listNode);
×
1202

UNCOV
1203
    code = pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1204
                                             tListLen(context.digest), &tableList);
UNCOV
1205
    QUERY_CHECK_CODE(code, lino, end);
×
1206

1207
    if (tableList) {
×
1208
      taosArrayDestroy(pTableListInfo->pTableList);
×
1209
      pTableListInfo->pTableList = tableList;
×
UNCOV
1210
      qDebug("retrieve tb group list from cache, numOfTables:%d",
×
1211
             (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
UNCOV
1212
      goto end;
×
1213
    }
1214
  }
1215

1216
  pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
4,226,129✔
1217
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
4,226,129✔
1218

1219
  for (int32_t i = 0; i < rows; ++i) {
27,510,164✔
1220
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
23,284,652✔
1221
    QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
23,284,650✔
1222
    STUidTagInfo info = {.uid = pkeyInfo->uid};
23,284,650✔
1223
    void*        tmp = taosArrayPush(pUidTagList, &info);
23,284,652✔
1224
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
23,284,652✔
1225
  }
1226

1227
  if (taosArrayGetSize(pUidTagList) > 0) {
4,225,512✔
1228
    code = pAPI->metaFn.getTableTagsByUid(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
4,226,129✔
1229
  } else {
UNCOV
1230
    code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
×
1231
  }
1232
  if (code != TSDB_CODE_SUCCESS) {
4,226,129✔
UNCOV
1233
    goto end;
×
1234
  }
1235

1236
  SArray* pColList = NULL;
4,226,129✔
1237
  code = qGetColumnsFromNodeList(group, true, &pColList); 
4,226,129✔
1238
  if (code != TSDB_CODE_SUCCESS) {
4,225,370✔
UNCOV
1239
    goto end;
×
1240
  }
1241

1242
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
4,225,370✔
1243
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
4,224,852✔
1244
  taosArrayDestroy(pColList);
4,225,571✔
1245
  if (pResBlock == NULL) {
4,224,812✔
1246
    code = terrno;
×
UNCOV
1247
    goto end;
×
1248
  }
1249

1250
  //  int64_t st1 = taosGetTimestampUs();
1251
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1252

1253
  pBlockList = taosArrayInit(2, POINTER_BYTES);
4,224,812✔
1254
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
4,226,129✔
1255

1256
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
4,226,129✔
1257
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,226,129✔
1258

1259
  groupData = taosArrayInit(2, POINTER_BYTES);
4,226,129✔
1260
  QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
4,224,954✔
1261

1262
  SNode* pNode = NULL;
4,224,954✔
1263
  FOREACH(pNode, group) {
8,776,044✔
1264
    SScalarParam output = {0};
4,551,849✔
1265

1266
    switch (nodeType(pNode)) {
4,551,232✔
1267
      case QUERY_NODE_VALUE:
×
UNCOV
1268
        break;
×
1269
      case QUERY_NODE_COLUMN:
4,551,849✔
1270
      case QUERY_NODE_OPERATOR:
1271
      case QUERY_NODE_FUNCTION: {
1272
        SExprNode* expNode = (SExprNode*)pNode;
4,551,849✔
1273
        code = createResultData(&expNode->resType, rows, &output);
4,551,849✔
1274
        if (code != TSDB_CODE_SUCCESS) {
4,551,419✔
UNCOV
1275
          goto end;
×
1276
        }
1277
        break;
4,551,419✔
1278
      }
1279
      case QUERY_NODE_REMOTE_VALUE: {
×
1280
        SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
×
1281
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
×
1282
        QUERY_CHECK_CODE(code, lino, end);
×
UNCOV
1283
        break;
×
1284
      }
1285
      
1286
      default:
×
1287
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
1288
        goto end;
×
1289
    }
1290

1291
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
4,551,419✔
1292
      SColumnNode*     pSColumnNode = (SColumnNode*)pNode;
4,448,706✔
1293
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
4,448,706✔
1294
      QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
4,448,034✔
1295
      code = colDataAssign(output.columnData, pColInfo, rows, NULL);
4,448,034✔
1296
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
103,143✔
UNCOV
1297
      continue;
×
1298
    } else {
1299
      gTaskScalarExtra.pStreamInfo = NULL;
103,143✔
1300
      gTaskScalarExtra.pStreamRange = NULL;
103,143✔
1301
      code = scalarCalculate(pNode, pBlockList, &output, &gTaskScalarExtra);
103,143✔
1302
    }
1303

1304
    if (code != TSDB_CODE_SUCCESS) {
4,551,621✔
1305
      releaseColInfoData(output.columnData);
×
UNCOV
1306
      goto end;
×
1307
    }
1308

1309
    void* tmp = taosArrayPush(groupData, &output.columnData);
4,551,090✔
1310
    QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
4,551,090✔
1311
  }
1312

1313
  int32_t keyLen = 0;
4,225,370✔
1314
  SNode*  node;
1315
  FOREACH(node, group) {
8,777,219✔
1316
    SExprNode* pExpr = (SExprNode*)node;
4,551,849✔
1317
    keyLen += pExpr->resType.bytes;
4,551,849✔
1318
  }
1319

1320
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
4,225,699✔
1321
  keyLen += nullFlagSize;
4,225,699✔
1322

1323
  keyBuf = taosMemoryCalloc(1, keyLen);
4,225,699✔
1324
  if (keyBuf == NULL) {
4,225,497✔
1325
    code = terrno;
×
UNCOV
1326
    goto end;
×
1327
  }
1328

1329
  if (initRemainGroups) {
4,225,497✔
1330
    pTableListInfo->remainGroups =
1,974,435✔
1331
        taosHashInit(rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
1,974,435✔
1332
    if (pTableListInfo->remainGroups == NULL) {
1,974,435✔
1333
      code = terrno;
×
UNCOV
1334
      goto end;
×
1335
    }
1336
  }
1337

1338
  for (int i = 0; i < rows; i++) {
27,505,838✔
1339
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
23,281,274✔
1340
    QUERY_CHECK_NULL(info, code, lino, end, terrno);
23,278,824✔
1341

1342
    if (groupIdMap != NULL){
23,278,824✔
1343
      gInfo = taosArrayInit(taosArrayGetSize(groupData), sizeof(SStreamGroupValue));
75,744✔
1344
    }
1345
    
1346
    char* isNull = (char*)keyBuf;
23,279,969✔
1347
    char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
23,279,969✔
1348
    for (int j = 0; j < taosArrayGetSize(groupData); j++) {
48,090,119✔
1349
      SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
24,810,348✔
1350

1351
      if (groupIdMap != NULL && gInfo != NULL) {
24,808,006✔
1352
        int32_t ret = buildGroupInfo(pValue, i, gInfo);
115,576✔
1353
        if (ret != TSDB_CODE_SUCCESS) {
115,576✔
1354
          qError("buildGroupInfo failed at line %d since %s", __LINE__, tstrerror(ret));
×
1355
          taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
UNCOV
1356
          gInfo = NULL;
×
1357
        }
1358
      }
1359
      
1360
      if (colDataIsNull_s(pValue, i)) {
49,618,507✔
1361
        isNull[j] = 1;
100,865✔
1362
      } else {
1363
        isNull[j] = 0;
24,709,636✔
1364
        char* data = colDataGetData(pValue, i);
24,708,631✔
1365
        if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
24,710,213✔
1366
          // if (tTagIsJson(data)) {
1367
          //   code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
1368
          //   goto end;
1369
          // }
1370
          if (tTagIsJsonNull(data)) {
76,121✔
1371
            isNull[j] = 1;
×
UNCOV
1372
            continue;
×
1373
          }
1374
          int32_t len = getJsonValueLen(data);
76,121✔
1375
          memcpy(pStart, data, len);
76,121✔
1376
          pStart += len;
76,121✔
1377
        } else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
24,633,376✔
1378
          if (IS_STR_DATA_BLOB(pValue->info.type)) {
21,291,614✔
1379
            if (blobDataTLen(data) > TSDB_MAX_BLOB_LEN) {
1,315✔
1380
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
UNCOV
1381
              goto end;
×
1382
            }
1383
            memcpy(pStart, data, blobDataTLen(data));
×
UNCOV
1384
            pStart += blobDataTLen(data);
×
1385
          } else {
1386
            if (varDataTLen(data) > pValue->info.bytes) {
21,285,645✔
1387
              code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
×
UNCOV
1388
              goto end;
×
1389
            }
1390
            memcpy(pStart, data, varDataTLen(data));
21,286,332✔
1391
            pStart += varDataTLen(data);
21,286,332✔
1392
          }
1393
        } else {
1394
          memcpy(pStart, data, pValue->info.bytes);
3,346,408✔
1395
          pStart += pValue->info.bytes;
3,346,408✔
1396
        }
1397
      }
1398
    }
1399

1400
    int32_t len = (int32_t)(pStart - (char*)keyBuf);
23,277,791✔
1401
    info->groupId = calcGroupId(keyBuf, len);
23,277,791✔
1402
    if (groupIdMap != NULL && gInfo != NULL) {
23,282,885✔
1403
      int32_t ret = taosHashPut(groupIdMap, &info->groupId, sizeof(info->groupId), &gInfo, POINTER_BYTES);
75,744✔
1404
      if (ret != TSDB_CODE_SUCCESS) {
75,744✔
1405
        qError("put groupid to map failed at line %d since %s", __LINE__, tstrerror(ret));
×
UNCOV
1406
        taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
×
1407
      }
1408
      qDebug("put groupid to map gid:%" PRIu64, info->groupId);
75,744✔
1409
      gInfo = NULL;
75,744✔
1410
    }
1411
    if (initRemainGroups) {
23,282,885✔
1412
      // groupId ~ table uid
1413
      code = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
11,266,720✔
1414
                         sizeof(info->uid));
1415
      if (code == TSDB_CODE_DUP_KEY) {
11,264,234✔
1416
        code = TSDB_CODE_SUCCESS;
768,781✔
1417
      }
1418
      QUERY_CHECK_CODE(code, lino, end);
11,264,234✔
1419
    }
1420
  }
1421

1422
  if (tsTagFilterCache && groupIdMap == NULL) {
4,224,564✔
1423
    tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
×
UNCOV
1424
    QUERY_CHECK_NULL(tableList, code, lino, end, terrno);
×
1425

UNCOV
1426
    code = pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest,
×
1427
                                              tListLen(context.digest), tableList,
1428
                                              taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
×
UNCOV
1429
    QUERY_CHECK_CODE(code, lino, end);
×
1430
  }
1431

1432
  //  int64_t st2 = taosGetTimestampUs();
1433
  //  qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
1434

1435
end:
4,223,408✔
1436
  taosMemoryFreeClear(keyBuf);
4,225,901✔
1437
  blockDataDestroy(pResBlock);
4,225,370✔
1438
  taosArrayDestroy(pBlockList);
4,225,256✔
1439
  taosArrayDestroyEx(pUidTagList, freeItem);
4,225,165✔
1440
  taosArrayDestroyP(groupData, releaseColInfoData);
4,225,383✔
1441
  taosArrayDestroyEx(gInfo, tDestroySStreamGroupValue);
4,223,662✔
1442

1443
  if (code != TSDB_CODE_SUCCESS) {
4,224,180✔
UNCOV
1444
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1445
  }
1446
  return code;
4,224,436✔
1447
}
1448

1449
// Sort comparator for arrays whose elements are `char*` names: each argument
// points at an element, i.e. at a pointer to a NUL-terminated string.
// Returns -1, 0 or 1 following strcmp() ordering of the two names.
static int32_t nameComparFn(const void* p1, const void* p2) {
  const char* const* ppLhs = (const char* const*)p1;
  const char* const* ppRhs = (const char* const*)p2;

  int32_t order = strcmp(*ppLhs, *ppRhs);
  if (order > 0) {
    return 1;
  }
  if (order < 0) {
    return -1;
  }
  return 0;
}
1460

1461
static SArray* getTableNameList(const SNodeListNode* pList) {
786,864✔
1462
  int32_t    code = TSDB_CODE_SUCCESS;
786,864✔
1463
  int32_t    lino = 0;
786,864✔
1464
  int32_t    len = LIST_LENGTH(pList->pNodeList);
786,864✔
1465
  SListCell* cell = pList->pNodeList->pHead;
786,518✔
1466

1467
  SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
786,864✔
1468
  QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
786,518✔
1469

1470
  for (int i = 0; i < pList->pNodeList->length; i++) {
2,272,778✔
1471
    SValueNode* valueNode = (SValueNode*)cell->pNode;
1,486,260✔
1472
    if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
1,486,260✔
1473
      terrno = TSDB_CODE_INVALID_PARA;
×
1474
      taosArrayDestroy(pTbList);
×
UNCOV
1475
      return NULL;
×
1476
    }
1477

1478
    char* name = varDataVal(valueNode->datum.p);
1,486,260✔
1479
    void* tmp = taosArrayPush(pTbList, &name);
1,486,260✔
1480
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,486,260✔
1481
    cell = cell->pNext;
1,486,260✔
1482
  }
1483

1484
  size_t numOfTables = taosArrayGetSize(pTbList);
786,864✔
1485

1486
  // order the name
1487
  taosArraySort(pTbList, nameComparFn);
786,864✔
1488

1489
  // remove the duplicates
1490
  SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
786,864✔
1491
  QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
786,518✔
1492
  void* tmpTbl = taosArrayGet(pTbList, 0);
786,518✔
1493
  QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
786,864✔
1494
  void* tmp = taosArrayPush(pNewList, tmpTbl);
786,864✔
1495
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
786,864✔
1496

1497
  for (int32_t i = 1; i < numOfTables; ++i) {
1,486,260✔
1498
    char** name = taosArrayGetLast(pNewList);
699,396✔
1499
    char** nameInOldList = taosArrayGet(pTbList, i);
699,396✔
1500
    QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
699,396✔
1501
    if (strcmp(*name, *nameInOldList) == 0) {
699,396✔
1502
      continue;
7,470✔
1503
    }
1504

1505
    tmp = taosArrayPush(pNewList, nameInOldList);
691,926✔
1506
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
691,926✔
1507
  }
1508

1509
_end:
786,864✔
1510
  taosArrayDestroy(pTbList);
786,864✔
1511
  if (code != TSDB_CODE_SUCCESS) {
786,518✔
1512
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1513
    return NULL;
×
1514
  }
1515
  return pNewList;
786,518✔
1516
}
1517

1518
// Sort comparator for uint64_t table uids: each argument points at a uid.
// Returns -1, 0 or 1 for ascending numeric order.
static int tableUidCompare(const void* a, const void* b) {
  const uint64_t lhs = *(uint64_t*)a;
  const uint64_t rhs = *(uint64_t*)b;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs == rhs) ? 0 : 1;
}
1528

1529
static int32_t filterTableInfoCompare(const void* a, const void* b) {
276,120✔
1530
  STUidTagInfo* p1 = (STUidTagInfo*)a;
276,120✔
1531
  STUidTagInfo* p2 = (STUidTagInfo*)b;
276,120✔
1532

1533
  if (p1->uid == p2->uid) {
276,120✔
UNCOV
1534
    return 0;
×
1535
  }
1536

1537
  return p1->uid < p2->uid ? -1 : 1;
276,120✔
1538
}
1539

1540
// Classifies the root node of a tag condition:
//   - a single operator node        -> FILTER_NO_LOGIC
//   - a logic condition of type AND -> FILTER_AND
//   - anything else                 -> FILTER_OTHER
static FilterCondType checkTagCond(SNode* cond) {
  switch (nodeType(cond)) {
    case QUERY_NODE_OPERATOR:
      return FILTER_NO_LOGIC;

    case QUERY_NODE_LOGIC_CONDITION: {
      const SLogicConditionNode* pLogic = (const SLogicConditionNode*)cond;
      return (pLogic->condType == LOGIC_COND_TYPE_AND) ? FILTER_AND : FILTER_OTHER;
    }

    default:
      return FILTER_OTHER;
  }
}
1549

1550
// Walks the direct children of an AND condition and hands every operator child to
// optimizeTbnameInCondImpl(). Stops at the first child for which that call returns 0.
// Returns 0 in that case; returns -1 when `cond` is not an AND logic condition, when it
// has no children, or when no child was accepted.
static int32_t doInWithAnd(SNode* cond, void* pVnode, SArray* list, SStorageAPI* pAPI, uint64_t suid) {
  bool isAndCond = nodeType(cond) == QUERY_NODE_LOGIC_CONDITION &&
                   ((SLogicConditionNode*)cond)->condType == LOGIC_COND_TYPE_AND;
  if (!isAndCond) {
    return -1;
  }

  SNodeList* pChildren = (SNodeList*)((SLogicConditionNode*)cond)->pParameterList;
  int32_t    numOfChildren = LIST_LENGTH(pChildren);
  if (numOfChildren <= 0) {
    return -1;
  }

  // visit at most numOfChildren cells, and stop early if the chain ends first
  int32_t visited = 0;
  for (SListCell* pCell = pChildren->pHead; visited < numOfChildren && pCell != NULL;
       pCell = pCell->pNext, ++visited) {
    if (nodeType(pCell->pNode) != QUERY_NODE_OPERATOR) {
      continue;
    }
    if (optimizeTbnameInCondImpl(pVnode, list, pCell->pNode, pAPI, suid) == 0) {
      return 0;
    }
  }

  return -1;
}
1573

1574
// Tries to narrow `list` using a tbname-IN condition found in `cond`.
//   - `cond` is a single operator: the result of optimizeTbnameInCondImpl() is returned as is.
//   - otherwise: doInWithAnd() is tried; when it succeeds the tags of the tables in `list`
//     are loaded via metaFn.getTableTagsByUid() and that call's result is returned.
// A non-zero return value means the optimization was not applied (or failed).
static int32_t optimizeTbnameInCond(void* pVnode, int64_t suid, SArray* list, SNode* cond, SStorageAPI* pAPI) {
  if (nodeType(cond) == QUERY_NODE_OPERATOR) {
    return optimizeTbnameInCondImpl(pVnode, list, cond, pAPI, suid);
  }

  int32_t rc = doInWithAnd(cond, pVnode, list, pAPI, suid);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  return pAPI->metaFn.getTableTagsByUid(pVnode, suid, list);
}
1588

1589
static int32_t getTableListInInOperator(void* pVnode, SArray* pExistedUidList, SNodeListNode* pList, SStorageAPI* pStoreAPI,
786,864✔
1590
                                        uint64_t suid) {
1591
  int32_t   code = 0;                                          
786,864✔
1592
  SArray*   pTbList = getTableNameList(pList);
786,864✔
1593
  int32_t   numOfTables = taosArrayGetSize(pTbList);
786,518✔
1594
  SHashObj* uHash = NULL;
786,518✔
1595

1596
  size_t numOfExisted = taosArrayGetSize(pExistedUidList);  // len > 0 means there already have uids
786,518✔
1597
  if (numOfExisted > 0) {
786,518✔
1598
    uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2,062✔
1599
    if (!uHash) {
2,062✔
1600
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1601
      code = terrno;
×
UNCOV
1602
      goto end;
×
1603
    }
1604

1605
    for (int i = 0; i < numOfExisted; i++) {
2,060,969✔
1606
      STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
2,058,907✔
1607
      if (!pTInfo) {
2,058,907✔
1608
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1609
        code = terrno;
×
UNCOV
1610
        goto end;
×
1611
      }
1612
      code = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
2,058,907✔
1613
      if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_DUP_KEY) {
2,058,907✔
1614
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1615
        goto end;
×
1616
      }
1617
    }
1618
  }
1619
  taosArrayClear(pExistedUidList);
786,518✔
1620

1621
  for (int i = 0; i < numOfTables; i++) {
2,264,616✔
1622
    char* name = taosArrayGetP(pTbList, i);
1,477,752✔
1623
    uint64_t uid = 0, csuid = 0;
1,478,444✔
1624
    if (pStoreAPI->metaFn.getTableUidByName(pVnode, name, &uid) != 0) {
1,478,098✔
1625
      continue;
939,627✔
1626
    }
1627

1628
    ETableType tbType = TSDB_TABLE_MAX;
538,817✔
1629
    if (pStoreAPI->metaFn.getTableTypeSuidByName(pVnode, name, &tbType, &csuid) == 0 &&
538,817✔
1630
        (tbType == TSDB_CHILD_TABLE || tbType == TSDB_VIRTUAL_CHILD_TABLE) &&
538,817✔
1631
        csuid == suid) {
533,023✔
1632
      if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) != NULL) {
530,967✔
1633
        STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
529,936✔
1634
        void*        tmp = taosArrayPush(pExistedUidList, &s);
529,936✔
1635
        if (!tmp) {
529,936✔
1636
          code = terrno;
×
UNCOV
1637
          goto end;
×
1638
        }
1639
      }
1640
    } 
1641
  }
1642
  taosArraySort(pExistedUidList, filterTableInfoCompare);
786,864✔
1643
  taosArrayRemoveDuplicate(pExistedUidList, filterTableInfoCompare, NULL);
786,864✔
1644
end:
786,864✔
1645
  taosArrayDestroy(pTbList);
786,864✔
1646
  taosHashCleanup(uHash);
786,518✔
1647
  return code;
786,518✔
1648
}
1649

1650
// only return uid that does not contained in pExistedUidList
1651
static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, SNode* pTagCond, SStorageAPI* pStoreAPI,
17,776,228✔
1652
                                        uint64_t suid) {
1653
  SOperatorNode* pNode = (SOperatorNode*)pTagCond;
17,776,228✔
1654
  if (pNode->opType != OP_TYPE_IN) {
17,776,228✔
1655
    return -1;
15,746,079✔
1656
  }
1657

1658
  if ((pNode->pLeft != NULL && ((nodeType(pNode->pLeft) == QUERY_NODE_FUNCTION &&
2,027,346✔
1659
                                 ((SFunctionNode*)pNode->pLeft)->funcType == FUNCTION_TYPE_TBNAME)) ||
789,016✔
1660
       (nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME)) &&
1,238,710✔
1661
      (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) {
789,345✔
1662
    SNodeListNode* pList = (SNodeListNode*)pNode->pRight;
786,864✔
1663

1664
    int32_t len = LIST_LENGTH(pList->pNodeList);
786,864✔
1665
    if (len <= 0) {
786,864✔
UNCOV
1666
      return -1;
×
1667
    }
1668

1669
    return getTableListInInOperator(pVnode, pExistedUidList, pList, pStoreAPI, suid);
786,864✔
1670
  }
1671

1672
  return -1;
1,240,845✔
1673
}
1674

1675
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
18,194,120✔
1676
                                        SStorageAPI* pStorageAPI) {
1677
  int32_t      code = TSDB_CODE_SUCCESS;
18,194,120✔
1678
  int32_t      lino = 0;
18,194,120✔
1679
  SSDataBlock* pResBlock = NULL;
18,194,120✔
1680
  code = createDataBlock(&pResBlock);
18,198,686✔
1681
  QUERY_CHECK_CODE(code, lino, _end);
18,193,510✔
1682

1683
  for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
38,140,754✔
1684
    SColumnInfoData colInfo = {0};
19,944,535✔
1685
    void*           tmp = taosArrayGet(pColList, i);
19,942,447✔
1686
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
19,927,525✔
1687
    colInfo.info = *(SColumnInfo*)tmp;
19,927,525✔
1688
    code = blockDataAppendColInfo(pResBlock, &colInfo);
19,925,116✔
1689
    QUERY_CHECK_CODE(code, lino, _end);
19,944,221✔
1690
  }
1691

1692
  code = blockDataEnsureCapacity(pResBlock, numOfTables);
18,192,331✔
1693
  if (code != TSDB_CODE_SUCCESS) {
18,193,083✔
1694
    terrno = code;
×
1695
    blockDataDestroy(pResBlock);
×
UNCOV
1696
    return NULL;
×
1697
  }
1698

1699
  pResBlock->info.rows = numOfTables;
18,193,083✔
1700

1701
  int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
18,200,346✔
1702

1703
  for (int32_t i = 0; i < numOfTables; i++) {
210,953,547✔
1704
    STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
192,743,180✔
1705
    QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
192,759,082✔
1706

1707
    for (int32_t j = 0; j < numOfCols; j++) {
392,992,594✔
1708
      SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
200,153,579✔
1709
      QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
200,082,718✔
1710

1711
      if (pColInfo->info.colId == -1) {  // tbname
200,082,718✔
1712
        char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1,003,494✔
1713
        if (p1->name != NULL) {
1,002,976✔
1714
          STR_TO_VARSTR(str, p1->name);
5,663✔
1715
        } else {  // name is not retrieved during filter
1716
          code = pStorageAPI->metaFn.getTableNameByUid(pVnode, p1->uid, str);
996,149✔
1717
          QUERY_CHECK_CODE(code, lino, _end);
997,831✔
1718
        }
1719

1720
        code = colDataSetVal(pColInfo, i, str, false);
1,003,236✔
1721
        QUERY_CHECK_CODE(code, lino, _end);
1,003,012✔
1722
#if TAG_FILTER_DEBUG
1723
        qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
1724
#endif
1725
      } else {
1726
        STagVal tagVal = {0};
199,107,978✔
1727
        tagVal.cid = pColInfo->info.colId;
199,104,577✔
1728
        if (p1->pTagVal == NULL) {
199,134,689✔
1729
          colDataSetNULL(pColInfo, i);
4,300✔
1730
        } else {
1731
          const char* p = pStorageAPI->metaFn.extractTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
199,101,460✔
1732

1733
          if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
199,167,306✔
1734
            colDataSetNULL(pColInfo, i);
2,208,266✔
1735
          } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
196,954,705✔
1736
            code = colDataSetVal(pColInfo, i, p, false);
601,786✔
1737
            QUERY_CHECK_CODE(code, lino, _end);
601,786✔
1738
          } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
327,941,223✔
1739
            if (IS_STR_DATA_BLOB(pColInfo->info.type)) {
131,537,584✔
UNCOV
1740
              QUERY_CHECK_CODE(code = TSDB_CODE_BLOB_NOT_SUPPORT_TAG, lino, _end);
×
1741
            }
1742
            char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
131,547,063✔
1743
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
131,574,354✔
1744
            varDataSetLen(tmp, tagVal.nData);
131,574,354✔
1745
            memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
131,572,936✔
1746
            code = colDataSetVal(pColInfo, i, tmp, false);
131,568,196✔
1747
#if TAG_FILTER_DEBUG
1748
            qDebug("tagfilter varch:%s", tmp + 2);
1749
#endif
1750
            taosMemoryFree(tmp);
131,585,570✔
1751
            QUERY_CHECK_CODE(code, lino, _end);
131,586,600✔
1752
          } else {
1753
            code = colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false);
64,817,483✔
1754
            QUERY_CHECK_CODE(code, lino, _end);
64,836,788✔
1755
#if TAG_FILTER_DEBUG
1756
            if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
1757
              qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
1758
            } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
1759
              qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
1760
            }
1761
#endif
1762
          }
1763
        }
1764
      }
1765
    }
1766
  }
1767

1768
_end:
18,211,661✔
1769
  if (code != TSDB_CODE_SUCCESS) {
18,210,625✔
1770
    blockDataDestroy(pResBlock);
1,581✔
1771
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
258✔
1772
    terrno = code;
258✔
1773
    return NULL;
258✔
1774
  }
1775
  return pResBlock;
18,209,166✔
1776
}
1777

1778
// Refills pUidList with the uids of the entries in pUidTagList that passed the filter.
// colData->pData is read as one bool per entry of pUidTagList; when colData is NULL every
// entry is kept. Returns TSDB_CODE_SUCCESS, or terrno when an array operation fails.
static int32_t doSetQualifiedUid(SArray* pUidList, const SArray* pUidTagList, SColumnInfoData* colData) {
  taosArrayClear(pUidList);

  int32_t total = taosArrayGetSize(pUidTagList);
  bool*   pKeep = NULL;
  if (colData != NULL) {
    pKeep = (bool*)colData->pData;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    bool qualified = (pKeep == NULL) || pKeep[idx];
    if (!qualified) {
      continue;
    }

    STUidTagInfo* pInfo = (STUidTagInfo*)taosArrayGet(pUidTagList, idx);
    if (pInfo == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    uint64_t uid = pInfo->uid;
    qDebug("tagfilter get uid:%" PRId64, uid);

    if (taosArrayPush(pUidList, &uid) == NULL) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1803

1804
// Appends one STUidTagInfo per uid of pUidList to pUidTagList. Only the uid field is set;
// the remaining fields are zero-initialized.
// Returns TSDB_CODE_SUCCESS, or terrno when reading from or appending to an array fails.
static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t numOfExisted = taosArrayGetSize(pUidList);
  if (numOfExisted == 0) {
    return code;
  }

  for (int32_t i = 0; i < numOfExisted; ++i) {
    uint64_t* uid = taosArrayGet(pUidList, i);
    if (!uid) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    STUidTagInfo info = {.uid = *uid};
    void*        tmp = taosArrayPush(pUidTagList, &info);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      // report the failure; `code` is still TSDB_CODE_SUCCESS at this point
      return terrno;
    }
  }
  return code;
}
1826

1827
// Expression-walker callback: records the column id of every column node in the hash passed
// as pContext (key: colId, value: the node pointer). Other node types are ignored.
// Returns DEAL_RES_ERROR when the hash insertion fails, DEAL_RES_CONTINUE otherwise.
static EDealRes getCids(SNode** pNode, void* pContext) {
  if (nodeType((*pNode)) != QUERY_NODE_COLUMN) {
    return DEAL_RES_CONTINUE;
  }

  SHashObj* pCidHash = (SHashObj*)pContext;
  col_id_t  cid = ((SColumnNode*)(*pNode))->colId;

  int32_t rc = taosHashPut(pCidHash, &cid, sizeof(cid), pNode, sizeof((*pNode)));
  return (rc == TSDB_CODE_SUCCESS) ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
1843

1844
SNode* getTagCondNodeForQueryTmq(void* tinfo) {
654✔
1845
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
654✔
1846
  const char*    id = GET_TASKID(pTaskInfo);
654✔
1847
  int32_t        code = 0;
654✔
1848

1849
  // traverse to the stream scanner node to add this table id
1850
  SOperatorInfo* pInfo = NULL;
654✔
1851
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
654✔
1852
  if (code != 0 || pInfo == NULL) {
654✔
UNCOV
1853
    return NULL;
×
1854
  }
1855

1856
  SStreamScanInfo* pScanInfo = pInfo->info;
654✔
1857
  return pScanInfo->pTagCond;
654✔
1858
}
1859

1860
SNode* getTagCondNodeForStableTmq(void* node) {
2,136✔
1861
  return node == NULL ? NULL : ((SSubplan*)node)->pTagCond;
2,136✔
1862
}
1863

1864
// Tells whether the tag condition references at least one of the column ids in cidList.
//   - pTagCond == NULL                 -> false
//   - cidList == NULL                  -> true
//   - the lookup hash cannot be created -> true
//   - otherwise true only if some id of cidList is used by a column node of the condition
bool checkCidInTagCondition(SNode* pTagCond, SArray* cidList) {
  if (pTagCond == NULL) {
    return false;
  }

  bool      involved = true;
  SHashObj* pCidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), true, HASH_NO_LOCK);

  if (pCidHash != NULL) {
    // collect the ids of all columns referenced by the condition
    nodesRewriteExprPostOrder(&pTagCond, getCids, (void*)pCidHash);

    if (cidList != NULL) {
      int32_t total = taosArrayGetSize(cidList);

      involved = false;  // stays false when no id of cidList shows up in the condition
      for (int32_t idx = 0; idx < total && !involved; ++idx) {
        col_id_t* pCid = taosArrayGet(cidList, idx);
        involved = (taosHashGet(pCidHash, pCid, sizeof(*pCid)) != NULL);
      }
    }
  }

  taosHashCleanup(pCidHash);
  return involved;
}
1893

1894
int32_t doFilterByTagCond(int64_t suid, SArray* pUidList, SNode* pTagCond, void* pVnode,
263,674,992✔
1895
                                 SIdxFltStatus status, SStorageAPI* pAPI, void* pStreamInfo) {
1896
  if (pTagCond == NULL) {
263,674,992✔
1897
    return TSDB_CODE_SUCCESS;
249,041,286✔
1898
  }
1899

1900
  terrno = TSDB_CODE_SUCCESS;
14,633,706✔
1901

1902
  int32_t      lino = 0;
14,646,636✔
1903
  int32_t      code = TSDB_CODE_SUCCESS;
14,646,636✔
1904
  SArray*      pBlockList = NULL;
14,646,636✔
1905
  SSDataBlock* pResBlock = NULL;
14,646,636✔
1906
  SScalarParam output = {0};
14,647,881✔
1907
  SArray*      pUidTagList = NULL;
14,648,006✔
1908

1909
  SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
14,648,006✔
1910

1911
  pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
14,647,020✔
1912
  QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
14,640,658✔
1913

1914
  // pUidList size is 0 if tagIndex is NULL and in query condition
1915
  code = copyExistedUids(pUidTagList, pUidList);
14,640,658✔
1916
  QUERY_CHECK_CODE(code, lino, end);
14,641,307✔
1917

1918
  // Narrow down the scope of the tablelist set if there is tbname in condition and And Logical operator
1919
  code = optimizeTbnameInCond(pVnode, suid, pUidTagList, pTagCond, pAPI);
14,641,307✔
1920
  if (code == 0) {
14,646,368✔
1921
    if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
786,518✔
1922
      goto end;
778,796✔
1923
    }
1924
  } else {
1925
    qDebug("pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,859,850✔
1926

1927
    FilterCondType condType = checkTagCond(pTagCond);
13,861,218✔
1928
    if (((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) || // (super table) use tagIndex and operator is and
13,862,326✔
1929
        (status == SFLT_NOT_INDEX && taosArrayGetSize(pUidTagList) > 0)) {                       // (child table with tagCond)
13,862,827✔
1930
      code = pAPI->metaFn.getTableTagsByUid(pVnode, suid, pUidTagList);
2,597,461✔
1931
    } else {
1932
      taosArrayClearEx(pUidTagList, freeItem);       // clear tablelist if using tagIndex and or condition
11,262,207✔
1933
      code = pAPI->metaFn.getTableTags(pVnode, suid, pUidTagList);
11,262,428✔
1934
    }
1935
    if (code != TSDB_CODE_SUCCESS) {
13,860,503✔
1936
      qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code),suid);
×
1937
      terrno = code;
×
UNCOV
1938
      QUERY_CHECK_CODE(code, lino, end);
×
1939
    }
1940
  }
1941

1942
  qDebug("final pUidTagList size:%d", (int32_t)taosArrayGetSize(pUidTagList));
13,868,225✔
1943

1944
  int32_t numOfTables = taosArrayGetSize(pUidTagList);
13,869,955✔
1945
  if (numOfTables == 0) {
13,869,553✔
1946
    goto end;
1,631,943✔
1947
  }
1948

1949
  SArray* pColList = NULL;
12,237,610✔
1950
  code = qGetColumnsFromNodeList(pTagCond, false, &pColList); 
12,238,969✔
1951
  if (code != TSDB_CODE_SUCCESS) {
12,220,874✔
UNCOV
1952
    goto end;
×
1953
  }
1954
  pResBlock = createTagValBlockForFilter(pColList, numOfTables, pUidTagList, pVnode, pAPI);
12,220,874✔
1955
  taosArrayDestroy(pColList);
12,236,193✔
1956
  if (pResBlock == NULL) {
12,233,957✔
1957
    code = terrno;
×
UNCOV
1958
    QUERY_CHECK_CODE(code, lino, end);
×
1959
  }
1960

1961
  //fprintDataBlock(pResBlock, "tagFilter", "", 0);
1962

1963
  //  int64_t st1 = taosGetTimestampUs();
1964
  //  qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
1965
  pBlockList = taosArrayInit(2, POINTER_BYTES);
12,233,957✔
1966
  QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
12,234,652✔
1967

1968
  void* tmp = taosArrayPush(pBlockList, &pResBlock);
12,235,149✔
1969
  QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
12,235,149✔
1970

1971
  code = createResultData(&type, numOfTables, &output);
12,235,149✔
1972
  if (code != TSDB_CODE_SUCCESS) {
12,228,796✔
1973
    terrno = code;
×
UNCOV
1974
    QUERY_CHECK_CODE(code, lino, end);
×
1975
  }
1976

1977
  gTaskScalarExtra.pStreamInfo = pStreamInfo;
12,228,796✔
1978
  gTaskScalarExtra.pStreamRange = NULL;
12,228,796✔
1979
  code = scalarCalculate(pTagCond, pBlockList, &output, &gTaskScalarExtra);
12,226,825✔
1980
  if (code != TSDB_CODE_SUCCESS) {
12,208,435✔
1981
    qError("failed to calculate scalar, reason:%s", tstrerror(code));
934✔
1982
    terrno = code;
934✔
1983
    QUERY_CHECK_CODE(code, lino, end);
934✔
1984
  }
1985

1986
end:
14,619,174✔
1987
  if (code == 0) {
14,633,528✔
1988
    code = doSetQualifiedUid(pUidList, pUidTagList, output.columnData);
14,635,157✔
1989
    lino = __LINE__;
14,646,944✔
1990
  }
1991
  
1992
  if (code != 0) {
14,645,315✔
1993
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
934✔
1994
  }
1995
  blockDataDestroy(pResBlock);
14,645,315✔
1996
  taosArrayDestroy(pBlockList);
14,628,077✔
1997
  taosArrayDestroyEx(pUidTagList, freeItem);
14,629,461✔
1998

1999
  colDataDestroy(output.columnData);
14,643,976✔
2000
  taosMemoryFreeClear(output.columnData);
14,641,679✔
2001
  return code;
14,646,887✔
2002
}
2003

2004
typedef struct {
2005
  int32_t code;
2006
  SStreamRuntimeFuncInfo* pStreamRuntimeInfo;
2007
} PlaceHolderContext;
2008

2009
// Expression-rewriter callback: replaces a stream pseudo-column function node by a clone of
// its first parameter after fmSetStreamPseudoFuncParamVal() has filled that parameter in.
// Nodes that are not such functions are left untouched.
// pContext is a PlaceHolderContext; its `code` field receives the result of the helper
// calls. Returns DEAL_RES_ERROR when setting the parameter or cloning it fails.
static EDealRes replacePlaceHolderColumn(SNode** pNode, void* pContext) {
  if (nodeType((*pNode)) != QUERY_NODE_FUNCTION) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFunc = (SFunctionNode*)(*pNode);
  if (!fmIsStreamPesudoColVal(pFunc->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  PlaceHolderContext* pCtx = (PlaceHolderContext*)pContext;

  pCtx->code = fmSetStreamPseudoFuncParamVal(pFunc->funcId, pFunc->pParameterList, pCtx->pStreamRuntimeInfo);
  if (pCtx->code != TSDB_CODE_SUCCESS) {
    return DEAL_RES_ERROR;
  }

  SNode* pValue = nodesListGetNode(pFunc->pParameterList, 0);
  ((SValueNode*)pValue)->translate = true;

  SValueNode* pClone = NULL;
  pCtx->code = nodesCloneNode(pValue, (SNode**)&pClone);
  if (pClone == NULL) {
    return DEAL_RES_ERROR;
  }

  // swap the function node for the cloned value node
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pClone;

  return DEAL_RES_CONTINUE;
}
2034

2035
// Appends the column id of the operator's column operand to pColIdArray. The left operand
// is used when it is a column node; otherwise the right operand is treated as the column.
// NOTE(review): the right operand is not type-checked and the push result is ignored;
// presumably callers only pass `column = value` style operators - confirm.
static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
  SColumnNode* pCol = (SColumnNode*)pOpNode->pRight;
  if (nodeType(pOpNode->pLeft) == QUERY_NODE_COLUMN) {
    pCol = (SColumnNode*)pOpNode->pLeft;
  }

  col_id_t cid = pCol->colId;
  void*    pIgnored = taosArrayPush(pColIdArray, &cid);
  (void)pIgnored;
}
27,864✔
2044

2045
static int32_t buildTagCondKey(
13,932✔
2046
  const SNode* pTagCond, char** pTagCondKey,
2047
  int32_t* tagCondKeyLen, SArray** pTagColIds) {
2048
  if (NULL == pTagCond ||
13,932✔
2049
    (nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
13,932✔
2050
      nodeType(pTagCond) != QUERY_NODE_LOGIC_CONDITION)) {
13,932✔
2051
    qError("invalid parameter to extract tag filter symbol");
×
UNCOV
2052
    return TSDB_CODE_INTERNAL_ERROR;
×
2053
  }
2054
  int32_t code = TSDB_CODE_SUCCESS;
13,932✔
2055
  int32_t lino = 0;
13,932✔
2056
  *pTagColIds = taosArrayInit(4, sizeof(col_id_t));
13,932✔
2057

2058
  if (nodeType(pTagCond) == QUERY_NODE_OPERATOR) {
13,932✔
UNCOV
2059
    extractTagColId((SOperatorNode*)pTagCond, *pTagColIds);
×
2060
  } else if (nodeType(pTagCond) == QUERY_NODE_LOGIC_CONDITION) {
13,932✔
2061
    SNode* pChild = NULL;
13,932✔
2062
    FOREACH(pChild, ((SLogicConditionNode*)pTagCond)->pParameterList) {
41,796✔
2063
      extractTagColId((SOperatorNode*)pChild, *pTagColIds);
27,864✔
2064
    }
2065
  }
2066

2067
  taosArraySort(*pTagColIds, compareUint16Val);
13,932✔
2068

2069
  // encode ordered colIds into key string, separated by ','
2070
  *tagCondKeyLen =
27,864✔
2071
    (int32_t)(taosArrayGetSize(*pTagColIds) * (sizeof(col_id_t) + 1) - 1);
13,932✔
2072
  *pTagCondKey = (char*)taosMemoryCalloc(1, *tagCondKeyLen);
13,932✔
2073
  TSDB_CHECK_NULL(*pTagCondKey, code, lino, _end, terrno);
13,932✔
2074
  char* pStart = *pTagCondKey;
13,932✔
2075
  for (int32_t i = 0; i < taosArrayGetSize(*pTagColIds); ++i) {
41,796✔
2076
    col_id_t* pColId = (col_id_t*)taosArrayGet(*pTagColIds, i);
27,864✔
2077
    TSDB_CHECK_NULL(pColId, code, lino, _end, terrno);
27,864✔
2078
    memcpy(pStart, pColId, sizeof(col_id_t));
27,864✔
2079
    pStart += sizeof(col_id_t);
27,864✔
2080
    if (i != taosArrayGetSize(*pTagColIds) - 1) {
27,864✔
2081
      *pStart = ',';
13,932✔
2082
      pStart += 1;
13,932✔
2083
    }
2084
  }
2085

2086
_end:
13,932✔
2087
  if (TSDB_CODE_SUCCESS != code) {
13,932✔
2088
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2089
    terrno = code;
×
2090
  }
2091
  return code;
13,932✔
2092
}
2093

2094
/*
 * Expression-walker callback that decides whether a tag condition is eligible
 * for the stable tag filter cache.
 *
 * A node is accepted (the walk continues) when it is:
 *   - a value or a column node,
 *   - an operator node whose opType is OP_TYPE_EQUAL,
 *   - a logic condition whose condType is LOGIC_COND_TYPE_AND,
 *   - a function node for which fmIsStreamPesudoColVal() returns true.
 *
 * For a NULL node or any other node the bool pointed to by pContext is set to
 * false and the walk is ended. pContext is written only on rejection, so the
 * caller is expected to preset it.
 */
static EDealRes canOptimizeTagCondFilter(SNode* pTagCond, void* pContext) {
  bool accepted = false;

  if (pTagCond != NULL) {
    switch (nodeType(pTagCond)) {
      case QUERY_NODE_VALUE:
      case QUERY_NODE_COLUMN:
        accepted = true;
        break;
      case QUERY_NODE_OPERATOR:
        accepted = (OP_TYPE_EQUAL == ((SOperatorNode*)pTagCond)->opType);
        break;
      case QUERY_NODE_LOGIC_CONDITION:
        accepted = (LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)pTagCond)->condType);
        break;
      case QUERY_NODE_FUNCTION:
        accepted = fmIsStreamPesudoColVal(((SFunctionNode*)pTagCond)->funcId);
        break;
      default:
        break;
    }
  }

  if (accepted) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)pContext = false;
  return DEAL_RES_END;
}
2118

2119
int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
263,616,506✔
2120
                     STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI, void* pStreamInfo) {
2121
  int32_t code = TSDB_CODE_SUCCESS;
263,616,506✔
2122
  int32_t lino = 0;
263,616,506✔
2123
  size_t  numOfTables = 0;
263,616,506✔
2124

2125
  pListInfo->idInfo.suid = pScanNode->suid;
263,616,506✔
2126
  pListInfo->idInfo.tableType = pScanNode->tableType;
263,609,966✔
2127

2128
  SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
263,589,336✔
2129
  QUERY_CHECK_NULL(pUidList, code, lino, end, terrno);
263,528,256✔
2130

2131
  char*   pTagCondKey = NULL;
263,528,256✔
2132
  int32_t tagCondKeyLen;
263,511,109✔
2133
  SArray* pTagColIds = NULL;
263,524,795✔
2134
  char*   pPayload = NULL;
263,548,444✔
2135
  SIdxFltStatus status = SFLT_NOT_INDEX;
263,548,444✔
2136

2137
  qTrace("getTableList called, suid:%" PRIu64
263,548,444✔
2138
    ", tagCond:%p, tagIndexCond:%p, %d %d", pScanNode->suid, pTagCond,
2139
    pTagIndexCond, pScanNode->tableType, pScanNode->virtualStableScan);
2140
  if (pScanNode->tableType != TSDB_SUPER_TABLE && !pScanNode->virtualStableScan) {
263,553,764✔
2141
    pListInfo->idInfo.uid = pScanNode->uid;
143,431,235✔
2142
    if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
143,450,455✔
2143
      void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
143,421,726✔
2144
      QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
143,419,738✔
2145
    }
2146
    code = doFilterByTagCond(pScanNode->suid, pUidList, pTagCond, pVnode, status, pStorageAPI, pStreamInfo);
143,466,118✔
2147
    QUERY_CHECK_CODE(code, lino, end);
143,462,213✔
2148
  } else {
2149
    bool      isStream = (pStreamInfo != NULL);
120,234,328✔
2150
    bool      hasTagCond = (pTagCond != NULL);
120,234,328✔
2151
    bool      canCacheTagEqCondFilter = false;
120,234,328✔
2152
    T_MD5_CTX context = {0};
120,180,087✔
2153

2154
    qTrace("start to get table list by tag filter, suid:%" PRIu64
120,259,697✔
2155
      ",tsStableTagFilterCache:%d, tsTagFilterCache:%d", 
2156
      pScanNode->suid, tsStableTagFilterCache, tsTagFilterCache);
2157

2158
    bool acquired = false;
120,259,314✔
2159
    // first, check whether we can use stable tag filter cache
2160
    if (tsStableTagFilterCache && isStream && hasTagCond) {
120,180,114✔
2161
      canCacheTagEqCondFilter = true;
14,448✔
2162
      nodesWalkExpr(pTagCond, canOptimizeTagCondFilter,
14,448✔
2163
        (void*)&canCacheTagEqCondFilter);
2164
    }
2165
    if (canCacheTagEqCondFilter) {
119,964,458✔
2166
      qDebug("%s, stable tag filter condition can be optimized", idstr);
13,932✔
2167
      if (((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
13,932✔
2168
        SNode* tmp = NULL;
13,932✔
2169
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
13,932✔
2170
        QUERY_CHECK_CODE(code, lino, end);
13,932✔
2171

2172
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
13,932✔
2173
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
13,932✔
2174
        if (TSDB_CODE_SUCCESS != ctx.code) {
13,932✔
2175
          nodesDestroyNode(tmp);
×
2176
          code = ctx.code;
×
UNCOV
2177
          goto end;
×
2178
        }
2179
        code = genStableTagFilterDigest(tmp, &context);
13,932✔
2180
        nodesDestroyNode(tmp);
13,932✔
2181
      } else {
UNCOV
2182
        code = genStableTagFilterDigest(pTagCond, &context);
×
2183
      }
2184
      QUERY_CHECK_CODE(code, lino, end);
13,932✔
2185

2186
      code = buildTagCondKey(
13,932✔
2187
        pTagCond, &pTagCondKey, &tagCondKeyLen, &pTagColIds);
2188
      QUERY_CHECK_CODE(code, lino, end);
13,932✔
2189
      code = pStorageAPI->metaFn.getStableCachedTableList(
13,932✔
2190
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
13,932✔
2191
        context.digest, tListLen(context.digest), pUidList, &acquired);
2192
      QUERY_CHECK_CODE(code, lino, end);
13,932✔
2193
    } else if (tsTagFilterCache) {
119,950,526✔
2194
      // second, try to use normal tag filter cache
2195
      qDebug("%s using normal tag filter cache", idstr);
32,766✔
2196
      if (pStreamInfo != NULL && ((SStreamRuntimeFuncInfo*)pStreamInfo)->hasPlaceHolder) {
34,056✔
2197
        SNode* tmp = NULL;
1,290✔
2198
        code = nodesCloneNode((SNode*)pTagCond, &tmp);
1,290✔
2199
        QUERY_CHECK_CODE(code, lino, end);
1,290✔
2200

2201
        PlaceHolderContext ctx = {.code = TSDB_CODE_SUCCESS, .pStreamRuntimeInfo = (SStreamRuntimeFuncInfo*)pStreamInfo};
1,290✔
2202
        nodesRewriteExpr(&tmp, replacePlaceHolderColumn, (void*)&ctx);
1,290✔
2203
        if (TSDB_CODE_SUCCESS != ctx.code) {
1,290✔
2204
          nodesDestroyNode(tmp);
×
2205
          code = ctx.code;
×
UNCOV
2206
          goto end;
×
2207
        }
2208
        code = genTagFilterDigest(tmp, &context);
1,290✔
2209
        nodesDestroyNode(tmp);
1,290✔
2210
      } else {
2211
        code = genTagFilterDigest(pTagCond, &context);
31,476✔
2212
      }
2213
      // try to retrieve the result from meta cache
2214
      QUERY_CHECK_CODE(code, lino, end);      
32,766✔
2215
      code = pStorageAPI->metaFn.getCachedTableList(
32,766✔
2216
        pVnode, pScanNode->suid, context.digest,
32,766✔
2217
        tListLen(context.digest), pUidList, &acquired);
2218
      QUERY_CHECK_CODE(code, lino, end);
173,087✔
2219
    }
2220
    if (acquired) {
120,104,779✔
2221
      taosArrayDestroy(pTagColIds);
33,540✔
2222
      pTagColIds = NULL;
33,540✔
2223
      
2224
      if (digest != NULL) {
33,540✔
2225
        digest[0] = 1;
33,540✔
2226
        memcpy(digest + 1, context.digest, tListLen(context.digest));
33,540✔
2227
      }
2228
      
2229
      qDebug("suid:%" PRIu64 ", %s retrieve table uid list from cache, numOfTables:%d", 
33,540✔
2230
        pScanNode->suid, idstr, (int32_t)taosArrayGetSize(pUidList));
2231
      goto end;
33,540✔
2232
    } else {
2233
      qDebug("suid:%" PRIu64 
120,071,239✔
2234
        ", failed to get table uid list from cache", pScanNode->suid);
2235
    }
2236

2237
    if (!pTagCond) {  // no tag filter condition exists, let's fetch all tables of this super table
120,211,369✔
2238
      code = pStorageAPI->metaFn.getChildTableList(pVnode, pScanNode->suid, pUidList);
105,853,796✔
2239
      QUERY_CHECK_CODE(code, lino, end);
105,829,369✔
2240
      qTrace("no tag filter, get all child tables, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
105,829,369✔
2241
    } else {
2242
      SIdxFltStatus status = SFLT_NOT_INDEX;
14,357,573✔
2243
      if (pTagIndexCond) {
14,357,217✔
2244
        void* pIndex = pStorageAPI->metaFn.getInvertIndex(pVnode);
4,010,769✔
2245

2246
        SIndexMetaArg metaArg = {.metaEx = pVnode,
4,010,793✔
2247
                                 .idx = pStorageAPI->metaFn.storeGetIndexInfo(pVnode),
4,010,769✔
2248
                                 .ivtIdx = pIndex,
2249
                                 .suid = pScanNode->uid};
4,010,769✔
2250
        code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
4,010,769✔
2251
        if (code != 0 || status == SFLT_NOT_INDEX) {  // temporarily disable it for performance sake
3,998,774✔
2252
          qDebug("failed to get tableIds from index, suid:%" PRIu64 ", uidListSize:%d", pScanNode->uid, (int32_t)taosArrayGetSize(pUidList));
1,083,661✔
2253
        } else {
2254
          qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
2,915,113✔
2255
        }
2256
      }
2257
    }
2258
    qTrace("after index filter, pTagCond:%p uidListSize:%d", pTagCond, (int32_t)taosArrayGetSize(pUidList));
120,193,263✔
2259
    code = doFilterByTagCond(pScanNode->suid, pUidList, pTagCond, pVnode, status, pStorageAPI, pStreamInfo);
120,193,263✔
2260
    QUERY_CHECK_CODE(code, lino, end);
120,209,775✔
2261

2262
    // let's add the filter results into meta-cache
2263
    numOfTables = taosArrayGetSize(pUidList);
120,208,841✔
2264

2265
    if (canCacheTagEqCondFilter) {
120,170,535✔
2266
      qInfo("%s, suid:%" PRIu64 ", add uid list to stable tag filter cache, "
4,816✔
2267
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2268
            idstr, pScanNode->suid, (int32_t)numOfTables,
2269
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2270

2271
      code = pStorageAPI->metaFn.putStableCachedTableList(
4,816✔
2272
        pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
2273
        context.digest, tListLen(context.digest),
2274
        pUidList, &pTagColIds);
2275
      QUERY_CHECK_CODE(code, lino, end);
4,816✔
2276

2277
      if (digest != NULL) {
4,816✔
2278
        digest[0] = 1;
4,816✔
2279
        memcpy(digest + 1, context.digest, tListLen(context.digest));
4,816✔
2280
      }
2281
    } else if (tsTagFilterCache) {
120,165,719✔
2282
      qInfo("%s, suid:%" PRIu64 ", add uid list to normal tag filter cache, "
8,342✔
2283
            "uidListSize:%d, origin key:%" PRIu64 ",%" PRIu64,
2284
            idstr, pScanNode->suid, (int32_t)numOfTables,
2285
            *(uint64_t*)context.digest, *(uint64_t*)(context.digest + 8));
2286
      size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
8,342✔
2287
      pPayload = taosMemoryMalloc(size);
8,342✔
2288
      QUERY_CHECK_NULL(pPayload, code, lino, end, terrno);
8,342✔
2289

2290
      *(int32_t*)pPayload = (int32_t)numOfTables;
8,342✔
2291
      if (numOfTables > 0) {
8,342✔
2292
        void* tmp = taosArrayGet(pUidList, 0);
6,622✔
2293
        QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
6,622✔
2294
        memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
6,622✔
2295
      }
2296

2297
      code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid,
8,342✔
2298
                                                    context.digest,
2299
                                                    tListLen(context.digest),
2300
                                                    pPayload, size, 1);
2301
      if (TSDB_CODE_SUCCESS == code) {
8,342✔
2302
        /*
2303
          data referenced by pPayload is used in lru cache,
2304
          reset pPayload to NULL to avoid being freed in _error block
2305
        */
2306
        pPayload = NULL;
8,170✔
2307
      } else {
2308
        if (TSDB_CODE_DUP_KEY == code) {
172✔
2309
          /*
2310
            another thread has already put the same key into cache,
2311
            we can just ignore this error
2312
          */
2313
          code = TSDB_CODE_SUCCESS;
172✔
2314
        }
2315
        QUERY_CHECK_CODE(code, lino, end);
172✔
2316
      }
2317

2318

2319
      if (digest != NULL) {
8,342✔
2320
        digest[0] = 1;
8,342✔
2321
        memcpy(digest + 1, context.digest, tListLen(context.digest));
8,342✔
2322
      }
2323
    }
2324
  }
2325

2326
  qDebug("%s, table list with %d uids built", idstr, (int32_t)numOfTables);
263,615,679✔
2327

2328
end:
263,697,637✔
2329
  if (code == 0) {
263,713,422✔
2330
     STableKeyInfo info = {.uid = 0, .groupId = 0};
263,712,488✔
2331
    for (int32_t i = 0; i < taosArrayGetSize(pUidList); ++i) {
931,875,483✔
2332
      int64_t* uid = (int64_t*)taosArrayGet(pUidList, i);
668,058,564✔
2333
      QUERY_CHECK_NULL(uid, code, lino, end, terrno);
668,111,591✔
2334

2335
      info.uid = *uid;
668,111,591✔
2336
        //qInfo("doSetQualifiedUid row:%d added to pTableList", i);
2337
      void* p = taosArrayPush(pListInfo->pTableList, &info);
668,115,697✔
2338
      QUERY_CHECK_NULL(p, code, lino, end, terrno);
668,169,825✔
2339
    }
2340
  } else {
2341
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
934✔
2342
  }
2343
  taosArrayDestroy(pUidList);
263,751,055✔
2344
  taosArrayDestroy(pTagColIds);
263,655,391✔
2345
  taosMemFreeClear(pTagCondKey);
263,678,004✔
2346
  taosMemFreeClear(pPayload);
263,678,004✔
2347
  return code;
263,678,004✔
2348
}
2349

2350
/*
 * Collect the child tables of super table `suid`, filtered by the tag
 * conditions of the given subplan (if any).
 *
 * @param suid       super table uid; used as both suid and uid of a temporary scan node
 * @param pVnode     vnode handle passed through to getTableList()
 * @param node       SSubplan* supplying pTagCond / pTagIndexCond, may be NULL
 * @param tableList  output; on success receives ownership of the table array
 * @param pTaskInfo  SExecTaskInfo* whose storageAPI is used
 * @return TSDB_CODE_SUCCESS or an error code; *tableList is untouched on failure
 */
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pPlan = (SSubplan*)node;
  SNode*          pTagCond = (NULL != pPlan) ? pPlan->pTagCond : NULL;
  SNode*          pTagIndexCond = (NULL != pPlan) ? pPlan->pTagIndexCond : NULL;
  SStorageAPI*    pAPI = &((SExecTaskInfo*)pTaskInfo)->storageAPI;
  SScanPhysiNode  scanNode = {.suid = suid, .uid = suid, .tableType = TSDB_SUPER_TABLE};
  STableListInfo* pListInfo = tableListCreate();

  QUERY_CHECK_NULL(pListInfo, code, lino, _end, terrno);

  code = getTableList(pVnode, &scanNode, pTagCond, pTagIndexCond, pListInfo, NULL, "qGetTableList", pAPI, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // detach the array from the list info so the destroy below does not release it
  *tableList = pListInfo->pTableList;
  pListInfo->pTableList = NULL;

_end:
  tableListDestroy(pListInfo);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2374

2375
size_t getTableTagsBufLen(const SNodeList* pGroups) {
×
UNCOV
2376
  size_t keyLen = 0;
×
2377

2378
  SNode* node;
2379
  FOREACH(node, pGroups) {
×
2380
    SExprNode* pExpr = (SExprNode*)node;
×
UNCOV
2381
    keyLen += pExpr->resType.bytes;
×
2382
  }
2383

2384
  keyLen += sizeof(int8_t) * LIST_LENGTH(pGroups);
×
UNCOV
2385
  return keyLen;
×
2386
}
2387

UNCOV
2388
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId,
×
2389
                              SStorageAPI* pAPI) {
UNCOV
2390
  SMetaReader mr = {0};
×
2391

2392
  pAPI->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &pAPI->metaFn);
×
2393
  if (pAPI->metaReaderFn.getEntryGetUidCache(&mr, uid) != 0) {  // table not exist
×
2394
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2395
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
×
2396
  }
2397

2398
  SNodeList* groupNew = NULL;
×
2399
  int32_t    code = nodesCloneList(pGroupNode, &groupNew);
×
2400
  if (TSDB_CODE_SUCCESS != code) {
×
2401
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2402
    return code;
×
2403
  }
2404

2405
  STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
×
2406
  nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &ctx);
×
2407
  if (TSDB_CODE_SUCCESS != ctx.code) {
×
2408
    nodesDestroyList(groupNew);
×
2409
    pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2410
    return code;
×
2411
  }
2412
  char* isNull = (char*)keyBuf;
×
UNCOV
2413
  char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(pGroupNode);
×
2414

2415
  SNode*  pNode;
2416
  int32_t index = 0;
×
2417
  FOREACH(pNode, groupNew) {
×
2418
    SNode*  pNew = NULL;
×
2419
    int32_t code = scalarCalculateConstants(pNode, &pNew);
×
2420
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
2421
      REPLACE_NODE(pNew);
×
2422
    } else {
2423
      nodesDestroyList(groupNew);
×
2424
      pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2425
      return code;
×
2426
    }
2427

2428
    if (nodeType(pNew) != QUERY_NODE_VALUE) {
×
2429
      nodesDestroyList(groupNew);
×
2430
      pAPI->metaReaderFn.clearReader(&mr);
×
2431
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
2432
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2433
    }
UNCOV
2434
    SValueNode* pValue = (SValueNode*)pNew;
×
2435

2436
    if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
×
2437
      isNull[index++] = 1;
×
UNCOV
2438
      continue;
×
2439
    } else {
2440
      isNull[index++] = 0;
×
2441
      char* data = nodesGetValueFromNode(pValue);
×
2442
      if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
×
2443
        if (tTagIsJson(data)) {
×
2444
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
×
2445
          nodesDestroyList(groupNew);
×
2446
          pAPI->metaReaderFn.clearReader(&mr);
×
UNCOV
2447
          return terrno;
×
2448
        }
2449
        int32_t len = getJsonValueLen(data);
×
2450
        memcpy(pStart, data, len);
×
2451
        pStart += len;
×
2452
      } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
×
2453
        if (IS_STR_DATA_BLOB(pValue->node.resType.type)) {
×
UNCOV
2454
          return TSDB_CODE_BLOB_NOT_SUPPORT_TAG;
×
2455
        }
2456
        memcpy(pStart, data, varDataTLen(data));
×
UNCOV
2457
        pStart += varDataTLen(data);
×
2458
      } else {
2459
        memcpy(pStart, data, pValue->node.resType.bytes);
×
UNCOV
2460
        pStart += pValue->node.resType.bytes;
×
2461
      }
2462
    }
2463
  }
2464

2465
  int32_t len = (int32_t)(pStart - (char*)keyBuf);
×
UNCOV
2466
  *pGroupId = calcGroupId(keyBuf, len);
×
2467

2468
  nodesDestroyList(groupNew);
×
UNCOV
2469
  pAPI->metaReaderFn.clearReader(&mr);
×
2470

UNCOV
2471
  return TSDB_CODE_SUCCESS;
×
2472
}
2473

2474
/*
 * Convert a list of column nodes into a newly allocated SArray of SColumn.
 *
 * Each SColumn carries the slot id, column id and the result type fields
 * (type, bytes, precision, scale) of its node; all other fields are zero.
 *
 * @param pNodeList  list of SColumnNode, may be NULL
 * @return the new array, or NULL if pNodeList is NULL, an allocation fails or
 *         a list entry cannot be fetched. The caller owns the returned array.
 */
SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
  if (NULL == pNodeList) {
    return NULL;
  }

  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pColumns = taosArrayInit(numOfCols, sizeof(SColumn));
  if (NULL == pColumns) {
    return NULL;
  }

  int32_t idx = 0;
  while (idx < numOfCols) {
    SColumnNode* pSrc = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    if (NULL == pSrc) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      return NULL;
    }

    // todo extract method
    SColumn col = {.slotId = pSrc->slotId,
                   .colId = pSrc->colId,
                   .type = pSrc->node.resType.type,
                   .bytes = pSrc->node.resType.bytes,
                   .precision = pSrc->node.resType.precision,
                   .scale = pSrc->node.resType.scale};

    if (NULL == taosArrayPush(pColumns, &col)) {
      taosArrayDestroy(pColumns);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return NULL;
    }

    ++idx;
  }

  return pColumns;
}
2512

2513
/*
 * Build the column match list of a scan from its target list and its output
 * block description.
 *
 * For every target in pNodeList whose expression is a column node, one
 * SColMatchItem (colId, source/destination slot, pk flag, data type) is
 * appended to a new array. The slot descriptors of pOutputNodeList are then
 * walked to count the output columns and to clear needOutput on matched
 * columns whose slot is not an output slot.
 *
 * @param pNodeList        list of STargetNode
 * @param pOutputNodeList  output block description; its pSlots list is read
 * @param numOfOutputCols  output; number of slots counted as output
 * @param type             stored into pMatchInfo->matchType
 * @param pMatchInfo       output; on success pList and colIdOrdered are set and
 *                         pMatchInfo owns the new array
 * @return TSDB_CODE_SUCCESS, or an error code. On failure the temporary array
 *         is released and pMatchInfo->pList is left untouched.
 */
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
                            int32_t type, SColMatchInfo* pMatchInfo) {
  size_t          numOfCols = LIST_LENGTH(pNodeList);
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  bool            colIdOrdered = true;
  bool            hasPrevColId = false;
  int32_t         prevColId = 0;
  SColMatchItem** infoBySlot = NULL;  // destination slot id -> item stored in pList

  pMatchInfo->matchType = type;

  SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem));
  if (pList == NULL) {
    code = terrno;
    return code;
  }

  if (numOfCols > 0) {
    infoBySlot = taosMemoryCalloc(numOfCols, sizeof(*infoBySlot));
    if (infoBySlot == NULL) {
      code = terrno;
      goto _end;
    }
  }

  SNode* node = NULL;
  FOREACH(node, pNodeList) {
    STargetNode* pNode = (STargetNode*)node;
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    if (nodeType(pNode->pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }

    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SColMatchItem c = {.needOutput = true};
    c.colId = pColNode->colId;
    c.srcSlotId = pColNode->slotId;
    c.dstSlotId = pNode->slotId;
    c.isPk = pColNode->isPk;
    c.dataType = pColNode->node.resType;
    void* tmp = taosArrayPush(pList, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // the list counts as ordered as long as no column id is smaller than its predecessor
    if (hasPrevColId && c.colId < prevColId) {
      colIdOrdered = false;
    }
    hasPrevColId = true;
    prevColId = c.colId;

    if (pNode->slotId >= 0 && pNode->slotId < numOfCols) {
      // NOTE(review): this keeps a pointer into pList's storage. pList was created with
      // room for numOfCols items and receives at most one push per target, so the
      // storage presumably never moves - confirm against taosArrayPush().
      infoBySlot[pNode->slotId] = (SColMatchItem*)taosArrayGet(pList, taosArrayGetSize(pList) - 1);
    }
  }

  // set the output flag for each column in SColMatchInfo, according to the
  // output flag of the slot descriptor with the same slot id
  *numOfOutputCols = 0;
  SNode* slotNode = NULL;
  FOREACH(slotNode, pOutputNodeList->pSlots) {
    SSlotDescNode* pNode = (SSlotDescNode*)slotNode;
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    // todo: add reserve flag check
    // it is a column reserved for the arithmetic expression calculation
    if (pNode->slotId >= numOfCols) {
      (*numOfOutputCols) += 1;
      continue;
    }

    SColMatchItem* info = infoBySlot ? infoBySlot[pNode->slotId] : NULL;

    if (pNode->output) {
      (*numOfOutputCols) += 1;
    } else if (info != NULL) {
      // select distinct tbname from stb where tbname='abc';
      info->needOutput = false;
    }
  }

  pMatchInfo->pList = pList;
  pMatchInfo->colIdOrdered = colIdOrdered;

_end:
  taosMemoryFree(infoBySlot);
  if (code != TSDB_CODE_SUCCESS) {
    // ownership of pList is handed to pMatchInfo only on success; release it here
    // so that a failed call does not leak the array
    taosArrayDestroy(pList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2601

2602
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
1,202,607,451✔
2603
                                  const char* name) {
2604
  SResSchema s = {0};
1,202,607,451✔
2605
  s.scale = scale;
1,202,686,865✔
2606
  s.type = type;
1,202,686,865✔
2607
  s.bytes = bytes;
1,202,686,865✔
2608
  s.slotId = slotId;
1,202,686,865✔
2609
  s.precision = precision;
1,202,686,865✔
2610
  tstrncpy(s.name, name, tListLen(s.name));
1,202,686,865✔
2611

2612
  return s;
1,202,686,865✔
2613
}
2614

2615
/*
 * Allocate a zeroed SColumn and fill it from the given block id, slot id,
 * column id, data type and column type.
 *
 * @return the new column, owned by the caller, or NULL if the allocation fails
 */
static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) {
  SColumn* pColumn = taosMemoryCalloc(1, sizeof(SColumn));

  if (NULL != pColumn) {
    // where the column lives
    pColumn->dataBlockId = blockId;
    pColumn->slotId = slotId;
    pColumn->colId = colId;
    pColumn->colType = colType;

    // what the column holds
    pColumn->type = pType->type;
    pColumn->bytes = pType->bytes;
    pColumn->precision = pType->precision;
    pColumn->scale = pType->scale;
  }

  return pColumn;
}
2631

2632
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
1,203,032,882✔
2633
  int32_t code = TSDB_CODE_SUCCESS;
1,203,032,882✔
2634
  int32_t lino = 0;
1,203,032,882✔
2635
  pExp->base.numOfParams = 0;
1,203,032,882✔
2636
  pExp->base.pParam = NULL;
1,203,123,801✔
2637
  pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
1,203,040,374✔
2638
  QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
1,202,377,139✔
2639

2640
  pExp->pExpr->_function.num = 1;
1,202,508,413✔
2641
  pExp->pExpr->_function.functionId = -1;
1,202,508,351✔
2642

2643
  int32_t type = nodeType(pNode);
1,202,670,111✔
2644
  // it is a project query, or group by column
2645
  if (type == QUERY_NODE_COLUMN) {
1,202,923,042✔
2646
    pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
684,173,137✔
2647
    SColumnNode* pColNode = (SColumnNode*)pNode;
684,205,859✔
2648

2649
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
684,205,859✔
2650
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
684,087,326✔
2651

2652
    pExp->base.numOfParams = 1;
684,095,777✔
2653

2654
    SDataType* pType = &pColNode->node.resType;
684,106,961✔
2655
    pExp->base.resSchema =
2656
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
684,133,575✔
2657

2658
    pExp->base.pParam[0].pCol =
1,368,379,299✔
2659
        createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
1,368,307,059✔
2660
    QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
684,212,205✔
2661

2662
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
684,068,245✔
2663
  } else if (type == QUERY_NODE_VALUE) {
518,749,905✔
2664
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
17,656,087✔
2665
    SValueNode* pValNode = (SValueNode*)pNode;
17,658,190✔
2666

2667
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
17,658,190✔
2668
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
17,646,145✔
2669

2670
    pExp->base.numOfParams = 1;
17,650,422✔
2671

2672
    SDataType* pType = &pValNode->node.resType;
17,653,417✔
2673
    pExp->base.resSchema =
2674
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
17,647,003✔
2675
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
17,651,357✔
2676
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
17,649,251✔
2677
    QUERY_CHECK_CODE(code, lino, _end);
17,653,088✔
2678
  } else if (type == QUERY_NODE_REMOTE_VALUE) {
501,093,818✔
2679
    SRemoteValueNode* pRemote = (SRemoteValueNode*)pNode;
1,386,443✔
2680
    code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, pNode);
1,386,443✔
2681
    QUERY_CHECK_CODE(code, lino, _end);
1,388,575✔
2682

2683
    pExp->pExpr->nodeType = QUERY_NODE_VALUE;
977,632✔
2684
    SValueNode* pValNode = (SValueNode*)pNode;
977,632✔
2685

2686
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
977,632✔
2687
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
977,632✔
2688

2689
    pExp->base.numOfParams = 1;
977,632✔
2690

2691
    SDataType* pType = &pValNode->node.resType;
977,632✔
2692
    pExp->base.resSchema =
2693
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pValNode->node.aliasName);
977,632✔
2694
    pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
977,632✔
2695
    code = nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
977,632✔
2696
    QUERY_CHECK_CODE(code, lino, _end);
977,632✔
2697
  } else if (type == QUERY_NODE_FUNCTION) {
499,707,375✔
2698
    pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
460,810,417✔
2699
    SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
460,824,363✔
2700

2701
    SDataType* pType = &pFuncNode->node.resType;
460,824,363✔
2702
    pExp->base.resSchema =
2703
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
460,818,704✔
2704
    tExprNode* pExprNode = pExp->pExpr;
460,822,658✔
2705

2706
    pExprNode->_function.functionId = pFuncNode->funcId;
460,814,886✔
2707
    pExprNode->_function.pFunctNode = pFuncNode;
460,825,400✔
2708
    pExprNode->_function.functionType = pFuncNode->funcType;
460,881,177✔
2709

2710
    tstrncpy(pExprNode->_function.functionName, pFuncNode->functionName, tListLen(pExprNode->_function.functionName));
460,834,754✔
2711

2712
    pExp->base.pParamList = pFuncNode->pParameterList;
460,852,633✔
2713
#if 1
2714
    // todo refactor: add the parameter for tbname function
2715
    const char* name = "tbname";
460,862,062✔
2716
    int32_t     len = strlen(name);
460,862,062✔
2717

2718
    if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
460,862,062✔
2719
        pExprNode->_function.functionName[len] == 0) {
30,442,840✔
2720
      pFuncNode->pParameterList = NULL;
30,429,329✔
2721
      int32_t     code = nodesMakeList(&pFuncNode->pParameterList);
30,442,462✔
2722
      SValueNode* res = NULL;
30,444,429✔
2723
      if (TSDB_CODE_SUCCESS == code) {
30,445,979✔
2724
        code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&res);
30,445,496✔
2725
      }
2726
      QUERY_CHECK_CODE(code, lino, _end);
30,447,810✔
2727
      res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
30,447,810✔
2728
      code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
30,436,985✔
2729
      if (code != TSDB_CODE_SUCCESS) {
30,446,253✔
2730
        nodesDestroyNode((SNode*)res);
×
UNCOV
2731
        res = NULL;
×
2732
      }
2733
      QUERY_CHECK_CODE(code, lino, _end);
30,446,253✔
2734
    }
2735
#endif
2736

2737
    int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
460,929,894✔
2738

2739
    pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
460,867,054✔
2740
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
460,722,189✔
2741
    pExp->base.numOfParams = numOfParam;
460,721,846✔
2742

2743
    for (int32_t j = 0; j < numOfParam && TSDB_CODE_SUCCESS == code; ++j) {
1,089,802,791✔
2744
      SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
629,069,788✔
2745
      QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
629,107,227✔
2746
      if (p1->type == QUERY_NODE_COLUMN) {
629,107,227✔
2747
        SColumnNode* pcn = (SColumnNode*)p1;
460,538,867✔
2748

2749
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
460,538,867✔
2750
        pExp->base.pParam[j].pCol =
921,041,315✔
2751
            createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
921,106,694✔
2752
        QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
460,530,890✔
2753
      } else if (p1->type == QUERY_NODE_VALUE) {
168,556,770✔
2754
        SValueNode* pvn = (SValueNode*)p1;
113,376,077✔
2755
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
113,376,077✔
2756
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
113,368,309✔
2757
        QUERY_CHECK_CODE(code, lino, _end);
113,344,006✔
2758
      } else if (p1->type == QUERY_NODE_REMOTE_VALUE) {
55,248,725✔
2759
        SRemoteValueNode* pRemote = (SRemoteValueNode*)p1;
71,422✔
2760
        code = qFetchRemoteNode(gTaskScalarExtra.pSubJobCtx, pRemote->subQIdx, p1);
71,422✔
2761
        QUERY_CHECK_CODE(code, lino, _end);
71,422✔
2762

2763
        SValueNode* pvn = (SValueNode*)pRemote;
51,701✔
2764
        pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
51,701✔
2765
        code = nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
51,701✔
2766
        QUERY_CHECK_CODE(code, lino, _end);
606✔
2767
      }
2768
    }
2769
    pExp->pExpr->_function.bindExprID = ((SExprNode*)pNode)->bindExprID;
460,733,003✔
2770
  } else if (type == QUERY_NODE_OPERATOR) {
38,896,990✔
2771
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
37,527,100✔
2772
    SOperatorNode* pOpNode = (SOperatorNode*)pNode;
37,518,092✔
2773

2774
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
37,518,092✔
2775
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
37,506,338✔
2776
    pExp->base.numOfParams = 1;
37,514,706✔
2777

2778
    SDataType* pType = &pOpNode->node.resType;
37,515,521✔
2779
    pExp->base.resSchema =
2780
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pOpNode->node.aliasName);
37,502,749✔
2781
    pExp->pExpr->_optrRoot.pRootNode = pNode;
37,514,169✔
2782
  } else if (type == QUERY_NODE_CASE_WHEN) {
1,370,573✔
2783
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
1,292,511✔
2784
    SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
1,292,511✔
2785

2786
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
1,292,511✔
2787
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
1,292,067✔
2788
    pExp->base.numOfParams = 1;
1,292,511✔
2789

2790
    SDataType* pType = &pCaseNode->node.resType;
1,292,511✔
2791
    pExp->base.resSchema =
2792
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCaseNode->node.aliasName);
1,292,511✔
2793
    pExp->pExpr->_optrRoot.pRootNode = pNode;
1,291,997✔
2794
  } else if (type == QUERY_NODE_LOGIC_CONDITION) {
78,322✔
2795
    pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
101,680✔
2796
    SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
101,680✔
2797
    pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
101,680✔
2798
    QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
101,680✔
2799
    pExp->base.numOfParams = 1;
101,680✔
2800
    SDataType* pType = &pCond->node.resType;
101,680✔
2801
    pExp->base.resSchema =
2802
        createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
101,680✔
2803
    pExp->pExpr->_optrRoot.pRootNode = pNode;
101,680✔
2804
  } else {
2805
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
32✔
2806
    QUERY_CHECK_CODE(code, lino, _end);
32✔
2807
  }
2808
  pExp->pExpr->relatedTo = ((SExprNode*)pNode)->relatedTo;
1,202,489,560✔
2809
_end:
1,202,932,358✔
2810
  if (code != TSDB_CODE_SUCCESS) {
1,202,932,358✔
2811
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
430,664✔
2812
  }
2813
  return code;
1,202,954,197✔
2814
}
2815

2816
/**
 * Build the executor expression for a single plan target node.
 *
 * The target's expression node and its output slot id are handed to
 * createExprFromOneNode; the status code of that call is returned unchanged.
 */
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
  int32_t code = createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
  return code;
}
2819

2820
/**
 * Convert every node of pNodeList into an SExprInfo entry.
 *
 * Output slot ids are assigned as (list index + UD_TAG_COLUMN_INDEX).
 *
 * @param pNodeList   expression nodes to convert.
 * @param numOfExprs  [out] receives LIST_LENGTH(pNodeList).
 * @return the newly allocated array, or NULL on failure. When a node cannot be
 *         converted, terrno is set to the error code of the failing conversion.
 */
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
  *numOfExprs = LIST_LENGTH(pNodeList);
  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  if (!pExprs) {
    return NULL;
  }

  for (int32_t i = 0; i < (*numOfExprs); ++i) {
    SExprInfo* pExp = &pExprs[i];
    int32_t    code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
    if (code != TSDB_CODE_SUCCESS) {
      // Entries converted so far (and the partially built one) own allocations of their own;
      // release them before dropping the array, as createExprInfo does on its failure path.
      destroyExprInfo(pExprs, *numOfExprs);
      taosMemoryFreeClear(pExprs);
      terrno = code;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return NULL;
    }
  }

  return pExprs;
}
2840

2841
/**
 * Build one SExprInfo array from the function targets followed by the group-key targets.
 *
 * @param pNodeList   target nodes of the functions/expressions; may be NULL.
 * @param pGroupKeys  target nodes of the group keys; may be NULL.
 * @param pExprInfo   [out] receives the allocated array on success. It is first passed through
 *                    QRY_PARAM_CHECK (presumably validates/resets the out pointer; see its definition).
 * @param numOfExprs  [out] total number of entries (functions + group keys).
 * @return TSDB_CODE_SUCCESS, including the case of zero expressions where nothing is allocated;
 *         otherwise the error code of the failing step. On failure everything built so far is released.
 */
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
  QRY_PARAM_CHECK(pExprInfo);

  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfFuncs = LIST_LENGTH(pNodeList);
  int32_t numOfGroupKeys = 0;
  if (pGroupKeys != NULL) {
    numOfGroupKeys = LIST_LENGTH(pGroupKeys);
  }

  *numOfExprs = numOfFuncs + numOfGroupKeys;
  if (*numOfExprs == 0) {
    return code;
  }

  SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
  QUERY_CHECK_NULL(pExprs, code, lino, _return, terrno);

  // function targets occupy the front of the array, group keys follow
  int32_t i = 0;
  if (pNodeList != NULL) {
    SNode* node = NULL;
    FOREACH(node, pNodeList) {
      STargetNode* pTargetNode = (STargetNode*)node;
      SExprInfo*   pExp = &pExprs[i++];
      code = createExprFromTargetNode(pExp, pTargetNode);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  if (pGroupKeys != NULL) {
    SNode* node = NULL;
    FOREACH(node, pGroupKeys) {
      STargetNode* pTargetNode = (STargetNode*)node;
      SExprInfo*   pExp = &pExprs[i++];
      code = createExprFromTargetNode(pExp, pTargetNode);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  *pExprInfo = pExprs;
  return code;

_return:
  // report the line recorded by the failing check rather than the line of this statement
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  destroyExprInfo(pExprs, *numOfExprs);
  taosMemoryFreeClear(pExprs);
  return code;
}
2890

2891
static void deleteSubsidiareCtx(void* pData) {
×
2892
  SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
×
2893
  if (pCtx->pCtx) {
×
UNCOV
2894
    taosMemoryFreeClear(pCtx->pCtx);
×
2895
  }
UNCOV
2896
}
×
2897

2898
// set the output buffer for the selectivity + tag query
2899
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
431,019,695✔
2900
  int32_t num = 0;
431,019,695✔
2901
  int32_t code = TSDB_CODE_SUCCESS;
431,019,695✔
2902
  int32_t lino = 0;
431,019,695✔
2903

2904
  SArray* pValCtxArray = NULL;
431,019,695✔
2905
  for (int32_t i = numOfOutput - 1; i > 0; --i) {  // select Func is at the end of the list
1,220,352,462✔
2906
    int32_t funcIdx = pCtx[i].pExpr->pExpr->_function.bindExprID;
789,427,022✔
2907
    if (funcIdx > 0) {
789,463,685✔
2908
      if (pValCtxArray == NULL) {
1,514,975✔
2909
        // the end of the list is the select function of biggest index
2910
        pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
1,086,393✔
2911
        if (pValCtxArray == NULL) {
1,086,805✔
UNCOV
2912
          return terrno;
×
2913
        }
2914
      }
2915
      if (funcIdx > pValCtxArray->size) {
1,515,387✔
2916
        qError("funcIdx:%d is out of range", funcIdx);
×
2917
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
UNCOV
2918
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2919
      }
2920
      SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
1,514,975✔
2921
      pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
1,515,387✔
2922
      if (pSubsidiary->pCtx == NULL) {
1,514,975✔
2923
        taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
UNCOV
2924
        return terrno;
×
2925
      }
2926
      pSubsidiary->num = 0;
1,514,975✔
2927
      taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
1,514,563✔
2928
    }
2929
  }
2930

2931
  SqlFunctionCtx*  p = NULL;
430,925,440✔
2932
  SqlFunctionCtx** pValCtx = NULL;
430,925,440✔
2933
  if (pValCtxArray == NULL) {
430,925,440✔
2934
    pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
429,860,754✔
2935
    if (pValCtx == NULL) {
429,832,314✔
UNCOV
2936
      QUERY_CHECK_CODE(terrno, lino, _end);
×
2937
    }
2938
  }
2939

2940
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,619,091,157✔
2941
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
1,188,181,644✔
2942
    if ((strcmp(pName, "_select_value") == 0)) {
1,188,370,369✔
2943
      if (pValCtxArray == NULL) {
9,131,214✔
2944
        pValCtx[num++] = &pCtx[i];
7,003,683✔
2945
      } else {
2946
        int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->relatedTo;  // start from index 1;
2,127,567✔
2947
        if (bindFuncIndex > 0) {                                  // 0 is default index related to the select function
2,128,581✔
2948
          bindFuncIndex -= 1;
2,078,729✔
2949
        }
2950
        SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
2,128,581✔
2951
        if (pSubsidiary == NULL) {
2,127,757✔
UNCOV
2952
          QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
×
2953
        }
2954
        (*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
2,127,757✔
2955
        (*pSubsidiary)->num++;
2,128,581✔
2956
      }
2957
    } else if (fmIsSelectFunc(pCtx[i].functionId)) {
1,179,239,155✔
2958
      if (pValCtxArray == NULL) {
121,473,106✔
2959
        p = &pCtx[i];
119,625,245✔
2960
      }
2961
    }
2962
  }
2963

2964
  if (p != NULL) {
430,909,513✔
2965
    p->subsidiaries.pCtx = pValCtx;
50,451,336✔
2966
    p->subsidiaries.num = num;
50,451,692✔
2967
  } else {
2968
    taosMemoryFreeClear(pValCtx);
380,458,177✔
2969
  }
2970

2971
_end:
1,100,063✔
2972
  if (code != TSDB_CODE_SUCCESS) {
430,858,038✔
2973
    taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
×
2974
    taosMemoryFreeClear(pValCtx);
×
UNCOV
2975
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2976
  } else {
2977
    taosArrayDestroy(pValCtxArray);
430,858,038✔
2978
  }
2979
  return code;
430,936,686✔
2980
}
2981

2982
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
431,003,706✔
2983
                                     SFunctionStateStore* pStore) {
2984
  int32_t         code = TSDB_CODE_SUCCESS;
431,003,706✔
2985
  int32_t         lino = 0;
431,003,706✔
2986
  SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
431,003,706✔
2987
  if (pFuncCtx == NULL) {
430,733,456✔
UNCOV
2988
    return NULL;
×
2989
  }
2990

2991
  *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
430,733,456✔
2992
  if (*rowEntryInfoOffset == 0) {
430,959,295✔
2993
    taosMemoryFreeClear(pFuncCtx);
×
UNCOV
2994
    return NULL;
×
2995
  }
2996

2997
  for (int32_t i = 0; i < numOfOutput; ++i) {
1,619,298,889✔
2998
    SExprInfo* pExpr = &pExprInfo[i];
1,188,394,455✔
2999

3000
    SExprBasicInfo* pFunct = &pExpr->base;
1,188,307,885✔
3001
    SqlFunctionCtx* pCtx = &pFuncCtx[i];
1,188,351,572✔
3002

3003
    pCtx->functionId = -1;
1,188,356,399✔
3004
    pCtx->pExpr = pExpr;
1,188,402,118✔
3005

3006
    if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
1,188,415,402✔
3007
      SFuncExecEnv env = {0};
459,676,873✔
3008
      pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
459,695,447✔
3009
      pCtx->isPseudoFunc = fmIsWindowPseudoColumnFunc(pCtx->functionId) || fmIsPlaceHolderFunc(pCtx->functionId);
459,687,096✔
3010
      pCtx->isNotNullFunc = fmIsNotNullOutputFunc(pCtx->functionId);
459,657,706✔
3011

3012
      bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
459,600,359✔
3013
      if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
772,256,053✔
3014
        if (!isUdaf) {
312,767,382✔
3015
          code = fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
312,732,716✔
3016
          QUERY_CHECK_CODE(code, lino, _end);
312,658,683✔
3017
        } else {
3018
          char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
34,666✔
3019
          pCtx->udfName = taosStrdup(udfName);
34,666✔
3020
          QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno);
34,666✔
3021

3022
          code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
34,666✔
3023
          QUERY_CHECK_CODE(code, lino, _end);
34,666✔
3024
        }
3025
        bool tmp = pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
312,693,349✔
3026
        if (!tmp) {
312,687,843✔
UNCOV
3027
          code = terrno;
×
3028
          QUERY_CHECK_CODE(code, lino, _end);
1,161✔
3029
        }
3030
      } else {
3031
        if (fmIsPlaceHolderFunc(pCtx->functionId)) {
146,813,360✔
3032
          code = fmGetStreamPesudoFuncEnv(pCtx->functionId, pExpr->base.pParamList, &env);
6,154,992✔
3033
          QUERY_CHECK_CODE(code, lino, _end);
6,154,858✔
3034
        }      
3035
        
3036
        code = fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
146,845,018✔
3037
        if (code != TSDB_CODE_SUCCESS && isUdaf) {
146,851,177✔
3038
          code = TSDB_CODE_SUCCESS;
58,752✔
3039
        }
3040
        QUERY_CHECK_CODE(code, lino, _end);
146,851,177✔
3041

3042
        if (pCtx->sfp.getEnv != NULL) {
146,851,177✔
3043
          bool tmp = pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
26,274,488✔
3044
          if (!tmp) {
26,277,359✔
3045
            code = terrno;
×
UNCOV
3046
            QUERY_CHECK_CODE(code, lino, _end);
×
3047
          }
3048
        }
3049
      }
3050
      pCtx->resDataInfo.interBufSize = env.calcMemSize;
459,559,998✔
3051
    } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
728,693,411✔
3052
               pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
18,616,976✔
3053
      // for simple column, the result buffer needs to hold at least one element.
3054
      pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
728,847,706✔
3055
    }
3056

3057
    pCtx->input.numOfInputCols = pFunct->numOfParams;
1,188,467,853✔
3058
    pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,188,290,049✔
3059
    QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
1,188,460,058✔
3060
    pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
1,188,311,406✔
3061
    QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
1,188,432,994✔
3062

3063
    pCtx->pTsOutput = NULL;
1,188,262,240✔
3064
    pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
1,188,466,189✔
3065
    pCtx->resDataInfo.type = pFunct->resSchema.type;
1,188,402,071✔
3066
    pCtx->order = TSDB_ORDER_ASC;
1,188,432,339✔
3067
    pCtx->start.key = INT64_MIN;
1,188,464,101✔
3068
    pCtx->end.key = INT64_MIN;
1,188,404,011✔
3069
    pCtx->numOfParams = pExpr->base.numOfParams;
1,188,416,510✔
3070
    pCtx->param = pFunct->pParam;
1,188,553,189✔
3071
    pCtx->saveHandle.currentPage = -1;
1,188,359,293✔
3072
    pCtx->pStore = pStore;
1,188,532,174✔
3073
    pCtx->hasWindowOrGroup = false;
1,188,558,338✔
3074
    pCtx->needCleanup = false;
1,188,473,110✔
3075
    pCtx->skipDynDataCheck = false;
1,188,389,339✔
3076
  }
3077

3078
  for (int32_t i = 1; i < numOfOutput; ++i) {
1,220,495,731✔
3079
    (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
1,578,918,854✔
3080
                                         pFuncCtx[i - 1].resDataInfo.interBufSize);
789,519,909✔
3081
  }
3082

3083
  code = setSelectValueColumnInfo(pFuncCtx, numOfOutput);
431,018,346✔
3084
  QUERY_CHECK_CODE(code, lino, _end);
430,958,274✔
3085

3086
_end:
430,958,274✔
3087
  if (code != TSDB_CODE_SUCCESS) {
430,891,534✔
3088
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3089
    for (int32_t i = 0; i < numOfOutput; ++i) {
×
3090
      taosMemoryFree(pFuncCtx[i].input.pData);
×
UNCOV
3091
      taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg);
×
3092
    }
3093
    taosMemoryFreeClear(*rowEntryInfoOffset);
×
UNCOV
3094
    taosMemoryFreeClear(pFuncCtx);
×
3095

3096
    terrno = code;
×
UNCOV
3097
    return NULL;
×
3098
  }
3099
  return pFuncCtx;
430,891,534✔
3100
}
3101

3102
// NOTE: sources columns are more than the destination SSDatablock columns.
3103
// doFilter in table scan needs every column even its output is false
3104
int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn) {
15,340,217✔
3105
  int32_t code = TSDB_CODE_SUCCESS;
15,340,217✔
3106
  size_t  numOfSrcCols = taosArrayGetSize(pCols);
15,340,217✔
3107

3108
  int32_t i = 0, j = 0;
15,340,303✔
3109
  while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
124,479,045✔
3110
    SColumnInfoData* p = taosArrayGet(pCols, i);
109,140,750✔
3111
    if (!p) {
109,139,206✔
3112
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3113
      return terrno;
×
3114
    }
3115
    SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
109,139,206✔
3116
    if (!pmInfo) {
109,140,456✔
UNCOV
3117
      return terrno;
×
3118
    }
3119

3120
    if (p->info.colId == pmInfo->colId) {
109,140,456✔
3121
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
102,680,474✔
3122
      if (!pDst) {
102,680,078✔
UNCOV
3123
        return terrno;
×
3124
      }
3125
      code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
102,680,078✔
3126
      if (code != TSDB_CODE_SUCCESS) {
102,676,947✔
3127
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
3128
        return code;
×
3129
      }
3130
      i++;
102,676,947✔
3131
      j++;
102,676,947✔
3132
    } else if (p->info.colId < pmInfo->colId) {
6,461,757✔
3133
      i++;
6,461,795✔
3134
    } else {
3135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
3136
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3137
    }
3138
  }
3139
  return code;
15,340,261✔
3140
}
3141

3142
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
233,730,707✔
3143
  SInterval interval = {
467,318,335✔
3144
      .interval = pTableScanNode->interval,
233,653,701✔
3145
      .sliding = pTableScanNode->sliding,
233,606,860✔
3146
      .intervalUnit = pTableScanNode->intervalUnit,
233,772,104✔
3147
      .slidingUnit = pTableScanNode->slidingUnit,
233,680,581✔
3148
      .offset = pTableScanNode->offset,
233,757,377✔
3149
      .precision = pTableScanNode->scan.node.pOutputDataBlockDesc->precision,
233,763,105✔
3150
      .timeRange = pTableScanNode->scanRange,
3151
  };
3152
  calcIntervalAutoOffset(&interval);
233,472,920✔
3153

3154
  return interval;
233,605,046✔
3155
}
3156

3157
/**
 * Build an SColumn from a column node: slot id and column id are taken from the node itself,
 * type/bytes/scale/precision from its result type. All other SColumn fields are zero.
 */
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
  const SDataType* pResType = &pColNode->node.resType;

  SColumn col = {
      .slotId = pColNode->slotId,
      .colId = pColNode->colId,
      .type = pResType->type,
      .bytes = pResType->bytes,
      .scale = pResType->scale,
      .precision = pResType->precision,
  };
  return col;
}
3168

3169

3170
/**
3171
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
3172
 * @param[in] cond The range specified by WHERE condition.
3173
 * @param[in] range The range specified by RANGE clause.
3174
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
3175
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
3176
 * @note `cond` and `twindow` may be the same address.
3177
 */
3178
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
3,152,364✔
3179
                                 STimeWindow* extTwindows) {
3180
  int32_t     code = TSDB_CODE_SUCCESS;
3,152,364✔
3181
  int32_t     lino = 0;
3,152,364✔
3182
  STimeWindow tempWindow;
3183

3184
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
3,152,364✔
3185
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
4,845✔
3186
    return code;
4,845✔
3187
  }
3188

3189
  if (range->ekey < cond->skey) {
3,147,919✔
3190
    extTwindows[1] = *cond;
510,081✔
3191
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
510,081✔
3192
    return code;
510,081✔
3193
  }
3194

3195
  if (cond->ekey < range->skey) {
2,637,438✔
3196
    extTwindows[0] = *cond;
334,108✔
3197
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
334,108✔
3198
    return code;
334,108✔
3199
  }
3200

3201
  // Only scan data in the time range intersecion.
3202
  extTwindows[0] = extTwindows[1] = *cond;
2,303,330✔
3203
  twindow->skey = TMAX(cond->skey, range->skey);
2,303,730✔
3204
  twindow->ekey = TMIN(cond->ekey, range->ekey);
2,303,330✔
3205
  extTwindows[0].ekey = twindow->skey - 1;
2,303,330✔
3206
  extTwindows[1].skey = twindow->ekey + 1;
2,303,330✔
3207

3208
  return code;
2,303,330✔
3209
}
3210

3211
/**
 * Resolve remote constants in the primary-key condition and, unless the result collapsed into a
 * plain value node, derive the time range from it.
 *
 * On success of the first step *pPrimaryKeyCond is replaced by the rewritten node.
 * pTimeRange and isStrict are only written by filterGetTimeRange.
 */
static int32_t getPrimaryTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* isStrict) {
  SNode*  pRewritten = NULL;
  int32_t code = scalarCalculateRemoteConstants(*pPrimaryKeyCond, &pRewritten);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *pPrimaryKeyCond = pRewritten;
  if (nodeType(pRewritten) == QUERY_NODE_VALUE) {
    // a constant condition carries no range information
    return code;
  }

  return filterGetTimeRange(*pPrimaryKeyCond, pTimeRange, isStrict, NULL);
}
3222

3223
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, STableScanPhysiNode* pTableScanNode,
261,514,934✔
3224
                               const SReadHandle* readHandle, bool applyExtWin) {
3225
  int32_t code = 0;                             
261,514,934✔
3226
  pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
261,514,934✔
3227
  pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
261,520,675✔
3228

3229
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
261,549,778✔
3230
  if (!pCond->colList) {
261,493,832✔
UNCOV
3231
    return terrno;
×
3232
  }
3233
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
261,440,192✔
3234
  if (pCond->pSlotList == NULL) {
261,524,876✔
3235
    taosMemoryFreeClear(pCond->colList);
×
UNCOV
3236
    return terrno;
×
3237
  }
3238

3239
  // TODO: get it from stable scan node
3240
  pCond->twindows = pTableScanNode->scanRange;
261,362,470✔
3241
  pCond->suid = pTableScanNode->scan.suid;
261,546,167✔
3242
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
261,449,897✔
3243
  pCond->startVersion = -1;
261,497,519✔
3244
  pCond->endVersion = -1;
261,555,292✔
3245
  pCond->skipRollup = readHandle->skipRollup;
261,404,438✔
3246
  if (readHandle->winRangeValid) {
261,476,825✔
3247
    pCond->twindows = readHandle->winRange;
376,401✔
3248
  }
3249
  pCond->cacheSttStatis = readHandle->cacheSttStatis;
261,579,671✔
3250
  // allowed read stt file optimization mode
3251
  pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) &&
523,009,657✔
3252
                       (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0);
261,493,595✔
3253

3254
  int32_t j = 0;
261,550,217✔
3255
  SNode*  node = NULL;
261,550,217✔
3256
  FOREACH(node, pTableScanNode->scan.pScanCols) {
1,233,161,114✔
3257
    STargetNode* pNode = (STargetNode*)node;
971,391,828✔
3258
    if (!pNode) {
971,391,828✔
3259
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3260
      return terrno;
×
3261
    }
3262
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
971,391,828✔
3263
    if (pColNode->colType == COLUMN_TYPE_TAG) {
971,578,792✔
UNCOV
3264
      continue;
×
3265
    }
3266

3267
    pCond->colList[j].type = pColNode->node.resType.type;
971,575,417✔
3268
    pCond->colList[j].bytes = pColNode->node.resType.bytes;
971,566,448✔
3269
    pCond->colList[j].colId = pColNode->colId;
971,675,121✔
3270
    pCond->colList[j].pk = pColNode->isPk;
971,806,774✔
3271

3272
    pCond->pSlotList[j] = pNode->slotId;
971,476,290✔
3273
    j += 1;
971,610,897✔
3274
  }
3275

3276
  pCond->numOfCols = j;
261,515,642✔
3277

3278
  if (applyExtWin) {
261,518,900✔
3279
    if (NULL != pTableScanNode->pExtScanRange) {
234,085,451✔
3280
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
3,083,565✔
3281
      code = getQueryExtWindow(&pCond->twindows, pTableScanNode->pExtScanRange, &pCond->twindows, pCond->extTwindows);
3,083,565✔
3282
    } else if (readHandle->extWinRangeValid) {
230,882,772✔
3283
      pCond->type = TIMEWINDOW_RANGE_EXTERNAL;
68,799✔
3284
      code = getQueryExtWindow(&pCond->twindows, &readHandle->extWinRange, &pCond->twindows, pCond->extTwindows);
68,799✔
3285
    }
3286
  }
3287

3288
  if (pTableScanNode->pPrimaryCond) {
261,479,143✔
3289
    bool isStrict = false;
589,272✔
3290
    code = getPrimaryTimeRange((SNode**)&pTableScanNode->pPrimaryCond, &pCond->twindows, &isStrict);
589,272✔
3291
    if (code || !isStrict) {
589,272✔
3292
      code = nodesMergeNode((SNode**)&pTableScanNode->scan.node.pConditions, &pTableScanNode->pPrimaryCond);
584,968✔
3293
    }
3294
  }
3295

3296
  return code;
261,483,664✔
3297
}
3298

3299
int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond,
112,545,803✔
3300
                                           const SReadHandle* readHandle, SArray* colArray) {
3301
  int32_t code = TSDB_CODE_SUCCESS;
112,545,803✔
3302
  int32_t lino = 0;
112,545,803✔
3303

3304
  pCond->order = TSDB_ORDER_ASC;
112,545,803✔
3305
  pCond->numOfCols = (int32_t)taosArrayGetSize(colArray);
112,575,423✔
3306

3307
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
112,571,841✔
3308
  QUERY_CHECK_NULL(pCond->colList, code, lino, _return, terrno);
112,541,178✔
3309

3310
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
112,533,597✔
3311
  QUERY_CHECK_NULL(pCond->pSlotList, code, lino, _return, terrno);
112,545,944✔
3312

3313
  pCond->twindows = pOrgCond->twindows;
112,531,508✔
3314
  pCond->order = pOrgCond->order;
112,569,217✔
3315
  pCond->type = pOrgCond->type;
112,571,041✔
3316
  pCond->startVersion = -1;
112,556,370✔
3317
  pCond->endVersion = -1;
112,556,379✔
3318
  pCond->skipRollup = true;
112,569,697✔
3319
  pCond->notLoadData = false;
112,570,847✔
3320

3321
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
520,885,631✔
3322
    SColIdPair* pColPair = taosArrayGet(colArray, i);
408,328,712✔
3323
    QUERY_CHECK_NULL(pColPair, code, lino, _return, terrno);
408,347,007✔
3324

3325
    bool find = false;
408,230,261✔
3326
    for (int32_t j = 0; j < pOrgCond->numOfCols; ++j) {
2,147,483,647✔
3327
      if (pOrgCond->colList[j].colId == pColPair->vtbColId) {
2,147,483,647✔
3328
        pCond->colList[i].type = pOrgCond->colList[j].type;
408,364,750✔
3329
        pCond->colList[i].bytes = pOrgCond->colList[j].bytes;
408,378,592✔
3330
        pCond->colList[i].colId = pColPair->orgColId;
408,405,019✔
3331
        pCond->colList[i].pk = pOrgCond->colList[j].pk;
408,369,872✔
3332
        pCond->pSlotList[i] = i;
408,412,605✔
3333
        find = true;
408,392,822✔
3334
        qDebug("%s mapped vtb colId:%d to org colId:%d", __func__, pColPair->vtbColId, pColPair->orgColId);
408,392,822✔
3335
        break;
408,329,539✔
3336
      }
3337
    }
3338
    QUERY_CHECK_CONDITION(find, code, lino, _return, TSDB_CODE_NOT_FOUND);
408,331,374✔
3339
  }
3340

3341
  return code;
112,594,009✔
3342
_return:
×
3343
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(terrno));
×
3344
  taosMemoryFreeClear(pCond->colList);
×
3345
  taosMemoryFreeClear(pCond->pSlotList);
×
UNCOV
3346
  return code;
×
3347
}
3348

3349
/**
 * Release the two lists owned by a query condition (slot list and column list) and clear
 * both pointers. The condition object itself is not freed.
 */
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
  taosMemoryFreeClear(pCond->pSlotList);
  taosMemoryFreeClear(pCond->colList);
}
654,571,068✔
3353

3354
int32_t convertFillType(int32_t mode) {
3,746,322✔
3355
  int32_t type = TSDB_FILL_NONE;
3,746,322✔
3356
  switch (mode) {
3,746,322✔
3357
    case FILL_MODE_PREV:
185,066✔
3358
      type = TSDB_FILL_PREV;
185,066✔
3359
      break;
185,066✔
3360
    case FILL_MODE_NONE:
×
3361
      type = TSDB_FILL_NONE;
×
UNCOV
3362
      break;
×
3363
    case FILL_MODE_NULL:
172,735✔
3364
      type = TSDB_FILL_NULL;
172,735✔
3365
      break;
172,735✔
3366
    case FILL_MODE_NULL_F:
39,832✔
3367
      type = TSDB_FILL_NULL_F;
39,832✔
3368
      break;
39,832✔
3369
    case FILL_MODE_NEXT:
187,607✔
3370
      type = TSDB_FILL_NEXT;
187,607✔
3371
      break;
187,607✔
3372
    case FILL_MODE_VALUE:
157,587✔
3373
      type = TSDB_FILL_SET_VALUE;
157,587✔
3374
      break;
157,587✔
3375
    case FILL_MODE_VALUE_F:
11,164✔
3376
      type = TSDB_FILL_SET_VALUE_F;
11,164✔
3377
      break;
11,164✔
3378
    case FILL_MODE_LINEAR:
215,690✔
3379
      type = TSDB_FILL_LINEAR;
215,690✔
3380
      break;
215,690✔
3381
    case FILL_MODE_NEAR:
2,776,641✔
3382
      type = TSDB_FILL_NEAR;
2,776,641✔
3383
      break;
2,776,641✔
3384
    default:
×
UNCOV
3385
      type = TSDB_FILL_NONE;
×
3386
  }
3387

3388
  return type;
3,746,322✔
3389
}
3390

3391
/*
 * Compute the first time window for timestamp ts and store it in *w.
 *
 * The window always starts from getAlignQueryTimeWindow(). For an ascending
 * query that result is used unchanged. Otherwise the window start is advanced
 * (in ascending steps) to the last window start that does not exceed ts, and
 * the window end is recomputed from that start and the interval length.
 */
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
  *w = getAlignQueryTimeWindow(pInterval, ts);
  if (ascQuery) {
    return;
  }

  // walk forward while the candidate start is still before ts
  for (int64_t next = w->skey; next < ts;) {
    next = getNextTimeWindowStart(pInterval, next, TSDB_ORDER_ASC);
    if (next > ts) {
      break;
    }

    w->skey = next;
  }

  w->ekey = taosTimeAdd(w->skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
}
2,147,483,647✔
3410

3411
/*
 * Build the time window containing ts directly from the interval definition:
 * the start is ts truncated by taosTimeTruncate(), the end is derived from
 * that start by taosTimeGetIntervalEnd().
 */
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow win = {0};

  win.skey = taosTimeTruncate(ts, pInterval);
  win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);

  return win;
}
3418

3419
/*
 * Starting from *pWindow, step through windows in the direction opposite to
 * `order` for as long as the window still contains ts, and return the last
 * window that did. If *pWindow does not contain ts it is returned unchanged.
 */
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
  const int32_t stepOrder = (order == TSDB_ORDER_DESC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;

  STimeWindow cur = *pWindow;
  STimeWindow lastCovering = cur;

  for (; cur.skey <= ts && cur.ekey >= ts; getNextTimeWindow(pInterval, &cur, stepOrder)) {
    lastCovering = cur;
  }

  return lastCovering;
}
3430

3431
// get the correct time window according to the handled timestamp
3432
// todo refactor
3433
/*
 * Determine the time window that timestamp ts belongs to.
 *
 * - If no current result position exists (cur.pageId == -1) the window is
 *   produced by getInitialStartTimeWindow().
 * - Otherwise the window of the current result row is taken; when ts lies
 *   outside of it (or no row was found) the window is recalculated from the
 *   interval definition.
 * - When interval and sliding differ, the first qualified window for ts is
 *   looked up via getFirstQualifiedTimeWindow().
 */
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
                                int32_t order) {
  STimeWindow win = {0};

  // the first window: nothing has been stored yet
  if (pResultRowInfo->cur.pageId == -1) {
    getInitialStartTimeWindow(pInterval, ts, &win, (order != TSDB_ORDER_DESC));
    return win;
  }

  SResultRow* pCurRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
  if (pCurRow != NULL) {
    TAOS_SET_OBJ_ALIGNED(&win, pCurRow->win);
  }

  // ts is not covered by the current window, so compute the window directly
  const bool covered = (win.skey <= ts) && (win.ekey >= ts);
  if (!covered) {
    win = doCalculateTimeWindow(ts, pInterval);
  }

  // sliding window query: the sliding value differs from the interval value,
  // so the first qualified time window has to be located
  if (pInterval->interval != pInterval->sliding) {
    win = getFirstQualifiedTimeWindow(ts, &win, pInterval, order);
  }

  return win;
}
3459

3460
/*
 * Move *tw to the neighbouring window in the given order: the new start comes
 * from getNextTimeWindowStart(), the new end from taosTimeGetIntervalEnd().
 */
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int64_t nextStart = getNextTimeWindowStart(pInterval, tw->skey, order);

  tw->skey = nextStart;
  tw->ekey = taosTimeGetIntervalEnd(tw->skey, pInterval);
}
2,147,483,647✔
3464

3465
/*
 * Return true when any of limit/offset or slimit/soffset holds a value other
 * than -1.
 */
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
  if (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1) {
    return true;
  }

  return (pLimitInfo->slimit.limit != -1) || (pLimitInfo->slimit.offset != -1);
}
3469

3470
/*
 * Return true when slimit.limit or slimit.offset holds a value other than -1.
 */
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
  const bool bothUnset = (pLimitInfo->slimit.limit == -1) && (pLimitInfo->slimit.offset == -1);
  return !bothUnset;
}
3473

3474
/*
 * Initialise *pLimitInfo from the LIMIT and SLIMIT nodes.
 *
 * limit/slimit are filled from getLimit()/getOffset(); the remaining offsets
 * start at the configured offsets and all output counters and the current
 * group id are reset to 0.
 */
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
  SLimit rowLimit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
  SLimit groupLimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};

  pLimitInfo->limit = rowLimit;
  pLimitInfo->remainOffset = rowLimit.offset;

  pLimitInfo->slimit = groupLimit;
  pLimitInfo->remainGroupOffset = groupLimit.offset;

  pLimitInfo->currentGroupId = 0;
  pLimitInfo->numOfOutputGroups = 0;
  pLimitInfo->numOfOutputRows = 0;
}
633,366,658✔
3486

3487
/*
 * Prepare the per-group limit state for the next group: restore the remaining
 * offset from limit.offset and clear the output row counter.
 */
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  pLimitInfo->numOfOutputRows = 0;
}
96,928,512✔
3491

3492
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
674,692,954✔
3493
  if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
674,692,954✔
3494
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
3495
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3496
  }
3497
  (*pRes) = taosArrayGetSize(pTableList->pTableList);
674,693,980✔
3498
  return TSDB_CODE_SUCCESS;
674,727,103✔
3499
}
3500

3501
/* Return idInfo.suid of the table list. */
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
  return pTableList->idInfo.suid;
}
3,712,890✔
3502

3503
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
210,351,704✔
3504
  if (taosArrayGetSize(pTableList->pTableList) == 0) {
210,351,704✔
3505
    return NULL;
2,077✔
3506
  }
3507

3508
  return taosArrayGet(pTableList->pTableList, index);
210,302,856✔
3509
}
3510

3511
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
47,196✔
3512
  int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
47,196✔
3513
  if (startIndex >= numOfTables) {
47,196✔
UNCOV
3514
    return -1;
×
3515
  }
3516

3517
  for (int32_t i = startIndex; i < numOfTables; ++i) {
563,330✔
3518
    STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
563,330✔
3519
    if (!p) {
563,330✔
3520
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3521
      return -1;
×
3522
    }
3523
    if (p->uid == uid) {
563,330✔
3524
      return i;
47,196✔
3525
    }
3526
  }
UNCOV
3527
  return -1;
×
3528
}
3529

3530
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
414,493✔
3531
  *psuid = pTableList->idInfo.suid;
414,493✔
3532
  *uid = pTableList->idInfo.uid;
414,493✔
3533
  *type = pTableList->idInfo.tableType;
414,493✔
3534
}
414,493✔
3535

3536
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
865,391,110✔
3537
  int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
865,391,110✔
3538
  if (slot == NULL) {
865,600,730✔
3539
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
UNCOV
3540
    return -1;
×
3541
  }
3542

3543
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
865,600,730✔
3544
  if (pKeyInfo == NULL) {
865,841,354✔
3545
    qDebug("table:%" PRIu64 " not found in table list", tableUid);
×
UNCOV
3546
    return -1;
×
3547
  }
3548
  return pKeyInfo->groupId;
865,841,354✔
3549
}
3550

3551
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
3552
// int32_t tableListRemoveTableInfo(STableListInfo* pTableList, uint64_t uid) {
3553
//   int32_t code = TSDB_CODE_SUCCESS;
3554
//   int32_t lino = 0;
3555

3556
//   int32_t* slot = taosHashGet(pTableList->map, &uid, sizeof(uid));
3557
//   if (slot == NULL) {
3558
//     qDebug("table:%" PRIu64 " not found in table list", uid);
3559
//     return 0;
3560
//   }
3561

3562
//   taosArrayRemove(pTableList->pTableList, *slot);
3563
//   code = taosHashRemove(pTableList->map, &uid, sizeof(uid));
3564

3565
//   _end:
3566
//   if (code != TSDB_CODE_SUCCESS) {
3567
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3568
//   } else {
3569
//     qDebug("uid:%" PRIu64 ", remove from table list", uid);
3570
//   }
3571

3572
//   return code;
3573
// }
3574

3575
/*
 * Append table `uid` with group id `gid` to the table list and record its
 * array slot in the uid hash map.
 *
 * - The hash map is created on demand when it does not exist yet.
 * - A uid that is already present is ignored; TSDB_CODE_SUCCESS is returned
 *   and nothing is added.
 * - If registering the slot in the hash map fails, the entry that was just
 *   pushed to the array is removed again so that the array and the map keep
 *   the same number of entries. A TSDB_CODE_DUP_KEY failure at that point is
 *   reported as TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR because the uid was
 *   checked to be absent beforehand.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  int32_t       slot = -1;  // stays -1 unless the table is actually appended
  void*         tmp = NULL;
  STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};

  if (pTableList->map == NULL) {
    pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
  }

  if (taosHashGet(pTableList->map, &uid, sizeof(uid)) != NULL) {
    // uid is unsigned, so it is printed with PRIu64
    qInfo("table:%" PRIu64 " already in tableIdList, ignore it", uid);
    goto _end;
  }

  tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
  code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    // roll back the array entry first, whatever the reason of the failure
    taosArrayPopTailBatch(pTableList->pTableList, 1);
    if (code == TSDB_CODE_DUP_KEY) {
      // the existence of uid in the hash map has been checked above
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else if (slot >= 0) {
    // only report an insertion when one really happened; slot is valid here
    qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1);
  }

  return code;
}
3610

3611
/*
 * Return the tables that belong to the output group `ordinalGroupIndex`.
 *
 * On success *pKeyInfo points at the first table entry of the group inside the
 * table array and *size holds the number of entries in the group.
 *
 * Three layouts are handled:
 *   1. a single output group: the group is the whole list (NULL when empty);
 *   2. as many groups as tables: every group holds exactly one table;
 *   3. otherwise the group boundaries are read from groupOffset.
 *
 * Returns TSDB_CODE_INVALID_PARA for an out-of-range group index, or the error
 * of tableListGetSize().
 */
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
                              int32_t* size) {
  int32_t totalGroups = tableListGetOutputGroups(pTableList);
  int32_t numOfTables = 0;

  int32_t code = tableListGetSize(pTableList, &numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  const bool validIndex = (ordinalGroupIndex >= 0) && (ordinalGroupIndex < totalGroups);
  if (!validIndex) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (totalGroups == 1) {
    // only one group exists
    *size = numOfTables;
    *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
  } else if (totalGroups == numOfTables) {
    // one table exists for each group
    *size = 1;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
  } else {
    int32_t begin = pTableList->groupOffset[ordinalGroupIndex];
    // the last group extends to the end of the table array
    int32_t end = (ordinalGroupIndex < totalGroups - 1) ? pTableList->groupOffset[ordinalGroupIndex + 1] : numOfTables;

    *size = end - begin;
    *pKeyInfo = taosArrayGet(pTableList->pTableList, begin);
  }

  return TSDB_CODE_SUCCESS;
}
3647

3648
/* Return the number of output groups recorded in the table list. */
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) {
  return pTableList->numOfOuputGroups;
}
716,178,166✔
3649

3650
/* Return the oneTableForEachGroup flag of the table list. */
bool oneTableForEachGroup(const STableListInfo* pTableList) {
  return pTableList->oneTableForEachGroup;
}
838,538✔
3651

3652
STableListInfo* tableListCreate() {
281,463,460✔
3653
  STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
281,463,460✔
3654
  if (pListInfo == NULL) {
281,382,590✔
UNCOV
3655
    return NULL;
×
3656
  }
3657

3658
  pListInfo->remainGroups = NULL;
281,382,590✔
3659
  pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
281,419,080✔
3660
  if (pListInfo->pTableList == NULL) {
281,403,291✔
UNCOV
3661
    goto _error;
×
3662
  }
3663

3664
  pListInfo->map = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
281,446,366✔
3665
  if (pListInfo->map == NULL) {
281,587,985✔
UNCOV
3666
    goto _error;
×
3667
  }
3668

3669
  pListInfo->numOfOuputGroups = 1;
281,579,256✔
3670
  return pListInfo;
281,584,456✔
3671

3672
_error:
×
3673
  tableListDestroy(pListInfo);
×
UNCOV
3674
  return NULL;
×
3675
}
3676

3677
/*
 * Destroy a table list: the table array, the group offset buffer, both hash
 * maps and finally the list object itself. A NULL argument is ignored.
 */
void tableListDestroy(STableListInfo* pTableListInfo) {
  if (!pTableListInfo) {
    return;
  }

  taosArrayDestroy(pTableListInfo->pTableList);
  taosMemoryFreeClear(pTableListInfo->groupOffset);

  taosHashCleanup(pTableListInfo->map);
  taosHashCleanup(pTableListInfo->remainGroups);

  pTableListInfo->map = NULL;
  pTableListInfo->pTableList = NULL;

  taosMemoryFree(pTableListInfo);
}
3691

3692
/*
 * Reset a table list to its empty state without destroying it.
 *
 * The table array and both hash maps are emptied, the group offset buffer is
 * released, numOfOuputGroups goes back to 1 and oneTableForEachGroup to false.
 * A NULL argument is ignored.
 */
void tableListClear(STableListInfo* pTableListInfo) {
  if (pTableListInfo == NULL) {
    return;
  }

  taosArrayClear(pTableListInfo->pTableList);
  taosHashClear(pTableListInfo->map);
  taosHashClear(pTableListInfo->remainGroups);

  // Free AND reset the pointer: tableListDestroy() releases groupOffset again,
  // so a stale pointer left behind here would be freed twice.
  taosMemoryFreeClear(pTableListInfo->groupOffset);

  pTableListInfo->numOfOuputGroups = 1;
  pTableListInfo->oneTableForEachGroup = false;
}
3704

3705
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
552,333,310✔
3706
  STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
552,333,310✔
3707
  STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
552,333,310✔
3708

3709
  if (pInfo1->groupId == pInfo2->groupId) {
552,333,310✔
3710
    return 0;
518,092,011✔
3711
  } else {
3712
    return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
34,249,344✔
3713
  }
3714
}
3715

3716
/*
 * Sort the table array by group id and rebuild the group offset table.
 *
 * After sorting, the index of the first table of every run of equal group ids
 * is collected; numOfOuputGroups becomes the number of such runs and
 * groupOffset a newly allocated array holding those start indexes. An empty
 * list yields numOfOuputGroups == 0 and leaves groupOffset untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno value of the step that failed.
 */
int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);

  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
  if (numOfTables == 0) {
    pTableListInfo->numOfOuputGroups = 0;
    return code;
  }

  // start index of each group, in array order
  SArray* pStarts = taosArrayInit(4, sizeof(int32_t));
  if (pStarts == NULL) {
    code = terrno;
    goto end;
  }

  uint64_t curGid = 0;
  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pEntry = taosArrayGet(pTableListInfo->pTableList, i);
    if (pEntry == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      code = terrno;
      goto end;
    }

    // the very first table always opens a group; later ones only on a change
    if (i == 0 || pEntry->groupId != curGid) {
      if (taosArrayPush(pStarts, &i) == NULL) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
        code = terrno;
        goto end;
      }
      curGid = pEntry->groupId;
    }
  }

  pTableListInfo->numOfOuputGroups = taosArrayGetSize(pStarts);
  pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
  if (pTableListInfo->groupOffset == NULL) {
    code = terrno;
    goto end;
  }

  memcpy(pTableListInfo->groupOffset, taosArrayGet(pStarts, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);

end:
  taosArrayDestroy(pStarts);
  return code;
}
3778

3779
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode,
246,035,110✔
3780
                                    SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap) {
3781
  int32_t code = TSDB_CODE_SUCCESS;
246,035,110✔
3782

3783
  bool   groupByTbname = groupbyTbname(group);
246,035,110✔
3784
  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
246,038,295✔
3785
  if (!numOfTables) {
246,032,024✔
3786
    return code;
3,428✔
3787
  }
3788
  qDebug("numOfTables:%zu, groupByTbname:%d, group:%p", numOfTables, groupByTbname, group);
246,028,596✔
3789
  if (group == NULL || groupByTbname) {
245,960,401✔
3790
    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) &&
241,734,272✔
3791
        ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
197,722,666✔
3792
      pTableListInfo->remainGroups =
14,028,444✔
3793
          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
14,028,458✔
3794
      if (pTableListInfo->remainGroups == NULL) {
14,028,444✔
UNCOV
3795
        return terrno;
×
3796
      }
3797

3798
      for (int i = 0; i < numOfTables; i++) {
68,480,888✔
3799
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
54,453,907✔
3800
        if (!info) {
54,453,990✔
3801
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3802
          return terrno;
×
3803
        }
3804
        info->groupId = groupByTbname ? info->uid : 0;
54,453,990✔
3805
        int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
54,453,990✔
3806
                                      &(info->uid), sizeof(info->uid));
54,454,073✔
3807
        if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
54,452,444✔
3808
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
3809
          return tempRes;
×
3810
        }
3811
      }
3812
    } else {
3813
      for (int32_t i = 0; i < numOfTables; i++) {
817,899,739✔
3814
        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
590,240,484✔
3815
        if (!info) {
590,132,963✔
3816
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3817
          return terrno;
×
3818
        }
3819
        info->groupId = groupByTbname ? info->uid : 0;
590,132,963✔
3820
        
3821
      }
3822
    }
3823
    if (groupIdMap && group != NULL){
241,686,236✔
3824
      getColInfoResultForGroupbyForStream(pHandle->vnode, group, pTableListInfo, pAPI, groupIdMap);
76,925✔
3825
    }
3826

3827
    pTableListInfo->oneTableForEachGroup = groupByTbname;
241,686,236✔
3828
    if (numOfTables == 1 && pTableListInfo->idInfo.tableType == TSDB_CHILD_TABLE) {
241,802,019✔
3829
      pTableListInfo->oneTableForEachGroup = true;
98,429,724✔
3830
    }
3831

3832
    if (groupSort && groupByTbname) {
241,789,365✔
3833
      taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
2,282,749✔
3834
      pTableListInfo->numOfOuputGroups = numOfTables;
2,282,749✔
3835
    } else if (groupByTbname && pScanNode->groupOrderScan) {
239,506,616✔
3836
      pTableListInfo->numOfOuputGroups = numOfTables;
24,873✔
3837
    } else {
3838
      pTableListInfo->numOfOuputGroups = 1;
239,484,395✔
3839
    }
3840
    if (groupSort || pScanNode->groupOrderScan) {
241,799,589✔
3841
      code = sortTableGroup(pTableListInfo);
27,506,477✔
3842
    }
3843
  } else {
3844
    bool initRemainGroups = false;
4,226,129✔
3845
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) {
4,226,129✔
3846
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode;
4,126,327✔
3847
      if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable &&
4,126,327✔
3848
          !(groupSort || pScanNode->groupOrderScan)) {
1,998,623✔
3849
        initRemainGroups = true;
1,974,435✔
3850
      }
3851
    }
3852

3853
    code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI, initRemainGroups, groupIdMap);
4,226,129✔
3854
    if (code != TSDB_CODE_SUCCESS) {
4,225,512✔
UNCOV
3855
      return code;
×
3856
    }
3857

3858
    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
4,225,512✔
3859

3860
    if (groupSort || pScanNode->groupOrderScan) {
4,224,994✔
3861
      code = sortTableGroup(pTableListInfo);
146,398✔
3862
    }
3863
  }
3864

3865
  // add all table entry in the hash map
3866
  size_t size = taosArrayGetSize(pTableListInfo->pTableList);
245,968,721✔
3867
  for (int32_t i = 0; i < size; ++i) {
914,001,003✔
3868
    STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
667,961,080✔
3869
    if (!p) {
667,798,605✔
3870
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
3871
      return terrno;
×
3872
    }
3873
    int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
667,798,605✔
3874
    if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
668,006,700✔
3875
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
×
UNCOV
3876
      return tempRes;
×
3877
    }
3878
  }
3879

3880
  return code;
246,090,048✔
3881
}
3882

3883
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
263,608,111✔
3884
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
3885
                                SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap) {
3886
  int64_t     st = taosGetTimestampUs();
263,623,443✔
3887
  const char* idStr = GET_TASKID(pTaskInfo);
263,623,443✔
3888

3889
  if (pHandle == NULL) {
263,383,315✔
3890
    qError("invalid handle, in creating operator tree, %s", idStr);
×
UNCOV
3891
    return TSDB_CODE_INVALID_PARA;
×
3892
  }
3893

3894
  if (pHandle->uid != 0) {
263,383,315✔
3895
    pScanNode->uid = pHandle->uid;
56,742✔
3896
    pScanNode->tableType = TSDB_CHILD_TABLE;
56,742✔
3897
  }
3898
  uint8_t digest[17] = {0};
263,575,210✔
3899
  int32_t code = getTableList(pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr,
263,486,889✔
3900
                              &pTaskInfo->storageAPI, pTaskInfo->pStreamRuntimeInfo);
263,577,707✔
3901
  if (code != TSDB_CODE_SUCCESS) {
263,631,872✔
3902
    qError("failed to getTableList, code:%s", tstrerror(code));
934✔
3903
    return code;
934✔
3904
  }
3905

3906
  int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
263,630,938✔
3907

3908
  int64_t st1 = taosGetTimestampUs();
263,691,169✔
3909
  pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
263,691,169✔
3910
  qDebug("extract queried table list completed, %d tables, elapsed time:%.2f ms %s", numOfTables,
263,719,826✔
3911
         pTaskInfo->cost.extractListTime, idStr);
3912

3913
  if (numOfTables == 0) {
263,674,956✔
3914
    qDebug("no table qualified for query, %s", idStr);
17,668,927✔
3915
    return TSDB_CODE_SUCCESS;
17,665,987✔
3916
  }
3917

3918
  code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest, &pTaskInfo->storageAPI, groupIdMap);
246,006,029✔
3919
  if (code != TSDB_CODE_SUCCESS) {
246,046,894✔
UNCOV
3920
    return code;
×
3921
  }
3922

3923
  pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
246,050,135✔
3924
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
246,042,404✔
3925

3926
  return TSDB_CODE_SUCCESS;
246,043,020✔
3927
}
3928

3929
char* getStreamOpName(uint16_t opType) {
6,105,948✔
3930
  switch (opType) {
6,105,948✔
3931
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
×
UNCOV
3932
      return "stream scan";
×
3933
    case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
5,866,024✔
3934
      return "project";
5,866,024✔
3935
    case QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW:
239,924✔
3936
      return "external window";
239,924✔
3937
  }
UNCOV
3938
  return "error name";
×
3939
}
3940

3941
/*
 * Write the content of a data block to the debug log.
 *
 * Nothing happens unless DEBUG_TRACE is set in qDebugFlag. A NULL or empty
 * block only produces a one-line note; otherwise the block is rendered by
 * dumpBlockData() and, when that succeeds, logged and the buffer released.
 */
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr, int64_t qId) {
  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  if (pBlock == NULL) {
    qDebug("%" PRIx64 " %s %s %s: Block is Null", qId, taskIdStr, flag, __func__);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%" PRIx64 " %s %s %s: Block is Empty. block type %d", qId, taskIdStr, flag, __func__, pBlock->info.type);
    return;
  }

  char* pDump = NULL;
  if (dumpBlockData(pBlock, flag, &pDump, taskIdStr, qId) == 0) {
    qDebugL("%" PRIx64 " %s %s", qId, __func__, pDump);
    taosMemoryFree(pDump);
  }
}
3959

3960
/*
 * Stream flavoured variant of the block printer.
 *
 * A NULL or empty block is reported with a single debug line regardless of the
 * trace flag. A non-empty block is dumped only when DEBUG_TRACE is set in
 * qDebugFlag; `flag` and `opStr` are joined into one label for the dump.
 */
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
  if (pBlock == NULL) {
    qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
    return;
  }

  if (pBlock->info.rows == 0) {
    qDebug("%s===stream===%s %s: Block is Empty. block type %d.skey:%" PRId64 ",ekey:%" PRId64 ",version%" PRId64,
           taskIdStr, flag, opStr, pBlock->info.type, pBlock->info.window.skey, pBlock->info.window.ekey,
           pBlock->info.version);
    return;
  }

  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  char  label[64];
  char* pDump = NULL;
  snprintf(label, sizeof(label), "%s %s", flag, opStr);

  if (dumpBlockData(pBlock, label, &pDump, taskIdStr, 0) == 0) {
    qDebug("%s", pDump);
    taosMemoryFree(pDump);
  }
}
3981

3982
/*
 * Return the first timestamp of tsCols, or the window start key when no
 * timestamp column is supplied.
 */
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (tsCols != NULL) {
    return tsCols[0];
  }

  return win->skey;
}
14,180,432✔
3983

3984
// Stores window information into the int64 array behind pColData->pData:
//   [2] = |ekey - skey| + delta (duration), [3] = skey, [4] = ekey + delta.
// The data buffer must therefore hold at least five int64_t slots.
void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta) {
  int64_t* slots = (int64_t*)pColData->pData;

  int64_t span;
  if (pWin->ekey > pWin->skey) {
    span = pWin->ekey - pWin->skey + delta;
  } else {
    span = pWin->skey - pWin->ekey + delta;
  }

  slots[2] = span;                // duration
  slots[3] = pWin->skey;          // window start key
  slots[4] = pWin->ekey + delta;  // window end key
}
2,147,483,647✔
3992

3993
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock,
2,147,483,647✔
3994
                 int32_t rowIndex) {
3995
  SColumnDataAgg* pColAgg = NULL;
2,147,483,647✔
3996
  const char*     isNull = oldkeyBuf;
2,147,483,647✔
3997
  const char*     p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
2,147,483,647✔
3998

3999
  for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
2,147,483,647✔
4000
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,147,483,647✔
4001
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,147,483,647✔
4002
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,147,483,647✔
4003

4004
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
2,147,483,647✔
4005
      if (isNull[i] != 1) return 1;
264,656,643✔
4006
    } else {
4007
      if (isNull[i] != 0) return 1;
2,147,483,647✔
4008
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
4009
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
4010
        int32_t len = getJsonValueLen(val);
×
4011
        if (memcmp(p, val, len) != 0) return 1;
×
UNCOV
4012
        p += len;
×
4013
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,147,483,647✔
4014
        if (IS_STR_DATA_BLOB(pCol->type)) {
1,220,326,734✔
4015
          if (memcmp(p, val, blobDataTLen(val)) != 0) return 1;
×
UNCOV
4016
          p += blobDataTLen(val);
×
4017
        } else {
4018
          if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
1,221,579,732✔
4019
          p += varDataTLen(val);
1,220,983,131✔
4020
        }
4021
      } else {
4022
        if (0 != memcmp(p, val, pCol->bytes)) return 1;
2,147,483,647✔
4023
        p += pCol->bytes;
2,147,483,647✔
4024
      }
4025
    }
4026
  }
4027
  if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
2,147,483,647✔
4028
  return 0;
2,147,483,647✔
4029
}
4030

4031
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) {
1,614,039✔
4032
  uint32_t        colNum = pSortGroupCols->size;
1,614,039✔
4033
  SColumnDataAgg* pColAgg = NULL;
1,614,039✔
4034
  char*           isNull = keyBuf;
1,614,039✔
4035
  char*           p = keyBuf + sizeof(int8_t) * colNum;
1,614,039✔
4036

4037
  for (int32_t i = 0; i < colNum; ++i) {
4,419,468✔
4038
    const SColumn*         pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
2,805,429✔
4039
    const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
2,805,429✔
4040
    if (pCol->slotId > pBlock->pDataBlock->size) continue;
2,805,429✔
4041

4042
    if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
2,805,429✔
4043

4044
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
5,610,858✔
4045
      isNull[i] = 1;
135,900✔
4046
    } else {
4047
      isNull[i] = 0;
2,669,529✔
4048
      const char* val = colDataGetData(pColInfoData, rowIndex);
2,669,529✔
4049
      if (pCol->type == TSDB_DATA_TYPE_JSON) {
2,669,529✔
4050
        int32_t len = getJsonValueLen(val);
×
4051
        memcpy(p, val, len);
×
UNCOV
4052
        p += len;
×
4053
      } else if (IS_VAR_DATA_TYPE(pCol->type)) {
2,669,529✔
4054
        if (IS_STR_DATA_BLOB(pCol->type)) {
997,506✔
4055
          blobDataCopy(p, val);
×
UNCOV
4056
          p += blobDataTLen(val);
×
4057
        } else {
4058
          varDataCopy(p, val);
997,506✔
4059
          p += varDataTLen(val);
997,506✔
4060
        }
4061
      } else {
4062
        memcpy(p, val, pCol->bytes);
1,672,023✔
4063
        p += pCol->bytes;
1,672,023✔
4064
      }
4065
    }
4066
  }
4067
  return (int32_t)(p - keyBuf);
1,614,039✔
4068
}
4069

4070
uint64_t calcGroupId(char* pData, int32_t len) {
2,147,483,647✔
4071
  T_MD5_CTX context;
2,147,483,647✔
4072
  tMD5Init(&context);
2,147,483,647✔
4073
  tMD5Update(&context, (uint8_t*)pData, len);
2,147,483,647✔
4074
  tMD5Final(&context);
2,147,483,647✔
4075

4076
  // NOTE: only extract the initial 8 bytes of the final MD5 digest
4077
  uint64_t id = 0;
2,147,483,647✔
4078
  memcpy(&id, context.digest, sizeof(uint64_t));
2,147,483,647✔
4079
  if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
2,147,483,647✔
4080
  return id;
2,147,483,647✔
4081
}
4082

4083
// Builds a node list holding the pExpr of every SOrderByExprNode in pSortKeys.
// The expression nodes are NOT cloned: the returned list only borrows them, so it
// must be released without destroying the nodes.
// Returns NULL when pSortKeys yields no entries, or on failure, in which case
// terrno carries the error code.
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNode*     node;
  SNodeList* ret = NULL;
  FOREACH(node, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
    int32_t           code = nodesListMakeAppend(&ret, pSortKey->pExpr);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      // release the cells appended so far; the nodes still belong to pSortKeys
      nodesClearList(ret);
      terrno = code;
      return NULL;
    }
  }
  return ret;
}
4097

4098
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
106,908✔
4099
  int32_t code = TSDB_CODE_SUCCESS;
106,908✔
4100
  int32_t lino = 0;
106,908✔
4101
  int32_t len = 0;
106,908✔
4102
  int32_t keyNum = taosArrayGetSize(keys);
106,908✔
4103
  for (int32_t i = 0; i < keyNum; ++i) {
271,800✔
4104
    SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
164,892✔
4105
    QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
164,892✔
4106
    len += pCol->bytes;
164,892✔
4107
  }
4108
  len += sizeof(int8_t) * keyNum;  // null flag
106,908✔
4109
  *pLen = len;
106,908✔
4110

4111
_end:
106,908✔
4112
  if (code != TSDB_CODE_SUCCESS) {
106,908✔
UNCOV
4113
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4114
  }
4115
  return code;
106,908✔
4116
}
4117

NEW
4118
// Maps the "msg" string of an analytic server error response to an error code.
//   pJson   - response object, may be NULL
//   typeStr - operation name, used for logging only
//   pId     - id string, used as log prefix
// Returns TSDB_CODE_ANA_WN_DATA when the message mentions white noise,
// TSDB_CODE_ANA_ALGO_NOT_LOAD when it reports a refused connection or a model that
// failed to load, and TSDB_CODE_ANA_ANODE_RETURN_ERROR in every other case
// (including a NULL pJson or a missing "msg" field).
int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* typeStr, const char* pId) {
  if (pJson == NULL) {
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  int32_t code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  char*   errMsg = NULL;

  if (tjsonDupStringValue(pJson, "msg", &errMsg) != 0 || errMsg == NULL) {
    qError("%s failed to extract msg from server, unknown error", pId);
  } else {
    qError("%s failed to exec %s operation, msg:%s", pId, typeStr, errMsg);

    if (strstr(errMsg, "white noise") != NULL || strstr(errMsg, "white-noise") != NULL) {
      code = TSDB_CODE_ANA_WN_DATA;
    } else if (strstr(errMsg, "[Errno 111] Connection refused") != NULL ||
               strstr(errMsg, "failed to load model") != NULL) {
      code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
    }
  }

  taosMemoryFreeClear(errMsg);
  return code;
}
4144

4145

4146
// Allocates a data block with exactly one column described by pResType
// (type, bytes, scale, precision) and hands it out through *ppBlock.
// Returns TSDB_CODE_SUCCESS on success. On allocation failure the error code is
// returned and *ppBlock is not written; when appending the column fails the block
// is destroyed and *ppBlock is set to NULL.
int32_t createExprSubQResBlock(SSDataBlock** ppBlock, SDataType* pResType) {
  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (NULL == pNew) {
    return terrno;
  }

  pNew->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  if (NULL == pNew->pDataBlock) {
    int32_t err = terrno;  // capture before the free below
    taosMemoryFree(pNew);
    return err;
  }

  SColumnInfoData colInfo = createColumnInfoData(pResType->type, pResType->bytes, 0);
  colInfo.info.scale = pResType->scale;
  colInfo.info.precision = pResType->precision;

  int32_t code = blockDataAppendColInfo(pNew, &colInfo);
  if (TSDB_CODE_SUCCESS == code) {
    *ppBlock = pNew;
    return code;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  blockDataDestroy(pNew);
  *ppBlock = NULL;
  return code;
}
4177

4178

4179
// Decodes the single data block carried in pRetrieveRsp->data into pb.
// Payload layout: int32 compressed length, int32 raw length, then the block body,
// which is decompressed first when the response is flagged as compressed and the
// compressed length is smaller than the raw length.
//
// Ownership: neither pRetrieveRsp nor pb is released here, not even on failure.
// Both belong to the caller, which frees the response message and destroys the
// block on every path; releasing them here as well would free them twice.
// The temporary decompression buffer is always released before returning.
//
// Returns TSDB_CODE_SUCCESS, an allocation error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR
// for inconsistent lengths, or the error reported by blockDecodeInternal.
int32_t extractSingleRspBlock(SRetrieveTableRsp* pRetrieveRsp, SSDataBlock* pb) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   decompBuf = NULL;
  char*   pStart = pRetrieveRsp->data;
  int32_t compLen = 0;
  int32_t rawLen = 0;

  if (pRetrieveRsp->compressed) {  // decompress the data
    decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
    QUERY_CHECK_NULL(decompBuf, code, lino, _end, terrno);
  }

  // read the two length fields without assuming the payload is int32 aligned
  memcpy(&compLen, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);

  memcpy(&rawLen, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);
  QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  if (pRetrieveRsp->compressed && (compLen < rawLen)) {
    int32_t t = tsDecompressString(pStart, compLen, 1, decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
    QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    pStart = decompBuf;
  }

  code = blockDecodeInternal(pb, pStart, (const char**)&pStart);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  taosMemoryFreeClear(decompBuf);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
4221

4222
// Fills the value node pRes from row 0 of the first column of pBlock.
// The block must have at least one column and at most one row, otherwise
// TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS is returned.
// pRes is marked as set and translated; a NULL cell sets pRes->isNull, any other
// cell is passed to nodesSetValueNodeValueExt.
// When nodesSetValueNodeValueExt clears its needFree output, the column's pData
// pointer is reset to NULL (presumably because the value node now owns that
// buffer - confirm against nodesSetValueNodeValueExt).
int32_t setValueFromResBlock(STaskSubJobCtx* ctx, SValueNode* pRes, SSDataBlock* pBlock) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  bool    invalid = (NULL == pBlock->pDataBlock) || (numOfCols < 1) || (pBlock->info.rows > 1);
  if (invalid) {
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d, rows:%" PRId64, 
      ctx->idStr, pBlock->pDataBlock, numOfCols, pBlock->info.rows);
    return TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
  }

  pRes->flag &= (~VALUE_FLAG_VAL_UNSET);
  pRes->translate = true;

  int32_t          code = 0;
  bool             keepColData = true;
  SColumnInfoData* pFirstCol = taosArrayGet(pBlock->pDataBlock, 0);

  if (colDataIsNull_s(pFirstCol, 0)) {
    pRes->isNull = true;
  } else {
    code = nodesSetValueNodeValueExt(pRes, colDataGetData(pFirstCol, 0), &keepColData);
  }

  if (!keepColData) {
    pFirstCol->pData = NULL;
  }

  return code;
}
4248

4249
// Handles the response of a scalar sub-query that must yield a single value.
// The response has to be complete, carry at most one row in at most one block and,
// when it has a row, exactly one column; otherwise ctx->code is set to the
// matching TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_* error.
// With zero rows the remote node is turned into a NULL value node; with one row
// the value is decoded from the response. On success the node is stored in
// ctx->subResNodes at pParam->subQIdx. Errors are reported through ctx->code.
void handleRemoteValueRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp) {
  qDebug("%s scl fetch value rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);

  if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
    qError("%s invalid scl value fetch rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
      ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
    ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
    return;
  }

  if (pRsp->numOfRows > 0 && pRsp->numOfCols != 1) {
    qError("%s invalid scl value fetch rsp received, subQIdx:%d, cols:%" PRId64, ctx->idStr, pParam->subQIdx, pRsp->numOfCols);
    ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_COLS;
    return;
  }

  SRemoteValueNode* pValNode = (SRemoteValueNode*)pParam->pRes;

  if (pRsp->numOfRows == 0) {
    // an empty result is represented as a NULL value
    pValNode->val.node.type = QUERY_NODE_VALUE;
    pValNode->val.isNull = true;
    pValNode->val.translate = true;
    pValNode->val.flag &= (~VALUE_FLAG_VAL_UNSET);
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
    return;
  }

  SSDataBlock* pBlock = NULL;

  ctx->code = createExprSubQResBlock(&pBlock, &pValNode->val.node.resType);
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = blockDataEnsureCapacity(pBlock, 1);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = extractSingleRspBlock(pRsp, pBlock);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = setValueFromResBlock(ctx, &pValNode->val, pBlock);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    pValNode->val.node.type = QUERY_NODE_VALUE;
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
  }

  blockDataDestroy(pBlock);
}
4298

4299

4300
// Adds the rows of the single column of pBlock to the remote value list pRes by
// calling scalarBuildRemoteListHash, and marks the list as having values.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when pBlock does not have exactly
// one column, otherwise the result of scalarBuildRemoteListHash.
int32_t updateValueListFromResBlock(STaskSubJobCtx* ctx, SRemoteValueListNode* pRes, SSDataBlock* pBlock) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  if (numOfCols != 1 || pBlock->pDataBlock == NULL) {
    qError("%s invalid scl fetch res block, pDataBlock:%p, colNum:%d", ctx->idStr, pBlock->pDataBlock, numOfCols);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pRes->hasValue = true;

  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, 0);
  TAOS_CHECK_EXIT(scalarBuildRemoteListHash(ctx->idStr, pRes, pValCol, pBlock->info.rows));

_exit:

  if (code) {
    qError("%s %s failed with error: %s", ctx->idStr, __func__, tstrerror(code));
  }

  return code;
}
4321

4322

4323

4324
// Handles one response of a sub-query whose result is a value list.
// A response with rows must have exactly one column; its rows are merged into the
// list. A completed response with zero rows builds an empty hash if the list never
// received a value. Once the final response has been processed successfully the
// list is marked as set and stored in ctx->subResNodes at pParam->subQIdx.
// *fetchDone is set to true when an error occurred or the response was the last
// one; otherwise the next fetch request is sent (and a send failure also ends the
// fetch). Errors are reported through ctx->code.
void handleRemoteValueListRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
  SRemoteValueListNode* pListNode = (SRemoteValueListNode*)pParam->pRes;

  qDebug("%s scl fetch valueList rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);

  if (pRsp->numOfRows > 0) {
    if (pRsp->numOfCols != 1) {
      qError("%s invalid scl valueList fetch rsp received, subQIdx:%d, cols:%" PRId64, ctx->idStr, pParam->subQIdx, pRsp->numOfCols);
      ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_COLS;
      *fetchDone = true;
      return;
    }

    SSDataBlock* pBlock = NULL;

    ctx->code = createExprSubQResBlock(&pBlock, &((SExprNode*)pParam->pRes)->resType);
    if (ctx->code == TSDB_CODE_SUCCESS) {
      ctx->code = blockDataEnsureCapacity(pBlock, pRsp->numOfRows);
    }
    if (ctx->code == TSDB_CODE_SUCCESS) {
      ctx->code = extractSingleRspBlock(pRsp, pBlock);
    }
    if (ctx->code == TSDB_CODE_SUCCESS) {
      ctx->code = updateValueListFromResBlock(ctx, pListNode, pBlock);
    }
    if (ctx->code == TSDB_CODE_SUCCESS && pRsp->completed) {
      pListNode->flag &= (~VALUELIST_FLAG_VAL_UNSET);
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
    }

    blockDataDestroy(pBlock);
  } else if (0 == pRsp->numOfRows && pRsp->completed) {
    if (!pListNode->hasValue) {
      // no row was ever received: build the hash for an empty list
      ctx->code = scalarBuildRemoteListHash(ctx->idStr, pListNode, NULL, 0);
    }
    if (ctx->code == TSDB_CODE_SUCCESS) {
      pListNode->flag &= (~VALUELIST_FLAG_VAL_UNSET);
      taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
    }
  }

  *fetchDone = (ctx->code != TSDB_CODE_SUCCESS) || pRsp->completed;
  if (*fetchDone) {
    return;
  }

  // more blocks are pending: request the next one
  int32_t sendCode = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes);
  if (sendCode != TSDB_CODE_SUCCESS) {
    ctx->code = sendCode;
    *fetchDone = true;
  }
}
4374

4375
// Reads the has-null indicator from row 0 of the second column of pBlock into
// *hasNull. The block must have exactly two columns and the indicator cell must
// not be NULL; otherwise TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR is returned and
// *hasNull is left untouched.
int32_t setRowHasNullFromResBlock(STaskSubJobCtx* ctx, bool* hasNull, SSDataBlock* pBlock) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  if (numOfCols != 2) {
    qError("%s invalid scl fetch res block, colNum:%d", ctx->idStr, numOfCols);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SColumnInfoData* pFlagCol = taosArrayGet(pBlock->pDataBlock, 1);
  if (colDataIsNull_s(pFlagCol, 0)) {
    qError("%s invalid has_null res since it's null", ctx->idStr);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  *hasNull = *(bool*)colDataGetData(pFlagCol, 0);
  return 0;
}
4393

4394
// Handles the response of a sub-query that yields a single row made of a value
// column and a boolean has-null column.
// The response has to be complete and carry at most one row in at most one block.
// With zero rows the node is marked as set without a value; with one row the
// response must have exactly two columns, from which the value and the has-null
// indicator are decoded. On success the node is stored in ctx->subResNodes at
// pParam->subQIdx. Errors are reported through ctx->code.
void handleRemoteRowRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp) {
  qDebug("%s scl fetch row rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);

  if (pRsp->numOfRows > 1 || pRsp->numOfBlocks > 1 || !pRsp->completed) {
    qError("%s invalid scl fetch row rsp received, subQIdx:%d, rows:%" PRId64 ", blocks:%d, completed:%d", 
      ctx->idStr, pParam->subQIdx, pRsp->numOfRows, pRsp->numOfBlocks, pRsp->completed);
    ctx->code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
    return;
  }

  SRemoteRowNode* pRowNode = (SRemoteRowNode*)pParam->pRes;

  if (pRsp->numOfRows == 0) {
    // empty result: value is NULL and there is neither a value nor a null flag
    pRowNode->valSet = true;
    pRowNode->hasValue = false;
    pRowNode->hasNull = false;
    pRowNode->val.isNull = true;
    pRowNode->val.translate = true;
    pRowNode->val.flag &= (~VALUE_FLAG_VAL_UNSET);
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
    return;
  }

  if (pRsp->numOfCols != 2) {
    qError("%s invalid scl fetch row rsp received, subQIdx:%d, cols:%" PRId64, ctx->idStr, pParam->subQIdx, pRsp->numOfCols);
    ctx->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    return;
  }

  SSDataBlock* pBlock = NULL;

  ctx->code = createExprSubQResBlock(&pBlock, &pRowNode->val.node.resType);
  if (ctx->code == TSDB_CODE_SUCCESS) {
    // second column receives the has-null indicator
    SColumnInfoData boolCol = createColumnInfoData(TSDB_DATA_TYPE_BOOL, tDataTypes[TSDB_DATA_TYPE_BOOL].bytes, 0);
    ctx->code = blockDataAppendColInfo(pBlock, &boolCol);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = blockDataEnsureCapacity(pBlock, 1);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = extractSingleRspBlock(pRsp, pBlock);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = setValueFromResBlock(ctx, &pRowNode->val, pBlock);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    ctx->code = setRowHasNullFromResBlock(ctx, &pRowNode->hasNull, pBlock);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
  }
  if (ctx->code == TSDB_CODE_SUCCESS) {
    pRowNode->valSet = true;
    pRowNode->hasValue = true;
  }

  blockDataDestroy(pBlock);
}
4455

4456

4457
// Turns pRes into a set, translated value node and stores rows as its value via
// nodesSetValueNodeValue. ctx is not used. Returns the result of
// nodesSetValueNodeValue.
int32_t setZeroRowsResValue(STaskSubJobCtx* ctx, SValueNode* pRes, int32_t rows) {
  pRes->translate = true;
  pRes->flag &= (~VALUE_FLAG_VAL_UNSET);
  pRes->node.type = QUERY_NODE_VALUE;

  int32_t code = nodesSetValueNodeValue(pRes, &rows);
  return code;
}
4464

4465
// Handles one response of a sub-query that only needs to know whether the remote
// result has any row. As soon as a response carries rows (value 1) or the remote
// side reports completion without rows (value 0), the value is stored in the node,
// the node is placed in ctx->subResNodes at pParam->subQIdx and *fetchDone is set.
// Otherwise the next fetch request is sent; a send failure sets ctx->code and also
// ends the fetch.
void handleRemoteZeroRowsRes(SScalarFetchParam* pParam, STaskSubJobCtx* ctx, SRetrieveTableRsp* pRsp, bool* fetchDone) {
  SRemoteZeroRowsNode* pZeroNode = (SRemoteZeroRowsNode*)pParam->pRes;

  qDebug("%s scl fetch zeroRows rsp received, subQIdx:%d, rows:%" PRId64 , ctx->idStr, pParam->subQIdx, pRsp->numOfRows);

  const bool gotRows = pRsp->numOfRows > 0;

  if (!gotRows && !pRsp->completed) {
    // nothing conclusive yet: keep fetching
    *fetchDone = false;

    int32_t sendCode = sendFetchRemoteNodeReq(ctx, pParam->subQIdx, pParam->pRes);
    if (sendCode != TSDB_CODE_SUCCESS) {
      ctx->code = sendCode;
      *fetchDone = true;
    }
    return;
  }

  ctx->code = setZeroRowsResValue(ctx, &pZeroNode->val, gotRows ? 1 : 0);
  if (ctx->code == TSDB_CODE_SUCCESS) {
    taosArraySet(ctx->subResNodes, pParam->subQIdx, &pParam->pRes);
  }

  *fetchDone = true;
}
266,648✔
4490

4491

4492
// RPC callback invoked when the response to a sub-query fetch request arrives.
//   param - the SScalarFetchParam passed along with the request
//   pMsg  - response message; pMsg->pEpSet and pMsg->pData are released here
//   code  - transport/result code of the request
// The sub job context is looked up through its ref id; when it is gone the
// response is dropped. Otherwise the response header is converted to host byte
// order and dispatched by the type of the pending remote node. Errors are stored
// in ctx->code. Unless another fetch request has been issued for the same node
// (value list / zero rows not finished yet), ctx->ready is posted to wake the
// waiting task.
// Returns TSDB_CODE_SUCCESS when the context is gone, otherwise the result of
// releasing the context reference.
int32_t remoteFetchCallBack(void* param, SDataBuf* pMsg, int32_t code) {
  taosMemoryFreeClear(pMsg->pEpSet);

  SScalarFetchParam* pParam = (SScalarFetchParam*)param;
  STaskSubJobCtx* ctx = taosAcquireRef(fetchObjRefPool, pParam->subJobRefId);
  if (ctx == NULL) {
    qWarn("failed to acquire subJobCtx, since it may have been released, refId:%" PRIu64, pParam->subJobRefId);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  // zero-initialized so the later qDebug calls never read an indeterminate buffer,
  // even if the debug flag is switched on after the copy below was skipped
  char idStr[64] = {0};
  if (qDebugFlag & DEBUG_DEBUG) {
    tstrncpy(idStr, ctx->idStr, sizeof(idStr));
  }
  
  qDebug("%s subQIdx %d got rsp, blockIdx:%" PRId64 ", code:%d, rsp:%p", ctx->idStr, pParam->subQIdx, ctx->blockIdx, code, pMsg->pData);

  if (ctx->transporterId > 0) {
    int32_t ret = asyncFreeConnById(ctx->rpcHandle, ctx->transporterId);
    if (ret != 0) {
      qDebug("%s failed to free subQ rpc handle, code:%s, subQIdx:%d", ctx->idStr, tstrerror(ret), pParam->subQIdx);
    }
    ctx->transporterId = -1;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("%s invalid rsp msg, msgType:%d, len:%d", ctx->idStr, pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  if (code == TSDB_CODE_SUCCESS) {
    // convert the response header from network to host byte order
    SRetrieveTableRsp* pRsp = pMsg->pData;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s subQIdx %d blockIdx:%" PRIu64 " rsp detail, numOfBlocks:%d, numOfRows:%" PRId64 ", numOfCols:%" PRId64 ", completed:%d", 
      ctx->idStr, pParam->subQIdx, ctx->blockIdx, pRsp->numOfBlocks, pRsp->numOfRows, pRsp->numOfCols, pRsp->completed);

    ctx->blockIdx++;

    switch (nodeType(pParam->pRes)) {
      case QUERY_NODE_REMOTE_VALUE:
        handleRemoteValueRes(pParam, ctx, pRsp);
        break;
      case QUERY_NODE_REMOTE_VALUE_LIST: {
        bool fetchDone = false;
        handleRemoteValueListRes(pParam, ctx, pRsp, &fetchDone);
        qDebug("%s subQIdx %d handle remote value list finished, fetchDone:%d", idStr, pParam->subQIdx, fetchDone);
        if (!fetchDone) {
          // a follow-up request is in flight; the waiter is woken by its callback
          goto _exit;
        }
        break;
      }  
      case QUERY_NODE_REMOTE_ROW:
        handleRemoteRowRes(pParam, ctx, pRsp);
        break;
      case QUERY_NODE_REMOTE_ZERO_ROWS: {
        bool fetchDone = false;
        handleRemoteZeroRowsRes(pParam, ctx, pRsp, &fetchDone);
        qDebug("%s subQIdx %d handle remote zeroRows finished, fetchDone:%d", idStr, pParam->subQIdx, fetchDone);
        if (!fetchDone) {
          // a follow-up request is in flight; the waiter is woken by its callback
          goto _exit;
        }
        break;
      }
      default:
        qError("%s invalid scl fetch res node %d, subQIdx:%d", ctx->idStr, nodeType(pParam->pRes), pParam->subQIdx);
        ctx->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        break;
    }
  } else {
    ctx->code = rpcCvtErrCode(code);
    if (ctx->code != code) {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s, cvted error: %s", ctx->idStr, pParam->subQIdx,
             tstrerror(code), tstrerror(ctx->code));
    } else {
      qError("%s scl fetch rsp received, subQIdx:%d, error:%s", ctx->idStr, pParam->subQIdx, tstrerror(code));
    }
  }

  qDebug("%s subQIdx %d sem_post subQ ready", ctx->idStr, pParam->subQIdx);
  
  code = tsem_post(&ctx->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when scl fetch rsp is ready, code:%s", tstrerror(code));
  }

_exit:

  code = taosReleaseRef(fetchObjRefPool, pParam->subJobRefId);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  taosMemoryFree(pMsg->pData);

  return code;
}
4595

4596

4597
// Builds a SResFetchReq for the sub-query at subQIdx and sends it asynchronously to
// the endpoint recorded in ctx->subEndPoints. The response is delivered to
// remoteFetchCallBack together with a SScalarFetchParam carrying subQIdx, pRes and
// the sub job ref id.
// Nothing is sent when ctx->code already holds an error; that error is returned.
// Returns TSDB_CODE_SUCCESS when the request was handed to the transport, otherwise
// a serialization, allocation or send error.
int32_t sendFetchRemoteNodeReq(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SDownstreamSourceNode* pSrc = (SDownstreamSourceNode*)taosArrayGetP(ctx->subEndPoints, subQIdx);

  SResFetchReq fetchReq = {0};
  fetchReq.header.vgId = pSrc->addr.nodeId;
  fetchReq.sId = pSrc->sId;
  fetchReq.clientId = pSrc->clientId;
  fetchReq.taskId = pSrc->taskId;
  fetchReq.srcTaskId = ctx->taskId;
  fetchReq.blockIdx = ctx->blockIdx;
  fetchReq.queryId = ctx->queryId;
  fetchReq.execId = pSrc->execId;

  // first pass computes the encoded size, second pass fills the buffer
  int32_t bufLen = tSerializeSResFetchReq(NULL, 0, &fetchReq, false);
  if (bufLen < 0) {
    return bufLen;
  }

  void* pBuf = taosMemoryCalloc(1, bufLen);
  if (pBuf == NULL) {
    return terrno;
  }

  bufLen = tSerializeSResFetchReq(pBuf, bufLen, &fetchReq, false);
  if (bufLen < 0) {
    taosMemoryFree(pBuf);
    return bufLen;
  }

  qDebug("%s scl build fetch msg and send to nodeId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
         ", execId:%d, blockIdx:%" PRId64,
         ctx->idStr, pSrc->addr.nodeId, pSrc->addr.epSet.eps[0].fqdn, pSrc->clientId,
         pSrc->taskId, pSrc->execId, fetchReq.blockIdx);

  // send the fetch remote task result request
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFreeClear(pBuf);
    qError("%s prepare message %d failed", ctx->idStr, (int32_t)sizeof(SMsgSendInfo));
    return terrno;
  }

  SScalarFetchParam* pCbParam = taosMemoryMalloc(sizeof(SScalarFetchParam));
  if (pCbParam == NULL) {
    taosMemoryFreeClear(pBuf);
    taosMemoryFreeClear(pSendInfo);
    qError("%s prepare param %d failed", ctx->idStr, (int32_t)sizeof(SScalarFetchParam));
    return terrno;
  }

  if (ctx->code) {
    qError("task has been killed, error:%s", tstrerror(ctx->code));
    taosMemoryFree(pCbParam);
    taosMemoryFreeClear(pBuf);
    taosMemoryFreeClear(pSendInfo);
    code = ctx->code;
    goto _end;
  }

  pCbParam->subQIdx = subQIdx;
  pCbParam->pRes = pRes;
  pCbParam->subJobRefId = ctx->subJobRefId;

  pSendInfo->param = pCbParam;
  pSendInfo->paramFreeFp = taosAutoMemoryFree;
  pSendInfo->msgInfo.pData = pBuf;
  pSendInfo->msgInfo.len = bufLen;
  pSendInfo->msgType = pSrc->fetchMsgType;
  pSendInfo->fp = remoteFetchCallBack;
  pSendInfo->requestId = ctx->queryId;

  code = asyncSendMsgToServer(ctx->rpcHandle, &pSrc->addr.epSet, &ctx->transporterId, pSendInfo);
  QUERY_CHECK_CODE(code, lino, _end);

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }

  return code;
}
4681

4682
// Fetches the result of the sub-query at subQIdx into pRes and blocks until the
// response has been processed: the block index is reset, the first fetch request
// is sent and the task waits on ctx->ready.
// The return value of qSemWait itself is not used; after the wait the task code is
// returned when the task was killed, otherwise ctx->code.
int32_t fetchRemoteNodeImpl(STaskSubJobCtx* ctx, int32_t subQIdx, SNode* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  ctx->blockIdx = 0;

  code = sendFetchRemoteNodeReq(ctx, subQIdx, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  (void)qSemWait(ctx->pTaskInfo, &ctx->ready);
  code = isTaskKilled(ctx->pTaskInfo) ? getTaskCode(ctx->pTaskInfo) : ctx->code;

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  }
  return code;
}
4705

4706
/*
 * Fill pDst from pSrc according to the node type of pSrc.
 *
 *  - QUERY_NODE_VALUE:             value-copies pSrc into the `val` member of pDst
 *                                  (viewed as SRemoteValueNode) and sets that
 *                                  member's node type to QUERY_NODE_VALUE.
 *  - QUERY_NODE_REMOTE_VALUE_LIST: copies sizeof(SRemoteValueListNode) bytes from
 *                                  pSrc to pDst, then clears hashAllocated on pDst.
 *  - QUERY_NODE_REMOTE_ROW:        value-copies pSrc into the `val` member of pDst,
 *                                  marks valSet, and carries over hasValue/hasNull.
 *  - any other type:               pDst is left untouched.
 *
 * @param pSrc  source node
 * @param pDst  destination node; the caller must supply a node of the matching type
 * @return 0 on success, otherwise the error returned by valueNodeCopy()
 */
int32_t remoteNodeCopy(SNode* pSrc, SNode* pDst) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (nodeType(pSrc)) {
    case QUERY_NODE_VALUE: {
      SRemoteValueNode* pValDst = (SRemoteValueNode*)pDst;

      TAOS_CHECK_EXIT(valueNodeCopy((SValueNode*)pSrc, &pValDst->val));
      pValDst->val.node.type = QUERY_NODE_VALUE;
      break;
    }

    case QUERY_NODE_REMOTE_VALUE_LIST: {
      memcpy(pDst, pSrc, sizeof(SRemoteValueListNode));
      // NOTE(review): presumably the copy must not claim ownership of the source's hash - confirm.
      ((SRemoteValueListNode*)pDst)->hashAllocated = false;
      break;
    }

    case QUERY_NODE_REMOTE_ROW: {
      SRemoteRowNode* pRowSrc = (SRemoteRowNode*)pSrc;
      SRemoteRowNode* pRowDst = (SRemoteRowNode*)pDst;

      TAOS_CHECK_EXIT(valueNodeCopy((SValueNode*)pSrc, &pRowDst->val));
      pRowDst->valSet = true;
      pRowDst->hasValue = pRowSrc->hasValue;
      pRowDst->hasNull = pRowSrc->hasNull;
      break;
    }

    default:
      break;
  }

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
4740

4741
/*
 * Obtain the result node of sub-query `subQIdx`.
 *
 * If the slot for this sub-query in ctx->subResNodes is still NULL, the result is
 * fetched via fetchRemoteNodeImpl() and pRes is stored in the slot. Otherwise the
 * stored node is copied into pRes via remoteNodeCopy().
 *
 * @param pCtx     opaque pointer to a STaskSubJobCtx
 * @param subQIdx  sub-query index; must be in [0, size of ctx->subEndPoints)
 * @param pRes     caller-provided node that receives the result
 * @return 0 on success; TSDB_CODE_QRY_SUBQ_NOT_FOUND if subQIdx is out of range or
 *         has no slot in ctx->subResNodes; otherwise the error from the fetch or copy
 */
int32_t qFetchRemoteNode(void* pCtx, int32_t subQIdx, SNode* pRes) {
  STaskSubJobCtx* ctx = (STaskSubJobCtx*)pCtx;
  int32_t         code = 0, lino = 0;
  int32_t         subEndPoinsNum = taosArrayGetSize(ctx->subEndPoints);

  // Reject negative indexes as well as indexes past the end.
  if (subQIdx < 0 || subQIdx >= subEndPoinsNum) {
    qError("%s invalid subQIdx %d, subEndPointsNum:%d", ctx->idStr, subQIdx, subEndPoinsNum);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  // The bound above is checked against subEndPoints, but the lookup is in subResNodes.
  // Guard the lookup so a missing slot cannot be dereferenced
  // (assumes taosArrayGet() returns NULL for an unavailable element - TODO confirm).
  SNode** ppRes = taosArrayGet(ctx->subResNodes, subQIdx);
  if (NULL == ppRes) {
    qError("%s no result slot for subQIdx %d", ctx->idStr, subQIdx);
    return TSDB_CODE_QRY_SUBQ_NOT_FOUND;
  }

  if (NULL == *ppRes) {
    TAOS_CHECK_EXIT(fetchRemoteNodeImpl(ctx, subQIdx, pRes));
    // Remember the fetched node so later calls take the copy branch below.
    *ppRes = pRes;
  } else {
    TAOS_CHECK_EXIT(remoteNodeCopy(*ppRes, pRes));
  }

_exit:

  if (code) {
    qError("%s %s failed at line %d since %s", ctx->idStr, __func__, lino, tstrerror(code));
  } else {
    qDebug("%s %s subQIdx %d succeed", ctx->idStr, __func__, subQIdx);
  }

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc