• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4815

17 Oct 2025 06:47AM UTC coverage: 61.177% (-0.03%) from 61.206%
#4815

push

travis-ci

web-flow
Merge pull request #33289 from taosdata/3.0

enh: Code Optimization (#33283)

155629 of 324369 branches covered (47.98%)

Branch coverage included in aggregate %.

207706 of 269535 relevant lines covered (77.06%)

127615938.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.81
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21
#include "virtualtablescan.h"
22
#include "tsort.h"
23

24
typedef struct SVirtualTableScanInfo {
25
  STableScanBase base;
26
  SArray*        pSortInfo;
27
  SSortHandle*   pSortHandle;
28
  int32_t        bufPageSize;
29
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
30
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
31
  SSDataBlock*   pInputBlock;
32
  SHashObj*      dataSlotMap;
33
  int32_t        tsSlotId;
34
  int32_t        tagBlockId;
35
  int32_t        tagDownStreamId;
36
  bool           scanAllCols;
37
  SArray*        pSortCtxList;
38
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
39
} SVirtualTableScanInfo;
40

41
typedef struct SVirtualScanMergeOperatorInfo {
42
  SOptrBasicInfo        binfo;
43
  EMergeType            type;
44
  SVirtualTableScanInfo virtualScanInfo;
45
  bool                  ignoreGroupId;
46
  uint64_t              groupId;
47
  STupleHandle*         pSavedTuple;
48
  SSDataBlock*          pSavedTagBlock;
49
} SVirtualScanMergeOperatorInfo;
50

51
typedef struct SLoadNextCtx {
52
  SOperatorInfo*  pOperator;
53
  SOperatorParam* pOperatorGetParam;
54
  int32_t         blockId;
55
  STimeWindow     window;
56
  SSDataBlock*    pIntermediateBlock;
57
  col_id_t        tsSlotId;
58
} SLoadNextCtx;
59

60
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
28,255,526✔
61
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
28,255,526✔
62
  int32_t        code = TSDB_CODE_SUCCESS;
28,255,526✔
63

64
  VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock));
28,255,526!
65
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
28,255,526!
66

67
  return code;
28,255,526✔
68
_return:
×
69
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
70
  return code;
×
71
}
72

73
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
27,167,186✔
74
  int32_t code = TSDB_CODE_SUCCESS;
27,167,186✔
75
  int32_t lino = 0;
27,167,186✔
76
  int32_t tsIndex = -1;
27,167,186✔
77
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
56,733,751!
78
    if (((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i))->info.colId == tsSlotId) {
56,733,751✔
79
      tsIndex = i;
27,167,186✔
80
      break;
27,167,186✔
81
    }
82
  }
83

84
  if (tsIndex == -1) {
27,167,186!
85
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
86
  }
87

88
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
27,167,186✔
89
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
27,167,186!
90

91
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
27,167,186✔
92
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
27,167,186✔
93

94
  return code;
27,167,186✔
95
_return:
×
96
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
97
  return code;
×
98
}
99

100
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
36,215,458✔
101
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
36,215,458✔
102
  SOperatorInfo*          pOperator = pCtx->pOperator;
36,215,458✔
103
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
36,215,458✔
104
  int32_t                 code = TSDB_CODE_SUCCESS;
36,215,458✔
105
  SSDataBlock*            pRes = NULL;
36,215,458✔
106
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;
36,215,458✔
107

108
  pParam->basic.window = pCtx->window;
36,215,458✔
109
  pOperator->status = OP_NOT_OPENED;
36,215,458✔
110
  if (pCtx->pIntermediateBlock) {
36,215,458✔
111
    blockDataDestroy(pCtx->pIntermediateBlock);
27,098,050✔
112
    pCtx->pIntermediateBlock = NULL;
27,098,050✔
113
  }
114

115
  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));
36,215,458!
116

117
  VTS_ERR_JRET(blockDataCheck(pRes));
36,215,458!
118
  if ((pRes)) {
36,215,458✔
119
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
27,167,186✔
120
    (pRes)->info.id.blockId = pCtx->blockId;
27,167,186✔
121
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
27,167,186!
122
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
27,167,186!
123
    *ppBlock = pCtx->pIntermediateBlock;
27,167,186✔
124
  } else {
125
    pCtx->window.ekey = INT64_MAX;
9,048,272✔
126
    *ppBlock = NULL;
9,048,272✔
127
  }
128

129
  return code;
36,215,458✔
130
_return:
×
131
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
132
  return code;
×
133
}
134

135
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
6,579,627✔
136
  int32_t           code = TSDB_CODE_SUCCESS;
6,579,627✔
137
  SNodeList        *pNodeList = NULL;
6,579,627✔
138
  SColumnNode      *pColumnNode = NULL;
6,579,627✔
139
  SOrderByExprNode *pOrderByExprNode = NULL;
6,579,627✔
140

141
  VTS_ERR_JRET(nodesMakeList(&pNodeList));
6,579,627!
142

143
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
6,579,627!
144
  pColumnNode->slotId = tsSlotId;
6,579,627✔
145

146
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
6,579,627!
147
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
6,579,627✔
148
  pOrderByExprNode->order = ORDER_ASC;
6,579,627✔
149
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;
6,579,627✔
150

151
  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
6,579,627!
152

153
  *pMergeKeys = pNodeList;
6,579,627✔
154
  return code;
6,579,627✔
155
_return:
×
156
  nodesDestroyNode((SNode*)pColumnNode);
×
157
  nodesDestroyNode((SNode*)pOrderByExprNode);
×
158
  nodesDestroyList(pNodeList);
×
159
  return code;
×
160
}
161

162
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
3,858,525✔
163
  if (pVirtualScanInfo->pSortInfo) {
3,858,525✔
164
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
2,880,904✔
165
    pVirtualScanInfo->pSortInfo = NULL;
2,880,904✔
166
  }
167
  if (pVirtualScanInfo->pSortHandle) {
3,858,525✔
168
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
2,864,810✔
169
    pVirtualScanInfo->pSortHandle = NULL;
2,864,810✔
170
  }
171
  if (pVirtualScanInfo->pSortCtxList) {
3,858,525✔
172
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
8,965,701✔
173
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
6,100,891✔
174
      blockDataDestroy(pCtx->pIntermediateBlock);
6,100,891✔
175
      taosMemoryFree(pCtx);
6,100,891!
176
    }
177
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
2,864,810✔
178
  }
179
}
3,858,525✔
180

181
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
3,858,525✔
182
  int32_t                         code = TSDB_CODE_SUCCESS;
3,858,525✔
183
  int32_t                         lino = 0;
3,858,525✔
184
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
3,858,525✔
185
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
3,858,525✔
186
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
3,858,525✔
187
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
3,858,525✔
188
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
3,858,525✔
189
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
3,858,525!
190
  SNodeList*                      pMergeKeys = NULL;
3,858,525✔
191
  SSortSource*                    ps = NULL;
3,858,525✔
192
  int32_t                         scanOpIndex = 0;
3,858,525✔
193

194
  cleanUpVirtualScanInfo(pVirtualScanInfo);
3,858,525✔
195
  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
3,858,525!
196
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
3,858,525✔
197
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
3,858,525!
198
  nodesDestroyList(pMergeKeys);
3,858,525✔
199

200
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
3,858,525!
201
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
202

203
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
3,858,525✔
204
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);
3,858,525✔
205

206
  if (pOperator->numOfDownstream > 2) {
3,858,525!
207
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
×
208
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
209
  }
210

211
  pVirtualScanInfo->tagDownStreamId = -1;
3,858,525✔
212
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
10,725,826✔
213
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
6,867,301✔
214
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
6,867,301✔
215
      // tag block do not need sort
216
      pVirtualScanInfo->tagDownStreamId = i;
3,008,776✔
217
      pInfo->pSavedTagBlock = NULL;
3,008,776✔
218
      continue;
3,008,776✔
219
    }
220
  }
221
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
3,858,525✔
222

223
  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
3,858,525✔
224
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
3,858,525✔
225
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno)
3,858,525!
226
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
12,975,933✔
227
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
9,117,408✔
228
    SLoadNextCtx*   pCtx = NULL;
9,117,408✔
229
    ps = NULL;
9,117,408✔
230

231
    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
9,117,408!
232
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno)
9,117,408!
233
    pCtx->blockId = i;
9,117,408✔
234
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
9,117,408✔
235
    pCtx->pOperatorGetParam = pOpParam;
9,117,408✔
236
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
9,117,408✔
237
    pCtx->pIntermediateBlock = NULL;
9,117,408✔
238
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;
9,117,408✔
239

240
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
9,117,408!
241
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno)
9,117,408!
242

243
    ps->param = pCtx;
9,117,408✔
244
    ps->onlyRef = true;
9,117,408✔
245

246
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
9,117,408!
247
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno)
18,234,816!
248
  }
249

250
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
3,858,525!
251

252
  return code;
3,858,525✔
253
_return:
×
254
  if (code != 0){
×
255
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
256
  }
257
  nodesDestroyList(pMergeKeys);
×
258
  if (ps != NULL) {
×
259
    taosMemoryFree(ps);
×
260
  }
261
  return code;
×
262
}
263

264
int32_t createSortHandle(SOperatorInfo* pOperator) {
2,695,376✔
265
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
2,695,376✔
266
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2,695,376✔
267
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
2,695,376✔
268
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
2,695,376!
269
  SSortSource*                    ps = NULL;
2,695,376✔
270
  int32_t                         code = 0;
2,695,376✔
271
  int32_t                         lino = 0;
2,695,376✔
272

273
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
2,695,376!
274
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
275

276
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
2,695,376✔
277
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);
2,695,376✔
278

279
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
11,787,229✔
280
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
9,091,853✔
281
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
9,091,853!
282
      VTS_ERR_JRET(pDownstream->fpSet._openFn(pDownstream));
9,091,853!
283
    } else {
284
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
285
    }
286

287
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
9,091,853✔
288
      // tag block do not need sort
289
      pVirtualScanInfo->tagDownStreamId = i;
299,546✔
290
      continue;
299,546✔
291
    }
292

293
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
8,792,307!
294
    TSDB_CHECK_NULL(ps, code, lino, _return, terrno)
8,792,307!
295

296
    ps->param = pDownstream;
8,792,307✔
297
    ps->onlyRef = true;
8,792,307✔
298

299
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
8,792,307!
300
    ps = NULL;
8,792,307✔
301
  }
302

303
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
2,695,376!
304

305
_return:
2,695,376✔
306
  if (code != 0){
2,695,376!
307
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
308
  }
309
  if (ps != NULL) {
2,695,376!
310
    taosMemoryFree(ps);
×
311
  }
312
  return code;
2,695,376✔
313
}
314

315
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
6,579,627✔
316
  int32_t                         code = 0;
6,579,627✔
317
  int32_t                         lino = 0;
6,579,627✔
318

319
  if (pOperator->numOfDownstream == 0) {
6,579,627✔
320
    return code;
25,726✔
321
  }
322

323
  if (pOperator->pOperatorGetParam) {
6,553,901✔
324
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
3,858,525!
325
  } else {
326
    VTS_ERR_JRET(createSortHandle(pOperator));
2,695,376!
327
  }
328

329
  return code;
6,553,901✔
330

331
_return:
×
332
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
333
  return code;
×
334
}
335

336
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
77,175,805✔
337
  int32_t code = 0;
77,175,805✔
338

339
  if (OPTR_IS_OPENED(pOperator)) {
77,175,805✔
340
    return TSDB_CODE_SUCCESS;
70,596,178✔
341
  }
342

343
  int64_t startTs = taosGetTimestampUs();
6,579,627✔
344

345
  code = openVirtualTableScanOperatorImpl(pOperator);
6,579,627✔
346

347
  pOperator->cost.openCost = (double)(taosGetTimestampUs() - startTs) / 1000.0;
6,579,627✔
348
  pOperator->status = OP_RES_TO_RETURN;
6,579,627✔
349

350
  VTS_ERR_RET(code);
6,579,627!
351

352
  OPTR_SET_OPENED(pOperator);
6,579,627✔
353
  return code;
6,579,627✔
354
}
355

356
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
43,052,970✔
357
                                          SSDataBlock* p) {
358
  int32_t code = 0;
43,052,970✔
359
  int64_t lastTs = 0;
43,052,970✔
360
  int64_t rowNums = -1;
43,052,970✔
361
  blockDataEmpty(p);
43,052,970✔
362
  while (1) {
2,147,483,647✔
363
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
364
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
365
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
366
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
367
        break;
368
      }
369
    } else {
370
      pTupleHandle = pInfo->pSavedTuple;
38,159,783✔
371
      pInfo->pSavedTuple = NULL;
38,159,783✔
372
    }
373

374
    SDataBlockInfo info = {0};
2,147,483,647✔
375
    tsortGetBlockInfo(pTupleHandle, &info);
2,147,483,647✔
376
    int32_t blockId = (int32_t)info.id.blockId;
2,147,483,647✔
377

378
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
2,147,483,647!
379
      bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
380
      if (isNull) {
2,147,483,647!
381
        int32_t slotKey = blockId << 16 | i;
×
382
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
×
383
        if (slotId == NULL) {
×
384
          if (i == 0) {
×
385
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
386
          } else {
387
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
388
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
389
          }
390
        } else {
391
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
×
392
        }
393
      } else {
394
        char* pData = NULL;
2,147,483,647✔
395
        tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
396

397
        if (pData != NULL) {
2,147,483,647✔
398
          if (i == 0) {
2,147,483,647✔
399
            if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
400
              if (rowNums >= capacity - 1) {
2,147,483,647✔
401
                pInfo->pSavedTuple = pTupleHandle;
38,614,127✔
402
                goto _return;
38,614,127✔
403
              }
404
              rowNums++;
2,147,483,647✔
405
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
406
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
407
              }
408
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647✔
409
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
410
              }
411
              lastTs = *(int64_t*)pData;
2,147,483,647✔
412
            }
413
          }
414
          int32_t slotKey = blockId << 16 | i;
2,147,483,647✔
415
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
2,147,483,647✔
416
          if (slotId == NULL) {
2,147,483,647✔
417
            if (i == 0) {
2,147,483,647!
418
              continue;
2,147,483,647✔
419
            } else {
420
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
421
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
422
            }
423
          }
424
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
2,147,483,647!
425
        }
426
      }
427
    }
428
  }
429
_return:
43,052,970✔
430
  p->info.rows = rowNums + 1;
43,052,970✔
431
  p->info.dataLoad = 1;
43,052,970✔
432
  p->info.scanFlag = MAIN_SCAN;
43,052,970✔
433
  return code;
43,052,970✔
434
}
435

436
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
53,525,514✔
437
                                           SSDataBlock* p) {
438
  int32_t code = 0;
53,525,514✔
439
  int64_t lastTs = 0;
53,525,514✔
440
  int64_t rowNums = -1;
53,525,514✔
441
  blockDataEmpty(p);
53,525,514✔
442
  while (1) {
2,147,483,647✔
443
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
444
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
445
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
446
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
447
        break;
448
      }
449
    } else {
450
      pTupleHandle = pInfo->pSavedTuple;
46,871,487✔
451
      pInfo->pSavedTuple = NULL;
46,871,487✔
452
    }
453

454
    int32_t tsIndex = -1;
2,147,483,647✔
455

456
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647!
457
      if (tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
458
        continue;
2,147,483,647✔
459
      } else {
460
        SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
461
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
462
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
2,147,483,647✔
463
          tsIndex = i;
2,147,483,647✔
464
          break;
2,147,483,647✔
465
        }
466
      }
467
    }
468

469
    if (tsIndex == -1) {
2,147,483,647!
470
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
471
    }
472

473
    char* pData = NULL;
2,147,483,647✔
474
    // first, set ts slot's data
475
    // then, set other slots' data
476
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
2,147,483,647✔
477

478
    if (pData != NULL) {
2,147,483,647!
479
      if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
480
        if (rowNums >= capacity - 1) {
2,147,483,647✔
481
          pInfo->pSavedTuple = pTupleHandle;
46,892,499✔
482
          goto _return;
46,892,499✔
483
        }
484
        rowNums++;
2,147,483,647✔
485
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
486
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
487
        }
488
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647!
489
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
490
        }
491
        lastTs = *(int64_t*)pData;
2,147,483,647✔
492
      }
493
    }
494
    if (pInfo->virtualScanInfo.scanAllCols) {
2,147,483,647!
495
      continue;
2,147,483,647✔
496
    }
497

498
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647✔
499
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
500
        continue;
2,147,483,647✔
501
      }
502

503
      SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
504
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
505
      tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
506

507
      if (pData != NULL) {
2,147,483,647!
508
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
2,147,483,647!
509
      }
510
    }
511
  }
512
_return:
53,525,514✔
513
  p->info.rows = rowNums + 1;
53,525,514✔
514
  p->info.dataLoad = 1;
53,525,514✔
515
  p->info.scanFlag = MAIN_SCAN;
53,525,514✔
516
  return code;
53,525,514✔
517
}
518

519
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
96,604,210✔
520
  int32_t                        code = 0;
96,604,210✔
521
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
96,604,210✔
522
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
96,604,210✔
523
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
96,604,210✔
524
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
96,604,210✔
525
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
96,604,210✔
526
  int32_t                        capacity = pOperator->resultInfo.capacity;
96,604,210✔
527

528
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
96,603,499✔
529
  blockDataCleanup(pDataBlock);
96,603,499✔
530

531
  if (pHandle == NULL) {
96,604,210✔
532
    return TSDB_CODE_SUCCESS;
25,726✔
533
  }
534

535
  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
96,578,484✔
536
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
3,689,091!
537
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
3,689,091!
538
      return TSDB_CODE_SUCCESS;
×
539
    }
540

541
    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
3,689,091!
542
  } else {
543
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
92,889,393✔
544
  }
545

546
  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
96,578,484✔
547
  if (pOperator->pOperatorGetParam) {
96,578,484✔
548
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
53,525,514!
549
  } else {
550
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
43,052,970!
551
  }
552

553
  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
96,578,484!
554
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
96,577,773✔
555
         pDataBlock->info.rows);
556

557
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
96,577,773✔
558
  return code;
96,578,484✔
559
}
560

561
int32_t vtableAddTagPseudoColumnData(const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
41,775,772✔
562
  int32_t          code = TSDB_CODE_SUCCESS;
41,775,772✔
563
  int32_t          lino = 0;
41,775,772✔
564
  int64_t          backupRows;
565
  // currently only the tbname pseudo column
566
  if (numOfExpr <= 0) {
41,775,772!
567
    return TSDB_CODE_SUCCESS;
×
568
  }
569

570
  if (tagBlock == NULL) {
41,775,772!
571
    return TSDB_CODE_SUCCESS;
×
572
  }
573

574
  if (tagBlock->info.rows != 1) {
41,775,772!
575
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
×
576
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
577
  }
578

579
  backupRows = pBlock->info.rows;
41,775,772✔
580
  pBlock->info.rows = rows;
41,775,772✔
581
  for (int32_t j = 0; j < numOfExpr; ++j) {
155,659,760✔
582
    const SExprInfo* pExpr1 = &pExpr[j];
113,883,988✔
583
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
113,883,988✔
584

585
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
113,883,988✔
586
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno)
113,883,988!
587
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
113,883,988✔
588

589
    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
113,883,988✔
590
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno)
113,883,988!
591

592
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
113,883,988!
593
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      continue;
×
595
    }
596

597
    char* data = colDataGetData(pTagInfoData, 0);
113,883,988!
598

599
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
113,883,988!
600
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, 1, false);
113,883,988✔
601
      QUERY_CHECK_CODE(code, lino, _return);
113,883,988!
602
    } else {  // todo opt for json tag
603
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
604
        code = colDataSetVal(pColInfoData, i, data, false);
×
605
        QUERY_CHECK_CODE(code, lino, _return);
×
606
      }
607
    }
608
  }
609

610
  // restore the rows
611
  pBlock->info.rows = backupRows;
41,775,772✔
612

613
_return:
41,775,772✔
614

615
  if (code != TSDB_CODE_SUCCESS) {
41,775,772!
616
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
617
  }
618
  return code;
41,775,772✔
619
}
620

621
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock, int32_t rows) {
90,499,939✔
622
  int32_t         code = 0;
90,499,939✔
623
  STableScanBase* pTableScanInfo = &pInfo->base;
90,499,939✔
624
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
90,499,939✔
625
  if (pSup->numOfExprs > 0) {
90,499,939✔
626
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
41,775,772!
627
  }
628

629
  return code;
90,499,939✔
630
}
631

632
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
77,175,094✔
633
  int32_t                        code = TSDB_CODE_SUCCESS;
77,175,094✔
634
  int32_t                        lino = 0;
77,175,094✔
635
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
77,175,094✔
636
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
77,175,805✔
637
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
77,175,805✔
638

639
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
77,175,805!
640
    pResBlock = NULL;
×
641
    return code;
×
642
  }
643

644
  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));
77,175,805!
645

646
  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
77,175,805✔
647
    SSDataBlock*   pTagBlock = NULL;
3,308,322✔
648
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
3,308,322✔
649
    if (pOperator->pOperatorGetParam) {
3,308,322✔
650
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
3,008,776✔
651
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
3,008,776!
652
    } else {
653
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
299,546!
654
    }
655

656
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
3,308,322!
657
      VTS_ERR_JRET(TSDB_CODE_FAILED);
×
658
    }
659
    pInfo->pSavedTagBlock = pTagBlock;
3,308,322✔
660
  }
661

662
  while(1) {
663
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
96,604,210!
664
    if (*pResBlock == NULL) {
96,603,499✔
665
      setOperatorCompleted(pOperator);
6,104,271✔
666
      break;
6,104,271✔
667
    }
668

669
    if (pOperator->pOperatorGetParam) {
90,499,939✔
670
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
49,688,001✔
671
      (*pResBlock)->info.id.uid = uid;
49,688,001✔
672
    } else {
673
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
40,811,938✔
674
    }
675

676
    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), (*pResBlock)->info.rows));
90,499,939!
677
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL));
90,499,939!
678
    if ((*pResBlock)->info.rows > 0) {
90,499,939✔
679
      break;
71,071,534✔
680
    }
681
  }
682

683
  return code;
77,175,805✔
684
_return:
×
685
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
686
  pTaskInfo->code = code;
×
687
  T_LONG_JMP(pTaskInfo->env, code);
×
688
}
689

690
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
3,698,723✔
691
  cleanupQueryTableDataCond(&pBase->cond);
3,698,723✔
692

693
  if (pAPI->tsdReaderClose) {
3,698,723!
694
    pAPI->tsdReaderClose(pBase->dataReader);
×
695
  }
696
  pBase->dataReader = NULL;
3,698,723✔
697

698
  if (pBase->matchInfo.pList != NULL) {
3,698,723!
699
    taosArrayDestroy(pBase->matchInfo.pList);
×
700
  }
701

702
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
3,698,723✔
703
  cleanupExprSupp(&pBase->pseudoSup);
3,698,723✔
704
}
3,698,723✔
705

706
void destroyVirtualTableScanOperatorInfo(void* param) {
3,698,723✔
707
  if (!param) {
3,698,723!
708
    return;
×
709
  }
710
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
3,698,723✔
711
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
3,698,723✔
712
  blockDataDestroy(pOperatorInfo->binfo.pRes);
3,698,723✔
713
  pOperatorInfo->binfo.pRes = NULL;
3,698,723✔
714

715
  tsortDestroySortHandle(pInfo->pSortHandle);
3,698,723✔
716
  pInfo->pSortHandle = NULL;
3,698,723✔
717
  taosArrayDestroy(pInfo->pSortInfo);
3,698,723✔
718
  pInfo->pSortInfo = NULL;
3,698,723✔
719

720
  blockDataDestroy(pInfo->pIntermediateBlock);
3,698,723✔
721
  pInfo->pIntermediateBlock = NULL;
3,698,723✔
722

723
  blockDataDestroy(pInfo->pInputBlock);
3,698,723✔
724
  pInfo->pInputBlock = NULL;
3,698,723✔
725
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
3,698,723✔
726

727
  taosHashCleanup(pInfo->dataSlotMap);
3,698,723✔
728

729
  if (pInfo->pSortCtxList) {
3,698,723✔
730
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
3,937,367✔
731
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
2,972,700✔
732
      blockDataDestroy(pCtx->pIntermediateBlock);
2,972,700✔
733
      taosMemoryFree(pCtx);
2,972,700!
734
    }
735
    taosArrayDestroy(pInfo->pSortCtxList);
964,667✔
736
    pInfo->pSortCtxList = NULL;
964,667✔
737
  }
738
  taosMemoryFreeClear(param);
3,698,723!
739
}
740

741
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
3,698,176✔
742
  size_t  numOfCols = LIST_LENGTH(pNodeList);
3,698,176✔
743
  int32_t code = TSDB_CODE_SUCCESS;
3,698,176✔
744
  int32_t lino = 0;
3,698,176✔
745

746
  if (numOfCols == 0) {
3,698,176✔
747
    return code;
5,163✔
748
  }
749

750
  *tsSlotId = -1;
3,693,013✔
751
  *tagBlockId = -1;
3,693,560✔
752
  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
3,693,560✔
753
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno)
3,693,560!
754

755
  for (int32_t i = 0; i < numOfCols; ++i) {
24,606,077✔
756
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
20,912,517✔
757
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno)
20,912,517!
758

759
    if (pColNode->isPrimTs) {
20,912,517!
760
      *tsSlotId = i;
3,303,606✔
761
    } else if (pColNode->hasRef) {
17,608,911!
762
      int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId;
9,688,878✔
763
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
9,688,878!
764
    } else if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
7,920,033✔
765
      // tag column or pseudo column's function
766
      *tagBlockId = pColNode->dataBlockId;
2,311,012✔
767
    }
768
  }
769

770
  return code;
3,693,560✔
771
_return:
×
772
  taosHashCleanup(*pSlotMap);
×
773
  *pSlotMap = NULL;
×
774
  if (code != TSDB_CODE_SUCCESS) {
×
775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
776
  }
777
  return code;
×
778
}
779

780
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
5,822,021✔
781
  int32_t code = 0, lino = 0;
5,822,021✔
782
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
5,822,021✔
783
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
5,823,662✔
784
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;
5,823,115✔
785
  
786
  pOper->status = OP_NOT_OPENED;
5,823,115✔
787
  resetBasicOperatorState(&pMergeInfo->binfo);
5,821,474✔
788

789
  tsortDestroySortHandle(pInfo->pSortHandle);
5,823,662✔
790
  pInfo->pSortHandle = NULL;
5,822,021✔
791
  // taosArrayDestroy(pInfo->pSortInfo);
792
  // pInfo->pSortInfo = NULL;
793

794
  blockDataDestroy(pInfo->pIntermediateBlock);
5,822,021✔
795
  pInfo->pIntermediateBlock = NULL;
5,821,474✔
796

797
  blockDataDestroy(pInfo->pInputBlock);
5,821,474✔
798
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
5,819,833✔
799
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno)
5,823,662!
800

801
  pInfo->tagDownStreamId = -1;
5,821,474✔
802

803
  if (pInfo->pSortCtxList) {
5,822,568✔
804
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
72,865✔
805
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
43,817✔
806
      blockDataDestroy(pCtx->pIntermediateBlock);
43,817✔
807
      taosMemoryFree(pCtx);
43,817!
808
    }
809
    taosArrayDestroy(pInfo->pSortCtxList);
29,048✔
810
    pInfo->pSortCtxList = NULL;
29,048✔
811
  }
812

813
  pMergeInfo->pSavedTuple = NULL;
5,821,474✔
814
  pMergeInfo->pSavedTagBlock = NULL;
5,821,474✔
815

816
_exit:
5,822,021✔
817

818
  if (code) {
5,822,021!
819
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
820
  }
821

822
  return code;
5,820,927✔
823
}
824

825
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,698,176✔
826
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
827
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
828
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
3,698,176✔
829
  int32_t                        lino = 0;
3,698,176✔
830
  int32_t                        code = TSDB_CODE_SUCCESS;
3,698,176✔
831
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
3,698,176!
832
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,698,723!
833
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
3,698,723✔
834
  SNodeList*                     pMergeKeys = NULL;
3,698,723✔
835

836
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno)
3,698,723!
837
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno)
3,698,723!
838

839
  pOperator->pPhyNode = pVirtualScanPhyNode;
3,698,723✔
840

841
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
3,698,723✔
842
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
3,698,723✔
843

844
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
3,698,723✔
845
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
3,698,723✔
846
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno)
3,698,723!
847

848
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
3,698,723✔
849
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno)
3,698,723!
850
  pVirtualScanInfo->pInputBlock = pInputBlock;
3,698,723✔
851
  pVirtualScanInfo->tagDownStreamId = -1;
3,698,723✔
852
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
3,698,723✔
853
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
3,698,176✔
854
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
1,060,536✔
855
    pSup->pExprInfo = NULL;
1,060,536✔
856
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
1,060,536!
857

858
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
1,060,536✔
859
                                      &pTaskInfo->storageAPI.functionStore);
860
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno)
1,060,536!
861
  }
862

863
  initResultSizeInfo(&pOperator->resultInfo, 1024);
3,698,176✔
864
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
3,698,176!
865

866
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
3,698,723✔
867
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
3,698,723✔
868

869
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
3,698,723!
870
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
2,721,102!
871
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,721,102✔
872
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
2,721,102!
873
  } else {
874
    pTaskInfo->dynamicTask = true;
977,621✔
875
  }
876
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
3,698,723✔
877
  pVirtualScanInfo->sortBufSize =
3,698,723✔
878
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
3,698,723✔
879
  VTS_ERR_JRET(
3,698,723!
880
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
881

882
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
3,698,723!
883

884
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
3,698,723!
885
                                  0, pTaskInfo->pStreamRuntimeInfo));
886

887
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
3,698,723✔
888
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno)
3,698,723!
889

890
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
3,698,723✔
891
                  OP_NOT_OPENED, pInfo, pTaskInfo);
892
  pOperator->fpSet =
893
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
3,698,723✔
894
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
895
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
3,698,723✔
896

897
  if (NULL != pDownstream) {
3,698,176✔
898
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
3,672,450!
899
  } else {
900
    pVirtualScanInfo->tagDownStreamId = -1;
25,726✔
901
  }
902

903
  nodesDestroyList(pMergeKeys);
3,698,723✔
904
  *pOptrInfo = pOperator;
3,698,176✔
905
  return TSDB_CODE_SUCCESS;
3,698,176✔
906

907
_return:
×
908
  if (code != TSDB_CODE_SUCCESS) {
×
909
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
910
  }
911
  if (pInfo != NULL) {
×
912
    destroyVirtualTableScanOperatorInfo(pInfo);
×
913
  }
914
  nodesDestroyList(pMergeKeys);
×
915
  pTaskInfo->code = code;
×
916
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
917
  return code;
×
918
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc