• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4788

14 Oct 2025 11:21AM UTC coverage: 60.992% (-2.3%) from 63.264%
#4788

push

travis-ci

web-flow
Merge 7ca9b50f9 into 19574fe21

154868 of 324306 branches covered (47.75%)

Branch coverage included in aggregate %.

207304 of 269498 relevant lines covered (76.92%)

125773493.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.33
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21
#include "virtualtablescan.h"
22
#include "tsort.h"
23

24
typedef struct SVirtualTableScanInfo {
25
  STableScanBase base;
26
  SArray*        pSortInfo;
27
  SSortHandle*   pSortHandle;
28
  int32_t        bufPageSize;
29
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
30
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
31
  SSDataBlock*   pInputBlock;
32
  SHashObj*      dataSlotMap;
33
  int32_t        tsSlotId;
34
  int32_t        tagBlockId;
35
  int32_t        tagDownStreamId;
36
  bool           scanAllCols;
37
  SArray*        pSortCtxList;
38
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
39
} SVirtualTableScanInfo;
40

41
typedef struct SVirtualScanMergeOperatorInfo {
42
  SOptrBasicInfo        binfo;
43
  EMergeType            type;
44
  SVirtualTableScanInfo virtualScanInfo;
45
  bool                  ignoreGroupId;
46
  uint64_t              groupId;
47
  STupleHandle*         pSavedTuple;
48
  SSDataBlock*          pSavedTagBlock;
49
} SVirtualScanMergeOperatorInfo;
50

51
typedef struct SLoadNextCtx {
52
  SOperatorInfo*  pOperator;
53
  SOperatorParam* pOperatorGetParam;
54
  int32_t         blockId;
55
  STimeWindow     window;
56
  SSDataBlock*    pIntermediateBlock;
57
  col_id_t        tsSlotId;
58
} SLoadNextCtx;
59

60
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
25,133,420✔
61
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
25,133,420✔
62
  int32_t        code = TSDB_CODE_SUCCESS;
25,133,420✔
63

64
  VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock));
25,133,420!
65
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
25,133,420!
66

67
  return code;
25,133,420✔
68
_return:
×
69
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
70
  return code;
×
71
}
72

73
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
22,624,665✔
74
  int32_t code = TSDB_CODE_SUCCESS;
22,624,665✔
75
  int32_t lino = 0;
22,624,665✔
76
  int32_t tsIndex = -1;
22,624,665✔
77
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
46,981,256!
78
    if (((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i))->info.colId == tsSlotId) {
46,981,256✔
79
      tsIndex = i;
22,624,665✔
80
      break;
22,624,665✔
81
    }
82
  }
83

84
  if (tsIndex == -1) {
22,624,665!
85
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
86
  }
87

88
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
22,624,665✔
89
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
22,624,665!
90

91
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
22,624,665✔
92
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
22,624,665✔
93

94
  return code;
22,624,665✔
95
_return:
×
96
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
97
  return code;
×
98
}
99

100
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
30,295,016✔
101
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
30,295,016✔
102
  SOperatorInfo*          pOperator = pCtx->pOperator;
30,295,016✔
103
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
30,295,016✔
104
  int32_t                 code = TSDB_CODE_SUCCESS;
30,295,016✔
105
  SSDataBlock*            pRes = NULL;
30,295,016✔
106
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;
30,295,016✔
107

108
  pParam->basic.window = pCtx->window;
30,295,016✔
109
  pOperator->status = OP_NOT_OPENED;
30,295,016✔
110
  if (pCtx->pIntermediateBlock) {
30,295,016✔
111
    blockDataDestroy(pCtx->pIntermediateBlock);
22,568,736✔
112
    pCtx->pIntermediateBlock = NULL;
22,568,736✔
113
  }
114

115
  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));
30,295,016!
116

117
  VTS_ERR_JRET(blockDataCheck(pRes));
30,295,016!
118
  if ((pRes)) {
30,295,016✔
119
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
22,624,665✔
120
    (pRes)->info.id.blockId = pCtx->blockId;
22,624,665✔
121
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
22,624,665!
122
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
22,624,665!
123
    *ppBlock = pCtx->pIntermediateBlock;
22,624,665✔
124
  } else {
125
    pCtx->window.ekey = INT64_MAX;
7,670,351✔
126
    *ppBlock = NULL;
7,670,351✔
127
  }
128

129
  return code;
30,295,016✔
130
_return:
×
131
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
132
  return code;
×
133
}
134

135
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
5,354,680✔
136
  int32_t           code = TSDB_CODE_SUCCESS;
5,354,680✔
137
  SNodeList        *pNodeList = NULL;
5,354,680✔
138
  SColumnNode      *pColumnNode = NULL;
5,354,680✔
139
  SOrderByExprNode *pOrderByExprNode = NULL;
5,354,680✔
140

141
  VTS_ERR_JRET(nodesMakeList(&pNodeList));
5,354,680!
142

143
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
5,354,680!
144
  pColumnNode->slotId = tsSlotId;
5,354,680✔
145

146
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
5,354,680!
147
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
5,354,680✔
148
  pOrderByExprNode->order = ORDER_ASC;
5,354,680✔
149
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;
5,354,680✔
150

151
  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
5,354,680!
152

153
  *pMergeKeys = pNodeList;
5,354,680✔
154
  return code;
5,354,680✔
155
_return:
×
156
  nodesDestroyNode((SNode*)pColumnNode);
×
157
  nodesDestroyNode((SNode*)pOrderByExprNode);
×
158
  nodesDestroyList(pNodeList);
×
159
  return code;
×
160
}
161

162
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
3,064,698✔
163
  if (pVirtualScanInfo->pSortInfo) {
3,064,698✔
164
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
2,304,362✔
165
    pVirtualScanInfo->pSortInfo = NULL;
2,304,362✔
166
  }
167
  if (pVirtualScanInfo->pSortHandle) {
3,064,698✔
168
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
2,136,818✔
169
    pVirtualScanInfo->pSortHandle = NULL;
2,136,818✔
170
  }
171
  if (pVirtualScanInfo->pSortCtxList) {
3,064,698✔
172
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
7,184,360✔
173
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
5,047,542✔
174
      blockDataDestroy(pCtx->pIntermediateBlock);
5,047,542✔
175
      taosMemoryFree(pCtx);
5,047,542!
176
    }
177
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
2,136,818✔
178
  }
179
}
3,064,698✔
180

181
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
3,064,698✔
182
  int32_t                         code = TSDB_CODE_SUCCESS;
3,064,698✔
183
  int32_t                         lino = 0;
3,064,698✔
184
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
3,064,698✔
185
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
3,064,698✔
186
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
3,064,698✔
187
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
3,064,698✔
188
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
3,064,698✔
189
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
3,064,698!
190
  SNodeList*                      pMergeKeys = NULL;
3,064,698✔
191
  SSortSource*                    ps = NULL;
3,064,698✔
192
  int32_t                         scanOpIndex = 0;
3,064,698✔
193

194
  cleanUpVirtualScanInfo(pVirtualScanInfo);
3,064,698✔
195
  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
3,064,698!
196
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
3,064,698✔
197
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
3,064,698!
198
  nodesDestroyList(pMergeKeys);
3,064,698✔
199

200
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
3,064,698!
201
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
202

203
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
3,064,698✔
204
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);
3,064,698✔
205

206
  if (pOperator->numOfDownstream > 2) {
3,064,698!
207
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
×
208
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
209
  }
210

211
  pVirtualScanInfo->tagDownStreamId = -1;
3,064,698✔
212
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
8,373,966✔
213
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
5,309,268✔
214
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
5,309,268✔
215
      // tag block do not need sort
216
      pVirtualScanInfo->tagDownStreamId = i;
2,244,570✔
217
      pInfo->pSavedTagBlock = NULL;
2,244,570✔
218
      continue;
2,244,570✔
219
    }
220
  }
221
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
3,064,698✔
222

223
  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
3,064,698✔
224
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
3,064,698✔
225
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno)
3,064,698!
226
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
10,790,978✔
227
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
7,726,280✔
228
    SLoadNextCtx*   pCtx = NULL;
7,726,280✔
229
    ps = NULL;
7,726,280✔
230

231
    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
7,726,280!
232
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno)
7,726,280!
233
    pCtx->blockId = i;
7,726,280✔
234
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
7,726,280✔
235
    pCtx->pOperatorGetParam = pOpParam;
7,726,280✔
236
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
7,726,280✔
237
    pCtx->pIntermediateBlock = NULL;
7,726,280✔
238
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;
7,726,280✔
239

240
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
7,726,280!
241
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno)
7,726,280!
242

243
    ps->param = pCtx;
7,726,280✔
244
    ps->onlyRef = true;
7,726,280✔
245

246
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
7,726,280!
247
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno)
15,452,560!
248
  }
249

250
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
3,064,698!
251

252
  return code;
3,064,698✔
253
_return:
×
254
  if (code != 0){
×
255
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
256
  }
257
  nodesDestroyList(pMergeKeys);
×
258
  if (ps != NULL) {
×
259
    taosMemoryFree(ps);
×
260
  }
261
  return code;
×
262
}
263

264
int32_t createSortHandle(SOperatorInfo* pOperator) {
2,269,021✔
265
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
2,269,021✔
266
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2,269,021✔
267
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
2,269,021✔
268
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
2,269,021!
269
  SSortSource*                    ps = NULL;
2,269,021✔
270
  int32_t                         code = 0;
2,269,021✔
271
  int32_t                         lino = 0;
2,269,021✔
272

273
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
2,269,021!
274
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
275

276
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
2,269,021✔
277
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);
2,269,021✔
278

279
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
10,276,797✔
280
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
8,007,776✔
281
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
8,007,776!
282
      VTS_ERR_JRET(pDownstream->fpSet._openFn(pDownstream));
8,007,776!
283
    } else {
284
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
285
    }
286

287
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
8,007,776✔
288
      // tag block do not need sort
289
      pVirtualScanInfo->tagDownStreamId = i;
225,540✔
290
      continue;
225,540✔
291
    }
292

293
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
7,782,236!
294
    TSDB_CHECK_NULL(ps, code, lino, _return, terrno)
7,782,236!
295

296
    ps->param = pDownstream;
7,782,236✔
297
    ps->onlyRef = true;
7,782,236✔
298

299
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
7,782,236!
300
    ps = NULL;
7,782,236✔
301
  }
302

303
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
2,269,021!
304

305
_return:
2,269,021✔
306
  if (code != 0){
2,269,021!
307
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
308
  }
309
  if (ps != NULL) {
2,269,021!
310
    taosMemoryFree(ps);
×
311
  }
312
  return code;
2,269,021✔
313
}
314

315
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
5,354,680✔
316
  int32_t                         code = 0;
5,354,680✔
317
  int32_t                         lino = 0;
5,354,680✔
318

319
  if (pOperator->numOfDownstream == 0) {
5,354,680✔
320
    return code;
20,961✔
321
  }
322

323
  if (pOperator->pOperatorGetParam) {
5,333,719✔
324
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
3,064,698!
325
  } else {
326
    VTS_ERR_JRET(createSortHandle(pOperator));
2,269,021!
327
  }
328

329
  return code;
5,333,719✔
330

331
_return:
×
332
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
333
  return code;
×
334
}
335

336
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
62,502,961✔
337
  int32_t code = 0;
62,502,961✔
338

339
  if (OPTR_IS_OPENED(pOperator)) {
62,502,961✔
340
    return TSDB_CODE_SUCCESS;
57,148,281✔
341
  }
342

343
  int64_t startTs = taosGetTimestampUs();
5,354,680✔
344

345
  code = openVirtualTableScanOperatorImpl(pOperator);
5,354,680✔
346

347
  pOperator->cost.openCost = (double)(taosGetTimestampUs() - startTs) / 1000.0;
5,354,680✔
348
  pOperator->status = OP_RES_TO_RETURN;
5,354,680✔
349

350
  VTS_ERR_RET(code);
5,354,680!
351

352
  OPTR_SET_OPENED(pOperator);
5,354,680✔
353
  return code;
5,354,680✔
354
}
355

356
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
36,991,347✔
357
                                          SSDataBlock* p) {
358
  int32_t code = 0;
36,991,347✔
359
  int64_t lastTs = 0;
36,991,347✔
360
  int64_t rowNums = -1;
36,991,347✔
361
  blockDataEmpty(p);
36,991,347✔
362
  while (1) {
2,147,483,647✔
363
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
364
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
365
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
366
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
367
        break;
368
      }
369
    } else {
370
      pTupleHandle = pInfo->pSavedTuple;
32,829,107✔
371
      pInfo->pSavedTuple = NULL;
32,829,107✔
372
    }
373

374
    SDataBlockInfo info = {0};
2,147,483,647✔
375
    tsortGetBlockInfo(pTupleHandle, &info);
2,147,483,647✔
376
    int32_t blockId = (int32_t)info.id.blockId;
2,147,483,647✔
377

378
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
2,147,483,647!
379
      bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
380
      if (isNull) {
2,147,483,647✔
381
        int32_t slotKey = blockId << 16 | i;
130,080✔
382
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
130,080✔
383
        if (slotId == NULL) {
130,080!
384
          if (i == 0) {
×
385
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
386
          } else {
387
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
388
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
389
          }
390
        } else {
391
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
130,080✔
392
        }
393
      } else {
394
        char* pData = NULL;
2,147,483,647✔
395
        tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
396

397
        if (pData != NULL) {
2,147,483,647✔
398
          if (i == 0) {
2,147,483,647✔
399
            if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
400
              if (rowNums >= capacity - 1) {
2,147,483,647✔
401
                pInfo->pSavedTuple = pTupleHandle;
33,166,499✔
402
                goto _return;
33,166,499✔
403
              }
404
              rowNums++;
2,147,483,647✔
405
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
406
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
407
              }
408
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647✔
409
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
410
              }
411
              lastTs = *(int64_t*)pData;
2,147,483,647✔
412
            }
413
          }
414
          int32_t slotKey = blockId << 16 | i;
2,147,483,647✔
415
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
2,147,483,647✔
416
          if (slotId == NULL) {
2,147,483,647✔
417
            if (i == 0) {
2,147,483,647!
418
              continue;
2,147,483,647✔
419
            } else {
420
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
421
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
422
            }
423
          }
424
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
2,147,483,647!
425
        }
426
      }
427
    }
428
  }
429
_return:
36,991,347✔
430
  p->info.rows = rowNums + 1;
36,991,347✔
431
  p->info.dataLoad = 1;
36,991,347✔
432
  p->info.scanFlag = MAIN_SCAN;
36,991,347✔
433
  return code;
36,991,347✔
434
}
435

436
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
40,263,350✔
437
                                           SSDataBlock* p) {
438
  int32_t code = 0;
40,263,350✔
439
  int64_t lastTs = 0;
40,263,350✔
440
  int64_t rowNums = -1;
40,263,350✔
441
  blockDataEmpty(p);
40,263,350✔
442
  while (1) {
2,147,483,647✔
443
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
444
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
445
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
446
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
447
        break;
448
      }
449
    } else {
450
      pTupleHandle = pInfo->pSavedTuple;
34,950,704✔
451
      pInfo->pSavedTuple = NULL;
34,950,704✔
452
    }
453

454
    int32_t tsIndex = -1;
2,147,483,647✔
455

456
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647!
457
      if (tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
458
        continue;
2,147,483,647✔
459
      } else {
460
        SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
461
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
462
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
2,147,483,647✔
463
          tsIndex = i;
2,147,483,647✔
464
          break;
2,147,483,647✔
465
        }
466
      }
467
    }
468

469
    if (tsIndex == -1) {
2,147,483,647!
470
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
471
    }
472

473
    char* pData = NULL;
2,147,483,647✔
474
    // first, set ts slot's data
475
    // then, set other slots' data
476
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
2,147,483,647✔
477

478
    if (pData != NULL) {
2,147,483,647!
479
      if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
480
        if (rowNums >= capacity - 1) {
2,147,483,647✔
481
          pInfo->pSavedTuple = pTupleHandle;
34,966,424✔
482
          goto _return;
34,966,424✔
483
        }
484
        rowNums++;
2,147,483,647✔
485
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
486
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
487
        }
488
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647!
489
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
490
        }
491
        lastTs = *(int64_t*)pData;
2,147,483,647✔
492
      }
493
    }
494
    if (pInfo->virtualScanInfo.scanAllCols) {
2,147,483,647!
495
      continue;
2,147,483,647✔
496
    }
497

498
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647✔
499
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
500
        continue;
2,147,483,647✔
501
      }
502

503
      SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
504
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
505
      tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
506

507
      if (pData != NULL) {
2,147,483,647!
508
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
2,147,483,647!
509
      }
510
    }
511
  }
512
_return:
40,263,350✔
513
  p->info.rows = rowNums + 1;
40,263,350✔
514
  p->info.dataLoad = 1;
40,263,350✔
515
  p->info.scanFlag = MAIN_SCAN;
40,263,350✔
516
  return code;
40,263,350✔
517
}
518

519
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
77,275,658✔
520
  int32_t                        code = 0;
77,275,658✔
521
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
77,275,658✔
522
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
77,275,658✔
523
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
77,275,658✔
524
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
77,275,658✔
525
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
77,275,658✔
526
  int32_t                        capacity = pOperator->resultInfo.capacity;
77,275,658✔
527

528
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
77,275,658✔
529
  blockDataCleanup(pDataBlock);
77,275,658✔
530

531
  if (pHandle == NULL) {
77,275,658✔
532
    return TSDB_CODE_SUCCESS;
20,961✔
533
  }
534

535
  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
77,254,697✔
536
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
3,196,901!
537
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
3,196,901!
538
      return TSDB_CODE_SUCCESS;
×
539
    }
540

541
    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
3,196,901!
542
  } else {
543
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
74,057,796✔
544
  }
545

546
  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
77,254,697✔
547
  if (pOperator->pOperatorGetParam) {
77,254,697✔
548
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
40,263,350!
549
  } else {
550
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
36,991,347!
551
  }
552

553
  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
77,254,697!
554
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
77,254,697✔
555
         pDataBlock->info.rows);
556

557
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
77,254,697✔
558
  return code;
77,254,697✔
559
}
560

561
int32_t vtableAddTagPseudoColumnData(const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
31,165,782✔
562
  int32_t          code = TSDB_CODE_SUCCESS;
31,165,782✔
563
  int32_t          lino = 0;
31,165,782✔
564
  int64_t          backupRows;
565
  // currently only the tbname pseudo column
566
  if (numOfExpr <= 0) {
31,165,782!
567
    return TSDB_CODE_SUCCESS;
×
568
  }
569

570
  if (tagBlock == NULL) {
31,165,782!
571
    return TSDB_CODE_SUCCESS;
×
572
  }
573

574
  if (tagBlock->info.rows != 1) {
31,165,782!
575
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
×
576
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
577
  }
578

579
  backupRows = pBlock->info.rows;
31,165,782✔
580
  pBlock->info.rows = rows;
31,165,782✔
581
  for (int32_t j = 0; j < numOfExpr; ++j) {
116,098,806✔
582
    const SExprInfo* pExpr1 = &pExpr[j];
84,933,024✔
583
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
84,933,024✔
584

585
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
84,933,024✔
586
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno)
84,933,024!
587
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
84,933,024✔
588

589
    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
84,933,024✔
590
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno)
84,933,024!
591

592
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
84,933,024!
593
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      continue;
×
595
    }
596

597
    char* data = colDataGetData(pTagInfoData, 0);
84,933,024!
598

599
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
84,933,024!
600
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, 1, false);
84,933,024✔
601
      QUERY_CHECK_CODE(code, lino, _return);
84,933,024!
602
    } else {  // todo opt for json tag
603
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
604
        code = colDataSetVal(pColInfoData, i, data, false);
×
605
        QUERY_CHECK_CODE(code, lino, _return);
×
606
      }
607
    }
608
  }
609

610
  // restore the rows
611
  pBlock->info.rows = backupRows;
31,165,782✔
612

613
_return:
31,165,782✔
614

615
  if (code != TSDB_CODE_SUCCESS) {
31,165,782!
616
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
617
  }
618
  return code;
31,165,782✔
619
}
620

621
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock, int32_t rows) {
72,274,090✔
622
  int32_t         code = 0;
72,274,090✔
623
  STableScanBase* pTableScanInfo = &pInfo->base;
72,274,090✔
624
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
72,274,090✔
625
  if (pSup->numOfExprs > 0) {
72,274,090✔
626
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
31,165,782!
627
  }
628

629
  return code;
72,274,090✔
630
}
631

632
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
62,502,961✔
633
  int32_t                        code = TSDB_CODE_SUCCESS;
62,502,961✔
634
  int32_t                        lino = 0;
62,502,961✔
635
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
62,502,961✔
636
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
62,502,961✔
637
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
62,502,961✔
638

639
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
62,502,961!
640
    pResBlock = NULL;
×
641
    return code;
×
642
  }
643

644
  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));
62,502,961!
645

646
  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
62,502,961✔
647
    SSDataBlock*   pTagBlock = NULL;
2,470,110✔
648
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
2,470,110✔
649
    if (pOperator->pOperatorGetParam) {
2,470,110✔
650
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
2,244,570✔
651
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
2,244,570!
652
    } else {
653
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
225,540!
654
    }
655

656
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
2,470,110!
657
      VTS_ERR_JRET(TSDB_CODE_FAILED);
×
658
    }
659
    pInfo->pSavedTagBlock = pTagBlock;
2,470,110✔
660
  }
661

662
  while(1) {
663
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
77,275,658!
664
    if (*pResBlock == NULL) {
77,275,658✔
665
      setOperatorCompleted(pOperator);
5,001,568✔
666
      break;
5,001,568✔
667
    }
668

669
    if (pOperator->pOperatorGetParam) {
72,274,090✔
670
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
37,214,372✔
671
      (*pResBlock)->info.id.uid = uid;
37,214,372✔
672
    } else {
673
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
35,059,718✔
674
    }
675

676
    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), (*pResBlock)->info.rows));
72,274,090!
677
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL));
72,274,090!
678
    if ((*pResBlock)->info.rows > 0) {
72,274,090✔
679
      break;
57,501,393✔
680
    }
681
  }
682

683
  return code;
62,502,961✔
684
_return:
×
685
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
686
  pTaskInfo->code = code;
×
687
  T_LONG_JMP(pTaskInfo->env, code);
×
688
}
689

690
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
3,050,318✔
691
  cleanupQueryTableDataCond(&pBase->cond);
3,050,318✔
692

693
  if (pAPI->tsdReaderClose) {
3,050,318!
694
    pAPI->tsdReaderClose(pBase->dataReader);
×
695
  }
696
  pBase->dataReader = NULL;
3,050,318✔
697

698
  if (pBase->matchInfo.pList != NULL) {
3,050,318!
699
    taosArrayDestroy(pBase->matchInfo.pList);
×
700
  }
701

702
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
3,050,318✔
703
  cleanupExprSupp(&pBase->pseudoSup);
3,050,318✔
704
}
3,050,318✔
705

706
void destroyVirtualTableScanOperatorInfo(void* param) {
3,050,318✔
707
  if (!param) {
3,050,318!
708
    return;
×
709
  }
710
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
3,050,318✔
711
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
3,050,318✔
712
  blockDataDestroy(pOperatorInfo->binfo.pRes);
3,050,318✔
713
  pOperatorInfo->binfo.pRes = NULL;
3,050,318✔
714

715
  tsortDestroySortHandle(pInfo->pSortHandle);
3,050,318✔
716
  pInfo->pSortHandle = NULL;
3,050,318✔
717
  taosArrayDestroy(pInfo->pSortInfo);
3,050,318✔
718
  pInfo->pSortInfo = NULL;
3,050,318✔
719

720
  blockDataDestroy(pInfo->pIntermediateBlock);
3,050,318✔
721
  pInfo->pIntermediateBlock = NULL;
3,050,318✔
722

723
  blockDataDestroy(pInfo->pInputBlock);
3,050,318✔
724
  pInfo->pInputBlock = NULL;
3,050,318✔
725
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
3,050,318✔
726

727
  taosHashCleanup(pInfo->dataSlotMap);
3,050,318✔
728

729
  if (pInfo->pSortCtxList) {
3,050,318✔
730
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
3,169,890✔
731
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
2,453,030✔
732
      blockDataDestroy(pCtx->pIntermediateBlock);
2,453,030✔
733
      taosMemoryFree(pCtx);
2,453,030!
734
    }
735
    taosArrayDestroy(pInfo->pSortCtxList);
716,860✔
736
    pInfo->pSortCtxList = NULL;
716,860✔
737
  }
738
  taosMemoryFreeClear(param);
3,050,318!
739
}
740

741
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
3,050,318✔
742
  size_t  numOfCols = LIST_LENGTH(pNodeList);
3,050,318✔
743
  int32_t code = TSDB_CODE_SUCCESS;
3,050,318✔
744
  int32_t lino = 0;
3,050,318✔
745

746
  if (numOfCols == 0) {
3,050,318✔
747
    return code;
3,834✔
748
  }
749

750
  *tsSlotId = -1;
3,046,484✔
751
  *tagBlockId = -1;
3,046,484✔
752
  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
3,046,484✔
753
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno)
3,046,484!
754

755
  for (int32_t i = 0; i < numOfCols; ++i) {
19,316,803✔
756
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
16,270,319✔
757
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno)
16,270,319!
758

759
    if (pColNode->isPrimTs) {
16,270,319!
760
      *tsSlotId = i;
2,753,424✔
761
    } else if (pColNode->hasRef) {
13,516,895!
762
      int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId;
7,548,123✔
763
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
7,548,123!
764
    } else if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
5,968,772✔
765
      // tag column or pseudo column's function
766
      *tagBlockId = pColNode->dataBlockId;
1,723,231✔
767
    }
768
  }
769

770
  return code;
3,046,484✔
771
_return:
×
772
  taosHashCleanup(*pSlotMap);
×
773
  *pSlotMap = NULL;
×
774
  if (code != TSDB_CODE_SUCCESS) {
×
775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
776
  }
777
  return code;
×
778
}
779

780
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
5,794,528✔
781
  int32_t code = 0, lino = 0;
5,794,528✔
782
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
5,794,528✔
783
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
5,795,616✔
784
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;
5,795,616✔
785
  
786
  pOper->status = OP_NOT_OPENED;
5,795,616✔
787
  resetBasicOperatorState(&pMergeInfo->binfo);
5,795,072✔
788

789
  tsortDestroySortHandle(pInfo->pSortHandle);
5,795,616✔
790
  pInfo->pSortHandle = NULL;
5,793,984✔
791
  // taosArrayDestroy(pInfo->pSortInfo);
792
  // pInfo->pSortInfo = NULL;
793

794
  blockDataDestroy(pInfo->pIntermediateBlock);
5,793,984✔
795
  pInfo->pIntermediateBlock = NULL;
5,793,984✔
796

797
  blockDataDestroy(pInfo->pInputBlock);
5,793,984✔
798
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
5,793,984✔
799
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno)
5,795,616!
800

801
  pInfo->tagDownStreamId = -1;
5,795,616✔
802

803
  if (pInfo->pSortCtxList) {
5,795,072✔
804
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
436,728✔
805
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
225,708✔
806
      blockDataDestroy(pCtx->pIntermediateBlock);
225,708✔
807
      taosMemoryFree(pCtx);
225,708!
808
    }
809
    taosArrayDestroy(pInfo->pSortCtxList);
211,020✔
810
    pInfo->pSortCtxList = NULL;
211,020✔
811
  }
812

813
  pMergeInfo->pSavedTuple = NULL;
5,793,984✔
814
  pMergeInfo->pSavedTagBlock = NULL;
5,793,440✔
815

816
_exit:
5,794,528✔
817

818
  if (code) {
5,794,528!
819
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
820
  }
821

822
  return code;
5,793,984✔
823
}
824

825
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,050,318✔
826
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
827
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
828
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
3,050,318✔
829
  int32_t                        lino = 0;
3,050,318✔
830
  int32_t                        code = TSDB_CODE_SUCCESS;
3,050,318✔
831
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
3,050,318!
832
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,050,318!
833
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
3,050,318✔
834
  SNodeList*                     pMergeKeys = NULL;
3,050,318✔
835

836
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno)
3,050,318!
837
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno)
3,050,318!
838

839
  pOperator->pPhyNode = pVirtualScanPhyNode;
3,050,318✔
840

841
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
3,050,318✔
842
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
3,050,318✔
843

844
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
3,050,318✔
845
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
3,050,318✔
846
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno)
3,050,318!
847

848
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
3,050,318✔
849
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno)
3,050,318!
850
  pVirtualScanInfo->pInputBlock = pInputBlock;
3,050,318✔
851
  pVirtualScanInfo->tagDownStreamId = -1;
3,050,318✔
852
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
3,050,318✔
853
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
3,050,318✔
854
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
791,629✔
855
    pSup->pExprInfo = NULL;
791,629✔
856
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
791,629!
857

858
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
791,629✔
859
                                      &pTaskInfo->storageAPI.functionStore);
860
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno)
791,629!
861
  }
862

863
  initResultSizeInfo(&pOperator->resultInfo, 1024);
3,050,318✔
864
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
3,050,318!
865

866
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
3,050,318✔
867
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
3,050,318✔
868

869
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
3,050,318!
870
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
2,289,982!
871
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,289,982✔
872
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
2,289,982!
873
  } else {
874
    pTaskInfo->dynamicTask = true;
760,336✔
875
  }
876
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
3,050,318✔
877
  pVirtualScanInfo->sortBufSize =
3,050,318✔
878
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
3,050,318✔
879
  VTS_ERR_JRET(
3,050,318!
880
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
881

882
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
3,050,318!
883

884
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
3,050,318!
885
                                  0, pTaskInfo->pStreamRuntimeInfo));
886

887
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
3,050,318✔
888
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno)
3,050,318!
889

890
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
3,050,318✔
891
                  OP_NOT_OPENED, pInfo, pTaskInfo);
892
  pOperator->fpSet =
893
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
3,050,318✔
894
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
895
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
3,050,318✔
896

897
  if (NULL != pDownstream) {
3,050,318✔
898
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
3,029,357!
899
  } else {
900
    pVirtualScanInfo->tagDownStreamId = -1;
20,961✔
901
  }
902

903
  nodesDestroyList(pMergeKeys);
3,050,318✔
904
  *pOptrInfo = pOperator;
3,050,318✔
905
  return TSDB_CODE_SUCCESS;
3,050,318✔
906

907
_return:
×
908
  if (code != TSDB_CODE_SUCCESS) {
×
909
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
910
  }
911
  if (pInfo != NULL) {
×
912
    destroyVirtualTableScanOperatorInfo(pInfo);
×
913
  }
914
  nodesDestroyList(pMergeKeys);
×
915
  pTaskInfo->code = code;
×
916
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
917
  return code;
×
918
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc