• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.55
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tdatablock.h"
22
#include "ttime.h"
23
#include "virtualtablescan.h"
24
#include "tsort.h"
25

26
#define STREAM_VTABLE_MERGE_OP_NAME "StreamVtableMergeOperator"
27
#define STREAM_VTABLE_MERGE_OP_CHECKPOINT_NAME "StreamVtableMergeOperator_Checkpoint"
28

29
typedef struct SVirtualTableScanInfo {
30
  STableScanBase base;
31
  SArray*        pSortInfo;
32
  SSortHandle*   pSortHandle;
33
  int32_t        bufPageSize;
34
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
35
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
36
  SSDataBlock*   pInputBlock;
37
  SHashObj*      dataSlotMap;
38
  int32_t        tsSlotId;
39
  int32_t        tagBlockId;
40
  int32_t        tagDownStreamId;
41
  bool           scanAllCols;
42
  SArray*        pSortCtxList;
43
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
44
} SVirtualTableScanInfo;
45

46
typedef struct SVirtualScanMergeOperatorInfo {
47
  SOptrBasicInfo        binfo;
48
  EMergeType            type;
49
  SVirtualTableScanInfo virtualScanInfo;
50
  bool                  ignoreGroupId;
51
  uint64_t              groupId;
52
  STupleHandle*         pSavedTuple;
53
  SSDataBlock*          pSavedTagBlock;
54
} SVirtualScanMergeOperatorInfo;
55

56
typedef struct SLoadNextCtx {
57
  SOperatorInfo*  pOperator;
58
  SOperatorParam* pOperatorGetParam;
59
  int32_t         blockId;
60
  STimeWindow     window;
61
  SSDataBlock*    pIntermediateBlock;
62
  col_id_t        tsSlotId;
63
} SLoadNextCtx;
64

65
/*
 * Fetch callback for the sort handle: pulls the next block from the
 * downstream operator passed in `param` and validates it.
 *
 * param    the downstream SOperatorInfo
 * ppBlock  receives the block produced by the downstream
 * returns  TSDB_CODE_SUCCESS, or the failure code of the fetch/validation
 */
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SOperatorInfo* pDownstream = param;

  VTS_ERR_JRET(pDownstream->fpSet.getNextFn(pDownstream, ppBlock));
  VTS_ERR_JRET(blockDataCheck(*ppBlock));

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  }
  return code;
}
77

78
/*
 * Reads the first and last timestamp of a block.
 *
 * The timestamp column is the first column whose colId equals `tsSlotId`;
 * when no column matches, the last column of the block is used instead.
 *
 * pBlock    block to inspect
 * tsSlotId  id identifying the timestamp column
 * startTs   receives the value of row 0
 * endTs     receives the value of row (rows - 1)
 * returns   TSDB_CODE_SUCCESS, or terrno when the column cannot be fetched
 */
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int32_t tsIndex = 0;

  // stop at the first column carrying the requested id
  while (tsIndex < numOfCols) {
    SColumnInfoData *pCandidate = taosArrayGet(pBlock->pDataBlock, tsIndex);
    if (pCandidate->info.colId == tsSlotId) {
      break;
    }
    tsIndex++;
  }

  // nothing matched: fall back to the last column
  if (tsIndex == numOfCols) {
    tsIndex = numOfCols - 1;
  }

  SColumnInfoData *pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
  QUERY_CHECK_NULL(pTsCol, code, lino, _return, terrno);

  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsCol, 0), 0);
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsCol, pBlock->info.rows - 1), 0);

  return code;

_return:
  qError("failed to get time window of block, %s code:%s", __func__, tstrerror(code));
  return code;
}
104

105
/*
 * Fetch callback for the sort handle when the operator is driven by
 * operator parameters.
 *
 * The context's current window is written into the exchange parameter, the
 * downstream is reset to OP_NOT_OPENED and asked for the next block. A
 * returned block is stamped with the context's blockId, its time window is
 * stored back into the context, and a private copy of it is handed out.
 * When the downstream is exhausted the window end is set to INT64_MAX and
 * NULL is returned.
 *
 * param    the SLoadNextCtx of the source
 * ppBlock  receives the copied block, or NULL (also NULL on failure)
 * returns  TSDB_CODE_SUCCESS or the first failure code
 */
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
  SOperatorInfo*          pOperator = pCtx->pOperator;
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
  int32_t                 code = TSDB_CODE_SUCCESS;
  SSDataBlock*            pRes = NULL;
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;

  pParam->basic.window = pCtx->window;
  pOperator->status = OP_NOT_OPENED;

  // the copy handed out by the previous call is no longer needed
  if (pCtx->pIntermediateBlock) {
    blockDataDestroy(pCtx->pIntermediateBlock);
    pCtx->pIntermediateBlock = NULL;
  }

  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));

  VTS_ERR_JRET(blockDataCheck(pRes));
  if ((pRes)) {
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
    (pRes)->info.id.blockId = pCtx->blockId;
    // a failure here would leave a stale window in the context, so it must not be ignored
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
    *ppBlock = pCtx->pIntermediateBlock;
  } else {
    pCtx->window.ekey = INT64_MAX;
    *ppBlock = NULL;
  }

  return code;
_return:
  // never leave the output pointing at an undefined block on failure
  *ppBlock = NULL;
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  return code;
}
139

140
/*
 * Builds a one-element merge-key list: ascending order on the column in slot
 * `tsSlotId`, NULLs first.
 *
 * pMergeKeys  receives the new list on success; untouched on failure
 * tsSlotId    slot of the timestamp column
 * returns     TSDB_CODE_SUCCESS or the failure code of the node API
 */
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SNodeList        *pNodeList = NULL;
  SColumnNode      *pColumnNode = NULL;
  SOrderByExprNode *pOrderByExprNode = NULL;

  VTS_ERR_JRET(nodesMakeList(&pNodeList));

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
  pColumnNode->slotId = tsSlotId;

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
  // The column node is now reachable through the order-by node. Drop the
  // local reference so the error path does not release it a second time.
  pColumnNode = NULL;
  pOrderByExprNode->order = ORDER_ASC;
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;

  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));

  *pMergeKeys = pNodeList;
  return code;
_return:
  nodesDestroyNode((SNode*)pColumnNode);
  nodesDestroyNode((SNode*)pOrderByExprNode);
  nodesDestroyList(pNodeList);
  return code;
}
166

167
/*
 * Releases the sort resources of a previous run (sort info, sort handle and
 * the per-source fetch contexts) so that they can be rebuilt.
 *
 * Every released pointer is reset to NULL, which makes the function safe to
 * call repeatedly and keeps later destruction from touching freed memory.
 */
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
  if (pVirtualScanInfo->pSortInfo) {
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
    pVirtualScanInfo->pSortInfo = NULL;
  }
  if (pVirtualScanInfo->pSortHandle) {
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
    pVirtualScanInfo->pSortHandle = NULL;
  }
  if (pVirtualScanInfo->pSortCtxList) {
    // the list stores SLoadNextCtx pointers; each context owns its intermediate block
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
      blockDataDestroy(pCtx->pIntermediateBlock);
      taosMemoryFree(pCtx);
    }
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
    // without this reset the operator destructor would walk the freed list again
    pVirtualScanInfo->pSortCtxList = NULL;
  }
}
40✔
185

186
/*
 * (Re)builds the sort handle for an operator that is driven by operator
 * parameters: one sort source is registered per entry of the parameter's
 * pOpParamArray, all of them reading from the single non-tag downstream.
 *
 * Ownership rules applied below:
 *   - a fetch context belongs to pSortCtxList once it has been pushed;
 *   - a sort source belongs to the sort handle once tsortAddSource() succeeded;
 *   - anything not handed over yet is released on the error path.
 *
 * returns TSDB_CODE_SUCCESS or the first failure code
 */
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
  int32_t                         code = TSDB_CODE_SUCCESS;
  int32_t                         lino = 0;
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  // one page per source plus one extra page
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
  SNodeList*                      pMergeKeys = NULL;
  SSortSource*                    ps = NULL;
  SLoadNextCtx*                   pCtx = NULL;
  int32_t                         scanOpIndex = 0;

  cleanUpVirtualScanInfo(pVirtualScanInfo);
  // make sure no stale list pointer survives if we fail before the list is re-created
  pVirtualScanInfo->pSortCtxList = NULL;

  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
  nodesDestroyList(pMergeKeys);
  // the error path destroys pMergeKeys as well; avoid releasing it twice
  pMergeKeys = NULL;

  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));

  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);

  if (pOperator->numOfDownstream > 2) {
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
  }

  pVirtualScanInfo->tagDownStreamId = -1;
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
      // tag block do not need sort
      pVirtualScanInfo->tagDownStreamId = i;
      pInfo->pSavedTagBlock = NULL;
    }
  }

  // with at most two downstreams the scan source is "the other one"
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
  if (scanOpIndex >= pOperator->numOfDownstream) {
    // e.g. the only downstream is the tag source: there is nothing to scan from
    qError("virtual scan operator has no scan downstream, numOfDownstream:%d, tagDownStreamId:%d",
           pOperator->numOfDownstream, pVirtualScanInfo->tagDownStreamId);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
  }

  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno);
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);

    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno);
    pCtx->blockId = i;
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
    pCtx->pOperatorGetParam = pOpParam;
    // inverted window: nothing has been read from this source yet
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
    pCtx->pIntermediateBlock = NULL;
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;

    ps = taosMemoryCalloc(1, sizeof(SSortSource));
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno);

    ps->param = pCtx;
    ps->onlyRef = true;

    // Hand the context to the list before the source is registered, so the
    // context the source refers to is always released together with the list.
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno);
    pCtx = NULL;

    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
    // the sort handle owns the source from here on
    ps = NULL;
  }

  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));

  return code;
_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  nodesDestroyList(pMergeKeys);
  if (ps != NULL) {
    taosMemoryFree(ps);
  }
  if (pCtx != NULL) {
    taosMemoryFree(pCtx);
  }
  return code;
}
268

269
/*
 * Builds the sort handle for the plain (non-parameterised) case.
 *
 * Every downstream must be an exchange operator and is opened here. The
 * downstream whose result block id equals tagBlockId is remembered as the
 * tag source; every other downstream is registered as a sort source.
 *
 * returns TSDB_CODE_SUCCESS or the first failure code
 */
int32_t createSortHandle(SOperatorInfo* pOperator) {
  int32_t                        code = 0;
  int32_t                        lino = 0;
  SSortSource*                   pSource = NULL;
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOperator->info;
  SVirtualTableScanInfo*         pScan = &pMergeInfo->virtualScanInfo;
  SExecTaskInfo*                 pTask = pOperator->pTaskInfo;
  int32_t                        numOfPages = (int32_t)pScan->sortBufSize / pScan->bufPageSize;

  VTS_ERR_JRET(tsortCreateSortHandle(pScan->pSortInfo, SORT_MULTISOURCE_MERGE, pScan->bufPageSize, numOfPages,
                                     pScan->pInputBlock, pTask->id.str, 0, 0, 0, &pScan->pSortHandle));

  tsortSetForceUsePQSort(pScan->pSortHandle);
  tsortSetFetchRawDataFp(pScan->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);

  for (int32_t idx = 0; idx < pOperator->numOfDownstream; ++idx) {
    SOperatorInfo* pChild = pOperator->pDownstream[idx];

    // only exchange operators are accepted as downstream
    if (pChild->operatorType != QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
    }
    VTS_ERR_JRET(pChild->fpSet._openFn(pChild));

    if (pChild->resultDataBlockId == pScan->tagBlockId) {
      // the tag source is not part of the merge
      pScan->tagDownStreamId = idx;
      continue;
    }

    pSource = taosMemoryCalloc(1, sizeof(SSortSource));
    TSDB_CHECK_NULL(pSource, code, lino, _return, terrno);

    pSource->param = pChild;
    pSource->onlyRef = true;

    VTS_ERR_JRET(tsortAddSource(pScan->pSortHandle, pSource));
    // handed over to the sort handle; must not be freed below
    pSource = NULL;
  }

  VTS_ERR_JRET(tsortOpen(pScan->pSortHandle));

_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  if (pSource != NULL) {
    taosMemoryFree(pSource);
  }
  return code;
}
319

320
/*
 * Creates the sort handle of the operator. Nothing is done when the operator
 * has no downstream. The parameter-driven builder is used when the operator
 * carries an operator parameter, the plain one otherwise.
 *
 * returns TSDB_CODE_SUCCESS or the failure code of the chosen builder
 */
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pOperator->numOfDownstream == 0) {
    return code;
  }

  if (pOperator->pOperatorGetParam == NULL) {
    VTS_ERR_JRET(createSortHandle(pOperator));
  } else {
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
  }

  return code;

_return:
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  return code;
}
341

342
/*
 * Open callback of the operator. Returns immediately when the operator is
 * already opened; otherwise runs the open implementation, records the time
 * it took (in milliseconds) as the open cost and marks the operator opened.
 *
 * returns TSDB_CODE_SUCCESS or the failure code of the open implementation
 */
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
  int32_t code = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t beginUs = taosGetTimestampUs();
  code = openVirtualTableScanOperatorImpl(pOperator);
  int64_t elapsedUs = taosGetTimestampUs() - beginUs;

  // cost and status are updated even when opening failed
  pOperator->cost.openCost = (double)elapsedUs / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;

  VTS_ERR_RET(code);

  OPTR_SET_OPENED(pOperator);
  return code;
}
361

362
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
40,980✔
363
                                          SSDataBlock* p) {
364
  int32_t code = 0;
40,980✔
365
  int64_t lastTs = 0;
40,980✔
366
  int64_t rowNums = -1;
40,980✔
367
  blockDataEmpty(p);
40,980✔
368
  while (1) {
69,663,095✔
369
    STupleHandle* pTupleHandle = NULL;
69,671,057✔
370
    if (!pInfo->pSavedTuple) {
69,671,057✔
371
      code = tsortNextTuple(pHandle, &pTupleHandle);
69,634,624✔
372
      if (pTupleHandle == NULL || (code != 0)) {
70,023,320✔
373
        break;
374
      }
375
    } else {
376
      pTupleHandle = pInfo->pSavedTuple;
36,433✔
377
      pInfo->pSavedTuple = NULL;
36,433✔
378
    }
379

380
    SDataBlockInfo info = {0};
70,055,470✔
381
    tsortGetBlockInfo(pTupleHandle, &info);
70,055,470✔
382
    int32_t blockId = (int32_t)info.id.blockId;
70,236,214✔
383

384
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
188,727,854✔
385
      bool isNull = tsortIsNullVal(pTupleHandle, i);
118,393,736✔
386
      if (isNull) {
118,253,769✔
387
        int32_t slotKey = blockId << 16 | i;
7✔
388
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
7✔
389
        if (slotId == NULL) {
7!
390
          if (i == 0) {
×
391
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
392
          } else {
393
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
394
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
395
          }
396
        } else {
397
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
7✔
398
        }
399
      } else {
400
        char* pData = NULL;
118,253,762✔
401
        tsortGetValue(pTupleHandle, i, (void**)&pData);
118,253,762✔
402

403
        if (pData != NULL) {
118,172,182!
404
          if (i == 0) {
118,172,182✔
405
            if (lastTs != *(int64_t*)pData) {
70,029,008✔
406
              if (rowNums >= capacity - 1) {
38,878,793✔
407
                pInfo->pSavedTuple = pTupleHandle;
36,697✔
408
                goto _return;
36,697✔
409
              }
410
              rowNums++;
38,842,096✔
411
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
179,982,890✔
412
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
141,175,601✔
413
              }
414
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
38,122,036✔
415
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
36,750,716!
416
              }
417
              lastTs = *(int64_t*)pData;
38,708,832✔
418
            }
419
          }
420
          int32_t slotKey = blockId << 16 | i;
118,002,221✔
421
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
118,002,221✔
422
          if (slotId == NULL) {
118,037,315✔
423
            if (i == 0) {
69,734,921!
424
              continue;
69,734,921✔
425
            } else {
426
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
427
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
428
            }
429
          }
430
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
48,302,394!
431
        }
432
      }
433
    }
434
  }
435
_return:
40,980✔
436
  p->info.rows = rowNums + 1;
40,980✔
437
  p->info.dataLoad = 1;
40,980✔
438
  p->info.scanFlag = MAIN_SCAN;
40,980✔
439
  return code;
40,980✔
440
}
441

442
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
470✔
443
                                           SSDataBlock* p) {
444
  int32_t code = 0;
470✔
445
  int64_t lastTs = 0;
470✔
446
  int64_t rowNums = -1;
470✔
447
  blockDataEmpty(p);
470✔
448
  while (1) {
570,208✔
449
    STupleHandle* pTupleHandle = NULL;
570,678✔
450
    if (!pInfo->pSavedTuple) {
570,678✔
451
      code = tsortNextTuple(pHandle, &pTupleHandle);
570,282✔
452
      if (pTupleHandle == NULL || (code != 0)) {
570,282!
453
        break;
454
      }
455
    } else {
456
      pTupleHandle = pInfo->pSavedTuple;
396✔
457
      pInfo->pSavedTuple = NULL;
396✔
458
    }
459

460
    int32_t tsIndex = -1;
570,604✔
461

462
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
570,812!
463
      if (tsortIsNullVal(pTupleHandle, i)) {
570,812!
464
        continue;
×
465
      } else {
466
        SColumnInfoData *pColInfo = NULL;
570,812✔
467
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
570,812✔
468
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
570,812✔
469
          tsIndex = i;
570,604✔
470
          break;
570,604✔
471
        }
472
      }
473
    }
474

475
    if (tsIndex == -1) {
570,604!
476
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
477
    }
478

479
    char* pData = NULL;
570,604✔
480
    // first, set ts slot's data
481
    // then, set other slots' data
482
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
570,604✔
483

484
    if (pData != NULL) {
570,604!
485
      if (lastTs != *(int64_t*)pData) {
570,604✔
486
        if (rowNums >= capacity - 1) {
420,598✔
487
          pInfo->pSavedTuple = pTupleHandle;
396✔
488
          goto _return;
396✔
489
        }
490
        rowNums++;
420,202✔
491
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
9,730,901✔
492
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
9,310,699✔
493
        }
494
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
420,202!
495
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
420,202!
496
        }
497
        lastTs = *(int64_t*)pData;
420,202✔
498
      }
499
    }
500
    if (pInfo->virtualScanInfo.scanAllCols) {
570,208!
501
      continue;
×
502
    }
503

504
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
13,221,040✔
505
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
12,650,832✔
506
        continue;
10,370,416✔
507
      }
508

509
      SColumnInfoData *pColInfo = NULL;
2,280,416✔
510
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,280,416✔
511
      tsortGetValue(pTupleHandle, i, (void**)&pData);
2,280,416✔
512

513
      if (pData != NULL) {
2,280,416!
514
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
2,280,416!
515
      }
516
    }
517
  }
518
_return:
470✔
519
  p->info.rows = rowNums + 1;
470✔
520
  p->info.dataLoad = 1;
470✔
521
  p->info.scanFlag = MAIN_SCAN;
470✔
522
  return code;
470✔
523
}
524

525
/*
 * Produces the next merged block of the operator.
 *
 * The sorted tuples are merged into the intermediate block (obtained from the
 * sort handle on first use, cleaned up and reused afterwards) and then copied
 * into the operator's result block.
 *
 * pOperator  the virtual table scan operator
 * pResBlock  receives the result block, or NULL when no rows were produced
 *            (also NULL when there is no sort handle or on failure)
 * returns    TSDB_CODE_SUCCESS or the first failure code
 */
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = 0;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
  int32_t                        capacity = pOperator->resultInfo.capacity;

  // The caller inspects *pResBlock after every successful return, so give it
  // a defined value on the early-return paths as well.
  *pResBlock = NULL;

  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
  blockDataCleanup(pDataBlock);

  if (pHandle == NULL) {
    // no sort handle was created (the operator has no downstream)
    return TSDB_CODE_SUCCESS;
  }

  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
  } else {
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
  }

  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
  if (pOperator->pOperatorGetParam) {
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
  } else {
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
  }

  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
         pDataBlock->info.rows);

  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
  return code;
}
566

567
/*
 * Fills the tag / pseudo columns of `pBlock` from the single row of
 * `tagBlock`: expression j takes its value from column j of the tag block
 * and writes it to the expression's result slot for `rows` rows.
 *
 * pBlock->info.rows is temporarily overridden with `rows` while the columns
 * are filled and is restored before returning, on failure as well.
 *
 * pInfo      scan info (not used here)
 * pExpr      pseudo column expressions
 * numOfExpr  number of expressions; nothing is done when <= 0
 * tagBlock   block holding exactly one row of tag values; nothing is done when NULL
 * pBlock     block whose columns are filled
 * rows       number of rows to fill
 * returns    TSDB_CODE_SUCCESS or the first failure code
 */
int32_t vtableAddTagPseudoColumnData(SVirtualTableScanInfo *pInfo, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          backupRows = 0;
  bool             rowsOverridden = false;
  // currently only the tbname pseudo column
  if (numOfExpr <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock->info.rows != 1) {
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
  }

  backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;
  rowsOverridden = true;
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno);

    // a NULL tag (or JSON null) becomes NULL in every row
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }

    char* data = colDataGetData(pTagInfoData, 0);

    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
      QUERY_CHECK_CODE(code, lino, _return);
    } else {  // todo opt for json tag
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
        code = colDataSetVal(pColInfoData, i, data, false);
        QUERY_CHECK_CODE(code, lino, _return);
      }
    }
  }

_return:
  // restore the rows, also when bailing out in the middle of the loop
  if (rowsOverridden) {
    pBlock->info.rows = backupRows;
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
626

627
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock,
39,246✔
628
                                  SExecTaskInfo* pTaskInfo, int32_t rows) {
629
  int32_t         code = 0;
39,246✔
630
  STableScanBase* pTableScanInfo = &pInfo->base;
39,246✔
631
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
39,246✔
632
  if (pSup->numOfExprs > 0) {
39,246✔
633
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pInfo,  pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
1,851!
634
  }
635

636
  return code;
39,246✔
637
}
638

639
/*
 * getNext callback of the virtual table scan operator.
 *
 * Opens the operator if needed, fetches the tag block once (it must contain
 * exactly one row) and then keeps merging blocks until one still has rows
 * after tag filling and filtering, or the sources are exhausted.
 *
 * pOperator  the virtual table scan operator
 * pResBlock  receives the next result block, or NULL when the scan is done
 * returns    TSDB_CODE_SUCCESS; on failure the code is stored in the task
 *            and control leaves through T_LONG_JMP
 */
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = TSDB_CODE_SUCCESS;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
    // report "no more data" through the out parameter, not the local pointer
    *pResBlock = NULL;
    return code;
  }

  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));

  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
    SSDataBlock*   pTagBlock = NULL;
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
    // propagate the real failure code of the tag fetch instead of dropping it
    if (pOperator->pOperatorGetParam) {
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
    } else {
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
    }

    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
      VTS_ERR_JRET(TSDB_CODE_FAILED);
    }
    pInfo->pSavedTagBlock = pTagBlock;
  }

  while(1) {
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
    if (*pResBlock == NULL) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pOperator->pOperatorGetParam) {
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
      (*pResBlock)->info.id.uid = uid;
    } else {
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
    }

    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), pTaskInfo, (*pResBlock)->info.rows));
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL));
    // a block that was filtered out completely is skipped; merge the next one
    if ((*pResBlock)->info.rows > 0) {
      break;
    }
  }

  return code;
_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
699

700
/*
 * Releases the resources held by the embedded table scan base: query
 * condition, data reader (closed through the reader API when a close
 * callback is present), column match list, table meta cache and the pseudo
 * column expressions.
 */
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  if (pAPI->tsdReaderClose != NULL) {
    pAPI->tsdReaderClose(pBase->dataReader);
  }
  // the reader pointer is dropped whether or not a close callback exists
  pBase->dataReader = NULL;

  if (pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}
2,459✔
715

716
void destroyVirtualTableScanOperatorInfo(void* param) {
2,459✔
717
  if (!param) {
2,459!
718
    return;
×
719
  }
720
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
2,459✔
721
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
2,459✔
722
  blockDataDestroy(pOperatorInfo->binfo.pRes);
2,459✔
723
  pOperatorInfo->binfo.pRes = NULL;
2,459✔
724

725
  tsortDestroySortHandle(pInfo->pSortHandle);
2,459✔
726
  pInfo->pSortHandle = NULL;
2,459✔
727
  taosArrayDestroy(pInfo->pSortInfo);
2,459✔
728
  pInfo->pSortInfo = NULL;
2,459✔
729

730
  blockDataDestroy(pInfo->pIntermediateBlock);
2,459✔
731
  pInfo->pIntermediateBlock = NULL;
2,459✔
732

733
  blockDataDestroy(pInfo->pInputBlock);
2,459✔
734
  pInfo->pInputBlock = NULL;
2,459✔
735
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
2,459✔
736

737
  taosHashCleanup(pInfo->dataSlotMap);
2,459✔
738

739
  if (pInfo->pSortCtxList) {
2,459✔
740
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
27✔
741
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
21✔
742
      blockDataDestroy(pCtx->pIntermediateBlock);
21✔
743
      taosMemoryFree(pCtx);
21!
744
    }
745
    taosArrayDestroy(pInfo->pSortCtxList);
6✔
746
    pInfo->pSortCtxList = NULL;
6✔
747
  }
748
  taosMemoryFreeClear(param);
2,459!
749
}
750

751
// Walks the target column list and derives the slot bookkeeping used by the virtual scan:
//   - *tsSlotId   : index of the last target flagged isPrimTs, or -1 when there is none;
//   - *pSlotMap   : hash from (dataBlockId << 16 | slotId) of every referencing column
//                   (hasRef) to that column's index in the target list;
//   - *tagBlockId : dataBlockId of the last target that is a tag column or has an empty
//                   table alias, or -1 when there is none.
// An empty list returns success immediately and leaves all three outputs untouched.
// On failure the hash is released, *pSlotMap is set to NULL and the error code is returned.
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  colCount = LIST_LENGTH(pNodeList);

  if (0 == colCount) {
    return code;
  }

  *tsSlotId = -1;
  *tagBlockId = -1;
  *pSlotMap = taosHashInit(colCount, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno);

  for (int32_t idx = 0; idx < colCount; ++idx) {
    SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pNodeList, idx);
    TSDB_CHECK_NULL(pCol, code, lino, _return, terrno);

    if (pCol->isPrimTs) {
      *tsSlotId = idx;
      continue;
    }

    if (pCol->hasRef) {
      // Pack the source block id and slot id into a single 32-bit lookup key.
      int32_t slotKey = pCol->dataBlockId << 16 | pCol->slotId;
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &idx, sizeof(idx)));
      continue;
    }

    if (pCol->colType == COLUMN_TYPE_TAG || '\0' == pCol->tableAlias[0]) {
      // tag column or pseudo column's function
      *tagBlockId = pCol->dataBlockId;
    }
  }

  return code;

_return:
  taosHashCleanup(*pSlotMap);
  *pSlotMap = NULL;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
789

790
// Puts the virtual table merge operator back into the OP_NOT_OPENED state: resets the basic
// operator state, drops the sort handle, the intermediate block and all per-source load
// contexts, recreates the input block from the physical node's output descriptor and clears
// the saved tuple / tag block pointers.
// Returns 0 on success, or the error code when the input block cannot be recreated (in that
// case the load contexts and the saved pointers are left as they were).
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
  int32_t                        code = 0;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
  SVirtualTableScanInfo*         pScanInfo = &pMergeInfo->virtualScanInfo;
  SPhysiNode*                    pNode = (SPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pMergeInfo->binfo);

  tsortDestroySortHandle(pScanInfo->pSortHandle);
  pScanInfo->pSortHandle = NULL;
  // NOTE(review): pSortInfo is intentionally not released here; presumably it is reused when
  // the operator is opened again - confirm.

  blockDataDestroy(pScanInfo->pIntermediateBlock);
  pScanInfo->pIntermediateBlock = NULL;

  // Replace the input block with a fresh one built from the node's output descriptor.
  blockDataDestroy(pScanInfo->pInputBlock);
  pScanInfo->pInputBlock = createDataBlockFromDescNode(pNode->pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pScanInfo->pInputBlock, code, lino, _exit, terrno);

  pScanInfo->tagDownStreamId = -1;

  // Free every separately allocated load context before the array that stores them.
  SArray* pCtxList = pScanInfo->pSortCtxList;
  if (NULL != pCtxList) {
    for (int32_t idx = 0; idx < taosArrayGetSize(pCtxList); idx++) {
      SLoadNextCtx* pLoadCtx = *(SLoadNextCtx**)taosArrayGet(pCtxList, idx);
      blockDataDestroy(pLoadCtx->pIntermediateBlock);
      taosMemoryFree(pLoadCtx);
    }
    taosArrayDestroy(pCtxList);
    pScanInfo->pSortCtxList = NULL;
  }

  pMergeInfo->pSavedTuple = NULL;
  pMergeInfo->pSavedTagBlock = NULL;

_exit:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
834

835
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
2,459✔
836
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
837
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
838
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
2,459✔
839
  int32_t                        lino = 0;
2,459✔
840
  int32_t                        code = TSDB_CODE_SUCCESS;
2,459✔
841
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
2,459!
842
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,459!
843
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
2,459✔
844
  SNodeList*                     pMergeKeys = NULL;
2,459✔
845

846
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno);
2,459!
847
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno);
2,459!
848

849
  pOperator->pPhyNode = pVirtualScanPhyNode;
2,459✔
850

851
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
2,459✔
852
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
2,459✔
853

854
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
2,459✔
855
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
2,459✔
856
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno);
2,459!
857

858
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
2,459✔
859
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno);
2,459!
860
  pVirtualScanInfo->pInputBlock = pInputBlock;
2,459✔
861
  pVirtualScanInfo->tagDownStreamId = -1;
2,459✔
862
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
2,459✔
863
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
2,459✔
864
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
195✔
865
    pSup->pExprInfo = NULL;
195✔
866
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
195!
867

868
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
195✔
869
                                      &pTaskInfo->storageAPI.functionStore);
870
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno);
195!
871
  }
872

873
  initResultSizeInfo(&pOperator->resultInfo, 1024);
2,459✔
874
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
2,459!
875

876
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
2,459✔
877
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
2,459✔
878

879
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
2,459✔
880
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
2,448!
881
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,448✔
882
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
2,448!
883
  } else {
884
    pTaskInfo->dynamicTask = true;
11✔
885
  }
886
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
2,459✔
887
  pVirtualScanInfo->sortBufSize =
2,459✔
888
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
2,459✔
889
  VTS_ERR_JRET(
2,459!
890
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
891

892
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
2,459✔
893

894
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
2,459!
895
                                  0, pTaskInfo->pStreamRuntimeInfo));
896

897
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
2,459✔
898
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno);
2,459!
899

900
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
2,459✔
901
                  OP_NOT_OPENED, pInfo, pTaskInfo);
902
  pOperator->fpSet =
903
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
2,459✔
904
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
905
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
2,459✔
906

907
  if (NULL != pDownstream) {
2,459✔
908
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
2,439!
909
  } else {
910
    pVirtualScanInfo->tagDownStreamId = -1;
20✔
911
  }
912

913
  nodesDestroyList(pMergeKeys);
2,459✔
914
  *pOptrInfo = pOperator;
2,459✔
915
  return TSDB_CODE_SUCCESS;
2,459✔
916

917
_return:
×
918
  if (code != TSDB_CODE_SUCCESS) {
×
919
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
920
  }
921
  if (pInfo != NULL) {
×
922
    destroyVirtualTableScanOperatorInfo(pInfo);
×
923
  }
924
  nodesDestroyList(pMergeKeys);
×
925
  pTaskInfo->code = code;
×
926
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
927
  return code;
×
928
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc