• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.09
/source/libs/executor/src/operator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "executor.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "osTime.h"
21
#include "query.h"
22
#include "querytask.h"
23
#include "storageapi.h"
24
#include "taoserror.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tutil.h"
29

30
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
31
                                           SSDataBlock** pResBlock);
32
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
33
                                              SOperatorParam* pParam,
34
                                              SSDataBlock** pResBlock);
35
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pParam);
36

37
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
785,907,240✔
38
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain,
39
                                   __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
40
  SOperatorFpSet fpSet = {
2,147,483,647✔
41
      ._openFn = openFn,
42
      ._nextFn = nextFn,
43
      .getNextFn = (nextFn != NULL) ? optrGetNextFnWithExecRecord : NULL,
785,907,240✔
44
      .cleanupFn = cleanup,
45
      .closeFn = closeFn,
46
      .reqBufFn = reqBufFn,
47
      .getExplainFn = explain,
48
      ._nextExtFn = nextExtFn,
49
      .getNextExtFn = (nextExtFn != NULL) ? optrGetNextExtFnWithExecRecord : NULL,
785,907,240✔
50
      .notifyFn = notifyFn,
51
      .releaseStreamStateFn = NULL,
52
      .reloadStreamStateFn = NULL,
53
      ._resetFn = NULL,
54
      .resetStateFn = NULL,
55
  };
56

57
  return fpSet;
785,907,240✔
58
}
59

60
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
2,147,483,647✔
61
                                           SSDataBlock** pResBlock) {
62
  QRY_PARAM_CHECK(pResBlock);
2,147,483,647✔
63

64
  recordOpExecBegin(pOperator);
2,147,483,647✔
65
  int32_t code = pOperator->fpSet._nextFn(pOperator, pResBlock);
2,147,483,647✔
66
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
2,147,483,647✔
67
                 (*pResBlock)->info.rows : 0;
2,147,483,647✔
68
  recordOpExecEnd(pOperator, rows);
2,147,483,647✔
69
  return code;
2,147,483,647✔
70
}
71

72
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
126,118,424✔
73
                                              SOperatorParam* pParam,
74
                                              SSDataBlock** pResBlock) {
75
  QRY_PARAM_CHECK(pResBlock);
126,118,424✔
76

77
  recordOpExecBegin(pOperator);
126,119,659✔
78
  int32_t code = pOperator->fpSet._nextExtFn(pOperator, pParam, pResBlock);
126,117,158✔
79
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
126,100,674✔
80
                 (*pResBlock)->info.rows : 0;
252,201,132✔
81
  recordOpExecEnd(pOperator, rows);
126,101,714✔
82
  return code;
126,102,961✔
83
}
84

85
// Wrapper around the operator's raw _resetFn: runs the reset callback, then
// calls resetOperatorCostInfo() regardless of the callback's result, and
// returns the callback's code.
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pOperator) {
  const int32_t resetCode = pOperator->fpSet._resetFn(pOperator);

  resetOperatorCostInfo(pOperator);

  return resetCode;
}
90

91
// Install the stream-state release and reload callbacks of an operator.
// Both values are stored as given (NULL is stored as NULL).
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
  SOperatorFpSet* pFpSet = &pOperator->fpSet;

  pFpSet->releaseStreamStateFn = relaseFn;
  pFpSet->reloadStreamStateFn = reloadFn;
}
146,926,229✔
95

96
// Install the reset-state callback of an operator.
// The raw callback goes into _resetFn; the public resetStateFn slot is pointed
// at optrResetStateFnWithExecRecord when a callback is supplied, and set to
// NULL otherwise.
void setOperatorResetStateFn(SOperatorInfo* pOperator, __optr_reset_state_fn_t resetFn) {
  pOperator->fpSet._resetFn = resetFn;

  if (resetFn != NULL) {
    pOperator->fpSet.resetStateFn = optrResetStateFnWithExecRecord;
  } else {
    pOperator->fpSet.resetStateFn = NULL;
  }
}
783,562,924✔
100

101
// Open callback for operators that need no work on open: it only applies
// OPTR_SET_OPENED to the operator and always reports success.
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  const int32_t code = TSDB_CODE_SUCCESS;

  OPTR_SET_OPENED(pOperator);

  return code;
}
105

106
// Attach `num` downstream operators to `p`.
//
// A new pointer array is allocated and the entries of `pDownstream` are copied
// into it; numOfDownstream and numOfRealDownstream are both set to `num`.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when `p` is NULL,
// `num` is negative, or `pDownstream` is NULL while `num` is positive, and
// terrno when the allocation fails.
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // A negative count would be converted to a huge unsigned allocation size, and
  // copying from a NULL source is undefined behavior, so reject both up front.
  if (p == NULL || num < 0 || (num > 0 && pDownstream == NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }

  p->pDownstream = taosMemoryCalloc(1, (size_t)num * POINTER_BYTES);
  if (p->pDownstream == NULL) {
    return terrno;
  }

  // memcpy() requires valid pointers even for a zero-length copy
  if (num > 0) {
    memcpy(p->pDownstream, pDownstream, (size_t)num * POINTER_BYTES);
  }

  p->numOfDownstream = num;
  p->numOfRealDownstream = num;
  return TSDB_CODE_SUCCESS;
}
117

118
// Mark the operator as finished (OP_EXEC_DONE) and then set the status of the
// task that owns it to TASK_COMPLETED.
void setOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  SExecTaskInfo* pOwnerTask = pOperator->pTaskInfo;
  setTaskStatus(pOwnerTask, TASK_COMPLETED);
}
765,634,886✔
122

123
// Fill in the basic descriptive fields of an operator.
//
// `name` is stored by pointer (not copied) with its const qualifier cast away,
// so the string must outlive the operator.
// `pInfo` is stored in pOperator->info and `pTaskInfo` in pOperator->pTaskInfo.
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // execution attributes
  pOperator->blocking = blocking;
  pOperator->status = status;

  // operator-private data and owning task
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
}
785,985,326✔
132

133
// each operator should set its own function to return the total cost buffer
134
// Default buffer-requirement callback: returns -1 for a blocking operator and
// 0 for a non-blocking one.
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  return pOperator->blocking ? -1 : 0;
}
141

142
static int64_t getQuerySupportBufSize(size_t numOfTables) {
×
143
  size_t s1 = sizeof(STableQueryInfo);
×
144
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
145
  return (int64_t)(s1 * 1.5 * numOfTables);
×
146
}
147

148
int32_t checkForQueryBuf(size_t numOfTables) {
×
149
  int64_t t = getQuerySupportBufSize(numOfTables);
×
150
  if (tsQueryBufferSizeBytes < 0) {
×
151
    return TSDB_CODE_SUCCESS;
×
152
  } else if (tsQueryBufferSizeBytes > 0) {
×
153
    while (1) {
×
154
      int64_t s = tsQueryBufferSizeBytes;
×
155
      int64_t remain = s - t;
×
156
      if (remain >= 0) {
×
157
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
×
158
          return TSDB_CODE_SUCCESS;
×
159
        }
160
      } else {
161
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
162
      }
163
    }
164
  }
165

166
  // disable query processing if the value of tsQueryBufferSize is zero.
167
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
168
}
169

170
void releaseQueryBuf(size_t numOfTables) {
×
171
  if (tsQueryBufferSizeBytes < 0) {
×
172
    return;
×
173
  }
174

175
  int64_t t = getQuerySupportBufSize(numOfTables);
×
176

177
  // restore value is not enough buffer available
178
  (void)atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
×
179
}
180

181
// Result of a per-operator visitor used by traverseOperatorTree().
typedef enum {
  OPTR_FN_RET_CONTINUE = 1,  // keep going: descend into the downstream operators
  OPTR_FN_RET_ABORT = 2,     // do not descend below the current operator
} ERetType;
185

186
// Context passed to every visitor call during an operator-tree traversal.
typedef struct STraverParam {
  void*   pRet;    // output slot written by the visitor
  int32_t code;    // error code; a non-zero value stops the traversal
  void*   pParam;  // visitor-specific input, interpreted by the visitor itself
} STraverParam;
191

192
// helper for iterating over the operator tree
193
// Visitor signature: receives the visited operator, the shared traversal
// context and an id string, and tells the traversal how to proceed.
typedef ERetType (*optr_fn_t)(SOperatorInfo* pVisited, STraverParam* pCtx, const char* pIdstr);
194

195
// Pre-order walk over an operator tree.
//
// `fn` is called on `pOperator` first. The downstream operators are visited
// only when `fn` did not return OPTR_FN_RET_ABORT and pParam->code is still
// TSDB_CODE_SUCCESS. While iterating the downstream operators the walk stops
// as soon as pParam->code becomes non-zero.
//
// Note that OPTR_FN_RET_ABORT only prunes the subtree below the operator that
// returned it: the remaining siblings of that operator are still visited.
// A NULL `pOperator` is ignored.
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
  if (pOperator == NULL) {
    return;
  }

  ERetType verdict = fn(pOperator, pParam, id);
  bool     descend = (verdict != OPTR_FN_RET_ABORT) && (pParam->code == TSDB_CODE_SUCCESS);
  if (!descend) {
    return;
  }

  int32_t idx = 0;
  while (idx < pOperator->numOfDownstream) {
    traverseOperatorTree(pOperator->pDownstream[idx], fn, pParam, id);
    if (pParam->code != 0) {
      break;
    }
    ++idx;
  }
}
212

213
// Visitor that looks for an operator of a given type.
// pParam->pParam must point to the int32_t operator type to look for. On a
// match the operator is stored in pParam->pRet and OPTR_FN_RET_ABORT is
// returned; otherwise OPTR_FN_RET_CONTINUE is returned.
ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  const int32_t wantedType = *(int32_t*)pParam->pParam;

  if (pOperator->operatorType != wantedType) {
    return OPTR_FN_RET_CONTINUE;
  }

  pParam->pRet = pOperator;
  return OPTR_FN_RET_ABORT;
}
222

223
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
224
// Search the operator tree rooted at `pOperator` for an operator whose type
// equals `type` and hand it back through `pOptrInfo`.
//
// Returns TSDB_CODE_STREAM_INTERNAL_ERROR when `pOperator` is NULL; otherwise
// returns the traversal code. When nothing matches, the traversal code is 0
// and *pOptrInfo is set to NULL.
// NOTE(review): QRY_PARAM_CHECK is defined elsewhere; it presumably rejects a
// NULL pOptrInfo and clears *pOptrInfo - confirm against its definition.
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  if (pOperator == NULL) {
    qError("invalid operator, failed to find tableScanOperator %s", id);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // pRet and code start out as NULL / 0
  STraverParam search = {0};
  search.pParam = &type;

  traverseOperatorTree(pOperator, extractOperatorInfo, &search, id);
  if (search.code != 0) {
    return search.code;
  }

  *pOptrInfo = search.pRet;
  return search.code;
}
239

240
// In/out data of the extractScanInfo() visitor.
typedef struct SExtScanInfo {
  int32_t order;           // in: caller's order; out: order taken from the scan found
  int32_t scanFlag;        // out: scan flag taken from the scan found
  int32_t inheritUsOrder;  // in: non-zero keeps `order` untouched for an exchange operator
} SExtScanInfo;
245

246
// Visitor that fills the SExtScanInfo pointed to by pParam->pParam from the
// first data-source operator it meets:
//  - system-table / stream / tag / block-dist / last-row / table-count scans
//    report ascending order and MAIN_SCAN;
//  - an exchange operator reports MAIN_SCAN and ascending order, unless
//    inheritUsOrder is set, in which case the order is left as it was;
//  - table scan and table merge scan report the order and scan flag stored in
//    their own info.
// Any other operator type lets the traversal continue.
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  int32_t       type = pOperator->operatorType;
  SExtScanInfo* pScan = pParam->pParam;

  switch (type) {
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
      pScan->order = TSDB_ORDER_ASC;
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      if (!pScan->inheritUsOrder) {
        pScan->order = TSDB_ORDER_ASC;
      }
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;
      pScan->order = pTableScan->base.cond.order;
      pScan->scanFlag = pTableScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      pScan->order = pMergeScan->base.cond.order;
      pScan->scanFlag = pMergeScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    default:
      return OPTR_FN_RET_CONTINUE;
  }
}
276

277
// Determine the scan order and scan flag of the data source(s) below
// `pOperator`.
//
// `*order` is both input (starting value, kept for an exchange operator when
// `inheritUsOrder` is true) and output; `*scanFlag` is output only and is 0
// when no scan operator was found.
// Returns the traversal code, or TSDB_CODE_INVALID_PARA when the traversal
// succeeded but the resulting order is neither TSDB_ORDER_ASC nor
// TSDB_ORDER_DESC.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
  SExtScanInfo scan = {0};
  scan.order = *order;
  scan.inheritUsOrder = inheritUsOrder;

  STraverParam walk = {0};
  walk.pParam = &scan;

  traverseOperatorTree(pOperator, extractScanInfo, &walk, NULL);

  // the outputs are written back even when the traversal reported an error
  *order = scan.order;
  *scanFlag = scan.scanFlag;

  if (walk.code != TSDB_CODE_SUCCESS) {
    return walk.code;
  }

  if (*order != TSDB_ORDER_ASC && *order != TSDB_ORDER_DESC) {
    qError("operator failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  return walk.code;
}
293

294
// Visitor that notifies data readers that they are about to be closed.
// pParam->pParam must point to the SStorageAPI providing
// tsdReader.tsdReaderNotifyClosing().
//  - table scan: notify its own data reader, if it has one;
//  - stream scan: notify the data reader of its inner table-scan operator, if
//    that operator, its info and its reader all exist.
// Both cases return OPTR_FN_RET_ABORT; any other operator type continues.
static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  SStorageAPI* pStorage = pParam->pParam;

  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScan = pOperator->info;
    if (pScan->base.dataReader != NULL) {
      pStorage->tsdReader.tsdReaderNotifyClosing(pScan->base.dataReader);
    }
    return OPTR_FN_RET_ABORT;
  }

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return OPTR_FN_RET_CONTINUE;
  }

  STmqQueryScanInfo* pTmqScan = pOperator->info;
  STableScanInfo*    pInnerScan = NULL;
  if (pTmqScan->pTableScanOp != NULL) {
    pInnerScan = pTmqScan->pTableScanOp->info;
  }

  if (pInnerScan != NULL && pInnerScan->base.dataReader != NULL) {
    pStorage->tsdReader.tsdReaderNotifyClosing(pInnerScan->base.dataReader);
  }

  return OPTR_FN_RET_ABORT;
}
318

319
// Walk the operator tree rooted at `pOperator` with doStopDataReader(), using
// `pAPI` as the storage API, and return the traversal code.
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI) {
  STraverParam ctx = {0};
  ctx.pParam = pAPI;

  traverseOperatorTree(pOperator, doStopDataReader, &ctx, pIdStr);

  return ctx.code;
}
324

325
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
786,151,944✔
326
                       SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo,
327
                       EOPTR_EXEC_MODEL model) {
328
  QRY_PARAM_CHECK(pOptrInfo);
786,151,944✔
329

330
  int32_t     code = 0;
786,196,886✔
331
  int32_t     type = nodeType(pPhyNode);
786,196,886✔
332
  const char* idstr = GET_TASKID(pTaskInfo);
786,198,941✔
333

334
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
786,218,264✔
335
    SOperatorInfo* pOperator = NULL;
425,736,543✔
336
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
425,621,584✔
337
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
246,686,223✔
338
      // NOTE: this is an patch to fix the physical plan
339
      // TODO remove it later
340
      if (pTableScanNode->scan.node.pLimit != NULL) {
246,686,223✔
341
        pTableScanNode->groupSort = true;
5,284,536✔
342
      }
343

344
      STableListInfo* pTableListInfo = tableListCreate();
246,686,283✔
345
      if (!pTableListInfo) {
246,717,117✔
346
        pTaskInfo->code = terrno;
×
347
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
348
        return terrno;
×
349
      }
350

351
      // Since virtual stable scan use virtual super table's uid in scan operator, the origin table might be stored on
352
      // different vnode, so we should not get table schema for virtual stable scan.
353
      if (!pTableScanNode->scan.virtualStableScan) {
246,717,117✔
354
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
235,857,168✔
355
        if (code) {
235,681,835✔
UNCOV
356
          pTaskInfo->code = code;
×
UNCOV
357
          tableListDestroy(pTableListInfo);
×
UNCOV
358
          return code;
×
359
        }
360
      }
361

362
      if (pTableScanNode->scan.node.dynamicOp) {
246,540,654✔
363
        pTaskInfo->dynamicTask = true;
16,549,263✔
364
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
16,548,730✔
365
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
16,548,624✔
366
      } else {
367
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
230,039,377✔
368
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
369
        if (code) {
230,143,056✔
370
          pTaskInfo->code = code;
994✔
371
          tableListDestroy(pTableListInfo);
994✔
372
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
994✔
373
          return code;
994✔
374
        }
375
      }
376

377
      code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
246,687,146✔
378
      if (NULL == pOperator || code != 0) {
246,613,626✔
379
        pTaskInfo->code = code;
23,870✔
380
        tableListDestroy(pTableListInfo);
23,968✔
381
        return code;
23,968✔
382
      }
383

384
      STableScanInfo* pScanInfo = pOperator->info;
246,589,756✔
385
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
246,635,676✔
386
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
178,935,361✔
387
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
28,209,154✔
388
      STableListInfo*           pTableListInfo = tableListCreate();
28,209,154✔
389
      if (!pTableListInfo) {
28,237,335✔
390
        pTaskInfo->code = terrno;
×
391
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
392
        return terrno;
×
393
      }
394

395
      if (pTableScanNode->scan.node.dynamicOp) {
28,237,335✔
396
        pTaskInfo->dynamicTask = true;
285,720✔
397
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
285,720✔
398
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
285,720✔
399
      } else {
400
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
27,950,694✔
401
                                       pTagCond, pTagIndexCond, pTaskInfo, NULL);
402
        if (code) {
27,945,131✔
403
          pTaskInfo->code = code;
×
404
          tableListDestroy(pTableListInfo);
×
405
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
×
406
          return code;
×
407
        }
408

409
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
27,945,131✔
410
        if (code) {
27,891,549✔
411
          pTaskInfo->code = code;
×
412
          tableListDestroy(pTableListInfo);
×
413
          return code;
×
414
        }
415
      }
416

417
      code = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
28,177,269✔
418
      if (NULL == pOperator || code != 0) {
28,186,113✔
419
        pTaskInfo->code = code;
34,912✔
420
        tableListDestroy(pTableListInfo);
×
421
        return code;
×
422
      }
423

424
      STableMergeScanInfo* pScanInfo = pOperator->info;
28,151,233✔
425
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
28,201,654✔
426
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
150,726,207✔
427
      code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
112,840,402✔
428
                                        pTaskInfo, &pOperator);
429
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
37,885,805✔
430
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
333,319✔
431
      STableListInfo*      pTableListInfo = tableListCreate();
333,319✔
432
      if (!pTableListInfo) {
333,319✔
433
        pTaskInfo->code = terrno;
×
434
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
435
        return terrno;
×
436
      }
437

438
      if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) {
333,319✔
439
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
333,319✔
440
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
441
        if (code) {
333,319✔
442
          pTaskInfo->code = code;
×
443
          tableListDestroy(pTableListInfo);
×
444
          qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
445
          return code;
×
446
        }
447
      }
448

449
      code = createTmqScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
333,319✔
450
      if (code) {
333,319✔
451
        pTaskInfo->code = code;
×
452
        tableListDestroy(pTableListInfo);
×
453
        return code;
×
454
      }
455
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
37,552,486✔
456
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
25,718,245✔
457
      if (pSysScanPhyNode->scan.virtualStableScan) {
25,718,245✔
458
        STableListInfo*           pTableListInfo = tableListCreate();
14,050,089✔
459
        if (!pTableListInfo) {
14,054,283✔
460
          pTaskInfo->code = terrno;
×
461
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
462
          return terrno;
×
463
        }
464

465
        code = createScanTableListInfo((SScanPhysiNode*)pSysScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
14,054,283✔
466
                                       pTagIndexCond, pTaskInfo, NULL);
467
        if (code != TSDB_CODE_SUCCESS) {
14,047,944✔
468
          pTaskInfo->code = code;
×
469
          tableListDestroy(pTableListInfo);
×
470
          return code;
×
471
        }
472

473
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTableListInfo, pUser, pTaskInfo, &pOperator);
14,047,944✔
474
      } else {
475
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, NULL, pUser, pTaskInfo, &pOperator);
11,669,274✔
476
      }
477
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
11,834,241✔
478
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
29,571✔
479
      code = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo, &pOperator);
29,571✔
480
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
11,804,670✔
481
      STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
8,120,462✔
482
      STableListInfo*    pTableListInfo = tableListCreate();
8,120,462✔
483
      if (!pTableListInfo) {
8,126,053✔
484
        pTaskInfo->code = terrno;
×
485
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
486
        return terrno;
×
487
      }
488

489
      code = initQueriedTableSchemaInfo(pHandle, &pTagScanPhyNode->scan, dbname, pTaskInfo);
8,126,053✔
490
      if (code != TSDB_CODE_SUCCESS) {
8,116,200✔
491
        pTaskInfo->code = code;
×
492
        tableListDestroy(pTableListInfo);
×
493
        return code;
×
494
      }
495

496
      if (!pTagScanPhyNode->onlyMetaCtbIdx) {
8,116,200✔
497
        code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
3,146,741✔
498
                                       pTagIndexCond, pTaskInfo, NULL);
499
        if (code != TSDB_CODE_SUCCESS) {
3,141,237✔
500
          pTaskInfo->code = code;
×
501
          qError("failed to getTableList, code:%s", tstrerror(code));
×
502
          tableListDestroy(pTableListInfo);
×
503
          return code;
×
504
        }
505
      }
506

507
      if (pTagScanPhyNode->scan.node.dynamicOp) {
8,111,911✔
508
        pTaskInfo->dynamicTask = true;
2,246,102✔
509
        pTableListInfo->idInfo.suid = pTagScanPhyNode->scan.suid;
2,243,718✔
510
        pTableListInfo->idInfo.tableType = pTagScanPhyNode->scan.tableType;
2,246,149✔
511
      }
512

513
      code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
8,111,223✔
514
                                       &pOperator);
515
      if (code) {
8,116,029✔
516
        pTaskInfo->code = code;
×
517
        tableListDestroy(pTableListInfo);
×
518
        return code;
×
519
      }
520
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
3,684,208✔
521
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
5,537✔
522
      STableListInfo*          pTableListInfo = tableListCreate();
5,537✔
523
      if (!pTableListInfo) {
5,537✔
524
        pTaskInfo->code = terrno;
×
525
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
526
        return terrno;
×
527
      }
528

529
      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
5,537✔
530
        SArray* pList = taosArrayInit(4, sizeof(uint64_t));
4,723✔
531
        code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
4,723✔
532
        if (code != TSDB_CODE_SUCCESS) {
4,723✔
533
          pTaskInfo->code = code;
×
534
          taosArrayDestroy(pList);
×
535
          tableListDestroy(pTableListInfo);
×
536
          return code;
×
537
        }
538

539
        size_t num = taosArrayGetSize(pList);
4,723✔
540
        for (int32_t i = 0; i < num; ++i) {
10,146✔
541
          uint64_t* id = taosArrayGet(pList, i);
5,423✔
542
          if (id == NULL) {
5,423✔
543
            continue;
×
544
          }
545

546
          code = tableListAddTableInfo(pTableListInfo, *id, 0);
5,423✔
547
          if (code) {
5,423✔
548
            pTaskInfo->code = code;
×
549
            tableListDestroy(pTableListInfo);
×
550
            taosArrayDestroy(pList);
×
551
            return code;
×
552
          }
553
        }
554

555
        taosArrayDestroy(pList);
4,723✔
556
      } else {  // Create group with only one table
557
        code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
814✔
558
        if (code) {
814✔
559
          pTaskInfo->code = code;
×
560
          tableListDestroy(pTableListInfo);
×
561
          return code;
×
562
        }
563
      }
564

565
      code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
5,537✔
566
      if (code) {
5,537✔
567
        pTaskInfo->code = code;
×
568
        tableListDestroy(pTableListInfo);
×
569
        return code;
×
570
      }
571
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
3,678,671✔
572
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
1,821,624✔
573
      STableListInfo*        pTableListInfo = tableListCreate();
1,821,624✔
574
      if (!pTableListInfo) {
1,821,624✔
575
        pTaskInfo->code = terrno;
×
576
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
577
        return terrno;
×
578
      }
579

580
      code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
1,821,624✔
581
                                     pTagIndexCond, pTaskInfo, NULL);
582
      if (code != TSDB_CODE_SUCCESS) {
1,821,624✔
583
        pTaskInfo->code = code;
×
584
        tableListDestroy(pTableListInfo);
×
585
        return code;
×
586
      }
587

588
      code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
1,821,624✔
589
      if (code != TSDB_CODE_SUCCESS) {
1,820,542✔
590
        pTaskInfo->code = code;
×
591
        tableListDestroy(pTableListInfo);
×
592
        return code;
×
593
      }
594

595
      code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
1,820,542✔
596
      if (code) {
1,820,554✔
597
        tableListDestroy(pTableListInfo);
×
598
        pTaskInfo->code = code;
×
599
        return code;
×
600
      }
601
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
1,857,138✔
602
      code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
1,839,944✔
603
    } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
17,194✔
604
      // NOTE: this is an patch to fix the physical plan
605
      code = createVirtualTableMergeOperatorInfo(NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
17,861✔
606
    } else {
UNCOV
607
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
608
      pTaskInfo->code = code;
×
609
      return code;
×
610
    }
611

612
    if (pOperator != NULL) {  // todo moved away
425,235,415✔
613
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
425,493,497✔
614
    }
615

616
    *pOptrInfo = pOperator;
425,217,901✔
617
    return code;
425,498,043✔
618
  }
619

620
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
360,513,074✔
621
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
360,591,448✔
622
  if (ops == NULL) {
360,554,467✔
623
    code = terrno;
×
624
    pTaskInfo->code = code;
×
625
    return code;
×
626
  }
627

628
  for (int32_t i = 0; i < size; ++i) {
772,058,568✔
629
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
411,795,102✔
630
    // For external window parent, pre-initialize runtime from subquery before building children
631
    if (QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW == type && i == 0) {
411,777,031✔
632
      // best-effort pre-init; ignore errors here and let later construction handle them
NEW
633
      (void)extWinPreInitFromSubquery(pPhyNode, pTaskInfo);
×
634
    }
635
    code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i], model);
411,777,031✔
636
    if (ops[i] == NULL || code != 0) {
411,672,327✔
637
      for (int32_t j = 0; j < i; ++j) {
75,852✔
638
        destroyOperator(ops[j]);
×
639
      }
640
      taosMemoryFree(ops);
75,852✔
641
      return code;
75,852✔
642
    }
643
  }
644

645
  SOperatorInfo* pOptr = NULL;
360,263,466✔
646
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
360,265,169✔
647
    code = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
145,304,929✔
648
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
214,960,240✔
649
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
124,049,425✔
650
    if (pAggNode->pGroupKeys != NULL) {
124,049,425✔
651
      code = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
31,529,174✔
652
    } else {
653
      code = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
92,586,950✔
654
    }
655
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
90,910,815✔
656
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
4,928,936✔
657
    code = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
4,928,936✔
658
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
85,981,879✔
659
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
818,600✔
660
    code = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
818,600✔
661
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
85,163,279✔
662
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
×
663
    code = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
×
664
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
85,163,279✔
665
    code = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
33,708,436✔
666
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
51,454,843✔
667
    code = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
78,296✔
668
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
51,376,547✔
669
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
14,046,446✔
670
    code = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo, &pOptr);
14,046,446✔
671
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
37,330,101✔
672
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
586,705✔
673
    code = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo, &pOptr);
586,705✔
674
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
36,743,396✔
675
    code = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
3,020,929✔
676
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
33,722,467✔
677
    SStateWindowPhysiNode* pStateNode = (SStateWindowPhysiNode*)pPhyNode;
1,070,773✔
678
    code = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo, &pOptr);
1,070,773✔
679
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
32,651,694✔
680
    code = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
13,012,540✔
681
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
19,639,154✔
682
    code = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,225,788✔
683
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
18,413,366✔
684
    code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
496,702✔
685
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
17,916,664✔
686
    code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,172,688✔
687
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
14,743,976✔
688
    code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,455,328✔
689
  } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
11,288,648✔
690
    code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
691
  } else if (QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC == type) {
11,288,648✔
692
    code = createGenericAnalysisOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
693
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
11,288,648✔
694
    code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
362,487✔
695
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
10,926,161✔
696
    code = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,224,980✔
697
  } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
9,701,181✔
698
    code = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo, pHandle->pMsgCb, &pOptr);
3,725,217✔
699
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
5,975,964✔
700
    code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
286,738✔
701
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
5,689,226✔
702
    code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
703
  } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
5,689,226✔
704
    SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode;
4,087,167✔
705
    // NOTE: this is an patch to fix the physical plan
706

707
    if (pVirtualTableScanNode->scan.node.pLimit != NULL) {
4,087,167✔
708
      pVirtualTableScanNode->groupSort = true;
×
709
    }
710
    code = createVirtualTableMergeOperatorInfo(ops, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
4,087,167✔
711
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_EXTERNAL == type) {
1,602,059✔
712
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
1,564,027✔
713
      code = createStreamExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
169,450✔
714
    } else {
715
      code = createExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
1,394,174✔
716
    }
717
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_EXTERNAL == type) {
42,510✔
718
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
59,941✔
719
      code = createStreamMergeAlignedExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
17,223✔
720
    } else {
721
      code = createMergeAlignedExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
42,718✔
722
    }
723
  } else {
UNCOV
724
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
725
    pTaskInfo->code = code;
×
726
    for (int32_t i = 0; i < size; ++i) {
×
727
      destroyOperator(ops[i]);
×
728
    }
729
    taosMemoryFree(ops);
×
730
    qError("invalid operator type %d", type);
×
731
    return code;
×
732
  }
733

734
  taosMemoryFree(ops);
360,422,797✔
735
  if (pOptr) {
360,332,941✔
736
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
359,882,181✔
737
  }
738

739
  *pOptrInfo = pOptr;
360,370,967✔
740
  return code;
360,393,037✔
741
}
742

743
/*
 * Destroy an operator and the downstream operators it references.
 *
 * Steps, in order: release the get/notify parameter sets, recursively destroy
 * the first numOfRealDownstream entries of pDownstream, free the downstream
 * array, clean up the expression support, call closeFn on the private info
 * (only when both closeFn and info are set), then free the operator itself.
 *
 * @param pOperator Operator to destroy; NULL is accepted and ignored.
 */
void destroyOperator(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
  freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);

  if (NULL != pOperator->pDownstream) {
    int32_t childIdx = 0;
    while (childIdx < pOperator->numOfRealDownstream) {
      destroyOperator(pOperator->pDownstream[childIdx]);
      childIdx++;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);

  // closeFn is skipped when info was already detached (set to NULL) by the caller
  if (NULL != pOperator->info && NULL != pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
    pOperator->info = NULL;
  }

  taosMemoryFreeClear(pOperator);
}
769

770
/*
 * Destroy an explicit list of downstream operators and then the operator.
 *
 * @param pOperator   Operator to destroy, may be NULL.
 * @param downstreams Array of downstream operators to destroy, may be NULL.
 * @param num         Number of entries in downstreams.
 */
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
  for (int32_t k = 0; NULL != downstreams && k < num; ++k) {
    destroyOperator(downstreams[k]);
  }

  if (NULL == pOperator) {
    return;
  }

  // With info cleared, destroyOperator() will not invoke closeFn for this operator.
  pOperator->info = NULL;

  // Drop the downstream array first so destroyOperator() does not walk the children again.
  if (NULL != pOperator->pDownstream) {
    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->pDownstream = NULL;
  }

  destroyOperator(pOperator);
}
454,406✔
786

787
/*
 * Append the explain/execution statistics of an operator and, recursively,
 * of all its downstream operators to pExecInfoList (pre-order).
 *
 * Timestamps (execStart, execFirstRow, execLastRow) are stored relative to the
 * operator's execCreate time. Fields guarded by a condition keep the zero
 * value of the freshly pushed entry when the condition does not hold.
 *
 * @param operatorInfo  Operator whose statistics are collected.
 * @param pExecInfoList Array of SExplainExecInfo receiving the entries.
 *
 * @return TSDB_CODE_SUCCESS, terrno when the array push fails, or the error
 *         returned by getExplainFn / a downstream collection.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  zeroed = {0};
  SExplainExecInfo* pInfo = taosArrayPush(pExecInfoList, &zeroed);
  if (NULL == pInfo) {
    return terrno;
  }

  pInfo->vgId = operatorInfo->pTaskInfo->id.vgId;
  pInfo->execCreate = operatorInfo->cost.execCreate;
  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->verboseInfo = NULL;
  pInfo->verboseLen = 0;

  // Without any returned row, execFirstRow and execLastRow stay 0.
  if (operatorInfo->resultInfo.totalRows > 0) {
    pInfo->execFirstRow = operatorInfo->cost.execFirstRow - operatorInfo->cost.execCreate;
    pInfo->execLastRow = operatorInfo->cost.execLastRow - operatorInfo->cost.execCreate;
  }

  pInfo->execTimes = operatorInfo->cost.execTimes;
  if (operatorInfo->cost.execTimes > 0) {
    pInfo->execStart = operatorInfo->cost.execStart - operatorInfo->cost.execCreate;
    pInfo->execElapsed = operatorInfo->cost.execElapsed;
    pInfo->inputWaitElapsed = operatorInfo->cost.inputWaitElapsed;
    pInfo->outputWaitElapsed = operatorInfo->cost.outputWaitElapsed;
    pInfo->inputRows = operatorInfo->cost.inputRows;
  }

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t explainCode = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (explainCode) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(explainCode));
      return explainCode;
    }
  }

  // pInfo is not touched past this point: the recursive pushes append to the same array.
  for (int32_t childIdx = 0; childIdx < operatorInfo->numOfDownstream; ++childIdx) {
    int32_t childCode = getOperatorExplainExecInfo(operatorInfo->pDownstream[childIdx], pExecInfoList);
    if (TSDB_CODE_SUCCESS != childCode) {
      return childCode;
    }
  }

  return TSDB_CODE_SUCCESS;
}
841

842
/*
 * Merge the operator param pSrc into pDst. Both must carry the same opType;
 * only QUERY_NODE_PHYSICAL_PLAN_EXCHANGE params are supported.
 *
 * Exchange merge rules:
 *  - pDst is a single (non-batch) param and both params target the same vgId:
 *    the source uidList is appended to the destination uidList.
 *  - pDst is a single param and the vgIds differ: pDst->value is replaced by
 *    a newly allocated batch param whose hash (keyed by vgId) holds by-value
 *    copies of both basic params.
 *  - pDst already is a batch param: the source is appended to the entry with
 *    the same vgId, or inserted as a new entry.
 *
 * @param pDst Destination param, updated in place.
 * @param pSrc Source param to merge in.
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for mismatched
 *         or unsupported param types, otherwise the allocation/hash error.
 */
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
  if (pDst->opType != pSrc->opType) {
    qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
    return TSDB_CODE_INVALID_PARA;
  }

  switch (pDst->opType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
      SExchangeOperatorParam* pDExc = pDst->value;
      SExchangeOperatorParam* pSExc = pSrc->value;
      if (pSExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
        qError("%s, invalid exchange operator param type %d for "
          "source operator", __func__, pSExc->basic.paramType);
        return TSDB_CODE_INVALID_PARA;
      }
      if (!pDExc->multiParams) {
        if (pDExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
          qError("%s, invalid exchange operator param type %d for "
            "destination operator", __func__, pDExc->basic.paramType);
          return TSDB_CODE_INVALID_PARA;
        }
        if (pSExc->basic.vgId != pDExc->basic.vgId) {
          SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
          if (NULL == pBatch) {
            return terrno;
          }

          pBatch->multiParams = true;
          pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
          if (NULL == pBatch->pBatchs) {
            taosMemoryFree(pBatch);
            return terrno;
          }

          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic,
                                        sizeof(pDExc->basic));
          if (TSDB_CODE_SUCCESS == code) {
            code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                  sizeof(pSExc->basic));
          }
          if (code) {
            // The value destructor is not installed yet, so tearing the table down here
            // only drops the by-value copies and leaves what pDst/pSrc reference untouched.
            // NOTE(review): relies on tSimpleHashCleanup() skipping values when no free
            // callback is set - confirm against the simple-hash implementation.
            tSimpleHashCleanup(pBatch->pBatchs);
            taosMemoryFree(pBatch);
            return code;
          }

          // Install the destructor only once the table holds both entries, i.e. once the
          // batch param is about to replace the single destination param.
          tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);

          taosMemoryFree(pDst->value);
          pDst->value = pBatch;
        } else {
          void* p = taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        }
      } else {
        SExchangeOperatorBatchParam* pBatch = pDst->value;
        SExchangeOperatorBasicParam* pBasic =
            tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
        if (pBasic) {
          void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        } else {
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                        sizeof(pSExc->basic));
          if (code) {
            return code;
          }
        }
      }
      break;
    }
    default:
      qError("invalid optype %d for merge operator params", pDst->opType);
      return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
924

925
/*
 * Install a get/notify param on an operator and distribute the child params
 * to the per-downstream param slots.
 *
 * The previous params of the given type are reset first. When pInput is NULL
 * nothing else happens. When pInput's opType does not match the operator, the
 * operator keeps no param of its own and pInput is forwarded unchanged to
 * every downstream slot. Otherwise pInput becomes the operator's param and
 * each entry of its pChildren array is placed into (or merged with) the slot
 * selected by the child's downstreamIdx; afterwards the pChildren array is
 * destroyed and cleared.
 *
 * @param pOperator Target operator.
 * @param pInput    Param to install, may be NULL.
 * @param type      OP_GET_PARAM or OP_NOTIFY_PARAM.
 *
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA for an unknown
 *         type, a NULL child entry or a child whose downstreamIdx is outside
 *         [0, numOfDownstream); otherwise the allocation or merge error.
 */
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
  SOperatorParam**  ppParam = NULL;
  SOperatorParam*** pppDownstramParam = NULL;
  switch (type) {
    case OP_GET_PARAM:
      ppParam = &pOperator->pOperatorGetParam;
      pppDownstramParam = &pOperator->pDownstreamGetParams;
      break;
    case OP_NOTIFY_PARAM:
      ppParam = &pOperator->pOperatorNotifyParam;
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  freeResetOperatorParams(pOperator, type, false);

  if (NULL == pInput) {
    return TSDB_CODE_SUCCESS;
  }

  *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;

  // The per-downstream slot array is allocated lazily, one pointer per downstream.
  if (NULL == *pppDownstramParam) {
    *pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
    if (NULL == *pppDownstramParam) {
      return terrno;
    }
  }

  if (NULL == *ppParam) {
    // The param is not addressed to this operator: pass it through to all downstreams.
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      (*pppDownstramParam)[i] = pInput;
    }
    return TSDB_CODE_SUCCESS;
  }

  memset(*pppDownstramParam, 0, pOperator->numOfDownstream * POINTER_BYTES);

  int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren);
  if (childrenNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < childrenNum; ++i) {
    // Check the slot pointer before dereferencing it.
    SOperatorParam** ppChild = taosArrayGet((*ppParam)->pChildren, i);
    if (ppChild == NULL) {
      return terrno;
    }

    SOperatorParam* pChild = *ppChild;
    if (pChild == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    // The slot array holds exactly numOfDownstream pointers; reject anything outside it.
    if (pChild->downstreamIdx < 0 || pChild->downstreamIdx >= pOperator->numOfDownstream) {
      return TSDB_CODE_INVALID_PARA;
    }

    if ((*pppDownstramParam)[pChild->downstreamIdx]) {
      int32_t code = mergeOperatorParams((*pppDownstramParam)[pChild->downstreamIdx], pChild);
      if (code) {
        return code;
      }
    } else {
      (*pppDownstramParam)[pChild->downstreamIdx] = pChild;
    }
  }

  taosArrayDestroy((*ppParam)->pChildren);
  (*ppParam)->pChildren = NULL;

  return TSDB_CODE_SUCCESS;
}
991

992
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
888,624,837✔
993
  recordOpExecBeforeDownstream(pOperator);
888,624,837✔
994
  SSDataBlock* p = NULL;
888,553,747✔
995
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
888,636,546✔
996
  if (code == TSDB_CODE_SUCCESS) {
885,638,117✔
997
    code = blockDataCheck(p);
885,684,594✔
998
    if (code != TSDB_CODE_SUCCESS) {
885,796,652✔
999
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
1000
             __func__, __LINE__, tstrerror(code));
1001
    }
1002
  }
1003
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
885,686,835✔
1004
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
885,718,169✔
1005
}
1006

1007
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
71,730,614✔
1008
  recordOpExecBeforeDownstream(pOperator);
71,730,614✔
1009
  SSDataBlock* p = NULL;
71,730,614✔
1010
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
71,730,614✔
1011
  if (code == TSDB_CODE_SUCCESS) {
71,718,067✔
1012
    code = blockDataCheck(p);
71,717,660✔
1013
    if (code != TSDB_CODE_SUCCESS) {
71,719,974✔
1014
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
1015
             __func__, __LINE__, tstrerror(code));
1016
    }
1017
  }
1018
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
71,718,968✔
1019
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
71,719,974✔
1020
}
1021

1022
/*
1023
 * Fetch one block from downstream without preserving reused get-param ownership.
1024
 *
1025
 * @param pOperator Current operator.
1026
 * @param idx Downstream index.
1027
 * @param pResBlock Output result block pointer.
1028
 *
1029
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1030
 */
1031
static FORCE_INLINE int32_t getNextBlockFromDownstreamRemainDetachImpl(struct SOperatorInfo* pOperator, int32_t idx,
1032
                                                                       SSDataBlock** pResBlock) {
1033
  QRY_PARAM_CHECK(pResBlock);
571,293,035✔
1034

1035
  int32_t code = TSDB_CODE_SUCCESS;
571,302,108✔
1036
  int32_t lino = 0;
571,302,108✔
1037

1038
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
575,092,564✔
1039
    SOperatorParam* pGetParam = pOperator->pDownstreamGetParams[idx];
3,794,780✔
1040
    pOperator->pDownstreamGetParams[idx] = NULL;
3,796,036✔
1041
    // Once detached from the parent operator, downstream must own/free this param.
1042
    pGetParam->reUse = false;
3,796,669✔
1043

1044
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
3,795,403✔
1045
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pGetParam, pResBlock);
3,795,403✔
1046
    QUERY_CHECK_CODE(code, lino, _return);
3,790,456✔
1047
  } else {
1048
    code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
567,528,467✔
1049
    QUERY_CHECK_CODE(code, lino, _return);
565,502,886✔
1050
  }
1051

1052
_return:
565,502,886✔
1053
  if (code) {
569,293,342✔
1054
    qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
×
1055
  }
1056
  return code;
569,285,903✔
1057
}
1058

1059
/*
1060
 * Fetch and validate one downstream block while detaching one-shot get-param.
1061
 *
1062
 * @param pOperator Current operator.
1063
 * @param idx Downstream index.
1064
 *
1065
 * @return Valid block pointer on success, or NULL on failure.
1066
 */
1067
SSDataBlock* getNextBlockFromDownstreamRemainDetach(struct SOperatorInfo* pOperator, int32_t idx) {
571,314,748✔
1068
  SSDataBlock* p = NULL;
571,314,748✔
1069
  int32_t      code = TSDB_CODE_SUCCESS;
571,349,404✔
1070
  int32_t      lino = 0;
571,349,404✔
1071
  recordOpExecBeforeDownstream(pOperator);
571,349,404✔
1072

1073
  code = getNextBlockFromDownstreamRemainDetachImpl(pOperator, idx, &p);
569,285,903✔
1074
  QUERY_CHECK_CODE(code, lino, _return);
569,285,903✔
1075

1076
  code = blockDataCheck(p);
569,285,903✔
1077
  QUERY_CHECK_CODE(code, lino, _return);
569,339,614✔
1078

1079
_return:
569,339,614✔
1080
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
569,358,987✔
1081
  if (code) {
569,341,260✔
1082
    qError("failed to get next data block from downstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
45,376✔
1083
    return NULL;
×
1084
  }
1085
  return p;
569,296,396✔
1086
}
1087

1088
/*
 * Default "get next with param" implementation: install pParam as the
 * operator's get-param via setOperatorParams() and then call the operator's
 * regular getNextFn.
 *
 * On failure the code is stored in pTaskInfo->code and T_LONG_JMP is invoked
 * with the task's env (presumably it does not return - confirm against the
 * macro definition).
 *
 * @param pOperator Current operator.
 * @param pParam    Param to install before fetching.
 * @param pRes      Output result block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success.
 */
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  QRY_PARAM_CHECK(pRes);

  int32_t lino = 0;
  int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
  QUERY_CHECK_CODE(code, lino, _end);
  code = pOperator->fpSet.getNextFn(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // Report the line captured at the failing check, not the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1106

1107
/*
 * Default notify implementation: install pParam as the operator's notify
 * param, call the operator's notifyFn when both the callback and the
 * operator-level notify param are present, then forward each pending
 * downstream notify param to the matching downstream operator. A downstream
 * slot is cleared once its notification succeeded.
 *
 * On failure the code is stored in pTaskInfo->code and T_LONG_JMP is invoked
 * with the task's env (presumably it does not return - confirm against the
 * macro definition).
 *
 * @param pOperator Current operator.
 * @param pParam    Notify param to install, may be NULL.
 *
 * @return TSDB_CODE_SUCCESS on success.
 */
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
  if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
    code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
  }

  // setOperatorParams() succeeds without allocating the per-downstream array when
  // pParam is NULL, so the array must be checked before it is indexed.
  if (TSDB_CODE_SUCCESS == code && NULL != pOperator->pDownstreamNotifyParams) {
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      if (pOperator->pDownstreamNotifyParams[i]) {
        code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        pOperator->pDownstreamNotifyParams[i] = NULL;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  return code;
}
1130

1131
int64_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
29,272,246✔
1132
  if (pOperator->transparent) {
29,272,246✔
1133
    return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);
2,449,960✔
1134
  }
1135
  return pOperator->resultDataBlockId;
26,822,286✔
1136
}
1137

1138
/* Put the operator back into the OP_NOT_OPENED status. */
void resetOperatorState(SOperatorInfo* pOper) {
  pOper->status = OP_NOT_OPENED;
}
×
1141

1142
/*
 * Reset the basic operator info: clean up the result block when one exists
 * and re-initialize the result row info.
 */
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo) {
  if (NULL != pBasicInfo->pRes) {
    blockDataCleanup(pBasicInfo->pRes);
  }

  initResultRowInfo(&pBasicInfo->resultRowInfo);
}
17,370,467✔
1146

1147
int32_t resetAggSup(SExprSupp* pExprSupp, SAggSupporter* pSup, SExecTaskInfo* pTaskInfo,
14,888,960✔
1148
                    SNodeList* pNodeList, SNodeList* pGroupKeys, size_t keyBufSize, const char* pKey, void* pState,
1149
                    SFunctionStateStore* pStore) {
1150
  int32_t    code = 0, lino = 0, num = 0;
14,888,960✔
1151
  SExprInfo* pExprInfo = NULL;
14,888,960✔
1152
  cleanupExprSuppWithoutFilter(pExprSupp);
14,888,551✔
1153
  cleanupAggSup(pSup);
14,887,541✔
1154
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
14,888,248✔
1155
  QUERY_CHECK_CODE(code, lino, _error);
14,888,185✔
1156
  code = initAggSup(pExprSupp, pSup, pExprInfo, num, keyBufSize, pKey, pState, pStore);
14,888,185✔
1157
  QUERY_CHECK_CODE(code, lino, _error);
14,887,284✔
1158
  return code;
14,887,284✔
1159
_error:
×
1160
  if (code != TSDB_CODE_SUCCESS) {
×
1161
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1162
    pTaskInfo->code = code;
×
1163
  }
1164
  return code;
×
1165
}
1166

1167
/*
 * Rebuild the expression support of an operator.
 *
 * The existing expression support (without its filter) is cleaned up, a new
 * expression info array is created from pNodeList and pGroupKeys, and the
 * support is initialized again through initExprSupp().
 *
 * @param pExprSupp  Expression support to rebuild.
 * @param pTaskInfo  Task info; receives the error code on failure.
 * @param pNodeList  Node list the expressions are created from.
 * @param pGroupKeys Group key node list passed to createExprInfo().
 * @param pStore     Passed through to initExprSupp().
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the failing step's code.
 */
int32_t resetExprSupp(SExprSupp* pExprSupp, SExecTaskInfo* pTaskInfo, SNodeList* pNodeList,
                      SNodeList* pGroupKeys, SFunctionStateStore* pStore) {
  int32_t code = 0, lino = 0, num = 0;
  SExprInfo* pExprInfo = NULL;
  cleanupExprSuppWithoutFilter(pExprSupp);
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);
  code = initExprSupp(pExprSupp, pExprInfo, num, pStore);
  QUERY_CHECK_CODE(code, lino, _error);
  return code;
_error:
  if (code != TSDB_CODE_SUCCESS) {
    // Report the line captured at the failing check, not the line of this log statement.
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1184

1185
/*
 * Fill destination columns from a source block for every target in pNodeList
 * whose expression is a column node belonging to data block targetBlkId.
 *
 * For each such target the destination column is pDst's column at the
 * target's slotId. When pSrc is NULL or has no rows, totalRows NULL values
 * are written starting at row 0. Otherwise the source column (the column
 * node's slotId in pSrc) is assigned to the destination column and, if the
 * source has fewer than totalRows rows, the remaining rows are set to NULL.
 *
 * @param pNodeList   List of STargetNode entries.
 * @param targetBlkId Data block id selecting the column nodes to process.
 * @param pDst        Destination block.
 * @param pSrc        Source block, may be NULL.
 * @param totalRows   Number of rows each processed destination column covers.
 *
 * @return TSDB_CODE_SUCCESS (0) on success, otherwise the failing step's code.
 */
int32_t copyColumnsValue(SNodeList* pNodeList, int64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc, int32_t totalRows) {
  bool    isNull = (NULL == pSrc || pSrc->info.rows <= 0);
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  int32_t code = 0;
  int32_t lino = 0;

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == targetBlkId) {
      SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pNode->slotId);
      QUERY_CHECK_NULL(pDstCol, code, lino, _return, terrno)

      if (isNull) {
        colDataSetNNULL(pDstCol, 0, totalRows);
      } else {
        SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pNode->pExpr)->slotId);
        QUERY_CHECK_NULL(pSrcCol, code, lino, _return, terrno)

        code = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pDst->info);

        QUERY_CHECK_CODE(code, lino, _return);
        // Pad the tail with NULLs when the source is shorter than the requested row count.
        if (pSrc->info.rows < totalRows) {
          colDataSetNNULL(pDstCol, pSrc->info.rows, totalRows - pSrc->info.rows);
        }
      }
    }
  }

  return code;
_return:
  qError("failed to copy columns value, line:%d code:%s", lino, tstrerror(code));
  return code;
}
1219

1220
/**
1221
  @brief Record the create time of the operator and do initialization
1222
  to other metrics. This function will be called at the beginning of
1223
  creation of operators.
1224
*/
1225
void initOperatorCostInfo(SOperatorInfo* pOperator) {
786,494,250✔
1226
  pOperator->cost.execCreate = taosGetTimestampUs();
786,576,746✔
1227
  resetOperatorCostInfo(pOperator);
786,567,970✔
1228
}
786,471,487✔
1229

1230
/**
1231
  @brief Record the performance metrics at the beginning of the
1232
  operator execution:
1233
  - record the start time of the operator execution
1234
  - calculate the output wait time (time since last call returned)
1235
  - record the first time nextFn interface is called
1236
  Only record the metrics in explain analyze mode.
1237
*/
1238
void recordOpExecBegin(SOperatorInfo* pOperator) {
2,147,483,647✔
1239
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1240
    pOperator->cost.startTs = taosGetTimestampUs();
19,992,942✔
1241
    // calculate output wait time (time since last call returned)
1242
    if (pOperator->cost.execLastRow > 0) {
19,991,739✔
1243
      pOperator->cost.outputWaitElapsed +=
25,389,663✔
1244
        pOperator->cost.startTs - pOperator->cost.execLastRow;
12,695,710✔
1245
    }
1246
    // record the first time nextFn is called
1247
    if (pOperator->cost.execStart == 0) {
19,989,982✔
1248
      pOperator->cost.execStart = pOperator->cost.startTs;
7,278,260✔
1249
    }
1250
  }
1251
}
2,147,483,647✔
1252

1253
/**
1254
  @brief Record the performance metrics before the downstream execution:
1255
  - record the elapsed time since the operator execution started
1256
  - update the start time of the operator execution
1257
  Only record the metrics in explain analyze mode.
1258
*/
1259
void recordOpExecBeforeDownstream(SOperatorInfo* pOperator) {
1,865,498,416✔
1260
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
1,865,498,416✔
1261
    pOperator->cost.endTs = taosGetTimestampUs();
11,290,796✔
1262
    if (UNLIKELY(pOperator->cost.startTs == 0)) {
11,290,384✔
1263
      /**
1264
        As long as getNextBlockFromDownstream() is called inside getNextFn(),
1265
        startTs must be initialized before endTs so that this condition should
1266
        always be false.
1267
      */
1268
      pOperator->cost.startTs = pOperator->cost.endTs;
×
1269
    }
1270
    pOperator->cost.execElapsed +=
22,580,492✔
1271
      pOperator->cost.endTs - pOperator->cost.startTs;
11,290,384✔
1272
  }
1273
}
1,865,628,039✔
1274

1275
/**
1276
  @brief Record the performance metrics after the downstream execution:
1277
  - record the elapsed time since the downstream execution started
1278
  - update the end time of the downstream execution
1279
  Only record the metrics in explain analyze mode.
1280
*/
1281
void recordOpExecAfterDownstream(SOperatorInfo* pOperator, size_t inputRows) {
1,860,588,839✔
1282
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
1,860,588,839✔
1283
    pOperator->cost.startTs = taosGetTimestampUs();
11,287,721✔
1284
    pOperator->cost.inputWaitElapsed +=
22,576,880✔
1285
      pOperator->cost.startTs - pOperator->cost.endTs;
11,287,698✔
1286
    pOperator->cost.inputRows += inputRows;
11,285,801✔
1287
  }
1288
}
1,860,756,319✔
1289

1290
/**
1291
  @brief Record the performance metrics at the end of the
1292
  operator execution:
1293
  - record the number of times the operator's next interface is called
1294
  - record the elapsed time for executing the operator's nextFn interface
1295
  - record the time when the first row is returned
1296
  - record the time when the last row is returned
1297
  Only record the metrics in explain analyze mode.
1298
*/
1299
void recordOpExecEnd(SOperatorInfo* pOperator, size_t rows) {
2,147,483,647✔
1300
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1301
    pOperator->cost.endTs = taosGetTimestampUs();
19,976,861✔
1302
    pOperator->cost.execTimes++;
19,975,792✔
1303
    pOperator->cost.execElapsed +=
39,954,393✔
1304
      pOperator->cost.endTs - pOperator->cost.startTs;
19,975,116✔
1305

1306
    if (rows > 0) {
19,976,870✔
1307
      // record the first time data is returned
1308
      if (pOperator->cost.execFirstRow == 0) {
12,735,121✔
1309
        pOperator->cost.execFirstRow = pOperator->cost.endTs;
6,133,505✔
1310
      }
1311
      pOperator->cost.execLastRow = pOperator->cost.endTs;
12,736,608✔
1312
    }
1313
  }
1314
  
1315
  /* accumulate output total rows even not in explain mode */
1316
  pOperator->resultInfo.totalRows += rows;
2,147,483,647✔
1317
}
2,147,483,647✔
1318

1319
/**
1320
  @brief Reset operator's cost info but keep create time unchanged.
1321
*/
1322
void resetOperatorCostInfo(SOperatorInfo* pOperator) {
822,539,805✔
1323
  /* keep execCreate UNCHANGED!!! */
1324
  pOperator->cost.execStart = 0;
822,539,805✔
1325
  pOperator->cost.execFirstRow = 0;
822,608,621✔
1326
  pOperator->cost.execLastRow = 0;
822,567,956✔
1327
  pOperator->cost.execTimes = 0;
822,609,905✔
1328
  pOperator->cost.execElapsed = 0;
822,579,113✔
1329
  pOperator->cost.inputWaitElapsed = 0;
822,554,454✔
1330
  pOperator->cost.outputWaitElapsed = 0;
822,560,947✔
1331
  pOperator->cost.inputRows = 0;
822,631,302✔
1332
}
822,599,937✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc