• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.77
/source/libs/executor/src/operator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "tname.h"
20

21
#include "tglobal.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28

29
#include "storageapi.h"
30
#include "tdatablock.h"
31

32

33
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
19,493,107✔
34
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
35
                                   __optr_explain_fn_t explain, __optr_get_ext_fn_t nextExtFn, __optr_notify_fn_t notifyFn) {
36
  SOperatorFpSet fpSet = {
19,493,107✔
37
      ._openFn = openFn,
38
      .getNextFn = nextFn,
39
      .cleanupFn = cleanup,
40
      .closeFn = closeFn,
41
      .reqBufFn = reqBufFn,
42
      .getExplainFn = explain,
43
      .getNextExtFn = nextExtFn,
44
      .notifyFn = notifyFn,
45
      .releaseStreamStateFn = NULL,
46
      .reloadStreamStateFn = NULL,
47
  };
48

49
  return fpSet;
19,493,107✔
50
}
51

52
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
1,897,638✔
53
  pOperator->fpSet.releaseStreamStateFn = relaseFn;
1,897,638✔
54
  pOperator->fpSet.reloadStreamStateFn = reloadFn;
1,897,638✔
55
}
1,897,638✔
56

57
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
8,988,407✔
58
  OPTR_SET_OPENED(pOperator);
8,988,407✔
59
  pOperator->cost.openCost = 0;
8,988,407✔
60
  return TSDB_CODE_SUCCESS;
8,988,407✔
61
}
62

63
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
9,557,979✔
64
  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
9,557,979✔
65
  if (p->pDownstream == NULL) {
9,568,164✔
66
    return terrno;
1,792✔
67
  }
68

69
  memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
9,566,372✔
70
  p->numOfDownstream = num;
9,566,372✔
71
  p->numOfRealDownstream = num;
9,566,372✔
72
  return TSDB_CODE_SUCCESS;
9,566,372✔
73
}
74

75
void setOperatorCompleted(SOperatorInfo* pOperator) {
17,466,473✔
76
  pOperator->status = OP_EXEC_DONE;
17,466,473✔
77
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0;
17,468,277✔
78
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
17,468,277✔
79
}
17,468,602✔
80

81
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
19,485,716✔
82
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
83
  pOperator->name = (char*)name;
19,485,716✔
84
  pOperator->operatorType = type;
19,485,716✔
85
  pOperator->blocking = blocking;
19,485,716✔
86
  pOperator->status = status;
19,485,716✔
87
  pOperator->info = pInfo;
19,485,716✔
88
  pOperator->pTaskInfo = pTaskInfo;
19,485,716✔
89
}
19,485,716✔
90

91
// each operator should be set their own function to return total cost buffer
92
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
×
93
  if (pOperator->blocking) {
×
94
    return -1;
×
95
  } else {
96
    return 0;
×
97
  }
98
}
99

100
static int64_t getQuerySupportBufSize(size_t numOfTables) {
×
101
  size_t s1 = sizeof(STableQueryInfo);
×
102
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
103
  return (int64_t)(s1 * 1.5 * numOfTables);
×
104
}
105

106
int32_t checkForQueryBuf(size_t numOfTables) {
×
107
  int64_t t = getQuerySupportBufSize(numOfTables);
×
108
  if (tsQueryBufferSizeBytes < 0) {
×
109
    return TSDB_CODE_SUCCESS;
×
110
  } else if (tsQueryBufferSizeBytes > 0) {
×
111
    while (1) {
×
112
      int64_t s = tsQueryBufferSizeBytes;
×
113
      int64_t remain = s - t;
×
114
      if (remain >= 0) {
×
115
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
×
116
          return TSDB_CODE_SUCCESS;
×
117
        }
118
      } else {
119
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
120
      }
121
    }
122
  }
123

124
  // disable query processing if the value of tsQueryBufferSize is zero.
125
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
×
126
}
127

128
void releaseQueryBuf(size_t numOfTables) {
×
129
  if (tsQueryBufferSizeBytes < 0) {
×
130
    return;
×
131
  }
132

133
  int64_t t = getQuerySupportBufSize(numOfTables);
×
134

135
  // restore value is not enough buffer available
136
  (void) atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
×
137
}
138

139
typedef enum {
140
  OPTR_FN_RET_CONTINUE = 0x1,
141
  OPTR_FN_RET_ABORT = 0x2,
142
} ERetType;
143

144
typedef struct STraverParam {
145
  void*   pRet;
146
  int32_t code;
147
  void*   pParam;
148
} STraverParam;
149

150
// iterate the operator tree helper
151
typedef ERetType (*optr_fn_t)(SOperatorInfo *pOperator, STraverParam *pParam, const char* pIdstr);
152

153
void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) {
235,218✔
154
  if (pOperator == NULL) {
235,218✔
155
    return;
4✔
156
  }
157

158
  ERetType ret = fn(pOperator, pParam, id);
235,214✔
159
  if (ret == OPTR_FN_RET_ABORT || pParam->code != TSDB_CODE_SUCCESS) {
235,208!
160
    return;
102,513✔
161
  }
162

163
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
256,374✔
164
    traverseOperatorTree(pOperator->pDownstream[i], fn, pParam, id);
123,668✔
165
    if (pParam->code != 0) {
123,679!
166
      break;
×
167
    }
168
  }
169
}
170

171
ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
198,763✔
172
  STraverParam* p = pParam;
198,763✔
173
  if (pOperator->operatorType == *(int32_t*)p->pParam) {
198,763✔
174
    p->pRet = pOperator;
95,175✔
175
    return OPTR_FN_RET_ABORT;
95,175✔
176
  } else {
177
    return OPTR_FN_RET_CONTINUE;
103,588✔
178
  }
179
}
180

181
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
182
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
95,243✔
183
  QRY_PARAM_CHECK(pOptrInfo);
95,243!
184

185
  if (pOperator == NULL) {
95,243✔
186
    qError("invalid operator, failed to find tableScanOperator %s", id);
64!
187
    return TSDB_CODE_PAR_INTERNAL_ERROR;
64✔
188
  }
189

190
  STraverParam p = {.pParam = &type, .pRet = NULL};
95,179✔
191
  traverseOperatorTree(pOperator, extractOperatorInfo, &p, id);
95,179✔
192
  if (p.code == 0) {
95,175!
193
    *pOptrInfo = p.pRet;
95,176✔
194
  }
195
  return p.code;
95,175✔
196
}
197

198
typedef struct SExtScanInfo {
199
  int32_t order;
200
  int32_t scanFlag;
201
  int32_t inheritUsOrder;
202
} SExtScanInfo;
203

204
static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
×
205
  int32_t type = pOperator->operatorType;
×
206
  SExtScanInfo* pInfo = pParam->pParam;
×
207

208
  if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
×
209
      type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
×
210
      type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
×
211
    pInfo->order = TSDB_ORDER_ASC;
×
212
    pInfo->scanFlag= MAIN_SCAN;
×
213
    return OPTR_FN_RET_ABORT;
×
214
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
×
215
    if (!pInfo->inheritUsOrder) {
×
216
      pInfo->order = TSDB_ORDER_ASC;
×
217
    }
218
    pInfo->scanFlag= MAIN_SCAN;
×
219
    return OPTR_FN_RET_ABORT;
×
220
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
×
221
    STableScanInfo* pTableScanInfo = pOperator->info;
×
222
    pInfo->order = pTableScanInfo->base.cond.order;
×
223
    pInfo->scanFlag= pTableScanInfo->base.scanFlag;
×
224
    return OPTR_FN_RET_ABORT;
×
225
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
×
226
    STableMergeScanInfo* pTableScanInfo = pOperator->info;
×
227
    pInfo->order = pTableScanInfo->base.cond.order;
×
228
    pInfo->scanFlag= pTableScanInfo->base.scanFlag;
×
229
    return OPTR_FN_RET_ABORT;
×
230
  } else {
231
    return OPTR_FN_RET_CONTINUE;
×
232
  }
233
}
234

235
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
×
236
  SExtScanInfo info = {.inheritUsOrder = inheritUsOrder, .order = *order};
×
237
  STraverParam p = {.pParam = &info};
×
238

239
  traverseOperatorTree(pOperator, extractScanInfo, &p, NULL);
×
240
  *order = info.order;
×
241
  *scanFlag = info.scanFlag;
×
242

243
  if (p.code == TSDB_CODE_SUCCESS) {
×
244
    if (!(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC)) {
×
245
      qError("operator failed at: %s:%d", __func__, __LINE__);
×
246
      p.code = TSDB_CODE_INVALID_PARA;
×
247
    }
248
  }
249
  return p.code;
×
250
}
251

252
static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) {
36,448✔
253
  SStorageAPI* pAPI = pParam->pParam;
36,448✔
254
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
36,448✔
255
    STableScanInfo* pInfo = pOperator->info;
85✔
256

257
    if (pInfo->base.dataReader != NULL) {
85!
258
      pAPI->tsdReader.tsdReaderNotifyClosing(pInfo->base.dataReader);
85✔
259
    }
260
    return OPTR_FN_RET_ABORT;
85✔
261
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
36,363✔
262
    SStreamScanInfo* pInfo = pOperator->info;
7,272✔
263

264
    if (pInfo->pTableScanOp != NULL) {
7,272✔
265
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
6,967✔
266
      if (pTableScanInfo != NULL && pTableScanInfo->base.dataReader != NULL) {
6,967!
267
        pAPI->tsdReader.tsdReaderNotifyClosing(pTableScanInfo->base.dataReader);
9✔
268
      }
269
    }
270

271
    return OPTR_FN_RET_ABORT;
7,274✔
272
  }
273

274
  return OPTR_FN_RET_CONTINUE;
29,091✔
275
}
276

277
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI) {
16,404✔
278
  STraverParam p = {.pParam = pAPI};
16,404✔
279
  traverseOperatorTree(pOperator, doStopDataReader, &p, pIdStr);
16,404✔
280
  return p.code;
16,401✔
281
}
282

283
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
19,490,364✔
284
                              SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
285
  QRY_PARAM_CHECK(pOptrInfo);
19,490,364!
286

287
  int32_t     code = 0;
19,490,364✔
288
  int32_t     type = nodeType(pPhyNode);
19,490,364✔
289
  const char* idstr = GET_TASKID(pTaskInfo);
19,490,364✔
290

291

292
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
19,490,364!
293
    SOperatorInfo* pOperator = NULL;
9,923,996✔
294
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
9,923,996✔
295
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
3,003,263✔
296
      // NOTE: this is an patch to fix the physical plan
297
      // TODO remove it later
298
      if (pTableScanNode->scan.node.pLimit != NULL) {
3,003,263✔
299
        pTableScanNode->groupSort = true;
129,164✔
300
      }
301

302
      STableListInfo* pTableListInfo = tableListCreate();
3,003,263✔
303
      if (!pTableListInfo) {
3,002,979!
304
        pTaskInfo->code = terrno;
×
305
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
306
        return terrno;
×
307
      }
308

309
      code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
3,002,979✔
310
      if (code) {
2,998,336!
311
        pTaskInfo->code = code;
×
312
        tableListDestroy(pTableListInfo);
×
313
        return code;
116✔
314
      }
315

316
      if (pTableScanNode->scan.node.dynamicOp) {
3,000,519✔
317
        pTaskInfo->dynamicTask = true;
44,097✔
318
        pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
44,097✔
319
        pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
44,097✔
320
      } else {
321
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
2,956,422✔
322
                                    pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
323
        if (code) {
2,959,307✔
324
          pTaskInfo->code = code;
24✔
325
          tableListDestroy(pTableListInfo);
24✔
326
          qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
24!
327
          return code;
24✔
328
        }
329
      }
330

331
      code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
3,003,380✔
332
      if (NULL == pOperator || code != 0) {
3,000,558!
333
        pTaskInfo->code = code;
×
334
        tableListDestroy(pTableListInfo);
×
335
        return code;
2✔
336
      }
337

338
      STableScanInfo* pScanInfo = pOperator->info;
3,002,311✔
339
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
3,002,311✔
340
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
6,920,733✔
341
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
2,004,742✔
342
      STableListInfo*           pTableListInfo = tableListCreate();
2,004,742✔
343
      if (!pTableListInfo) {
2,006,552!
344
        pTaskInfo->code = terrno;
×
345
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
346
        return terrno;
×
347
      }
348

349
      code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
2,006,552✔
350
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
351
      if (code) {
2,006,513!
352
        pTaskInfo->code = code;
×
353
        tableListDestroy(pTableListInfo);
×
354
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
×
355
        return code;
×
356
      }
357

358
      code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
2,006,513✔
359
      if (code) {
2,003,211!
360
        pTaskInfo->code = code;
×
361
        tableListDestroy(pTableListInfo);
×
362
        return code;
×
363
      }
364

365
      code = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
2,003,211✔
366
      if (NULL == pOperator || code != 0) {
2,004,029!
367
        pTaskInfo->code = code;
×
368
        tableListDestroy(pTableListInfo);
×
369
        return code;
×
370
      }
371

372
      STableScanInfo* pScanInfo = pOperator->info;
2,005,264✔
373
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
2,005,264✔
374
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4,915,991✔
375
      code = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
3,699,884✔
376
                                             pTaskInfo, &pOperator);
377
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
1,216,107✔
378
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
8,521✔
379
      STableListInfo*      pTableListInfo = tableListCreate();
8,521✔
380
      if (!pTableListInfo) {
8,515!
381
        pTaskInfo->code = terrno;
×
382
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
383
        return terrno;
×
384
      }
385

386
      if (pHandle->vnode) {
8,515✔
387
        code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
8,210✔
388
                                       pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
389
        if (code) {
8,215!
390
          pTaskInfo->code = code;
×
391
          tableListDestroy(pTableListInfo);
×
392
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
×
393
          return code;
×
394
        }
395
      }
396

397
      code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
8,520✔
398
      if (code) {
8,520!
399
        pTaskInfo->code = code;
×
400
        tableListDestroy(pTableListInfo);
×
401
        return code;
×
402
      }
403
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
1,207,586✔
404
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
850,357✔
405
      code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator);
850,357✔
406
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
357,229✔
407
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
292,028✔
408
      code = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo, &pOperator);
292,028✔
409
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
65,201✔
410
      STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
48,261✔
411
      STableListInfo*    pTableListInfo = tableListCreate();
48,261✔
412
      if (!pTableListInfo) {
48,275!
413
        pTaskInfo->code = terrno;
×
414
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
415
        return terrno;
×
416
      }
417
      if (!pTagScanPhyNode->onlyMetaCtbIdx) {
48,275✔
418
        code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
4,151✔
419
                                               pTagIndexCond, pTaskInfo);
420
        if (code != TSDB_CODE_SUCCESS) {
4,150!
421
          pTaskInfo->code = code;
×
422
          qError("failed to getTableList, code: %s", tstrerror(code));
×
423
          tableListDestroy(pTableListInfo);
×
424
          return code;
×
425
        }
426
      }
427
      code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo,
48,274✔
428
                                       &pOperator);
429
      if (code) {
48,232!
430
        pTaskInfo->code = code;
×
431
        tableListDestroy(pTableListInfo);
×
432
        return code;
×
433
      }
434
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
16,940✔
435
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
3,249✔
436
      STableListInfo*          pTableListInfo = tableListCreate();
3,249✔
437
      if (!pTableListInfo) {
3,249!
438
        pTaskInfo->code = terrno;
×
439
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
440
        return terrno;
×
441
      }
442

443
      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
3,249✔
444
        SArray* pList = taosArrayInit(4, sizeof(uint64_t));
3,247✔
445
        code = pTaskInfo->storageAPI.metaFn.getChildTableList(pHandle->vnode, pBlockNode->uid, pList);
3,248✔
446
        if (code != TSDB_CODE_SUCCESS) {
3,250!
447
          pTaskInfo->code = code;
×
448
          taosArrayDestroy(pList);
×
449
          tableListDestroy(pTableListInfo);
×
450
          return code;
×
451
        }
452

453
        size_t num = taosArrayGetSize(pList);
3,250✔
454
        for (int32_t i = 0; i < num; ++i) {
10,270✔
455
          uint64_t* id = taosArrayGet(pList, i);
7,019✔
456
          if (id == NULL) {
7,010!
457
            continue;
×
458
          }
459

460
          code = tableListAddTableInfo(pTableListInfo, *id, 0);
7,010✔
461
          if (code) {
7,022!
462
            pTaskInfo->code = code;
×
463
            tableListDestroy(pTableListInfo);
×
464
            taosArrayDestroy(pList);
×
465
            return code;
7,247✔
466
          }
467
        }
468

469
        taosArrayDestroy(pList);
3,251✔
470
      } else {  // Create group with only one table
471
        code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
2✔
472
        if (code) {
2!
473
          pTaskInfo->code = code;
×
474
          tableListDestroy(pTableListInfo);
×
475
          return code;
×
476
        }
477
      }
478

479
      code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator);
3,251✔
480
      if (code) {
3,248!
481
        pTaskInfo->code = code;
×
482
        tableListDestroy(pTableListInfo);
×
483
        return code;
×
484
      }
485
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
13,691!
486
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
16,839✔
487
      STableListInfo*        pTableListInfo = tableListCreate();
16,839✔
488
      if (!pTableListInfo) {
16,849!
489
        pTaskInfo->code = terrno;
×
490
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
491
        return terrno;
×
492
      }
493

494
      code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond,
16,849✔
495
                                     pTagIndexCond, pTaskInfo);
496
      if (code != TSDB_CODE_SUCCESS) {
16,845!
497
        pTaskInfo->code = code;
×
498
        tableListDestroy(pTableListInfo);
×
499
        return code;
×
500
      }
501

502
      code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo);
16,845✔
503
      if (code != TSDB_CODE_SUCCESS) {
16,838!
504
        pTaskInfo->code = code;
×
505
        tableListDestroy(pTableListInfo);
×
506
        return code;
×
507
      }
508

509
      code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator);
16,838✔
510
      if (code) {
16,833!
511
        tableListDestroy(pTableListInfo);
×
512
        pTaskInfo->code = code;
×
513
        return code;
×
514
      }
515
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
×
516
      code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator);
1,591✔
517
    } else {
518
      code = TSDB_CODE_INVALID_PARA;
×
519
      pTaskInfo->code = code;
×
520
      return code;
×
521
    }
522

523
    if (pOperator != NULL) {  // todo moved away
9,928,653✔
524
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
9,924,699✔
525
    }
526

527
    *pOptrInfo = pOperator;
9,928,653✔
528
    return code;
9,928,653✔
529
  }
530

531
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
9,566,368✔
532
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
9,566,368✔
533
  if (ops == NULL) {
9,568,328✔
534
    code = terrno;
3,283✔
535
    pTaskInfo->code = code;
×
536
    return code;
×
537
  }
538

539
  for (int32_t i = 0; i < size; ++i) {
20,686,615✔
540
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
11,119,109✔
541
    code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i]);
11,121,507✔
542
    if (ops[i] == NULL || code != 0) {
11,121,688!
543
      for (int32_t j = 0; j < i; ++j) {
118!
544
        destroyOperator(ops[j]);
×
545
      }
546
      taosMemoryFree(ops);
118✔
547
      return code;
118✔
548
    }
549
  }
550

551
  SOperatorInfo* pOptr = NULL;
9,567,506✔
552
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
9,567,506✔
553
    code = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,880,411✔
554
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
7,687,095✔
555
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
2,108,232✔
556
    if (pAggNode->pGroupKeys != NULL) {
2,108,232✔
557
      code = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
399,648✔
558
    } else {
559
      code = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo, &pOptr);
1,708,584✔
560
    }
561
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
5,578,863✔
562
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
1,467,137✔
563
    code = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
1,467,137✔
564
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
4,111,726✔
565
    code = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
3,179✔
566
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
4,108,547✔
567
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
534,420✔
568
    code = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
534,420✔
569
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
3,574,127!
570
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
×
571
    code = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, &pOptr);
×
572
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
3,574,127✔
573
    int32_t children = 0;
707✔
574
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
707✔
575
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) {
3,573,420✔
576
    int32_t children = pHandle->numOfVgroups;
22✔
577
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
22✔
578
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
3,573,398✔
579
    int32_t children = pHandle->numOfVgroups;
250✔
580
    code = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
250✔
581
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
3,573,148✔
582
    code = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
917,770✔
583
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
2,655,378✔
584
    code = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
205✔
585
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
2,655,173✔
586
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
1,361,404✔
587
    code = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo, &pOptr);
1,361,404✔
588
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
1,293,769✔
589
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
96,946✔
590
    code = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo, &pOptr);
96,946✔
591
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
1,196,823✔
592
    code = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
596✔
593
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
1,196,227✔
594
    int32_t children = 0;
109✔
595
    code = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
109✔
596
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
1,196,118✔
597
    int32_t children = pHandle->numOfVgroups;
34✔
598
    code = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle, &pOptr);
34✔
599
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
1,196,084✔
600
    code = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
25,605✔
601
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
1,170,479✔
602
    code = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
1,129✔
603
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
1,169,350✔
604
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
71,099✔
605
    code = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo, &pOptr);
71,099✔
606
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
1,098,251✔
607
    code = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
328✔
608
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT == type) {
1,097,923✔
609
    code = createStreamEventAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
211✔
610
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
1,097,712✔
611
    code = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
93,675✔
612
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
1,004,037✔
613
    code = createHashJoinOperatorInfo(ops, size, (SHashJoinPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
11,002✔
614
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
993,035✔
615
    code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
492,626✔
616
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
500,409✔
617
    code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
437✔
618
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
499,972✔
619
    code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
122,342✔
620
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
377,630✔
621
    code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
66,624✔
622
  } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
311,006!
623
    code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
624
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
311,006✔
625
    code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
280,811✔
626
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
30,195✔
627
    code = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo, &pOptr);
11,002✔
628
  } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) {
19,193✔
629
    code = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
11,002✔
630
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT == type) {
8,191✔
631
    code = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
308✔
632
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
7,883✔
633
    code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
5,121✔
634
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC == type) {
2,762✔
635
    code = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
23✔
636
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
2,739!
637
    code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
×
638
  } else {
639
    code = TSDB_CODE_INVALID_PARA;
2,739✔
640
    pTaskInfo->code = code;
2,739✔
641
    for (int32_t i = 0; i < size; ++i) {
2,739!
642
      destroyOperator(ops[i]);
×
643
    }
644
    taosMemoryFree(ops);
2,739✔
645
    qError("invalid operator type %d", type);
×
646
    return code;
×
647
  }
648

649
  taosMemoryFree(ops);
9,560,002✔
650
  if (pOptr) {
9,556,544!
651
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
9,559,109✔
652
  }
653

654
  *pOptrInfo = pOptr;
9,556,544✔
655
  return code;
9,556,544✔
656
}
657

658

659
void destroyOperator(SOperatorInfo* pOperator) {
19,496,269✔
660
  if (pOperator == NULL) {
19,496,269✔
661
    return;
142✔
662
  }
663

664
  freeResetOperatorParams(pOperator, OP_GET_PARAM, true);
19,496,127✔
665
  freeResetOperatorParams(pOperator, OP_NOTIFY_PARAM, true);
19,496,053✔
666

667
  if (pOperator->pDownstream != NULL) {
19,491,608✔
668
    for (int32_t i = 0; i < pOperator->numOfRealDownstream; ++i) {
20,687,486✔
669
      destroyOperator(pOperator->pDownstream[i]);
11,119,218✔
670
    }
671

672
    taosMemoryFreeClear(pOperator->pDownstream);
9,568,268✔
673
    pOperator->numOfDownstream = 0;
9,568,507✔
674
  }
675

676
  if (pOperator->fpSet.closeFn != NULL && pOperator->info != NULL) {
19,496,045!
677
    pOperator->fpSet.closeFn(pOperator->info);
19,502,363✔
678
  }
679

680
  cleanupExprSupp(&pOperator->exprSupp);
19,502,403✔
681
  taosMemoryFreeClear(pOperator);
19,499,994!
682
}
683

684
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
×
685
  if (downstreams != NULL) {
×
686
    for (int i = 0; i < num; i++) {
×
687
      destroyOperator(downstreams[i]);
×
688
    }
689
  }
690

691
  if (pOperator != NULL) {
×
692
    pOperator->info = NULL;
×
693
    if (pOperator->pDownstream != NULL) {
×
694
      taosMemoryFreeClear(pOperator->pDownstream);
×
695
      pOperator->pDownstream = NULL;
×
696
    }
697
    destroyOperator(pOperator);
×
698
  }
699
}
×
700

701
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
1,340,538✔
702
  SExplainExecInfo  execInfo = {0};
1,340,538✔
703
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
1,340,533✔
704
  if (pExplainInfo == NULL) {
1,340,533!
705
    return terrno;
×
706
  }
707

708
  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
1,340,533✔
709
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
1,340,533✔
710
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
1,340,533✔
711
  pExplainInfo->verboseLen = 0;
1,340,533✔
712
  pExplainInfo->verboseInfo = NULL;
1,340,533✔
713

714
  if (operatorInfo->fpSet.getExplainFn) {
1,340,533✔
715
    int32_t code =
716
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
586,801✔
717
    if (code) {
586,818!
UNCOV
718
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
×
719
      return code;
×
720
    }
721
  }
722

723
  int32_t code = 0;
1,340,551✔
724
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
2,110,639✔
725
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
770,064✔
726
    if (code != TSDB_CODE_SUCCESS) {
770,088!
727
      //      taosMemoryFreeClear(*pRes);
728
      return TSDB_CODE_OUT_OF_MEMORY;
×
729
    }
730
  }
731

732
  return TSDB_CODE_SUCCESS;
1,340,575✔
733
}
734

735
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) {
×
736
  if (pDst->opType != pSrc->opType) {
×
737
    qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType);
×
738
    return TSDB_CODE_INVALID_PARA;
×
739
  }
740
  
741
  switch (pDst->opType) {
×
742
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
×
743
      SExchangeOperatorParam* pDExc = pDst->value;
×
744
      SExchangeOperatorParam* pSExc = pSrc->value;
×
745
      if (!pDExc->multiParams) {
×
746
        if (pSExc->basic.vgId != pDExc->basic.vgId) {
×
747
          SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
×
748
          if (NULL == pBatch) {
×
749
            return terrno;
×
750
          }
751

752
          pBatch->multiParams = true;
×
753
          pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
754
          if (NULL == pBatch->pBatchs) {
×
755
            taosMemoryFree(pBatch);
×
756
            return terrno;
×
757
          }
758

759
          tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);
×
760
          
761
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic, sizeof(pDExc->basic));
×
762
          if (code) {
×
763
            return code;
×
764
          }
765

766
          code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
×
767
          if (code) {
×
768
            return code;
×
769
          }
770
          
771
          taosMemoryFree(pDst->value);
×
772
          pDst->value = pBatch;
×
773
        } else {
774
          void* p = taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList);
×
775
          if (p == NULL) {
×
776
            return terrno;
×
777
          }
778
        }
779
      } else {
780
        SExchangeOperatorBatchParam* pBatch = pDst->value;
×
781
        SExchangeOperatorBasicParam* pBasic = tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId));
×
782
        if (pBasic) {
×
783
          void* p = taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList);
×
784
          if (p == NULL) {
×
785
            return terrno;
×
786
          }
787
        } else {
788
          int32_t code = tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic));
×
789
          if (code) {
×
790
            return code;
×
791
          }
792
        }
793
      }
794
      break;
×
795
    }
796
    default:
×
797
      qError("invalid optype %d for merge operator params", pDst->opType);
×
798
      return TSDB_CODE_INVALID_PARA;
×
799
  }
800

801
  return TSDB_CODE_SUCCESS;
×
802
}
803

804

805
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) {
36,151✔
806
  SOperatorParam** ppParam = NULL;
36,151✔
807
  SOperatorParam*** pppDownstramParam = NULL;
36,151✔
808
  switch (type) {
36,151!
809
    case OP_GET_PARAM:
36,152✔
810
      ppParam = &pOperator->pOperatorGetParam;
36,152✔
811
      pppDownstramParam = &pOperator->pDownstreamGetParams;
36,152✔
812
      break;
36,152✔
813
    case OP_NOTIFY_PARAM:
×
814
      ppParam = &pOperator->pOperatorNotifyParam;
×
815
      pppDownstramParam = &pOperator->pDownstreamNotifyParams;
×
816
      break;
×
817
    default:
×
818
      return TSDB_CODE_INVALID_PARA;
×
819
  }
820

821
  freeResetOperatorParams(pOperator, type, false);
36,152✔
822
  
823
  if (NULL == pInput) {
36,151!
824
    return TSDB_CODE_SUCCESS;
×
825
  }
826

827
  *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL;
36,151!
828
  
829
  if (NULL == *pppDownstramParam) {
36,151✔
830
    *pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
1,913✔
831
    if (NULL == *pppDownstramParam) {
1,912!
832
      return terrno;
×
833
    }
834
  }
835

836
  if (NULL == *ppParam) {
36,151!
837
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
×
838
      (*pppDownstramParam)[i] = pInput;
×
839
    }
840
    return TSDB_CODE_SUCCESS;
×
841
  }
842

843
  memset(*pppDownstramParam, 0, pOperator->numOfDownstream * POINTER_BYTES);
36,151✔
844

845
  int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren);
36,151✔
846
  if (childrenNum <= 0) {
36,152✔
847
    return TSDB_CODE_SUCCESS;
1,658✔
848
  }
849
  
850
  for (int32_t i = 0; i < childrenNum; ++i) {
103,482✔
851
    SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet((*ppParam)->pChildren, i);
68,988✔
852
    if (pChild == NULL) {
68,988!
853
      return terrno;
×
854
    }
855

856
    if ((*pppDownstramParam)[pChild->downstreamIdx]) {
68,988!
857
      int32_t code = mergeOperatorParams((*pppDownstramParam)[pChild->downstreamIdx], pChild);
×
858
      if (code) {
×
859
        return code;
×
860
      }
861
    } else {
862
      (*pppDownstramParam)[pChild->downstreamIdx] = pChild;
68,988✔
863
    }
864
  }
865

866
  taosArrayDestroy((*ppParam)->pChildren);
34,494✔
867
  (*ppParam)->pChildren = NULL;
34,494✔
868

869
  return TSDB_CODE_SUCCESS;
34,494✔
870
}
871

872

873
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
38,437,788✔
874
  SSDataBlock* p = NULL;
38,437,788✔
875
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
38,437,788✔
876
  if (code == TSDB_CODE_SUCCESS) {
38,440,679!
877
    code = blockDataCheck(p);
38,442,441✔
878
    if (code != TSDB_CODE_SUCCESS) {
38,438,103✔
879
      qError("blockDataCheck failed, code:%s", tstrerror(code));
137!
880
    }
881
  }
882
  return (code == 0) ? p : NULL;
38,437,481✔
883
}
884

885
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
514,218✔
886
  SSDataBlock* p = NULL;
514,218✔
887
  int32_t      code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
514,218✔
888
  if (code == TSDB_CODE_SUCCESS) {
514,179!
889
    code = blockDataCheck(p);
514,202✔
890
    if (code != TSDB_CODE_SUCCESS) {
514,207!
891
      qError("blockDataCheck failed, code:%s", tstrerror(code));
×
892
    }
893
  }
894
  return (code == 0)? p:NULL;
514,212!
895
}
896

897
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
36,152✔
898
  QRY_PARAM_CHECK(pRes);
36,152!
899

900
  int32_t lino = 0;
36,152✔
901
  int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
36,152✔
902
  QUERY_CHECK_CODE(code, lino, _end);
36,152!
903
  code = pOperator->fpSet.getNextFn(pOperator, pRes);
36,152✔
904
  QUERY_CHECK_CODE(code, lino, _end);
36,151!
905

906
_end:
36,151✔
907
  if (code != TSDB_CODE_SUCCESS) {
36,151!
908
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
909
    pOperator->pTaskInfo->code = code;
×
910
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
911
  }
912

913
  return code;
36,151✔
914
}
915

916
int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
×
917
  int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM);
×
918
  if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) {
×
919
    code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam);
×
920
  }
921
  if (TSDB_CODE_SUCCESS == code) {
×
922
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
×
923
      if (pOperator->pDownstreamNotifyParams[i]) {
×
924
        code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]);
×
925
        if (TSDB_CODE_SUCCESS != code) {
×
926
          break;
×
927
        }
928
        pOperator->pDownstreamNotifyParams[i] = NULL;
×
929
      }
930
    }
931
  }
932
  if (TSDB_CODE_SUCCESS != code) {
×
933
    pOperator->pTaskInfo->code = code;
×
934
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
935
  }
936
  
937
  return code;
×
938
}
939

940
int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) {
209,832✔
941
  if (pOperator->transparent) {
209,832✔
942
    return getOperatorResultBlockId(pOperator->pDownstream[idx], 0);
22,004✔
943
  }
944
  return pOperator->resultDataBlockId;
187,828✔
945
}
946

947

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc