• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.37
/source/libs/executor/src/operator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "tname.h"
20

21
#include "tglobal.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28

29
#include "storageapi.h"
30
#include "tdatablock.h"
31

32
// Build an SOperatorFpSet by value from the supplied callbacks.
// Each argument is stored in the matching member of the returned set. The two
// stream-state hooks are not parameters here: they start out NULL and are
// assigned separately by setOperatorStreamStateFn().
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
19,330,971✔
33
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain,
34
                                   __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
35
  SOperatorFpSet fpSet = {
19,330,971✔
36
      ._openFn = openFn,
37
      .getNextFn = nextFn,
38
      .cleanupFn = cleanup,
39
      .closeFn = closeFn,
40
      .reqBufFn = reqBufFn,
41
      .getExplainFn = explain,
42
      .getNextExtFn = nextExtFn,
43
      .notifyFn = notifyFn,
44
      // stream-state hooks are opt-in; see setOperatorStreamStateFn()
      .releaseStreamStateFn = NULL,
45
      .reloadStreamStateFn = NULL,
46
  };
47

48
  return fpSet;
19,330,971✔
49
}
50

51
// Install the stream-state release and reload callbacks into the operator's
// function set, overwriting whatever was stored there before.
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
1,810,615✔
52
  pOperator->fpSet.releaseStreamStateFn = relaseFn;
1,810,615✔
53
  pOperator->fpSet.reloadStreamStateFn = reloadFn;
1,810,615✔
54
}
1,810,615✔
55

56
// Open callback that performs no real work: it applies OPTR_SET_OPENED to the
// operator, records an open cost of zero, and always returns TSDB_CODE_SUCCESS.
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
7,770,271✔
57
  OPTR_SET_OPENED(pOperator);
7,770,271✔
58
  pOperator->cost.openCost = 0;
7,770,271✔
59
  return TSDB_CODE_SUCCESS;
7,770,271✔
60
}
61

62
// Attach `num` downstream operators to `p`.
// The pointers in `pDownstream` are copied into a newly allocated array owned
// by `p`; the caller's array itself is not retained. Both numOfDownstream and
// numOfRealDownstream are set to `num`.
// Returns terrno when the allocation fails, TSDB_CODE_SUCCESS otherwise.
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
9,655,155✔
63
  // NOTE(review): any previous p->pDownstream value is overwritten here without
  // being freed - confirm this is only called once per operator.
  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
9,655,155!
64
  if (p->pDownstream == NULL) {
9,661,888!
65
    return terrno;
×
66
  }
67

68
  memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
9,661,888✔
69
  p->numOfDownstream = num;
9,661,888✔
70
  p->numOfRealDownstream = num;
9,661,888✔
71
  return TSDB_CODE_SUCCESS;
9,661,888✔
72
}
73

74
// Mark the operator as finished (OP_EXEC_DONE), record its total cost, and
// flag the owning task as TASK_COMPLETED.
void setOperatorCompleted(SOperatorInfo* pOperator) {
17,186,214✔
75
  pOperator->status = OP_EXEC_DONE;
17,186,214✔
76
  // elapsed time since the task's recorded start, divided by 1000.0
  // (presumably microseconds converted to milliseconds - TODO confirm units)
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0;
17,188,231✔
77
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
17,188,231✔
78
}
17,187,862✔
79

80
// Fill in the basic identity fields of an operator: name, operator type,
// blocking flag, initial status, operator-specific info pointer and owning task.
// Only the pointers are stored; nothing is copied or allocated here.
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
19,325,700✔
81
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
82
  // the const qualifier is cast away because the field is declared as char*
  pOperator->name = (char*)name;
19,325,700✔
83
  pOperator->operatorType = type;
19,325,700✔
84
  pOperator->blocking = blocking;
19,325,700✔
85
  pOperator->status = status;
19,325,700✔
86
  pOperator->info = pInfo;
19,325,700✔
87
  pOperator->pTaskInfo = pTaskInfo;
19,325,700✔
88
}
19,325,700✔
89

90
// Default buffer-requirement callback. Each operator is expected to install its
// own function that reports its total buffer cost; this fallback only returns
// -1 for a blocking operator and 0 for a non-blocking one.
91
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
×
92
  if (pOperator->blocking) {
×
93
    return -1;
×
94
  } else {
95
    return 0;
×
96
  }
97
}
98

99
// Estimate the number of bytes to reserve for a query over `numOfTables`
// tables: 1.5 * sizeof(STableQueryInfo) per table, truncated to int64_t.
static int64_t getQuerySupportBufSize(size_t numOfTables) {
×
100
  size_t s1 = sizeof(STableQueryInfo);
×
101
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
102
  return (int64_t)(s1 * 1.5 * numOfTables);
×
103
}
104

105
// Try to reserve query buffer space for `numOfTables` tables out of the global
// tsQueryBufferSizeBytes budget.
//   budget < 0  : no limit is enforced, always succeeds without reserving.
//   budget > 0  : subtract the estimate with a compare-and-swap retry loop;
//                 fails with TSDB_CODE_QRY_NOT_ENOUGH_BUFFER if it does not fit.
//   budget == 0 : always fails with TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
// A successful reservation is handed back by releaseQueryBuf().
int32_t checkForQueryBuf(size_t numOfTables) {
×
106
  int64_t t = getQuerySupportBufSize(numOfTables);
×
107
  if (tsQueryBufferSizeBytes < 0) {
×
108
    return TSDB_CODE_SUCCESS;
×
109
  } else if (tsQueryBufferSizeBytes > 0) {
×
110
    while (1) {
×
111
      // snapshot the budget, then publish (snapshot - t) only if nobody changed
      // it in between; otherwise loop and retry with a fresh snapshot
      int64_t s = tsQueryBufferSizeBytes;
×
112
      int64_t remain = s - t;
×
113
      if (remain >= 0) {
×
114
        // presumably returns the value observed before the exchange, so == s
        // means our update was applied - TODO confirm against the atomic API
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
×
115
          return TSDB_CODE_SUCCESS;
×
116
        }
117
      } else {
118
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
119
      }
120
    }
121
  }
122

123
  // disable query processing if the value of tsQueryBufferSize is zero.
124
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
125
}
126

127
// Give back the buffer estimate for `numOfTables` tables to the global
// tsQueryBufferSizeBytes budget. Does nothing when the budget is negative,
// which is the case where checkForQueryBuf() succeeds without reserving.
void releaseQueryBuf(size_t numOfTables) {
×
128
  if (tsQueryBufferSizeBytes < 0) {
×
129
    return;
×
130
  }
131

132
  int64_t t = getQuerySupportBufSize(numOfTables);
×
133

134
  // add the same estimate back to the shared budget
135
  (void)atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
×
136
}
137

138
// Result of a tree-visitor callback (see optr_fn_t / traverseOperatorTree).
typedef enum {
139
  OPTR_FN_RET_CONTINUE = 0x1,  // keep descending into this operator's downstream
140
  OPTR_FN_RET_ABORT = 0x2,     // do not descend below this operator
141
} ERetType;
142

143
// State shared between traverseOperatorTree() and its visitor callback.
typedef struct STraverParam {
144
  void*   pRet;    // output slot written by the visitor (e.g. the operator found)
145
  int32_t code;    // non-zero stops the whole traversal and is returned by the entry points
146
  void*   pParam;  // visitor-specific input; each visitor casts it to its own type
147
} STraverParam;
148

149
// Visitor callback used when iterating the operator tree: receives the current
// operator, the shared traversal state and an id string, and returns whether
// traversal should continue below this operator.
150
typedef ERetType (*optr_fn_t)(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdstr);
151

152
// Pre-order walk of the operator tree rooted at `pOperator`.
// `fn` is invoked on the operator first, then on each downstream operator in
// index order. A NULL operator is silently ignored.
// Stopping rules:
//   - OPTR_FN_RET_ABORT from `fn` skips the downstream of that operator only;
//     the caller's loop still goes on to the remaining siblings.
//   - a non-zero pParam->code stops the whole traversal.
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
598,640✔
153
  if (pOperator == NULL) {
598,640✔
154
    return;
2✔
155
  }
156

157
  ERetType ret = fn(pOperator, pParam, id);
598,638✔
158
  if (ret == OPTR_FN_RET_ABORT || pParam->code != TSDB_CODE_SUCCESS) {
598,632!
159
    return;
291,917✔
160
  }
161

162
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
611,374✔
163
    traverseOperatorTree(pOperator->pDownstream[i], fn, pParam, id);
304,658✔
164
    // only an error code ends the sibling loop; an ABORT in a subtree does not
    if (pParam->code != 0) {
304,659!
165
      break;
×
166
    }
167
  }
168
}
169

170
// Visitor for extractOperatorInTree(): pParam->pParam points at the int32_t
// operator type being searched for. On a match the operator is stored in
// pParam->pRet and OPTR_FN_RET_ABORT is returned; otherwise the search continues.
ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
573,574✔
171
  STraverParam* p = pParam;
573,574✔
172
  if (pOperator->operatorType == *(int32_t*)p->pParam) {
573,574✔
173
    p->pRet = pOperator;
283,378✔
174
    return OPTR_FN_RET_ABORT;
283,378✔
175
  } else {
176
    return OPTR_FN_RET_CONTINUE;
290,196✔
177
  }
178
}
179

180
// Search the operator tree rooted at `pOperator` for an operator whose
// operatorType equals `type` (e.g. QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) and
// store it in *pOptrInfo.
// Returns TSDB_CODE_PAR_INTERNAL_ERROR when `pOperator` is NULL; otherwise
// returns the traversal code. When the traversal succeeds but nothing matches,
// *pOptrInfo is set to NULL.
// NOTE(review): a match prunes only its own subtree, so if several sibling
// subtrees contain a match the one visited last is what ends up in *pOptrInfo.
181
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
283,396✔
182
  QRY_PARAM_CHECK(pOptrInfo);
283,396!
183

184
  if (pOperator == NULL) {
283,396✔
185
    qError("invalid operator, failed to find tableScanOperator %s", id);
15!
186
    return TSDB_CODE_PAR_INTERNAL_ERROR;
15✔
187
  }
188

189
  // p.code is zero-initialized by the designated initializer
  STraverParam p = {.pParam = &type, .pRet = NULL};
283,381✔
190
  traverseOperatorTree(pOperator, extractOperatorInfo, &p, id);
283,381✔
191
  if (p.code == 0) {
283,375!
192
    *pOptrInfo = p.pRet;
283,376✔
193
  }
194
  return p.code;
283,375✔
195
}
196

197
// In/out parameter block for extractScanInfo() / getTableScanInfo().
typedef struct SExtScanInfo {
198
  int32_t order;           // in: caller's order; out: order taken from the scan operator found
199
  int32_t scanFlag;        // out: scan flag taken from the scan operator found
200
  int32_t inheritUsOrder;  // in: when non-zero, an exchange operator leaves `order` untouched
201
} SExtScanInfo;
202

203
// Visitor for getTableScanInfo(): fills the SExtScanInfo behind pParam->pParam
// from the first scan-like operator met on each path and stops descending there.
//   - system-table, stream, tag, block-dist, last-row and table-count scans:
//     order = TSDB_ORDER_ASC, scanFlag = MAIN_SCAN.
//   - exchange: scanFlag = MAIN_SCAN; order is forced to TSDB_ORDER_ASC unless
//     inheritUsOrder is set, in which case the incoming order is kept.
//   - table scan / table merge scan: order and scanFlag are copied from the
//     operator's own base info.
//   - anything else: continue into the downstream operators.
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
×
204
  int32_t       type = pOperator->operatorType;
×
205
  SExtScanInfo* pInfo = pParam->pParam;
×
206

207
  if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
×
208
      type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
×
209
      type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
×
210
    pInfo->order = TSDB_ORDER_ASC;
×
211
    pInfo->scanFlag = MAIN_SCAN;
×
212
    return OPTR_FN_RET_ABORT;
×
213
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
×
214
    if (!pInfo->inheritUsOrder) {
×
215
      pInfo->order = TSDB_ORDER_ASC;
×
216
    }
217
    pInfo->scanFlag = MAIN_SCAN;
×
218
    return OPTR_FN_RET_ABORT;
×
219
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
×
220
    STableScanInfo* pTableScanInfo = pOperator->info;
×
221
    pInfo->order = pTableScanInfo->base.cond.order;
×
222
    pInfo->scanFlag = pTableScanInfo->base.scanFlag;
×
223
    return OPTR_FN_RET_ABORT;
×
224
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
×
225
    STableMergeScanInfo* pTableScanInfo = pOperator->info;
×
226
    pInfo->order = pTableScanInfo->base.cond.order;
×
227
    pInfo->scanFlag = pTableScanInfo->base.scanFlag;
×
228
    return OPTR_FN_RET_ABORT;
×
229
  } else {
230
    return OPTR_FN_RET_CONTINUE;
×
231
  }
232
}
233

234
// Determine the scan order and scan flag that apply below `pOperator`.
// `*order` is read as the starting order and then overwritten, together with
// `*scanFlag`, by whatever extractScanInfo() found in the tree. If no scan-like
// operator is found, `*order` keeps its incoming value and `*scanFlag` is 0.
// Returns the traversal code, or TSDB_CODE_INVALID_PARA when the resulting
// order is neither TSDB_ORDER_ASC nor TSDB_ORDER_DESC.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
×
235
  // members not named here (scanFlag, p.code, p.pRet) are zero-initialized
  SExtScanInfo info = {.inheritUsOrder = inheritUsOrder, .order = *order};
×
236
  STraverParam p = {.pParam = &info};
×
237

238
  traverseOperatorTree(pOperator, extractScanInfo, &p, NULL);
×
239
  // outputs are written unconditionally, even if the traversal reported an error
  *order = info.order;
×
240
  *scanFlag = info.scanFlag;
×
241

242
  if (p.code == TSDB_CODE_SUCCESS) {
×
243
    if (!(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC)) {
×
244
      qError("operator failed at: %s:%d", __func__, __LINE__);
×
245
      p.code = TSDB_CODE_INVALID_PARA;
×
246
    }
247
  }
248
  return p.code;
×
249
}
250

251
// Visitor for stopTableScanOperator(): pParam->pParam is the SStorageAPI to use.
// For a table-scan operator, or for the table-scan operator embedded in a
// stream-scan operator, call tsdReaderNotifyClosing() on its data reader when
// one exists. Traversal does not descend below either kind of scan operator;
// any other operator type lets the traversal continue.
// This visitor never sets pParam->code.
static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
25,057✔
252
  SStorageAPI* pAPI = pParam->pParam;
25,057✔
253
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
25,057✔
254
    STableScanInfo* pInfo = pOperator->info;
1,040✔
255

256
    if (pInfo->base.dataReader != NULL) {
1,040✔
257
      pAPI->tsdReader.tsdReaderNotifyClosing(pInfo->base.dataReader);
1,035✔
258
    }
259
    return OPTR_FN_RET_ABORT;
1,040✔
260
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
24,017✔
261
    SStreamScanInfo* pInfo = pOperator->info;
7,519✔
262

263
    // the stream scan may have no inner table-scan operator, and that operator
    // may have no info or no open reader; each level is checked before use
    if (pInfo->pTableScanOp != NULL) {
7,519✔
264
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
7,222✔
265
      if (pTableScanInfo != NULL && pTableScanInfo->base.dataReader != NULL) {
7,222!
266
        pAPI->tsdReader.tsdReaderNotifyClosing(pTableScanInfo->base.dataReader);
2✔
267
      }
268
    }
269

270
    return OPTR_FN_RET_ABORT;
7,517✔
271
  }
272

273
  return OPTR_FN_RET_CONTINUE;
16,498✔
274
}
275

276
// Walk the operator tree under `pOperator` and notify the data readers of its
// table-scan / stream-scan operators that they are closing, via `pAPI`.
// Returns the traversal code; the visitor used here never sets it, so from the
// code visible in this file the result is always 0.
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI) {
10,617✔
277
  STraverParam p = {.pParam = pAPI};
10,617✔
278
  traverseOperatorTree(pOperator, doStopDataReader, &p, pIdStr);
10,617✔
279
  return p.code;
10,619✔
280
}
281

282
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
19,326,682✔
283
                       SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo,
284
                       EOPTR_EXEC_MODEL model) {
285
  QRY_PARAM_CHECK(pOptrInfo);
19,326,682!
286

287
  int32_t     code = 0;
19,326,682✔
288
  int32_t     type = nodeType(pPhyNode);
19,326,682✔
289
  const char* idstr = GET_TASKID(pTaskInfo);
19,326,682✔
290

291
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
19,326,682✔
292
    SOperatorInfo* pOperator = NULL;
9,667,073✔
293
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
9,667,073✔
294
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
2,784,453✔
295
      // NOTE: this is a patch to fix the physical plan
296
      // TODO remove it later
297
      if (pTableScanNode->scan.node.pLimit != NULL) {
2,784,453✔
298
        pTableScanNode->groupSort = true;
120,257✔
299
      }
300

301
      STableListInfo* pTableListInfo = tableListCreate();
2,784,453✔
302
      if (!pTableListInfo) {
2,785,772!
303
        pTaskInfo->code = terrno;
×
304
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
305
        return terrno;
×
306
      }
307

308
      code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
2,785,772✔
309
      if (code) {
2,781,159!
310
        pTaskInfo->code = code;
×
311
        tableListDestroy(pTableListInfo);
×
312
        return code;
120✔
313
      }
314

315
      if (pTableScanNode->scan.node.dynamicOp) {
2,782,621✔
316
        pTaskInfo->dynamicTask = true;
44,117✔
317
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
44,117✔
318
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
44,117✔
319
      } else {
320
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
2,738,504✔
321
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
322
        if (code) {
2,740,450✔
323
          pTaskInfo->code = code;
29✔
324
          tableListDestroy(pTableListInfo);
29✔
325
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
29!
326
          return code;
29✔
327
        }
328
      }
329

330
      code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
2,784,538✔
331
      if (NULL == pOperator || code != 0) {
2,783,023!
UNCOV
332
        pTaskInfo->code = code;
×
UNCOV
333
        tableListDestroy(pTableListInfo);
×
334
        return code;
176✔
335
      }
336

337
      STableScanInfo* pScanInfo = pOperator->info;
2,783,381✔
338
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
2,783,381✔
339
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
6,882,620✔
340
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
1,975,187✔
341
      STableListInfo*           pTableListInfo = tableListCreate();
1,975,187✔
342
      if (!pTableListInfo) {
1,977,883!
343
        pTaskInfo->code = terrno;
×
344
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
345
        return terrno;
×
346
      }
347

348
      code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
1,977,883✔
349
                                     pTagCond, pTagIndexCond, pTaskInfo);
350
      if (code) {
1,976,425!
351
        pTaskInfo->code = code;
×
352
        tableListDestroy(pTableListInfo);
×
353
        qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
354
        return code;
×
355
      }
356

357
      code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
1,976,425✔
358
      if (code) {
1,972,406!
359
        pTaskInfo->code = code;
×
360
        tableListDestroy(pTableListInfo);
×
361
        return code;
×
362
      }
363

364
      code = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
1,972,406✔
365
      if (NULL == pOperator || code != 0) {
1,974,811!
366
        pTaskInfo->code = code;
×
367
        tableListDestroy(pTableListInfo);
×
368
        return code;
×
369
      }
370

371
      STableScanInfo* pScanInfo = pOperator->info;
1,975,204✔
372
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
1,975,204✔
373
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4,907,433✔
374
      code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
3,639,555!
375
                                        pTaskInfo, &pOperator);
376
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
1,267,878✔
377
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
8,769✔
378
      STableListInfo*      pTableListInfo = tableListCreate();
8,769✔
379
      if (!pTableListInfo) {
8,773!
380
        pTaskInfo->code = terrno;
×
381
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
382
        return terrno;
×
383
      }
384

385
      if (pHandle->vnode) {
8,773✔
386
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
8,477✔
387
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
388
        if (code) {
8,476!
389
          pTaskInfo->code = code;
×
390
          tableListDestroy(pTableListInfo);
×
391
          qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
392
          return code;
×
393
        }
394
      }
395

396
      code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
8,772✔
397
      if (code) {
8,771!
398
        pTaskInfo->code = code;
×
399
        tableListDestroy(pTableListInfo);
×
400
        return code;
×
401
      }
402
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
1,259,109✔
403
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
1,203,969✔
404
      code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator);
1,203,969✔
405
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
55,140!
406
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
×
407
      code = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo, &pOperator);
×
408
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
55,140✔
409
      STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
47,740✔
410
      STableListInfo*    pTableListInfo = tableListCreate();
47,740✔
411
      if (!pTableListInfo) {
47,807!
412
        pTaskInfo->code = terrno;
×
413
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
414
        return terrno;
×
415
      }
416
      if (!pTagScanPhyNode->onlyMetaCtbIdx) {
47,807✔
417
        code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
3,710✔
418
                                       pTagIndexCond, pTaskInfo);
419
        if (code != TSDB_CODE_SUCCESS) {
3,706!
420
          pTaskInfo->code = code;
×
421
          qError("failed to getTableList, code:%s", tstrerror(code));
×
422
          tableListDestroy(pTableListInfo);
×
423
          return code;
×
424
        }
425
      }
426
      code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
47,803✔
427
                                       &pOperator);
428
      if (code) {
47,832!
429
        pTaskInfo->code = code;
×
430
        tableListDestroy(pTableListInfo);
×
431
        return code;
×
432
      }
433
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
7,400✔
434
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
3,250✔
435
      STableListInfo*          pTableListInfo = tableListCreate();
3,250✔
436
      if (!pTableListInfo) {
3,252!
UNCOV
437
        pTaskInfo->code = terrno;
×
438
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
439
        return terrno;
×
440
      }
441

442
      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
3,252✔
443
        SArray* pList = taosArrayInit(4, sizeof(uint64_t));
3,249✔
444
        code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
3,250✔
445
        if (code != TSDB_CODE_SUCCESS) {
3,247!
446
          pTaskInfo->code = code;
×
447
          taosArrayDestroy(pList);
×
448
          tableListDestroy(pTableListInfo);
×
449
          return code;
×
450
        }
451

452
        size_t num = taosArrayGetSize(pList);
3,247✔
453
        for (int32_t i = 0; i < num; ++i) {
10,271✔
454
          uint64_t* id = taosArrayGet(pList, i);
7,024✔
455
          if (id == NULL) {
7,020!
456
            continue;
×
457
          }
458

459
          code = tableListAddTableInfo(pTableListInfo, *id, 0);
7,020✔
460
          if (code) {
7,026!
461
            pTaskInfo->code = code;
×
462
            tableListDestroy(pTableListInfo);
×
463
            taosArrayDestroy(pList);
×
464
            return code;
4,514✔
465
          }
466
        }
467

468
        taosArrayDestroy(pList);
3,247✔
469
      } else {  // Create group with only one table
470
        code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
3✔
471
        if (code) {
3!
472
          pTaskInfo->code = code;
×
473
          tableListDestroy(pTableListInfo);
×
474
          return code;
×
475
        }
476
      }
477

478
      code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
3,251✔
479
      if (code) {
3,240!
480
        pTaskInfo->code = code;
×
481
        tableListDestroy(pTableListInfo);
×
482
        return code;
×
483
      }
484
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
4,150!
485
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
4,685✔
486
      STableListInfo*        pTableListInfo = tableListCreate();
4,685✔
487
      if (!pTableListInfo) {
4,689!
488
        pTaskInfo->code = terrno;
×
489
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
490
        return terrno;
×
491
      }
492

493
      code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
4,689✔
494
                                     pTagIndexCond, pTaskInfo);
495
      if (code != TSDB_CODE_SUCCESS) {
4,687!
496
        pTaskInfo->code = code;
×
497
        tableListDestroy(pTableListInfo);
×
498
        return code;
×
499
      }
500

501
      code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
4,687✔
502
      if (code != TSDB_CODE_SUCCESS) {
4,681!
503
        pTaskInfo->code = code;
×
504
        tableListDestroy(pTableListInfo);
×
505
        return code;
×
506
      }
507

508
      code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
4,681✔
509
      if (code) {
4,681!
510
        tableListDestroy(pTableListInfo);
×
511
        pTaskInfo->code = code;
×
512
        return code;
×
513
      }
514
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
×
515
      code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
1,657✔
NEW
516
    } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model != OPTR_EXEC_MODEL_STREAM) {
×
517
      code = createVirtualTableMergeOperatorInfo(NULL, pHandle, NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
×
NEW
518
    } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model == OPTR_EXEC_MODEL_STREAM) {
×
NEW
519
      code = createStreamVtableMergeOperatorInfo(pHandle, (SVirtualScanPhysiNode*)pPhyNode, pTagCond, pTaskInfo, &pOperator);
×
520
    } else {
521
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
522
      pTaskInfo->code = code;
×
UNCOV
523
      return code;
×
524
    }
525

526
    if (pOperator != NULL) {  // todo moved away
9,666,754✔
527
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
9,666,641✔
528
    }
529

530
    *pOptrInfo = pOperator;
9,666,754✔
531
    return code;
9,666,754✔
532
  }
533

534
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
9,659,609✔
535
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
9,659,609✔
536
  if (ops == NULL) {
9,662,773!
537
    code = terrno;
×
UNCOV
538
    pTaskInfo->code = code;
×
UNCOV
539
    return code;
×
540
  }
541

542
  for (int32_t i = 0; i < size; ++i) {
20,859,860✔
543
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
11,197,020✔
544
    code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i], model);
11,198,605✔
545
    if (ops[i] == NULL || code != 0) {
11,197,213!
546
      for (int32_t j = 0; j < i; ++j) {
126!
UNCOV
547
        destroyOperator(ops[j]);
×
548
      }
549
      taosMemoryFree(ops);
126!
550
      return code;
126✔
551
    }
552
  }
553

554
  SOperatorInfo* pOptr = NULL;
9,662,840✔
555
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
9,662,840✔
556
    code = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,792,668✔
557
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
7,870,172✔
558
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
2,328,497✔
559
    if (pAggNode->pGroupKeys != NULL) {
2,328,497✔
560
      code = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
364,460✔
561
    } else {
562
      code = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
1,964,037✔
563
    }
564
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
5,541,675✔
565
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
1,464,395✔
566
    code = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
1,464,395✔
567
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
4,077,280✔
568
    code = createStreamSingleIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
3,071✔
569
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL == type) {
4,074,209✔
570
    code = createStreamIntervalSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
88✔
571
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
4,074,121✔
572
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
540,659✔
573
    code = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
540,659✔
574
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
3,533,462!
UNCOV
575
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
×
UNCOV
576
    code = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
×
577
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
3,533,462✔
578
    int32_t children = 0;
685✔
579
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
685✔
580
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL == type) {
3,532,777✔
581
    code = createSemiIntervalSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
26✔
582
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) {
3,532,751✔
583
    int32_t children = pHandle->numOfVgroups;
22✔
584
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
22✔
585
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
3,532,729✔
586
    int32_t children = pHandle->numOfVgroups;
222✔
587
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
222✔
588
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_INTERVAL == type) {
3,532,507✔
589
    code = createFinalIntervalSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
13✔
590
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
3,532,494✔
591
    code = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
880,192✔
592
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
2,652,302✔
593
    code = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
205✔
594
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
2,652,097✔
595
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
1,367,509✔
596
    code = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo, &pOptr);
1,367,509✔
597
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
1,284,588✔
598
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
100,705✔
599
    code = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo, &pOptr);
100,705✔
600
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
1,183,883✔
601
    code = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
615✔
602
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
1,183,268✔
603
    int32_t children = 0;
125✔
604
    code = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
125✔
605
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
1,183,143✔
606
    int32_t children = pHandle->numOfVgroups;
38✔
607
    code = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
38✔
608
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
1,183,105✔
609
    code = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
25,783✔
610
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
1,157,322✔
611
    code = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,219✔
612
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
1,156,103✔
613
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
75,112✔
614
    code = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo, &pOptr);
75,112✔
615
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
1,080,991✔
616
    code = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
356✔
617
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT == type) {
1,080,635✔
618
    code = createStreamEventAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
217✔
619
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
1,080,418✔
620
    code = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
100,041✔
621
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
980,377✔
622
    code = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
11,035✔
623
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
969,342✔
624
    code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
480,210✔
625
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
489,132✔
626
    code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr);
451✔
627
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
488,681✔
628
    code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
117,394✔
629
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
371,287✔
630
    code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
57,276✔
631
  } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
314,011✔
632
    code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
27✔
633
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
313,984✔
634
    code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
279,962✔
635
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
34,022✔
636
    code = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo, &pOptr);
11,035✔
637
  } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
22,987✔
638
    code = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr);
11,035✔
639
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT == type) {
11,952✔
640
    code = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
319✔
641
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
11,633✔
642
    code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
6,831✔
643
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC == type) {
4,802✔
644
    code = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
221✔
645
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
4,581✔
646
    code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
12✔
647
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SESSION == type) {
4,569!
UNCOV
648
    code = createSessionNonblockOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
×
649
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_SESSION == type) {
4,569!
UNCOV
650
    code = createSemiSessionNonblockOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
×
651
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_SESSION == type) {
4,569!
UNCOV
652
    code = createFinalSessionNonblockOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
×
653
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_STATE == type) {
4,569!
UNCOV
654
    code = createStateNonblockOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
×
655
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_EVENT == type) {
4,569!
UNCOV
656
    code = createEventNonblockOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
×
657
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_COUNT == type) {
4,569!
658
    //todo (liuyao) add
NEW
659
  } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type && model != OPTR_EXEC_MODEL_STREAM) {
×
UNCOV
660
    SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode;
×
661
    // NOTE: this is an patch to fix the physical plan
662

UNCOV
663
    if (pVirtualTableScanNode->scan.node.pLimit != NULL) {
×
UNCOV
664
      pVirtualTableScanNode->groupSort = true;
×
665
    }
666

UNCOV
667
    STableListInfo* pTableListInfo = tableListCreate();
×
UNCOV
668
    if (!pTableListInfo) {
×
UNCOV
669
      pTaskInfo->code = terrno;
×
UNCOV
670
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
671
      return terrno;
×
672
    }
673

UNCOV
674
    code = initQueriedTableSchemaInfo(pHandle, &pVirtualTableScanNode->scan, dbname, pTaskInfo);
×
UNCOV
675
    if (code) {
×
UNCOV
676
      pTaskInfo->code = code;
×
UNCOV
677
      tableListDestroy(pTableListInfo);
×
UNCOV
678
      return code;
×
679
    }
680

UNCOV
681
      code = createScanTableListInfo(&pVirtualTableScanNode->scan, pVirtualTableScanNode->pGroupTags, pVirtualTableScanNode->groupSort,
×
682
                                     pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
UNCOV
683
      if (code) {
×
UNCOV
684
        pTaskInfo->code = code;
×
UNCOV
685
        tableListDestroy(pTableListInfo);
×
UNCOV
686
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
×
UNCOV
687
        return code;
×
688
      }
689

690

UNCOV
691
    code = createVirtualTableMergeOperatorInfo(ops, pHandle, pTableListInfo, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
×
692
  } else {
UNCOV
693
    code = TSDB_CODE_INVALID_PARA;
×
694
    pTaskInfo->code = code;
×
695
    for (int32_t i = 0; i < size; ++i) {
×
UNCOV
696
      destroyOperator(ops[i]);
×
697
    }
UNCOV
698
    taosMemoryFree(ops);
×
UNCOV
699
    qError("invalid operator type %d", type);
×
UNCOV
700
    return code;
×
701
  }
702

703
  taosMemoryFree(ops);
9,661,609✔
704
  if (pOptr) {
9,660,192✔
705
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
9,659,896✔
706
  }
707

708
  *pOptrInfo = pOptr;
9,660,192✔
709
  return code;
9,660,192✔
710
}
711

712
/*
 * Tear down an operator and everything hanging below it.
 *
 * Steps, in order: release the cached get/notify params, recursively destroy the
 * downstream operators, call the operator's close callback, clean up the expression
 * support data and finally free the operator itself. A NULL operator is a no-op.
 */
void destroyOperator(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
  freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);

  if (NULL != pOperator->pDownstream) {
    // only the first numOfRealDownstream slots are destroyed
    int32_t slot = 0;
    while (slot < pOperator->numOfRealDownstream) {
      destroyOperator(pOperator->pDownstream[slot]);
      ++slot;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  // the close callback runs only when both the callback and the info object exist
  if (NULL != pOperator->info && NULL != pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}
736

737
/*
 * Destroy a list of downstream operators and then the (possibly partially built)
 * operator itself.
 *
 * @param pOperator   operator to destroy, may be NULL
 * @param downstreams array of downstream operators to destroy, may be NULL
 * @param num         number of entries in downstreams
 */
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
  for (int k = 0; downstreams != NULL && k < num; ++k) {
    destroyOperator(downstreams[k]);
  }

  if (pOperator == NULL) {
    return;
  }

  // with info cleared, destroyOperator() will not invoke the close callback
  pOperator->info = NULL;

  // drop the operator's own downstream array so destroyOperator() does not walk it
  if (pOperator->pDownstream != NULL) {
    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->pDownstream = NULL;
  }

  destroyOperator(pOperator);
}
3✔
753

754
/*
 * Append the explain/execution statistics of an operator and, recursively, of all
 * its downstream operators to pExecInfoList (the operator's own entry comes first).
 *
 * @param operatorInfo  operator to collect statistics from
 * @param pExecInfoList array of SExplainExecInfo receiving one entry per operator
 * @return TSDB_CODE_SUCCESS, terrno when the push fails, or the first error
 *         reported by a getExplainFn callback or a downstream operator
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  blank = {0};
  SExplainExecInfo* pSlot = taosArrayPush(pExecInfoList, &blank);
  if (NULL == pSlot) {
    return terrno;
  }

  pSlot->verboseInfo = NULL;
  pSlot->verboseLen = 0;
  pSlot->numOfRows = operatorInfo->resultInfo.totalRows;
  pSlot->startupCost = operatorInfo->cost.openCost;
  pSlot->totalCost = operatorInfo->cost.totalCost;

  // operator-specific verbose details are optional
  if (NULL != operatorInfo->fpSet.getExplainFn) {
    int32_t ret = operatorInfo->fpSet.getExplainFn(operatorInfo, &pSlot->verboseInfo, &pSlot->verboseLen);
    if (ret != 0) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(ret));
      return ret;
    }
  }

  // NOTE(review): pSlot is not used past this point; the recursive pushes below
  // presumably may move the array storage - confirm against taosArrayPush.
  int32_t childIdx = 0;
  while (childIdx < operatorInfo->numOfDownstream) {
    int32_t ret = getOperatorExplainExecInfo(operatorInfo->pDownstream[childIdx], pExecInfoList);
    if (TSDB_CODE_SUCCESS != ret) {
      return ret;
    }
    ++childIdx;
  }

  return TSDB_CODE_SUCCESS;
}
787

788
/*
 * Merge the operator param pSrc into pDst. Both must carry the same opType; only
 * QUERY_NODE_PHYSICAL_PLAN_EXCHANGE params are supported.
 *
 * For exchange params:
 *  - pDst is a single (non-batch) param with the same vgId as pSrc: the uid list of
 *    pSrc is appended to the uid list of pDst.
 *  - pDst is a single param with a different vgId: pDst->value is replaced by a batch
 *    param whose hash (keyed by vgId) holds shallow copies of both basic params.
 *  - pDst is already a batch param: pSrc is appended to the entry with the same vgId,
 *    or inserted as a new entry.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for mismatched/unsupported
 *         opTypes, or the error of the failing allocation/hash/array call. On failure
 *         pDst->value is left as it was on entry.
 */
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
  if (pDst->opType != pSrc->opType) {
    qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
    return TSDB_CODE_INVALID_PARA;
  }

  switch (pDst->opType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
      SExchangeOperatorParam* pDExc = pDst->value;
      SExchangeOperatorParam* pSExc = pSrc->value;
      if (!pDExc->multiParams) {
        if (pSExc->basic.vgId != pDExc->basic.vgId) {
          // two different vgroups: upgrade the single param to a batch param
          SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
          if (NULL == pBatch) {
            return terrno;
          }

          pBatch->multiParams = true;
          pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
          if (NULL == pBatch->pBatchs) {
            taosMemoryFree(pBatch);
            return terrno;
          }

          tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);

          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic,
                                        sizeof(pDExc->basic));
          if (TSDB_CODE_SUCCESS == code) {
            code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                  sizeof(pSExc->basic));
          }
          if (code) {
            // The half-built batch must not leak. Its entries are shallow copies whose
            // uid lists are still referenced by pDst/pSrc, so the per-entry free
            // callback is removed before the hash is released.
            tSimpleHashSetFreeFp(pBatch->pBatchs, NULL);
            tSimpleHashCleanup(pBatch->pBatchs);
            taosMemoryFree(pBatch);
            return code;
          }

          // the batch now references the old basic param's content; only the old
          // container struct is released
          taosMemoryFree(pDst->value);
          pDst->value = pBatch;
        } else {
          // same vgroup: just extend the uid list
          void* p = taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        }
      } else {
        SExchangeOperatorBatchParam* pBatch = pDst->value;
        SExchangeOperatorBasicParam* pBasic =
            tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
        if (pBasic) {
          void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        } else {
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                        sizeof(pSExc->basic));
          if (code) {
            return code;
          }
        }
      }
      break;
    }
    default:
      qError("invalid optype %d for merge operator params", pDst->opType);
      return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
860

861
/*
 * Install a get/notify param on an operator and distribute it to the per-downstream
 * param slots.
 *
 * The previous params of the given type are reset first. If pInput is NULL nothing
 * else happens. If pInput does not target this operator (opType mismatch) it is
 * forwarded unchanged to every downstream slot. Otherwise pInput becomes the
 * operator's own param and each of its children is placed into (or merged with) the
 * slot selected by the child's downstreamIdx; the children array is destroyed
 * afterwards.
 *
 * @param pOperator operator to update
 * @param pInput    param to install, may be NULL
 * @param type      OP_GET_PARAM or OP_NOTIFY_PARAM
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an unknown type, a NULL child
 *         or an out-of-range downstreamIdx, or the error of a failing
 *         allocation/merge
 */
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
  SOperatorParam**  ppParam = NULL;
  SOperatorParam*** pppDownstramParam = NULL;
  switch (type) {
    case OP_GET_PARAM:
      ppParam = &pOperator->pOperatorGetParam;
      pppDownstramParam = &pOperator->pDownstreamGetParams;
      break;
    case OP_NOTIFY_PARAM:
      ppParam = &pOperator->pOperatorNotifyParam;
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  freeResetOperatorParams(pOperator, type, false);

  if (NULL == pInput) {
    return TSDB_CODE_SUCCESS;
  }

  *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;

  // the per-downstream slot array is allocated on first use, one pointer per downstream
  if (NULL == *pppDownstramParam) {
    *pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
    if (NULL == *pppDownstramParam) {
      return terrno;
    }
  }

  if (NULL == *ppParam) {
    // not addressed to this operator: pass the same param on to every downstream
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      (*pppDownstramParam)[i] = pInput;
    }
    return TSDB_CODE_SUCCESS;
  }

  memset(*pppDownstramParam, 0, pOperator->numOfDownstream * POINTER_BYTES);

  int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren);
  if (childrenNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < childrenNum; ++i) {
    // check the element address before dereferencing it
    SOperatorParam** ppChild = taosArrayGet((*ppParam)->pChildren, i);
    if (NULL == ppChild) {
      return terrno;
    }

    SOperatorParam* pChild = *ppChild;
    if (NULL == pChild) {
      return TSDB_CODE_INVALID_PARA;
    }

    // the slot array holds exactly numOfDownstream entries; reject anything outside
    int32_t dsIdx = pChild->downstreamIdx;
    if (dsIdx < 0 || dsIdx >= pOperator->numOfDownstream) {
      qError("invalid downstream index %d for operator params, numOfDownstream:%d", dsIdx,
             pOperator->numOfDownstream);
      return TSDB_CODE_INVALID_PARA;
    }

    if ((*pppDownstramParam)[dsIdx]) {
      int32_t code = mergeOperatorParams((*pppDownstramParam)[dsIdx], pChild);
      if (code) {
        return code;
      }
    } else {
      (*pppDownstramParam)[dsIdx] = pChild;
    }
  }

  // the children are now referenced from the downstream slots; drop the container only
  taosArrayDestroy((*ppParam)->pChildren);
  (*ppParam)->pChildren = NULL;

  return TSDB_CODE_SUCCESS;
}
927

928
/*
 * Fetch the next data block from downstream operator idx (the implementation is
 * called with its boolean flag set to true) and validate it with blockDataCheck().
 *
 * @return the fetched block, or NULL when fetching or validation fails
 */
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
  SSDataBlock* pBlock = NULL;

  int32_t ret = getNextBlockFromDownstreamImpl(pOperator, idx, true, &pBlock);
  if (TSDB_CODE_SUCCESS == ret) {
    ret = blockDataCheck(pBlock);
    if (TSDB_CODE_SUCCESS != ret) {
      qError("blockDataCheck failed, code:%s", tstrerror(ret));
    }
  }

  if (ret != 0) {
    return NULL;
  }
  return pBlock;
}
939

940
/*
 * Same as getNextBlockFromDownstream(), except that the implementation is called
 * with its boolean flag set to false. The fetched block is validated with
 * blockDataCheck().
 *
 * @return the fetched block, or NULL when fetching or validation fails
 */
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
  SSDataBlock* pBlock = NULL;

  int32_t ret = getNextBlockFromDownstreamImpl(pOperator, idx, false, &pBlock);
  if (TSDB_CODE_SUCCESS == ret) {
    ret = blockDataCheck(pBlock);
    if (TSDB_CODE_SUCCESS != ret) {
      qError("blockDataCheck failed, code:%s", tstrerror(ret));
    }
  }

  if (ret != 0) {
    return NULL;
  }
  return pBlock;
}
951

952
/*
 * Default "get next with params" entry point: install pParam as the operator's get
 * param and then call the operator's regular getNextFn.
 *
 * @param pOperator operator to run
 * @param pParam    param to install before fetching (passed to setOperatorParams)
 * @param pRes      receives the result block
 * @return TSDB_CODE_SUCCESS on success. On failure the error is logged, stored in
 *         pTaskInfo->code, and control leaves through T_LONG_JMP.
 */
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  QRY_PARAM_CHECK(pRes);

  // NOTE(review): QUERY_CHECK_CODE is expected to store the failing line in lino
  // before jumping to _end - confirm against the macro definition.
  int32_t lino = 0;
  int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
  QUERY_CHECK_CODE(code, lino, _end);
  code = pOperator->fpSet.getNextFn(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // report the line of the failing step rather than the line of this log call
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
970

UNCOV
971
/*
 * Default notify entry point: install pParam as the operator's notify param, call
 * the operator's own notifyFn (when it has one and the param targets it), then
 * propagate the per-downstream notify params recursively. Each downstream slot is
 * cleared once it has been delivered successfully.
 *
 * @return TSDB_CODE_SUCCESS on success. On failure the error is stored in
 *         pTaskInfo->code and control leaves through T_LONG_JMP.
 */
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
  if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
    code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
  }

  // setOperatorParams() returns success without allocating the downstream slot array
  // when pParam is NULL, so the array may legitimately be absent here
  if (TSDB_CODE_SUCCESS == code && NULL != pOperator->pDownstreamNotifyParams) {
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      if (pOperator->pDownstreamNotifyParams[i]) {
        code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        pOperator->pDownstreamNotifyParams[i] = NULL;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  return code;
}
994

995
/*
 * Return the result data block id of an operator. Operators flagged as transparent
 * are skipped: the lookup descends into downstream idx of the first transparent
 * operator and into downstream 0 of every further transparent one.
 */
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
  struct SOperatorInfo* pCur = pOperator;
  int32_t               childIdx = idx;

  while (pCur->transparent) {
    pCur = pCur->pDownstream[childIdx];
    childIdx = 0;  // below the first hop always follow the first downstream
  }

  return pCur->resultDataBlockId;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc