• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5006

29 Mar 2026 04:32AM UTC coverage: 72.274% (+0.1%) from 72.152%
#5006

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253711 of 351039 relevant lines covered (72.27%)

131490495.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.96
/source/libs/executor/src/operator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "function.h"
18
#include "operator.h"
19
#include "osTime.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "storageapi.h"
23
#include "taoserror.h"
24
#include "tdatablock.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tutil.h"
28

29
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
30
                                           SSDataBlock** pResBlock);
31
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
32
                                              SOperatorParam* pParam,
33
                                              SSDataBlock** pResBlock);
34
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pParam);
35

36
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
772,950,308✔
37
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain,
38
                                   __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
39
  SOperatorFpSet fpSet = {
2,147,483,647✔
40
      ._openFn = openFn,
41
      ._nextFn = nextFn,
42
      .getNextFn = (nextFn != NULL) ? optrGetNextFnWithExecRecord : NULL,
772,950,308✔
43
      .cleanupFn = cleanup,
44
      .closeFn = closeFn,
45
      .reqBufFn = reqBufFn,
46
      .getExplainFn = explain,
47
      ._nextExtFn = nextExtFn,
48
      .getNextExtFn = (nextExtFn != NULL) ? optrGetNextExtFnWithExecRecord : NULL,
772,950,308✔
49
      .notifyFn = notifyFn,
50
      .releaseStreamStateFn = NULL,
51
      .reloadStreamStateFn = NULL,
52
      ._resetFn = NULL,
53
      .resetStateFn = NULL,
54
  };
55

56
  return fpSet;
772,950,308✔
57
}
58

59
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
2,147,483,647✔
60
                                           SSDataBlock** pResBlock) {
61
  QRY_PARAM_CHECK(pResBlock);
2,147,483,647✔
62

63
  recordOpExecBegin(pOperator);
2,147,483,647✔
64
  int32_t code = pOperator->fpSet._nextFn(pOperator, pResBlock);
2,147,483,647✔
65
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
2,147,483,647✔
66
                 (*pResBlock)->info.rows : 0;
2,147,483,647✔
67
  recordOpExecEnd(pOperator, rows);
2,147,483,647✔
68
  return code;
2,147,483,647✔
69
}
70

71
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
87,681,703✔
72
                                              SOperatorParam* pParam,
73
                                              SSDataBlock** pResBlock) {
74
  QRY_PARAM_CHECK(pResBlock);
87,681,703✔
75

76
  recordOpExecBegin(pOperator);
87,681,703✔
77
  int32_t code = pOperator->fpSet._nextExtFn(pOperator, pParam, pResBlock);
87,679,857✔
78
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
87,677,679✔
79
                 (*pResBlock)->info.rows : 0;
175,353,518✔
80
  recordOpExecEnd(pOperator, rows);
87,678,907✔
81
  return code;
87,679,937✔
82
}
83

84
// Run the operator's raw reset callback (_resetFn), then reset its cost info.
// The cost info is reset whether or not the raw callback succeeded; its code is returned.
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pOperator) {
  int32_t resetCode = pOperator->fpSet._resetFn(pOperator);

  resetOperatorCostInfo(pOperator);

  return resetCode;
}
89

90
// Install the stream-state release and reload callbacks of an operator.
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
  SOperatorFpSet* pFpSet = &pOperator->fpSet;

  pFpSet->releaseStreamStateFn = relaseFn;
  pFpSet->reloadStreamStateFn = reloadFn;
}
146,915,433✔
94

95
// Install the raw reset callback of an operator. The public resetStateFn slot is pointed at
// optrResetStateFnWithExecRecord when a callback is given, and cleared to NULL otherwise.
void setOperatorResetStateFn(SOperatorInfo* pOperator, __optr_reset_state_fn_t resetFn) {
  pOperator->fpSet._resetFn = resetFn;

  if (resetFn == NULL) {
    pOperator->fpSet.resetStateFn = NULL;
  } else {
    pOperator->fpSet.resetStateFn = optrResetStateFnWithExecRecord;
  }
}
771,202,991✔
99

100
// Open callback for operators that need no real open step: it only applies
// OPTR_SET_OPENED to the operator and reports success.
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;

  OPTR_SET_OPENED(pOperator);

  return code;
}
104

105
// Attach `num` downstream operators to `p`.
//
// A fresh pointer array is allocated for p->pDownstream and the first `num` entries of
// `pDownstream` are copied into it; numOfDownstream and numOfRealDownstream are both set
// to `num`. Only the pointer array is copied, not the operators themselves.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when the arguments cannot
// describe a valid array (NULL operator, negative count, or NULL source with a positive
// count), or terrno when the allocation fails.
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // A negative count would turn into a huge allocation size, and a NULL source would be
  // handed to memcpy; reject both instead of relying on undefined behaviour.
  if (p == NULL || num < 0 || (num > 0 && pDownstream == NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }

  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (p->pDownstream == NULL) {
    return terrno;
  }

  // memcpy must not be given a NULL source even for a zero length, so skip it when empty.
  if (num > 0) {
    memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
  }

  p->numOfDownstream = num;
  p->numOfRealDownstream = num;
  return TSDB_CODE_SUCCESS;
}
116

117
// Mark the operator as finished (OP_EXEC_DONE) and set its owning task to TASK_COMPLETED.
void setOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  // The task status is updated after the operator's own status, as in the original order.
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
754,681,651✔
121

122
// Fill in the basic descriptive fields of an operator: display name, physical-plan node type,
// blocking flag, initial status, operator-private info pointer and owning task.
// `name` is stored as-is (not copied), so the string must outlive the operator.
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // Identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // Execution state
  pOperator->blocking = blocking;
  pOperator->status = status;

  // Context
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
}
772,934,781✔
131

132
// each operator should be set their own function to return total cost buffer
133
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
×
134
  if (pOperator->blocking) {
×
135
    return -1;
×
136
  } else {
137
    return 0;
×
138
  }
139
}
140

141
static int64_t getQuerySupportBufSize(size_t numOfTables) {
×
142
  size_t s1 = sizeof(STableQueryInfo);
×
143
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
144
  return (int64_t)(s1 * 1.5 * numOfTables);
×
145
}
146

147
int32_t checkForQueryBuf(size_t numOfTables) {
×
148
  int64_t t = getQuerySupportBufSize(numOfTables);
×
149
  if (tsQueryBufferSizeBytes < 0) {
×
150
    return TSDB_CODE_SUCCESS;
×
151
  } else if (tsQueryBufferSizeBytes > 0) {
×
152
    while (1) {
×
153
      int64_t s = tsQueryBufferSizeBytes;
×
154
      int64_t remain = s - t;
×
155
      if (remain >= 0) {
×
156
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
×
157
          return TSDB_CODE_SUCCESS;
×
158
        }
159
      } else {
160
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
161
      }
162
    }
163
  }
164

165
  // disable query processing if the value of tsQueryBufferSize is zero.
166
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
167
}
168

169
void releaseQueryBuf(size_t numOfTables) {
×
170
  if (tsQueryBufferSizeBytes < 0) {
×
171
    return;
×
172
  }
173

174
  int64_t t = getQuerySupportBufSize(numOfTables);
×
175

176
  // restore value is not enough buffer available
177
  (void)atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
×
178
}
179

180
// Verdict returned by a traverseOperatorTree() callback for the operator it just visited.
typedef enum {
181
  OPTR_FN_RET_CONTINUE = 0x1,  // go on and visit this operator's downstream operators
182
  OPTR_FN_RET_ABORT = 0x2,     // do not descend into this operator's downstream operators
183
} ERetType;
184

185
// State shared between traverseOperatorTree() and its callback.
typedef struct STraverParam {
186
  void*   pRet;    // callback output, e.g. extractOperatorInfo() stores the matched operator here
187
  int32_t code;    // status; the traversal stops as soon as this is non-zero
188
  void*   pParam;  // callback input; its actual type is chosen by each caller/callback pair
189
} STraverParam;
190

191
// Callback invoked by traverseOperatorTree() on each visited operator.
// It receives the operator, the shared traversal state and the id string passed to the
// traversal, and returns whether the traversal should descend below that operator.
192
typedef ERetType (*optr_fn_t)(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdstr);
193

194
// Pre-order walk over an operator tree.
//
// `fn` is called on `pOperator` first. Its downstream operators are visited, in index order,
// only if `fn` did not return OPTR_FN_RET_ABORT and pParam->code is still success. An abort
// verdict prunes the subtree below that operator only; siblings are still visited. A non-zero
// pParam->code stops the whole walk. A NULL operator is ignored.
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
  if (pOperator == NULL) {
    return;
  }

  ERetType verdict = fn(pOperator, pParam, id);
  if (verdict == OPTR_FN_RET_ABORT) {
    return;
  }
  if (pParam->code != TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t child = 0;
  while (child < pOperator->numOfDownstream) {
    traverseOperatorTree(pOperator->pDownstream[child], fn, pParam, id);
    if (pParam->code != 0) {
      return;
    }
    ++child;
  }
}
211

212
// Traversal callback that looks for an operator of a given type.
// pParam->pParam points at the wanted operator type (int32_t). On a match the operator is
// stored in pParam->pRet and the subtree below it is not searched.
ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  if (pOperator->operatorType != *(int32_t*)pParam->pParam) {
    return OPTR_FN_RET_CONTINUE;
  }

  pParam->pRet = pOperator;
  return OPTR_FN_RET_ABORT;
}
221

222
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
223
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
12,416,525✔
224
  QRY_PARAM_CHECK(pOptrInfo);
12,416,525✔
225

226
  if (pOperator == NULL) {
12,417,892✔
227
    qError("invalid operator, failed to find tableScanOperator %s", id);
×
228
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
229
  }
230

231
  STraverParam p = {.pParam = &type, .pRet = NULL};
12,417,892✔
232
  traverseOperatorTree(pOperator, extractOperatorInfo, &p, id);
12,416,179✔
233
  if (p.code == 0) {
12,411,071✔
234
    *pOptrInfo = p.pRet;
12,413,982✔
235
  }
236
  return p.code;
12,412,355✔
237
}
238

239
// In/out state of the extractScanInfo() traversal callback used by getTableScanInfo().
typedef struct SExtScanInfo {
240
  int32_t order;           // in: caller's current order; out: order found on the scan operator
241
  int32_t scanFlag;        // out: scan flag found on the scan operator
242
  int32_t inheritUsOrder;  // non-zero: an exchange operator leaves `order` untouched
243
} SExtScanInfo;
244

245
// Traversal callback that derives the scan order and scan flag from the first data-source
// operator met on a branch. pParam->pParam must point at an SExtScanInfo.
//
//   - system/stream/tag/block-dist/last-row/table-count scans: ascending order, MAIN_SCAN
//   - exchange: MAIN_SCAN; order forced to ascending unless inheritUsOrder is set
//   - table scan / table merge scan: order and flag copied from the operator's own info
//   - anything else: keep descending
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  SExtScanInfo* pScan = pParam->pParam;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
      pScan->order = TSDB_ORDER_ASC;
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      if (!pScan->inheritUsOrder) {
        pScan->order = TSDB_ORDER_ASC;
      }
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;
      pScan->order = pTableScan->base.cond.order;
      pScan->scanFlag = pTableScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      pScan->order = pMergeScan->base.cond.order;
      pScan->scanFlag = pMergeScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    default:
      return OPTR_FN_RET_CONTINUE;
  }
}
275

276
// Determine the scan order and scan flag of the data source below `pOperator`.
//
// *order is read as the starting value and overwritten with the order found; *scanFlag is
// overwritten with the flag found (0 if no scan operator was reached). `inheritUsOrder`
// makes an exchange operator keep the incoming order instead of forcing ascending.
//
// Returns the traversal status, or TSDB_CODE_INVALID_PARA when the traversal succeeded but
// the resulting order is neither TSDB_ORDER_ASC nor TSDB_ORDER_DESC.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
  SExtScanInfo scan = {0};
  scan.order = *order;
  scan.inheritUsOrder = inheritUsOrder;

  STraverParam walk = {0};
  walk.pParam = &scan;

  traverseOperatorTree(pOperator, extractScanInfo, &walk, NULL);

  // The outputs are written even when the traversal reported an error.
  *order = scan.order;
  *scanFlag = scan.scanFlag;

  if (walk.code != TSDB_CODE_SUCCESS) {
    return walk.code;
  }

  if (*order != TSDB_ORDER_ASC && *order != TSDB_ORDER_DESC) {
    qError("operator failed at: %s:%d", __func__, __LINE__);
    walk.code = TSDB_CODE_INVALID_PARA;
  }
  return walk.code;
}
292

293
// Traversal callback that tells the data reader of a scan operator that it is being closed.
// pParam->pParam must point at the SStorageAPI providing tsdReaderNotifyClosing().
//
//   - table scan: notify its reader, if one exists
//   - stream scan: notify the reader of the nested table-scan operator, if both exist
//   - anything else: keep descending
static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  SStorageAPI* pStorage = pParam->pParam;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pScan = pOperator->info;
      if (pScan->base.dataReader != NULL) {
        pStorage->tsdReader.tsdReaderNotifyClosing(pScan->base.dataReader);
      }
      return OPTR_FN_RET_ABORT;
    }

    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      STmqQueryScanInfo* pStream = pOperator->info;
      if (pStream->pTableScanOp != NULL) {
        STableScanInfo* pInner = pStream->pTableScanOp->info;
        if (pInner != NULL && pInner->base.dataReader != NULL) {
          pStorage->tsdReader.tsdReaderNotifyClosing(pInner->base.dataReader);
        }
      }
      return OPTR_FN_RET_ABORT;
    }

    default:
      return OPTR_FN_RET_CONTINUE;
  }
}
317

318
// Walk the operator tree and notify the data readers of its table/stream scan operators
// that they are closing (see doStopDataReader). Returns the traversal status.
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI) {
  STraverParam walk = {0};
  walk.pParam = pAPI;

  traverseOperatorTree(pOperator, doStopDataReader, &walk, pIdStr);

  return walk.code;
}
323

324
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
773,180,717✔
325
                       SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo,
326
                       EOPTR_EXEC_MODEL model) {
327
  QRY_PARAM_CHECK(pOptrInfo);
773,180,717✔
328

329
  int32_t     code = 0;
773,211,202✔
330
  int32_t     type = nodeType(pPhyNode);
773,211,202✔
331
  const char* idstr = GET_TASKID(pTaskInfo);
773,211,437✔
332

333
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
773,187,505✔
334
    SOperatorInfo* pOperator = NULL;
411,538,810✔
335
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
411,501,579✔
336
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
243,688,737✔
337
      // NOTE: this is an patch to fix the physical plan
338
      // TODO remove it later
339
      if (pTableScanNode->scan.node.pLimit != NULL) {
243,688,737✔
340
        pTableScanNode->groupSort = true;
5,463,306✔
341
      }
342

343
      STableListInfo* pTableListInfo = tableListCreate();
243,706,076✔
344
      if (!pTableListInfo) {
243,717,670✔
345
        pTaskInfo->code = terrno;
×
346
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
347
        return terrno;
×
348
      }
349

350
      // Since virtual stable scan use virtual super table's uid in scan operator, the origin table might be stored on
351
      // different vnode, so we should not get table schema for virtual stable scan.
352
      if (!pTableScanNode->scan.virtualStableScan) {
243,717,670✔
353
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
239,427,319✔
354
        if (code) {
239,308,553✔
355
          pTaskInfo->code = code;
×
356
          tableListDestroy(pTableListInfo);
×
357
          return code;
×
358
        }
359
      }
360

361
      if (pTableScanNode->scan.node.dynamicOp) {
243,599,052✔
362
        pTaskInfo->dynamicTask = true;
9,974,605✔
363
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
9,974,605✔
364
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
9,973,463✔
365
      } else {
366
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
233,635,882✔
367
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
368
        if (code) {
233,740,251✔
369
          pTaskInfo->code = code;
972✔
370
          tableListDestroy(pTableListInfo);
972✔
371
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
972✔
372
          return code;
972✔
373
        }
374
      }
375

376
      code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
243,714,883✔
377
      if (NULL == pOperator || code != 0) {
243,648,744✔
378
        pTaskInfo->code = code;
143,787✔
379
        tableListDestroy(pTableListInfo);
23,786✔
380
        return code;
23,786✔
381
      }
382

383
      STableScanInfo* pScanInfo = pOperator->info;
243,504,957✔
384
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
243,623,234✔
385
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
167,812,842✔
386
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
28,547,735✔
387
      STableListInfo*           pTableListInfo = tableListCreate();
28,547,735✔
388
      if (!pTableListInfo) {
28,559,151✔
389
        pTaskInfo->code = terrno;
×
390
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
391
        return terrno;
×
392
      }
393

394
      if (pTableScanNode->scan.node.dynamicOp) {
28,559,151✔
395
        pTaskInfo->dynamicTask = true;
67,928✔
396
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
67,928✔
397
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
67,928✔
398
      } else {
399
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
28,490,693✔
400
                                       pTagCond, pTagIndexCond, pTaskInfo, NULL);
401
        if (code) {
28,490,214✔
402
          pTaskInfo->code = code;
×
403
          tableListDestroy(pTableListInfo);
×
404
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
×
405
          return code;
×
406
        }
407

408
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
28,490,214✔
409
        if (code) {
28,460,472✔
410
          pTaskInfo->code = code;
×
411
          tableListDestroy(pTableListInfo);
×
412
          return code;
×
413
        }
414
      }
415

416
      code = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
28,528,400✔
417
      if (NULL == pOperator || code != 0) {
28,514,375✔
418
        pTaskInfo->code = code;
440✔
419
        tableListDestroy(pTableListInfo);
×
420
        return code;
×
421
      }
422

423
      STableMergeScanInfo* pScanInfo = pOperator->info;
28,514,681✔
424
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
28,517,137✔
425
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
139,265,107✔
426
      code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
111,289,461✔
427
                                        pTaskInfo, &pOperator);
428
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
27,975,646✔
429
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
331,801✔
430
      STableListInfo*      pTableListInfo = tableListCreate();
331,801✔
431
      if (!pTableListInfo) {
332,136✔
432
        pTaskInfo->code = terrno;
×
433
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
434
        return terrno;
×
435
      }
436

437
      if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) {
332,136✔
438
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
332,136✔
439
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
440
        if (code) {
332,136✔
441
          pTaskInfo->code = code;
×
442
          tableListDestroy(pTableListInfo);
×
443
          qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
444
          return code;
×
445
        }
446
      }
447

448
      code = createTmqScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
332,136✔
449
      if (code) {
332,136✔
450
        pTaskInfo->code = code;
×
451
        tableListDestroy(pTableListInfo);
×
452
        return code;
×
453
      }
454
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
27,643,845✔
455
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
16,524,483✔
456
      if (pSysScanPhyNode->scan.virtualStableScan) {
16,524,483✔
457
        STableListInfo*           pTableListInfo = tableListCreate();
5,171,056✔
458
        if (!pTableListInfo) {
5,172,288✔
459
          pTaskInfo->code = terrno;
×
460
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
461
          return terrno;
×
462
        }
463

464
        code = createScanTableListInfo((SScanPhysiNode*)pSysScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
5,172,288✔
465
                                       pTagIndexCond, pTaskInfo, NULL);
466
        if (code != TSDB_CODE_SUCCESS) {
5,170,440✔
467
          pTaskInfo->code = code;
×
468
          tableListDestroy(pTableListInfo);
×
469
          return code;
×
470
        }
471

472
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTableListInfo, pUser, pTaskInfo, &pOperator);
5,170,440✔
473
      } else {
474
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, NULL, pUser, pTaskInfo, &pOperator);
11,354,031✔
475
      }
476
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
11,119,362✔
477
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
30,555✔
478
      code = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo, &pOperator);
30,555✔
479
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
11,088,807✔
480
      STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
8,000,446✔
481
      STableListInfo*    pTableListInfo = tableListCreate();
8,000,446✔
482
      if (!pTableListInfo) {
8,001,629✔
483
        pTaskInfo->code = terrno;
×
484
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
485
        return terrno;
×
486
      }
487

488
      code = initQueriedTableSchemaInfo(pHandle, &pTagScanPhyNode->scan, dbname, pTaskInfo);
8,001,629✔
489
      if (code != TSDB_CODE_SUCCESS) {
7,996,984✔
490
        pTaskInfo->code = code;
×
491
        tableListDestroy(pTableListInfo);
×
492
        return code;
×
493
      }
494

495
      if (!pTagScanPhyNode->onlyMetaCtbIdx) {
7,996,984✔
496
        code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
3,022,795✔
497
                                       pTagIndexCond, pTaskInfo, NULL);
498
        if (code != TSDB_CODE_SUCCESS) {
3,023,393✔
499
          pTaskInfo->code = code;
×
500
          qError("failed to getTableList, code:%s", tstrerror(code));
×
501
          tableListDestroy(pTableListInfo);
×
502
          return code;
×
503
        }
504
      }
505

506
      if (pTagScanPhyNode->scan.node.dynamicOp) {
7,996,239✔
507
        pTaskInfo->dynamicTask = true;
2,134,325✔
508
        pTableListInfo->idInfo.suid = pTagScanPhyNode->scan.suid;
2,134,325✔
509
        pTableListInfo->idInfo.tableType = pTagScanPhyNode->scan.tableType;
2,134,325✔
510
      }
511

512
      code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
7,997,851✔
513
                                       &pOperator);
514
      if (code) {
7,997,263✔
515
        pTaskInfo->code = code;
×
516
        tableListDestroy(pTableListInfo);
×
517
        return code;
×
518
      }
519
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
3,088,361✔
520
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
5,501✔
521
      STableListInfo*          pTableListInfo = tableListCreate();
5,501✔
522
      if (!pTableListInfo) {
5,501✔
523
        pTaskInfo->code = terrno;
×
524
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
525
        return terrno;
×
526
      }
527

528
      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
5,501✔
529
        SArray* pList = taosArrayInit(4, sizeof(uint64_t));
4,684✔
530
        code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
4,684✔
531
        if (code != TSDB_CODE_SUCCESS) {
4,684✔
532
          pTaskInfo->code = code;
×
533
          taosArrayDestroy(pList);
×
534
          tableListDestroy(pTableListInfo);
×
535
          return code;
×
536
        }
537

538
        size_t num = taosArrayGetSize(pList);
4,684✔
539
        for (int32_t i = 0; i < num; ++i) {
10,055✔
540
          uint64_t* id = taosArrayGet(pList, i);
5,371✔
541
          if (id == NULL) {
5,371✔
542
            continue;
×
543
          }
544

545
          code = tableListAddTableInfo(pTableListInfo, *id, 0);
5,371✔
546
          if (code) {
5,371✔
547
            pTaskInfo->code = code;
×
548
            tableListDestroy(pTableListInfo);
×
549
            taosArrayDestroy(pList);
×
550
            return code;
×
551
          }
552
        }
553

554
        taosArrayDestroy(pList);
4,684✔
555
      } else {  // Create group with only one table
556
        code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
817✔
557
        if (code) {
817✔
558
          pTaskInfo->code = code;
×
559
          tableListDestroy(pTableListInfo);
×
560
          return code;
×
561
        }
562
      }
563

564
      code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
5,501✔
565
      if (code) {
5,501✔
566
        pTaskInfo->code = code;
×
567
        tableListDestroy(pTableListInfo);
×
568
        return code;
×
569
      }
570
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
3,082,860✔
571
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
1,205,779✔
572
      STableListInfo*        pTableListInfo = tableListCreate();
1,205,779✔
573
      if (!pTableListInfo) {
1,206,208✔
574
        pTaskInfo->code = terrno;
×
575
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
576
        return terrno;
×
577
      }
578

579
      code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
1,206,208✔
580
                                     pTagIndexCond, pTaskInfo, NULL);
581
      if (code != TSDB_CODE_SUCCESS) {
1,206,208✔
582
        pTaskInfo->code = code;
×
583
        tableListDestroy(pTableListInfo);
×
584
        return code;
×
585
      }
586

587
      code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
1,206,208✔
588
      if (code != TSDB_CODE_SUCCESS) {
1,206,208✔
589
        pTaskInfo->code = code;
×
590
        tableListDestroy(pTableListInfo);
×
591
        return code;
×
592
      }
593

594
      code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
1,206,208✔
595
      if (code) {
1,206,208✔
596
        tableListDestroy(pTableListInfo);
×
597
        pTaskInfo->code = code;
×
598
        return code;
×
599
      }
600
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
1,877,081✔
601
      code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
1,859,560✔
602
    } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
17,521✔
603
      // NOTE: this is an patch to fix the physical plan
604
      code = createVirtualTableMergeOperatorInfo(NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
17,662✔
605
    } else {
606
      code = TSDB_CODE_INVALID_PARA;
×
607
      pTaskInfo->code = code;
×
608
      return code;
×
609
    }
610

611
    if (pOperator != NULL) {  // todo moved away
411,228,920✔
612
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
411,403,904✔
613
    }
614

615
    *pOptrInfo = pOperator;
411,222,696✔
616
    return code;
411,284,404✔
617
  }
618

619
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
361,639,947✔
620
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
361,678,468✔
621
  if (ops == NULL) {
361,650,664✔
622
    code = terrno;
×
623
    pTaskInfo->code = code;
×
624
    return code;
×
625
  }
626

627
  for (int32_t i = 0; i < size; ++i) {
773,455,787✔
628
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
411,946,194✔
629
    code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i], model);
411,931,939✔
630
    if (ops[i] == NULL || code != 0) {
411,839,113✔
631
      for (int32_t j = 0; j < i; ++j) {
74,232✔
632
        destroyOperator(ops[j]);
×
633
      }
634
      taosMemoryFree(ops);
74,232✔
635
      return code;
74,232✔
636
    }
637
  }
638

639
  SOperatorInfo* pOptr = NULL;
361,509,593✔
640
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
361,535,400✔
641
    code = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
145,349,955✔
642
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
216,185,445✔
643
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
127,856,388✔
644
    if (pAggNode->pGroupKeys != NULL) {
127,856,388✔
645
      code = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
32,091,636✔
646
    } else {
647
      code = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
95,784,401✔
648
    }
649
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
88,329,057✔
650
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
4,682,375✔
651
    code = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
4,682,375✔
652
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
83,646,682✔
653
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
800,624✔
654
    code = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
800,624✔
655
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
82,846,058✔
656
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
×
657
    code = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
×
658
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
82,846,058✔
659
    code = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
32,739,020✔
660
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
50,107,038✔
661
    code = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
78,786✔
662
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
50,028,252✔
663
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
13,882,146✔
664
    code = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo, &pOptr);
13,882,146✔
665
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
36,146,106✔
666
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
562,157✔
667
    code = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo, &pOptr);
562,157✔
668
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
35,583,949✔
669
    code = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
2,987,701✔
670
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
32,596,248✔
671
    SStateWindowPhysiNode* pStateNode = (SStateWindowPhysiNode*)pPhyNode;
982,775✔
672
    code = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo, &pOptr);
982,775✔
673
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
31,613,473✔
674
    code = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
13,005,836✔
675
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
18,607,637✔
676
    code = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,226,627✔
677
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
17,381,010✔
678
    code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
514,283✔
679
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
16,866,727✔
680
    code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,130,450✔
681
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
13,736,277✔
682
    code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,472,252✔
683
  } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
10,264,025✔
684
    code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
685
  } else if (QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC == type) {
10,264,025✔
686
    code = createGenericAnalysisOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
687
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
10,264,025✔
688
    code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
314,025✔
689
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
9,950,000✔
690
    code = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,225,829✔
691
  } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
8,724,171✔
692
    code = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo, pHandle->pMsgCb, &pOptr);
3,436,129✔
693
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
5,288,042✔
694
    code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
251,626✔
695
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
5,036,416✔
696
    code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
697
  } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
5,036,416✔
698
    SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode;
3,900,725✔
699
    // NOTE: this is an patch to fix the physical plan
700

701
    if (pVirtualTableScanNode->scan.node.pLimit != NULL) {
3,900,725✔
702
      pVirtualTableScanNode->groupSort = true;
×
703
    }
704
    code = createVirtualTableMergeOperatorInfo(ops, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
3,900,725✔
705
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_EXTERNAL == type) {
1,135,691✔
706
    code = createExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
1,115,719✔
707
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_EXTERNAL == type) {
20,216✔
708
    code = createMergeAlignedExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
20,460✔
709
  } else {
710
    code = TSDB_CODE_INVALID_PARA;
×
711
    pTaskInfo->code = code;
×
712
    for (int32_t i = 0; i < size; ++i) {
×
713
      destroyOperator(ops[i]);
×
714
    }
715
    taosMemoryFree(ops);
×
716
    qError("invalid operator type %d", type);
×
717
    return code;
×
718
  }
719

720
  taosMemoryFree(ops);
361,547,811✔
721
  if (pOptr) {
361,485,950✔
722
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
361,036,119✔
723
  }
724

725
  *pOptrInfo = pOptr;
361,522,924✔
726
  return code;
361,507,821✔
727
}
728

729
/*
 * Tear down an operator and everything hanging below it.
 *
 * Releases the get/notify params, destroys each real downstream operator,
 * frees the downstream array, cleans up the expression support, calls the
 * close callback on the operator's private info (when both are set) and
 * finally frees the operator itself.
 *
 * @param pOperator Operator to destroy; NULL is accepted and ignored.
 */
void destroyOperator(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
  freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);

  if (NULL != pOperator->pDownstream) {
    // the walk is bounded by numOfRealDownstream, not numOfDownstream
    int32_t idx = 0;
    while (idx < pOperator->numOfRealDownstream) {
      destroyOperator(pOperator->pDownstream[idx]);
      idx += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);

  bool canClose = (NULL != pOperator->fpSet.closeFn) && (NULL != pOperator->info);
  if (canClose) {
    pOperator->fpSet.closeFn(pOperator->info);
    pOperator->info = NULL;
  }

  taosMemoryFreeClear(pOperator);
}
754

755
/*
 * Destroy an explicit list of downstream operators and then the operator.
 *
 * The operator's info pointer is cleared and its pDownstream array (if any)
 * is released before destroyOperator() is called on it, so destroyOperator()
 * neither invokes closeFn nor walks the downstream array for this operator.
 *
 * @param pOperator   Operator to destroy; may be NULL.
 * @param downstreams Downstream operators to destroy; may be NULL.
 * @param num         Number of entries in downstreams.
 */
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
  for (int idx = 0; downstreams != NULL && idx < num; ++idx) {
    destroyOperator(downstreams[idx]);
  }

  if (NULL == pOperator) {
    return;
  }

  pOperator->info = NULL;
  if (NULL != pOperator->pDownstream) {
    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->pDownstream = NULL;
  }

  destroyOperator(pOperator);
}
448,740✔
771

772
/*
 * Append the explain/execution statistics of an operator, followed by those
 * of all its downstream operators, to pExecInfoList.
 *
 * Timestamps are reported relative to the operator's create time
 * (cost.execCreate). The first/last row times stay 0 when the operator
 * produced no rows, and the start/elapsed/wait/input figures stay 0 when the
 * operator was never executed.
 *
 * @param operatorInfo  Operator whose statistics are collected.
 * @param pExecInfoList Array of SExplainExecInfo receiving one entry per operator.
 *
 * @return TSDB_CODE_SUCCESS on success, terrno when the array push fails,
 *         or the error returned by getExplainFn / a downstream operator.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SExplainExecInfo  blank = {0};
  SExplainExecInfo* pInfo = taosArrayPush(pExecInfoList, &blank);
  if (NULL == pInfo) {
    return terrno;
  }

  pInfo->verboseInfo = NULL;
  pInfo->verboseLen = 0;
  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->vgId = operatorInfo->pTaskInfo->id.vgId;
  pInfo->execCreate = operatorInfo->cost.execCreate;
  pInfo->execTimes = operatorInfo->cost.execTimes;

  // first/last row times are only meaningful when at least one row was produced
  if (operatorInfo->resultInfo.totalRows > 0) {
    pInfo->execFirstRow = operatorInfo->cost.execFirstRow - operatorInfo->cost.execCreate;
    pInfo->execLastRow = operatorInfo->cost.execLastRow - operatorInfo->cost.execCreate;
  }

  // the remaining metrics are only filled in when the operator actually ran
  if (operatorInfo->cost.execTimes > 0) {
    pInfo->execStart = operatorInfo->cost.execStart - operatorInfo->cost.execCreate;
    pInfo->execElapsed = operatorInfo->cost.execElapsed;
    pInfo->inputWaitElapsed = operatorInfo->cost.inputWaitElapsed;
    pInfo->outputWaitElapsed = operatorInfo->cost.outputWaitElapsed;
    pInfo->inputRows = operatorInfo->cost.inputRows;
  }

  if (operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pInfo is not used past this point; the recursive pushes append further entries
  int32_t idx = 0;
  while (idx < operatorInfo->numOfDownstream) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[idx], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}
826

827
/*
 * Merge the source operator param into the destination operator param.
 *
 * Only exchange operator params are handled:
 *  - destination is a single param with the same vgId as the source: the
 *    source uid list is appended to the destination uid list;
 *  - destination is a single param with a different vgId: the destination is
 *    promoted to a batch param whose hash (keyed by vgId) holds copies of
 *    both basic params, and the old destination value is freed;
 *  - destination is already a batch param: the source is appended to the
 *    entry with the same vgId, or inserted as a new entry.
 *
 * @param pDst Destination param; pDst->value may be replaced by a newly
 *             allocated batch param.
 * @param pSrc Source param; must have the same opType as pDst.
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for mismatched
 *         or unsupported param types, otherwise the error of the failing call.
 *         On failure during promotion pDst is left unchanged.
 */
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
  if (pDst->opType != pSrc->opType) {
    qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
    return TSDB_CODE_INVALID_PARA;
  }

  switch (pDst->opType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
      SExchangeOperatorParam* pDExc = pDst->value;
      SExchangeOperatorParam* pSExc = pSrc->value;
      if (pSExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
        qError("%s, invalid exchange operator param type %d for "
          "source operator", __func__, pSExc->basic.paramType);
        return TSDB_CODE_INVALID_PARA;
      }
      if (!pDExc->multiParams) {
        if (pDExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
          qError("%s, invalid exchange operator param type %d for "
            "destination operator", __func__, pDExc->basic.paramType);
          return TSDB_CODE_INVALID_PARA;
        }
        if (pSExc->basic.vgId != pDExc->basic.vgId) {
          SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
          if (NULL == pBatch) {
            return terrno;
          }

          pBatch->multiParams = true;
          pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
          if (NULL == pBatch->pBatchs) {
            taosMemoryFree(pBatch);
            return terrno;
          }

          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic,
                                        sizeof(pDExc->basic));
          if (TSDB_CODE_SUCCESS == code) {
            code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                  sizeof(pSExc->basic));
          }
          if (code) {
            // The free callback is not installed yet, so the cleanup only
            // releases the hash itself; the copied basic params (and their
            // uid lists) remain owned by pDst/pSrc. Nothing is leaked and
            // nothing is freed twice.
            tSimpleHashCleanup(pBatch->pBatchs);
            taosMemoryFree(pBatch);
            return code;
          }

          // Both entries are in place: from here on the hash owns the copies.
          tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);

          taosMemoryFree(pDst->value);
          pDst->value = pBatch;
        } else {
          void* p = taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        }
      } else {
        SExchangeOperatorBatchParam* pBatch = pDst->value;
        SExchangeOperatorBasicParam* pBasic =
            tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
        if (pBasic) {
          void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        } else {
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                        sizeof(pSExc->basic));
          if (code) {
            return code;
          }
        }
      }
      break;
    }
    default:
      qError("invalid optype %d for merge operator params", pDst->opType);
      return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
909

910
/*
 * Install a get/notify param on an operator and distribute it downstream.
 *
 * Previously stored params of the given type are released first. When the
 * input targets this operator (same opType) it becomes the operator's own
 * param and its children are assigned to the downstream slots selected by
 * their downstreamIdx (children sharing a slot are merged); the children
 * array is then destroyed. Otherwise the input is passed through unchanged
 * to every downstream slot.
 *
 * @param pOperator Operator receiving the param.
 * @param pInput    Param to install; NULL only resets the stored params.
 * @param type      OP_GET_PARAM or OP_NOTIFY_PARAM.
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for an unknown
 *         type, otherwise terrno or the merge error.
 */
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
  SOperatorParam**  ppOwn = NULL;
  SOperatorParam*** pppSlots = NULL;

  if (OP_GET_PARAM == type) {
    ppOwn = &pOperator->pOperatorGetParam;
    pppSlots = &pOperator->pDownstreamGetParams;
  } else if (OP_NOTIFY_PARAM == type) {
    ppOwn = &pOperator->pOperatorNotifyParam;
    pppSlots = &pOperator->pDownstreamNotifyParams;
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  freeResetOperatorParams(pOperator, type, false);

  if (NULL == pInput) {
    return TSDB_CODE_SUCCESS;
  }

  if (pInput->opType == pOperator->operatorType) {
    *ppOwn = pInput;
  } else {
    *ppOwn = NULL;
  }

  // the downstream slot array is allocated on first use
  if (NULL == *pppSlots) {
    *pppSlots = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
    if (NULL == *pppSlots) {
      return terrno;
    }
  }

  SOperatorParam*  pOwn = *ppOwn;
  SOperatorParam** pSlots = *pppSlots;

  // the param is not meant for this operator: hand it to every downstream
  if (NULL == pOwn) {
    for (int32_t slot = 0; slot < pOperator->numOfDownstream; ++slot) {
      pSlots[slot] = pInput;
    }
    return TSDB_CODE_SUCCESS;
  }

  memset(pSlots, 0, pOperator->numOfDownstream * POINTER_BYTES);

  int32_t childrenNum = taosArrayGetSize(pOwn->pChildren);
  if (childrenNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t k = 0; k < childrenNum; ++k) {
    SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOwn->pChildren, k);
    if (NULL == pChild) {
      return terrno;
    }

    SOperatorParam** ppSlot = &pSlots[pChild->downstreamIdx];
    if (NULL == *ppSlot) {
      *ppSlot = pChild;
      continue;
    }

    // slot already taken by an earlier child: fold this one into it
    int32_t code = mergeOperatorParams(*ppSlot, pChild);
    if (code) {
      return code;
    }
  }

  taosArrayDestroy(pOwn->pChildren);
  pOwn->pChildren = NULL;

  return TSDB_CODE_SUCCESS;
}
976

977
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
864,966,253✔
978
  recordOpExecBeforeDownstream(pOperator);
864,966,253✔
979
  SSDataBlock* p = NULL;
864,882,021✔
980
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
864,898,218✔
981
  if (code == TSDB_CODE_SUCCESS) {
862,330,837✔
982
    code = blockDataCheck(p);
862,325,211✔
983
    if (code != TSDB_CODE_SUCCESS) {
862,418,056✔
984
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
985
             __func__, __LINE__, tstrerror(code));
986
    }
987
  }
988
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
862,380,849✔
989
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
862,354,703✔
990
}
991

992
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
71,581,805✔
993
  recordOpExecBeforeDownstream(pOperator);
71,581,805✔
994
  SSDataBlock* p = NULL;
71,580,897✔
995
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
71,581,299✔
996
  if (code == TSDB_CODE_SUCCESS) {
71,570,773✔
997
    code = blockDataCheck(p);
71,570,773✔
998
    if (code != TSDB_CODE_SUCCESS) {
71,571,785✔
999
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
1000
             __func__, __LINE__, tstrerror(code));
1001
    }
1002
  }
1003
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
71,571,279✔
1004
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
71,571,279✔
1005
}
1006

1007
/*
1008
 * Fetch one block from downstream without preserving reused get-param ownership.
1009
 *
1010
 * @param pOperator Current operator.
1011
 * @param idx Downstream index.
1012
 * @param pResBlock Output result block pointer.
1013
 *
1014
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1015
 */
1016
static FORCE_INLINE int32_t getNextBlockFromDownstreamRemainDetachImpl(struct SOperatorInfo* pOperator, int32_t idx,
1017
                                                                       SSDataBlock** pResBlock) {
1018
  QRY_PARAM_CHECK(pResBlock);
546,714,245✔
1019

1020
  int32_t code = TSDB_CODE_SUCCESS;
546,672,829✔
1021
  int32_t lino = 0;
546,672,829✔
1022

1023
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
550,221,304✔
1024
    SOperatorParam* pGetParam = pOperator->pDownstreamGetParams[idx];
3,549,091✔
1025
    pOperator->pDownstreamGetParams[idx] = NULL;
3,548,465✔
1026
    // Once detached from the parent operator, downstream must own/free this param.
1027
    pGetParam->reUse = false;
3,548,465✔
1028

1029
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
3,549,081✔
1030
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pGetParam, pResBlock);
3,549,081✔
1031
    QUERY_CHECK_CODE(code, lino, _return);
3,548,475✔
1032
  } else {
1033
    code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
543,280,767✔
1034
    QUERY_CHECK_CODE(code, lino, _return);
541,522,375✔
1035
  }
1036

1037
_return:
541,522,375✔
1038
  if (code) {
545,070,850✔
1039
    qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
×
1040
  }
1041
  return code;
545,058,299✔
1042
}
1043

1044
/*
1045
 * Fetch and validate one downstream block while detaching one-shot get-param.
1046
 *
1047
 * @param pOperator Current operator.
1048
 * @param idx Downstream index.
1049
 *
1050
 * @return Valid block pointer on success, or NULL on failure.
1051
 */
1052
SSDataBlock* getNextBlockFromDownstreamRemainDetach(struct SOperatorInfo* pOperator, int32_t idx) {
546,685,581✔
1053
  SSDataBlock* p = NULL;
546,685,581✔
1054
  int32_t      code = TSDB_CODE_SUCCESS;
546,772,936✔
1055
  int32_t      lino = 0;
546,772,936✔
1056
  recordOpExecBeforeDownstream(pOperator);
546,772,936✔
1057

1058
  code = getNextBlockFromDownstreamRemainDetachImpl(pOperator, idx, &p);
545,058,299✔
1059
  QUERY_CHECK_CODE(code, lino, _return);
545,058,299✔
1060

1061
  code = blockDataCheck(p);
545,058,299✔
1062
  QUERY_CHECK_CODE(code, lino, _return);
545,145,539✔
1063

1064
_return:
545,145,539✔
1065
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
545,176,444✔
1066
  if (code) {
545,175,810✔
1067
    qError("failed to get next data block from downstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
113,433✔
1068
    return NULL;
×
1069
  }
1070
  return p;
545,063,119✔
1071
}
1072

1073
/*
 * Default getNextExtFn: install the given get-param on the operator and then
 * run the operator's regular getNextFn.
 *
 * On failure the error is stored in the task info and control leaves via
 * T_LONG_JMP on the task's jump environment.
 *
 * @param pOperator Current operator.
 * @param pParam    Get-param to install before fetching.
 * @param pRes      Output result block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success.
 */
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  QRY_PARAM_CHECK(pRes);

  int32_t lino = 0;
  int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
  QUERY_CHECK_CODE(code, lino, _end);
  code = pOperator->fpSet.getNextFn(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // report the line of the failing call (lino), not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1091

1092
/*
 * Default notify handler: install the notify-param on the operator, invoke
 * the operator's own notifyFn when a param targets it, then forward the
 * per-downstream notify params recursively, clearing each slot once it has
 * been delivered.
 *
 * On failure the error is stored in the task info and control leaves via
 * T_LONG_JMP on the task's jump environment.
 *
 * @param pOperator Current operator.
 * @param pParam    Notify-param to deliver.
 *
 * @return TSDB_CODE_SUCCESS on success.
 */
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
  if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
    code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
  }

  // setOperatorParams() succeeds without allocating the downstream slot array
  // when pParam is NULL, so the array must be checked before it is indexed.
  if (TSDB_CODE_SUCCESS == code && NULL != pOperator->pDownstreamNotifyParams) {
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      if (pOperator->pDownstreamNotifyParams[i]) {
        code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        pOperator->pDownstreamNotifyParams[i] = NULL;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  return code;
}
1115

1116
int64_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
29,252,271✔
1117
  if (pOperator->transparent) {
29,252,271✔
1118
    return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);
2,451,658✔
1119
  }
1120
  return pOperator->resultDataBlockId;
26,801,648✔
1121
}
1122

1123
/* Put the operator back into the OP_NOT_OPENED status. */
void resetOperatorState(SOperatorInfo* pOper) {
  pOper->status = OP_NOT_OPENED;
}
×
1126

1127
/*
 * Reset the basic operator info: clean up the result block (when one is
 * present) and re-initialize the result row info.
 */
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo) {
  if (pBasicInfo->pRes) {
    blockDataCleanup(pBasicInfo->pRes);
  }

  initResultRowInfo(&pBasicInfo->resultRowInfo);
}
15,983,997✔
1131

1132
int32_t resetAggSup(SExprSupp* pExprSupp, SAggSupporter* pSup, SExecTaskInfo* pTaskInfo,
13,837,498✔
1133
                    SNodeList* pNodeList, SNodeList* pGroupKeys, size_t keyBufSize, const char* pKey, void* pState,
1134
                    SFunctionStateStore* pStore) {
1135
  int32_t    code = 0, lino = 0, num = 0;
13,837,498✔
1136
  SExprInfo* pExprInfo = NULL;
13,837,498✔
1137
  cleanupAggSup(pSup);
13,837,734✔
1138
  cleanupExprSuppWithoutFilter(pExprSupp);
13,837,262✔
1139
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
13,837,282✔
1140
  QUERY_CHECK_CODE(code, lino, _error);
13,836,998✔
1141
  code = initAggSup(pExprSupp, pSup, pExprInfo, num, keyBufSize, pKey, pState, pStore);
13,836,998✔
1142
  QUERY_CHECK_CODE(code, lino, _error);
13,836,581✔
1143
  return code;
13,836,581✔
1144
_error:
×
1145
  if (code != TSDB_CODE_SUCCESS) {
×
1146
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1147
    pTaskInfo->code = code;
×
1148
  }
1149
  return code;
×
1150
}
1151

1152
/*
 * Rebuild the expression support of an operator.
 *
 * The existing expression support (except the filter) is cleaned up, the
 * expression infos are re-created from the node lists and the expression
 * support is initialized again from them.
 *
 * @param pExprSupp  Expression support to rebuild.
 * @param pTaskInfo  Task info; receives the error code on failure.
 * @param pNodeList  Node list passed to createExprInfo().
 * @param pGroupKeys Group key list passed to createExprInfo().
 * @param pStore     Passed through to initExprSupp().
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error of the failing step.
 */
int32_t resetExprSupp(SExprSupp* pExprSupp, SExecTaskInfo* pTaskInfo, SNodeList* pNodeList,
                      SNodeList* pGroupKeys, SFunctionStateStore* pStore) {
  int32_t code = 0, lino = 0, num = 0;
  SExprInfo* pExprInfo = NULL;
  cleanupExprSuppWithoutFilter(pExprSupp);
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);
  code = initExprSupp(pExprSupp, pExprInfo, num, pStore);
  QUERY_CHECK_CODE(code, lino, _error);
  return code;
_error:
  if (code != TSDB_CODE_SUCCESS) {
    // report the line of the failing call (lino), not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1169

1170
/*
 * Copy column data from pSrc into pDst for every target in pNodeList whose
 * expression is a column node belonging to data block targetBlkId.
 *
 * When pSrc is NULL or has no rows, the destination column gets totalRows
 * NULL values. Otherwise the source rows are assigned and, when the source
 * has fewer rows than totalRows, the remaining rows are set to NULL.
 *
 * @param pNodeList   List of STargetNode entries describing the columns.
 * @param targetBlkId Only column nodes with this dataBlockId are copied.
 * @param pDst        Destination block; indexed by the target slotId.
 * @param pSrc        Source block; indexed by the column node slotId. May be NULL.
 * @param totalRows   Number of rows the destination columns should cover.
 *
 * @return TSDB_CODE_SUCCESS (0) on success, otherwise terrno or the
 *         colDataAssign() error.
 */
int32_t copyColumnsValue(SNodeList* pNodeList, int64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc, int32_t totalRows) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  bool    isNull = (NULL == pSrc || pSrc->info.rows <= 0);

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }
    if (((SColumnNode*)pTarget->pExpr)->dataBlockId != targetBlkId) {
      continue;
    }

    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pTarget->slotId);
    QUERY_CHECK_NULL(pDstCol, code, lino, _return, terrno)

    if (isNull) {
      // nothing to copy from: the whole column is NULL
      colDataSetNNULL(pDstCol, 0, totalRows);
      continue;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pTarget->pExpr)->slotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _return, terrno)

    code = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pDst->info);
    QUERY_CHECK_CODE(code, lino, _return);

    // pad the tail with NULLs when the source is shorter than requested
    if (pSrc->info.rows < totalRows) {
      colDataSetNNULL(pDstCol, pSrc->info.rows, totalRows - pSrc->info.rows);
    }
  }

  return code;

_return:
  qError("failed to copy columns value, line:%d code:%s", lino, tstrerror(code));
  return code;
}
1204

1205
/**
1206
  @brief Record the create time of the operator and do initialization
1207
  to other metrics. This function will be called at the beginning of
1208
  creation of operators.
1209
*/
1210
void initOperatorCostInfo(SOperatorInfo* pOperator) {
773,506,291✔
1211
  pOperator->cost.execCreate = taosGetTimestampUs();
773,559,423✔
1212
  resetOperatorCostInfo(pOperator);
773,558,714✔
1213
}
773,524,931✔
1214

1215
/**
1216
  @brief Record the performance metrics at the beginning of the
1217
  operator execution:
1218
  - record the start time of the operator execution
1219
  - calculate the output wait time (time since last call returned)
1220
  - record the first time nextFn interface is called
1221
  Only record the metrics in explain analyze mode.
1222
*/
1223
void recordOpExecBegin(SOperatorInfo* pOperator) {
2,147,483,647✔
1224
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1225
    pOperator->cost.startTs = taosGetTimestampUs();
19,804,994✔
1226
    // calculate output wait time (time since last call returned)
1227
    if (pOperator->cost.execLastRow > 0) {
19,805,784✔
1228
      pOperator->cost.outputWaitElapsed +=
25,160,200✔
1229
        pOperator->cost.startTs - pOperator->cost.execLastRow;
12,581,226✔
1230
    }
1231
    // record the first time nextFn is called
1232
    if (pOperator->cost.execStart == 0) {
19,803,010✔
1233
      pOperator->cost.execStart = pOperator->cost.startTs;
7,206,155✔
1234
    }
1235
  }
1236
}
2,147,483,647✔
1237

1238
/**
1239
  @brief Record the performance metrics before the downstream execution:
1240
  - record the elapsed time since the operator execution started
1241
  - update the start time of the operator execution
1242
  Only record the metrics in explain analyze mode.
1243
*/
1244
void recordOpExecBeforeDownstream(SOperatorInfo* pOperator) {
1,782,452,710✔
1245
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
1,782,452,710✔
1246
    pOperator->cost.endTs = taosGetTimestampUs();
11,184,172✔
1247
    if (UNLIKELY(pOperator->cost.startTs == 0)) {
11,184,567✔
1248
      /**
1249
        As long as getNextBlockFromDownstream() is called inside getNextFn(),
1250
        startTs must be initialized before endTs so that this condition should
1251
        always be false.
1252
      */
1253
      pOperator->cost.startTs = pOperator->cost.endTs;
×
1254
    }
1255
    pOperator->cost.execElapsed +=
22,371,385✔
1256
      pOperator->cost.endTs - pOperator->cost.startTs;
11,185,760✔
1257
  }
1258
}
1,782,638,204✔
1259

1260
/**
1261
  @brief Record the performance metrics after the downstream execution:
1262
  - record the elapsed time since the downstream execution started
1263
  - update the end time of the downstream execution
1264
  Only record the metrics in explain analyze mode.
1265
*/
1266
void recordOpExecAfterDownstream(SOperatorInfo* pOperator, size_t inputRows) {
1,778,217,566✔
1267
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
1,778,217,566✔
1268
    pOperator->cost.startTs = taosGetTimestampUs();
11,183,505✔
1269
    pOperator->cost.inputWaitElapsed +=
22,363,438✔
1270
      pOperator->cost.startTs - pOperator->cost.endTs;
11,182,447✔
1271
    pOperator->cost.inputRows += inputRows;
11,181,916✔
1272
  }
1273
}
1,778,439,324✔
1274

1275
/**
1276
  @brief Record the performance metrics at the end of the
1277
  operator execution:
1278
  - record the number of times the operator's next interface is called
1279
  - record the elapsed time for executing the operator's nextFn interface
1280
  - record the time when the first row is returned
1281
  - record the time when the last row is returned
1282
  Only record the metrics in explain analyze mode.
1283
*/
1284
void recordOpExecEnd(SOperatorInfo* pOperator, size_t rows) {
2,147,483,647✔
1285
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1286
    pOperator->cost.endTs = taosGetTimestampUs();
19,790,257✔
1287
    pOperator->cost.execTimes++;
19,790,532✔
1288
    pOperator->cost.execElapsed +=
39,577,223✔
1289
      pOperator->cost.endTs - pOperator->cost.startTs;
19,789,333✔
1290

1291
    if (rows > 0) {
19,788,138✔
1292
      // record the first time data is returned
1293
      if (pOperator->cost.execFirstRow == 0) {
12,620,061✔
1294
        pOperator->cost.execFirstRow = pOperator->cost.endTs;
6,073,725✔
1295
      }
1296
      pOperator->cost.execLastRow = pOperator->cost.endTs;
12,620,071✔
1297
    }
1298
  }
1299
  
1300
  /* accumulate output total rows even not in explain mode */
1301
  pOperator->resultInfo.totalRows += rows;
2,147,483,647✔
1302
}
2,147,483,647✔
1303

1304
/**
1305
  @brief Reset operator's cost info but keep create time unchanged.
1306
*/
1307
void resetOperatorCostInfo(SOperatorInfo* pOperator) {
806,162,546✔
1308
  /* keep execCreate UNCHANGED!!! */
1309
  pOperator->cost.execStart = 0;
806,162,546✔
1310
  pOperator->cost.execFirstRow = 0;
806,199,527✔
1311
  pOperator->cost.execLastRow = 0;
806,187,952✔
1312
  pOperator->cost.execTimes = 0;
806,174,946✔
1313
  pOperator->cost.execElapsed = 0;
806,172,868✔
1314
  pOperator->cost.inputWaitElapsed = 0;
806,192,123✔
1315
  pOperator->cost.outputWaitElapsed = 0;
806,180,845✔
1316
  pOperator->cost.inputRows = 0;
806,159,482✔
1317
}
806,174,969✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc