• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4836

31 Oct 2025 03:37AM UTC coverage: 58.728% (+0.2%) from 58.506%
#4836

push

travis-ci

SallyHuo-TAOS
Merge remote-tracking branch 'origin/cover/3.0' into cover/3.0

# Conflicts:
#	test/ci/run.sh

149727 of 324176 branches covered (46.19%)

Branch coverage included in aggregate %.

198923 of 269498 relevant lines covered (73.81%)

238054213.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.41
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include <stdint.h>
18
#include "cmdnodes.h"
19
#include "dataSinkInt.h"
20
#include "executil.h"
21
#include "executorInt.h"
22
#include "libs/new-stream/stream.h"
23
#include "operator.h"
24
#include "osMemPool.h"
25
#include "osMemory.h"
26
#include "planner.h"
27
#include "query.h"
28
#include "querytask.h"
29
#include "storageapi.h"
30
#include "streamexecutorInt.h"
31
#include "tdatablock.h"
32
#include "tref.h"
33
#include "trpc.h"
34
#include "tudf.h"
35
#include "wal.h"
36

37
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
38
int32_t             exchangeObjRefPool = -1;
39
SGlobalExecInfo     gExecInfo = {0};
40

41
/**
 * Store the dnode handle and its two accessor callbacks in the global gExecInfo.
 *
 * @param pDnode      value written to gExecInfo.dnode
 * @param getDnodeId  callback written to gExecInfo.getDnodeId
 * @param getMnode    callback written to gExecInfo.getMnode
 */
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
  gExecInfo.getDnodeId = getDnodeId;
  gExecInfo.getMnode = getMnode;
  gExecInfo.dnode = pDnode;
}
47

48
/**
 * Fill pEpSet through the getMnode callback registered by gExecInfoInit().
 *
 * @param pEpSet  output passed straight to gExecInfo.getMnode
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when gExecInfo.dnode or
 *         gExecInfo.getMnode has not been set.
 */
int32_t getCurrentMnodeEpset(SEpSet* pEpSet) {
  bool initialized = (gExecInfo.dnode != NULL) && (gExecInfo.getMnode != NULL);
  if (!initialized) {
    qError("gExecInfo is not initialized");
    return TSDB_CODE_APP_ERROR;
  }

  gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
  return TSDB_CODE_SUCCESS;
}
56

57
static void cleanupRefPool() {
2,199,653✔
58
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
2,199,653✔
59
  taosCloseRef(ref);
2,199,653✔
60
}
2,199,653✔
61

62
static void initRefPool() {
2,199,653✔
63
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
2,199,653✔
64
  (void)atexit(cleanupRefPool);
2,199,653!
65
}
2,199,653✔
66

67
/**
 * Descend the single-downstream operator chain until the stream scan operator is
 * reached, then append the supplied input to that operator's pBlockLists.
 *
 * Every operator visited on the way (and the stream scan itself) has its status
 * reset to OP_NOT_OPENED.
 *
 * @param pOperator    operator to start from
 * @param input        input payload; how it is interpreted depends on `type`
 * @param numOfBlocks  number of elements in `input` for the array-style types
 * @param type         one of the STREAM_INPUT__* values handled below; other values
 *                     leave pBlockLists untouched
 * @param id           task id string, used for logging only
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_APP_ERROR when no stream scan operator can be
 *         reached through a single-downstream chain; terrno when taosArrayPush fails.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string; the former trailing PRIx64 only glued a literal "lx"/"llx" onto the message
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;
  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // input is treated as an array of SPackedData; each element is copied into the list
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // the pointer value itself is pushed as a single element
    void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    // input is treated as an array of SSDataBlock; each is wrapped in an SPackedData
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    SPackedData tmp = {.pDataBlock = input};
    void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
    QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__CHECKPOINT;
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
129

130
/**
 * Reset the status of every operator above the stream scan operator to
 * OP_NOT_OPENED, following the single-downstream chain from pOperator.
 *
 * The stream scan operator itself is not modified by this function.
 *
 * @param pOperator  operator to start from
 * @param id         task id string, used for logging only
 * @return 0 once the stream scan operator is reached; TSDB_CODE_APP_ERROR when an
 *         operator on the way has no downstream or more than one downstream.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string; the former trailing PRIx64 only glued a literal "lx"/"llx" onto the message
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
  }
  return 0;
}
147

148
/**
 * Propagate the task id string to the tsdb reader owned by the stream scan operator.
 *
 * Non stream-scan operators forward the call to their first downstream operator
 * (when pDownstream is set). For a stream scan operator the id is applied only if
 * it has a table scan operator whose base.dataReader is non-NULL.
 *
 * @return 0 when nothing had to be done or the id was set; otherwise the code
 *         returned by tsdSetReaderTaskId.
 */
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->pDownstream == NULL) {
      return 0;
    }
    return doSetTaskId(pOperator->pDownstream[0], pAPI);
  }

  SStreamScanInfo* pStreamScan = pOperator->info;
  if (pStreamScan->pTableScanOp == NULL) {
    return 0;
  }

  STableScanInfo* pTableScan = pStreamScan->pTableScanOp->info;
  if (pTableScan->base.dataReader == NULL) {
    return 0;
  }

  int32_t rc = pAPI->tsdReader.tsdSetReaderTaskId(pTableScan->base.dataReader, pTaskInfo->id.str);
  if (rc) {
    qError("failed to set reader id for executor, code:%s", tstrerror(rc));
    return rc;
  }

  return 0;
}
168

169
/**
 * Assign a new query id to the task, rebuild its id string with buildTaskId()
 * (buffer length passed as 64) and push the new id string down to the tsdb reader.
 *
 * @return the result of doSetTaskId() on the task's root operator.
 */
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str, 64);

  // set the idstr for tsdbReader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
177

178
// Returns true when the task's status field equals OP_EXEC_DONE.
bool qTaskIsDone(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = tinfo;
  if (pTask->status == OP_EXEC_DONE) {
    return true;
  }
  return false;
}
182

183
/**
 * Feed input blocks to the stream scan operator of the given task.
 *
 * @param tinfo        task handle; NULL yields TSDB_CODE_APP_ERROR
 * @param pBlocks      input payload; NULL is treated as "nothing to do"
 * @param numOfBlocks  element count; 0 is treated as "nothing to do"
 * @param type         STREAM_INPUT__* selector forwarded to doSetSMABlock()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR, or the code from doSetSMABlock().
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  bool nothingToSet = (pBlocks == NULL) || (numOfBlocks == 0);
  if (nothingToSet) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}
203

204
/**
 * Create an executor task that runs under OPTR_EXEC_MODEL_QUEUE.
 *
 * With msg == NULL a raw-scan task is built through doCreateTask() and
 * createTmqRawScanOperatorInfo(); numOfCols is not touched on that path.
 * Otherwise msg is parsed by qStringToSubplan(), the task is created by
 * qCreateExecTask(), and *numOfCols receives the number of slots in the plan's
 * output descriptor whose `output` flag is set.
 *
 * @return the created task, or NULL on failure. terrno is assigned only on the
 *         subplan path.
 *
 * NOTE(review): on raw-scan operator creation failure the task is released with
 * taosMemoryFree() only — confirm nothing else allocated by doCreateTask() needs
 * to be destroyed.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pRawTask = NULL;

    code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pRawTask);
    if (code != 0 || pRawTask == NULL) {
      return NULL;
    }

    code = createTmqRawScanOperatorInfo(pReaderHandle, pRawTask, &pRawTask->pRoot);
    if (code != 0 || pRawTask->pRoot == NULL) {
      taosMemoryFree(pRawTask);
      return NULL;
    }

    pRawTask->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pRawTask));
    return pRawTask;
  }

  SSubplan* pPlan = NULL;
  code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pQueueTask = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pQueueTask, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pQueueTask);
    terrno = code;
    return NULL;
  }

  // extract the number of output columns
  SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
  int32_t             outputCols = 0;

  SNode* pNode;
  FOREACH(pNode, pDescNode->pSlots) {
    if (((SSlotDescNode*)pNode)->output) {
      ++outputCols;
    }
  }
  *numOfCols = outputCols;

  return pQueueTask;
}
254

255
/**
 * Validate a stream inserter parameter block.
 *
 * A NULL parameter is accepted. Otherwise the parameter is rejected when:
 *  - tbType is TSDB_SUPER_TABLE and suid <= 0,
 *  - dbFName is NULL or empty,
 *  - suid <= 0 and tbname is NULL or empty.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_INVALID_PARA.
 */
static int32_t checkInsertParam(SStreamInserterParam* streamInserterParam) {
  SStreamInserterParam* p = streamInserterParam;
  if (p == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool noSuid = (p->suid <= 0);

  if (noSuid && p->tbType == TSDB_SUPER_TABLE) {
    stError("insertParam: invalid suid:%" PRIx64 " for child table", p->suid);
    return TSDB_CODE_INVALID_PARA;
  }

  if (p->dbFName == NULL || p->dbFName[0] == '\0') {
    stError("insertParam: invalid db/table name");
    return TSDB_CODE_INVALID_PARA;
  }

  if (noSuid) {
    bool noTbName = (p->tbname == NULL) || (p->tbname[0] == '\0');
    if (noTbName) {
      stError("insertParam: invalid table name, suid:%" PRIx64 "", p->suid);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}
278

279
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
1,548,640✔
280
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
281
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
282
  if (pSubplan == NULL || pTaskInfo == NULL) {
1,548,640!
283
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p", pSubplan, pTaskInfo);
×
284
    nodesDestroyNode((SNode *)pSubplan);
×
285
    return TSDB_CODE_INVALID_PARA;
×
286
  }
287
  int32_t lino = 0;
1,549,897✔
288
  int32_t code = checkInsertParam(streamInserterParam);
1,549,897✔
289
  if (code != TSDB_CODE_SUCCESS) {
1,548,640!
290
    qError("invalid stream inserter param, code:%s", tstrerror(code));
×
291
    nodesDestroyNode((SNode *)pSubplan);
×
292
    return code;
×
293
  }
294
  SInserterParam* pInserterParam = NULL;
1,548,640✔
295
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
1,548,640✔
296
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
1,548,640✔
297
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
1,548,640✔
298

299
  code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
1,549,897✔
300
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
1,549,897!
301
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
100!
302
    goto _error;
×
303
  }
304

305
  if (streamInserterParam) {
1,549,797✔
306
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
889,750✔
307
    void*           pSinkManager = NULL;
889,750✔
308
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
889,850✔
309
    if (code != TSDB_CODE_SUCCESS) {
889,650!
310
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
311
      goto _error;
×
312
    }
313

314
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
889,650!
315
    if (NULL == pInserterParam) {
889,850!
316
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(terrno), (*pTask)->id.str);
×
317
      code = terrno;
×
318
      goto _error;
×
319
    }
320
    code = cloneStreamInserterParam(&pInserterParam->streamInserterParam, streamInserterParam);
889,850✔
321
    TSDB_CHECK_CODE(code, lino, _error);
889,850!
322
    
323
    pInserterParam->readHandle = taosMemCalloc(1, sizeof(SReadHandle));
889,850✔
324
    pInserterParam->readHandle->pMsgCb = readHandle->pMsgCb;
889,850✔
325

326
    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
889,850✔
327
    if (code) {
889,850!
328
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
329
    }
330
  }
331
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
1,549,897✔
332
         tstrerror(code));
333

334
_error:
559,186✔
335

336
  if (code != TSDB_CODE_SUCCESS) {
1,549,897!
337
    qError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
338
    if (pInserterParam != NULL) {
×
339
      taosMemoryFree(pInserterParam);
×
340
    }
341
  }
342
  return code;
1,549,897✔
343
}
344

345
/**
 * Tell whether the task's root operator is one of the scan operators that
 * qResetTableScan() is meant for.
 *
 * @return true when the root operator's physical node type is TABLE_SCAN,
 *         TABLE_MERGE_SCAN or SYSTABLE_SCAN; false otherwise, including when
 *         pInfo, the root operator or its physical node is NULL.
 */
bool qNeedReset(qTaskInfo_t pInfo) {
  if (pInfo == NULL) {
    return false;
  }

  SOperatorInfo* pRoot = ((SExecTaskInfo*)pInfo)->pRoot;
  if (pRoot == NULL || pRoot->pPhyNode == NULL) {
    return false;
  }

  int32_t type = nodeType(pRoot->pPhyNode);
  if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return true;
  }
  if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
    return true;
  }
  return type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
}
359

360
// Copies uid, version and both window ranges (with their validity flags) from
// pHandle into pScanBaseInfo->readHandle. Does nothing if either pointer is NULL.
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {
  if (pHandle != NULL && pScanBaseInfo != NULL) {
    pScanBaseInfo->readHandle.uid = pHandle->uid;
    pScanBaseInfo->readHandle.version = pHandle->version;

    pScanBaseInfo->readHandle.winRangeValid = pHandle->winRangeValid;
    pScanBaseInfo->readHandle.winRange = pHandle->winRange;

    pScanBaseInfo->readHandle.extWinRangeValid = pHandle->extWinRangeValid;
    pScanBaseInfo->readHandle.extWinRange = pHandle->extWinRange;
  }
}
372

373
/**
 * Refresh the read handle of the root table scan / table merge scan operator
 * and invoke the root operator's resetStateFn.
 *
 * @param pInfo   task handle
 * @param handle  source of the new uid / version / window ranges; when NULL the
 *                read handle is left untouched (setReadHandle ignores NULL)
 * @return TSDB_CODE_INVALID_PARA when pInfo is NULL or the task has no root
 *         operator / physical node (the cases for which qNeedReset() returns
 *         false); otherwise the result of resetStateFn.
 */
int32_t qResetTableScan(qTaskInfo_t pInfo, SReadHandle* handle) {
  if (pInfo == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;

  // qNeedReset() shows both can be NULL; dereferencing them here would crash
  if (pOperator == NULL || pOperator->pPhyNode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*           info = pOperator->info;
  STableScanBase* pScanBaseInfo = NULL;

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableMergeScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  }

  if (handle != NULL) {
    qDebug("reset table scan, name:%s, id:%s, time range: [%" PRId64 ", %" PRId64 "]", pOperator->name, GET_TASKID(pTaskInfo), handle->winRange.skey,
    handle->winRange.ekey);
  } else {
    // no handle: nothing was copied above, so there is no time range to report
    qDebug("reset table scan, name:%s, id:%s", pOperator->name, GET_TASKID(pTaskInfo));
  }
  return pOperator->fpSet.resetStateFn(pOperator);
}
395

396
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
1,549,897✔
397
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
398
  if (msg == NULL) {
1,549,897!
399
    return TSDB_CODE_INVALID_PARA;
×
400
  }
401

402
  *pTaskInfo = NULL;
1,549,897✔
403

404
  SSubplan* pPlan = NULL;
1,549,897✔
405
  int32_t   code = qStringToSubplan(msg, &pPlan);
1,549,897✔
406
  if (code != TSDB_CODE_SUCCESS) {
1,548,640!
407
    nodesDestroyNode((SNode *)pPlan);
×
408
    return code;
×
409
  }
410
  // todo: add stream inserter param
411
  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo,
1,548,640✔
412
                               pInserterParams ? &pInserterParams->pSinkHandle : NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM,
413
                               pInserterParams);
414
  if (code != TSDB_CODE_SUCCESS) {
1,549,897!
415
    qDestroyTask(*pTaskInfo);
×
416
    return code;
×
417
  }
418

419
  return code;
1,549,897✔
420
}
421

422
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
1,053,824✔
423
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
424
  int32_t code = TSDB_CODE_SUCCESS;
1,053,824✔
425
  int32_t lino = 0;
1,053,824✔
426
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
1,053,824✔
427
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
1,053,824!
428
  int32_t numOfUids = taosArrayGetSize(tableIdList);
1,053,824✔
429
  if (numOfUids == 0) {
1,053,824!
430
    (*ppArrayRes) = qa;
×
431
    goto _error;
×
432
  }
433

434
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
1,053,824✔
435

436
  uint64_t suid = 0;
1,053,824✔
437
  uint64_t uid = 0;
1,053,824✔
438
  int32_t  type = 0;
1,053,824✔
439
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
1,053,824✔
440

441
  // let's discard the tables those are not created according to the queried super table.
442
  SMetaReader mr = {0};
1,053,824✔
443
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
1,053,824✔
444
  for (int32_t i = 0; i < numOfUids; ++i) {
2,151,370✔
445
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
1,097,546✔
446
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
1,097,546!
447

448
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
1,097,546✔
449
    if (code != TSDB_CODE_SUCCESS) {
1,097,546!
450
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
451
      continue;
×
452
    }
453

454
    tDecoderClear(&mr.coder);
1,097,546✔
455

456
    if (mr.me.type == TSDB_SUPER_TABLE) {
1,097,546!
457
      continue;
×
458
    } else {
459
      if (type == TSDB_SUPER_TABLE) {
1,097,546!
460
        // this new created child table does not belong to the scanned super table.
461
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
1,097,546!
462
          continue;
9,020✔
463
        }
464
      } else {  // ordinary table
465
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
466
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
467
        if (mr.me.uid != uid) {
×
468
          continue;
×
469
        }
470
      }
471
    }
472

473
    if (pScanInfo->pTagCond != NULL) {
1,088,526✔
474
      bool          qualified = false;
936,301✔
475
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
936,301✔
476
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
936,301✔
477
      if (code != TSDB_CODE_SUCCESS) {
936,301!
478
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
×
479
        continue;
×
480
      }
481

482
      if (!qualified) {
936,301!
483
        continue;
467,749✔
484
      }
485
    }
486

487
    // handle multiple partition
488
    void* tmp = taosArrayPush(qa, id);
620,777✔
489
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
620,777!
490
  }
491

492
_end:
1,053,824✔
493

494
  pAPI->metaReaderFn.clearReader(&mr);
1,053,824✔
495
  (*ppArrayRes) = qa;
1,053,824✔
496

497
_error:
1,053,824✔
498
  if (code != TSDB_CODE_SUCCESS) {
1,053,824!
499
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
500
  }
501
  return code;
1,053,824✔
502
}
503

504
/**
 * Add tables to, or remove tables from, the stream scan operator of a task.
 *
 * For tasks running under OPTR_EXEC_MODEL_QUEUE the table-meta cache entry of every
 * listed uid is erased first. When adding, the list is filtered through
 * filterUnqualifiedTables(), handed to tqReaderAddTables(), and each qualified uid
 * is inserted into the table list (with a group id derived from the group tags
 * when the scan has any) while holding the task's write latch. When removing, the
 * list is passed to tqReaderRemoveTables() under the same latch.
 *
 * @param tinfo        task handle
 * @param tableIdList  table uids to add or remove
 * @param isAdd        true to add, false to remove
 * @return 0 / TSDB_CODE_SUCCESS on success (also when the task has no stream scan
 *         operator and extractOperatorInTree() reported no error); otherwise the
 *         first error encountered.
 */
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  if (isAdd) {
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
  }

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pScanOp = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStreamScanInfo* pScanInfo = pScanOp->info;
  if (pScanOp->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {  // clear meta cache for subscription if tag is changed
    for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
      int64_t*        pUid = (int64_t*)taosArrayGet(tableIdList, i);
      STableScanInfo* pTableScan = pScanInfo->pTableScanOp->info;
      taosLRUCacheErase(pTableScan->base.metaCache.pTableMetaEntryCache, pUid, LONG_BYTES);
    }
  }

  if (!isAdd) {  // remove the table id in current list
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
    taosWLockLatch(&pTaskInfo->lock);
    pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
    return code;
  }

  // add new table id
  SArray*         qa = NULL;
  char*           keyBuf = NULL;
  bool            locked = false;
  bool            assignUid = false;
  size_t          bufLen = 0;
  int32_t         numOfQualifiedTables = 0;
  STableListInfo* pTableListInfo = NULL;

  code = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &qa);
  if (code != TSDB_CODE_SUCCESS) {
    goto _cleanup;
  }

  numOfQualifiedTables = taosArrayGetSize(qa);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);

  if (pScanInfo->pGroupTags != NULL) {
    bufLen = getTableTagsBufLen(pScanInfo->pGroupTags);
  }
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = terrno;
      goto _cleanup;
    }
  }

  pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
  taosWLockLatch(&pTaskInfo->lock);
  locked = true;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* pUid = taosArrayGet(qa, i);
    if (!pUid) {
      code = terrno;
      goto _cleanup;
    }

    STableKeyInfo keyInfo = {.uid = *pUid, .groupId = 0};
    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId, &pTaskInfo->storageAPI);
        if (code != TSDB_CODE_SUCCESS) {
          goto _cleanup;
        }
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      goto _cleanup;
    }
  }

_cleanup:
  // single exit for the add path: release the latch (if taken), the key buffer and the uid array
  if (locked) {
    taosWUnLockLatch(&pTaskInfo->lock);
  }
  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }
  taosArrayDestroy(qa);
  return code;
}
604

605
/**
 * Report the schema / tag / row versions and the db and table names of the idx-th
 * schema info recorded on the task.
 *
 * *tbGet is cleared first and set to true only when an entry was found and copied.
 * An idx at or beyond the number of schema infos is not an error: the function
 * returns TSDB_CODE_SUCCESS with *tbGet still false. A missing dbname / tablename
 * yields an empty string in the corresponding buffer.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when tinfo, dbName or tableName
 *         is NULL; terrno when the entry cannot be fetched from the array.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t* rversion,
                                    int32_t idx, bool* tbGet) {
  *tbGet = false;

  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (taosArrayGetSize(pTask->schemaInfos) <= idx) {
    return TSDB_CODE_SUCCESS;
  }

  SSchemaInfo* pEntry = taosArrayGet(pTask->schemaInfos, idx);
  if (pEntry == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pEntry->sw->version;
  *tversion = pEntry->tversion;
  *rversion = pEntry->rversion;

  if (!pEntry->dbname) {
    dbName[0] = 0;
  } else {
    tstrncpy(dbName, pEntry->dbname, dbNameBuffLen);
  }

  if (!pEntry->tablename) {
    tableName[0] = 0;
  } else {
    tstrncpy(tableName, pEntry->tablename, tbaleNameBuffLen);
  }

  *tbGet = true;
  return TSDB_CODE_SUCCESS;
}
643

644
// Returns the task's dynamicTask flag.
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  return pTask->dynamicTask;
}
1,196,667,491✔
645

646
// Releases an operator parameter through freeOperatorParam(pParam, OP_GET_PARAM);
// a NULL pParam is ignored.
void qDestroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
}
652

653
// Swaps pParam into the task's pOpParam slot (the previous value ends up in the
// local pParam and is not used further here) and clears the task's paramSet flag.
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  TSWAP(pParam, pTask->pOpParam);
  pTask->paramSet = false;
}
94,311,256✔
657

658
int32_t qExecutorInit(void) {
14,346,709✔
659
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
14,346,709✔
660
  return TSDB_CODE_SUCCESS;
14,346,709✔
661
}
662

663
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
1,238,203,994✔
664
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
665
                        EOPTR_EXEC_MODEL model) {
666
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
1,238,203,994✔
667
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
1,238,203,994✔
668

669
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
1,238,186,055✔
670

671
  readHandle->uid = 0;
1,238,365,120✔
672
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
1,238,534,800✔
673
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
1,237,803,864!
674
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
320,042!
675
    goto _error;
29,382✔
676
  }
677

678
  if (handle) {
1,237,660,293✔
679
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
1,236,874,310✔
680
    void*           pSinkManager = NULL;
1,236,902,528✔
681
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
1,236,079,354✔
682
    if (code != TSDB_CODE_SUCCESS) {
1,236,880,127!
683
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
684
      goto _error;
×
685
    }
686

687
    void* pSinkParam = NULL;
1,236,880,127✔
688
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
1,236,902,499✔
689
    if (code != TSDB_CODE_SUCCESS) {
1,236,096,515!
690
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
691
      taosMemoryFree(pSinkManager);
×
692
      goto _error;
×
693
    }
694

695
    SDataSinkNode* pSink = NULL;
1,236,096,515✔
696
    if (readHandle->localExec) {
1,236,692,302✔
697
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
35,015,387✔
698
      if (code != TSDB_CODE_SUCCESS) {
35,019,407!
699
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
×
700
               (*pTask)->id.str);
701
        taosMemoryFree(pSinkManager);
×
702
        goto _error;
×
703
      }
704
    }
705

706
    // pSinkParam has been freed during create sinker.
707
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
1,236,429,532!
708
                              (*pTask)->id.str, pSubplan->processOneBlock);
1,237,027,235✔
709
    if (code) {
1,236,141,390✔
710
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
1,497!
711
    }
712
  }
713

714
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
1,237,692,527✔
715
         tstrerror(code));
716

717
_error:
121,190,226✔
718
  // if failed to add ref for all tables in this query, abort current query
719
  return code;
1,238,319,956✔
720
}
721

722
static void freeBlock(void* param) {
×
723
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
724
  blockDataDestroy(pBlock);
×
725
}
×
726

727
// Executes the task's root operator and collects the produced blocks into pResList.
//
// tinfo           - task handle (an SExecTaskInfo*).
// pResList        - cleared on entry; receives pointers to copies of the result blocks. The copies are kept in
//                   pTaskInfo->pResultBlockList and are reused across calls.
// useconds        - written with the accumulated elapsed time only when the operator is exhausted (no more blocks).
// hasMore         - set to true when the loop stopped while the operator still had a block pending.
// pLocal          - optional; when non-NULL it is copied into pTaskInfo->localFetch.
// processOneBlock - when true, stop after the first block regardless of the row threshold.
//
// Returns pTaskInfo->code. TSDB_CODE_QRY_IN_EXEC is returned when another thread currently owns the task.
// Note that hasMore/useconds are not written on the early-return and error paths.
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
                     bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  // Initialized up front: every `goto _end` bypasses the place where the elapsed time is measured, yet the
  // summary log under _end prints it.
  uint64_t       el = 0;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // claim exclusive ownership of the task; bail out if some other thread is already running it
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();

  // the operator parameter is handed to the root operator only once, on the first fetch
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  // use the plan's own threshold only when it is dynamic and below the 4096-row cap
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // no cached block available at this position: clone the result and remember it for later calls
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      p = p1;
    } else {
      // reuse the cached block at this position and overwrite its content
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  // release ownership before reporting the outcome
  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
860

861
// Destroys every block stored in the task's pResultBlockList and then empties the list.
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTask->pResultBlockList;

  size_t  total = taosArrayGetSize(pBlocks);
  int32_t idx = 0;
  while (idx < total) {
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, idx);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++idx;
  }

  taosArrayClear(pBlocks);
}
×
874

875
// Fetches the next result block from the task's root operator.
//
// tinfo    - task handle (an SExecTaskInfo*).
// pRes     - reset to NULL on entry; receives the block produced by the root operator (NULL when exhausted).
// useconds - written with the accumulated elapsed time only when no block was produced.
//
// Returns pTaskInfo->code. TSDB_CODE_QRY_IN_EXEC is returned when another thread currently owns the task.
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // The latch is taken in read mode, so several callers may reach this point together. Claim ownership with a
  // compare-and-swap (as qExecTaskOpt does) instead of a plain check-then-set, and keep the previous owner so the
  // error log reports the thread that actually holds the task.
  curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  int32_t code = TSDB_CODE_SUCCESS;
  // the operator parameter is handed to the root operator only once, on the first fetch
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  }

  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  } else {
    // validate the block only when the fetch itself succeeded, so that the operator's error code is not
    // replaced by a follow-up validation failure
    code = blockDataCheck(*pRes);
    if (code) {
      pTaskInfo->code = code;
      qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
    }
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
956

957
// Appends *pInfo to the task's stop-info array while holding the stop-info write latch.
// Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pSlot = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pSlot != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
968

969
int32_t stopInfoComp(void const* lp, void const* rp) {
×
970
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
971
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
972

973
  if (key->refId < pInfo->refId) {
×
974
    return -1;
×
975
  } else if (key->refId > pInfo->refId) {
×
976
    return 1;
×
977
  }
978

979
  return 0;
×
980
}
981

982
// Removes the stop-info entry matching pInfo (compared with stopInfoComp) from the task, if one is found.
// The search and removal run under the stop-info write latch.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (!(pos < 0)) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
×
990

991
// Walks the task's stop-info list under the write latch and, for every entry whose exchange object can still be
// acquired from exchangeObjRefPool, posts its `ready` semaphore and releases the reference again.
// Failures are logged and do not interrupt the walk.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t total = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pEntry = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, idx);
    if (pEntry == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }

    SExchangeInfo* pExchange = taosAcquireRef(exchangeObjRefPool, pEntry->refId);
    if (pExchange == NULL) {
      continue;
    }

    int32_t rc = tsem_post(&pExchange->ready);
    if (rc == TSDB_CODE_SUCCESS) {
      qDebug("post to exchange %" PRId64 " to stop", pEntry->refId);
    } else {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }

    rc = taosReleaseRef(exchangeObjRefPool, pEntry->refId);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
196,737✔
1018

1019
// Marks the task as killed with rspCode and signals its registered operators to stop, without waiting.
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));

  setTaskKilled(pTask, rspCode);
  qStopTaskOperators(pTask);
  return TSDB_CODE_SUCCESS;
}
1032

1033
// Marks the task as killed (TSDB_CODE_TSC_QUERY_KILLED) and optionally waits for it to stop executing.
//
// tinfo        - task handle (an SExecTaskInfo*).
// rspCode      - stored into pTaskInfo->code once the task is observed idle in the waiting mode.
// waitDuration - maximum time to wait in milliseconds; a value <= 0 means do not wait.
//
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle; TSDB_CODE_SUCCESS otherwise, including when the wait
// times out while the task is still executing.
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        st = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (waitDuration > 0) {
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  // asynchronous kill: nothing to wait for
  if (waitDuration <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  while (1) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still executing: sleep for 200 ms, then either give up or check again
    taosMsleep(200);

    int64_t d = taosGetTimestampMs() - st;
    if (d >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
1076

1077
// Reports whether the task currently has an owner, i.e. its `owner` field is non-zero.
// A NULL handle is reported as not executing.
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  return (pTask != NULL) && (atomic_load_64(&pTask->owner) != 0);
}
1085

1086
// Writes the task's cost summary to the debug log. When a block-load recorder is attached to the cost info,
// its counters are included; otherwise only the idle and elapsed times are printed.
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  int64_t                 queueIdle = pCost->start - pCost->created;
  SFileBlockLoadRecorder* pLoadStat = pCost->pRecoder;

  if (pCost->pRecoder == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), queueIdle / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), queueIdle / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pLoadStat->totalBlocks, pLoadStat->loadBlockStatis, pLoadStat->loadBlocks,
      pLoadStat->totalRows, pLoadStat->totalCheckedRows);
}
1,240,331,400✔
1104

1105
// Logs the completion message and cost summary of a task and then destroys it. A NULL handle is ignored.
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qTaskHandle;
  if (NULL == pTask) {
    return;
  }

  if (NULL == pTask->pRoot) {
    qDebug("%s execTask completed", GET_TASKID(pTask));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTask), pTask->pRoot->resultInfo.totalRows);
  }

  // print the query cost summary before the task memory goes away
  printTaskExecCostInLog(pTask);
  doDestroyTask(pTask);
}
1120

1121
// Collects explain/execution info for the task's operator tree, starting at the root operator, into
// pExecInfoList. Returns the result of getOperatorExplainExecInfo.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRootOp, pExecInfoList);
}
1125

1126
// Finds the stream-scan operator by following the first downstream of each operator, starting at the task's
// root, and stores that operator's `info` in *scanner.
//
// When the chain ends without a QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN operator, *scanner is set to NULL and an
// error is logged, instead of walking past the end of the chain.
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return;
    }

    // an operator without a downstream list terminates the chain
    if (pOperator->pDownstream == NULL) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s no stream scan operator found in operator tree", GET_TASKID(pTaskInfo));
  *scanner = NULL;
}
916,126✔
1140

1141
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval,
×
1142
                                      STimeWindow* pLastWindow, TSKEY* pRecInteral) {
1143
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
1144
    return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval, pLastWindow, pRecInteral);
×
1145
  }
1146
  SStreamScanInfo* pScanOp = (SStreamScanInfo*)pOperator->info;
×
1147
  *pWaterMark = pScanOp->twAggSup.waterMark;
×
1148
  *pInterval = pScanOp->interval;
×
1149
  *pLastWindow = pScanOp->lastScanRange;
×
1150
  *pRecInteral = pScanOp->recalculateInterval;
×
1151
  return TSDB_CODE_SUCCESS;
×
1152
}
1153

1154
void* qExtractReaderFromTmqScanner(void* scanner) {
916,126✔
1155
  SStreamScanInfo* pInfo = scanner;
916,126✔
1156
  return (void*)pInfo->tqReader;
916,126✔
1157
}
1158

1159
// Returns the schema wrapper currently stored in the task's stream info.
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
1163

1164
// Returns the table name currently stored in the task's stream info.
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
1168

1169
// Returns the address of the batch-meta response embedded in the task's stream info.
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
1173

1174
// Copies the task's current stream offset into *pOffset. Always returns 0.
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  tOffsetCopy(pOffset, &pTask->streamInfo.currentOffset);
  return 0;
}
1184

1185
// Resets *pCond and fills it for an ascending scan over every column listed in pMtInfo->schema.
// The end version is taken from sContext->snapVersion; the start version is -1.
// colList and pSlotList are allocated here; if either allocation fails both are released and terrno is returned.
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (!(pCond->colList != NULL && pCond->pSlotList != NULL)) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    return terrno;
  }

  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  int32_t col = 0;
  while (col < pCond->numOfCols) {
    SColumnInfo* pDst = pCond->colList + col;

    pDst->type = pMtInfo->schema->pSchema[col].type;
    pDst->bytes = pMtInfo->schema->pSchema[col].bytes;
    // precision/scale are derived from the extended schema when one is present
    if (pMtInfo->pExtSchemas != NULL) {
      decimalFromTypeMod(pMtInfo->pExtSchemas[col].typeMod, &pDst->precision, &pDst->scale);
    }
    pDst->colId = pMtInfo->schema->pSchema[col].colId;
    pDst->pk = pMtInfo->schema->pSchema[col].flags & COL_IS_KEY;

    // slot index equals column position
    pCond->pSlotList[col] = col;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}
1218

1219
// Sets the status of the task's root operator to OP_NOT_OPENED.
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tinfo)->pRoot;
  pRootOp->status = OP_NOT_OPENED;
}
267,507,171✔
1224

1225
// Stores sourceExcluded in the task's stream info.
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
  ((SExecTaskInfo*)tinfo)->streamInfo.sourceExcluded = sourceExcluded;
}
42,669,542✔
1229

1230
// Positions the task's scan operators at the given offset before consumption starts or resumes.
//
// tinfo   - task handle (an SExecTaskInfo*).
// pOffset - target offset; its `type` selects between WAL log, snapshot data and snapshot meta handling.
// subType - subscription type; TOPIC_SUB_TYPE__COLUMN uses the stream-scan operator, any other value is handled
//           through the raw-scan operator at the root.
//
// When pOffset equals the task's current offset nothing is re-positioned and 0 is returned. On success the
// offset is copied into pTaskInfo->streamInfo.currentOffset. On failure an error code is returned and the
// current offset is left untouched.
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
  // if pOffset equal to current offset, means continue consume
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      // switching to the WAL: drop the tsdb reader and seek the tq reader to the requested version
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        // no table given: start from the first table of the list, from the very beginning
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
            taosRUnLockLatch(&pTaskInfo->lock);
            return terrno;
          }
          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 " %" PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      if (pScanBaseInfo->dataReader == NULL) {
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
                                                             &keyInfo, 1, pScanInfo->pResBlock,
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          return code;
        }

        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      } else {
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }

        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      }

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_PAR_INTERNAL_ERROR;
    }

  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }

      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_PAR_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // `code` still holds TSDB_CODE_SUCCESS from the call above; report the lookup failure through terrno,
        // as the column-subscription branch of this function does, so the caller does not see a success
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
      //      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
      // the schema pointer from mtInfo is handed over to streamInfo; only the ext schemas are released here
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;
      taosMemoryFreeClear(mtInfo.pExtSchemas);

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
1483

1484
/**
 * RPC response entry point: copies the response payload into a private buffer and hands it
 * to the callback stored in the message's send-info handle.
 *
 * @param parent  unused by this function
 * @param pMsg    received RPC message; pMsg->pCont is released here on every path
 * @param pEpSet  unused by this function
 *
 * If the payload copy cannot be allocated, pMsg->code is replaced with terrno and the
 * callback is still invoked (with buf.pData == NULL) so the waiter observes the failure.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pSendInfo == NULL) {
    qError("pMsg->info.ahandle is NULL");
    // The normal path below always releases the payload; do the same here so that a
    // response without a handle does not leak its content buffer.
    rpcFreeCont(pMsg->pCont);
    return;
  }

  qDebug("rsp msg got, code:%x, len:%d, 0x%" PRIx64 ":0x%" PRIx64, 
      pMsg->code, pMsg->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  // The callback receives the private copy; its return value is intentionally ignored.
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1509

1510
/**
 * Collects the uids of the tables referenced by the first table-list found in the task's
 * operator tree.
 *
 * @param tinfo  task handle (an SExecTaskInfo*)
 * @return a newly created array of uint64_t uids, or NULL when no table list
 *         could be obtained. On any later failure the temporary arrays are released and
 *         the function leaves through T_LONG_JMP(pTaskInfo->env, code).
 */
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;
  int32_t        numOfTables = 0;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // plist only holds borrowed STableListInfo pointers; the container itself is ours and
  // must be released on success and failure alike.
  taosArrayDestroy(plist);

  if (code != TSDB_CODE_SUCCESS) {
    // Release the partial result before leaving through the long jump, otherwise nobody
    // can free it afterwards.
    taosArrayDestroy(pUidList);
    pUidList = NULL;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pUidList;
}
1547

1548
/**
 * Walks down the operator tree (first downstream only) until a stream-scan or table-scan
 * operator is reached and appends that operator's table-list pointer to pList.
 *
 * @param pList      array of pointers receiving the table-list handle
 * @param pOperator  operator to inspect
 * @return TSDB_CODE_SUCCESS, or the error reported while pushing into pList
 */
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      // a stream scan wraps a table scan operator; the table list lives in the inner one
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pushed = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pushed = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    default: {
      // not a scan: keep descending through the first child, if there is one
      if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
1576

1577
/**
 * Builds an array holding the table-list pointers found in the task's operator tree.
 *
 * @param pTaskInfo  task whose pRoot operator is searched
 * @param pList      out: newly created array on success, NULL otherwise
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pList is NULL, terrno when the array
 *         cannot be created, or the error returned by extractTableList
 */
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (pList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pCollected = taosArrayInit(0, POINTER_BYTES);
  if (pCollected == NULL) {
    return terrno;
  }

  int32_t code = extractTableList(pCollected, pTaskInfo->pRoot);
  if (code != 0) {
    // nothing is handed to the caller on failure
    taosArrayDestroy(pCollected);
    return code;
  }

  *pList = pCollected;
  return code;
}
1596

1597
/**
 * Invokes the root operator's releaseStreamStateFn hook when one is installed.
 *
 * @param tInfo  task handle (an SExecTaskInfo*)
 * @return always 0
 */
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;

  if (pRoot->fpSet.releaseStreamStateFn == NULL) {
    return 0;
  }

  pRoot->fpSet.releaseStreamStateFn(pRoot);
  return 0;
}
1604

1605
/**
 * Invokes the root operator's reloadStreamStateFn hook when one is installed.
 *
 * @param tInfo  task handle (an SExecTaskInfo*)
 * @return always 0
 */
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;

  if (pRoot->fpSet.reloadStreamStateFn == NULL) {
    return 0;
  }

  pRoot->fpSet.reloadStreamStateFn(pRoot);
  return 0;
}
1612

1613
/**
 * Clears the task's stored result code and logs the value it had before.
 *
 * @param tinfo  task handle (an SExecTaskInfo*)
 */
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const int32_t  prevCode = pTask->code;

  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
×
1620

1621
/**
 * Placeholder: currently collects nothing and reports success.
 *
 * @param pOper   operator tree root (unused)
 * @param pExprs  destination array (unused)
 * @return always 0
 */
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
  return 0;
}
1625

1626
/**
 * Task-level wrapper around collectExprsToReplaceForStream, applied to the root operator.
 *
 * @param tInfo   task handle (an SExecTaskInfo*)
 * @param pExprs  destination array forwarded unchanged
 * @return the code returned by collectExprsToReplaceForStream
 */
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  return collectExprsToReplaceForStream(pTask->pRoot, pExprs);
}
1631

1632
/**
 * Resets an operator and, recursively, its downstream operators.
 *
 * For each operator: frees/resets the GET and NOTIFY params, calls resetStateFn if set,
 * and marks the operator OP_NOT_OPENED. Descent into the children stops at the first
 * non-zero code.
 *
 * @param pOper  operator to reset
 * @return 0 on success, otherwise the first error reported by a resetStateFn
 */
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
  freeResetOperatorParams(pOper, OP_GET_PARAM, true);
  freeResetOperatorParams(pOper, OP_NOTIFY_PARAM, true);

  int32_t code = (pOper->fpSet.resetStateFn) ? pOper->fpSet.resetStateFn(pOper) : 0;

  // the status is reset even when resetStateFn reported an error
  pOper->status = OP_NOT_OPENED;

  int32_t child = 0;
  while (child < pOper->numOfDownstream && code == 0) {
    code = clearStatesForOperator(pOper->pDownstream[child]);
    ++child;
  }
  return code;
}
1647

1648
/**
 * Clears the task's result code and resets every operator reachable from its root.
 *
 * @param tInfo  task handle (an SExecTaskInfo*)
 * @return the code returned by clearStatesForOperator for the root operator
 */
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;

  pTask->code = TSDB_CODE_SUCCESS;
  return clearStatesForOperator(pTask->pRoot);
}
1656

1657
/**
 * Runs one step of a stream task: pulls the next block from the root operator.
 *
 * @param tInfo     task handle (an SExecTaskInfo*)
 * @param ppRes     out: block produced by the root operator, NULL when none
 * @param useconds  out: accumulated elapsed time; written only when no block was produced
 * @param finished  out: set to true when the root operator returned no block; written only
 *                  when getNextFn itself succeeded
 * @return pTaskInfo->code: 0 on success, TSDB_CODE_QRY_IN_EXEC when another thread already
 *         owns the task, or the error recorded during execution
 *
 * Errors delivered through the jump buffer pTaskInfo->env (presumably raised with
 * T_LONG_JMP inside the operators) come back through the setjmp branch below.
 */
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppRes, uint64_t* useconds, bool* finished) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *ppRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // Capture the current owner so the diagnostic below reports the thread that actually
  // holds the task instead of a constant zero.
  curOwner = pTaskInfo->owner;
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    // release ownership so a later call can run the task again
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, ppRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  } else {
    *finished = *ppRes == NULL;
    code = blockDataCheck(*ppRes);
  }
  // covers a blockDataCheck failure; a getNextFn failure is recorded (and logged) a second time here
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *ppRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*ppRes != NULL) ? (*ppRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1735

1736
// void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
1737
//   SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1738
//   pTaskInfo->pStreamRuntimeInfo = pStreamRuntimeInfo;
1739
// }
1740

1741
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
4,258,173✔
1742
                                        SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
1743
                                        SStorageAPI* storageAPI, void** pTableListInfo, SHashObj* groupIdMap) {
1744
  STableListInfo* pList = tableListCreate();
4,258,173✔
1745
  if (pList == NULL) {
4,267,204!
1746
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1747
    return terrno;
×
1748
  }
1749

1750
  SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
4,267,204✔
1751
  SReadHandle    pHandle = {.vnode = pVnode};
4,267,204✔
1752
  SExecTaskInfo  pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
4,267,204✔
1753

1754
  int32_t code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo, groupIdMap);
4,268,450✔
1755
  if (code != 0) {
4,267,169!
1756
    tableListDestroy(pList);
×
1757
    qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
1758
    return code;
×
1759
  }
1760
  *pTableListInfo = pList;
4,267,169✔
1761
  return 0;
4,265,888✔
1762
}
1763

1764
/**
 * Returns the table keys of one group, or of the whole list.
 *
 * @param pTableListInfo  table list handle (an STableListInfo*)
 * @param currentGroupId  group to look up; -1 selects every table in the list
 * @param pKeyInfo        out: first key of the selected range (NULL for an empty list)
 * @param size            out: number of keys in the selected range
 * @return TSDB_CODE_INVALID_PARA when a pointer argument is NULL, 0 for the empty and
 *         whole-list cases, otherwise the result of tableListGetGroupList
 */
int32_t qStreamGetTableList(void* pTableListInfo, int32_t currentGroupId, STableKeyInfo** pKeyInfo, int32_t* size) {
  if (pTableListInfo == NULL || pKeyInfo == NULL || size == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SArray* pTables = ((STableListInfo*)pTableListInfo)->pTableList;

  if (taosArrayGetSize(pTables) == 0) {
    *size = 0;
    *pKeyInfo = NULL;
    return 0;
  }

  if (currentGroupId != -1) {
    return tableListGetGroupList(pTableListInfo, currentGroupId, pKeyInfo, size);
  }

  *size = taosArrayGetSize(pTables);
  *pKeyInfo = taosArrayGet(pTables, 0);
  return 0;
}
1780

1781
int32_t  qStreamSetTableList(void** pTableListInfo, uint64_t uid, uint64_t gid){
2,878,834✔
1782
  if (*pTableListInfo == NULL) {
2,878,834✔
1783
    *pTableListInfo = tableListCreate();
940,325✔
1784
    if (*pTableListInfo == NULL) {
940,325!
1785
      return terrno;
×
1786
    }
1787
  }
1788
  return tableListAddTableInfo(*pTableListInfo, uid, gid);
2,878,834✔
1789
}
1790

1791
int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid) {
35,424,831✔
1792
  if (((STableListInfo*)pTableListInfo)->groupOffset == NULL){
35,424,831✔
1793
    return 0;
28,905,554✔
1794
  }
1795
  for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
7,542,069✔
1796
    int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
7,357,333✔
1797

1798
    STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
7,356,049✔
1799
    if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
7,357,333!
1800
      return i;
6,337,087✔
1801
    }
1802
  }
1803
  return -1;
184,736✔
1804
}
1805

1806
// Destroys a table list handle by forwarding it to tableListDestroy.
void qStreamDestroyTableList(void* pTableListInfo) {
  tableListDestroy(pTableListInfo);
}
5,421,825✔
1807

1808
// Returns the value of tableListGetTableGroupId for the table `uid` in the given list.
uint64_t qStreamGetGroupId(void* pTableListInfo, int64_t uid) {
  uint64_t groupId = tableListGetTableGroupId(pTableListInfo, uid);
  return groupId;
}
18,152,310✔
1809

1810
int32_t qStreamGetTableListGroupNum(const void* pTableList) { return ((STableListInfo*)pTableList)->numOfOuputGroups; }
2,830,440✔
1811
void    qStreamSetTableListGroupNum(const void* pTableList, int32_t groupNum) {((STableListInfo*)pTableList)->numOfOuputGroups = groupNum; }
940,325✔
1812
SArray* qStreamGetTableArrayList(const void* pTableList) { return ((STableListInfo*)pTableList)->pTableList; }
775,355✔
1813

1814
// Applies pFilterInfo to pBlock via doFilter, passing NULL for doFilter's third argument.
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo, SColumnInfoData** pRet) {
  int32_t code = doFilter(pBlock, pFilterInfo, NULL, pRet);
  return code;
}
4,727,011✔
1815

1816
/**
 * Logs the request and destroys the task through qDestroyTask.
 *
 * @param tInfo  task handle to destroy
 */
void streamDestroyExecTask(qTaskInfo_t tInfo) {
  qDebug("streamDestroyExecTask called, task:%p", tInfo);

  qDestroyTask(tInfo);
}
7,236,095✔
1820

1821
/**
 * Evaluates one scalar expression by calling streamCalcOneScalarExprInRange with -1 for
 * both row bounds.
 *
 * @param pExpr         expression node to evaluate
 * @param pDst          destination scalar parameter
 * @param pExtraParams  stream runtime function info forwarded unchanged
 * @return the code returned by streamCalcOneScalarExprInRange
 */
int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
  const int32_t rowStartIdx = -1;
  const int32_t rowEndIdx = -1;

  return streamCalcOneScalarExprInRange(pExpr, pDst, rowStartIdx, rowEndIdx, pExtraParams);
}
1824

1825
int32_t streamCalcOneScalarExprInRange(SNode* pExpr, SScalarParam* pDst, int32_t rowStartIdx, int32_t rowEndIdx,
2,249,342✔
1826
                                       const SStreamRuntimeFuncInfo* pExtraParams) {
1827
  int32_t      code = 0;
2,249,342✔
1828
  SNode*       pNode = 0;
2,249,342✔
1829
  SNodeList*   pList = NULL;
2,249,342✔
1830
  SExprInfo*   pExprInfo = NULL;
2,249,342✔
1831
  int32_t      numOfExprs = 1;
2,249,342✔
1832
  int32_t*     offset = 0;
2,249,342✔
1833
  STargetNode* pTargetNode = NULL;
2,249,342✔
1834
  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
2,249,342✔
1835
  if (code == 0) code = nodesCloneNode(pExpr, &pNode);
2,249,342!
1836

1837
  if (code == 0) {
2,249,342!
1838
    pTargetNode->dataBlockId = 0;
2,249,342✔
1839
    pTargetNode->pExpr = pNode;
2,249,342✔
1840
    pTargetNode->slotId = 0;
2,249,342✔
1841
  }
1842
  if (code == 0) {
2,249,342!
1843
    code = nodesMakeList(&pList);
2,249,342✔
1844
  }
1845
  if (code == 0) {
2,249,342!
1846
    code = nodesListAppend(pList, (SNode*)pTargetNode);
2,249,342✔
1847
  }
1848
  if (code == 0) {
2,249,342!
1849
    pNode = NULL;
2,249,342✔
1850
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
2,249,342✔
1851
  }
1852

1853
  if (code == 0) {
2,249,342!
1854
    const char* pVal = NULL;
2,249,342✔
1855
    int32_t     len = 0;
2,249,342✔
1856
    SNode*      pSclNode = NULL;
2,249,342✔
1857
    switch (pExprInfo->pExpr->nodeType) {
2,249,342!
1858
      case QUERY_NODE_FUNCTION:
2,249,342✔
1859
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
2,249,342✔
1860
        break;
2,249,342✔
1861
      case QUERY_NODE_OPERATOR:
×
1862
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
×
1863
        break;
×
1864
      default:
×
1865
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1866
        break;
×
1867
    }
1868
    SArray*     pBlockList = taosArrayInit(2, POINTER_BYTES);
2,249,342✔
1869
    SSDataBlock block = {0};
2,249,342✔
1870
    block.info.rows = 1;
2,249,342✔
1871
    SSDataBlock* pBlock = &block;
2,249,342✔
1872
    void*        tmp = taosArrayPush(pBlockList, &pBlock);
2,249,342✔
1873
    if (tmp == NULL) {
2,249,342!
1874
      code = terrno;
×
1875
    }
1876
    if (code == 0) {
2,249,342!
1877
      code = scalarCalculateInRange(pSclNode, pBlockList, pDst, rowStartIdx, rowEndIdx, pExtraParams, NULL);
2,249,342✔
1878
    }
1879
    taosArrayDestroy(pBlockList);
2,249,342✔
1880
  }
1881
  nodesDestroyList(pList);
2,249,342✔
1882
  destroyExprInfo(pExprInfo, numOfExprs);
2,249,342✔
1883
  taosMemoryFreeClear(pExprInfo);
2,249,342!
1884
  return code;
2,249,342✔
1885
}
1886

1887
/**
 * Appends one forced-output row to *pRes, computed from the task's pForceOutputCols.
 *
 * @param tInfo   task handle (an SExecTaskInfo*)
 * @param pRes    in/out: result block; created here when *pRes is NULL, and given one
 *                column per forced-output entry when it has no columns yet
 * @param winIdx  window index installed as funcInfo.curIdx while the expressions are
 *                evaluated; the previous value is restored before returning
 * @return 0 on success (also when no forced-output columns are configured), otherwise the
 *         first error encountered; the row counter is only advanced on success
 */
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes, int32_t winIdx) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  const SArray*  pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
  int32_t        code = 0;
  SNode*         pNode = NULL;
  if (!pForceOutputCols) return 0;

  if (!*pRes) {
    code = createDataBlock(pRes);
    if (code != 0) {
      // *pRes is not usable; bail out instead of dereferencing it below
      qError("failed to create data block for force output, code:%s", tstrerror(code));
      return code;
    }
  }

  if (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0) {
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
      SStreamOutCol*  pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, i);
      colInfo.info.precision = pCol->type.precision;
      colInfo.info.scale = pCol->type.scale;
      code = blockDataAppendColInfo(*pRes, &colInfo);
      if (code != 0) {
        // keep this error; do not let the capacity call below overwrite it
        qError("failed to append column for force output, code:%s", tstrerror(code));
        return code;
      }
    }
  }

  code = blockDataEnsureCapacity(*pRes, (*pRes)->info.rows + 1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to ensure capacity for force output, code:%s", tstrerror(code));
    return code;
  }

  // loop all exprs for force output, execute all exprs
  int32_t rowIdx = (*pRes)->info.rows;
  int32_t tmpWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
    SScalarParam   dst = {0};
    SStreamOutCol* pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
    code = nodesStringToNode(pCol->expr, &pNode);
    if (code != 0) break;

    // column i of the result receives expression i
    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, i);
    if (pInfo == NULL) {
      // a pre-existing block may carry fewer columns than there are forced-output exprs
      code = (terrno != 0) ? terrno : TSDB_CODE_INVALID_PARA;
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
      code = colDataSetVal(pInfo, rowIdx, p, ((SValueNode*)pNode)->isNull);
    } else {
      dst.columnData = pInfo;
      dst.numOfRows = rowIdx;
      dst.colAlloced = false;  // the column belongs to the result block
      code = streamCalcOneScalarExprInRange(pNode, &dst, rowIdx, rowIdx, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
    }
    // TODO sclFreeParam(&dst);
    nodesDestroyNode(pNode);
    pNode = NULL;
    if (code != 0) break;
  }
  if (code == TSDB_CODE_SUCCESS) {
    (*pRes)->info.rows++;
  }
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = tmpWinIdx;
  return code;
}
1946

1947
/**
 * Computes the output sub-table name from a name expression.
 *
 * @param pExpr               name expression: a string value node or a string-typed function
 * @param tbname              out: NUL-terminated name; the stored length is capped at
 *                            TSDB_TABLE_NAME_LEN - 1 characters (longer results are
 *                            truncated and logged)
 * @param pStreamRuntimeInfo  runtime info; curIdx is forced to 0 during evaluation and
 *                            restored before returning
 * @return 0 on success;
 *         TSDB_CODE_INVALID_PARA for non-string/NULL value nodes, non-string functions, or
 *         an empty result (tbname is then set to the empty string);
 *         TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED when the function result is NULL;
 *         TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME when the name contains '.';
 *         TSDB_CODE_OPS_NOT_SUPPORT for other node types; or an allocation/evaluation error
 */
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
  int32_t      code = 0;
  const char*  pVal = NULL;
  SScalarParam dst = {0};
  int32_t      len = 0;
  int32_t      nextIdx = pStreamRuntimeInfo->curIdx;
  pStreamRuntimeInfo->curIdx = 0;  // always use the first window to calc tbname
  // execute the expr
  switch (pExpr->type) {
    case QUERY_NODE_VALUE: {
      SValueNode* pValue = (SValueNode*)pExpr;
      int32_t     type = pValue->node.resType.type;
      if (!IS_STR_DATA_TYPE(type)) {
        qError("invalid sub tb expr with non-str type");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      void* pTmp = nodesGetValueFromNode((SValueNode*)pExpr);
      if (pTmp == NULL) {
        qError("invalid sub tb expr with null value");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      pVal = varDataVal(pTmp);
      len = varDataLen(pTmp);
    } break;
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
        qError("invalid sub tb expr with non-str type func");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
      if (!pCol) {
        code = terrno;
        qError("failed to allocate col info data at: %s, %d", __func__, __LINE__);
        break;
      }

      // one-row result column typed after the expression's result type
      pCol->hasNull = true;
      pCol->info.type = ((SExprNode*)pExpr)->resType.type;
      pCol->info.colId = 0;
      pCol->info.bytes = ((SExprNode*)pExpr)->resType.bytes;
      pCol->info.precision = ((SExprNode*)pExpr)->resType.precision;
      pCol->info.scale = ((SExprNode*)pExpr)->resType.scale;
      code = colInfoDataEnsureCapacity(pCol, 1, true);
      if (code != 0) {
        qError("failed to ensure capacity for col info data at: %s, %d", __func__, __LINE__);
        taosMemoryFree(pCol);
        break;
      }
      // from here dst references the column; it is handed to sclFreeParam(&dst) at the end
      dst.columnData = pCol;
      dst.numOfRows = 1;
      dst.colAlloced = true;
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
      if (colDataIsNull_var(dst.columnData, 0)) {
        qInfo("invalid sub tb expr with null value");
        code = TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED;
      }
      if (code == 0) {
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
      }
    } break;
    default:
      qError("wrong subtable expr with type: %d", pExpr->type);
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }
  if (code == 0) {
    if (!pVal || len == 0) {
      qError("tbname generated with no characters which is not allowed");
      code = TSDB_CODE_INVALID_PARA;
      // leave an empty name behind without touching the (possibly NULL) source pointer
      tbname[0] = '\0';
    } else {
      if (len > TSDB_TABLE_NAME_LEN - 1) {
        qError("tbname generated with too long characters, max allowed is %d, got %d, truncated.", TSDB_TABLE_NAME_LEN - 1, len);
        len = TSDB_TABLE_NAME_LEN - 1;
      }

      memcpy(tbname, pVal, len);
      tbname[len] = '\0';  // ensure null terminated
      if (NULL != strchr(tbname, '.')) {
        code = TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME;
        qError("tbname generated with invalid characters, '.' is not allowed");
      }
    }
  }
  // pVal may point into dst's column data, so release dst only after the copy above
  sclFreeParam(&dst);
  pStreamRuntimeInfo->curIdx = nextIdx; // restore
  return code;
}
2039

2040
/**
 * Releases a stream inserter parameter object together with the strings and arrays it
 * references. pSinkHandle is not touched. A NULL argument is a no-op.
 *
 * @param pParam  object to destroy, may be NULL
 */
void destroyStreamInserterParam(SStreamInserterParam* pParam) {
  if (pParam == NULL) {
    return;
  }

  // names
  if (pParam->tbname != NULL) {
    taosMemFree(pParam->tbname);
    pParam->tbname = NULL;
  }
  if (pParam->stbname != NULL) {
    taosMemFree(pParam->stbname);
    pParam->stbname = NULL;
  }
  if (pParam->dbFName != NULL) {
    taosMemFree(pParam->dbFName);
    pParam->dbFName = NULL;
  }

  // field descriptors
  if (pParam->pFields != NULL) {
    taosArrayDestroy(pParam->pFields);
    pParam->pFields = NULL;
  }
  if (pParam->pTagFields != NULL) {
    taosArrayDestroy(pParam->pTagFields);
    pParam->pTagFields = NULL;
  }

  taosMemFree(pParam);
}
889,850✔
2065

2066
/**
 * Deep-copies a stream inserter parameter object.
 *
 * Strings and field arrays are duplicated; pSinkHandle is shared with the source.
 *
 * @param ppDst  out: the new object on success, NULL on failure (when ppDst itself is
 *               non-NULL). Any value previously stored in *ppDst is overwritten, never freed.
 * @param pSrc   object to copy
 * @return 0 on success, TSDB_CODE_INVALID_PARA when ppDst or pSrc is NULL, otherwise the
 *         terrno of the failing allocation
 */
int32_t cloneStreamInserterParam(SStreamInserterParam** ppDst, SStreamInserterParam* pSrc) {
  int32_t               code = 0, lino = 0;
  SStreamInserterParam* pDst = NULL;  // built locally so the error path never touches caller data

  if (ppDst == NULL || pSrc == NULL) {
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
  }
  pDst = (SStreamInserterParam*)taosMemoryCalloc(1, sizeof(SStreamInserterParam));
  TSDB_CHECK_NULL(pDst, code, lino, _exit, terrno);

  pDst->suid = pSrc->suid;
  pDst->sver = pSrc->sver;
  pDst->tbType = pSrc->tbType;
  pDst->tbname = taosStrdup(pSrc->tbname);
  TSDB_CHECK_NULL(pDst->tbname, code, lino, _exit, terrno);

  // the super-table name is optional
  if (pSrc->stbname) {
    pDst->stbname = taosStrdup(pSrc->stbname);
    TSDB_CHECK_NULL(pDst->stbname, code, lino, _exit, terrno);
  }

  pDst->dbFName = taosStrdup(pSrc->dbFName);
  TSDB_CHECK_NULL(pDst->dbFName, code, lino, _exit, terrno);

  pDst->pSinkHandle = pSrc->pSinkHandle;  // don't need clone and free

  // empty arrays are represented as NULL in the copy
  if (pSrc->pFields && pSrc->pFields->size > 0) {
    pDst->pFields = taosArrayDup(pSrc->pFields, NULL);
    TSDB_CHECK_NULL(pDst->pFields, code, lino, _exit, terrno);
  } else {
    pDst->pFields = NULL;
  }

  if (pSrc->pTagFields && pSrc->pTagFields->size > 0) {
    pDst->pTagFields = taosArrayDup(pSrc->pTagFields, NULL);
    TSDB_CHECK_NULL(pDst->pTagFields, code, lino, _exit, terrno);
  } else {
    pDst->pTagFields = NULL;
  }

_exit:

  if (code != 0) {
    // only the partially built copy is released; destroyStreamInserterParam accepts NULL
    destroyStreamInserterParam(pDst);
    pDst = NULL;

    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  if (ppDst != NULL) {
    *ppDst = pDst;
  }
  return code;
}
2116

2117
/**
 * Forwards a stream table drop request to doDropStreamTable.
 *
 * @param pMsgCb   message callbacks, forwarded unchanged
 * @param pOutput  output handle, forwarded unchanged
 * @param pReq     drop request, forwarded unchanged
 * @return the code returned by doDropStreamTable
 */
int32_t dropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq) {
  int32_t code = doDropStreamTable(pMsgCb, pOutput, pReq);
  return code;
}
2120

2121
/**
 * Forwards a by-name stream table drop request to doDropStreamTableByTbName.
 *
 * @param pMsgCb   message callbacks, forwarded unchanged
 * @param pOutput  output handle, forwarded unchanged
 * @param pReq     drop request, forwarded unchanged
 * @param tbName   name of the table to drop, forwarded unchanged
 * @return the code returned by doDropStreamTableByTbName
 */
int32_t dropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName) {
  int32_t code = doDropStreamTableByTbName(pMsgCb, pOutput, pReq, tbName);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc