• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.21
/source/libs/executor/src/operator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "executor.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "osTime.h"
21
#include "query.h"
22
#include "querytask.h"
23
#include "storageapi.h"
24
#include "taoserror.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tutil.h"
29

30
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
31
                                           SSDataBlock** pResBlock);
32
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
33
                                              SOperatorParam* pParam,
34
                                              SSDataBlock** pResBlock);
35
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pParam);
36

37
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
795,291,397✔
38
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain,
39
                                   __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
40
  SOperatorFpSet fpSet = {
2,147,483,647✔
41
      ._openFn = openFn,
42
      ._nextFn = nextFn,
43
      .getNextFn = (nextFn != NULL) ? optrGetNextFnWithExecRecord : NULL,
795,291,397✔
44
      .cleanupFn = cleanup,
45
      .closeFn = closeFn,
46
      .reqBufFn = reqBufFn,
47
      .getExplainFn = explain,
48
      ._nextExtFn = nextExtFn,
49
      .getNextExtFn = (nextExtFn != NULL) ? optrGetNextExtFnWithExecRecord : NULL,
795,291,397✔
50
      .notifyFn = notifyFn,
51
      .releaseStreamStateFn = NULL,
52
      .reloadStreamStateFn = NULL,
53
      ._resetFn = NULL,
54
      .resetStateFn = NULL,
55
  };
56

57
  return fpSet;
795,291,397✔
58
}
59

60
static int32_t optrGetNextFnWithExecRecord(SOperatorInfo* pOperator,
2,147,483,647✔
61
                                           SSDataBlock** pResBlock) {
62
  QRY_PARAM_CHECK(pResBlock);
2,147,483,647✔
63

64
  recordOpExecBegin(pOperator);
2,147,483,647✔
65
  int32_t code = pOperator->fpSet._nextFn(pOperator, pResBlock);
2,147,483,647✔
66
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
2,147,483,647✔
67
                 (*pResBlock)->info.rows : 0;
2,147,483,647✔
68
  recordOpExecEnd(pOperator, rows);
2,147,483,647✔
69
  return code;
2,147,483,647✔
70
}
71

72
static int32_t optrGetNextExtFnWithExecRecord(SOperatorInfo* pOperator,
161,798,067✔
73
                                              SOperatorParam* pParam,
74
                                              SSDataBlock** pResBlock) {
75
  QRY_PARAM_CHECK(pResBlock);
161,798,067✔
76

77
  recordOpExecBegin(pOperator);
161,800,869✔
78
  int32_t code = pOperator->fpSet._nextExtFn(pOperator, pParam, pResBlock);
161,792,522✔
79
  size_t  rows = (TSDB_CODE_SUCCESS == code && *pResBlock != NULL) ?
161,790,709✔
80
                 (*pResBlock)->info.rows : 0;
323,580,694✔
81
  recordOpExecEnd(pOperator, rows);
161,792,202✔
82
  return code;
161,789,757✔
83
}
84

85
/*
 * Wrapper around the operator's raw _resetFn callback.
 * Returns whatever the callback returned.
 */
static int32_t optrResetStateFnWithExecRecord(SOperatorInfo* pOperator) {
  const int32_t resetCode = pOperator->fpSet._resetFn(pOperator);

  // the cost info is reset whether or not the callback reported success
  resetOperatorCostInfo(pOperator);

  return resetCode;
}
90

91
// Install the stream-state release/reload callbacks on an operator.
// Both slots are overwritten unconditionally; NULL is stored as-is.
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
  SOperatorFpSet* pFpSet = &pOperator->fpSet;

  pFpSet->releaseStreamStateFn = relaseFn;
  pFpSet->reloadStreamStateFn = reloadFn;
}
148,250,343✔
95

96
// Install the reset-state callback on an operator.
// The raw callback goes into _resetFn; resetStateFn gets the wrapper
// optrResetStateFnWithExecRecord, or NULL when no callback was supplied.
void setOperatorResetStateFn(SOperatorInfo* pOperator, __optr_reset_state_fn_t resetFn) {
  pOperator->fpSet._resetFn = resetFn;

  if (resetFn == NULL) {
    pOperator->fpSet.resetStateFn = NULL;
  } else {
    pOperator->fpSet.resetStateFn = optrResetStateFnWithExecRecord;
  }
}
793,472,901✔
100

101
// Open callback for operators that need no real open step: it only applies
// OPTR_SET_OPENED to the operator and reports success.
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;

  OPTR_SET_OPENED(pOperator);

  return code;
}
105

106
/*
 * Attach `num` downstream operators to `p`.
 *
 * A new pointer array is allocated and the pointers in pDownstream are copied
 * into it; the operators themselves are not copied. Any previous value of
 * p->pDownstream is overwritten without being freed.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (in which
 * case p->pDownstream is NULL and the counters are left untouched).
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  SOperatorInfo** pChildren = taosMemoryCalloc(1, num * POINTER_BYTES);

  p->pDownstream = pChildren;
  if (pChildren == NULL) {
    return terrno;
  }

  memcpy(pChildren, pDownstream, num * POINTER_BYTES);

  p->numOfRealDownstream = num;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}
117

118
// Mark the operator as done (OP_EXEC_DONE) and set the status of the task
// it belongs to to TASK_COMPLETED.
void setOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pOwnerTask = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  setTaskStatus(pOwnerTask, TASK_COMPLETED);
}
795,581,862✔
122

123
/*
 * Fill in the basic descriptive fields of an operator.
 *
 * The name pointer is stored as-is (not copied), so the string must outlive
 * the operator. pInfo and pTaskInfo are likewise stored by pointer.
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // execution attributes
  pOperator->blocking = blocking;
  pOperator->status = status;

  // operator-specific state and owning task
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
}
795,169,676✔
132

133
// each operator should be set their own function to return total cost buffer
UNCOV
134
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
×
135
  if (pOperator->blocking) {
×
136
    return -1;
×
137
  } else {
UNCOV
138
    return 0;
×
139
  }
140
}
141

UNCOV
142
static int64_t getQuerySupportBufSize(size_t numOfTables) {
×
143
  size_t s1 = sizeof(STableQueryInfo);
×
144
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
UNCOV
145
  return (int64_t)(s1 * 1.5 * numOfTables);
×
146
}
147

UNCOV
148
int32_t checkForQueryBuf(size_t numOfTables) {
×
149
  int64_t t = getQuerySupportBufSize(numOfTables);
×
150
  if (tsQueryBufferSizeBytes < 0) {
×
151
    return TSDB_CODE_SUCCESS;
×
152
  } else if (tsQueryBufferSizeBytes > 0) {
×
153
    while (1) {
×
154
      int64_t s = tsQueryBufferSizeBytes;
×
155
      int64_t remain = s - t;
×
156
      if (remain >= 0) {
×
157
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
×
158
          return TSDB_CODE_SUCCESS;
×
159
        }
160
      } else {
UNCOV
161
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
162
      }
163
    }
164
  }
165

166
  // disable query processing if the value of tsQueryBufferSize is zero.
UNCOV
167
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
168
}
169

UNCOV
170
void releaseQueryBuf(size_t numOfTables) {
×
171
  if (tsQueryBufferSizeBytes < 0) {
×
172
    return;
×
173
  }
174

UNCOV
175
  int64_t t = getQuerySupportBufSize(numOfTables);
×
176

177
  // restore value is not enough buffer available
UNCOV
178
  (void)atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
×
179
}
180

181
// Return value of an operator-tree visitor callback (see traverseOperatorTree).
typedef enum {
  OPTR_FN_RET_CONTINUE = 0x1,  // keep going: the visited operator's downstream is traversed
  OPTR_FN_RET_ABORT = 0x2,     // do not descend into the visited operator's downstream
} ERetType;
185

186
// State shared between traverseOperatorTree() and its visitor callback.
typedef struct STraverParam {
  void*   pRet;    // result slot filled by the callback (e.g. the operator that was found)
  int32_t code;    // traversal stops descending/iterating once this is not TSDB_CODE_SUCCESS
  void*   pParam;  // callback-specific input, interpreted by the callback itself
} STraverParam;
191

192
// iterate the operator tree helper
193
typedef ERetType (*optr_fn_t)(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdstr);
194

195
/*
 * Pre-order walk over the operator tree rooted at pOperator.
 *
 * `fn` is called on the operator first. Its downstream operators are visited
 * only if `fn` did not return OPTR_FN_RET_ABORT and pParam->code is still
 * success. Note that an abort only prunes the subtree below the operator that
 * returned it: the remaining siblings are still visited by the caller's loop,
 * which stops early only on a non-zero pParam->code.
 *
 * A NULL pOperator is ignored.
 */
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
  if (pOperator == NULL) {
    return;
  }

  ERetType visitRet = fn(pOperator, pParam, id);
  if (visitRet == OPTR_FN_RET_ABORT || pParam->code != TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t idx = 0;
  while (idx < pOperator->numOfDownstream) {
    traverseOperatorTree(pOperator->pDownstream[idx], fn, pParam, id);
    if (pParam->code != 0) {
      return;
    }
    idx += 1;
  }
}
212

213
// Visitor for extractOperatorInTree(): pParam->pParam points at the wanted
// operator type (int32_t). On a match the operator is stored in pParam->pRet
// and OPTR_FN_RET_ABORT is returned; otherwise OPTR_FN_RET_CONTINUE.
ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  const int32_t wantedType = *(int32_t*)pParam->pParam;

  if (pOperator->operatorType != wantedType) {
    return OPTR_FN_RET_CONTINUE;
  }

  pParam->pRet = pOperator;
  return OPTR_FN_RET_ABORT;
}
222

223
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
224
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
9,267,416✔
225
  QRY_PARAM_CHECK(pOptrInfo);
9,267,416✔
226

227
  if (pOperator == NULL) {
9,270,579✔
UNCOV
228
    qError("invalid operator, failed to find tableScanOperator %s", id);
×
229
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
230
  }
231

232
  STraverParam p = {.pParam = &type, .pRet = NULL};
9,270,579✔
233
  traverseOperatorTree(pOperator, extractOperatorInfo, &p, id);
9,260,703✔
234
  if (p.code == 0) {
9,261,735✔
235
    *pOptrInfo = p.pRet;
9,262,911✔
236
  }
237
  return p.code;
9,263,568✔
238
}
239

240
// In/out state for extractScanInfo() / getTableScanInfo().
typedef struct SExtScanInfo {
  int32_t order;           // scan order; seeded by the caller, overwritten by the visitor
  int32_t scanFlag;        // scan flag reported by the scan operator that was found
  int32_t inheritUsOrder;  // when non-zero, an exchange operator leaves `order` untouched
} SExtScanInfo;
245

UNCOV
246
/*
 * Visitor for getTableScanInfo(): fills the SExtScanInfo behind pParam->pParam
 * from the first scan-like operator met on a branch.
 *
 *   - system-table / stream / tag / block-dist / last-row / table-count scans:
 *     ascending order, MAIN_SCAN
 *   - exchange: MAIN_SCAN; order is forced to ascending unless inheritUsOrder is set
 *   - table scan / table merge scan: order and scan flag are taken from the
 *     operator's own scan info
 *
 * Returns OPTR_FN_RET_ABORT for any of the above and OPTR_FN_RET_CONTINUE for
 * every other operator type.
 */
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  int32_t       optrType = pOperator->operatorType;
  SExtScanInfo* pScan = pParam->pParam;

  switch (optrType) {
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
      pScan->order = TSDB_ORDER_ASC;
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      if (!pScan->inheritUsOrder) {
        pScan->order = TSDB_ORDER_ASC;
      }
      pScan->scanFlag = MAIN_SCAN;
      return OPTR_FN_RET_ABORT;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;
      pScan->order = pTableScan->base.cond.order;
      pScan->scanFlag = pTableScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      pScan->order = pMergeScan->base.cond.order;
      pScan->scanFlag = pMergeScan->base.scanFlag;
      return OPTR_FN_RET_ABORT;
    }

    default:
      return OPTR_FN_RET_CONTINUE;
  }
}
276

UNCOV
277
/*
 * Determine the scan order and scan flag of the operator tree rooted at
 * pOperator.
 *
 * *order is both input and output: its incoming value seeds the search and is
 * what survives when inheritUsOrder is set and an exchange operator is met.
 * *order and *scanFlag are always written back, even when the traversal
 * reports an error.
 *
 * Returns the traversal code, or TSDB_CODE_INVALID_PARA when the traversal
 * succeeded but the resulting order is neither TSDB_ORDER_ASC nor TSDB_ORDER_DESC.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
  SExtScanInfo scan = {0};
  scan.order = *order;
  scan.inheritUsOrder = inheritUsOrder;

  STraverParam param = {0};
  param.pParam = &scan;

  traverseOperatorTree(pOperator, extractScanInfo, &param, NULL);
  *order = scan.order;
  *scanFlag = scan.scanFlag;

  if (param.code != TSDB_CODE_SUCCESS) {
    return param.code;
  }

  if (*order != TSDB_ORDER_ASC && *order != TSDB_ORDER_DESC) {
    qError("operator failed at: %s:%d", __func__, __LINE__);
    return TSDB_CODE_INVALID_PARA;
  }

  return param.code;
}
293

294
/*
 * Visitor for stopTableScanOperator(): pParam->pParam is the SStorageAPI used
 * to notify data readers that they are closing.
 *
 *   - table scan: notify its data reader, if it has one
 *   - stream scan: notify the data reader of its inner table-scan operator,
 *     if the inner operator, its info, and the reader all exist
 *
 * Returns OPTR_FN_RET_ABORT for those two operator types and
 * OPTR_FN_RET_CONTINUE for everything else.
 */
static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
  SStorageAPI* pStorage = pParam->pParam;

  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScan = pOperator->info;

    if (pScan->base.dataReader != NULL) {
      pStorage->tsdReader.tsdReaderNotifyClosing(pScan->base.dataReader);
    }
    return OPTR_FN_RET_ABORT;
  }

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return OPTR_FN_RET_CONTINUE;
  }

  STmqQueryScanInfo* pTmqScan = pOperator->info;
  if (pTmqScan->pTableScanOp == NULL) {
    return OPTR_FN_RET_ABORT;
  }

  STableScanInfo* pInnerScan = pTmqScan->pTableScanOp->info;
  if (pInnerScan != NULL && pInnerScan->base.dataReader != NULL) {
    pStorage->tsdReader.tsdReaderNotifyClosing(pInnerScan->base.dataReader);
  }

  return OPTR_FN_RET_ABORT;
}
318

319
// Walk the operator tree rooted at pOperator and notify the data readers of
// its table-scan / stream-scan operators that they are closing, using pAPI.
// Returns the traversal code.
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI) {
  STraverParam param = {0};
  param.pParam = pAPI;

  traverseOperatorTree(pOperator, doStopDataReader, &param, pIdStr);

  return param.code;
}
324

325
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
796,054,107✔
326
                       SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo,
327
                       EOPTR_EXEC_MODEL model) {
328
  QRY_PARAM_CHECK(pOptrInfo);
796,054,107✔
329

330
  int32_t     code = 0;
796,256,693✔
331
  int32_t     type = nodeType(pPhyNode);
796,256,693✔
332
  const char* idstr = GET_TASKID(pTaskInfo);
796,306,997✔
333

334
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
796,348,485✔
335
    SOperatorInfo* pOperator = NULL;
429,048,253✔
336
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
428,708,928✔
337
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
248,241,660✔
338
      // NOTE: this is an patch to fix the physical plan
339
      // TODO remove it later
340
      if (pTableScanNode->scan.node.pLimit != NULL) {
248,241,660✔
341
        pTableScanNode->groupSort = true;
5,022,720✔
342
      }
343

344
      STableListInfo* pTableListInfo = tableListCreate();
248,312,255✔
345
      if (!pTableListInfo) {
248,407,095✔
UNCOV
346
        pTaskInfo->code = terrno;
×
347
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
348
        return terrno;
×
349
      }
350

351
      // Since virtual stable scan use virtual super table's uid in scan operator, the origin table might be stored on
352
      // different vnode, so we should not get table schema for virtual stable scan.
353
      if (!pTableScanNode->scan.virtualStableScan) {
248,407,095✔
354
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
235,217,415✔
355
        if (code) {
234,550,222✔
356
          pTaskInfo->code = code;
13✔
357
          tableListDestroy(pTableListInfo);
13✔
358
          return code;
×
359
        }
360
      }
361

362
      if (pTableScanNode->scan.node.dynamicOp) {
247,737,057✔
363
        pTaskInfo->dynamicTask = true;
18,959,725✔
364
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
18,958,362✔
365
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
18,956,176✔
366
      } else {
367
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
228,586,726✔
368
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
369
        if (code) {
229,451,806✔
370
          pTaskInfo->code = code;
968✔
371
          tableListDestroy(pTableListInfo);
968✔
372
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
968✔
373
          return code;
968✔
374
        }
375
      }
376

377
      code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
248,402,591✔
378
      if (NULL == pOperator || code != 0) {
247,399,611✔
379
        pTaskInfo->code = code;
2,296✔
380
        tableListDestroy(pTableListInfo);
24,240✔
381
        return code;
24,240✔
382
      }
383

384
      STableScanInfo* pScanInfo = pOperator->info;
247,735,986✔
385
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
247,869,175✔
386
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
180,467,268✔
387
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
25,825,217✔
388
      STableListInfo*           pTableListInfo = tableListCreate();
25,825,217✔
389
      if (!pTableListInfo) {
25,919,562✔
UNCOV
390
        pTaskInfo->code = terrno;
×
391
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
392
        return terrno;
×
393
      }
394

395
      if (pTableScanNode->scan.node.dynamicOp) {
25,919,562✔
396
        pTaskInfo->dynamicTask = true;
169,912✔
397
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
169,912✔
398
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
169,912✔
399
      } else {
400
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo,
25,741,748✔
401
                                       pTagCond, pTagIndexCond, pTaskInfo, NULL);
402
        if (code) {
25,727,493✔
UNCOV
403
          pTaskInfo->code = code;
×
404
          tableListDestroy(pTableListInfo);
×
405
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
×
406
          return code;
×
407
        }
408

409
        code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
25,727,493✔
410
        if (code) {
25,596,007✔
UNCOV
411
          pTaskInfo->code = code;
×
412
          tableListDestroy(pTableListInfo);
×
413
          return code;
×
414
        }
415
      }
416

417
      code = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
25,765,919✔
418
      if (NULL == pOperator || code != 0) {
25,602,327✔
419
        pTaskInfo->code = code;
63,437✔
UNCOV
420
        tableListDestroy(pTableListInfo);
×
421
        return code;
×
422
      }
423

424
      STableMergeScanInfo* pScanInfo = pOperator->info;
25,538,890✔
425
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
25,646,545✔
426
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
154,642,051✔
427
      code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
111,912,070✔
428
                                        pTaskInfo, &pOperator);
429
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
42,729,981✔
430
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
331,321✔
431
      STableListInfo*      pTableListInfo = tableListCreate();
331,321✔
432
      if (!pTableListInfo) {
330,975✔
UNCOV
433
        pTaskInfo->code = terrno;
×
434
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
435
        return terrno;
×
436
      }
437

438
      if (pHandle->vnode && (pTaskInfo->pSubplan->pVTables == NULL)) {
330,975✔
439
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
331,321✔
440
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, NULL);
441
        if (code) {
331,321✔
UNCOV
442
          pTaskInfo->code = code;
×
443
          tableListDestroy(pTableListInfo);
×
444
          qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
445
          return code;
×
446
        }
447
      }
448

449
      code = createTmqScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
331,321✔
450
      if (code) {
330,974✔
UNCOV
451
        pTaskInfo->code = code;
×
452
        tableListDestroy(pTableListInfo);
×
453
        return code;
×
454
      }
455
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
42,398,660✔
456
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
29,291,743✔
457
      if (pSysScanPhyNode->scan.virtualStableScan) {
29,291,743✔
458
        STableListInfo*           pTableListInfo = tableListCreate();
17,857,880✔
459
        if (!pTableListInfo) {
17,866,394✔
UNCOV
460
          pTaskInfo->code = terrno;
×
461
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
462
          return terrno;
×
463
        }
464

465
        code = createScanTableListInfo((SScanPhysiNode*)pSysScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
17,866,394✔
466
                                       pTagIndexCond, pTaskInfo, NULL);
467
        if (code != TSDB_CODE_SUCCESS) {
17,856,398✔
UNCOV
468
          pTaskInfo->code = code;
×
469
          tableListDestroy(pTableListInfo);
×
470
          return code;
×
471
        }
472

473
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTableListInfo, pUser, pTaskInfo, &pOperator);
17,856,398✔
474
      } else {
475
        code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, NULL, pUser, pTaskInfo, &pOperator);
11,436,878✔
476
      }
477
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
13,106,917✔
478
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
31,095✔
479
      code = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo, &pOperator);
31,095✔
480
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
13,075,822✔
481
      STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
10,183,529✔
482
      STableListInfo*    pTableListInfo = tableListCreate();
10,183,529✔
483
      if (!pTableListInfo) {
10,186,071✔
UNCOV
484
        pTaskInfo->code = terrno;
×
485
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
486
        return terrno;
×
487
      }
488

489
      code = initQueriedTableSchemaInfo(pHandle, &pTagScanPhyNode->scan, dbname, pTaskInfo);
10,186,071✔
490
      if (code != TSDB_CODE_SUCCESS) {
10,170,630✔
UNCOV
491
        pTaskInfo->code = code;
×
492
        tableListDestroy(pTableListInfo);
×
493
        return code;
×
494
      }
495

496
      if (!pTagScanPhyNode->onlyMetaCtbIdx) {
10,170,630✔
497
        code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
5,172,686✔
498
                                       pTagIndexCond, pTaskInfo, NULL);
499
        if (code != TSDB_CODE_SUCCESS) {
5,174,689✔
UNCOV
500
          pTaskInfo->code = code;
×
501
          qError("failed to getTableList, code:%s", tstrerror(code));
×
502
          tableListDestroy(pTableListInfo);
×
503
          return code;
×
504
        }
505
      }
506

507
      if (pTagScanPhyNode->scan.node.dynamicOp) {
10,173,448✔
508
        pTaskInfo->dynamicTask = true;
2,558,034✔
509
        pTableListInfo->idInfo.suid = pTagScanPhyNode->scan.suid;
2,554,491✔
510
        pTableListInfo->idInfo.tableType = pTagScanPhyNode->scan.tableType;
2,557,402✔
511
      }
512

513
      code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
10,164,889✔
514
                                       &pOperator);
515
      if (code) {
10,170,845✔
UNCOV
516
        pTaskInfo->code = code;
×
517
        tableListDestroy(pTableListInfo);
×
518
        return code;
×
519
      }
520
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
2,892,306✔
521
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
5,496✔
522
      STableListInfo*          pTableListInfo = tableListCreate();
5,496✔
523
      if (!pTableListInfo) {
5,496✔
UNCOV
524
        pTaskInfo->code = terrno;
×
525
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
526
        return terrno;
×
527
      }
528

529
      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
5,496✔
530
        SArray* pList = taosArrayInit(4, sizeof(uint64_t));
4,683✔
531
        code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
4,683✔
532
        if (code != TSDB_CODE_SUCCESS) {
4,683✔
UNCOV
533
          pTaskInfo->code = code;
×
534
          taosArrayDestroy(pList);
×
535
          tableListDestroy(pTableListInfo);
×
536
          return code;
×
537
        }
538

539
        size_t num = taosArrayGetSize(pList);
4,683✔
540
        for (int32_t i = 0; i < num; ++i) {
10,091✔
541
          uint64_t* id = taosArrayGet(pList, i);
5,408✔
542
          if (id == NULL) {
5,408✔
UNCOV
543
            continue;
×
544
          }
545

546
          code = tableListAddTableInfo(pTableListInfo, *id, 0);
5,408✔
547
          if (code) {
5,408✔
UNCOV
548
            pTaskInfo->code = code;
×
549
            tableListDestroy(pTableListInfo);
×
550
            taosArrayDestroy(pList);
×
551
            return code;
13✔
552
          }
553
        }
554

555
        taosArrayDestroy(pList);
4,683✔
556
      } else {  // Create group with only one table
557
        code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
813✔
558
        if (code) {
813✔
UNCOV
559
          pTaskInfo->code = code;
×
560
          tableListDestroy(pTableListInfo);
×
561
          return code;
×
562
        }
563
      }
564

565
      code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
5,496✔
566
      if (code) {
5,496✔
UNCOV
567
        pTaskInfo->code = code;
×
568
        tableListDestroy(pTableListInfo);
×
569
        return code;
×
570
      }
571
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
2,886,810✔
572
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
984,393✔
573
      STableListInfo*        pTableListInfo = tableListCreate();
984,393✔
574
      if (!pTableListInfo) {
984,826✔
UNCOV
575
        pTaskInfo->code = terrno;
×
576
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
577
        return terrno;
×
578
      }
579

580
      code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
984,826✔
581
                                     pTagIndexCond, pTaskInfo, NULL);
582
      if (code != TSDB_CODE_SUCCESS) {
985,580✔
UNCOV
583
        pTaskInfo->code = code;
×
584
        tableListDestroy(pTableListInfo);
×
585
        return code;
×
586
      }
587

588
      code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
985,580✔
589
      if (code != TSDB_CODE_SUCCESS) {
984,466✔
UNCOV
590
        pTaskInfo->code = code;
×
591
        tableListDestroy(pTableListInfo);
×
592
        return code;
×
593
      }
594

595
      code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
984,466✔
596
      if (code) {
983,825✔
UNCOV
597
        tableListDestroy(pTableListInfo);
×
598
        pTaskInfo->code = code;
×
599
        return code;
×
600
      }
601
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
1,902,618✔
602
      code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
1,899,495✔
603
    } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
3,162✔
604
      // NOTE: this is an patch to fix the physical plan
605
      code = createVirtualTableMergeOperatorInfo(NULL, 0, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
17,942✔
606
    } else {
UNCOV
607
      code = TSDB_CODE_INVALID_PARA;
×
608
      pTaskInfo->code = code;
×
609
      return code;
×
610
    }
611

612
    if (pOperator != NULL) {  // todo moved away
427,411,457✔
613
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
428,285,235✔
614
    }
615

616
    *pOptrInfo = pOperator;
427,154,570✔
617
    return code;
428,161,962✔
618
  }
619

620
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
367,217,313✔
621
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
367,509,608✔
622
  if (ops == NULL) {
367,337,006✔
UNCOV
623
    code = terrno;
×
624
    pTaskInfo->code = code;
×
625
    return code;
×
626
  }
627

628
  for (int32_t i = 0; i < size; ++i) {
783,329,711✔
629
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
417,333,335✔
630
    // For external window parent, pre-initialize runtime from subquery before building children
631
    if (QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW == type && i == 0) {
417,204,666✔
632
      // best-effort pre-init; ignore errors here and let later construction handle them
NEW
633
      (void)extWinPreInitFromSubquery(pPhyNode, pTaskInfo);
×
634
    }
635
    code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i], model);
417,204,666✔
636
    if (ops[i] == NULL || code != 0) {
416,568,525✔
637
      for (int32_t j = 0; j < i; ++j) {
75,052✔
UNCOV
638
        destroyOperator(ops[j]);
×
639
      }
640
      taosMemoryFree(ops);
75,052✔
641
      return code;
75,052✔
642
    }
643
  }
644

645
  SOperatorInfo* pOptr = NULL;
365,996,376✔
646
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
365,800,327✔
647
    code = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
146,132,802✔
648
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
219,667,525✔
649
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
126,628,437✔
650
    if (pAggNode->pGroupKeys != NULL) {
126,628,437✔
651
      code = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
31,319,387✔
652
    } else {
653
      code = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
95,792,189✔
654
    }
655
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
93,039,088✔
656
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
4,763,072✔
657
    code = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
4,763,072✔
658
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
88,276,016✔
659
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
825,986✔
660
    code = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
825,986✔
661
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
87,450,030✔
UNCOV
662
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
×
UNCOV
663
    code = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
×
664
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
87,450,030✔
665
    code = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
34,302,108✔
666
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
53,147,922✔
667
    code = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
103,593✔
668
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
53,044,329✔
669
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
13,907,612✔
670
    code = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo, &pOptr);
13,907,612✔
671
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
39,136,717✔
672
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
564,823✔
673
    code = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo, &pOptr);
564,823✔
674
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
38,571,894✔
675
    code = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
3,932,607✔
676
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
34,639,287✔
677
    SStateWindowPhysiNode* pStateNode = (SStateWindowPhysiNode*)pPhyNode;
980,094✔
678
    code = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo, &pOptr);
980,094✔
679
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
33,659,193✔
680
    code = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
13,064,588✔
681
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
20,594,605✔
682
    code = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,233,795✔
683
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
19,360,810✔
684
    code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
490,226✔
685
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
18,870,584✔
686
    code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,168,440✔
687
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
15,702,144✔
688
    code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
3,503,685✔
689
  } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
12,198,459✔
UNCOV
690
    code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
691
  } else if (QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC == type) {
12,198,459✔
UNCOV
692
    code = createGenericAnalysisOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
693
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
12,198,459✔
694
    code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
317,091✔
695
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
11,881,368✔
696
    code = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,233,001✔
697
  } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
10,648,367✔
698
    code = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo, pHandle->pMsgCb, &pOptr);
4,641,801✔
699
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
6,006,566✔
700
    code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
252,247✔
701
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
5,754,319✔
UNCOV
702
    code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
703
  } else if (QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == type) {
5,754,319✔
704
    SVirtualScanPhysiNode* pVirtualTableScanNode = (SVirtualScanPhysiNode*)pPhyNode;
4,185,589✔
705
    // NOTE: this is a patch to fix the physical plan
706

707
    if (pVirtualTableScanNode->scan.node.pLimit != NULL) {
4,185,589✔
708
      pVirtualTableScanNode->groupSort = true;
×
709
    }
710
    code = createVirtualTableMergeOperatorInfo(ops, size, (SVirtualScanPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
4,185,589✔
711
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_EXTERNAL == type) {
1,568,730✔
712
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
1,496,737✔
713
      code = createStreamExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
184,705✔
714
    } else {
715
      code = createExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
1,310,420✔
716
    }
717
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_EXTERNAL == type) {
74,348✔
718
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
61,955✔
719
      code = createStreamMergeAlignedExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
19,237✔
720
    } else {
721
      code = createMergeAlignedExternalWindowOperator(ops[0], pPhyNode, pTaskInfo, &pOptr);
42,718✔
722
    }
723
  } else {
724
    code = TSDB_CODE_INVALID_PARA;
12,393✔
725
    pTaskInfo->code = code;
12,393✔
726
    for (int32_t i = 0; i < size; ++i) {
13✔
UNCOV
727
      destroyOperator(ops[i]);
×
728
    }
729
    taosMemoryFree(ops);
13✔
UNCOV
730
    qError("invalid operator type %d", type);
×
UNCOV
731
    return code;
×
732
  }
733

734
  taosMemoryFree(ops);
366,979,058✔
735
  if (pOptr) {
366,697,850✔
736
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
366,260,301✔
737
  }
738

739
  *pOptrInfo = pOptr;
366,825,099✔
740
  return code;
366,923,822✔
741
}
742

743
/*
 * Destroy an operator together with the downstream operators it references.
 *
 * Releases the get/notify params, recursively destroys each entry of
 * pDownstream (numOfRealDownstream entries), frees the downstream array,
 * cleans the expression support, invokes closeFn on the operator's private
 * info when both are present, and finally frees the operator itself.
 *
 * @param pOperator Operator to destroy; NULL is accepted and ignored.
 */
void destroyOperator(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
  freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);

  if (NULL != pOperator->pDownstream) {
    int32_t childIdx = 0;
    while (childIdx < pOperator->numOfRealDownstream) {
      destroyOperator(pOperator->pDownstream[childIdx]);
      ++childIdx;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);

  // closeFn is only invoked while the private info is still attached
  if (NULL != pOperator->fpSet.closeFn && NULL != pOperator->info) {
    pOperator->fpSet.closeFn(pOperator->info);
    pOperator->info = NULL;
  }

  taosMemoryFreeClear(pOperator);
}
769

770
/*
 * Destroy the given downstream operators and then the operator itself,
 * without running the operator's closeFn and without walking its own
 * pDownstream array.
 *
 * @param pOperator   Operator to destroy; may be NULL.
 * @param downstreams Array of downstream operators to destroy; may be NULL.
 * @param num         Number of entries in downstreams.
 */
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
  for (int k = 0; NULL != downstreams && k < num; ++k) {
    destroyOperator(downstreams[k]);
  }

  if (NULL == pOperator) {
    return;
  }

  // Detach the private info first: destroyOperator() calls closeFn only when
  // info is non-NULL, so closeFn is skipped for this operator.
  pOperator->info = NULL;

  // Drop the downstream array here so destroyOperator() does not recurse into
  // it; the downstream operators were already destroyed above.
  if (NULL != pOperator->pDownstream) {
    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->pDownstream = NULL;
  }

  destroyOperator(pOperator);
}
451,208✔
786

787
/*
 * Append the explain/execution statistics of an operator, followed by those
 * of all its downstream operators (depth first), to pExecInfoList.
 *
 * @param operatorInfo  Operator whose statistics are collected.
 * @param pExecInfoList Array of SExplainExecInfo receiving the records.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the first error code
 *         encountered (terrno when the array push fails).
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  blankInfo = {0};
  SExplainExecInfo* pInfo = taosArrayPush(pExecInfoList, &blankInfo);
  if (NULL == pInfo) {
    return terrno;
  }

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->verboseInfo = NULL;
  pInfo->verboseLen = 0;
  pInfo->vgId = operatorInfo->pTaskInfo->id.vgId;
  pInfo->execCreate = operatorInfo->cost.execCreate;

  // All timestamps below are reported relative to the operator create time.
  // When no row was returned, execFirstRow/execLastRow keep their zero value.
  if (operatorInfo->resultInfo.totalRows > 0) {
    pInfo->execFirstRow = operatorInfo->cost.execFirstRow - operatorInfo->cost.execCreate;
    pInfo->execLastRow = operatorInfo->cost.execLastRow - operatorInfo->cost.execCreate;
  }

  pInfo->execTimes = operatorInfo->cost.execTimes;
  if (operatorInfo->cost.execTimes > 0) {
    pInfo->execStart = operatorInfo->cost.execStart - operatorInfo->cost.execCreate;
    pInfo->execElapsed = operatorInfo->cost.execElapsed;
    pInfo->inputWaitElapsed = operatorInfo->cost.inputWaitElapsed;
    pInfo->outputWaitElapsed = operatorInfo->cost.outputWaitElapsed;
    pInfo->inputRows = operatorInfo->cost.inputRows;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  // Operator-specific verbose information is optional
  if (NULL != operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (0 != code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // NOTE(review): pInfo is deliberately not touched past this point; the
  // recursive pushes below presumably may relocate the array storage.
  int32_t childIdx = 0;
  while (childIdx < operatorInfo->numOfDownstream) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[childIdx], pExecInfoList);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    ++childIdx;
  }

  return TSDB_CODE_SUCCESS;
}
841

UNCOV
842
/*
 * Merge the operator param pSrc into pDst. Only exchange operator params are
 * supported.
 *
 * - pDst holds a single (non-batch) param and both params target the same
 *   vgId: the uid list of pSrc is appended to the uid list of pDst.
 * - pDst holds a single param and the vgIds differ: pDst->value is replaced by
 *   a batch param whose hash, keyed by vgId, contains copies of both basic
 *   params.
 * - pDst already holds a batch param: the uid list of pSrc is appended to the
 *   entry with the same vgId, or a new entry is inserted.
 *
 * @param pDst Destination param, updated in place.
 * @param pSrc Source param to merge in.
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for mismatched
 *         or unsupported param types, otherwise the error of the failing call.
 */
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
  if (pDst->opType != pSrc->opType) {
    qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
    return TSDB_CODE_INVALID_PARA;
  }

  switch (pDst->opType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
      SExchangeOperatorParam* pDExc = pDst->value;
      SExchangeOperatorParam* pSExc = pSrc->value;
      if (pSExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
        qError("%s, invalid exchange operator param type %d for "
          "source operator", __func__, pSExc->basic.paramType);
        return TSDB_CODE_INVALID_PARA;
      }
      if (!pDExc->multiParams) {
        if (pDExc->basic.paramType != DYN_TYPE_EXCHANGE_PARAM) {
          qError("%s, invalid exchange operator param type %d for "
            "destination operator", __func__, pDExc->basic.paramType);
          return TSDB_CODE_INVALID_PARA;
        }
        if (pSExc->basic.vgId != pDExc->basic.vgId) {
          // Different vgroups: promote the destination to a batch param.
          SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
          if (NULL == pBatch) {
            return terrno;
          }

          pBatch->multiParams = true;
          pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
          if (NULL == pBatch->pBatchs) {
            // Capture the error before any further call can overwrite it
            int32_t initCode = terrno;
            taosMemoryFree(pBatch);
            return initCode;
          }

          tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);

          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic,
                                        sizeof(pDExc->basic));
          if (TSDB_CODE_SUCCESS == code) {
            code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                  sizeof(pSExc->basic));
          }
          if (code) {
            // The batch was never published, so it must be released here.
            // The hash entries are copies of basic params that pDst/pSrc still
            // hold, so the free callback is removed first: cleanup must only
            // release the hash's own storage, not the params' contents.
            tSimpleHashSetFreeFp(pBatch->pBatchs, NULL);
            tSimpleHashCleanup(pBatch->pBatchs);
            taosMemoryFree(pBatch);
            return code;
          }

          // The hash now holds the copy of the old basic param; only the old
          // container is released.
          taosMemoryFree(pDst->value);
          pDst->value = pBatch;
        } else {
          // Same vgroup: just extend the destination uid list.
          void* p = taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        }
      } else {
        // Destination is already a batch param: merge into the matching entry.
        SExchangeOperatorBatchParam* pBatch = pDst->value;
        SExchangeOperatorBasicParam* pBasic =
            tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
        if (pBasic) {
          void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
          if (p == NULL) {
            return terrno;
          }
        } else {
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic,
                                        sizeof(pSExc->basic));
          if (code) {
            return code;
          }
        }
      }
      break;
    }
    default:
      qError("invalid optype %d for merge operator params", pDst->opType);
      return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
924

925
/*
 * Install a new get/notify param on an operator and distribute its child
 * params to the per-downstream param slots.
 *
 * The previously installed params of the given type are reset first. When
 * pInput targets another operator type, pInput itself is forwarded to every
 * downstream slot. Otherwise pInput becomes the operator's own param and each
 * of its children is stored in (or merged into) the slot selected by its
 * downstreamIdx; the children array is destroyed afterwards.
 *
 * @param pOperator Operator receiving the param.
 * @param pInput    Param to install; NULL only resets the current params.
 * @param type      OP_GET_PARAM or OP_NOTIFY_PARAM.
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for an unknown
 *         type or an out-of-range child downstream index, otherwise the error
 *         of the failing call.
 */
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
  SOperatorParam**  ppParam = NULL;
  SOperatorParam*** pppDownstreamParam = NULL;
  switch (type) {
    case OP_GET_PARAM:
      ppParam = &pOperator->pOperatorGetParam;
      pppDownstreamParam = &pOperator->pDownstreamGetParams;
      break;
    case OP_NOTIFY_PARAM:
      ppParam = &pOperator->pOperatorNotifyParam;
      pppDownstreamParam = &pOperator->pDownstreamNotifyParams;
      break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }

  freeResetOperatorParams(pOperator, type, false);

  if (NULL == pInput) {
    return TSDB_CODE_SUCCESS;
  }

  // The param belongs to this operator only when the operator types match
  *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;

  if (NULL == *pppDownstreamParam) {
    *pppDownstreamParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
    if (NULL == *pppDownstreamParam) {
      return terrno;
    }
  }

  if (NULL == *ppParam) {
    // Not addressed to this operator: pass it through to every downstream
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      (*pppDownstreamParam)[i] = pInput;
    }
    return TSDB_CODE_SUCCESS;
  }

  memset(*pppDownstreamParam, 0, pOperator->numOfDownstream * POINTER_BYTES);

  int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren);
  if (childrenNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < childrenNum; ++i) {
    // Check the array slot before dereferencing it
    SOperatorParam** ppChild = taosArrayGet((*ppParam)->pChildren, i);
    if (NULL == ppChild) {
      return terrno;
    }

    SOperatorParam* pChild = *ppChild;
    if (pChild == NULL) {
      return terrno;
    }

    // The slot array holds numOfDownstream entries; reject anything outside
    if (pChild->downstreamIdx < 0 || pChild->downstreamIdx >= pOperator->numOfDownstream) {
      qError("invalid downstream index %d for operator with %d downstreams", (int32_t)pChild->downstreamIdx,
             (int32_t)pOperator->numOfDownstream);
      return TSDB_CODE_INVALID_PARA;
    }

    if ((*pppDownstreamParam)[pChild->downstreamIdx]) {
      int32_t code = mergeOperatorParams((*pppDownstreamParam)[pChild->downstreamIdx], pChild);
      if (code) {
        return code;
      }
    } else {
      (*pppDownstreamParam)[pChild->downstreamIdx] = pChild;
    }
  }

  taosArrayDestroy((*ppParam)->pChildren);
  (*ppParam)->pChildren = NULL;

  return TSDB_CODE_SUCCESS;
}
991

992
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
870,210,842✔
993
  recordOpExecBeforeDownstream(pOperator);
870,210,842✔
994
  SSDataBlock* p = NULL;
869,982,460✔
995
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
870,374,719✔
996
  if (code == TSDB_CODE_SUCCESS) {
866,897,064✔
997
    code = blockDataCheck(p);
867,039,299✔
998
    if (code != TSDB_CODE_SUCCESS) {
867,601,810✔
UNCOV
999
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
1000
             __func__, __LINE__, tstrerror(code));
1001
    }
1002
  }
1003
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
867,212,405✔
1004
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
867,284,679✔
1005
}
1006

1007
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
71,897,099✔
1008
  recordOpExecBeforeDownstream(pOperator);
71,897,099✔
1009
  SSDataBlock* p = NULL;
71,896,060✔
1010
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
71,897,076✔
1011
  if (code == TSDB_CODE_SUCCESS) {
71,887,990✔
1012
    code = blockDataCheck(p);
71,887,990✔
1013
    if (code != TSDB_CODE_SUCCESS) {
71,885,920✔
1014
      qError("%s, %s failed at line %d, code:%s", GET_TASKID(pOperator->pTaskInfo),
×
1015
             __func__, __LINE__, tstrerror(code));
1016
    }
1017
  }
1018
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
71,886,974✔
1019
  return (code == TSDB_CODE_SUCCESS) ? p : NULL;
71,888,988✔
1020
}
1021

1022
/*
1023
 * Fetch one block from downstream without preserving reused get-param ownership.
1024
 *
1025
 * @param pOperator Current operator.
1026
 * @param idx Downstream index.
1027
 * @param pResBlock Output result block pointer.
1028
 *
1029
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1030
 */
1031
static FORCE_INLINE int32_t getNextBlockFromDownstreamRemainDetachImpl(struct SOperatorInfo* pOperator, int32_t idx,
1032
                                                                       SSDataBlock** pResBlock) {
1033
  QRY_PARAM_CHECK(pResBlock);
960,224,525✔
1034

1035
  int32_t code = TSDB_CODE_SUCCESS;
960,286,990✔
1036
  int32_t lino = 0;
960,286,990✔
1037

1038
  if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {
972,743,782✔
1039
    SOperatorParam* pGetParam = pOperator->pDownstreamGetParams[idx];
12,450,763✔
1040
    pOperator->pDownstreamGetParams[idx] = NULL;
12,443,909✔
1041
    // Once detached from the parent operator, downstream must own/free this param.
1042
    pGetParam->reUse = false;
12,451,395✔
1043

1044
    qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name);
12,441,870✔
1045
    code = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pGetParam, pResBlock);
12,441,870✔
1046
    QUERY_CHECK_CODE(code, lino, _return);
12,456,792✔
1047
  } else {
1048
    code = pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx], pResBlock);
947,894,022✔
1049
    QUERY_CHECK_CODE(code, lino, _return);
945,906,361✔
1050
  }
1051

1052
_return:
945,906,361✔
1053
  if (code) {
958,363,153✔
UNCOV
1054
    qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
×
1055
  }
1056
  return code;
958,302,079✔
1057
}
1058

1059
/*
1060
 * Fetch and validate one downstream block while detaching one-shot get-param.
1061
 *
1062
 * @param pOperator Current operator.
1063
 * @param idx Downstream index.
1064
 *
1065
 * @return Valid block pointer on success, or NULL on failure.
1066
 */
1067
SSDataBlock* getNextBlockFromDownstreamRemainDetach(struct SOperatorInfo* pOperator, int32_t idx) {
960,376,700✔
1068
  SSDataBlock* p = NULL;
960,376,700✔
1069
  int32_t      code = TSDB_CODE_SUCCESS;
960,458,423✔
1070
  int32_t      lino = 0;
960,458,423✔
1071
  recordOpExecBeforeDownstream(pOperator);
960,458,423✔
1072

1073
  code = getNextBlockFromDownstreamRemainDetachImpl(pOperator, idx, &p);
958,302,079✔
1074
  QUERY_CHECK_CODE(code, lino, _return);
958,302,079✔
1075

1076
  code = blockDataCheck(p);
958,302,079✔
1077
  QUERY_CHECK_CODE(code, lino, _return);
958,516,046✔
1078

1079
_return:
958,516,046✔
1080
  recordOpExecAfterDownstream(pOperator, p && code == TSDB_CODE_SUCCESS ? p->info.rows : 0);
958,598,306✔
1081
  if (code) {
958,492,336✔
1082
    qError("failed to get next data block from downstream at %s, line:%d code:%s", __func__, lino, tstrerror(code));
83,910✔
UNCOV
1083
    return NULL;
×
1084
  }
1085
  return p;
958,408,748✔
1086
}
1087

1088
/*
 * Default "get next with param" entry: install pParam as the operator's get
 * param, then run the operator's regular getNextFn.
 *
 * On failure the error is stored in the task info and control leaves through
 * T_LONG_JMP on the task's env.
 *
 * @param pOperator Current operator.
 * @param pParam    Get param to install before fetching.
 * @param pRes      Output result block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
  QRY_PARAM_CHECK(pRes);

  int32_t lino = 0;
  int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
  QUERY_CHECK_CODE(code, lino, _end);
  code = pOperator->fpSet.getNextFn(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // Report the line recorded by the failing check, not the line of this log
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1106

UNCOV
1107
/*
 * Default notify entry: install pParam as the operator's notify param, call
 * the operator's own notifyFn when it has one and a param addressed to it,
 * then forward the pending per-downstream notify params recursively.
 *
 * On failure the error is stored in the task info and control leaves through
 * T_LONG_JMP on the task's env.
 *
 * @param pOperator Current operator.
 * @param pParam    Notify param to install and propagate.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
  if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
    code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
  }
  // setOperatorParams() succeeds without allocating the downstream slots when
  // pParam is NULL, so the slot array must be checked before it is indexed.
  if (TSDB_CODE_SUCCESS == code && NULL != pOperator->pDownstreamNotifyParams) {
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
      if (pOperator->pDownstreamNotifyParams[i]) {
        code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        // The param has been delivered; clear the slot
        pOperator->pDownstreamNotifyParams[i] = NULL;
      }
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  return code;
}
1130

1131
int64_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
29,379,723✔
1132
  if (pOperator->transparent) {
29,379,723✔
1133
    return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);
2,466,002✔
1134
  }
1135
  return pOperator->resultDataBlockId;
26,915,773✔
1136
}
1137

1138
/*
 * Put the operator back into the OP_NOT_OPENED status.
 *
 * @param pOper Operator to reset.
 */
void resetOperatorState(SOperatorInfo* pOper) {
  pOper->status = OP_NOT_OPENED;
}
×
1141

1142
/*
 * Reset the basic operator info: clean up the result block, when one is
 * attached, and re-initialize the result row info.
 *
 * @param pBasicInfo Basic operator info to reset.
 */
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo) {
  if (NULL != pBasicInfo->pRes) {
    blockDataCleanup(pBasicInfo->pRes);
  }

  initResultRowInfo(&pBasicInfo->resultRowInfo);
}
18,557,649✔
1146

1147
int32_t resetAggSup(SExprSupp* pExprSupp, SAggSupporter* pSup, SExecTaskInfo* pTaskInfo,
16,080,040✔
1148
                    SNodeList* pNodeList, SNodeList* pGroupKeys, size_t keyBufSize, const char* pKey, void* pState,
1149
                    SFunctionStateStore* pStore) {
1150
  int32_t    code = 0, lino = 0, num = 0;
16,080,040✔
1151
  SExprInfo* pExprInfo = NULL;
16,080,040✔
1152
  cleanupExprSuppWithoutFilter(pExprSupp);
16,080,040✔
1153
  cleanupAggSup(pSup);
16,079,462✔
1154
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
16,079,273✔
1155
  QUERY_CHECK_CODE(code, lino, _error);
16,078,675✔
1156
  code = initAggSup(pExprSupp, pSup, pExprInfo, num, keyBufSize, pKey, pState, pStore);
16,078,675✔
1157
  QUERY_CHECK_CODE(code, lino, _error);
16,079,051✔
1158
  return code;
16,079,051✔
UNCOV
1159
_error:
×
UNCOV
1160
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1161
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1162
    pTaskInfo->code = code;
×
1163
  }
UNCOV
1164
  return code;
×
1165
}
1166

1167
/*
 * Rebuild the expression support of an operator: it is cleaned up first
 * (keeping its filter), the expression info is re-created from the node
 * lists, and the expression support is initialized again with it.
 *
 * @param pExprSupp  Expression support to rebuild.
 * @param pTaskInfo  Task info; receives the error code on failure.
 * @param pNodeList  Node list the expressions are created from.
 * @param pGroupKeys Group key node list passed to createExprInfo.
 * @param pStore     Function state store passed to initExprSupp.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t resetExprSupp(SExprSupp* pExprSupp, SExecTaskInfo* pTaskInfo, SNodeList* pNodeList,
                      SNodeList* pGroupKeys, SFunctionStateStore* pStore) {
  int32_t code = 0, lino = 0, num = 0;
  SExprInfo* pExprInfo = NULL;
  cleanupExprSuppWithoutFilter(pExprSupp);
  code = createExprInfo(pNodeList, pGroupKeys, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);
  code = initExprSupp(pExprSupp, pExprInfo, num, pStore);
  QUERY_CHECK_CODE(code, lino, _error);
  return code;
_error:
  if (code != TSDB_CODE_SUCCESS) {
    // Report the line recorded by the failing check, not the line of this log
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1184

1185
/*
 * Copy column values from pSrc into pDst for every target in pNodeList whose
 * expression is a column node belonging to data block targetBlkId.
 *
 * When pSrc is NULL or has no rows, the destination column is filled with
 * totalRows NULLs. Otherwise the source rows are assigned and, when the
 * source has fewer than totalRows rows, the remainder is padded with NULLs.
 *
 * @param pNodeList   List of STargetNode describing the destination slots.
 * @param targetBlkId Data block id a column node must reference to be copied.
 * @param pDst        Destination block.
 * @param pSrc        Source block; may be NULL.
 * @param totalRows   Number of rows the destination columns must cover.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t copyColumnsValue(SNodeList* pNodeList, int64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc, int32_t totalRows) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    srcEmpty = (NULL == pSrc || pSrc->info.rows <= 0);
  size_t  numOfCols = LIST_LENGTH(pNodeList);

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);

    // Only plain column references into the requested block are copied
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }
    if (((SColumnNode*)pTarget->pExpr)->dataBlockId != targetBlkId) {
      continue;
    }

    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pTarget->slotId);
    QUERY_CHECK_NULL(pDstCol, code, lino, _return, terrno);

    if (srcEmpty) {
      colDataSetNNULL(pDstCol, 0, totalRows);
      continue;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pTarget->pExpr)->slotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _return, terrno);

    code = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pDst->info);
    QUERY_CHECK_CODE(code, lino, _return);

    // Pad the tail with NULLs when the source is shorter than requested
    if (pSrc->info.rows < totalRows) {
      colDataSetNNULL(pDstCol, pSrc->info.rows, totalRows - pSrc->info.rows);
    }
  }

  return code;

_return:
  qError("failed to copy columns value, line:%d code:%s", lino, tstrerror(code));
  return code;
}
1219

1220
/**
1221
  @brief Record the create time of the operator and do initialization
1222
  to other metrics. This function will be called at the beginning of
1223
  creation of operators.
1224
*/
1225
void initOperatorCostInfo(SOperatorInfo* pOperator) {
796,184,999✔
1226
  pOperator->cost.execCreate = taosGetTimestampUs();
796,640,904✔
1227
  resetOperatorCostInfo(pOperator);
796,673,416✔
1228
}
796,249,662✔
1229

1230
/**
1231
  @brief Record the performance metrics at the beginning of the
1232
  operator execution:
1233
  - record the start time of the operator execution
1234
  - calculate the output wait time (time since last call returned)
1235
  - record the first time nextFn interface is called
1236
  Only record the metrics in explain analyze mode.
1237
*/
1238
void recordOpExecBegin(SOperatorInfo* pOperator) {
2,147,483,647✔
1239
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1240
    pOperator->cost.startTs = taosGetTimestampUs();
19,954,937✔
1241
    // calculate output wait time (time since last call returned)
1242
    if (pOperator->cost.execLastRow > 0) {
19,955,618✔
1243
      pOperator->cost.outputWaitElapsed +=
25,318,167✔
1244
        pOperator->cost.startTs - pOperator->cost.execLastRow;
12,668,768✔
1245
    }
1246
    // record the first time nextFn is called
1247
    if (pOperator->cost.execStart == 0) {
19,939,988✔
1248
      pOperator->cost.execStart = pOperator->cost.startTs;
7,269,117✔
1249
    }
1250
  }
1251
}
2,147,483,647✔
1252

1253
/**
1254
  @brief Record the performance metrics before the downstream execution:
1255
  - record the elapsed time since the operator execution started
1256
  - update the start time of the operator execution
1257
  Only record the metrics in explain analyze mode.
1258
*/
1259
void recordOpExecBeforeDownstream(SOperatorInfo* pOperator) {
2,147,483,647✔
1260
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1261
    pOperator->cost.endTs = taosGetTimestampUs();
11,266,248✔
1262
    if (UNLIKELY(pOperator->cost.startTs == 0)) {
11,264,781✔
1263
      /**
1264
        As long as getNextBlockFromDownstream() is called inside getNextFn(),
1265
        startTs must be initialized before endTs so that this condition should
1266
        always be false.
1267
      */
UNCOV
1268
      pOperator->cost.startTs = pOperator->cost.endTs;
×
1269
    }
1270
    pOperator->cost.execElapsed +=
22,520,106✔
1271
      pOperator->cost.endTs - pOperator->cost.startTs;
11,257,990✔
1272
  }
1273
}
2,147,483,647✔
1274

1275
/**
1276
  @brief Record the performance metrics after the downstream execution:
1277
  - record the elapsed time since the downstream execution started
1278
  - update the end time of the downstream execution
1279
  Only record the metrics in explain analyze mode.
1280
*/
1281
void recordOpExecAfterDownstream(SOperatorInfo* pOperator, size_t inputRows) {
2,147,483,647✔
1282
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1283
    pOperator->cost.startTs = taosGetTimestampUs();
11,253,866✔
1284
    pOperator->cost.inputWaitElapsed +=
22,507,174✔
1285
      pOperator->cost.startTs - pOperator->cost.endTs;
11,258,126✔
1286
    pOperator->cost.inputRows += inputRows;
11,249,053✔
1287
  }
1288
}
2,147,483,647✔
1289

1290
/**
1291
  @brief Record the performance metrics at the end of the
1292
  operator execution:
1293
  - record the number of times the operator's next interface is called
1294
  - record the elapsed time for executing the operator's nextFn interface
1295
  - record the time when the first row is returned
1296
  - record the time when the last row is returned
1297
  Only record the metrics in explain analyze mode.
1298
*/
1299
void recordOpExecEnd(SOperatorInfo* pOperator, size_t rows) {
2,147,483,647✔
1300
  if (QUERY_ENABLE_EXPLAIN(pOperator->pTaskInfo)) {
2,147,483,647✔
1301
    pOperator->cost.endTs = taosGetTimestampUs();
19,941,225✔
1302
    pOperator->cost.execTimes++;
19,936,453✔
1303
    pOperator->cost.execElapsed +=
39,868,507✔
1304
      pOperator->cost.endTs - pOperator->cost.startTs;
19,929,364✔
1305

1306
    if (rows > 0) {
19,936,592✔
1307
      // record the first time data is returned
1308
      if (pOperator->cost.execFirstRow == 0) {
12,700,501✔
1309
        pOperator->cost.execFirstRow = pOperator->cost.endTs;
6,102,112✔
1310
      }
1311
      pOperator->cost.execLastRow = pOperator->cost.endTs;
12,708,997✔
1312
    }
1313
  }
1314
  
1315
  /* accumulate output total rows even not in explain mode */
1316
  pOperator->resultInfo.totalRows += rows;
2,147,483,647✔
1317
}
2,147,483,647✔
1318

1319
/**
1320
  @brief Reset operator's cost info but keep create time unchanged.
1321
*/
1322
void resetOperatorCostInfo(SOperatorInfo* pOperator) {
835,890,165✔
1323
  /* keep execCreate UNCHANGED!!! */
1324
  pOperator->cost.execStart = 0;
835,890,165✔
1325
  pOperator->cost.execFirstRow = 0;
836,157,197✔
1326
  pOperator->cost.execLastRow = 0;
836,086,558✔
1327
  pOperator->cost.execTimes = 0;
836,038,802✔
1328
  pOperator->cost.execElapsed = 0;
836,026,595✔
1329
  pOperator->cost.inputWaitElapsed = 0;
836,000,517✔
1330
  pOperator->cost.outputWaitElapsed = 0;
835,856,934✔
1331
  pOperator->cost.inputRows = 0;
836,043,965✔
1332
}
836,039,021✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc