• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.57
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21
#include "virtualtablescan.h"
22
#include "tsort.h"
23

24
typedef struct SVirtualTableScanInfo {
25
  STableScanBase base;
26
  SArray*        pSortInfo;
27
  SSortHandle*   pSortHandle;
28
  int32_t        bufPageSize;
29
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
30
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
31
  SSDataBlock*   pInputBlock;
32
  SHashObj*      dataSlotMap;
33
  int32_t        tsSlotId;
34
  int32_t        tagBlockId;
35
  int32_t        tagDownStreamId;
36
  bool           scanAllCols;
37
  SArray*        pSortCtxList;
38
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
39
} SVirtualTableScanInfo;
40

41
typedef struct SVirtualScanMergeOperatorInfo {
42
  SOptrBasicInfo        binfo;
43
  EMergeType            type;
44
  SVirtualTableScanInfo virtualScanInfo;
45
  bool                  ignoreGroupId;
46
  uint64_t              groupId;
47
  STupleHandle*         pSavedTuple;
48
  SSDataBlock*          pSavedTagBlock;
49
} SVirtualScanMergeOperatorInfo;
50

51
typedef struct SLoadNextCtx {
52
  SOperatorInfo*  pOperator;
53
  SOperatorParam* pOperatorGetParam;
54
  int32_t         blockId;
55
  STimeWindow     window;
56
  SSDataBlock*    pIntermediateBlock;
57
  col_id_t        tsSlotId;
58
} SLoadNextCtx;
59

60
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
37,031✔
61
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
37,031✔
62
  int32_t        code = TSDB_CODE_SUCCESS;
37,031✔
63

64
  VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock));
37,031!
65
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
37,031!
66

67
  return code;
37,031✔
68
_return:
×
69
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
70
  return code;
×
71
}
72

73
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
34,018✔
74
  int32_t code = TSDB_CODE_SUCCESS;
34,018✔
75
  int32_t lino = 0;
34,018✔
76
  int32_t tsIndex = -1;
34,018✔
77
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
71,377!
78
    if (((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i))->info.colId == tsSlotId) {
71,377✔
79
      tsIndex = i;
34,018✔
80
      break;
34,018✔
81
    }
82
  }
83

84
  if (tsIndex == -1) {
34,018!
85
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
86
  }
87

88
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
34,018✔
89
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
34,018!
90

91
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
34,018✔
92
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
34,018✔
93

94
  return code;
34,018✔
95
_return:
×
96
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
97
  return code;
×
98
}
99

100
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
45,370✔
101
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
45,370✔
102
  SOperatorInfo*          pOperator = pCtx->pOperator;
45,370✔
103
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
45,370✔
104
  int32_t                 code = TSDB_CODE_SUCCESS;
45,370✔
105
  SSDataBlock*            pRes = NULL;
45,370✔
106
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;
45,370✔
107

108
  pParam->basic.window = pCtx->window;
45,370✔
109
  pOperator->status = OP_NOT_OPENED;
45,370✔
110
  if (pCtx->pIntermediateBlock) {
45,370✔
111
    blockDataDestroy(pCtx->pIntermediateBlock);
33,935✔
112
    pCtx->pIntermediateBlock = NULL;
33,935✔
113
  }
114

115
  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));
45,370!
116

117
  VTS_ERR_JRET(blockDataCheck(pRes));
45,370!
118
  if ((pRes)) {
45,370✔
119
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
34,018✔
120
    (pRes)->info.id.blockId = pCtx->blockId;
34,018✔
121
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
34,018!
122
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
34,018!
123
    *ppBlock = pCtx->pIntermediateBlock;
34,018✔
124
  } else {
125
    pCtx->window.ekey = INT64_MAX;
11,352✔
126
    *ppBlock = NULL;
11,352✔
127
  }
128

129
  return code;
45,370✔
130
_return:
×
131
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
132
  return code;
×
133
}
134

135
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
7,920✔
136
  int32_t           code = TSDB_CODE_SUCCESS;
7,920✔
137
  SNodeList        *pNodeList = NULL;
7,920✔
138
  SColumnNode      *pColumnNode = NULL;
7,920✔
139
  SOrderByExprNode *pOrderByExprNode = NULL;
7,920✔
140

141
  VTS_ERR_JRET(nodesMakeList(&pNodeList));
7,920!
142

143
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
7,920!
144
  pColumnNode->slotId = tsSlotId;
7,920✔
145

146
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
7,920!
147
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
7,920✔
148
  pOrderByExprNode->order = ORDER_ASC;
7,920✔
149
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;
7,920✔
150

151
  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
7,920!
152

153
  *pMergeKeys = pNodeList;
7,920✔
154
  return code;
7,920✔
155
_return:
×
156
  nodesDestroyNode((SNode*)pColumnNode);
×
157
  nodesDestroyNode((SNode*)pOrderByExprNode);
×
158
  nodesDestroyList(pNodeList);
×
159
  return code;
×
160
}
161

162
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
4,500✔
163
  if (pVirtualScanInfo->pSortInfo) {
4,500✔
164
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
3,354✔
165
    pVirtualScanInfo->pSortInfo = NULL;
3,354✔
166
  }
167
  if (pVirtualScanInfo->pSortHandle) {
4,500✔
168
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
3,326✔
169
    pVirtualScanInfo->pSortHandle = NULL;
3,326✔
170
  }
171
  if (pVirtualScanInfo->pSortCtxList) {
4,500✔
172
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
10,979✔
173
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
7,653✔
174
      blockDataDestroy(pCtx->pIntermediateBlock);
7,653✔
175
      taosMemoryFree(pCtx);
7,653!
176
    }
177
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
3,326✔
178
  }
179
}
4,500✔
180

181
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
4,500✔
182
  int32_t                         code = TSDB_CODE_SUCCESS;
4,500✔
183
  int32_t                         lino = 0;
4,500✔
184
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
4,500✔
185
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
4,500✔
186
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
4,500✔
187
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
4,500✔
188
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
4,500✔
189
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
4,500✔
190
  SNodeList*                      pMergeKeys = NULL;
4,500✔
191
  SSortSource*                    ps = NULL;
4,500✔
192
  int32_t                         scanOpIndex = 0;
4,500✔
193

194
  cleanUpVirtualScanInfo(pVirtualScanInfo);
4,500✔
195
  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
4,500!
196
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
4,500✔
197
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
4,500!
198
  nodesDestroyList(pMergeKeys);
4,500✔
199

200
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
4,500!
201
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
202

203
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
4,500✔
204
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);
4,500✔
205

206
  if (pOperator->numOfDownstream > 2) {
4,500!
207
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
×
208
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
209
  }
210

211
  pVirtualScanInfo->tagDownStreamId = -1;
4,500✔
212
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
12,499✔
213
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
7,999✔
214
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
7,999✔
215
      // tag block do not need sort
216
      pVirtualScanInfo->tagDownStreamId = i;
3,499✔
217
      pInfo->pSavedTagBlock = NULL;
3,499✔
218
      continue;
3,499✔
219
    }
220
  }
221
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
4,500✔
222

223
  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
4,500✔
224
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
4,500✔
225
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno)
4,500!
226
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
15,935✔
227
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
11,435✔
228
    SLoadNextCtx*   pCtx = NULL;
11,435✔
229
    ps = NULL;
11,435✔
230

231
    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
11,435!
232
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno)
11,435!
233
    pCtx->blockId = i;
11,435✔
234
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
11,435✔
235
    pCtx->pOperatorGetParam = pOpParam;
11,435✔
236
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
11,435✔
237
    pCtx->pIntermediateBlock = NULL;
11,435✔
238
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;
11,435✔
239

240
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
11,435!
241
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno)
11,435!
242

243
    ps->param = pCtx;
11,435✔
244
    ps->onlyRef = true;
11,435✔
245

246
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
11,435!
247
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno)
22,870!
248
  }
249

250
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
4,500!
251

252
  return code;
4,500✔
253
_return:
×
254
  if (code != 0){
×
255
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
256
  }
257
  nodesDestroyList(pMergeKeys);
×
258
  if (ps != NULL) {
×
259
    taosMemoryFree(ps);
×
260
  }
261
  return code;
×
262
}
263

264
int32_t createSortHandle(SOperatorInfo* pOperator) {
3,389✔
265
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
3,389✔
266
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
3,389✔
267
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
3,389✔
268
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
3,389✔
269
  SSortSource*                    ps = NULL;
3,389✔
270
  int32_t                         code = 0;
3,389✔
271
  int32_t                         lino = 0;
3,389✔
272

273
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
3,389!
274
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
275

276
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
3,389✔
277
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);
3,389✔
278

279
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
15,242✔
280
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
11,853✔
281
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
11,853!
282
      VTS_ERR_JRET(pDownstream->fpSet._openFn(pDownstream));
11,853!
283
    } else {
284
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
285
    }
286

287
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
11,853✔
288
      // tag block do not need sort
289
      pVirtualScanInfo->tagDownStreamId = i;
350✔
290
      continue;
350✔
291
    }
292

293
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
11,503!
294
    TSDB_CHECK_NULL(ps, code, lino, _return, terrno)
11,503!
295

296
    ps->param = pDownstream;
11,503✔
297
    ps->onlyRef = true;
11,503✔
298

299
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
11,503!
300
    ps = NULL;
11,503✔
301
  }
302

303
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
3,389!
304

305
_return:
3,389✔
306
  if (code != 0){
3,389!
307
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
308
  }
309
  if (ps != NULL) {
3,389!
310
    taosMemoryFree(ps);
×
311
  }
312
  return code;
3,389✔
313
}
314

315
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
7,920✔
316
  int32_t                         code = 0;
7,920✔
317
  int32_t                         lino = 0;
7,920✔
318

319
  if (pOperator->numOfDownstream == 0) {
7,920✔
320
    return code;
31✔
321
  }
322

323
  if (pOperator->pOperatorGetParam) {
7,889✔
324
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
4,500!
325
  } else {
326
    VTS_ERR_JRET(createSortHandle(pOperator));
3,389!
327
  }
328

329
  return code;
7,889✔
330

331
_return:
×
332
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
333
  return code;
×
334
}
335

336
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
94,241✔
337
  int32_t code = 0;
94,241✔
338

339
  if (OPTR_IS_OPENED(pOperator)) {
94,241✔
340
    return TSDB_CODE_SUCCESS;
86,321✔
341
  }
342

343
  int64_t startTs = taosGetTimestampUs();
7,920✔
344

345
  code = openVirtualTableScanOperatorImpl(pOperator);
7,920✔
346

347
  pOperator->cost.openCost = (double)(taosGetTimestampUs() - startTs) / 1000.0;
7,920✔
348
  pOperator->status = OP_RES_TO_RETURN;
7,920✔
349

350
  VTS_ERR_RET(code);
7,920!
351

352
  OPTR_SET_OPENED(pOperator);
7,920✔
353
  return code;
7,920✔
354
}
355

356
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
54,735✔
357
                                          SSDataBlock* p) {
358
  int32_t code = 0;
54,735✔
359
  int64_t lastTs = 0;
54,735✔
360
  int64_t rowNums = -1;
54,735✔
361
  blockDataEmpty(p);
54,735✔
362
  while (1) {
86,058,166✔
363
    STupleHandle* pTupleHandle = NULL;
86,091,648✔
364
    if (!pInfo->pSavedTuple) {
86,091,648✔
365
      code = tsortNextTuple(pHandle, &pTupleHandle);
86,043,109✔
366
      if (pTupleHandle == NULL || (code != 0)) {
86,287,922✔
367
        break;
368
      }
369
    } else {
370
      pTupleHandle = pInfo->pSavedTuple;
48,539✔
371
      pInfo->pSavedTuple = NULL;
48,539✔
372
    }
373

374
    SDataBlockInfo info = {0};
86,330,793✔
375
    tsortGetBlockInfo(pTupleHandle, &info);
86,330,793✔
376
    int32_t blockId = (int32_t)info.id.blockId;
86,495,235✔
377

378
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
255,909,697✔
379
      bool isNull = tsortIsNullVal(pTupleHandle, i);
169,185,962✔
380
      if (isNull) {
169,023,040!
381
        int32_t slotKey = blockId << 16 | i;
×
382
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
×
383
        if (slotId == NULL) {
×
384
          if (i == 0) {
×
385
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
386
          } else {
387
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
388
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
389
          }
390
        } else {
391
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
×
392
        }
393
      } else {
394
        char* pData = NULL;
169,023,040✔
395
        tsortGetValue(pTupleHandle, i, (void**)&pData);
169,023,040✔
396

397
        if (pData != NULL) {
168,939,342!
398
          if (i == 0) {
168,939,342✔
399
            if (lastTs != *(int64_t*)pData) {
86,367,581✔
400
              if (rowNums >= capacity - 1) {
52,096,166✔
401
                pInfo->pSavedTuple = pTupleHandle;
49,067✔
402
                goto _return;
49,067✔
403
              }
404
              rowNums++;
52,047,099✔
405
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
283,442,329✔
406
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
231,468,916✔
407
              }
408
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
51,380,462✔
409
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
48,058,708!
410
              }
411
              lastTs = *(int64_t*)pData;
51,853,707✔
412
            }
413
          }
414
          int32_t slotKey = blockId << 16 | i;
168,696,883✔
415
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
168,696,883✔
416
          if (slotId == NULL) {
168,972,933✔
417
            if (i == 0) {
86,091,180!
418
              continue;
86,091,180✔
419
            } else {
420
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
421
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
422
            }
423
          }
424
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
82,881,753!
425
        }
426
      }
427
    }
428
  }
429
_return:
54,735✔
430
  p->info.rows = rowNums + 1;
54,735✔
431
  p->info.dataLoad = 1;
54,735✔
432
  p->info.scanFlag = MAIN_SCAN;
54,735✔
433
  return code;
54,735✔
434
}
435

436
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
62,115✔
437
                                           SSDataBlock* p) {
438
  int32_t code = 0;
62,115✔
439
  int64_t lastTs = 0;
62,115✔
440
  int64_t rowNums = -1;
62,115✔
441
  blockDataEmpty(p);
62,115✔
442
  while (1) {
112,979,579✔
443
    STupleHandle* pTupleHandle = NULL;
113,041,694✔
444
    if (!pInfo->pSavedTuple) {
113,041,694✔
445
      code = tsortNextTuple(pHandle, &pTupleHandle);
112,987,346✔
446
      if (pTupleHandle == NULL || (code != 0)) {
112,987,347!
447
        break;
448
      }
449
    } else {
450
      pTupleHandle = pInfo->pSavedTuple;
54,348✔
451
      pInfo->pSavedTuple = NULL;
54,348✔
452
    }
453

454
    int32_t tsIndex = -1;
113,033,952✔
455

456
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
237,542,306!
457
      if (tsortIsNullVal(pTupleHandle, i)) {
237,542,307✔
458
        continue;
96,149,140✔
459
      } else {
460
        SColumnInfoData *pColInfo = NULL;
141,393,166✔
461
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
141,393,166✔
462
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
141,393,165✔
463
          tsIndex = i;
113,033,951✔
464
          break;
113,033,951✔
465
        }
466
      }
467
    }
468

469
    if (tsIndex == -1) {
113,033,951!
470
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
471
    }
472

473
    char* pData = NULL;
113,033,951✔
474
    // first, set ts slot's data
475
    // then, set other slots' data
476
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
113,033,951✔
477

478
    if (pData != NULL) {
113,033,951!
479
      if (lastTs != *(int64_t*)pData) {
113,033,951✔
480
        if (rowNums >= capacity - 1) {
58,063,238✔
481
          pInfo->pSavedTuple = pTupleHandle;
54,372✔
482
          goto _return;
54,372✔
483
        }
484
        rowNums++;
58,008,866✔
485
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
641,047,747✔
486
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
583,038,881✔
487
        }
488
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
58,008,863!
489
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
58,008,866!
490
        }
491
        lastTs = *(int64_t*)pData;
58,008,866✔
492
      }
493
    }
494
    if (pInfo->virtualScanInfo.scanAllCols) {
112,979,579✔
495
      continue;
12,300,000✔
496
    }
497

498
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
1,535,664,700✔
499
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
1,434,985,110✔
500
        continue;
1,276,995,314✔
501
      }
502

503
      SColumnInfoData *pColInfo = NULL;
157,989,795✔
504
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
157,989,795✔
505
      tsortGetValue(pTupleHandle, i, (void**)&pData);
157,989,805✔
506

507
      if (pData != NULL) {
157,989,805!
508
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
157,989,805!
509
      }
510
    }
511
  }
512
_return:
62,115✔
513
  p->info.rows = rowNums + 1;
62,115✔
514
  p->info.dataLoad = 1;
62,115✔
515
  p->info.scanFlag = MAIN_SCAN;
62,115✔
516
  return code;
62,115✔
517
}
518

519
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
116,881✔
520
  int32_t                        code = 0;
116,881✔
521
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
116,881✔
522
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
116,881✔
523
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
116,881✔
524
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
116,881✔
525
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
116,881✔
526
  int32_t                        capacity = pOperator->resultInfo.capacity;
116,881✔
527

528
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
116,881✔
529
  blockDataCleanup(pDataBlock);
116,881✔
530

531
  if (pHandle == NULL) {
116,881✔
532
    return TSDB_CODE_SUCCESS;
31✔
533
  }
534

535
  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
116,850✔
536
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
4,563!
537
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
4,563!
538
      return TSDB_CODE_SUCCESS;
×
539
    }
540

541
    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
4,563!
542
  } else {
543
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
112,287✔
544
  }
545

546
  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
116,850✔
547
  if (pOperator->pOperatorGetParam) {
116,850✔
548
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
62,115!
549
  } else {
550
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
54,735!
551
  }
552

553
  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
116,848!
554
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
116,850✔
555
         pDataBlock->info.rows);
556

557
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
116,850✔
558
  return code;
116,850✔
559
}
560

561
int32_t vtableAddTagPseudoColumnData(const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
48,500✔
562
  int32_t          code = TSDB_CODE_SUCCESS;
48,500✔
563
  int32_t          lino = 0;
48,500✔
564
  int64_t          backupRows;
565
  // currently only the tbname pseudo column
566
  if (numOfExpr <= 0) {
48,500!
567
    return TSDB_CODE_SUCCESS;
×
568
  }
569

570
  if (tagBlock == NULL) {
48,500!
571
    return TSDB_CODE_SUCCESS;
×
572
  }
573

574
  if (tagBlock->info.rows != 1) {
48,500!
575
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
×
576
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
577
  }
578

579
  backupRows = pBlock->info.rows;
48,500✔
580
  pBlock->info.rows = rows;
48,500✔
581
  for (int32_t j = 0; j < numOfExpr; ++j) {
181,384✔
582
    const SExprInfo* pExpr1 = &pExpr[j];
132,884✔
583
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
132,884✔
584

585
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
132,884✔
586
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno)
132,884!
587
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
132,884✔
588

589
    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
132,884✔
590
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno)
132,884!
591

592
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
132,884!
593
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      continue;
×
595
    }
596

597
    char* data = colDataGetData(pTagInfoData, 0);
132,884!
598

599
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
132,884!
600
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, 1, false);
132,884✔
601
      QUERY_CHECK_CODE(code, lino, _return);
132,884!
602
    } else {  // todo opt for json tag
603
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
604
        code = colDataSetVal(pColInfoData, i, data, false);
×
605
        QUERY_CHECK_CODE(code, lino, _return);
×
606
      }
607
    }
608
  }
609

610
  // restore the rows
611
  pBlock->info.rows = backupRows;
48,500✔
612

613
_return:
48,500✔
614

615
  if (code != TSDB_CODE_SUCCESS) {
48,500!
616
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
617
  }
618
  return code;
48,500✔
619
}
620

621
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock, int32_t rows) {
109,513✔
622
  int32_t         code = 0;
109,513✔
623
  STableScanBase* pTableScanInfo = &pInfo->base;
109,513✔
624
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
109,513✔
625
  if (pSup->numOfExprs > 0) {
109,513✔
626
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
48,500!
627
  }
628

629
  return code;
109,513✔
630
}
631

632
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
94,241✔
633
  int32_t                        code = TSDB_CODE_SUCCESS;
94,241✔
634
  int32_t                        lino = 0;
94,241✔
635
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
94,241✔
636
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
94,241✔
637
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
94,241✔
638

639
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
94,241!
640
    pResBlock = NULL;
×
641
    return code;
×
642
  }
643

644
  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));
94,241!
645

646
  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
94,241✔
647
    SSDataBlock*   pTagBlock = NULL;
3,849✔
648
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
3,849✔
649
    if (pOperator->pOperatorGetParam) {
3,849✔
650
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
3,499✔
651
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
3,499!
652
    } else {
653
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
350!
654
    }
655

656
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
3,849!
657
      VTS_ERR_JRET(TSDB_CODE_FAILED);
×
658
    }
659
    pInfo->pSavedTagBlock = pTagBlock;
3,849✔
660
  }
661

662
  while(1) {
663
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
116,881!
664
    if (*pResBlock == NULL) {
116,881✔
665
      setOperatorCompleted(pOperator);
7,368✔
666
      break;
7,368✔
667
    }
668

669
    if (pOperator->pOperatorGetParam) {
109,513✔
670
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
57,639✔
671
      (*pResBlock)->info.id.uid = uid;
57,639✔
672
    } else {
673
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
51,874✔
674
    }
675

676
    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), (*pResBlock)->info.rows));
109,513!
677
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL));
109,513!
678
    if ((*pResBlock)->info.rows > 0) {
109,513✔
679
      break;
86,873✔
680
    }
681
  }
682

683
  return code;
94,241✔
684
_return:
×
685
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
686
  pTaskInfo->code = code;
×
687
  T_LONG_JMP(pTaskInfo->env, code);
×
688
}
689

690
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
4,566✔
691
  cleanupQueryTableDataCond(&pBase->cond);
4,566✔
692

693
  if (pAPI->tsdReaderClose) {
4,566!
694
    pAPI->tsdReaderClose(pBase->dataReader);
×
695
  }
696
  pBase->dataReader = NULL;
4,566✔
697

698
  if (pBase->matchInfo.pList != NULL) {
4,566!
699
    taosArrayDestroy(pBase->matchInfo.pList);
×
700
  }
701

702
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
4,566✔
703
  cleanupExprSupp(&pBase->pseudoSup);
4,566✔
704
}
4,566✔
705

706
void destroyVirtualTableScanOperatorInfo(void* param) {
4,566✔
707
  if (!param) {
4,566!
708
    return;
×
709
  }
710
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
4,566✔
711
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
4,566✔
712
  blockDataDestroy(pOperatorInfo->binfo.pRes);
4,566✔
713
  pOperatorInfo->binfo.pRes = NULL;
4,566✔
714

715
  tsortDestroySortHandle(pInfo->pSortHandle);
4,566✔
716
  pInfo->pSortHandle = NULL;
4,566✔
717
  taosArrayDestroy(pInfo->pSortInfo);
4,566✔
718
  pInfo->pSortInfo = NULL;
4,566✔
719

720
  blockDataDestroy(pInfo->pIntermediateBlock);
4,566✔
721
  pInfo->pIntermediateBlock = NULL;
4,566✔
722

723
  blockDataDestroy(pInfo->pInputBlock);
4,566✔
724
  pInfo->pInputBlock = NULL;
4,566✔
725
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
4,566✔
726

727
  taosHashCleanup(pInfo->dataSlotMap);
4,566✔
728

729
  if (pInfo->pSortCtxList) {
4,566✔
730
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
4,821✔
731
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
3,701✔
732
      blockDataDestroy(pCtx->pIntermediateBlock);
3,701✔
733
      taosMemoryFree(pCtx);
3,701!
734
    }
735
    taosArrayDestroy(pInfo->pSortCtxList);
1,120✔
736
    pInfo->pSortCtxList = NULL;
1,120✔
737
  }
738
  taosMemoryFreeClear(param);
4,566!
739
}
740

741
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
4,566✔
742
  size_t  numOfCols = LIST_LENGTH(pNodeList);
4,566✔
743
  int32_t code = TSDB_CODE_SUCCESS;
4,566✔
744
  int32_t lino = 0;
4,566✔
745

746
  if (numOfCols == 0) {
4,566✔
747
    return code;
6✔
748
  }
749

750
  *tsSlotId = -1;
4,560✔
751
  *tagBlockId = -1;
4,560✔
752
  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
4,560✔
753
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno)
4,560!
754

755
  for (int32_t i = 0; i < numOfCols; ++i) {
29,448✔
756
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
24,888✔
757
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno)
24,888!
758

759
    if (pColNode->isPrimTs) {
24,888✔
760
      *tsSlotId = i;
4,104✔
761
    } else if (pColNode->hasRef) {
20,784✔
762
      int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId;
11,513✔
763
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
11,513!
764
    } else if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
9,271✔
765
      // tag column or pseudo column's function
766
      *tagBlockId = pColNode->dataBlockId;
2,698✔
767
    }
768
  }
769

770
  return code;
4,560✔
771
_return:
×
772
  taosHashCleanup(*pSlotMap);
×
773
  *pSlotMap = NULL;
×
774
  if (code != TSDB_CODE_SUCCESS) {
×
775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
776
  }
777
  return code;
×
778
}
779

780
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
12,210✔
781
  int32_t code = 0, lino = 0;
12,210✔
782
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
12,210✔
783
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
12,210✔
784
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;
12,210✔
785
  
786
  pOper->status = OP_NOT_OPENED;
12,210✔
787
  resetBasicOperatorState(&pMergeInfo->binfo);
12,210✔
788

789
  tsortDestroySortHandle(pInfo->pSortHandle);
12,210✔
790
  pInfo->pSortHandle = NULL;
12,210✔
791
  // taosArrayDestroy(pInfo->pSortInfo);
792
  // pInfo->pSortInfo = NULL;
793

794
  blockDataDestroy(pInfo->pIntermediateBlock);
12,210✔
795
  pInfo->pIntermediateBlock = NULL;
12,210✔
796

797
  blockDataDestroy(pInfo->pInputBlock);
12,210✔
798
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
12,210✔
799
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno)
12,210!
800

801
  pInfo->tagDownStreamId = -1;
12,210✔
802

803
  if (pInfo->pSortCtxList) {
12,210✔
804
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
135✔
805
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
81✔
806
      blockDataDestroy(pCtx->pIntermediateBlock);
81✔
807
      taosMemoryFree(pCtx);
81!
808
    }
809
    taosArrayDestroy(pInfo->pSortCtxList);
54✔
810
    pInfo->pSortCtxList = NULL;
54✔
811
  }
812

813
  pMergeInfo->pSavedTuple = NULL;
12,210✔
814
  pMergeInfo->pSavedTagBlock = NULL;
12,210✔
815

816
_exit:
12,210✔
817

818
  if (code) {
12,210!
819
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
820
  }
821

822
  return code;
12,210✔
823
}
824

825
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
4,565✔
826
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
827
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
828
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
4,565✔
829
  int32_t                        lino = 0;
4,565✔
830
  int32_t                        code = TSDB_CODE_SUCCESS;
4,565✔
831
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
4,565!
832
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4,566!
833
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
4,566✔
834
  SNodeList*                     pMergeKeys = NULL;
4,566✔
835

836
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno)
4,566!
837
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno)
4,566!
838

839
  pOperator->pPhyNode = pVirtualScanPhyNode;
4,566✔
840

841
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
4,566✔
842
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
4,566✔
843

844
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
4,566✔
845
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
4,566✔
846
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno)
4,566!
847

848
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4,566✔
849
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno)
4,566!
850
  pVirtualScanInfo->pInputBlock = pInputBlock;
4,566✔
851
  pVirtualScanInfo->tagDownStreamId = -1;
4,566✔
852
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
4,566✔
853
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
4,566✔
854
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
1,236✔
855
    pSup->pExprInfo = NULL;
1,236✔
856
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
1,236!
857

858
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
1,236✔
859
                                      &pTaskInfo->storageAPI.functionStore);
860
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno)
1,236!
861
  }
862

863
  initResultSizeInfo(&pOperator->resultInfo, 1024);
4,566✔
864
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
4,566!
865

866
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
4,566✔
867
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
4,566✔
868

869
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
4,566✔
870
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
3,420!
871
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
3,420✔
872
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
3,420!
873
  } else {
874
    pTaskInfo->dynamicTask = true;
1,146✔
875
  }
876
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
4,566✔
877
  pVirtualScanInfo->sortBufSize =
4,566✔
878
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
4,566✔
879
  VTS_ERR_JRET(
4,566!
880
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
881

882
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
4,566✔
883

884
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
4,566!
885
                                  0, pTaskInfo->pStreamRuntimeInfo));
886

887
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
4,566✔
888
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno)
4,566!
889

890
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
4,566✔
891
                  OP_NOT_OPENED, pInfo, pTaskInfo);
892
  pOperator->fpSet =
893
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
4,566✔
894
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
895
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
4,566✔
896

897
  if (NULL != pDownstream) {
4,566✔
898
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
4,535!
899
  } else {
900
    pVirtualScanInfo->tagDownStreamId = -1;
31✔
901
  }
902

903
  nodesDestroyList(pMergeKeys);
4,566✔
904
  *pOptrInfo = pOperator;
4,566✔
905
  return TSDB_CODE_SUCCESS;
4,566✔
906

907
_return:
×
908
  if (code != TSDB_CODE_SUCCESS) {
×
909
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
910
  }
911
  if (pInfo != NULL) {
×
912
    destroyVirtualTableScanOperatorInfo(pInfo);
×
913
  }
914
  nodesDestroyList(pMergeKeys);
×
915
  pTaskInfo->code = code;
×
916
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
917
  return code;
×
918
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc