• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4794

15 Oct 2025 11:06AM UTC coverage: 60.905% (-0.001%) from 60.906%
#4794

push

travis-ci

web-flow
Merge c54016657 into 7e74ade39

154617 of 324341 branches covered (47.67%)

Branch coverage included in aggregate %.

54 of 65 new or added lines in 13 files covered. (83.08%)

2506 existing lines in 114 files now uncovered.

207077 of 269525 relevant lines covered (76.83%)

125143530.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.81
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21
#include "virtualtablescan.h"
22
#include "tsort.h"
23

24
typedef struct SVirtualTableScanInfo {
25
  STableScanBase base;
26
  SArray*        pSortInfo;
27
  SSortHandle*   pSortHandle;
28
  int32_t        bufPageSize;
29
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
30
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
31
  SSDataBlock*   pInputBlock;
32
  SHashObj*      dataSlotMap;
33
  int32_t        tsSlotId;
34
  int32_t        tagBlockId;
35
  int32_t        tagDownStreamId;
36
  bool           scanAllCols;
37
  SArray*        pSortCtxList;
38
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
39
} SVirtualTableScanInfo;
40

41
typedef struct SVirtualScanMergeOperatorInfo {
42
  SOptrBasicInfo        binfo;
43
  EMergeType            type;
44
  SVirtualTableScanInfo virtualScanInfo;
45
  bool                  ignoreGroupId;
46
  uint64_t              groupId;
47
  STupleHandle*         pSavedTuple;
48
  SSDataBlock*          pSavedTagBlock;
49
} SVirtualScanMergeOperatorInfo;
50

51
typedef struct SLoadNextCtx {
52
  SOperatorInfo*  pOperator;
53
  SOperatorParam* pOperatorGetParam;
54
  int32_t         blockId;
55
  STimeWindow     window;
56
  SSDataBlock*    pIntermediateBlock;
57
  col_id_t        tsSlotId;
58
} SLoadNextCtx;
59

60
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
24,899,151✔
61
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
24,899,151✔
62
  int32_t        code = TSDB_CODE_SUCCESS;
24,899,151✔
63

64
  VTS_ERR_JRET(pOperator->fpSet.getNextFn(pOperator, ppBlock));
24,899,151!
65
  VTS_ERR_JRET(blockDataCheck(*ppBlock));
24,899,151!
66

67
  return code;
24,899,151✔
68
_return:
×
69
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
70
  return code;
×
71
}
72

73
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
22,383,511✔
74
  int32_t code = TSDB_CODE_SUCCESS;
22,383,511✔
75
  int32_t lino = 0;
22,383,511✔
76
  int32_t tsIndex = -1;
22,383,511✔
77
  for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
46,699,365!
78
    if (((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i))->info.colId == tsSlotId) {
46,699,365✔
79
      tsIndex = i;
22,383,511✔
80
      break;
22,383,511✔
81
    }
82
  }
83

84
  if (tsIndex == -1) {
22,383,511!
85
    tsIndex = (int32_t)taosArrayGetSize(pBlock->pDataBlock) - 1;
×
86
  }
87

88
  SColumnInfoData *pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
22,383,511✔
89
  QUERY_CHECK_NULL(pColData, code, lino, _return, terrno)
22,383,511!
90

91
  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, 0), 0);
22,383,511✔
92
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pColData, pBlock->info.rows - 1), 0);
22,383,511✔
93

94
  return code;
22,383,511✔
95
_return:
×
96
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
×
97
  return code;
×
98
}
99

100
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
29,842,174✔
101
  SLoadNextCtx*           pCtx = (SLoadNextCtx*)param;
29,842,174✔
102
  SOperatorInfo*          pOperator = pCtx->pOperator;
29,842,174✔
103
  SOperatorParam*         pOperatorGetParam = pCtx->pOperatorGetParam;
29,842,174✔
104
  int32_t                 code = TSDB_CODE_SUCCESS;
29,842,174✔
105
  SSDataBlock*            pRes = NULL;
29,842,174✔
106
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperatorGetParam->value;
29,842,174✔
107

108
  pParam->basic.window = pCtx->window;
29,842,174✔
109
  pOperator->status = OP_NOT_OPENED;
29,842,174✔
110
  if (pCtx->pIntermediateBlock) {
29,842,174✔
111
    blockDataDestroy(pCtx->pIntermediateBlock);
22,327,718✔
112
    pCtx->pIntermediateBlock = NULL;
22,327,718✔
113
  }
114

115
  VTS_ERR_JRET(pOperator->fpSet.getNextExtFn(pOperator, pOperatorGetParam, &pRes));
29,842,174!
116

117
  VTS_ERR_JRET(blockDataCheck(pRes));
29,842,174!
118
  if ((pRes)) {
29,842,174✔
119
    qDebug("%s load from downstream, blockId:%d", __func__, pCtx->blockId);
22,383,511✔
120
    (pRes)->info.id.blockId = pCtx->blockId;
22,383,511✔
121
    VTS_ERR_JRET(getTimeWindowOfBlock(pRes, pCtx->tsSlotId, &pCtx->window.skey, &pCtx->window.ekey));
22,383,511!
122
    VTS_ERR_JRET(createOneDataBlock(pRes, true, &pCtx->pIntermediateBlock));
22,383,511!
123
    *ppBlock = pCtx->pIntermediateBlock;
22,383,511✔
124
  } else {
125
    pCtx->window.ekey = INT64_MAX;
7,458,663✔
126
    *ppBlock = NULL;
7,458,663✔
127
  }
128

129
  return code;
29,842,174✔
130
_return:
×
131
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
×
132
  return code;
×
133
}
134

135
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
5,140,510✔
136
  int32_t           code = TSDB_CODE_SUCCESS;
5,140,510✔
137
  SNodeList        *pNodeList = NULL;
5,140,510✔
138
  SColumnNode      *pColumnNode = NULL;
5,140,510✔
139
  SOrderByExprNode *pOrderByExprNode = NULL;
5,140,510✔
140

141
  VTS_ERR_JRET(nodesMakeList(&pNodeList));
5,140,510!
142

143
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
5,140,510!
144
  pColumnNode->slotId = tsSlotId;
5,140,510✔
145

146
  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
5,140,510!
147
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
5,140,510✔
148
  pOrderByExprNode->order = ORDER_ASC;
5,140,510✔
149
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;
5,140,510✔
150

151
  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));
5,140,510!
152

153
  *pMergeKeys = pNodeList;
5,140,510✔
154
  return code;
5,140,510✔
155
_return:
×
156
  nodesDestroyNode((SNode*)pColumnNode);
×
157
  nodesDestroyNode((SNode*)pOrderByExprNode);
×
158
  nodesDestroyList(pNodeList);
×
159
  return code;
×
160
}
161

162
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
2,866,442✔
163
  if (pVirtualScanInfo->pSortInfo) {
2,866,442✔
164
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
2,138,895✔
165
    pVirtualScanInfo->pSortInfo = NULL;
2,138,895✔
166
  }
167
  if (pVirtualScanInfo->pSortHandle) {
2,866,442✔
168
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
2,125,564✔
169
    pVirtualScanInfo->pSortHandle = NULL;
2,125,564✔
170
  }
171
  if (pVirtualScanInfo->pSortCtxList) {
2,866,442✔
172
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
7,143,802✔
173
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
5,018,238✔
174
      blockDataDestroy(pCtx->pIntermediateBlock);
5,018,238✔
175
      taosMemoryFree(pCtx);
5,018,238!
176
    }
177
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
2,125,564✔
178
  }
179
}
2,866,442✔
180

181
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
2,866,442✔
182
  int32_t                         code = TSDB_CODE_SUCCESS;
2,866,442✔
183
  int32_t                         lino = 0;
2,866,442✔
184
  SVirtualScanMergeOperatorInfo*  pInfo = pOperator->info;
2,866,442✔
185
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
2,866,442✔
186
  SVTableScanOperatorParam *      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
2,866,442✔
187
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2,866,442✔
188
  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
2,866,442✔
189
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
2,866,442!
190
  SNodeList*                      pMergeKeys = NULL;
2,866,442✔
191
  SSortSource*                    ps = NULL;
2,866,442✔
192
  int32_t                         scanOpIndex = 0;
2,866,442✔
193

194
  cleanUpVirtualScanInfo(pVirtualScanInfo);
2,866,442✔
195
  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
2,866,442!
196
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,866,442✔
197
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
2,866,442!
198
  nodesDestroyList(pMergeKeys);
2,866,442✔
199

200
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
2,866,442!
201
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
202

203
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
2,866,442✔
204
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);
2,866,442✔
205

206
  if (pOperator->numOfDownstream > 2) {
2,866,442!
207
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
×
208
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
209
  }
210

211
  pVirtualScanInfo->tagDownStreamId = -1;
2,866,442✔
212
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
7,965,492✔
213
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
5,099,050✔
214
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
5,099,050✔
215
      // tag block do not need sort
216
      pVirtualScanInfo->tagDownStreamId = i;
2,232,608✔
217
      pInfo->pSavedTagBlock = NULL;
2,232,608✔
218
      continue;
2,232,608✔
219
    }
220
  }
221
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
2,866,442✔
222

223
  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
2,866,442✔
224
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
2,866,442✔
225
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno)
2,866,442!
226
  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
10,380,898✔
227
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
7,514,456✔
228
    SLoadNextCtx*   pCtx = NULL;
7,514,456✔
229
    ps = NULL;
7,514,456✔
230

231
    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
7,514,456!
232
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno)
7,514,456!
233
    pCtx->blockId = i;
7,514,456✔
234
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
7,514,456✔
235
    pCtx->pOperatorGetParam = pOpParam;
7,514,456✔
236
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
7,514,456✔
237
    pCtx->pIntermediateBlock = NULL;
7,514,456✔
238
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;
7,514,456✔
239

240
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
7,514,456!
241
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno)
7,514,456!
242

243
    ps->param = pCtx;
7,514,456✔
244
    ps->onlyRef = true;
7,514,456✔
245

246
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
7,514,456!
247
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno)
15,028,912!
248
  }
249

250
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
2,866,442!
251

252
  return code;
2,866,442✔
253
_return:
×
254
  if (code != 0){
×
255
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
256
  }
257
  nodesDestroyList(pMergeKeys);
×
258
  if (ps != NULL) {
×
259
    taosMemoryFree(ps);
×
260
  }
261
  return code;
×
262
}
263

264
int32_t createSortHandle(SOperatorInfo* pOperator) {
2,253,230✔
265
  SVirtualScanMergeOperatorInfo * pInfo = pOperator->info;
2,253,230✔
266
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2,253,230✔
267
  SVirtualTableScanInfo*          pVirtualScanInfo = &pInfo->virtualScanInfo;
2,253,230✔
268
  int32_t                         numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;
2,253,230!
269
  SSortSource*                    ps = NULL;
2,253,230✔
270
  int32_t                         code = 0;
2,253,230✔
271
  int32_t                         lino = 0;
2,253,230✔
272

273
  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pVirtualScanInfo->bufPageSize,
2,253,230!
274
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));
275

276
  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
2,253,230✔
277
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);
2,253,230✔
278

279
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
10,183,207✔
280
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
7,929,977✔
281
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
7,929,977!
282
      VTS_ERR_JRET(pDownstream->fpSet._openFn(pDownstream));
7,929,977!
283
    } else {
284
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
×
285
    }
286

287
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
7,929,977✔
288
      // tag block do not need sort
289
      pVirtualScanInfo->tagDownStreamId = i;
224,142✔
290
      continue;
224,142✔
291
    }
292

293
    ps = taosMemoryCalloc(1, sizeof(SSortSource));
7,705,835!
294
    TSDB_CHECK_NULL(ps, code, lino, _return, terrno)
7,705,835!
295

296
    ps->param = pDownstream;
7,705,835✔
297
    ps->onlyRef = true;
7,705,835✔
298

299
    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
7,705,835!
300
    ps = NULL;
7,705,835✔
301
  }
302

303
  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));
2,253,230!
304

305
_return:
2,253,230✔
306
  if (code != 0){
2,253,230!
307
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
308
  }
309
  if (ps != NULL) {
2,253,230!
310
    taosMemoryFree(ps);
×
311
  }
312
  return code;
2,253,230✔
313
}
314

315
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
5,140,510✔
316
  int32_t                         code = 0;
5,140,510✔
317
  int32_t                         lino = 0;
5,140,510✔
318

319
  if (pOperator->numOfDownstream == 0) {
5,140,510✔
320
    return code;
20,838✔
321
  }
322

323
  if (pOperator->pOperatorGetParam) {
5,119,672✔
324
    VTS_ERR_JRET(createSortHandleFromParam(pOperator));
2,866,442!
325
  } else {
326
    VTS_ERR_JRET(createSortHandle(pOperator));
2,253,230!
327
  }
328

329
  return code;
5,119,672✔
330

331
_return:
×
332
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
333
  return code;
×
334
}
335

336
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
61,890,605✔
337
  int32_t code = 0;
61,890,605✔
338

339
  if (OPTR_IS_OPENED(pOperator)) {
61,890,605✔
340
    return TSDB_CODE_SUCCESS;
56,750,095✔
341
  }
342

343
  int64_t startTs = taosGetTimestampUs();
5,140,510✔
344

345
  code = openVirtualTableScanOperatorImpl(pOperator);
5,140,510✔
346

347
  pOperator->cost.openCost = (double)(taosGetTimestampUs() - startTs) / 1000.0;
5,140,510✔
348
  pOperator->status = OP_RES_TO_RETURN;
5,140,510✔
349

350
  VTS_ERR_RET(code);
5,140,510!
351

352
  OPTR_SET_OPENED(pOperator);
5,140,510✔
353
  return code;
5,140,510✔
354
}
355

356
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
36,745,360✔
357
                                          SSDataBlock* p) {
358
  int32_t code = 0;
36,745,360✔
359
  int64_t lastTs = 0;
36,745,360✔
360
  int64_t rowNums = -1;
36,745,360✔
361
  blockDataEmpty(p);
36,745,360✔
362
  while (1) {
2,147,483,647✔
363
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
364
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
365
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
366
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
367
        break;
368
      }
369
    } else {
370
      pTupleHandle = pInfo->pSavedTuple;
32,612,484✔
371
      pInfo->pSavedTuple = NULL;
32,612,484✔
372
    }
373

374
    SDataBlockInfo info = {0};
2,147,483,647✔
375
    tsortGetBlockInfo(pTupleHandle, &info);
2,147,483,647✔
376
    int32_t blockId = (int32_t)info.id.blockId;
2,147,483,647✔
377

378
    for (int32_t i = 0; i < (pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle)); i++) {
2,147,483,647!
379
      bool isNull = tsortIsNullVal(pTupleHandle, i);
2,147,483,647✔
380
      if (isNull) {
2,147,483,647!
UNCOV
381
        int32_t slotKey = blockId << 16 | i;
×
UNCOV
382
        void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
×
UNCOV
383
        if (slotId == NULL) {
×
384
          if (i == 0) {
×
385
            colDataSetNULL(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums);
×
386
          } else {
387
            qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
388
            VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
389
          }
390
        } else {
UNCOV
391
          colDataSetNULL(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums);
×
392
        }
393
      } else {
394
        char* pData = NULL;
2,147,483,647✔
395
        tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
396

397
        if (pData != NULL) {
2,147,483,647✔
398
          if (i == 0) {
2,147,483,647✔
399
            if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
400
              if (rowNums >= capacity - 1) {
2,147,483,647✔
401
                pInfo->pSavedTuple = pTupleHandle;
32,946,708✔
402
                goto _return;
32,946,708✔
403
              }
404
              rowNums++;
2,147,483,647✔
405
              for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
406
                colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
407
              }
408
              if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647✔
409
                VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
410
              }
411
              lastTs = *(int64_t*)pData;
2,147,483,647✔
412
            }
413
          }
414
          int32_t slotKey = blockId << 16 | i;
2,147,483,647✔
415
          void*   slotId = taosHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
2,147,483,647✔
416
          if (slotId == NULL) {
2,147,483,647✔
417
            if (i == 0) {
2,147,483,647!
418
              continue;
2,147,483,647✔
419
            } else {
420
              qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
×
421
              VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
422
            }
423
          }
424
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
2,147,483,647!
425
        }
426
      }
427
    }
428
  }
429
_return:
36,745,360✔
430
  p->info.rows = rowNums + 1;
36,745,360✔
431
  p->info.dataLoad = 1;
36,745,360✔
432
  p->info.scanFlag = MAIN_SCAN;
36,745,360✔
433
  return code;
36,745,360✔
434
}
435

436
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
39,733,132✔
437
                                           SSDataBlock* p) {
438
  int32_t code = 0;
39,733,132✔
439
  int64_t lastTs = 0;
39,733,132✔
440
  int64_t rowNums = -1;
39,733,132✔
441
  blockDataEmpty(p);
39,733,132✔
442
  while (1) {
2,147,483,647✔
443
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
444
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
445
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
446
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
447
        break;
448
      }
449
    } else {
450
      pTupleHandle = pInfo->pSavedTuple;
34,788,453✔
451
      pInfo->pSavedTuple = NULL;
34,788,453✔
452
    }
453

454
    int32_t tsIndex = -1;
2,147,483,647✔
455

456
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647!
457
      if (tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
458
        continue;
2,147,483,647✔
459
      } else {
460
        SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
461
        tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
462
        if (pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
2,147,483,647✔
463
          tsIndex = i;
2,147,483,647✔
464
          break;
2,147,483,647✔
465
        }
466
      }
467
    }
468

469
    if (tsIndex == -1) {
2,147,483,647!
470
      tsIndex = (int32_t)tsortGetColNum(pTupleHandle) - 1;
×
471
    }
472

473
    char* pData = NULL;
2,147,483,647✔
474
    // first, set ts slot's data
475
    // then, set other slots' data
476
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
2,147,483,647✔
477

478
    if (pData != NULL) {
2,147,483,647!
479
      if (lastTs != *(int64_t*)pData) {
2,147,483,647✔
480
        if (rowNums >= capacity - 1) {
2,147,483,647✔
481
          pInfo->pSavedTuple = pTupleHandle;
34,804,137✔
482
          goto _return;
34,804,137✔
483
        }
484
        rowNums++;
2,147,483,647✔
485
        for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
2,147,483,647✔
486
          colDataSetNULL(taosArrayGet(p->pDataBlock, j), rowNums);
2,147,483,647✔
487
        }
488
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647!
489
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647!
490
        }
491
        lastTs = *(int64_t*)pData;
2,147,483,647✔
492
      }
493
    }
494
    if (pInfo->virtualScanInfo.scanAllCols) {
2,147,483,647!
495
      continue;
2,147,483,647✔
496
    }
497

498
    for (int32_t i = 0; i < tsortGetColNum(pTupleHandle); i++) {
2,147,483,647✔
499
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
2,147,483,647✔
500
        continue;
2,147,483,647✔
501
      }
502

503
      SColumnInfoData *pColInfo = NULL;
2,147,483,647✔
504
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
2,147,483,647✔
505
      tsortGetValue(pTupleHandle, i, (void**)&pData);
2,147,483,647✔
506

507
      if (pData != NULL) {
2,147,483,647!
508
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
2,147,483,647!
509
      }
510
    }
511
  }
512
_return:
39,733,132✔
513
  p->info.rows = rowNums + 1;
39,733,132✔
514
  p->info.dataLoad = 1;
39,733,132✔
515
  p->info.scanFlag = MAIN_SCAN;
39,733,132✔
516
  return code;
39,733,132✔
517
}
518

519
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
76,498,621✔
520
  int32_t                        code = 0;
76,498,621✔
521
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
76,498,621✔
522
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
76,499,330✔
523
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
76,499,330✔
524
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
76,499,330✔
525
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
76,499,330✔
526
  int32_t                        capacity = pOperator->resultInfo.capacity;
76,499,330✔
527

528
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
76,499,330✔
529
  blockDataCleanup(pDataBlock);
76,499,330✔
530

531
  if (pHandle == NULL) {
76,499,330✔
532
    return TSDB_CODE_SUCCESS;
20,838✔
533
  }
534

535
  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
76,478,492✔
536
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
2,994,108!
537
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
2,994,108!
538
      return TSDB_CODE_SUCCESS;
×
539
    }
540

541
    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
2,994,108!
542
  } else {
543
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
73,484,384✔
544
  }
545

546
  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
76,478,492✔
547
  if (pOperator->pOperatorGetParam) {
76,478,492✔
548
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
39,733,132!
549
  } else {
550
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
36,745,360!
551
  }
552

553
  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
76,478,492!
554
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
76,478,492✔
555
         pDataBlock->info.rows);
556

557
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
76,478,492✔
558
  return code;
76,478,492✔
559
}
560

561
int32_t vtableAddTagPseudoColumnData(const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
30,988,413✔
562
  int32_t          code = TSDB_CODE_SUCCESS;
30,988,413✔
563
  int32_t          lino = 0;
30,988,413✔
564
  int64_t          backupRows;
565
  // currently only the tbname pseudo column
566
  if (numOfExpr <= 0) {
30,988,413!
567
    return TSDB_CODE_SUCCESS;
×
568
  }
569

570
  if (tagBlock == NULL) {
30,988,413!
571
    return TSDB_CODE_SUCCESS;
×
572
  }
573

574
  if (tagBlock->info.rows != 1) {
30,988,413!
575
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
×
576
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
×
577
  }
578

579
  backupRows = pBlock->info.rows;
30,988,413✔
580
  pBlock->info.rows = rows;
30,988,413✔
581
  for (int32_t j = 0; j < numOfExpr; ++j) {
115,671,348✔
582
    const SExprInfo* pExpr1 = &pExpr[j];
84,682,935✔
583
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
84,682,935✔
584

585
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
84,682,935✔
586
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno)
84,682,935!
587
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
84,682,935✔
588

589
    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
84,682,935✔
590
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno)
84,682,935!
591

592
    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
84,682,935!
593
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      continue;
×
595
    }
596

597
    char* data = colDataGetData(pTagInfoData, 0);
84,682,935!
598

599
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
84,682,935!
600
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, 1, false);
84,682,935✔
601
      QUERY_CHECK_CODE(code, lino, _return);
84,682,935!
602
    } else {  // todo opt for json tag
603
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
604
        code = colDataSetVal(pColInfoData, i, data, false);
×
605
        QUERY_CHECK_CODE(code, lino, _return);
×
606
      }
607
    }
608
  }
609

610
  // restore the rows
611
  pBlock->info.rows = backupRows;
30,988,413✔
612

613
_return:
30,988,413✔
614

615
  if (code != TSDB_CODE_SUCCESS) {
30,988,413!
616
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
617
  }
618
  return code;
30,988,413✔
619
}
620

621
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock, int32_t rows) {
71,708,728✔
622
  int32_t         code = 0;
71,708,728✔
623
  STableScanBase* pTableScanInfo = &pInfo->base;
71,708,728✔
624
  SExprSupp*      pSup = &pTableScanInfo->pseudoSup;
71,708,728✔
625
  if (pSup->numOfExprs > 0) {
71,708,728✔
626
    VTS_ERR_RET(vtableAddTagPseudoColumnData(pSup->pExprInfo, pSup->numOfExprs, pTagBlock, pBlock, rows));
30,988,413!
627
  }
628

629
  return code;
71,708,728✔
630
}
631

632
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
61,890,605✔
633
  int32_t                        code = TSDB_CODE_SUCCESS;
61,890,605✔
634
  int32_t                        lino = 0;
61,890,605✔
635
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
61,890,605✔
636
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
61,890,605✔
637
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
61,890,605✔
638

639
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
61,890,605!
640
    pResBlock = NULL;
×
641
    return code;
×
642
  }
643

644
  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));
61,890,605!
645

646
  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
61,890,605✔
647
    SSDataBlock*   pTagBlock = NULL;
2,456,750✔
648
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
2,456,750✔
649
    if (pOperator->pOperatorGetParam) {
2,456,750✔
650
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
2,232,608✔
651
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
2,232,608!
652
    } else {
653
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
224,142!
654
    }
655

656
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
2,456,750!
657
      VTS_ERR_JRET(TSDB_CODE_FAILED);
×
658
    }
659
    pInfo->pSavedTagBlock = pTagBlock;
2,456,750✔
660
  }
661

662
  while(1) {
663
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
76,499,330!
664
    if (*pResBlock == NULL) {
76,499,330✔
665
      setOperatorCompleted(pOperator);
4,790,602✔
666
      break;
4,790,602✔
667
    }
668

669
    if (pOperator->pOperatorGetParam) {
71,708,728✔
670
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
36,882,374✔
671
      (*pResBlock)->info.id.uid = uid;
36,882,374✔
672
    } else {
673
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
34,826,354✔
674
    }
675

676
    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), (*pResBlock)->info.rows));
71,708,728!
677
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL));
71,708,728!
678
    if ((*pResBlock)->info.rows > 0) {
71,708,728✔
679
      break;
57,100,003✔
680
    }
681
  }
682

683
  return code;
61,890,605✔
684
_return:
×
685
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
686
  pTaskInfo->code = code;
×
687
  T_LONG_JMP(pTaskInfo->env, code);
×
688
}
689

690
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
3,001,615✔
691
  cleanupQueryTableDataCond(&pBase->cond);
3,001,615✔
692

693
  if (pAPI->tsdReaderClose) {
3,001,615!
694
    pAPI->tsdReaderClose(pBase->dataReader);
×
695
  }
696
  pBase->dataReader = NULL;
3,001,615✔
697

698
  if (pBase->matchInfo.pList != NULL) {
3,001,615!
699
    taosArrayDestroy(pBase->matchInfo.pList);
×
700
  }
701

702
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
3,001,615✔
703
  cleanupExprSupp(&pBase->pseudoSup);
3,001,615✔
704
}
3,001,615✔
705

706
void destroyVirtualTableScanOperatorInfo(void* param) {
3,001,615✔
707
  if (!param) {
3,001,615!
708
    return;
×
709
  }
710
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
3,001,615✔
711
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
3,001,615✔
712
  blockDataDestroy(pOperatorInfo->binfo.pRes);
3,001,615✔
713
  pOperatorInfo->binfo.pRes = NULL;
3,001,615✔
714

715
  tsortDestroySortHandle(pInfo->pSortHandle);
3,001,615✔
716
  pInfo->pSortHandle = NULL;
3,001,615✔
717
  taosArrayDestroy(pInfo->pSortInfo);
3,001,615✔
718
  pInfo->pSortInfo = NULL;
3,001,615✔
719

720
  blockDataDestroy(pInfo->pIntermediateBlock);
3,001,615✔
721
  pInfo->pIntermediateBlock = NULL;
3,001,615✔
722

723
  blockDataDestroy(pInfo->pInputBlock);
3,001,615✔
724
  pInfo->pInputBlock = NULL;
3,001,615✔
725
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
3,001,615✔
726

727
  taosHashCleanup(pInfo->dataSlotMap);
3,001,615✔
728

729
  if (pInfo->pSortCtxList) {
3,001,615✔
730
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
3,172,273✔
731
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
2,456,476✔
732
      blockDataDestroy(pCtx->pIntermediateBlock);
2,456,476✔
733
      taosMemoryFree(pCtx);
2,456,476!
734
    }
735
    taosArrayDestroy(pInfo->pSortCtxList);
715,797✔
736
    pInfo->pSortCtxList = NULL;
715,797✔
737
  }
738
  taosMemoryFreeClear(param);
3,001,615!
739
}
740

741
int32_t extractColMap(SNodeList* pNodeList, SHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
3,001,615✔
742
  size_t  numOfCols = LIST_LENGTH(pNodeList);
3,001,615✔
743
  int32_t code = TSDB_CODE_SUCCESS;
3,001,615✔
744
  int32_t lino = 0;
3,001,615✔
745

746
  if (numOfCols == 0) {
3,001,615✔
747
    return code;
3,798✔
748
  }
749

750
  *tsSlotId = -1;
2,997,817✔
751
  *tagBlockId = -1;
2,997,817✔
752
  *pSlotMap = taosHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
2,997,817✔
753
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno)
2,997,817!
754

755
  for (int32_t i = 0; i < numOfCols; ++i) {
19,080,733✔
756
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
16,082,916✔
757
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno)
16,082,916!
758

759
    if (pColNode->isPrimTs) {
16,082,916!
760
      *tsSlotId = i;
2,707,065✔
761
    } else if (pColNode->hasRef) {
13,375,851!
762
      int32_t slotKey = pColNode->dataBlockId << 16 | pColNode->slotId;
7,421,488✔
763
      VTS_ERR_JRET(taosHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
7,421,488!
764
    } else if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
5,954,363✔
765
      // tag column or pseudo column's function
766
      *tagBlockId = pColNode->dataBlockId;
1,718,618✔
767
    }
768
  }
769

770
  return code;
2,997,817✔
771
_return:
×
772
  taosHashCleanup(*pSlotMap);
×
773
  *pSlotMap = NULL;
×
774
  if (code != TSDB_CODE_SUCCESS) {
×
775
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
776
  }
777
  return code;
×
778
}
779

780
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
9,462,964✔
781
  int32_t code = 0, lino = 0;
9,462,964✔
782
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOper->info;
9,462,964✔
783
  SVirtualScanPhysiNode* pPhynode = (SVirtualScanPhysiNode*)pOper->pPhyNode;
9,462,964✔
784
  SVirtualTableScanInfo* pInfo = &pMergeInfo->virtualScanInfo;
9,464,050✔
785
  
786
  pOper->status = OP_NOT_OPENED;
9,464,050✔
787
  resetBasicOperatorState(&pMergeInfo->binfo);
9,464,050✔
788

789
  tsortDestroySortHandle(pInfo->pSortHandle);
9,462,421✔
790
  pInfo->pSortHandle = NULL;
9,460,792✔
791
  // taosArrayDestroy(pInfo->pSortInfo);
792
  // pInfo->pSortInfo = NULL;
793

794
  blockDataDestroy(pInfo->pIntermediateBlock);
9,461,335✔
795
  pInfo->pIntermediateBlock = NULL;
9,461,335✔
796

797
  blockDataDestroy(pInfo->pInputBlock);
9,460,792✔
798
  pInfo->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhynode)->pOutputDataBlockDesc);
9,462,964✔
799
  TSDB_CHECK_NULL(pInfo->pInputBlock, code, lino, _exit, terrno)
9,463,507!
800

801
  pInfo->tagDownStreamId = -1;
9,463,507✔
802

803
  if (pInfo->pSortCtxList) {
9,463,507✔
804
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
64,823✔
805
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
39,742✔
806
      blockDataDestroy(pCtx->pIntermediateBlock);
39,742✔
807
      taosMemoryFree(pCtx);
39,742!
808
    }
809
    taosArrayDestroy(pInfo->pSortCtxList);
25,081✔
810
    pInfo->pSortCtxList = NULL;
25,081✔
811
  }
812

813
  pMergeInfo->pSavedTuple = NULL;
9,461,878✔
814
  pMergeInfo->pSavedTagBlock = NULL;
9,462,421✔
815

816
_exit:
9,464,050✔
817

818
  if (code) {
9,464,050!
819
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
820
  }
821

822
  return code;
9,458,077✔
823
}
824

825
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,001,615✔
826
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
827
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
828
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
3,001,615✔
829
  int32_t                        lino = 0;
3,001,615✔
830
  int32_t                        code = TSDB_CODE_SUCCESS;
3,001,615✔
831
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
3,001,615!
832
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,001,615!
833
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
3,001,615✔
834
  SNodeList*                     pMergeKeys = NULL;
3,001,615✔
835

836
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno)
3,001,615!
837
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno)
3,001,615!
838

839
  pOperator->pPhyNode = pVirtualScanPhyNode;
3,001,615✔
840

841
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
3,001,615✔
842
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
3,001,615✔
843

844
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
3,001,615✔
845
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
3,001,615✔
846
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno)
3,001,615!
847

848
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
3,001,615✔
849
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno)
3,001,615!
850
  pVirtualScanInfo->pInputBlock = pInputBlock;
3,001,615✔
851
  pVirtualScanInfo->tagDownStreamId = -1;
3,001,615✔
852
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
3,001,615✔
853
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
3,001,615✔
854
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
789,122✔
855
    pSup->pExprInfo = NULL;
789,122✔
856
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
789,122!
857

858
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
789,122✔
859
                                      &pTaskInfo->storageAPI.functionStore);
860
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno)
789,122!
861
  }
862

863
  initResultSizeInfo(&pOperator->resultInfo, 1024);
3,001,615✔
864
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
3,001,615!
865

866
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
3,001,615✔
867
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
3,001,615✔
868

869
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
3,001,615!
870
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
2,274,068!
871
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,274,068✔
872
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
2,274,068!
873
  } else {
874
    pTaskInfo->dynamicTask = true;
727,547✔
875
  }
876
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
3,001,615✔
877
  pVirtualScanInfo->sortBufSize =
3,001,615✔
878
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
3,001,615✔
879
  VTS_ERR_JRET(
3,001,615!
880
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
881

882
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
3,001,615!
883

884
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
3,001,615!
885
                                  0, pTaskInfo->pStreamRuntimeInfo));
886

887
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
3,001,615✔
888
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno)
3,001,615!
889

890
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
3,001,615✔
891
                  OP_NOT_OPENED, pInfo, pTaskInfo);
892
  pOperator->fpSet =
893
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
3,001,615✔
894
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
895
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
3,001,615✔
896

897
  if (NULL != pDownstream) {
3,001,615✔
898
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
2,980,777!
899
  } else {
900
    pVirtualScanInfo->tagDownStreamId = -1;
20,838✔
901
  }
902

903
  nodesDestroyList(pMergeKeys);
3,001,615✔
904
  *pOptrInfo = pOperator;
3,001,615✔
905
  return TSDB_CODE_SUCCESS;
3,001,615✔
906

UNCOV
907
_return:
×
UNCOV
908
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
909
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
910
  }
UNCOV
911
  if (pInfo != NULL) {
×
UNCOV
912
    destroyVirtualTableScanOperatorInfo(pInfo);
×
913
  }
UNCOV
914
  nodesDestroyList(pMergeKeys);
×
UNCOV
915
  pTaskInfo->code = code;
×
UNCOV
916
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
917
  return code;
×
918
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc