• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4674

18 Aug 2025 07:58AM UTC coverage: 59.821% (+0.1%) from 59.715%
#4674

push

travis-ci

web-flow
test: update case desc (#32551)

136937 of 292075 branches covered (46.88%)

Branch coverage included in aggregate %.

207916 of 284395 relevant lines covered (73.11%)

4553289.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.95
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tdatablock.h"
22
#include "ttime.h"
23
#include "virtualtablescan.h"
24
#include "tsort.h"
25

26
#define STREAM_VTABLE_MERGE_OP_NAME "StreamVtableMergeOperator"
27
#define STREAM_VTABLE_MERGE_OP_CHECKPOINT_NAME "StreamVtableMergeOperator_Checkpoint"
28

29
typedef struct SVirtualTableScanInfo {
30
  STableScanBase base;
31
  SArray*        pSortInfo;
32
  SSortHandle*   pSortHandle;
33
  int32_t        bufPageSize;
34
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
35
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
36
  SSDataBlock*   pInputBlock;
37
  SHashObj*      dataSlotMap;
38
  int32_t        tsSlotId;
39
  int32_t        tagBlockId;
40
  int32_t        tagDownStreamId;
41
  bool           scanAllCols;
42
  SArray*        pSortCtxList;
43
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
44
} SVirtualTableScanInfo;
45

46
typedef struct SVirtualScanMergeOperatorInfo {
47
  SOptrBasicInfo        binfo;
48
  EMergeType            type;
49
  SVirtualTableScanInfo virtualScanInfo;
50
  bool                  ignoreGroupId;
51
  uint64_t              groupId;
52
  STupleHandle*         pSavedTuple;
53
  SSDataBlock*          pSavedTagBlock;
54
} SVirtualScanMergeOperatorInfo;
55

56
typedef struct SLoadNextCtx {
57
  SOperatorInfo*  pOperator;
58
  SOperatorParam* pOperatorGetParam;
59
  int32_t         blockId;
60
  STimeWindow     window;
61
  SSDataBlock*    pIntermediateBlock;
62
  col_id_t        tsSlotId;
63
} SLoadNextCtx;
64

65
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
13,292✔
66
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
13,292✔
67
  int32_t        code = TSDB_CODE_SUCCESS;
13,292✔
68

69
  VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock));
13,292!
70
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
13,292!
71

72
  return code;
13,292✔
73
_return:
×
74
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
75
  return code;
×
76
}
77

78
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
171✔
79
  int32_t code = TSDB_CODE_SUCCESS;
171✔
80
  int32_t lino = 0;
171✔
81
  int32_t tsIndex = -1;
171✔
82
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
171!
83
    if (((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i))->info.colId == tsSlotId) {
171!
84
      tsIndex = i;
171✔
85
      break;
171✔
86
    }
87
  }
88

89
  if (tsIndex == -1) {
171!
90
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
91
  }
92

93
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
171✔
94
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno);
171!
95

96
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
171✔
97
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
171✔
98

99
  return code;
171✔
100
_return:
×
101
  qError("failed to get time window of block, %s code:%s", __func__, tstrerror(code));
×
102
  return code;
×
103
}
104

105
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
228✔
106
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
228✔
107
  SOperatorInfo*          pOperator = pCtx->pOperator;
228✔
108
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
228✔
109
  int32_t                 code = TSDB_CODE_SUCCESS;
228✔
110
  SSDataBlock*            pRes = NULL;
228✔
111
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;
228✔
112

113
  pParam->basic.window = pCtx->window;
228✔
114
  pOperator->status = OP_NOT_OPENED;
228✔
115
  if (pCtx->pIntermediateBlock) {
228✔
116
    blockDataDestroy(pCtx->pIntermediateBlock);
171✔
117
    pCtx->pIntermediateBlock = NULL;
171✔
118
  }
119

120
  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));
228!
121

122
  VTS_ERR_JRET(blockDataCheck(pRes));
228!
123
  if ((pRes)) {
228✔
124
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
171!
125
    (pRes)->info.id.blockId = pCtx->blockId;
171✔
126
    getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey);
171✔
127
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
171!
128
    *ppBlock = pCtx->pIntermediateBlock;
171✔
129
  } else {
130
    pCtx->window.ekey = INT64_MAX;
57✔
131
    *ppBlock = NULL;
57✔
132
  }
133

134
  return code;
228✔
135
_return:
×
136
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
137
  return code;
×
138
}
139

140
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
1,494✔
141
  int32_t           code = TSDB_CODE_SUCCESS;
1,494✔
142
  SNodeList        *pNodeList = NULL;
1,494✔
143
  SColumnNode      *pColumnNode = NULL;
1,494✔
144
  SOrderByExprNode *pOrderByExprNode = NULL;
1,494✔
145

146
  VTS_ERR_JRET(nodesMakeList(&pNodeList));
1,494!
147

148
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
1,494!
149
  pColumnNode->slotId = tsSlotId;
1,494✔
150

151
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
1,494!
152
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
1,494✔
153
  pOrderByExprNode->order = ORDER_ASC;
1,494✔
154
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;
1,494✔
155

156
  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
1,494!
157

158
  *pMergeKeys = pNodeList;
1,494✔
159
  return code;
1,494✔
160
_return:
×
161
  nodesDestroyNode((SNode*)pColumnNode);
×
162
  nodesDestroyNode((SNode*)pOrderByExprNode);
×
163
  nodesDestroyList(pNodeList);
×
164
  return code;
×
165
}
166

167
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
24✔
168
  if (pVirtualScanInfo->pSortInfo) {
24✔
169
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
18✔
170
    pVirtualScanInfo->pSortInfo = NULL;
18✔
171
  }
172
  if (pVirtualScanInfo->pSortHandle) {
24✔
173
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
18✔
174
    pVirtualScanInfo->pSortHandle = NULL;
18✔
175
  }
176
  if (pVirtualScanInfo->pSortCtxList) {
24✔
177
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
54✔
178
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
36✔
179
      blockDataDestroy(pCtx->pIntermediateBlock);
36✔
180
      taosMemoryFree(pCtx);
36!
181
    }
182
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
18✔
183
  }
184
}
24✔
185

186
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
24✔
187
  int32_t                         code = TSDB_CODE_SUCCESS;
24✔
188
  int32_t                         lino = 0;
24✔
189
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
24✔
190
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
24✔
191
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
24✔
192
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
24✔
193
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
24✔
194
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
24✔
195
  SNodeList*                      pMergeKeys = NULL;
24✔
196
  SSortSource*                    ps = NULL;
24✔
197
  int32_t                         scanOpIndex = 0;
24✔
198

199
  cleanUpVirtualScanInfo(pVirtualScanInfo);
24✔
200
  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
24!
201
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
24✔
202
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
24!
203
  nodesDestroyList(pMergeKeys);
24✔
204

205
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
24!
206
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
207

208
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
24✔
209
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);
24✔
210

211
  if (pOperator->numOfDownstream > 2) {
24!
212
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
×
213
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
214
  }
215

216
  pVirtualScanInfo->tagDownStreamId = -1;
24✔
217
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
72✔
218
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
48✔
219
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
48✔
220
      // tag block do not need sort
221
      pVirtualScanInfo->tagDownStreamId = i;
24✔
222
      pInfo->pSavedTagBlock = NULL;
24✔
223
      continue;
24✔
224
    }
225
  }
226
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
24!
227

228
  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
24✔
229
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
24✔
230
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno);
24!
231
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
81✔
232
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
57✔
233
    SLoadNextCtx*   pCtx = NULL;
57✔
234
    ps = NULL;
57✔
235

236
    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
57!
237
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno);
57!
238
    pCtx->blockId = i;
57✔
239
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
57✔
240
    pCtx->pOperatorGetParam = pOpParam;
57✔
241
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
57✔
242
    pCtx->pIntermediateBlock = NULL;
57✔
243
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;
57✔
244

245
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
57!
246
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno);
57!
247

248
    ps->param = pCtx;
57✔
249
    ps->onlyRef = true;
57✔
250

251
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
57!
252
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno);
114!
253
  }
254

255
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
24!
256

257
  return code;
24✔
258
_return:
×
259
  if (code != 0){
×
260
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
261
  }
262
  nodesDestroyList(pMergeKeys);
×
263
  if (ps != NULL) {
×
264
    taosMemoryFree(ps);
×
265
  }
266
  return code;
×
267
}
268

269
int32_t createSortHandle(SOperatorInfo* pOperator) {
1,748✔
270
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
1,748✔
271
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
1,748✔
272
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
1,748✔
273
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
1,748✔
274
  SSortSource*                    ps = NULL;
1,748✔
275
  int32_t                         code = 0;
1,748✔
276
  int32_t                         lino = 0;
1,748✔
277

278
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
1,748!
279
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
280

281
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
1,748✔
282
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);
1,748✔
283

284
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
5,316✔
285
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
3,568✔
286
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
3,568!
287
      VTS_ERR_JRET(pDownstream->fpSet._openFn(pDownstream));
3,568!
288
    } else {
289
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
290
    }
291

292
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
3,568✔
293
      // tag block do not need sort
294
      pVirtualScanInfo->tagDownStreamId = i;
18✔
295
      continue;
18✔
296
    }
297

298
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
3,550!
299
    TSDB_CHECK_NULL(ps, code, lino, _return, terrno);
3,550!
300

301
    ps->param = pDownstream;
3,550✔
302
    ps->onlyRef = true;
3,550✔
303

304
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
3,550!
305
    ps = NULL;
3,550✔
306
  }
307

308
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
1,748!
309

310
_return:
1,748✔
311
  if (code != 0){
1,748!
312
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
313
  }
314
  if (ps != NULL) {
1,748!
315
    taosMemoryFree(ps);
×
316
  }
317
  return code;
1,748✔
318
}
319

320
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
1,781✔
321
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
1,781✔
322
  int32_t                         code = 0;
1,781✔
323
  int32_t                         lino = 0;
1,781✔
324

325
  if (pOperator->numOfDownstream == 0) {
1,781✔
326
    return code;
9✔
327
  }
328

329
  if (pOperator->pOperatorGetParam) {
1,772✔
330
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
24!
331
  } else {
332
    VTS_ERR_JRET(createSortHandle(pOperator));
1,748!
333
  }
334

335
  return code;
1,772✔
336

337
_return:
×
338
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
339
  return code;
×
340
}
341

342
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
26,912✔
343
  int32_t code = 0;
26,912✔
344

345
  if (OPTR_IS_OPENED(pOperator)) {
26,912✔
346
    return TSDB_CODE_SUCCESS;
25,131✔
347
  }
348

349
  int64_t startTs = taosGetTimestampUs();
1,781✔
350

351
  code = openVirtualTableScanOperatorImpl(pOperator);
1,781✔
352

353
  pOperator->cost.openCost = (double)(taosGetTimestampUs() - startTs) / 1000.0;
1,781✔
354
  pOperator->status = OP_RES_TO_RETURN;
1,781✔
355

356
  VTS_ERR_RET(code);
1,781!
357

358
  OPTR_SET_OPENED(pOperator);
1,781✔
359
  return code;
1,781✔
360
}
361

362
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
27,714✔
363
                                          SSDataBlock* p) {
364
  int32_t code = 0;
27,714✔
365
  int64_t lastTs = 0;
27,714✔
366
  int64_t rowNums = -1;
27,714✔
367
  blockDataEmpty(p);
27,714✔
368
  while (1) {
30,822,520✔
369
    STupleHandle* pTupleHandle = NULL;
30,837,230✔
370
    if (!pInfo->pSavedTuple) {
30,837,230✔
371
      code = tsortNextTuple(pHandle, &pTupleHandle);
30,812,975✔
372
      if (pTupleHandle == NULL || (code != 0)) {
31,162,022✔
373
        break;
374
      }
375
    } else {
376
      pTupleHandle = pInfo->pSavedTuple;
24,255✔
377
      pInfo->pSavedTuple = NULL;
24,255✔
378
    }
379

380
    SDataBlockInfo info = {0};
31,182,818✔
381
    tsortGetBlockInfo(pTupleHandle, &info);
31,182,818✔
382
    int32_t blockId = (int32_t)info.id.blockId;
31,384,318✔
383

384
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
76,459,993✔
385
      bool isNull = tsortIsNullVal(pTupleHandle, i);
44,620,884✔
386
      if (isNull) {
44,472,988!
387
        int32_t slotKey = blockId << 16 | i;
×
388
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
×
389
        if (slotId == NULL) {
×
390
          if (i == 0) {
×
391
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
392
          } else {
393
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
394
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
395
          }
396
        } else {
397
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
×
398
        }
399
      } else {
400
        char* pData = NULL;
44,472,988✔
401
        tsortGetValue(pTupleHandle, i, (void**)&pData);
44,472,988✔
402

403
        if (pData != NULL) {
44,275,582!
404
          if (i == 0) {
44,275,582✔
405
            if (lastTs != *(int64_t*)pData) {
31,098,169✔
406
              if (rowNums >= capacity - 1) {
25,612,579✔
407
                pInfo->pSavedTuple = pTupleHandle;
24,254✔
408
                goto _return;
24,254✔
409
              }
410
              rowNums++;
25,588,325✔
411
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
76,411,718✔
412
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
50,889,466✔
413
              }
414
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
24,817,526!
415
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
25,471,975!
416
              }
417
              lastTs = *(int64_t*)pData;
25,526,400✔
418
            }
419
          }
420
          int32_t slotKey = blockId << 16 | i;
44,189,403✔
421
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
44,189,403✔
422
          if (slotId == NULL) {
44,556,559✔
423
            if (i == 0) {
31,029,535!
424
              continue;
31,029,535✔
425
            } else {
426
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
427
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
428
            }
429
          }
430
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
13,527,024!
431
        }
432
      }
433
    }
434
  }
435
_return:
27,713✔
436
  p->info.rows = rowNums + 1;
27,713✔
437
  p->info.dataLoad = 1;
27,713✔
438
  p->info.scanFlag = MAIN_SCAN;
27,713✔
439
  return code;
27,713✔
440
}
441

442
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
438✔
443
                                           SSDataBlock* p) {
444
  int32_t code = 0;
438✔
445
  int64_t lastTs = 0;
438✔
446
  int64_t rowNums = -1;
438✔
447
  blockDataEmpty(p);
438✔
448
  while (1) {
570,000✔
449
    STupleHandle* pTupleHandle = NULL;
570,438✔
450
    if (!pInfo->pSavedTuple) {
570,438✔
451
      code = tsortNextTuple(pHandle, &pTupleHandle);
570,042✔
452
      if (pTupleHandle == NULL || (code != 0)) {
570,042!
453
        break;
454
      }
455
    } else {
456
      pTupleHandle = pInfo->pSavedTuple;
396✔
457
      pInfo->pSavedTuple = NULL;
396✔
458
    }
459

460
    int32_t tsIndex = -1;
570,396✔
461

462
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
570,396!
463
      if (tsortIsNullVal(pTupleHandle, i)) {
570,396!
464
        continue;
×
465
      } else {
466
        SColumnInfoData *pColInfo = NULL;
570,396✔
467
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
570,396✔
468
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
570,396!
469
          tsIndex = i;
570,396✔
470
          break;
570,396✔
471
        }
472
      }
473
    }
474

475
    if (tsIndex == -1) {
570,396!
476
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
477
    }
478

479
    char* pData = NULL;
570,396✔
480
    // first, set ts slot's data
481
    // then, set other slots' data
482
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
570,396✔
483

484
    if (pData != NULL) {
570,396!
485
      if (lastTs != *(int64_t*)pData) {
570,396✔
486
        if (rowNums >= capacity - 1) {
420,390✔
487
          pInfo->pSavedTuple = pTupleHandle;
396✔
488
          goto _return;
396✔
489
        }
490
        rowNums++;
419,994✔
491
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
9,729,861✔
492
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
9,309,867✔
493
        }
494
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
419,994!
495
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
419,994!
496
        }
497
        lastTs = *(int64_t*)pData;
419,994✔
498
      }
499
    }
500
    if (pInfo->virtualScanInfo.scanAllCols) {
570,000!
501
      continue;
×
502
    }
503

504
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
13,220,000✔
505
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
12,650,000✔
506
        continue;
10,370,000✔
507
      }
508

509
      SColumnInfoData *pColInfo = NULL;
2,280,000✔
510
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,280,000✔
511
      tsortGetValue(pTupleHandle, i, (void**)&pData);
2,280,000✔
512

513
      if (pData != NULL) {
2,280,000!
514
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
2,280,000!
515
      }
516
    }
517
  }
518
_return:
438✔
519
  p->info.rows = rowNums + 1;
438✔
520
  p->info.dataLoad = 1;
438✔
521
  p->info.scanFlag = MAIN_SCAN;
438✔
522
  return code;
438✔
523
}
524

525
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
28,160✔
526
  int32_t                        code = 0;
28,160✔
527
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
28,160✔
528
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
28,160✔
529
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
28,160✔
530
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
28,160✔
531
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
28,160✔
532
  int32_t                        capacity = pOperator->resultInfo.capacity;
28,160✔
533

534
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
28,160✔
535
  blockDataCleanup(pDataBlock);
28,161✔
536

537
  if (pHandle == NULL) {
28,161✔
538
    return TSDB_CODE_SUCCESS;
9✔
539
  }
540

541
  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
28,152✔
542
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
1,754!
543
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
1,754!
544
      return TSDB_CODE_SUCCESS;
×
545
    }
546

547
    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
1,754!
548
  } else {
549
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
26,398✔
550
  }
551

552
  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
28,152✔
553
  if (pOperator->pOperatorGetParam) {
28,152✔
554
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
438!
555
  } else {
556
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
27,714!
557
  }
558

559
  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
28,151!
560
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
28,152✔
561
         pDataBlock->info.rows);
562

563
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
28,152✔
564
  return code;
28,152✔
565
}
566

567
int32_t vtableAddTagPseudoColumnData(SVirtualTableScanInfo *pInfo, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
414✔
568
  int32_t          code = TSDB_CODE_SUCCESS;
414✔
569
  int32_t          lino = 0;
414✔
570
  int64_t          backupRows;
571
  // currently only the tbname pseudo column
572
  if (numOfExpr <= 0) {
414!
573
    return TSDB_CODE_SUCCESS;
×
574
  }
575

576
  if (tagBlock == NULL) {
414!
577
    return TSDB_CODE_SUCCESS;
×
578
  }
579

580
  if (tagBlock->info.rows != 1) {
414!
581
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
×
582
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
583
  }
584

585
  backupRows = pBlock->info.rows;
414✔
586
  pBlock->info.rows = rows;
414✔
587
  for (int32_t j = 0; j < numOfExpr; ++j) {
2,898✔
588
    const SExprInfo* pExpr1 = &pExpr[j];
2,484✔
589
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
2,484✔
590

591
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
2,484✔
592
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
2,484!
593
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
2,484✔
594

595
    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
2,484✔
596
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno);
2,484!
597

598
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
2,484!
599
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
600
      continue;
×
601
    }
602

603
    char* data = colDataGetData(pTagInfoData, 0);
2,484!
604

605
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
2,484!
606
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
2,484✔
607
      QUERY_CHECK_CODE(code, lino, _return);
2,484!
608
    } else {  // todo opt for json tag
609
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
610
        code = colDataSetVal(pColInfoData, i, data, false);
×
611
        QUERY_CHECK_CODE(code, lino, _return);
×
612
      }
613
    }
614
  }
615

616
  // restore the rows
617
  pBlock->info.rows = backupRows;
414✔
618

619
_return:
414✔
620

621
  if (code != TSDB_CODE_SUCCESS) {
414!
622
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
623
  }
624
  return code;
414✔
625
}
626

627
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock,
26,380✔
628
                                  SExecTaskInfo* pTaskInfo, int32_t rows) {
629
  int32_t         code = 0;
26,380✔
630
  STableScanBase* pTableScanInfo = &pInfo->base;
26,380✔
631
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
26,380✔
632
  if (pSup->numOfExprs > 0) {
26,380✔
633
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pInfo,  pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
414!
634
  }
635

636
  return code;
26,380✔
637
}
638

639
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
26,912✔
640
  int32_t                        code = TSDB_CODE_SUCCESS;
26,912✔
641
  int32_t                        lino = 0;
26,912✔
642
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
26,912✔
643
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
26,912✔
644
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
26,912✔
645

646
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
26,912!
647
    pResBlock = NULL;
×
648
    return code;
×
649
  }
650

651
  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));
26,912!
652

653
  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
26,912!
654
    SSDataBlock*   pTagBlock = NULL;
42✔
655
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
42✔
656
    if (pOperator->pOperatorGetParam) {
42✔
657
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
24✔
658
      pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock);
24✔
659
    } else {
660
      pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock);
18✔
661
    }
662

663
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
42!
664
      VTS_ERR_JRET(TSDB_CODE_FAILED);
×
665
    }
666
    pInfo->pSavedTagBlock = pTagBlock;
42✔
667
  }
668

669
  while(1) {
670
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
28,160!
671
    if (*pResBlock == NULL) {
28,161✔
672
      setOperatorCompleted(pOperator);
1,781✔
673
      break;
1,781✔
674
    }
675

676
    if (pOperator->pOperatorGetParam) {
26,380✔
677
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
414✔
678
      (*pResBlock)->info.id.uid = uid;
414✔
679
    } else {
680
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
25,966✔
681
    }
682

683
    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), pTaskInfo, (*pResBlock)->info.rows));
26,380!
684
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL));
26,380!
685
    if ((*pResBlock)->info.rows > 0) {
26,379✔
686
      break;
25,131✔
687
    }
688
  }
689

690
  return code;
26,912✔
691
_return:
×
692
  if (code != TSDB_CODE_SUCCESS) {
×
693
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
694
    pTaskInfo->code = code;
×
695
    T_LONG_JMP(pTaskInfo->env, code);
×
696
  }
697
  return code;
×
698
}
699

700
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
1,476✔
701
  cleanupQueryTableDataCond(&pBase->cond);
1,476✔
702

703
  if (pAPI->tsdReaderClose) {
1,476!
704
    pAPI->tsdReaderClose(pBase->dataReader);
×
705
  }
706
  pBase->dataReader = NULL;
1,476✔
707

708
  if (pBase->matchInfo.pList != NULL) {
1,476!
709
    taosArrayDestroy(pBase->matchInfo.pList);
×
710
  }
711

712
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
1,476✔
713
  cleanupExprSupp(&pBase->pseudoSup);
1,476✔
714
}
1,476✔
715

716
void destroyVirtualTableScanOperatorInfo(void* param) {
1,476✔
717
  if (!param) {
1,476!
718
    return;
×
719
  }
720
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
1,476✔
721
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
1,476✔
722
  blockDataDestroy(pOperatorInfo->binfo.pRes);
1,476✔
723
  pOperatorInfo->binfo.pRes = NULL;
1,476✔
724

725
  tsortDestroySortHandle(pInfo->pSortHandle);
1,476✔
726
  pInfo->pSortHandle = NULL;
1,476✔
727
  taosArrayDestroy(pInfo->pSortInfo);
1,476✔
728
  pInfo->pSortInfo = NULL;
1,476✔
729

730
  blockDataDestroy(pInfo->pIntermediateBlock);
1,476✔
731
  pInfo->pIntermediateBlock = NULL;
1,476✔
732

733
  blockDataDestroy(pInfo->pInputBlock);
1,476✔
734
  pInfo->pInputBlock = NULL;
1,476✔
735
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
1,476✔
736

737
  taosHashCleanup(pInfo->dataSlotMap);
1,476✔
738

739
  if (pInfo->pSortCtxList) {
1,476✔
740
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
27✔
741
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
21✔
742
      blockDataDestroy(pCtx->pIntermediateBlock);
21✔
743
      taosMemoryFree(pCtx);
21!
744
    }
745
    taosArrayDestroy(pInfo->pSortCtxList);
6✔
746
    pInfo->pSortCtxList = NULL;
6✔
747
  }
748
  taosMemoryFreeClear(param);
1,476!
749
}
750

751
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
1,476✔
752
  size_t  numOfCols = LIST_LENGTH(pNodeList);
1,476!
753
  int32_t code = TSDB_CODE_SUCCESS;
1,476✔
754
  int32_t lino = 0;
1,476✔
755

756
  if (numOfCols == 0) {
1,476!
757
    return code;
×
758
  }
759

760
  *tsSlotId = -1;
1,476✔
761
  *tagBlockId = -1;
1,476✔
762
  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
1,476✔
763
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno);
1,476!
764

765
  for (int32_t i = 0; i < numOfCols; ++i) {
4,909✔
766
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
3,433✔
767
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno);
3,433!
768

769
    if (pColNode->isPrimTs) {
3,433✔
770
      *tsSlotId = i;
1,464✔
771
    } else if (pColNode->hasRef) {
1,969✔
772
      int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId;
1,528✔
773
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
1,528!
774
    } else if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
441!
775
      // tag column or pseudo column's function
776
      *tagBlockId = pColNode->dataBlockId;
54✔
777
    }
778
  }
779

780
  return code;
1,476✔
781
_return:
×
782
  taosHashCleanup(*pSlotMap);
×
783
  *pSlotMap = NULL;
×
784
  if (code != TSDB_CODE_SUCCESS) {
×
785
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
786
  }
787
  return code;
×
788
}
789

790
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
295✔
791
  int32_t code = 0, lino = 0;
295✔
792
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
295✔
793
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
295✔
794
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;
295✔
795
  
796
  pOper->status = OP_NOT_OPENED;
295✔
797
  resetBasicOperatorState(&pMergeInfo->binfo);
295✔
798

799
  tsortDestroySortHandle(pInfo->pSortHandle);
295✔
800
  pInfo->pSortHandle = NULL;
295✔
801
  // taosArrayDestroy(pInfo->pSortInfo);
802
  // pInfo->pSortInfo = NULL;
803

804
  blockDataDestroy(pInfo->pIntermediateBlock);
295✔
805
  pInfo->pIntermediateBlock = NULL;
295✔
806

807
  blockDataDestroy(pInfo->pInputBlock);
295✔
808
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
295✔
809
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno);
295!
810

811
  pInfo->tagDownStreamId = -1;
295✔
812

813
  if (pInfo->pSortCtxList) {
295!
814
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
×
815
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
×
816
      blockDataDestroy(pCtx->pIntermediateBlock);
×
817
      taosMemoryFree(pCtx);
×
818
    }
819
    taosArrayDestroy(pInfo->pSortCtxList);
×
820
    pInfo->pSortCtxList = NULL;
×
821
  }
822

823
  pMergeInfo->pSavedTuple = NULL;
295✔
824
  pMergeInfo->pSavedTagBlock = NULL;
295✔
825

826
_exit:
295✔
827

828
  if (code) {
295!
829
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
830
  }
831

832
  return code;
295✔
833
}
834

835
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
1,476✔
836
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
837
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
838
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
1,476✔
839
  int32_t                        lino = 0;
1,476✔
840
  int32_t                        code = TSDB_CODE_SUCCESS;
1,476✔
841
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
1,476!
842
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,476!
843
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
1,476✔
844
  SNodeList*                     pMergeKeys = NULL;
1,476✔
845

846
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno);
1,476!
847
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno);
1,476!
848

849
  pOperator->pPhyNode = pVirtualScanPhyNode;
1,476✔
850

851
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
1,476✔
852
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
1,476✔
853

854
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
1,476✔
855
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
1,476✔
856
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno);
1,476!
857

858
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
1,476✔
859
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno);
1,476!
860
  pVirtualScanInfo->pInputBlock = pInputBlock;
1,476✔
861
  pVirtualScanInfo->tagDownStreamId = -1;
1,476✔
862
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
1,476✔
863
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
1,476✔
864
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
24✔
865
    pSup->pExprInfo = NULL;
24✔
866
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
24!
867

868
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
24✔
869
                                      &pTaskInfo->storageAPI.functionStore);
870
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno);
24!
871
  }
872

873
  initResultSizeInfo(&pOperator->resultInfo, 1024);
1,476✔
874
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
1,476!
875

876
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
1,476✔
877
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
1,476✔
878

879
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
1,476✔
880
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
1,470!
881
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
1,470✔
882
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
1,470!
883
  } else {
884
    pTaskInfo->dynamicTask = true;
6✔
885
  }
886
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
1,476✔
887
  pVirtualScanInfo->sortBufSize =
1,476✔
888
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
1,476✔
889
  VTS_ERR_JRET(
1,476!
890
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
891

892
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
1,476✔
893

894
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
1,476!
895
                                  0, pTaskInfo->pStreamRuntimeInfo));
896

897
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
1,476✔
898
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno);
1,476!
899

900
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
1,476✔
901
                  OP_NOT_OPENED, pInfo, pTaskInfo);
902
  pOperator->fpSet =
903
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
1,476✔
904
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
905
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
1,476✔
906

907
  if (NULL != pDownstream) {
1,476✔
908
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
1,467!
909
  } else {
910
    pVirtualScanInfo->tagDownStreamId = -1;
9✔
911
  }
912

913
  nodesDestroyList(pMergeKeys);
1,476✔
914
  *pOptrInfo = pOperator;
1,476✔
915
  return TSDB_CODE_SUCCESS;
1,476✔
916

917
_return:
×
918
  if (code != TSDB_CODE_SUCCESS) {
×
919
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
920
  }
921
  if (pInfo != NULL) {
×
922
    destroyVirtualTableScanOperatorInfo(pInfo);
×
923
  }
924
  nodesDestroyList(pMergeKeys);
×
925
  pTaskInfo->code = code;
×
926
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
927
  return code;
×
928
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc