• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tdatablock.h"
22
#include "ttime.h"
23
#include "virtualtablescan.h"
24
#include "tsort.h"
25

26
#define STREAM_VTABLE_MERGE_OP_NAME "StreamVtableMergeOperator"
27
#define STREAM_VTABLE_MERGE_OP_CHECKPOINT_NAME "StreamVtableMergeOperator_Checkpoint"
28

29
typedef struct SVirtualTableScanInfo {
30
  STableScanBase base;
31
  SArray*        pSortInfo;
32
  SSortHandle*   pSortHandle;
33
  int32_t        bufPageSize;
34
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
35
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
36
  SSDataBlock*   pInputBlock;
37
  SHashObj*      dataSlotMap;
38
  int32_t        tsSlotId;
39
  int32_t        tagBlockId;
40
  int32_t        tagDownStreamId;
41
  bool           scanAllCols;
42
  SArray*        pSortCtxList;
43
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
44
} SVirtualTableScanInfo;
45

46
typedef struct SVirtualScanMergeOperatorInfo {
47
  SOptrBasicInfo        binfo;
48
  EMergeType            type;
49
  SVirtualTableScanInfo virtualScanInfo;
50
  bool                  ignoreGroupId;
51
  uint64_t              groupId;
52
  STupleHandle*         pSavedTuple;
53
  SSDataBlock*          pSavedTagBlock;
54
} SVirtualScanMergeOperatorInfo;
55

56
typedef struct SLoadNextCtx {
57
  SOperatorInfo*  pOperator;
58
  SOperatorParam* pOperatorGetParam;
59
  int32_t         blockId;
60
  STimeWindow     window;
61
  SSDataBlock*    pIntermediateBlock;
62
  col_id_t        tsSlotId;
63
} SLoadNextCtx;
64

65
/*
 * Fetch callback used by the sort handle for the non-parameterised scan.
 *
 * param   - the downstream SOperatorInfo to pull from
 * ppBlock - receives the next block produced by the downstream's getNextFn
 *
 * The fetched block is validated with blockDataCheck. Returns TSDB_CODE_SUCCESS or the
 * first failing error code (which is also logged).
 */
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SOperatorInfo* pDownstream = (SOperatorInfo*)param;

  VTS_ERR_JRET(pDownstream->fpSet.getNextFn(pDownstream, ppBlock));
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
  return code;

_return:
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  return code;
}
77

78
/*
 * Read the timestamps of the first and the last row of pBlock into *startTs / *endTs.
 *
 * The timestamp column is the first column whose info.colId equals tsSlotId; when no column
 * matches, the last column of the block is used instead.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the selected column cannot be fetched from the block.
 *
 * NOTE(review): row (pBlock->info.rows - 1) is read unconditionally, so callers presumably
 * never pass an empty block - confirm.
 */
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int32_t tsCol = numOfCols - 1;  // fallback when no colId matches

  for (int32_t col = 0; col < numOfCols; col++) {
    SColumnInfoData *pCandidate = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
    if (pCandidate->info.colId == tsSlotId) {
      tsCol = col;
      break;
    }
  }

  SColumnInfoData *pTsColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsCol);
  QUERY_CHECK_NULL(pTsColData, code, lino, _return, terrno);

  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsColData, 0), 0);
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsColData, pBlock->info.rows - 1), 0);

  return code;
_return:
  qError("failed to get time window of block, %s code:%s", __func__, tstrerror(code));
  return code;
}
104

105
/*
 * Fetch callback used by the sort handle for the parameterised scan.
 *
 * param   - the SLoadNextCtx of the source being read
 * ppBlock - receives a copy of the fetched block, or NULL when the downstream has no more data
 *
 * Each call pushes the context's current window into the exchange parameter, resets the
 * downstream to OP_NOT_OPENED, releases the copy handed out by the previous call and fetches
 * the next block through getNextExtFn. A fetched block is stamped with the context's blockId,
 * its first/last timestamps become the context's new window, and a copy (owned by the context)
 * is returned. When no block is returned the window's ekey is set to INT64_MAX.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing error code (which is also logged).
 */
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
  SOperatorInfo*          pOperator = pCtx->pOperator;
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
  int32_t                 code = TSDB_CODE_SUCCESS;
  SSDataBlock*            pRes = NULL;
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;

  pParam->basic.window = pCtx->window;
  pOperator->status = OP_NOT_OPENED;
  if (pCtx->pIntermediateBlock) {
    blockDataDestroy(pCtx->pIntermediateBlock);
    pCtx->pIntermediateBlock = NULL;
  }

  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));

  VTS_ERR_JRET(blockDataCheck(pRes));
  if ((pRes)) {
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
    (pRes)->info.id.blockId = pCtx->blockId;
    // the window drives the next fetch, so a failure to compute it must not be ignored
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
    *ppBlock = pCtx->pIntermediateBlock;
  } else {
    pCtx->window.ekey = INT64_MAX;
    *ppBlock = NULL;
  }

  return code;
_return:
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  return code;
}
139

140
/*
 * Build the merge-key list used to sort by timestamp: a single ORDER BY expression on the
 * column in slot tsSlotId, ascending, NULLs first.
 *
 * pMergeKeys - receives the new list on success; untouched on failure
 * tsSlotId   - slot id stored in the column node
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing node/list call. On failure every
 * node created so far is destroyed exactly once.
 */
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SNodeList        *pNodeList = NULL;
  SColumnNode      *pColumnNode = NULL;
  SOrderByExprNode *pOrderByExprNode = NULL;

  VTS_ERR_JRET(nodesMakeList(&pNodeList));

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
  pColumnNode->slotId = tsSlotId;

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
  // the order-by node now owns the column node; drop our reference so the error path
  // below does not destroy it a second time
  pColumnNode = NULL;
  pOrderByExprNode->order = ORDER_ASC;
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;

  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
  // the list now owns the order-by node
  pOrderByExprNode = NULL;

  *pMergeKeys = pNodeList;
  return code;
_return:
  nodesDestroyNode((SNode*)pColumnNode);
  nodesDestroyNode((SNode*)pOrderByExprNode);
  nodesDestroyList(pNodeList);
  return code;
}
166

167
/*
 * Release the sort-related resources of pVirtualScanInfo so a new sort handle can be built:
 * the sort info array, the sort handle, and the list of SLoadNextCtx (each context and the
 * block copy it holds). Every released pointer is reset to NULL so that a later cleanup or
 * destroy does not free it again.
 */
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
  if (pVirtualScanInfo->pSortInfo) {
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
    pVirtualScanInfo->pSortInfo = NULL;
  }
  if (pVirtualScanInfo->pSortHandle) {
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
    pVirtualScanInfo->pSortHandle = NULL;
  }
  if (pVirtualScanInfo->pSortCtxList) {
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
      blockDataDestroy(pCtx->pIntermediateBlock);
      taosMemoryFree(pCtx);
    }
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
    // without this reset the freed list would be walked and freed again by the next cleanup
    pVirtualScanInfo->pSortCtxList = NULL;
  }
}
×
185

186
/*
 * Build and open the merge-sort handle for a parameterised scan (pOperator->pOperatorGetParam set).
 *
 * Steps:
 *  - size the sort buffer as one page per entry of pOpParamArray plus one,
 *  - drop the sort state left over from a previous call,
 *  - create the sort info (timestamp merge key) and the multi-source merge handle,
 *  - locate the tag downstream (at most two downstreams are accepted) and pick the other one
 *    as the data source,
 *  - register one sort source + SLoadNextCtx per entry of pOpParamArray, then open the handle.
 *
 * Ownership: each SLoadNextCtx is owned by pSortCtxList once pushed; each SSortSource is owned by
 * the sort handle once tsortAddSource succeeds. Anything not yet handed over is freed on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing error code (logged with the failing line when known).
 */
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
  int32_t                         code = TSDB_CODE_SUCCESS;
  int32_t                         lino = 0;
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
  SNodeList*                      pMergeKeys = NULL;
  SSortSource*                    ps = NULL;    // sort source not yet owned by the sort handle
  SLoadNextCtx*                   pCtx = NULL;  // context not yet owned by pSortCtxList
  int32_t                         scanOpIndex = 0;

  cleanUpVirtualScanInfo(pVirtualScanInfo);
  // the context list was released above; make sure no stale pointer survives an early failure
  pVirtualScanInfo->pSortCtxList = NULL;

  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
  nodesDestroyList(pMergeKeys);
  pMergeKeys = NULL;  // already destroyed; the error path must not destroy it again

  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));

  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);

  if (pOperator->numOfDownstream > 2) {
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
  }

  pVirtualScanInfo->tagDownStreamId = -1;
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
      // tag block do not need sort
      pVirtualScanInfo->tagDownStreamId = i;
      pInfo->pSavedTagBlock = NULL;
    }
  }
  // with a tag downstream at index 0 or 1, the data source is the other one
  // NOTE(review): assumes the selected index is a valid downstream (i.e. a data source exists) - confirm
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;

  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno);
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
    SLoadNextCtx*   pOwnedCtx = NULL;

    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno);
    pCtx->blockId = i;
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
    pCtx->pOperatorGetParam = pOpParam;
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
    pCtx->pIntermediateBlock = NULL;
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;

    // hand the context to the list first so that it is always released by the list cleanup
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno);
    pOwnedCtx = pCtx;
    pCtx = NULL;

    ps = taosMemoryCalloc(1, sizeof(SSortSource));
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno);

    ps->param = pOwnedCtx;
    ps->onlyRef = true;

    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
    ps = NULL;  // owned by the sort handle from here on
  }

  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));

  return code;
_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  nodesDestroyList(pMergeKeys);
  if (ps != NULL) {
    taosMemoryFree(ps);
  }
  if (pCtx != NULL) {
    taosMemoryFree(pCtx);
  }
  return code;
}
268

269
/*
 * Build and open the merge-sort handle for the non-parameterised scan.
 *
 * Every downstream must be an exchange operator and is opened here. The downstream whose
 * resultDataBlockId equals tagBlockId is remembered as the tag source and is not sorted;
 * every other downstream is registered as a sort source read via virtualScanloadNextDataBlock.
 *
 * Returns 0 on success or the first failing error code (logged).
 */
int32_t createSortHandle(SOperatorInfo* pOperator) {
  int32_t                         code = 0;
  int32_t                         lino = 0;
  SSortSource*                    pSource = NULL;  // source not yet owned by the sort handle
  SVirtualScanMergeOperatorInfo * pMergeInfo = pOperator->info;
  SVirtualTableScanInfo*          pScan = &pMergeInfo->virtualScanInfo;
  SExecTaskInfo*                  pTask = pOperator->pTaskInfo;
  int32_t                         numOfPages = (int32_t)pScan->sortBufSize / pScan->bufPageSize;

  VTS_ERR_JRET(tsortCreateSortHandle(pScan->pSortInfo, SORT_MULTISOURCE_MERGE, pScan->bufPageSize, numOfPages,
                                     pScan->pInputBlock, pTask->id.str, 0, 0, 0, &pScan->pSortHandle));

  tsortSetForceUsePQSort(pScan->pSortHandle);
  tsortSetFetchRawDataFp(pScan->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);

  for (int32_t idx = 0; idx < pOperator->numOfDownstream; ++idx) {
    SOperatorInfo* pChild = pOperator->pDownstream[idx];

    if (pChild->operatorType != QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
    }
    VTS_ERR_JRET(pChild->fpSet._openFn(pChild));

    if (pChild->resultDataBlockId != pScan->tagBlockId) {
      pSource = taosMemoryCalloc(1, sizeof(SSortSource));
      TSDB_CHECK_NULL(pSource, code, lino, _return, terrno);

      pSource->param = pChild;
      pSource->onlyRef = true;

      VTS_ERR_JRET(tsortAddSource(pScan->pSortHandle, pSource));
      pSource = NULL;
    } else {
      // tag block do not need sort
      pScan->tagDownStreamId = idx;
    }
  }

  VTS_ERR_JRET(tsortOpen(pScan->pSortHandle));

_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  if (pSource != NULL) {
    taosMemoryFree(pSource);
  }
  return code;
}
319

320
/*
 * Create the sort handle for the operator. Nothing is done when there is no downstream.
 * With an operator parameter present the parameterised variant is used, otherwise the plain one.
 *
 * Returns 0 on success or the error code of the failing create call (logged).
 */
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pOperator->numOfDownstream != 0) {
    if (pOperator->pOperatorGetParam) {
      VTS_ERR_JRET(createSortHandleFromParam(pOperator));
    } else {
      VTS_ERR_JRET(createSortHandle(pOperator));
    }
  }

  return code;

_return:
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  return code;
}
341

342
/*
 * Open the operator once: build the sort handle, record the time spent in cost.openCost
 * (microseconds / 1000.0) and set the status to OP_RES_TO_RETURN. The opened flag is set
 * only when the open succeeded. Calling it on an already opened operator is a no-op.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failed open.
 */
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t beginUs = taosGetTimestampUs();
  int32_t code = openVirtualTableScanOperatorImpl(pOperator);
  int64_t elapsedUs = taosGetTimestampUs() - beginUs;

  pOperator->cost.openCost = (double)elapsedUs / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;

  VTS_ERR_RET(code);

  OPTR_SET_OPENED(pOperator);
  return code;
}
361

362
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
×
363
                                          SSDataBlock* p) {
364
  int32_t code = 0;
×
365
  int64_t lastTs = 0;
×
366
  int64_t rowNums = -1;
×
367
  blockDataEmpty(p);
×
368
  while (1) {
×
369
    STupleHandle* pTupleHandle = NULL;
×
370
    if (!pInfo->pSavedTuple) {
×
371
      code = tsortNextTuple(pHandle, &pTupleHandle);
×
372
      if (pTupleHandle == NULL || (code != 0)) {
×
373
        break;
374
      }
375
    } else {
376
      pTupleHandle = pInfo->pSavedTuple;
×
377
      pInfo->pSavedTuple = NULL;
×
378
    }
379

380
    SDataBlockInfo info = {0};
×
381
    tsortGetBlockInfo(pTupleHandle, &info);
×
382
    int32_t blockId = (int32_t)info.id.blockId;
×
383

384
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
×
385
      bool isNull = tsortIsNullVal(pTupleHandle, i);
×
386
      if (isNull) {
×
387
        colDataSetNULL(taosArrayGet(p->pDataBlock, i), rowNums);
×
388
      } else {
389
        char* pData = NULL;
×
390
        tsortGetValue(pTupleHandle, i, (void**)&pData);
×
391

392
        if (pData != NULL) {
×
393
          if (i == 0) {
×
394
            if (lastTs != *(int64_t*)pData) {
×
395
              if (rowNums >= capacity - 1) {
×
396
                pInfo->pSavedTuple = pTupleHandle;
×
397
                goto _return;
×
398
              }
399
              rowNums++;
×
400
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
×
401
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
×
402
              }
403
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
×
404
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
×
405
              }
406
              lastTs = *(int64_t*)pData;
×
407
            }
408
          }
409
          int32_t slotKey = blockId << 16 | i;
×
410
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
×
411
          if (slotId == NULL) {
×
412
            if (i == 0) {
×
413
              continue;
×
414
            } else {
415
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
416
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
417
            }
418
          }
419
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
×
420
        }
421
      }
422
    }
423
  }
424
_return:
×
425
  p->info.rows = rowNums + 1;
×
426
  p->info.dataLoad = 1;
×
427
  p->info.scanFlag = MAIN_SCAN;
×
428
  return code;
×
429
}
430

431
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
×
432
                                           SSDataBlock* p) {
433
  int32_t code = 0;
×
434
  int64_t lastTs = 0;
×
435
  int64_t rowNums = -1;
×
436
  blockDataEmpty(p);
×
437
  while (1) {
×
438
    STupleHandle* pTupleHandle = NULL;
×
439
    if (!pInfo->pSavedTuple) {
×
440
      code = tsortNextTuple(pHandle, &pTupleHandle);
×
441
      if (pTupleHandle == NULL || (code != 0)) {
×
442
        break;
443
      }
444
    } else {
445
      pTupleHandle = pInfo->pSavedTuple;
×
446
      pInfo->pSavedTuple = NULL;
×
447
    }
448

449
    int32_t tsIndex = -1;
×
450

451
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
×
452
      if (tsortIsNullVal(pTupleHandle, i)) {
×
453
        continue;
×
454
      } else {
455
        SColumnInfoData *pColInfo = NULL;
×
456
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
×
457
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
×
458
          tsIndex = i;
×
459
          break;
×
460
        }
461
      }
462
    }
463

464
    if (tsIndex == -1) {
×
465
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
466
    }
467

468
    char* pData = NULL;
×
469
    // first, set ts slot's data
470
    // then, set other slots' data
471
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
×
472

473
    if (pData != NULL) {
×
474
      if (lastTs != *(int64_t*)pData) {
×
475
        if (rowNums >= capacity - 1) {
×
476
          pInfo->pSavedTuple = pTupleHandle;
×
477
          goto _return;
×
478
        }
479
        rowNums++;
×
480
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
×
481
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
×
482
        }
483
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
×
484
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
×
485
        }
486
        lastTs = *(int64_t*)pData;
×
487
      }
488
    }
489
    if (pInfo->virtualScanInfo.scanAllCols) {
×
490
      continue;
×
491
    }
492

493
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
×
494
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
×
495
        continue;
×
496
      }
497

498
      SColumnInfoData *pColInfo = NULL;
×
499
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
×
500
      tsortGetValue(pTupleHandle, i, (void**)&pData);
×
501

502
      if (pData != NULL) {
×
503
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
×
504
      }
505
    }
506
  }
507
_return:
×
508
  p->info.rows = rowNums + 1;
×
509
  p->info.dataLoad = 1;
×
510
  p->info.scanFlag = MAIN_SCAN;
×
511
  return code;
×
512
}
513

514
/*
 * Produce the next merged block of the operator.
 *
 * pResBlock - receives binfo.pRes when it holds at least one row, NULL otherwise. It is reset
 *             to NULL up front so that every return path leaves a defined value behind.
 *
 * The intermediate block is obtained from the sort handle on first use and reused afterwards.
 * It is filled by the merge routine matching the scan mode (parameterised or not) and then
 * copied into binfo.pRes.
 *
 * Returns 0 on success (also when there is no sort handle or no sorted block), otherwise the
 * first failing error code.
 */
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = 0;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
  int32_t                        capacity = pOperator->resultInfo.capacity;

  // callers test *pResBlock after a successful return, including the early returns below
  *pResBlock = NULL;

  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
  blockDataCleanup(pDataBlock);

  if (pHandle == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
  } else {
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
  }

  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
  if (pOperator->pOperatorGetParam) {
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
  } else {
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
  }

  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
         pDataBlock->info.rows);

  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
  return code;
}
555

556
/*
 * Fill the pseudo/tag columns of pBlock from the single row of tagBlock.
 *
 * pInfo     - unused here
 * pExpr     - numOfExpr expressions; expression j writes to slot base.resSchema.slotId of pBlock
 *             and reads column j of tagBlock
 * tagBlock  - must hold exactly one row; NULL means there is nothing to do
 * pBlock    - destination block
 * rows      - number of rows to fill in every destination column
 *
 * While the columns are filled, pBlock->info.rows is temporarily set to `rows`; the previous
 * value is restored before returning, on success and on failure alike.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR when tagBlock does not hold
 * exactly one row, or the error of the failing column access/write.
 */
int32_t vtableAddTagPseudoColumnData(SVirtualTableScanInfo *pInfo, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          backupRows = 0;
  bool             rowsOverridden = false;  // true while pBlock->info.rows holds the temporary value
  // currently only the tbname pseudo column
  if (numOfExpr <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock->info.rows != 1) {
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
  }

  backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;
  rowsOverridden = true;
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno);

    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }

    char* data = colDataGetData(pTagInfoData, 0);

    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
      QUERY_CHECK_CODE(code, lino, _return);
    } else {  // todo opt for json tag
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
        code = colDataSetVal(pColInfoData, i, data, false);
        QUERY_CHECK_CODE(code, lino, _return);
      }
    }
  }

_return:

  // restore the rows, also when the loop above bailed out early
  if (rowsOverridden) {
    pBlock->info.rows = backupRows;
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
615

616
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock,
×
617
                                  SExecTaskInfo* pTaskInfo, int32_t rows) {
618
  int32_t         code = 0;
×
619
  STableScanBase* pTableScanInfo = &pInfo->base;
×
620
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
×
621
  if (pSup->numOfExprs > 0) {
×
622
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pInfo,  pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
×
623
  }
624

625
  return code;
×
626
}
627

628
/*
 * getNext entry point of the virtual table scan operator.
 *
 * pResBlock - receives the next non-empty result block, or NULL when the operator is done
 *
 * Flow:
 *  - a finished, non-parameterised operator returns NULL immediately,
 *  - the operator is opened through its _openFn,
 *  - when a tag downstream exists and no tag block is saved yet, exactly one tag row is
 *    fetched from it and kept in pSavedTagBlock,
 *  - merged blocks are produced until one survives the filter or the data is exhausted; each
 *    block gets its uid (from the operator parameter when present, else vtableUid) and its
 *    tag/pseudo columns before filtering.
 *
 * On failure the error is logged, stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = TSDB_CODE_SUCCESS;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
    // report "no more data" through the out parameter (assigning the parameter itself has no effect)
    *pResBlock = NULL;
    return code;
  }

  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));

  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
    SSDataBlock*   pTagBlock = NULL;
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
    // propagate the downstream's own error instead of collapsing it into TSDB_CODE_FAILED below
    if (pOperator->pOperatorGetParam) {
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
    } else {
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
    }

    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
      VTS_ERR_JRET(TSDB_CODE_FAILED);
    }
    pInfo->pSavedTagBlock = pTagBlock;
  }

  while(1) {
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
    if (*pResBlock == NULL) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pOperator->pOperatorGetParam) {
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
      (*pResBlock)->info.id.uid = uid;
    } else {
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
    }

    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), pTaskInfo, (*pResBlock)->info.rows));
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL));
    if ((*pResBlock)->info.rows > 0) {
      break;
    }
    // every row was filtered out: merge the next block
  }

  return code;
_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
688

689
/*
 * Release what a STableScanBase holds: the query condition, the data reader (closed through
 * pAPI->tsdReaderClose when that callback is set), the column match list, the table-meta
 * entry cache and the pseudo-column expression support.
 */
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  if (pAPI->tsdReaderClose != NULL) {
    pAPI->tsdReaderClose(pBase->dataReader);
  }
  // the reader pointer is cleared whether or not a close callback exists
  pBase->dataReader = NULL;

  SArray* pMatchList = pBase->matchInfo.pList;
  if (pMatchList != NULL) {
    taosArrayDestroy(pMatchList);
  }

  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}
×
704

705
void destroyVirtualTableScanOperatorInfo(void* param) {
×
706
  if (!param) {
×
707
    return;
×
708
  }
709
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
×
710
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
×
711
  blockDataDestroy(pOperatorInfo->binfo.pRes);
×
712
  pOperatorInfo->binfo.pRes = NULL;
×
713

714
  tsortDestroySortHandle(pInfo->pSortHandle);
×
715
  pInfo->pSortHandle = NULL;
×
716
  taosArrayDestroy(pInfo->pSortInfo);
×
717
  pInfo->pSortInfo = NULL;
×
718

719
  blockDataDestroy(pInfo->pIntermediateBlock);
×
720
  pInfo->pIntermediateBlock = NULL;
×
721

722
  blockDataDestroy(pInfo->pInputBlock);
×
723
  pInfo->pInputBlock = NULL;
×
724
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
×
725

726
  taosHashCleanup(pInfo->dataSlotMap);
×
727

728
  if (pInfo->pSortCtxList) {
×
729
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
×
730
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
×
731
      blockDataDestroy(pCtx->pIntermediateBlock);
×
732
      taosMemoryFree(pCtx);
×
733
    }
734
    taosArrayDestroy(pInfo->pSortCtxList);
×
735
    pInfo->pSortCtxList = NULL;
×
736
  }
737
  taosMemoryFreeClear(param);
×
738
}
739

740
/*
 * Walk the target column list and derive the slot bookkeeping used by the
 * virtual table scan.
 *
 * pNodeList   list of SColumnNode targets
 * pSlotMap    out: hash created here, mapping (dataBlockId << 16 | slotId) of
 *             every referencing column to its index in pNodeList
 * tsSlotId    out: index of the primary timestamp column, -1 if none was seen
 * tagBlockId  out: dataBlockId of the last column that is a tag or has an
 *             empty table alias, -1 if none was seen
 *
 * Returns TSDB_CODE_SUCCESS or the failing code; on failure the hash is
 * released and *pSlotMap is set to NULL. When the list is empty the function
 * returns success immediately and none of the outputs are written.
 */
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);

  if (0 == numOfCols) {
    return code;
  }

  *tsSlotId = -1;
  *tagBlockId = -1;

  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pNodeList, i);
    TSDB_CHECK_NULL(pCol, code, lino, _return, terrno);

    if (pCol->isPrimTs) {
      *tsSlotId = i;
      continue;
    }

    if (pCol->hasRef) {
      // key packs the source block id above the slot id; value is the target index
      int32_t slotKey = (pCol->dataBlockId << 16) | pCol->slotId;
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
      continue;
    }

    // tag column or pseudo column's function
    if (COLUMN_TYPE_TAG == pCol->colType || pCol->tableAlias[0] == '\0') {
      *tagBlockId = pCol->dataBlockId;
    }
  }

  return code;

_return:
  taosHashCleanup(*pSlotMap);
  *pSlotMap = NULL;
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
778

779
/*
 * Reset the virtual table merge operator so it can be opened again.
 *
 * pOper  operator whose info is a SVirtualScanMergeOperatorInfo and whose
 *        pPhyNode is the SVirtualScanPhysiNode it was created from
 *
 * The operator is put back to OP_NOT_OPENED, the sort handle, intermediate
 * block and all per-source load contexts are released, the cached tuple/tag
 * block pointers and the tag downstream id are cleared, and a fresh input
 * block is created from the physical node's output descriptor.
 *
 * Returns 0 on success, or terrno if the new input block cannot be created.
 * All non-failing teardown is done before that allocation, so a failed reset
 * does not leave per-run state from the previous execution behind.
 */
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
  int32_t code = 0, lino = 0;
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pMergeInfo->binfo);

  // pSortInfo is deliberately not released here: it is built once at operator
  // creation and this function does not rebuild it.
  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  blockDataDestroy(pInfo->pIntermediateBlock);
  pInfo->pIntermediateBlock = NULL;

  if (pInfo->pSortCtxList) {
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
      blockDataDestroy(pCtx->pIntermediateBlock);
      taosMemoryFree(pCtx);
    }
    taosArrayDestroy(pInfo->pSortCtxList);
    pInfo->pSortCtxList = NULL;
  }

  pInfo->tagDownStreamId = -1;
  pMergeInfo->pSavedTuple = NULL;
  pMergeInfo->pSavedTagBlock = NULL;

  // The only step that can fail is kept last, after every cached pointer has
  // already been cleared.
  blockDataDestroy(pInfo->pInputBlock);
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno);

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
823

824
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
×
825
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
826
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
827
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
×
828
  int32_t                        lino = 0;
×
829
  int32_t                        code = TSDB_CODE_SUCCESS;
×
830
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
×
831
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
832
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
×
833
  SNodeList*                     pMergeKeys = NULL;
×
834

835
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno);
×
836
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno);
×
837

838
  pOperator->pPhyNode = pVirtualScanPhyNode;
×
839

840
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
×
841
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
×
842

843
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
×
844
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
×
845
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno);
×
846

847
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
×
848
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno);
×
849
  pVirtualScanInfo->pInputBlock = pInputBlock;
×
850
  pVirtualScanInfo->tagDownStreamId = -1;
×
851
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
×
852
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
×
853
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
×
854
    pSup->pExprInfo = NULL;
×
855
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
×
856

857
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
×
858
                                      &pTaskInfo->storageAPI.functionStore);
859
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno);
×
860
  }
861

862
  initResultSizeInfo(&pOperator->resultInfo, 1024);
×
863
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
×
864

865
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
×
866
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
×
867

868
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
×
869
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
×
870
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
×
871
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno);
×
872
  } else {
873
    pTaskInfo->dynamicTask = true;
×
874
  }
875
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
×
876
  pVirtualScanInfo->sortBufSize =
×
877
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
×
878
  VTS_ERR_JRET(
×
879
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
880

881
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
×
882

883
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
×
884
                                  0, pTaskInfo->pStreamRuntimeInfo));
885

886
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
×
887
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno);
×
888

889
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
×
890
                  OP_NOT_OPENED, pInfo, pTaskInfo);
891
  pOperator->fpSet =
892
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
×
893
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
894
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
×
895

896
  if (NULL != pDownstream) {
×
897
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
×
898
  } else {
899
    pVirtualScanInfo->tagDownStreamId = -1;
×
900
  }
901

902
  nodesDestroyList(pMergeKeys);
×
903
  *pOptrInfo = pOperator;
×
904
  return TSDB_CODE_SUCCESS;
×
905

906
_return:
×
907
  if (code != TSDB_CODE_SUCCESS) {
×
908
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
909
  }
910
  if (pInfo != NULL) {
×
911
    destroyVirtualTableScanOperatorInfo(pInfo);
×
912
  }
913
  nodesDestroyList(pMergeKeys);
×
914
  pTaskInfo->code = code;
×
915
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
916
  return code;
×
917
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc