• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4835

31 Oct 2025 03:37AM UTC coverage: 58.506% (-0.3%) from 58.764%
#4835

push

travis-ci

SallyHuo-TAOS
Merge remote-tracking branch 'origin/cover/3.0' into cover/3.0

# Conflicts:
#	test/ci/run.sh

149104 of 324176 branches covered (45.99%)

Branch coverage included in aggregate %.

198232 of 269498 relevant lines covered (73.56%)

236558065.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.81
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include <stdint.h>
18
#include "cmdnodes.h"
19
#include "dataSinkInt.h"
20
#include "executil.h"
21
#include "executorInt.h"
22
#include "libs/new-stream/stream.h"
23
#include "operator.h"
24
#include "osMemPool.h"
25
#include "osMemory.h"
26
#include "planner.h"
27
#include "query.h"
28
#include "querytask.h"
29
#include "storageapi.h"
30
#include "streamexecutorInt.h"
31
#include "tdatablock.h"
32
#include "tref.h"
33
#include "trpc.h"
34
#include "tudf.h"
35
#include "wal.h"
36

37
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
38
int32_t             exchangeObjRefPool = -1;
39
SGlobalExecInfo     gExecInfo = {0};
40

41
/*
 * Store the dnode handle and the two dnode callbacks in the file-level
 * gExecInfo structure so that later calls (e.g. getCurrentMnodeEpset) can
 * use them.
 *
 * pDnode      opaque dnode handle, saved as-is
 * getDnodeId  callback saved into gExecInfo.getDnodeId
 * getMnode    callback saved into gExecInfo.getMnode
 */
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
  gExecInfo.getDnodeId = getDnodeId;
  gExecInfo.getMnode = getMnode;
  gExecInfo.dnode = pDnode;
}
47

48
/*
 * Fill pEpSet by invoking the getMnode callback registered through
 * gExecInfoInit.
 *
 * Returns TSDB_CODE_APP_ERROR (after logging) when either the dnode handle or
 * the callback has not been registered, TSDB_CODE_SUCCESS otherwise.
 */
int32_t getCurrentMnodeEpset(SEpSet* pEpSet) {
  bool initialized = (gExecInfo.dnode != NULL) && (gExecInfo.getMnode != NULL);

  if (initialized) {
    gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
    return TSDB_CODE_SUCCESS;
  }

  qError("gExecInfo is not initialized");
  return TSDB_CODE_APP_ERROR;
}
56

57
static void cleanupRefPool() {
2,179,068✔
58
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
2,179,068✔
59
  taosCloseRef(ref);
2,179,068✔
60
}
2,179,068✔
61

62
static void initRefPool() {
2,179,068✔
63
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
2,179,068✔
64
  (void)atexit(cleanupRefPool);
2,179,068!
65
}
2,179,068✔
66

67
/*
 * Descend the single-child operator chain rooted at pOperator until the
 * stream scan operator is found, then append the supplied input to that
 * operator's pBlockLists and record the matching block type. Every operator
 * visited on the way (including the stream scan) is reset to OP_NOT_OPENED.
 *
 * pOperator    operator to start from
 * input        payload; interpreted according to `type` (array of SPackedData,
 *              a single submit pointer, array of SSDataBlock, or a checkpoint
 *              block)
 * numOfBlocks  number of elements in `input` for the array-typed inputs
 * type         one of the STREAM_INPUT__* values handled below; other values
 *              leave the block list untouched
 * id           task id string, used for logging only
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when no unique path to a
 * stream scan operator exists, or the terrno value captured when
 * taosArrayPush fails.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // the id is a plain string: "%s" must not be followed by PRIx64
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // size_t index: numOfBlocks is a size_t, avoid the signed/unsigned comparison
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    SPackedData tmp = {.pDataBlock = input};
    void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
    QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__CHECKPOINT;
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
129

130
/*
 * Walk down the single-child operator chain from pOperator until a stream
 * scan operator is reached, resetting every operator passed on the way to
 * OP_NOT_OPENED. The stream scan operator itself is left untouched.
 *
 * pOperator  operator to start from
 * id         task id string, used for logging only
 *
 * Returns 0 on success, TSDB_CODE_APP_ERROR when an operator on the path has
 * no downstream or more than one downstream.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  while (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // the id is a plain string: "%s" must not be followed by PRIx64
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    pOperator = pOperator->pDownstream[0];
  }

  return 0;
}
147

148
/*
 * Follow the first-downstream chain from pOperator to the stream scan
 * operator (if any) and, when that operator's table scan already owns a data
 * reader, hand the task id string to the reader through
 * pAPI->tsdReader.tsdSetReaderTaskId.
 *
 * Returns 0 when nothing had to be done or the id was set, otherwise the
 * error code reported by tsdSetReaderTaskId.
 */
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SOperatorInfo* pCur = pOperator;

  // descend until the stream scan operator, or stop when the chain ends
  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (!pCur->pDownstream) {
      return 0;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  if (pStreamScan->pTableScanOp == NULL) {
    return 0;
  }

  STableScanInfo* pTableScan = pStreamScan->pTableScanOp->info;
  if (pTableScan->base.dataReader == NULL) {
    return 0;
  }

  SExecTaskInfo* pTask = pCur->pTaskInfo;
  int32_t        code = pAPI->tsdReader.tsdSetReaderTaskId(pTableScan->base.dataReader, pTask->id.str);
  if (code) {
    qError("failed to set reader id for executor, code:%s", tstrerror(code));
    return code;
  }

  return 0;
}
168

169
/*
 * Store queryId in the task, rebuild the task's id string from
 * (taskId, queryId), and propagate the new id string to the reader via
 * doSetTaskId. Returns whatever doSetTaskId returns.
 */
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str, 64);

  // set the idstr for tsdbReader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
177

178
/* Report whether the task's status field equals OP_EXEC_DONE. */
bool qTaskIsDone(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->status == OP_EXEC_DONE) {
    return true;
  }
  return false;
}
182

183
/*
 * Feed `numOfBlocks` input blocks of the given stream input `type` to the
 * stream scan operator of the task.
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing
 * anything when there is no input, otherwise the result of doSetSMABlock.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (NULL == tinfo) {
    return TSDB_CODE_APP_ERROR;
  }

  if (NULL == pBlocks || 0 == numOfBlocks) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}
203

204
/*
 * Create an execution task in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * When msg is NULL a raw-scan task is built (task + TMQ raw scan operator);
 * numOfCols is not written in that case. Otherwise msg is parsed into a
 * subplan, a regular exec task is created from it, and *numOfCols receives
 * the number of slots in the plan's output descriptor whose `output` flag is
 * set.
 *
 * Returns the task handle, or NULL on failure. On the subplan path terrno is
 * set to the failing code.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pRawTask = NULL;

    code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pRawTask);
    if (code != 0 || NULL == pRawTask) {
      return NULL;
    }

    code = createTmqRawScanOperatorInfo(pReaderHandle, pRawTask, &pRawTask->pRoot);
    if (code != 0 || NULL == pRawTask->pRoot) {
      taosMemoryFree(pRawTask);
      return NULL;
    }

    pRawTask->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pRawTask));
    return pRawTask;
  }

  SSubplan* pSubplan = NULL;
  code = qStringToSubplan(msg, &pSubplan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTask = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pSubplan, &pTask, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pTask);
    terrno = code;
    return NULL;
  }

  // extract the number of output columns
  SDataBlockDescNode* pOutputDesc = pSubplan->pNode->pOutputDataBlockDesc;
  *numOfCols = 0;

  SNode* pSlotNode;
  FOREACH(pSlotNode, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pSlotNode)->output) {
      *numOfCols += 1;
    }
  }

  return pTask;
}
254

255
/*
 * Validate a stream inserter parameter block.
 *
 * A NULL parameter is accepted. Otherwise the parameter is rejected with
 * TSDB_CODE_INVALID_PARA when:
 *   - tbType is TSDB_SUPER_TABLE but suid is not positive,
 *   - dbFName is NULL or empty,
 *   - suid is not positive and tbname is NULL or empty.
 */
static int32_t checkInsertParam(SStreamInserterParam* streamInserterParam) {
  if (NULL == streamInserterParam) {
    return TSDB_CODE_SUCCESS;
  }

  bool noSuid = (streamInserterParam->suid <= 0);

  if (streamInserterParam->tbType == TSDB_SUPER_TABLE && noSuid) {
    stError("insertParam: invalid suid:%" PRIx64 " for child table", streamInserterParam->suid);
    return TSDB_CODE_INVALID_PARA;
  }

  if (NULL == streamInserterParam->dbFName || streamInserterParam->dbFName[0] == '\0') {
    stError("insertParam: invalid db/table name");
    return TSDB_CODE_INVALID_PARA;
  }

  if (noSuid) {
    if (NULL == streamInserterParam->tbname || streamInserterParam->tbname[0] == '\0') {
      stError("insertParam: invalid table name, suid:%" PRIx64 "", streamInserterParam->suid);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}
278

279
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
2,106,635✔
280
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
281
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
282
  if (pSubplan == NULL || pTaskInfo == NULL) {
2,106,635!
283
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p", pSubplan, pTaskInfo);
×
284
    nodesDestroyNode((SNode *)pSubplan);
×
285
    return TSDB_CODE_INVALID_PARA;
×
286
  }
287
  int32_t lino = 0;
2,106,635✔
288
  int32_t code = checkInsertParam(streamInserterParam);
2,106,635✔
289
  if (code != TSDB_CODE_SUCCESS) {
2,106,635!
290
    qError("invalid stream inserter param, code:%s", tstrerror(code));
×
291
    nodesDestroyNode((SNode *)pSubplan);
×
292
    return code;
×
293
  }
294
  SInserterParam* pInserterParam = NULL;
2,106,635✔
295
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
2,106,635✔
296
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
2,106,635✔
297
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
2,106,635✔
298

299
  code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
2,106,635✔
300
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
2,105,343✔
301
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
3,853!
302
    goto _error;
2,575✔
303
  }
304

305
  if (streamInserterParam) {
2,102,782✔
306
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
1,023,012✔
307
    void*           pSinkManager = NULL;
1,023,012✔
308
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
1,021,734✔
309
    if (code != TSDB_CODE_SUCCESS) {
1,021,734!
310
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
311
      goto _error;
×
312
    }
313

314
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
1,021,734!
315
    if (NULL == pInserterParam) {
1,023,012!
316
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(terrno), (*pTask)->id.str);
×
317
      code = terrno;
×
318
      goto _error;
×
319
    }
320
    code = cloneStreamInserterParam(&pInserterParam->streamInserterParam, streamInserterParam);
1,023,012✔
321
    TSDB_CHECK_CODE(code, lino, _error);
1,023,012!
322
    
323
    pInserterParam->readHandle = taosMemCalloc(1, sizeof(SReadHandle));
1,023,012✔
324
    pInserterParam->readHandle->pMsgCb = readHandle->pMsgCb;
1,023,012✔
325

326
    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
1,023,012✔
327
    if (code) {
1,023,012!
328
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
329
    }
330
  }
331
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
2,102,768✔
332
         tstrerror(code));
333

334
_error:
544,397✔
335

336
  if (code != TSDB_CODE_SUCCESS) {
2,105,343✔
337
    qError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2,575!
338
    if (pInserterParam != NULL) {
2,575!
339
      taosMemoryFree(pInserterParam);
×
340
    }
341
  }
342
  return code;
2,105,343✔
343
}
344

345
/*
 * Tell whether the task's root operator is one of the scan kinds checked
 * here: table scan, table merge scan, or system table scan.
 *
 * Returns false for a NULL task, a task without a root operator, or a root
 * operator without a physical plan node.
 */
bool qNeedReset(qTaskInfo_t pInfo) {
  if (NULL == pInfo) {
    return false;
  }

  SOperatorInfo* pRoot = ((SExecTaskInfo*)pInfo)->pRoot;
  if (NULL == pRoot || NULL == pRoot->pPhyNode) {
    return false;
  }

  switch (nodeType(pRoot->pPhyNode)) {
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      return true;
    default:
      return false;
  }
}
359

360
/*
 * Copy the uid, both window ranges (with their validity flags) and the
 * version from pHandle into the scan base's own read handle. Does nothing
 * when either argument is NULL.
 */
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {
  if (pHandle != NULL && pScanBaseInfo != NULL) {
    SReadHandle* pDst = &pScanBaseInfo->readHandle;

    pDst->uid = pHandle->uid;
    pDst->winRangeValid = pHandle->winRangeValid;
    pDst->winRange = pHandle->winRange;
    pDst->extWinRangeValid = pHandle->extWinRangeValid;
    pDst->extWinRange = pHandle->extWinRange;
    pDst->version = pHandle->version;
  }
}
372

373
/*
 * Refresh the read handle of the task's root scan operator (table scan or
 * table merge scan) from `handle` and then invoke the operator's
 * resetStateFn.
 *
 * pInfo   task handle
 * handle  new read-handle values; may be NULL, in which case the operator's
 *         read handle is left as it is (setReadHandle ignores NULL)
 *
 * Returns TSDB_CODE_INVALID_PARA when the task, its root operator or the
 * root's physical plan node is missing; otherwise the result of resetStateFn.
 */
int32_t qResetTableScan(qTaskInfo_t pInfo, SReadHandle* handle) {
  if (pInfo == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;

  // qNeedReset treats both of these as possibly NULL; do not dereference them blindly
  if (pOperator == NULL || pOperator->pPhyNode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*           info = pOperator->info;
  STableScanBase* pScanBaseInfo = NULL;

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableMergeScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  }

  // setReadHandle accepts a NULL handle, so the log must not dereference it either
  if (handle != NULL) {
    qDebug("reset table scan, name:%s, id:%s, time range: [%" PRId64 ", %" PRId64 "]", pOperator->name,
           GET_TASKID(pTaskInfo), handle->winRange.skey, handle->winRange.ekey);
  } else {
    qDebug("reset table scan, name:%s, id:%s, no read handle supplied", pOperator->name, GET_TASKID(pTaskInfo));
  }
  return pOperator->fpSet.resetStateFn(pOperator);
}
395

396
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
2,106,635✔
397
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
398
  if (msg == NULL) {
2,106,635!
399
    return TSDB_CODE_INVALID_PARA;
×
400
  }
401

402
  *pTaskInfo = NULL;
2,106,635✔
403

404
  SSubplan* pPlan = NULL;
2,106,635✔
405
  int32_t   code = qStringToSubplan(msg, &pPlan);
2,106,635✔
406
  if (code != TSDB_CODE_SUCCESS) {
2,106,635!
407
    nodesDestroyNode((SNode *)pPlan);
×
408
    return code;
×
409
  }
410
  // todo: add stream inserter param
411
  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo,
2,106,635✔
412
                               pInserterParams ? &pInserterParams->pSinkHandle : NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM,
413
                               pInserterParams);
414
  if (code != TSDB_CODE_SUCCESS) {
2,105,343✔
415
    qDestroyTask(*pTaskInfo);
2,575✔
416
    return code;
2,575✔
417
  }
418

419
  return code;
2,102,768✔
420
}
421

422
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
1,071,573✔
423
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
424
  int32_t code = TSDB_CODE_SUCCESS;
1,071,573✔
425
  int32_t lino = 0;
1,071,573✔
426
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
1,071,573✔
427
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
1,071,573!
428
  int32_t numOfUids = taosArrayGetSize(tableIdList);
1,071,573✔
429
  if (numOfUids == 0) {
1,071,573!
430
    (*ppArrayRes) = qa;
×
431
    goto _error;
×
432
  }
433

434
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
1,071,573✔
435

436
  uint64_t suid = 0;
1,071,573✔
437
  uint64_t uid = 0;
1,071,573✔
438
  int32_t  type = 0;
1,071,573✔
439
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
1,071,573✔
440

441
  // let's discard the tables those are not created according to the queried super table.
442
  SMetaReader mr = {0};
1,071,573✔
443
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
1,071,573✔
444
  for (int32_t i = 0; i < numOfUids; ++i) {
2,187,588✔
445
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
1,116,015✔
446
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
1,116,015!
447

448
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
1,116,015✔
449
    if (code != TSDB_CODE_SUCCESS) {
1,116,015!
450
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
451
      continue;
×
452
    }
453

454
    tDecoderClear(&mr.coder);
1,116,015✔
455

456
    if (mr.me.type == TSDB_SUPER_TABLE) {
1,116,015!
457
      continue;
×
458
    } else {
459
      if (type == TSDB_SUPER_TABLE) {
1,116,015!
460
        // this new created child table does not belong to the scanned super table.
461
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
1,116,015!
462
          continue;
×
463
        }
464
      } else {  // ordinary table
465
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
466
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
467
        if (mr.me.uid != uid) {
×
468
          continue;
×
469
        }
470
      }
471
    }
472

473
    if (pScanInfo->pTagCond != NULL) {
1,116,015✔
474
      bool          qualified = false;
976,438✔
475
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
976,438✔
476
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
976,438✔
477
      if (code != TSDB_CODE_SUCCESS) {
976,438!
478
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
×
479
        continue;
×
480
      }
481

482
      if (!qualified) {
976,438!
483
        continue;
488,230✔
484
      }
485
    }
486

487
    // handle multiple partition
488
    void* tmp = taosArrayPush(qa, id);
627,785✔
489
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
627,785!
490
  }
491

492
_end:
1,071,573✔
493

494
  pAPI->metaReaderFn.clearReader(&mr);
1,071,573✔
495
  (*ppArrayRes) = qa;
1,071,573✔
496

497
_error:
1,071,573✔
498
  if (code != TSDB_CODE_SUCCESS) {
1,071,573!
499
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
500
  }
501
  return code;
1,071,573✔
502
}
503

504
/*
 * Add tables to, or remove tables from, the stream scanner of a task.
 *
 * tinfo        task handle
 * tableIdList  uids to add or remove
 * isAdd        true: filter the uids with filterUnqualifiedTables, register
 *              the survivors with the tq reader and append them (with their
 *              group id) to the scan's table list;
 *              false: remove the uids from the tq reader
 *
 * For tasks running in OPTR_EXEC_MODEL_QUEUE the table-meta cache entries of
 * all listed uids are erased first. The task lock is held (write mode) while
 * the table list / reader is modified.
 *
 * Returns 0 on success or when the task has no stream scan operator,
 * otherwise the first failing code.
 */
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  if (isAdd) {
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
  }

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
  if (code != 0 || pInfo == NULL) {
    return code;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;
  if (pInfo->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {  // clear meta cache for subscription if tag is changed
    for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
      int64_t*        uid = (int64_t*)taosArrayGet(tableIdList, i);
      STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
      taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, uid, LONG_BYTES);
    }
  }

  if (!isAdd) {  // remove the table id in current list
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
    taosWLockLatch(&pTaskInfo->lock);
    pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
    return code;
  }

  // add new table id
  SArray* qa = NULL;
  code = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &qa);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(qa);
    return code;
  }
  int32_t numOfQualifiedTables = taosArrayGetSize(qa);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);

  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = terrno;  // capture before any cleanup call can touch terrno
      taosArrayDestroy(qa);
      return code;
    }
  }

  STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
  taosWLockLatch(&pTaskInfo->lock);

  // any failure leaves the loop with `code` set; cleanup happens once below
  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* uid = taosArrayGet(qa, i);
    if (!uid) {
      code = terrno;
      break;
    }
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId, &pTaskInfo->storageAPI);
        if (code != TSDB_CODE_SUCCESS) {
          break;
        }
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  // single exit path: release the lock, then the temporaries
  taosWUnLockLatch(&pTaskInfo->lock);
  taosMemoryFree(keyBuf);
  taosArrayDestroy(qa);

  return code;
}
604

605
/*
 * Fetch the idx-th schema record of a task.
 *
 * On a hit, copies the database and table names into the caller's buffers
 * (empty string when the record has no name), stores the schema/tag/row
 * versions through sversion/tversion/rversion and sets *tbGet to true.
 * When idx is beyond the recorded schemas the function returns
 * TSDB_CODE_SUCCESS with *tbGet left false.
 *
 * Returns TSDB_CODE_INVALID_PARA when tinfo, dbName or tableName is NULL, or
 * terrno when the array element cannot be fetched.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t* rversion,
                                    int32_t idx, bool* tbGet) {
  *tbGet = false;

  if (NULL == tinfo || NULL == dbName || NULL == tableName) {
    return TSDB_CODE_INVALID_PARA;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (taosArrayGetSize(pTask->schemaInfos) <= idx) {
    return TSDB_CODE_SUCCESS;
  }

  SSchemaInfo* pSchema = taosArrayGet(pTask->schemaInfos, idx);
  if (NULL == pSchema) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pSchema->sw->version;
  *tversion = pSchema->tversion;
  *rversion = pSchema->rversion;

  if (!pSchema->dbname) {
    dbName[0] = 0;
  } else {
    tstrncpy(dbName, pSchema->dbname, dbNameBuffLen);
  }

  if (!pSchema->tablename) {
    tableName[0] = 0;
  } else {
    tstrncpy(tableName, pSchema->tablename, tbaleNameBuffLen);
  }

  *tbGet = true;

  return TSDB_CODE_SUCCESS;
}
643

644
/* Return the task's dynamicTask flag. */
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  return pTask->dynamicTask;
}
1,078,356,583✔
645

646
/* Release an operator parameter via freeOperatorParam(OP_GET_PARAM); NULL is ignored. */
void qDestroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
}
652

653
/*
 * Install pParam as the task's operator parameter (the previous value ends up
 * in the local pParam through TSWAP and is not used afterwards) and clear the
 * task's paramSet flag.
 */
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  TSWAP(pParam, pTask->pOpParam);
  pTask->paramSet = false;
}
93,607,836✔
657

658
int32_t qExecutorInit(void) {
14,203,023✔
659
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
14,203,023✔
660
  return TSDB_CODE_SUCCESS;
14,203,923✔
661
}
662

663
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
1,120,768,248✔
664
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
665
                        EOPTR_EXEC_MODEL model) {
666
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
1,120,768,248✔
667
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
1,120,768,248✔
668

669
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
1,120,833,061✔
670

671
  readHandle->uid = 0;
1,120,966,526✔
672
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
1,121,094,943✔
673
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
1,120,087,432✔
674
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
129,419!
675
    goto _error;
27,825✔
676
  }
677

678
  if (handle) {
1,120,237,726✔
679
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
1,119,314,205✔
680
    void*           pSinkManager = NULL;
1,119,252,442✔
681
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
1,119,070,511✔
682
    if (code != TSDB_CODE_SUCCESS) {
1,118,884,077!
683
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
684
      goto _error;
×
685
    }
686

687
    void* pSinkParam = NULL;
1,118,884,077✔
688
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
1,119,151,150✔
689
    if (code != TSDB_CODE_SUCCESS) {
1,118,712,582!
690
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
691
      taosMemoryFree(pSinkManager);
×
692
      goto _error;
×
693
    }
694

695
    SDataSinkNode* pSink = NULL;
1,118,712,582✔
696
    if (readHandle->localExec) {
1,119,039,691✔
697
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
35,788,546✔
698
      if (code != TSDB_CODE_SUCCESS) {
35,794,873!
699
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
×
700
               (*pTask)->id.str);
701
        taosMemoryFree(pSinkManager);
×
702
        goto _error;
×
703
      }
704
    }
705

706
    // pSinkParam has been freed during create sinker.
707
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
1,119,074,937!
708
                              (*pTask)->id.str, pSubplan->processOneBlock);
1,119,527,859✔
709
    if (code) {
1,118,681,283✔
710
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
1,497!
711
    }
712
  }
713

714
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
1,120,019,575✔
715
         tstrerror(code));
716

717
_error:
121,853,456✔
718
  // if failed to add ref for all tables in this query, abort current query
719
  return code;
1,120,635,975✔
720
}
721

722
static void freeBlock(void* param) {
×
723
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
724
  blockDataDestroy(pBlock);
×
725
}
×
726

727
// Drives the task's root operator and gathers the produced blocks into pResList.
//
// tinfo           - task handle (SExecTaskInfo*)
// pResList        - cleared on entry; receives pointers to the blocks cached in pTaskInfo->pResultBlockList
// useconds        - set to the accumulated elapsed time, only when the operator reported no more data
// hasMore         - set to true when the loop stopped with a non-NULL block still pending
// pLocal          - optional; copied into pTaskInfo->localFetch when non-NULL
// processOneBlock - stop after the first block instead of batching up to the row threshold
//
// Returns pTaskInfo->code. TSDB_CODE_QRY_IN_EXEC is returned when another thread already owns the task.
// Errors raised through longjmp land in the setjmp branch below, which records the code and releases ownership.
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
                     bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // claim exclusive ownership of the task; a non-zero previous value means somebody else is running it
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();
  // Defined before the first `goto _end`: the summary log at _end reads it on error paths too.
  uint64_t     el = 0;

  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  // the batch limit is capped at 4096 rows; a smaller dynamic threshold from the subplan takes precedence
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // no cached block available at this position: clone the result and cache the clone
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      if (tmp == NULL) {
        // the cache did not take the clone, so nobody else will release it
        code = terrno;
        lino = __LINE__;
        blockDataDestroy(p1);
        goto _end;
      }
      p = p1;
    } else {
      // reuse the cached block at this position as the copy target
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
860

861
// Destroys every block cached in the task's pResultBlockList and then clears the list itself.
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pCached = pTaskInfo->pResultBlockList;

  size_t  total = taosArrayGetSize(pCached);
  int32_t idx = 0;
  while (idx < total) {
    SSDataBlock** ppBlock = taosArrayGet(pCached, idx);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++idx;
  }

  taosArrayClear(pCached);
}
×
874

875
// Fetches one result block from the task's root operator.
//
// tinfo    - task handle (SExecTaskInfo*)
// pRes     - reset to NULL on entry; receives the block produced by the root operator
// useconds - set to the accumulated elapsed time, only when no block was produced
//
// Returns pTaskInfo->code. TSDB_CODE_QRY_IN_EXEC is returned when another thread already owns the task.
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // The latch is taken in shared mode, so several callers may be here at once. Claim ownership with a
  // compare-and-swap (as qExecTaskOpt does) so only one of them wins, and so the log shows the real owner.
  curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  int32_t code = TSDB_CODE_SUCCESS;
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    // the operator parameter is handed to the root operator only once
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  }
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  code = blockDataCheck(*pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
956

957
// Appends *pInfo to the task's stop-info array while holding the stop-info write latch.
// Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pSlot = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pSlot != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
968

969
int32_t stopInfoComp(void const* lp, void const* rp) {
×
970
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
971
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
972

973
  if (key->refId < pInfo->refId) {
×
974
    return -1;
×
975
  } else if (key->refId > pInfo->refId) {
×
976
    return 1;
×
977
  }
978

979
  return 0;
×
980
}
981

982
// Looks up the entry matching pInfo (compared with stopInfoComp) in the task's stop-info array and
// removes it when found. The whole operation runs under the stop-info write latch.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pStopList = pTaskInfo->stopInfo.pStopInfo;
  int32_t pos = taosArraySearchIdx(pStopList, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pStopList, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
×
990

991
// Walks the task's stop-info array under the write latch and, for every entry whose exchange object can
// still be acquired from exchangeObjRefPool, posts its `ready` semaphore and releases the reference.
// Failures are logged and do not stop the walk.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t total = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pEntry = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, idx);
    if (pEntry == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }

    SExchangeInfo* pExchange = taosAcquireRef(exchangeObjRefPool, pEntry->refId);
    if (pExchange == NULL) {
      // nothing to signal when the reference can no longer be acquired
      continue;
    }

    int32_t rc = tsem_post(&pExchange->ready);
    if (rc == TSDB_CODE_SUCCESS) {
      qDebug("post to exchange %" PRId64 " to stop", pEntry->refId);
    } else {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }

    rc = taosReleaseRef(exchangeObjRefPool, pEntry->refId);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
216,648✔
1018

1019
// Marks the task as killed with rspCode and signals its registered operators to stop, without waiting.
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (NULL == pTaskInfo) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

  setTaskKilled(pTaskInfo, rspCode);
  qStopTaskOperators(pTaskInfo);
  return TSDB_CODE_SUCCESS;
}
1032

1033
// Marks the task as killed and, when waitDuration > 0, polls until it is no longer executing.
//
// tinfo        - task handle (SExecTaskInfo*)
// rspCode      - stored into pTaskInfo->code once the task is observed idle (sync mode only)
// waitDuration - maximum time to wait in milliseconds; <= 0 means do not wait
//
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise (including timeout).
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        st = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (waitDuration > 0) {
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  // NOTE(review): the kill code is fixed here; rspCode is only applied below once the task is idle.
  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  if (waitDuration <= 0) {
    // asynchronous kill: nothing to wait for
    return TSDB_CODE_SUCCESS;
  }

  while (1) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still running: sleep for 200 ms and try again
    taosMsleep(200);

    int64_t d = taosGetTimestampMs() - st;
    if (d >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
1076

1077
// Reports whether the task currently has an owner thread (atomic read of pTaskInfo->owner).
// A NULL handle is never executing.
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}
1085

1086
// Writes the task's cost summary to the debug log. The detailed form, including block-load counters,
// is used when a file-block load recorder is attached to the cost info; otherwise only idle and elapsed
// times are printed.
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  int64_t                 idleTime = pCost->start - pCost->created;
  SFileBlockLoadRecorder* pLoadRec = pCost->pRecoder;

  if (pLoadRec == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idleTime / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pLoadRec->totalBlocks, pLoadRec->loadBlockStatis, pLoadRec->loadBlocks,
      pLoadRec->totalRows, pLoadRec->totalCheckedRows);
}
1,123,429,617✔
1104

1105
// Logs the completion message and cost summary for the task, then destroys it. A NULL handle is ignored.
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (NULL == pTaskInfo) {
    return;
  }

  if (NULL == pTaskInfo->pRoot) {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  }

  // print the query cost summary before the task memory goes away
  printTaskExecCostInLog(pTaskInfo);
  doDestroyTask(pTaskInfo);
}
1120

1121
// Collects explain-execution info starting from the task's root operator into pExecInfoList and
// returns whatever getOperatorExplainExecInfo returns.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}
1125

1126
// Follows the first-downstream chain from the task's root operator until a stream-scan operator is
// reached, and stores that operator's `info` pointer in *scanner.
// NOTE(review): there is no termination guard; the chain is assumed to contain a stream-scan operator.
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
  SOperatorInfo* pCur = ((SExecTaskInfo*)tinfo)->pRoot;

  for (;;) {
    uint16_t opType = pCur->operatorType;
    if (opType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      pCur = pCur->pDownstream[0];
      continue;
    }

    *scanner = pCur->info;
    return;
  }
}
909,803✔
1140

1141
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval,
×
1142
                                      STimeWindow* pLastWindow, TSKEY* pRecInteral) {
1143
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
1144
    return getOpratorIntervalInfo(pOperator->pDownstream[0], pWaterMark, pInterval, pLastWindow, pRecInteral);
×
1145
  }
1146
  SStreamScanInfo* pScanOp = (SStreamScanInfo*)pOperator->info;
×
1147
  *pWaterMark = pScanOp->twAggSup.waterMark;
×
1148
  *pInterval = pScanOp->interval;
×
1149
  *pLastWindow = pScanOp->lastScanRange;
×
1150
  *pRecInteral = pScanOp->recalculateInterval;
×
1151
  return TSDB_CODE_SUCCESS;
×
1152
}
1153

1154
void* qExtractReaderFromTmqScanner(void* scanner) {
909,803✔
1155
  SStreamScanInfo* pInfo = scanner;
909,803✔
1156
  return (void*)pInfo->tqReader;
909,803✔
1157
}
1158

1159
// Returns the schema wrapper currently stored in the task's stream info.
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
1163

1164
// Returns the table name currently stored in the task's stream info.
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
1168

1169
// Returns a pointer to the batch meta response embedded in the task's stream info.
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
1173

1174
// Copies the task's current stream offset into *pOffset. Always returns 0.
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  tOffsetCopy(pOffset, &pTask->streamInfo.currentOffset);
  return 0;
}
1184

1185
// Initializes *pCond for a TMQ snapshot scan of the table described by pMtInfo.
//
// pCond is zeroed first, then filled with: ascending order, one column entry and one slot index per
// schema column, the default time window, the table's suid, and a version range ending at
// sContext->snapVersion.
//
// Returns TSDB_CODE_SUCCESS, or the terrno observed when allocating the column or slot arrays fails.
// On failure both arrays are released and numOfCols is reset, so pCond describes zero columns.
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
    // capture the error before the cleanup calls get a chance to touch terrno
    int32_t code = terrno;
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    pCond->numOfCols = 0;
    return code;
  }

  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfo* pColInfo = &pCond->colList[i];
    pColInfo->type = pMtInfo->schema->pSchema[i].type;
    pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
    if (pMtInfo->pExtSchemas != NULL) {
      // extended schema present: derive precision/scale from the column's type modifier
      decimalFromTypeMod(pMtInfo->pExtSchemas[i].typeMod, &pColInfo->precision, &pColInfo->scale);
    }
    pColInfo->colId = pMtInfo->schema->pSchema[i].colId;
    pColInfo->pk = pMtInfo->schema->pSchema[i].flags & COL_IS_KEY;

    // slot i maps to column i
    pCond->pSlotList[i] = i;
  }

  return TSDB_CODE_SUCCESS;
}
1218

1219
// Resets the status of the task's root operator to OP_NOT_OPENED.
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  pRoot->status = OP_NOT_OPENED;
}
315,603,111✔
1224

1225
// Stores the sourceExcluded flag in the task's stream info.
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
  ((SExecTaskInfo*)tinfo)->streamInfo.sourceExcluded = sourceExcluded;
}
52,727,206✔
1229

1230
// Positions the task's scan at the offset requested by a TMQ consumer.
//
// tinfo   - task handle (SExecTaskInfo*)
// pOffset - requested offset; its `type` selects WAL log, snapshot data or snapshot meta handling
// subType - subscription type; TOPIC_SUB_TYPE__COLUMN uses the stream-scan operator, every other value
//           treats the root operator's info as a raw-scan info
//
// When pOffset equals the task's current offset nothing is re-positioned and 0 is returned.
// On success the requested offset is copied into pTaskInfo->streamInfo.currentOffset and 0 is returned;
// on failure an error code is returned and currentOffset is left untouched.
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    // let the WAL reader verify (and possibly adjust) the requested offset before it is compared below
    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
  // if pOffset equal to current offset, means continue consume
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      // switching to the WAL: the tsdb reader is closed and the tq reader seeks to the version
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        // no table requested: start from the first table in the list, from the earliest timestamp
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            // read terrno once so the logged and returned codes cannot diverge
            code = terrno;
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
            taosRUnLockLatch(&pTaskInfo->lock);
            return code;
          }
          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      // The start key above is only a temporary override for opening/resetting the reader; every exit
      // from here on, including the error returns, puts the original value back.
      if (pScanBaseInfo->dataReader == NULL) {
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
                                                             &keyInfo, 1, pScanInfo->pResBlock,
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }

        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      } else {
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }

        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      }

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_PAR_INTERNAL_ERROR;
    }

  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }
      if (p == NULL) {
        // same defensive NULL check the stream-scan lookups above apply before dereferencing
        qError("%s failed at line %d since table scan operator not found, %s", __func__, __LINE__, id);
        return TSDB_CODE_PAR_INTERNAL_ERROR;
      }

      // NOTE(review): the located operator's info is read through SStreamRawScanInfo here - confirm the layout.
      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_PAR_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // `code` is still SUCCESS at this point, so take the failure reason from terrno and never
        // report this path as success
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_PAR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
      //      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
      // the task takes over mtInfo.schema; only the extended schema array is released here
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;
      taosMemoryFreeClear(mtInfo.pExtSchemas);

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
1483

1484
/*
 * Deliver an RPC response to the callback stored in the send-info handle that
 * travels with the message (pMsg->info.ahandle).
 *
 * The response payload is copied into a freshly allocated buffer that is handed
 * to the callback; the original RPC content and the send-info handle are
 * released here. `parent` and `pEpSet` are not used by this function.
 * NOTE(review): buf.pData is not freed here, so the callback presumably takes
 * ownership of it - confirm against the registered callbacks.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    qError("pMsg->info.ahandle is NULL");
    // Nobody else will see this message; release the payload as the normal path does.
    rpcFreeCont(pMsg->pCont);
    return;
  }

  qDebug("rsp msg got, code:%x, len:%d, 0x%" PRIx64 ":0x%" PRIx64, 
      pMsg->code, pMsg->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      // Report the allocation failure through the response code and never
      // advertise a length for a buffer that does not exist.
      pMsg->code = terrno;
      buf.len = 0;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1509

1510
/*
 * Build an array of table uids (uint64_t) taken from the first table-list
 * collected by getTableListInfo() for this task.
 *
 * Returns NULL when the table lists cannot be collected. On any later failure
 * the error is logged and control leaves through T_LONG_JMP on the task's jump
 * buffer, so the function does not return in that case. The caller receives
 * the returned array; the temporary list of table-lists is always released.
 */
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;
  int32_t        numOfTables = 0;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // plist only holds borrowed pointers; free the container on every path.
  taosArrayDestroy(plist);
  if (code != TSDB_CODE_SUCCESS) {
    // The long jump below never returns, so the partial result must be freed first.
    taosArrayDestroy(pUidList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pUidList;
}
1547

1548
/*
 * Walk down the operator tree (first downstream only) until a stream-scan or
 * table-scan operator is found and append the address-sized pTableListInfo
 * entry of that scan to pList. Operators of any other type simply forward the
 * request to pDownstream[0] when it exists.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by the array push / the
 * recursive call (logged with the task id).
 */
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      // A stream scan wraps a table scan; the list lives in the wrapped operator.
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pSlot = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pSlot, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pSlot = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pSlot, code, lino, _end, terrno);
      break;
    }
    default: {
      const bool hasChild = (pOperator->pDownstream != NULL) && (pOperator->pDownstream[0] != NULL);
      if (hasChild) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
1576

1577
/*
 * Collect the table-list pointers reachable from the task's root operator.
 *
 * On success *pList receives a newly created array of pointer-sized entries;
 * on failure *pList is NULL and the error code is returned. A NULL pList is
 * rejected with TSDB_CODE_INVALID_PARA.
 */
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (pList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pCollected = taosArrayInit(0, POINTER_BYTES);
  if (pCollected == NULL) {
    return terrno;
  }

  int32_t code = extractTableList(pCollected, pTaskInfo->pRoot);
  if (code != 0) {
    // Nothing is handed out on failure; drop the partially filled container.
    taosArrayDestroy(pCollected);
    return code;
  }

  *pList = pCollected;
  return code;
}
1596

1597
/*
 * Invoke the root operator's releaseStreamStateFn hook, if one is installed.
 * Always returns 0.
 */
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  if (pRoot->fpSet.releaseStreamStateFn == NULL) {
    return 0;
  }
  pRoot->fpSet.releaseStreamStateFn(pRoot);
  return 0;
}
1604

1605
/*
 * Invoke the root operator's reloadStreamStateFn hook, if one is installed.
 * Always returns 0.
 */
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  if (pRoot->fpSet.reloadStreamStateFn == NULL) {
    return 0;
  }
  pRoot->fpSet.reloadStreamStateFn(pRoot);
  return 0;
}
1612

1613
/*
 * Clear the task's stored result code (set it to 0) and log, at debug level,
 * the code that was stored before.
 */
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const int32_t  prevCode = pTask->code;

  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
×
1620

1621
/*
 * Placeholder: currently collects nothing and reports success (0).
 * Neither argument is read.
 */
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
  return 0;
}
1625

1626
/*
 * Task-level entry point: forwards the task's root operator and pExprs to
 * collectExprsToReplaceForStream() and returns its result.
 */
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  return collectExprsToReplaceForStream(pRoot, pExprs);
}
1631

1632
/*
 * Reset one operator and, recursively, its downstream operators.
 *
 * For each operator: the GET and NOTIFY operator params are released, the
 * optional resetStateFn hook runs, and the status becomes OP_NOT_OPENED (even
 * when the hook failed). Recursion into downstream operators stops at the
 * first non-zero code, which is returned.
 */
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
  freeResetOperatorParams(pOper, OP_GET_PARAM, true);
  freeResetOperatorParams(pOper, OP_NOTIFY_PARAM, true);

  int32_t code = 0;
  if (pOper->fpSet.resetStateFn) {
    code = pOper->fpSet.resetStateFn(pOper);
  }
  pOper->status = OP_NOT_OPENED;

  int32_t child = 0;
  while (child < pOper->numOfDownstream && code == 0) {
    code = clearStatesForOperator(pOper->pDownstream[child]);
    ++child;
  }
  return code;
}
1647

1648
/*
 * Mark the task as successful again and reset the state of every operator
 * reachable from its root. Returns the result of clearStatesForOperator().
 */
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  SOperatorInfo* pRoot = pTask->pRoot;

  pTask->code = TSDB_CODE_SUCCESS;
  return clearStatesForOperator(pRoot);
}
1656

1657
/*
 * Run one step of a stream task: pull the next block from the root operator.
 *
 * ppRes     receives the produced block, or NULL when nothing was produced.
 * useconds  is written with the accumulated elapsed time only when no block
 *           was produced.
 * finished  is written only when the root operator returned without error
 *           (true when *ppRes is NULL).
 *
 * Returns pTaskInfo->code. The call fails fast when the task is already killed
 * or is currently owned by another executor (TSDB_CODE_QRY_IN_EXEC). Errors
 * raised through a long jump on pTaskInfo->env are caught by the setjmp below
 * and turned into the returned code.
 */
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppRes, uint64_t* useconds, bool* finished) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *ppRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    // clearStreamBlock(pTaskInfo->pRoot);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // Read the owner once so the log reports the thread that actually holds the task.
  int64_t curOwner = pTaskInfo->owner;
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, ppRes);
  if (code == 0) {
    *finished = *ppRes == NULL;
    code = blockDataCheck(*ppRes);
  }
  if (code) {
    // Single report for either failure (operator error or block validation error).
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *ppRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*ppRes != NULL) ? (*ppRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  // Release ownership so the next executor can pick the task up.
  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1735

1736
// void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
1737
//   SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1738
//   pTaskInfo->pStreamRuntimeInfo = pStreamRuntimeInfo;
1739
// }
1740

1741
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
17,158,232✔
1742
                                        SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
1743
                                        SStorageAPI* storageAPI, void** pTableListInfo, SHashObj* groupIdMap) {
1744
  STableListInfo* pList = tableListCreate();
17,158,232✔
1745
  if (pList == NULL) {
17,167,027!
1746
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1747
    return terrno;
×
1748
  }
1749

1750
  SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
17,167,027✔
1751
  SReadHandle    pHandle = {.vnode = pVnode};
17,168,271✔
1752
  SExecTaskInfo  pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
17,168,300✔
1753

1754
  int32_t code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo, groupIdMap);
17,169,544✔
1755
  if (code != 0) {
17,170,788!
1756
    tableListDestroy(pList);
×
1757
    qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
1758
    return code;
×
1759
  }
1760
  *pTableListInfo = pList;
17,170,788✔
1761
  return 0;
17,170,788✔
1762
}
1763

1764
/*
 * Look up the table keys of a table list.
 *
 * - Any NULL argument yields TSDB_CODE_INVALID_PARA.
 * - An empty list yields *size = 0 and *pKeyInfo = NULL.
 * - currentGroupId == -1 returns the whole list (first element and total size).
 * - Otherwise the request is delegated to tableListGetGroupList().
 */
int32_t qStreamGetTableList(void* pTableListInfo, int32_t currentGroupId, STableKeyInfo** pKeyInfo, int32_t* size) {
  if (pTableListInfo == NULL || pKeyInfo == NULL || size == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SArray* pTables = ((STableListInfo*)pTableListInfo)->pTableList;

  if (taosArrayGetSize(pTables) == 0) {
    *size = 0;
    *pKeyInfo = NULL;
    return 0;
  }

  if (currentGroupId != -1) {
    return tableListGetGroupList(pTableListInfo, currentGroupId, pKeyInfo, size);
  }

  *size = taosArrayGetSize(pTables);
  *pKeyInfo = taosArrayGet(pTables, 0);
  return 0;
}
1780

1781
int32_t  qStreamSetTableList(void** pTableListInfo, uint64_t uid, uint64_t gid){
9,967,963✔
1782
  if (*pTableListInfo == NULL) {
9,967,963✔
1783
    *pTableListInfo = tableListCreate();
1,556,306✔
1784
    if (*pTableListInfo == NULL) {
1,556,306!
1785
      return terrno;
×
1786
    }
1787
  }
1788
  return tableListAddTableInfo(*pTableListInfo, uid, gid);
9,967,963✔
1789
}
1790

1791
int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid) {
35,559,028✔
1792
  if (((STableListInfo*)pTableListInfo)->groupOffset == NULL){
35,559,028✔
1793
    return 0;
28,836,094✔
1794
  }
1795
  for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
8,770,103✔
1796
    int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
8,518,049✔
1797

1798
    STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
8,518,049✔
1799
    if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
8,518,049!
1800
      return i;
6,482,319✔
1801
    }
1802
  }
1803
  return -1;
252,054✔
1804
}
1805

1806
// Release a table list through tableListDestroy().
void qStreamDestroyTableList(void* pTableListInfo) {
  tableListDestroy(pTableListInfo);
}
19,335,002✔
1807

1808
// Return the group id that tableListGetTableGroupId() reports for table `uid`.
uint64_t qStreamGetGroupId(void* pTableListInfo, int64_t uid) {
  return tableListGetTableGroupId(pTableListInfo, uid);
}
20,496,494✔
1809

1810
// Read the number of output groups stored in the table list.
int32_t qStreamGetTableListGroupNum(const void* pTableList) {
  return ((STableListInfo*)pTableList)->numOfOuputGroups;
}
5,900,661✔
1811
// Overwrite the number of output groups stored in the table list.
// Note: the parameter is declared const but the list is written through a cast.
void qStreamSetTableListGroupNum(const void* pTableList, int32_t groupNum) {
  ((STableListInfo*)pTableList)->numOfOuputGroups = groupNum;
}
1,556,306✔
1812
// Expose the underlying pTableList array of the table list (no copy is made).
SArray* qStreamGetTableArrayList(const void* pTableList) {
  return ((STableListInfo*)pTableList)->pTableList;
}
1,287,342✔
1813

1814
// Apply pFilterInfo to pBlock via doFilter(); the third doFilter argument is passed as NULL.
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo, SColumnInfoData** pRet) {
  return doFilter(pBlock, pFilterInfo, NULL, pRet);
}
5,361,614✔
1815

1816
/*
 * Destroy a stream exec task: logs the task pointer at debug level, then
 * hands the task to qDestroyTask().
 */
void streamDestroyExecTask(qTaskInfo_t tInfo) {
  qDebug("streamDestroyExecTask called, task:%p", tInfo);
  qDestroyTask(tInfo);
}
8,154,930✔
1820

1821
/*
 * Evaluate one scalar expression into pDst without a row range: both range
 * bounds are passed to streamCalcOneScalarExprInRange() as -1.
 */
int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
  const int32_t noRange = -1;
  return streamCalcOneScalarExprInRange(pExpr, pDst, noRange, noRange, pExtraParams);
}
1824

1825
int32_t streamCalcOneScalarExprInRange(SNode* pExpr, SScalarParam* pDst, int32_t rowStartIdx, int32_t rowEndIdx,
2,572,988✔
1826
                                       const SStreamRuntimeFuncInfo* pExtraParams) {
1827
  int32_t      code = 0;
2,572,988✔
1828
  SNode*       pNode = 0;
2,572,988✔
1829
  SNodeList*   pList = NULL;
2,572,988✔
1830
  SExprInfo*   pExprInfo = NULL;
2,572,988✔
1831
  int32_t      numOfExprs = 1;
2,572,988✔
1832
  int32_t*     offset = 0;
2,572,988✔
1833
  STargetNode* pTargetNode = NULL;
2,572,988✔
1834
  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
2,571,735✔
1835
  if (code == 0) code = nodesCloneNode(pExpr, &pNode);
2,571,735!
1836

1837
  if (code == 0) {
2,572,988!
1838
    pTargetNode->dataBlockId = 0;
2,572,988✔
1839
    pTargetNode->pExpr = pNode;
2,572,988✔
1840
    pTargetNode->slotId = 0;
2,572,988✔
1841
  }
1842
  if (code == 0) {
2,572,988!
1843
    code = nodesMakeList(&pList);
2,572,988✔
1844
  }
1845
  if (code == 0) {
2,572,988!
1846
    code = nodesListAppend(pList, (SNode*)pTargetNode);
2,572,988✔
1847
  }
1848
  if (code == 0) {
2,572,988!
1849
    pNode = NULL;
2,572,988✔
1850
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
2,572,988✔
1851
  }
1852

1853
  if (code == 0) {
2,571,735!
1854
    const char* pVal = NULL;
2,571,735✔
1855
    int32_t     len = 0;
2,571,735✔
1856
    SNode*      pSclNode = NULL;
2,571,735✔
1857
    switch (pExprInfo->pExpr->nodeType) {
2,571,735!
1858
      case QUERY_NODE_FUNCTION:
2,571,735✔
1859
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
2,571,735✔
1860
        break;
2,572,988✔
1861
      case QUERY_NODE_OPERATOR:
×
1862
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
×
1863
        break;
×
1864
      default:
×
1865
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
1866
        break;
×
1867
    }
1868
    SArray*     pBlockList = taosArrayInit(2, POINTER_BYTES);
2,572,988✔
1869
    SSDataBlock block = {0};
2,572,988✔
1870
    block.info.rows = 1;
2,572,988✔
1871
    SSDataBlock* pBlock = &block;
2,572,988✔
1872
    void*        tmp = taosArrayPush(pBlockList, &pBlock);
2,572,988✔
1873
    if (tmp == NULL) {
2,572,988!
1874
      code = terrno;
×
1875
    }
1876
    if (code == 0) {
2,572,988!
1877
      code = scalarCalculateInRange(pSclNode, pBlockList, pDst, rowStartIdx, rowEndIdx, pExtraParams, NULL);
2,572,988✔
1878
    }
1879
    taosArrayDestroy(pBlockList);
2,572,988✔
1880
  }
1881
  nodesDestroyList(pList);
2,571,696✔
1882
  destroyExprInfo(pExprInfo, numOfExprs);
2,572,988✔
1883
  taosMemoryFreeClear(pExprInfo);
2,572,988!
1884
  return code;
2,570,404✔
1885
}
1886

1887
/*
 * Append one "forced" output row to *pRes using the task's pForceOutputCols.
 *
 * Nothing happens (returns 0) when the task has no force-output columns.
 * Otherwise *pRes is created if it is NULL, columns are added if the block has
 * none, capacity for one more row is ensured, and every force-output
 * expression is evaluated into the new row: value nodes are written directly,
 * other nodes go through streamCalcOneScalarExprInRange(). While evaluating,
 * funcInfo.curIdx is temporarily set to winIdx and restored afterwards.
 *
 * The row count of *pRes is incremented only when every column succeeded.
 * Returns 0 on success or the first error encountered.
 */
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes, int32_t winIdx) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  const SArray*  pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
  int32_t        code = 0;
  SNode*         pNode = NULL;
  if (!pForceOutputCols) return 0;

  if (!*pRes) {
    code = createDataBlock(pRes);
    if (code != TSDB_CODE_SUCCESS) {
      // *pRes may still be NULL here; it must not be dereferenced below.
      qError("failed to create data block for force output, code:%s", tstrerror(code));
      return code;
    }
  }

  if (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0) {
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
      SStreamOutCol*  pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, i);
      colInfo.info.precision = pCol->type.precision;
      colInfo.info.scale = pCol->type.scale;
      code = blockDataAppendColInfo(*pRes, &colInfo);
      if (code != 0) {
        // Stop here so the error is not overwritten by the capacity call.
        qError("failed to append column for force output, code:%s", tstrerror(code));
        return code;
      }
    }
  }

  code = blockDataEnsureCapacity(*pRes, (*pRes)->info.rows + 1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to ensure capacity for force output, code:%s", tstrerror(code));
    return code;
  }

  // loop all exprs for force output, execute all exprs
  int32_t rowIdx = (*pRes)->info.rows;
  int32_t tmpWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
    SScalarParam   dst = {0};
    SStreamOutCol* pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
    code = nodesStringToNode(pCol->expr, &pNode);
    if (code != 0) break;

    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, i);
    if (pInfo == NULL) {
      // The block has fewer columns than there are force-output expressions.
      code = TSDB_CODE_INVALID_PARA;
    } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
      code = colDataSetVal(pInfo, rowIdx, p, ((SValueNode*)pNode)->isNull);
    } else {
      dst.columnData = pInfo;
      dst.numOfRows = rowIdx;
      dst.colAlloced = false;  // the column belongs to *pRes, not to dst
      code = streamCalcOneScalarExprInRange(pNode, &dst, rowIdx, rowIdx, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
    }
    // TODO sclFreeParam(&dst);
    nodesDestroyNode(pNode);
    pNode = NULL;
    if (code != 0) break;
  }
  if (code == TSDB_CODE_SUCCESS) {
    (*pRes)->info.rows++;
  }
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = tmpWinIdx;
  return code;
}
1946

1947
/*
 * Compute the output sub-table name described by pExpr and store it,
 * NUL-terminated, in tbname.
 *
 * pExpr must be a string-typed value node or a string-typed function node;
 * other node types yield TSDB_CODE_OPS_NOT_SUPPORT. The expression is always
 * evaluated with curIdx forced to 0 (first window); the previous curIdx is
 * restored before returning.
 *
 * Names longer than TSDB_TABLE_NAME_LEN - 1 are truncated (and logged); an
 * empty name is rejected with TSDB_CODE_INVALID_PARA and a name containing '.'
 * with TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME.
 * NOTE(review): tbname is assumed to hold at least TSDB_TABLE_NAME_LEN bytes -
 * confirm against callers.
 */
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
  int32_t      code = 0;
  const char*  pVal = NULL;
  SScalarParam dst = {0};
  int32_t      len = 0;
  int32_t      nextIdx = pStreamRuntimeInfo->curIdx;
  pStreamRuntimeInfo->curIdx = 0;  // always use the first window to calc tbname
  // execute the expr
  switch (pExpr->type) {
    case QUERY_NODE_VALUE: {
      SValueNode* pValue = (SValueNode*)pExpr;
      int32_t     type = pValue->node.resType.type;
      if (!IS_STR_DATA_TYPE(type)) {
        qError("invalid sub tb expr with non-str type");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      void* pTmp = nodesGetValueFromNode((SValueNode*)pExpr);
      if (pTmp == NULL) {
        qError("invalid sub tb expr with null value");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      pVal = varDataVal(pTmp);
      len = varDataLen(pTmp);
    } break;
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
        qError("invalid sub tb expr with non-str type func");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
      if (!pCol) {
        code = terrno;
        qError("failed to allocate col info data at: %s, %d", __func__, __LINE__);
        break;
      }

      pCol->hasNull = true;
      pCol->info.type = ((SExprNode*)pExpr)->resType.type;
      pCol->info.colId = 0;
      pCol->info.bytes = ((SExprNode*)pExpr)->resType.bytes;
      pCol->info.precision = ((SExprNode*)pExpr)->resType.precision;
      pCol->info.scale = ((SExprNode*)pExpr)->resType.scale;
      code = colInfoDataEnsureCapacity(pCol, 1, true);
      if (code != 0) {
        qError("failed to ensure capacity for col info data at: %s, %d", __func__, __LINE__);
        taosMemoryFree(pCol);
        break;
      }
      // From here on the column is owned by dst and released by sclFreeParam below.
      dst.columnData = pCol;
      dst.numOfRows = 1;
      dst.colAlloced = true;
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
      if (colDataIsNull_var(dst.columnData, 0)) {
        qInfo("invalid sub tb expr with null value");
        code = TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED;
      }
      if (code == 0) {
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
      }
    } break;
    default:
      qError("wrong subtable expr with type: %d", pExpr->type);
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }
  if (code == 0) {
    if (!pVal || len == 0) {
      qError("tbname generated with no characters which is not allowed");
      code = TSDB_CODE_INVALID_PARA;
      // Leave an empty string behind without passing a NULL source to memcpy.
      tbname[0] = '\0';
    } else {
      if (len > TSDB_TABLE_NAME_LEN - 1) {
        qError("tbname generated with too long characters, max allowed is %d, got %d, truncated.", TSDB_TABLE_NAME_LEN - 1, len);
        len = TSDB_TABLE_NAME_LEN - 1;
      }

      memcpy(tbname, pVal, len);
      tbname[len] = '\0';  // ensure null terminated
      if (NULL != strchr(tbname, '.')) {
        code = TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME;
        qError("tbname generated with invalid characters, '.' is not allowed");
      }
    }
  }
  sclFreeParam(&dst);
  pStreamRuntimeInfo->curIdx = nextIdx; // restore
  return code;
}
2039

2040
/*
 * Free a stream inserter param together with the strings (tbname, stbname,
 * dbFName) and arrays (pFields, pTagFields) it owns. A NULL pParam is ignored.
 * pSinkHandle is not touched here.
 */
void destroyStreamInserterParam(SStreamInserterParam* pParam) {
  if (pParam == NULL) {
    return;
  }

  // Owned strings.
  if (pParam->tbname != NULL) {
    taosMemFree(pParam->tbname);
    pParam->tbname = NULL;
  }
  if (pParam->stbname != NULL) {
    taosMemFree(pParam->stbname);
    pParam->stbname = NULL;
  }
  if (pParam->dbFName != NULL) {
    taosMemFree(pParam->dbFName);
    pParam->dbFName = NULL;
  }

  // Owned arrays.
  if (pParam->pFields != NULL) {
    taosArrayDestroy(pParam->pFields);
    pParam->pFields = NULL;
  }
  if (pParam->pTagFields != NULL) {
    taosArrayDestroy(pParam->pTagFields);
    pParam->pTagFields = NULL;
  }

  taosMemFree(pParam);
}
1,023,012✔
2065

2066
/*
 * Deep-copy a stream inserter param.
 *
 * On success *ppDst points to a new param whose strings and field arrays are
 * independent copies of pSrc's; pSinkHandle is shared, not cloned. Empty or
 * missing field arrays are copied as NULL.
 *
 * On failure *ppDst is set to NULL (when ppDst itself is not NULL), anything
 * allocated so far is released, and the error code is returned.
 * TSDB_CODE_INVALID_PARA is returned when ppDst or pSrc is NULL.
 */
int32_t cloneStreamInserterParam(SStreamInserterParam** ppDst, SStreamInserterParam* pSrc) {
  int32_t code = 0, lino = 0;
  if (ppDst == NULL || pSrc == NULL) {
    // Handle this without going through _exit: that path dereferences ppDst
    // and would destroy whatever stale pointer the caller's slot holds.
    code = TSDB_CODE_INVALID_PARA;
    if (ppDst != NULL) {
      *ppDst = NULL;
    }
    stError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(code));
    return code;
  }
  *ppDst = (SStreamInserterParam*)taosMemoryCalloc(1, sizeof(SStreamInserterParam));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  (*ppDst)->suid = pSrc->suid;
  (*ppDst)->sver = pSrc->sver;
  (*ppDst)->tbType = pSrc->tbType;
  (*ppDst)->tbname = taosStrdup(pSrc->tbname);
  TSDB_CHECK_NULL((*ppDst)->tbname, code, lino, _exit, terrno);

  if (pSrc->stbname) {
    (*ppDst)->stbname = taosStrdup(pSrc->stbname);
    TSDB_CHECK_NULL((*ppDst)->stbname, code, lino, _exit, terrno);
  }

  (*ppDst)->dbFName = taosStrdup(pSrc->dbFName);
  TSDB_CHECK_NULL((*ppDst)->dbFName, code, lino, _exit, terrno);

  (*ppDst)->pSinkHandle = pSrc->pSinkHandle;  // don't need clone and free

  if (pSrc->pFields && pSrc->pFields->size > 0) {
    (*ppDst)->pFields = taosArrayDup(pSrc->pFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pFields, code, lino, _exit, terrno);
  } else {
    (*ppDst)->pFields = NULL;
  }

  if (pSrc->pTagFields && pSrc->pTagFields->size > 0) {
    (*ppDst)->pTagFields = taosArrayDup(pSrc->pTagFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pTagFields, code, lino, _exit, terrno);
  } else {
    (*ppDst)->pTagFields = NULL;
  }

_exit:

  if (code != 0) {
    // *ppDst is either NULL (allocation failed) or the partially built clone.
    if (*ppDst) {
      destroyStreamInserterParam(*ppDst);
      *ppDst = NULL;
    }

    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
2116

2117
// Public wrapper: forwards all arguments to doDropStreamTable() and returns its result.
int32_t dropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq) {
  int32_t code = doDropStreamTable(pMsgCb, pOutput, pReq);
  return code;
}
2120

2121
// Public wrapper: forwards all arguments to doDropStreamTableByTbName() and returns its result.
int32_t dropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName) {
  int32_t code = doDropStreamTableByTbName(pMsgCb, pOutput, pReq, tbName);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc