• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4790

15 Oct 2025 05:45AM UTC coverage: 60.906% (-0.004%) from 60.91%
#4790

push

travis-ci

web-flow
Merge b5da8883b into 01668ff4e

154696 of 324376 branches covered (47.69%)

Branch coverage included in aggregate %.

186 of 217 new or added lines in 11 files covered. (85.71%)

2655 existing lines in 9 files now uncovered.

206920 of 269356 relevant lines covered (76.82%)

125086749.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.78
/source/libs/executor/src/virtualtablescanoperator.c
1
/*
2
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
*
4
* This program is free software: you can use, redistribute, and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3
6
* or later ("AGPL"), as published by the Free Software Foundation.
7
*
8
* This program is distributed in the hope that it will be useful, but WITHOUT
9
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
* FITNESS FOR A PARTICULAR PURPOSE.
11
*
12
* You should have received a copy of the GNU Affero General Public License
13
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14
*/
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21
#include "virtualtablescan.h"
22
#include "tsort.h"
23

24
typedef struct SVirtualTableScanInfo {
25
  STableScanBase base;
26
  SArray*        pSortInfo;
27
  SSortHandle*   pSortHandle;
28
  int32_t        bufPageSize;
29
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
30
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
31
  SSDataBlock*   pInputBlock;
32
  SSHashObj*     dataSlotMap;
33
  int32_t        tsSlotId;
34
  int32_t        tagBlockId;
35
  int32_t        tagDownStreamId;
36
  bool           scanAllCols;
37
  SArray*        pSortCtxList;
38
  tb_uid_t       vtableUid;  // virtual table uid, used to identify the vtable scan operator
39
} SVirtualTableScanInfo;
40

41
typedef struct SVirtualScanMergeOperatorInfo {
42
  SOptrBasicInfo        binfo;
43
  EMergeType            type;
44
  SVirtualTableScanInfo virtualScanInfo;
45
  bool                  ignoreGroupId;
46
  uint64_t              groupId;
47
  STupleHandle*         pSavedTuple;
48
  SSDataBlock*          pSavedTagBlock;
49
} SVirtualScanMergeOperatorInfo;
50

51
typedef struct SLoadNextCtx {
52
  SOperatorInfo*  pOperator;
53
  SOperatorParam* pOperatorGetParam;
54
  int32_t         blockId;
55
  STimeWindow     window;
56
  SSDataBlock*    pIntermediateBlock;
57
  col_id_t        tsSlotId;
58
} SLoadNextCtx;
59

60
/*
 * Fetch callback installed by createSortHandle: `param` is the downstream
 * operator to pull from. The fetched block (possibly NULL) is stored in
 * *ppBlock. Returns TSDB_CODE_SUCCESS or the failing call's error code.
 */
int32_t virtualScanloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SOperatorInfo*   pDownstream = (SOperatorInfo*)param;
  SColumnInfoData* pFirstCol = NULL;

  VTS_ERR_JRET(pDownstream->fpSet.getNextFn(pDownstream, ppBlock));
  VTS_ERR_JRET(blockDataCheck(*ppBlock));

  if (*ppBlock == NULL) {
    // downstream has nothing more to deliver
    return code;
  }

  pFirstCol = taosArrayGet((*ppBlock)->pDataBlock, 0);
  QUERY_CHECK_NULL(pFirstCol, code, lino, _return, terrno);
  // the ts column never holds a null value; clearing hasNull here speeds up the sort
  pFirstCol->hasNull = false;
  return code;

_return:
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  return code;
}
42,969,592✔
79

20,699,218✔
80
/*
 * Read the first and last timestamp of pBlock into *startTs / *endTs.
 *
 * The ts column is the first column whose info.colId equals tsSlotId; when no
 * column matches, the last column of the block is used instead.
 * Returns TSDB_CODE_SUCCESS, or terrno when the column cannot be fetched.
 */
int32_t getTimeWindowOfBlock(SSDataBlock *pBlock, col_id_t tsSlotId, int64_t *startTs, int64_t *endTs) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int32_t          numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int32_t          tsIndex = numOfCols - 1;  // fallback: last column
  SColumnInfoData *pTsCol = NULL;

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SColumnInfoData *pCandidate = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, idx);
    if (pCandidate->info.colId == tsSlotId) {
      tsIndex = idx;
      break;
    }
  }

  pTsCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, tsIndex);
  QUERY_CHECK_NULL(pTsCol, code, lino, _return, terrno);
  // the ts column never holds a null value; clearing hasNull here speeds up the sort
  pTsCol->hasNull = false;

  GET_TYPED_DATA(*startTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsCol, 0), 0);
  GET_TYPED_DATA(*endTs, int64_t, TSDB_DATA_TYPE_TIMESTAMP, colDataGetNumData(pTsCol, pBlock->info.rows - 1), 0);
  return code;

_return:
  qError("failed to get time window of block, %s code:%s, line:%d", __func__, tstrerror(code), lino);
  return code;
}
108

27,728,342✔
109
/*
 * Fetch callback installed by createSortHandleFromParam: `param` is a
 * SLoadNextCtx describing one source.
 *
 * The context's current window is written into the exchange param, the
 * downstream is reset to OP_NOT_OPENED and asked for its next block. When a
 * block arrives it is tagged with the context's blockId, the context window
 * is replaced by the block's first/last timestamp, and a block produced by
 * createOneDataBlock (kept in pCtx->pIntermediateBlock) is returned through
 * *ppBlock. When nothing arrives, window.ekey becomes INT64_MAX and *ppBlock
 * is NULL.
 */
int32_t virtualScanloadNextDataBlockFromParam(void* param, SSDataBlock** ppBlock) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SLoadNextCtx*           pLoadCtx = (SLoadNextCtx*)param;
  SOperatorInfo*          pSrcOp = pLoadCtx->pOperator;
  SOperatorParam*         pGetParam = pLoadCtx->pOperatorGetParam;
  SExchangeOperatorParam* pExchParam = (SExchangeOperatorParam*)pGetParam->value;
  SSDataBlock*            pFetched = NULL;

  pExchParam->basic.window = pLoadCtx->window;
  pSrcOp->status = OP_NOT_OPENED;

  // the block handed out by the previous call is no longer needed
  if (pLoadCtx->pIntermediateBlock != NULL) {
    blockDataDestroy(pLoadCtx->pIntermediateBlock);
    pLoadCtx->pIntermediateBlock = NULL;
  }

  VTS_ERR_JRET(pSrcOp->fpSet.getNextExtFn(pSrcOp, pGetParam, &pFetched));
  VTS_ERR_JRET(blockDataCheck(pFetched));

  if (pFetched == NULL) {
    pLoadCtx->window.ekey = INT64_MAX;
    *ppBlock = NULL;
    return code;
  }

  qDebug("%s load from downstream, blockId:%d", __func__, pLoadCtx->blockId);
  pFetched->info.id.blockId = pLoadCtx->blockId;
  VTS_ERR_JRET(getTimeWindowOfBlock(pFetched, pLoadCtx->tsSlotId, &pLoadCtx->window.skey, &pLoadCtx->window.ekey));
  VTS_ERR_JRET(createOneDataBlock(pFetched, true, &pLoadCtx->pIntermediateBlock));
  *ppBlock = pLoadCtx->pIntermediateBlock;
  return code;

_return:
  qError("failed to load data block from downstream, %s code:%s", __func__, tstrerror(code));
  return code;
}
143

4,635,167!
144
/*
 * Build the merge-key list used by the ts merge sort: a single ORDER BY
 * expression on the column at slot `tsSlotId`, ascending, nulls first.
 *
 * On success *pMergeKeys receives the new list (the caller destroys it with
 * nodesDestroyList). On failure everything allocated here is released and
 * *pMergeKeys is left untouched.
 */
int32_t makeTSMergeKey(SNodeList** pMergeKeys, col_id_t tsSlotId) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SNodeList        *pNodeList = NULL;
  SColumnNode      *pColumnNode = NULL;
  SOrderByExprNode *pOrderByExprNode = NULL;

  VTS_ERR_JRET(nodesMakeList(&pNodeList));

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pColumnNode));
  pColumnNode->slotId = tsSlotId;

  VTS_ERR_JRET(nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExprNode));
  pOrderByExprNode->pExpr = (SNode*)pColumnNode;
  // The column node now belongs to the order-by node. Drop the local reference so
  // the error path does not destroy it a second time when it destroys
  // pOrderByExprNode (NOTE(review): assumes nodesDestroyNode releases pExpr too).
  pColumnNode = NULL;
  pOrderByExprNode->order = ORDER_ASC;
  pOrderByExprNode->nullOrder = NULL_ORDER_FIRST;

  VTS_ERR_JRET(nodesListAppend(pNodeList, (SNode*)pOrderByExprNode));

  *pMergeKeys = pNodeList;
  return code;

_return:
  // pColumnNode is only non-NULL here if it was never attached to the order-by node
  nodesDestroyNode((SNode*)pColumnNode);
  nodesDestroyNode((SNode*)pOrderByExprNode);
  nodesDestroyList(pNodeList);
  return code;
}
1,829,865✔
170

171
/*
 * Release the sort-related members of pVirtualScanInfo (sort info, sort
 * handle, and the list of SLoadNextCtx with their intermediate blocks) and
 * reset each pointer to NULL so the structure can be set up again or
 * destroyed later without touching freed memory.
 */
void cleanUpVirtualScanInfo(SVirtualTableScanInfo* pVirtualScanInfo) {
  if (pVirtualScanInfo->pSortInfo) {
    taosArrayDestroy(pVirtualScanInfo->pSortInfo);
    pVirtualScanInfo->pSortInfo = NULL;
  }
  if (pVirtualScanInfo->pSortHandle) {
    tsortDestroySortHandle(pVirtualScanInfo->pSortHandle);
    pVirtualScanInfo->pSortHandle = NULL;
  }
  if (pVirtualScanInfo->pSortCtxList) {
    for (int32_t i = 0; i < taosArrayGetSize(pVirtualScanInfo->pSortCtxList); i++) {
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pVirtualScanInfo->pSortCtxList, i);
      blockDataDestroy(pCtx->pIntermediateBlock);
      taosMemoryFree(pCtx);
    }
    taosArrayDestroy(pVirtualScanInfo->pSortCtxList);
    // Must be cleared like the other members: the list is walked again on the
    // next cleanup and by the operator destructor.
    pVirtualScanInfo->pSortCtxList = NULL;
  }
}
2,651,098✔
189

2,651,098!
190
/*
 * (Re)build the ts merge sort handle for the param-driven path, where
 * pOperator->pOperatorGetParam carries one SOperatorParam per source.
 *
 * Steps: drop the previous sort state, create the sort info and handle,
 * locate the tag downstream (if any), then register one sort source per entry
 * of pParam->pOpParamArray. Each source owns a SLoadNextCtx that is tracked in
 * pSortCtxList and consumed by virtualScanloadNextDataBlockFromParam.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure every object that has
 * not yet been handed over to the sort handle or to pSortCtxList is released
 * here; what was handed over stays reachable from pVirtualScanInfo.
 */
int32_t createSortHandleFromParam(SOperatorInfo* pOperator) {
  int32_t                        code = TSDB_CODE_SUCCESS;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SVTableScanOperatorParam*      pParam = (SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SNodeList*                     pMergeKeys = NULL;
  SSortSource*                   ps = NULL;    // source not yet owned by the sort handle
  SLoadNextCtx*                  pCtx = NULL;  // context not yet owned by pSortCtxList
  int32_t                        scanOpIndex = 0;
  int32_t                        numOfBufPage = 0;

  pVirtualScanInfo->sortBufSize = pVirtualScanInfo->bufPageSize * (taosArrayGetSize((pParam)->pOpParamArray) + 1);
  numOfBufPage = (int32_t)pVirtualScanInfo->sortBufSize / pVirtualScanInfo->bufPageSize;

  cleanUpVirtualScanInfo(pVirtualScanInfo);
  // The context list has just been released; make sure no stale pointer
  // survives if we bail out before a new list is created below.
  pVirtualScanInfo->pSortCtxList = NULL;

  VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, pVirtualScanInfo->tsSlotId));
  pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
  nodesDestroyList(pMergeKeys);
  pMergeKeys = NULL;  // already destroyed; the error path must not destroy it again

  VTS_ERR_JRET(tsortCreateSortHandle(pVirtualScanInfo->pSortInfo, SORT_MULTISOURCE_TS_MERGE, pVirtualScanInfo->bufPageSize,
                                     numOfBufPage, pVirtualScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pVirtualScanInfo->pSortHandle));

  tsortSetForceUsePQSort(pVirtualScanInfo->pSortHandle);
  tsortSetFetchRawDataFp(pVirtualScanInfo->pSortHandle, virtualScanloadNextDataBlockFromParam, NULL, NULL);

  if (pOperator->numOfDownstream > 2) {
    qError("virtual scan operator should not have more than 2 downstreams, current numOfDownstream:%d", pOperator->numOfDownstream);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
  }

  pVirtualScanInfo->tagDownStreamId = -1;
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
    if (pDownstream->resultDataBlockId == pVirtualScanInfo->tagBlockId) {
      // tag block do not need sort
      pVirtualScanInfo->tagDownStreamId = i;
      pInfo->pSavedTagBlock = NULL;
    }
  }

  // with at most two downstreams the data source is "the other one" than the tag downstream
  scanOpIndex = pVirtualScanInfo->tagDownStreamId == -1 ? 0 : 1 - pVirtualScanInfo->tagDownStreamId;
  if (scanOpIndex < 0 || scanOpIndex >= pOperator->numOfDownstream) {
    // e.g. the only downstream is the tag scan: there is no data source to index
    qError("virtual scan operator has no data downstream, numOfDownstream:%d, tagDownStreamId:%d",
           pOperator->numOfDownstream, pVirtualScanInfo->tagDownStreamId);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
  }

  pOperator->pDownstream[scanOpIndex]->status = OP_NOT_OPENED;
  pVirtualScanInfo->pSortCtxList = taosArrayInit(taosArrayGetSize((pParam)->pOpParamArray), POINTER_BYTES);
  TSDB_CHECK_NULL(pVirtualScanInfo->pSortCtxList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize((pParam)->pOpParamArray); i++) {
    SOperatorParam* pOpParam = *(SOperatorParam**)taosArrayGet((pParam)->pOpParamArray, i);
    SLoadNextCtx*   pOwnedCtx = NULL;

    pCtx = taosMemoryMalloc(sizeof(SLoadNextCtx));
    QUERY_CHECK_NULL(pCtx, code, lino, _return, terrno)
    pCtx->blockId = i;
    pCtx->pOperator = pOperator->pDownstream[scanOpIndex];
    pCtx->pOperatorGetParam = pOpParam;
    pCtx->window = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
    pCtx->pIntermediateBlock = NULL;
    pCtx->tsSlotId = (col_id_t)pVirtualScanInfo->tsSlotId;

    // Hand the context to the list first: from here on the list owns it and it
    // is released together with the list, whatever happens next.
    QUERY_CHECK_NULL(taosArrayPush(pVirtualScanInfo->pSortCtxList, &pCtx), code, lino, _return, terrno)
    pOwnedCtx = pCtx;
    pCtx = NULL;

    ps = taosMemoryCalloc(1, sizeof(SSortSource));
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno)

    ps->param = pOwnedCtx;
    ps->onlyRef = true;

    VTS_ERR_JRET(tsortAddSource(pVirtualScanInfo->pSortHandle, ps));
    // same convention as createSortHandle: once added, the source belongs to the sort handle
    ps = NULL;
  }

  VTS_ERR_JRET(tsortOpen(pVirtualScanInfo->pSortHandle));

  return code;
_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  nodesDestroyList(pMergeKeys);
  if (ps != NULL) {
    taosMemoryFree(ps);
  }
  if (pCtx != NULL) {
    taosMemoryFree(pCtx);
  }
  return code;
}
1,965,099✔
272

273
/*
 * Build the ts merge sort handle for the plain (non-param) path.
 *
 * Every downstream must be an exchange operator and is opened here. The
 * downstream whose resultDataBlockId equals tagBlockId is remembered as the
 * tag downstream and is not sorted; every other downstream is registered as a
 * sort source fetched through virtualScanloadNextDataBlock.
 */
int32_t createSortHandle(SOperatorInfo* pOperator) {
  int32_t                        code = 0;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pMergeInfo = pOperator->info;
  SVirtualTableScanInfo*         pScanInfo = &pMergeInfo->virtualScanInfo;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SSortSource*                   pSource = NULL;  // source not yet owned by the sort handle
  int32_t                        numOfBufPage = (int32_t)pScanInfo->sortBufSize / pScanInfo->bufPageSize;

  VTS_ERR_JRET(tsortCreateSortHandle(pScanInfo->pSortInfo, SORT_MULTISOURCE_TS_MERGE, pScanInfo->bufPageSize,
                                     numOfBufPage, pScanInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0,
                                     &pScanInfo->pSortHandle));

  tsortSetForceUsePQSort(pScanInfo->pSortHandle);
  tsortSetFetchRawDataFp(pScanInfo->pSortHandle, virtualScanloadNextDataBlock, NULL, NULL);

  for (int32_t idx = 0; idx < pOperator->numOfDownstream; ++idx) {
    SOperatorInfo* pChild = pOperator->pDownstream[idx];

    if (pChild->operatorType != QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
      VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM);
    }
    VTS_ERR_JRET(pChild->fpSet._openFn(pChild));

    if (pChild->resultDataBlockId == pScanInfo->tagBlockId) {
      // tag block do not need sort
      pScanInfo->tagDownStreamId = idx;
      continue;
    }

    pSource = taosMemoryCalloc(1, sizeof(SSortSource));
    TSDB_CHECK_NULL(pSource, code, lino, _return, terrno)

    pSource->param = pChild;
    pSource->onlyRef = true;

    VTS_ERR_JRET(tsortAddSource(pScanInfo->pSortHandle, pSource));
    pSource = NULL;  // handed over to the sort handle
  }

  VTS_ERR_JRET(tsortOpen(pScanInfo->pSortHandle));

  // success falls through: the label below is the common exit
_return:
  if (code != 0){
    qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  if (pSource != NULL) {
    taosMemoryFree(pSource);
  }
  return code;
}
323

4,616,197✔
324
/*
 * Set up the sort handle for the operator. Nothing is done when the operator
 * has no downstream. The param-driven builder is used when
 * pOperator->pOperatorGetParam is set, the plain builder otherwise.
 */
int32_t openVirtualTableScanOperatorImpl(SOperatorInfo* pOperator) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pOperator->numOfDownstream != 0) {
    VTS_ERR_JRET(pOperator->pOperatorGetParam ? createSortHandleFromParam(pOperator) : createSortHandle(pOperator));
  }
  return code;

_return:
  qError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  return code;
}
4,635,167✔
344

345
/*
 * Open the operator once: returns immediately when it is already opened,
 * otherwise runs openVirtualTableScanOperatorImpl, records the elapsed time
 * in cost.openCost (microseconds / 1000.0) and moves the status to
 * OP_RES_TO_RETURN. The operator is flagged as opened only when the
 * implementation succeeded.
 */
int32_t openVirtualTableScanOperator(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t beginUs = taosGetTimestampUs();
  int32_t code = openVirtualTableScanOperatorImpl(pOperator);
  int64_t elapsedUs = taosGetTimestampUs() - beginUs;

  pOperator->cost.openCost = (double)elapsedUs / 1000.0;
  // the cost and the status are updated even when the open failed
  pOperator->status = OP_RES_TO_RETURN;

  VTS_ERR_RET(code);

  OPTR_SET_OPENED(pOperator);
  return code;
}
2,147,483,647✔
364

2,147,483,647✔
365
static int32_t doGetVtableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
2,147,483,647✔
366
                                          SSDataBlock* p) {
2,147,483,647!
367
  int32_t code = 0;
368
  int64_t lastTs = 0;
369
  int64_t rowNums = -1;
370
  blockDataEmpty(p);
28,399,035✔
371
  for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
28,399,035✔
372
    colDataSetNItemsNull(taosArrayGet(p->pDataBlock, j), 0, capacity);
373
  }
374
  while (1) {
2,147,483,647✔
375
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
376
    if (!pInfo->pSavedTuple) {
2,147,483,647✔
377
      code = tsortNextTuple(pHandle, &pTupleHandle);
378
      if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647!
379
        break;
2,147,483,647✔
380
      }
2,147,483,647✔
381
    } else {
130,560✔
382
      pTupleHandle = pInfo->pSavedTuple;
130,560✔
383
      pInfo->pSavedTuple = NULL;
130,560!
UNCOV
384
    }
×
UNCOV
385

×
386
    int32_t blockId = (int32_t)tsortGetBlockId(pTupleHandle);
NEW
387
    int32_t colNum = pInfo->virtualScanInfo.scanAllCols ? 1 : tsortGetColNum(pTupleHandle);
×
NEW
388
    for (int32_t i = 0; i < colNum; i++) {
×
389
      char* pData = NULL;
390
      tsortGetValue(pTupleHandle, i, (void**)&pData);
391
      if (pData != NULL) {
130,560✔
392
        if (i == 0) {
393
          if (lastTs != *(int64_t*)pData) {
394
            if (rowNums >= capacity - 1) {
2,147,483,647✔
395
              pInfo->pSavedTuple = pTupleHandle;
2,147,483,647✔
396
              goto _return;
397
            }
2,147,483,647✔
398
            rowNums++;
2,147,483,647✔
399
            if (pInfo->virtualScanInfo.tsSlotId != -1) {
2,147,483,647✔
400
              VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647✔
401
            }
28,688,643✔
402
            lastTs = *(int64_t*)pData;
28,688,643✔
403
          }
404
          continue;
2,147,483,647✔
405
        }
2,147,483,647✔
406
        int32_t slotKey = blockId << 16 | i;
2,147,483,647✔
407
        void*   slotId = tSimpleHashGet(pInfo->virtualScanInfo.dataSlotMap, &slotKey, sizeof(slotKey));
408
        if (slotId == NULL) {
2,147,483,647✔
409
          qError("failed to get slotId from dataSlotMap, blockId:%d, slotId:%d", blockId, i);
2,147,483,647!
410
          VTS_ERR_RET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
411
        }
2,147,483,647✔
412
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, *(int32_t *)slotId), rowNums, pData, false));
413
      }
414
    }
2,147,483,647✔
415
  }
2,147,483,647✔
416
_return:
2,147,483,647✔
417
  p->info.rows = rowNums + 1;
2,147,483,647!
418
  p->info.dataLoad = 1;
2,147,483,647✔
419
  p->info.scanFlag = MAIN_SCAN;
UNCOV
420
  return code;
×
UNCOV
421
}
×
422

423
static int32_t doGetVStableMergedBlockData(SVirtualScanMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
424
                                           SSDataBlock* p) {
2,147,483,647!
425
  int32_t code = 0;
426
  int64_t lastTs = 0;
427
  int64_t rowNums = -1;
428
  blockDataEmpty(p);
429
  for (int32_t j = 0; j < taosArrayGetSize(p->pDataBlock); j++) {
32,001,175✔
430
    colDataSetNItemsNull(taosArrayGet(p->pDataBlock, j), 0, capacity);
32,001,175✔
431
  }
32,001,792✔
432
  while (1) {
32,001,792✔
433
    STupleHandle* pTupleHandle = NULL;
32,001,792✔
434
    if (!pInfo->pSavedTuple) {
435
      code = tsortNextTuple(pHandle, &pTupleHandle);
436
      if (pTupleHandle == NULL || (code != 0)) {
34,529,428✔
437
        break;
438
      }
34,529,428✔
439
    } else {
34,529,428✔
440
      pTupleHandle = pInfo->pSavedTuple;
34,529,428✔
441
      pInfo->pSavedTuple = NULL;
34,529,428✔
442
    }
2,147,483,647✔
443

2,147,483,647✔
444
    int32_t tsIndex = -1;
2,147,483,647✔
445
    int32_t colNum = tsortGetColNum(pTupleHandle);
2,147,483,647✔
446

2,147,483,647!
447
    for (int32_t i = 0; i < colNum; i++) {
448
      SColumnInfoData *pColInfo = NULL;
449
      tsortGetColumnInfo(pTupleHandle, i, &pColInfo);
450
      if (pColInfo && pColInfo->info.slotId ==  pInfo->virtualScanInfo.tsSlotId) {
29,931,981✔
451
        tsIndex = i;
29,931,981✔
452
        break;
453
      }
454
    }
2,147,483,647✔
455

456
    if (tsIndex == -1) {
2,147,483,647!
457
      tsIndex = colNum - 1;
2,147,483,647✔
458
    }
2,147,483,647✔
459

460
    char* pData = NULL;
2,147,483,647✔
461
    // first, set ts slot's data
2,147,483,647✔
462
    // then, set other slots' data
2,147,483,647✔
463
    tsortGetValue(pTupleHandle, tsIndex, (void**)&pData);
2,147,483,647✔
464

2,147,483,647✔
465
    if (pData != NULL) {
466
      if (lastTs != *(int64_t*)pData) {
467
        if (rowNums >= capacity - 1) {
468
          pInfo->pSavedTuple = pTupleHandle;
469
          goto _return;
2,147,483,647!
UNCOV
470
        }
×
471
        rowNums++;
472
        if (pInfo->virtualScanInfo.tsSlotId != -1) {
473
          VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, pInfo->virtualScanInfo.tsSlotId), rowNums, pData, false));
2,147,483,647✔
474
        }
475
        lastTs = *(int64_t*)pData;
476
      }
2,147,483,647✔
477
    }
478
    if (pInfo->virtualScanInfo.scanAllCols) {
2,147,483,647!
479
      continue;
2,147,483,647✔
480
    }
2,147,483,647✔
481

29,945,481✔
482
    for (int32_t i = 0; i < colNum; i++) {
29,945,481✔
483
      if (i == tsIndex || tsortIsNullVal(pTupleHandle, i)) {
484
        continue;
2,147,483,647✔
485
      }
2,147,483,647✔
486

2,147,483,647✔
487
      tsortGetValue(pTupleHandle, i, (void**)&pData);
488

2,147,483,647!
489
      if (pData != NULL) {
2,147,483,647!
490
        VTS_ERR_RET(colDataSetVal(taosArrayGet(p->pDataBlock, i), rowNums, pData, false));
491
      }
2,147,483,647✔
492
    }
493
  }
494
_return:
2,147,483,647!
495
  p->info.rows = rowNums + 1;
2,147,483,647✔
496
  p->info.dataLoad = 1;
497
  p->info.scanFlag = MAIN_SCAN;
498
  return code;
2,147,483,647✔
499
}
2,147,483,647✔
500

2,147,483,647✔
501
/*
 * Produce the next merged block of the operator.
 *
 * The rows are assembled in pVirtualScanInfo->pIntermediateBlock (created from
 * the sort handle on first use) and then copied into binfo.pRes.
 *
 * *pResBlock is always assigned on a successful return: it points to
 * binfo.pRes when at least one row was produced and is NULL otherwise
 * (no sort handle, no sorted block available, or no more rows).
 */
int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = 0;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SSortHandle*                   pHandle = pVirtualScanInfo->pSortHandle;
  SSDataBlock*                   pDataBlock = pInfo->binfo.pRes;
  int32_t                        capacity = pOperator->resultInfo.capacity;

  // The caller inspects *pResBlock after every successful call, so the early
  // "nothing to return" exits below must not leave it unassigned.
  *pResBlock = NULL;

  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
  blockDataCleanup(pDataBlock);

  if (pHandle == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pVirtualScanInfo->pIntermediateBlock == NULL) {
    VTS_ERR_RET(tsortGetSortedDataBlock(pHandle, &pVirtualScanInfo->pIntermediateBlock));
    if (pVirtualScanInfo->pIntermediateBlock == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    VTS_ERR_RET(blockDataEnsureCapacity(pVirtualScanInfo->pIntermediateBlock, capacity));
  } else {
    blockDataCleanup(pVirtualScanInfo->pIntermediateBlock);
  }

  SSDataBlock* p = pVirtualScanInfo->pIntermediateBlock;
  if (pOperator->pOperatorGetParam) {
    VTS_ERR_RET(doGetVStableMergedBlockData(pInfo, pHandle, capacity, p));
  } else {
    VTS_ERR_RET(doGetVtableMergedBlockData(pInfo, pHandle, capacity, p));
  }

  VTS_ERR_RET(copyDataBlock(pDataBlock, p));
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
         pDataBlock->info.rows);

  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
  return code;
}
2,786,332!
542

543
/*
 * Fill the tag / pseudo columns of pBlock from the single row of tagBlock.
 *
 * For each of the numOfExpr expressions, the destination column
 * (pExpr[j].base.resSchema.slotId in pBlock) is cleaned and then set, for
 * `rows` rows, to the value in row 0 of column j of tagBlock (or to NULL when
 * that value is NULL / JSON null).
 *
 * Nothing is done when numOfExpr <= 0 or tagBlock is NULL. tagBlock must hold
 * exactly one row, otherwise TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR is returned.
 * pBlock->info.rows is temporarily set to `rows` while filling and is put
 * back to its previous value before returning, on success and on failure.
 */
int32_t vtableAddTagPseudoColumnData(const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* tagBlock, SSDataBlock* pBlock, int32_t rows) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          backupRows = 0;
  bool             rowsOverridden = false;  // true once pBlock->info.rows has been replaced by `rows`
  // currently only the tbname pseudo column
  if (numOfExpr <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (tagBlock->info.rows != 1) {
    qError("tag block should have only one row, current rows:%" PRId64, tagBlock->info.rows);
    VTS_ERR_JRET(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR);
  }

  backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;
  rowsOverridden = true;
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    TSDB_CHECK_NULL(pColInfoData, code, lino, _return, terrno)
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    SColumnInfoData* pTagInfoData = taosArrayGet(tagBlock->pDataBlock, j);
    TSDB_CHECK_NULL(pTagInfoData, code, lino, _return, terrno)

    if (colDataIsNull_s(pTagInfoData, 0) || IS_JSON_NULL(pTagInfoData->info.type, colDataGetData(pTagInfoData, 0))) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }

    char* data = colDataGetData(pTagInfoData, 0);

    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, 1, false);
      QUERY_CHECK_CODE(code, lino, _return);
    } else {  // todo opt for json tag
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
        code = colDataSetVal(pColInfoData, i, data, false);
        QUERY_CHECK_CODE(code, lino, _return);
      }
    }
  }

_return:

  // restore the rows, also when a column could not be filled
  if (rowsOverridden) {
    pBlock->info.rows = backupRows;
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
72,971,344!
602

UNCOV
603
/*
 * Fill the pseudo/tag columns of pBlock from pTagBlock using the expressions
 * in pInfo->base.pseudoSup. Does nothing when there are no such expressions.
 */
static int32_t doSetTagColumnData(SVirtualTableScanInfo* pInfo, SSDataBlock* pTagBlock, SSDataBlock* pBlock, int32_t rows) {
  int32_t    code = 0;
  SExprSupp* pPseudoSup = &pInfo->base.pseudoSup;

  if (pPseudoSup->numOfExprs <= 0) {
    return code;
  }

  VTS_ERR_RET(vtableAddTagPseudoColumnData(pPseudoSup->pExprInfo, pPseudoSup->numOfExprs, pTagBlock, pBlock, rows));
  return code;
}
613

26,683,588✔
614
/*
 * getNext entry point of the virtual table scan operator.
 *
 * Opens the operator if needed, fetches the tag block once (when a tag
 * downstream exists) and then keeps merging blocks until one still has rows
 * after tag filling and filtering, or until the merge is exhausted. The block
 * is returned through *pResBlock, which is NULL when there is nothing (more)
 * to return.
 *
 * Errors are not returned to the caller: the code is stored in
 * pTaskInfo->code and control leaves through T_LONG_JMP.
 */
int32_t virtualTableGetNext(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  int32_t                        code = TSDB_CODE_SUCCESS;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pInfo = pOperator->info;
  SVirtualTableScanInfo*         pVirtualScanInfo = &pInfo->virtualScanInfo;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
    // report "no more data" through the out parameter; assigning the
    // parameter itself would leave the caller's pointer untouched
    *pResBlock = NULL;
    return code;
  }

  VTS_ERR_JRET(pOperator->fpSet._openFn(pOperator));

  if (pVirtualScanInfo->tagBlockId != -1 && pVirtualScanInfo->tagDownStreamId != -1 && !pInfo->pSavedTagBlock) {
    SSDataBlock*   pTagBlock = NULL;
    SOperatorInfo *pTagScanOp = pOperator->pDownstream[pVirtualScanInfo->tagDownStreamId];
    if (pOperator->pOperatorGetParam) {
      SOperatorParam* pTagOp = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->pTagScanOp;
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextExtFn(pTagScanOp, pTagOp, &pTagBlock));
    } else {
      VTS_ERR_JRET(pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagBlock));
    }

    // exactly one row of tag values is expected
    if (pTagBlock == NULL || pTagBlock->info.rows != 1) {
      VTS_ERR_JRET(TSDB_CODE_FAILED);
    }
    pInfo->pSavedTagBlock = pTagBlock;
  }

  while(1) {
    VTS_ERR_JRET(doVirtualTableMerge(pOperator, pResBlock));
    if (*pResBlock == NULL) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pOperator->pOperatorGetParam) {
      uint64_t uid = ((SVTableScanOperatorParam*)pOperator->pOperatorGetParam->value)->uid;
      (*pResBlock)->info.id.uid = uid;
    } else {
      (*pResBlock)->info.id.uid = pInfo->virtualScanInfo.vtableUid;
    }

    VTS_ERR_JRET(doSetTagColumnData(pVirtualScanInfo, pInfo->pSavedTagBlock, (*pResBlock), (*pResBlock)->info.rows));
    VTS_ERR_JRET(doFilter(*pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL));
    // a block emptied by the filter is skipped and the next one is merged
    if ((*pResBlock)->info.rows > 0) {
      break;
    }
  }

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  pTaskInfo->code = code;
  // NOTE(review): T_LONG_JMP is expected not to return, hence no return statement after it
  T_LONG_JMP(pTaskInfo->env, code);
}
31,891,830✔
671

31,891,830✔
672
// Tear down what a table-scan base holds: the query condition, the data
// reader (via the reader API's close hook), the column match list, the
// table-meta LRU cache and the pseudo-column expression support.
//
// pBase  scan base whose members are released; the struct itself is not freed
// pAPI   reader API table supplying the optional tsdReaderClose hook
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  // The close hook is optional, but the reader pointer is cleared either way.
  if (NULL != pAPI->tsdReaderClose) {
    pAPI->tsdReaderClose(pBase->dataReader);
  }
  pBase->dataReader = NULL;

  if (NULL != pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}
×
UNCOV
687

×
688
void destroyVirtualTableScanOperatorInfo(void* param) {
689
  if (!param) {
690
    return;
2,633,922✔
691
  }
2,633,922✔
692
  SVirtualScanMergeOperatorInfo* pOperatorInfo = (SVirtualScanMergeOperatorInfo*)param;
693
  SVirtualTableScanInfo* pInfo = &pOperatorInfo->virtualScanInfo;
2,633,922!
UNCOV
694
  blockDataDestroy(pOperatorInfo->binfo.pRes);
×
695
  pOperatorInfo->binfo.pRes = NULL;
696

2,633,922✔
697
  tsortDestroySortHandle(pInfo->pSortHandle);
698
  pInfo->pSortHandle = NULL;
2,633,922!
UNCOV
699
  taosArrayDestroy(pInfo->pSortInfo);
×
700
  pInfo->pSortInfo = NULL;
701

702
  blockDataDestroy(pInfo->pIntermediateBlock);
2,633,922✔
703
  pInfo->pIntermediateBlock = NULL;
2,633,922✔
704

2,633,922✔
705
  blockDataDestroy(pInfo->pInputBlock);
706
  pInfo->pInputBlock = NULL;
2,633,922✔
707
  destroyTableScanBase(&pInfo->base, &pInfo->base.readerAPI);
2,633,922!
UNCOV
708

×
709
  tSimpleHashCleanup(pInfo->dataSlotMap);
710

2,633,922✔
711
  if (pInfo->pSortCtxList) {
2,633,922✔
712
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSortCtxList); i++) {
2,633,922✔
713
      SLoadNextCtx* pCtx = *(SLoadNextCtx**)taosArrayGet(pInfo->pSortCtxList, i);
2,633,922✔
714
      blockDataDestroy(pCtx->pIntermediateBlock);
715
      taosMemoryFree(pCtx);
2,633,922✔
716
    }
2,633,922✔
717
    taosArrayDestroy(pInfo->pSortCtxList);
2,633,922✔
718
    pInfo->pSortCtxList = NULL;
2,633,922✔
719
  }
720
  taosMemoryFreeClear(param);
2,633,922✔
721
}
2,633,922✔
722

723
// Walk the target column list and classify every column:
//   - the primary timestamp column records its list index in *tsSlotId;
//   - a column with a reference is entered into *pSlotMap, keyed by
//     (dataBlockId << 16 | slotId) and mapped to its list index;
//   - a tag column, or a column with an empty table alias, records its
//     dataBlockId in *tagBlockId.
//
// On success returns TSDB_CODE_SUCCESS with *pSlotMap owning a new hash.
// On failure the hash is cleaned up, *pSlotMap is set to NULL and the error
// code is returned.
//
// NOTE(review): when the list is empty the function returns success without
// writing *pSlotMap, *tsSlotId or *tagBlockId - callers see whatever those
// held before the call. Confirm this is intended.
int32_t extractColMap(SNodeList* pNodeList, SSHashObj** pSlotMap, int32_t *tsSlotId, int32_t *tagBlockId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);

  if (0 == numOfCols) {
    return code;
  }

  // -1 means "not found" for both ids.
  *tsSlotId = -1;
  *tagBlockId = -1;
  *pSlotMap = tSimpleHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(*pSlotMap, code, lino, _return, terrno);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
    TSDB_CHECK_NULL(pColNode, code, lino, _return, terrno)

    if (pColNode->isPrimTs) {
      *tsSlotId = i;
      continue;
    }

    if (pColNode->hasRef) {
      // Pack source block id and slot id into a single 32-bit hash key.
      int32_t slotKey = (pColNode->dataBlockId << 16) | pColNode->slotId;
      VTS_ERR_JRET(tSimpleHashPut(*pSlotMap, &slotKey, sizeof(slotKey), &i, sizeof(i)));
      continue;
    }

    if (pColNode->colType == COLUMN_TYPE_TAG || '\0' == pColNode->tableAlias[0]) {
      // tag column or pseudo column's function
      *tagBlockId = pColNode->dataBlockId;
    }
  }

  return code;

_return:
  tSimpleHashCleanup(*pSlotMap);
  *pSlotMap = NULL;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2,376,625✔
761

11,699,560!
762
// Put a virtual-table merge operator back into its not-opened state so it can
// be run again: drops the sort handle, the intermediate block and all
// per-source sort contexts, rebuilds the input block from the physical node's
// output descriptor, and forgets the saved tuple / saved tag block pointers.
//
// Returns 0 on success, or the error code (taken from terrno) when the input
// block cannot be re-created; in that case the steps after the block
// allocation are skipped.
int32_t resetVirtualTableMergeOperState(SOperatorInfo* pOper) {
  int32_t                        code = 0;
  int32_t                        lino = 0;
  SVirtualScanMergeOperatorInfo* pOpInfo = pOper->info;
  SVirtualScanPhysiNode*         pPhysi = (SVirtualScanPhysiNode*)pOper->pPhyNode;
  SVirtualTableScanInfo*         pScan = &pOpInfo->virtualScanInfo;

  pOper->status = OP_NOT_OPENED;
  resetBasicOperatorState(&pOpInfo->binfo);

  // Only the sort handle is dropped here; pSortInfo is left in place.
  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  blockDataDestroy(pScan->pIntermediateBlock);
  pScan->pIntermediateBlock = NULL;

  // Replace the input block with a fresh one built from the output descriptor.
  blockDataDestroy(pScan->pInputBlock);
  pScan->pInputBlock = createDataBlockFromDescNode(((SPhysiNode*)pPhysi)->pOutputDataBlockDesc);
  TSDB_CHECK_NULL(pScan->pInputBlock, code, lino, _exit, terrno)

  pScan->tagDownStreamId = -1;

  // Each list element points at a heap-allocated context with its own block.
  if (NULL != pScan->pSortCtxList) {
    int32_t idx = 0;
    while (idx < taosArrayGetSize(pScan->pSortCtxList)) {
      SLoadNextCtx* pLoadCtx = *(SLoadNextCtx**)taosArrayGet(pScan->pSortCtxList, idx);
      blockDataDestroy(pLoadCtx->pIntermediateBlock);
      taosMemoryFree(pLoadCtx);
      idx++;
    }
    taosArrayDestroy(pScan->pSortCtxList);
    pScan->pSortCtxList = NULL;
  }

  // These pointers are only cleared here, not freed.
  pOpInfo->pSavedTuple = NULL;
  pOpInfo->pSavedTagBlock = NULL;

_exit:
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
220,684✔
806

220,684✔
807
int32_t createVirtualTableMergeOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
220,684!
808
                                            SVirtualScanPhysiNode* pVirtualScanPhyNode,
809
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
205,996✔
810
  SPhysiNode*                    pPhyNode = (SPhysiNode*)pVirtualScanPhyNode;
205,996✔
811
  int32_t                        lino = 0;
812
  int32_t                        code = TSDB_CODE_SUCCESS;
813
  SVirtualScanMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SVirtualScanMergeOperatorInfo));
10,411,436✔
814
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
10,412,524✔
815
  SDataBlockDescNode*            pDescNode = pPhyNode->pOutputDataBlockDesc;
816
  SNodeList*                     pMergeKeys = NULL;
10,412,524✔
817

818
  QUERY_CHECK_NULL(pInfo, code, lino, _return, terrno)
10,412,524!
UNCOV
819
  QUERY_CHECK_NULL(pOperator, code, lino, _return, terrno)
×
820

821
  pOperator->pPhyNode = pVirtualScanPhyNode;
822

10,408,716✔
823
  pInfo->binfo.inputTsOrder = pVirtualScanPhyNode->scan.node.inputTsOrder;
824
  pInfo->binfo.outputTsOrder = pVirtualScanPhyNode->scan.node.outputTsOrder;
825

2,633,922✔
826
  SVirtualTableScanInfo* pVirtualScanInfo = &pInfo->virtualScanInfo;
827
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
828
  TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _return, terrno)
2,633,922✔
829

2,633,922✔
830
  SSDataBlock* pInputBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
2,633,922✔
831
  TSDB_CHECK_NULL(pInputBlock, code, lino, _return, terrno)
2,633,922!
832
  pVirtualScanInfo->pInputBlock = pInputBlock;
2,633,922!
833
  pVirtualScanInfo->tagDownStreamId = -1;
2,633,922✔
834
  pVirtualScanInfo->vtableUid = (tb_uid_t)pVirtualScanPhyNode->scan.uid;
2,633,922✔
835
  if (pVirtualScanPhyNode->scan.pScanPseudoCols != NULL) {
836
    SExprSupp* pSup = &pVirtualScanInfo->base.pseudoSup;
2,633,922!
837
    pSup->pExprInfo = NULL;
2,633,922!
838
    VTS_ERR_JRET(createExprInfo(pVirtualScanPhyNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs));
839

2,633,922✔
840
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
841
                                      &pTaskInfo->storageAPI.functionStore);
2,633,922✔
842
    TSDB_CHECK_NULL(pSup->pCtx, code, lino, _return, terrno)
2,633,922✔
843
  }
844

2,633,922✔
845
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,633,922✔
846
  TSDB_CHECK_CODE(blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity), lino, _return);
2,633,922!
847

848
  size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
2,633,922✔
849
  int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
2,633,922!
850

2,633,922✔
851
  if (!pVirtualScanPhyNode->scan.node.dynamicOp) {
2,633,922✔
852
    VTS_ERR_JRET(makeTSMergeKey(&pMergeKeys, 0));
2,633,922✔
853
    pVirtualScanInfo->pSortInfo = createSortInfo(pMergeKeys);
2,633,922✔
854
    TSDB_CHECK_NULL(pVirtualScanInfo->pSortInfo, code, lino, _return, terrno)
682,298✔
855
  } else {
682,298✔
856
    pTaskInfo->dynamicTask = true;
682,298!
857
  }
858
  pVirtualScanInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
682,298✔
859
  pVirtualScanInfo->sortBufSize =
860
      pVirtualScanInfo->bufPageSize * (numOfDownstream + 1);  // one additional is reserved for merged result.
682,298!
861
  VTS_ERR_JRET(
862
      extractColMap(pVirtualScanPhyNode->pTargets, &pVirtualScanInfo->dataSlotMap, &pVirtualScanInfo->tsSlotId, &pVirtualScanInfo->tagBlockId));
863

2,633,922✔
864
  pVirtualScanInfo->scanAllCols = pVirtualScanPhyNode->scanAllCols;
2,633,922!
865

866
  VTS_ERR_JRET(filterInitFromNode((SNode*)pVirtualScanPhyNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo,
2,633,922✔
867
                                  0, pTaskInfo->pStreamRuntimeInfo));
2,633,922✔
868

869
  pVirtualScanInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
2,633,922!
870
  QUERY_CHECK_NULL(pVirtualScanInfo->base.metaCache.pTableMetaEntryCache, code, lino, _return, terrno)
1,984,069!
871

1,984,069✔
872
  setOperatorInfo(pOperator, "VirtualTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, false,
1,984,069!
873
                  OP_NOT_OPENED, pInfo, pTaskInfo);
874
  pOperator->fpSet =
649,853✔
875
      createOperatorFpSet(openVirtualTableScanOperator, virtualTableGetNext, NULL, destroyVirtualTableScanOperatorInfo,
876
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2,633,922✔
877
  setOperatorResetStateFn(pOperator, resetVirtualTableMergeOperState);
2,633,922✔
878

2,633,922✔
879
  if (NULL != pDownstream) {
2,633,922!
880
    VTS_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
881
  } else {
882
    pVirtualScanInfo->tagDownStreamId = -1;
2,633,922!
883
  }
884

2,633,922!
885
  nodesDestroyList(pMergeKeys);
886
  *pOptrInfo = pOperator;
887
  return TSDB_CODE_SUCCESS;
2,633,922✔
888

2,633,922!
889
_return:
890
  if (code != TSDB_CODE_SUCCESS) {
2,633,922✔
891
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
892
  }
893
  if (pInfo != NULL) {
2,633,922✔
894
    destroyVirtualTableScanOperatorInfo(pInfo);
895
  }
2,633,922✔
896
  nodesDestroyList(pMergeKeys);
897
  pTaskInfo->code = code;
2,633,922✔
898
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
2,614,952!
899
  return code;
900
}
18,970✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc