• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4996

19 Mar 2026 02:16AM UTC coverage: 72.069% (+0.07%) from 71.996%
#4996

push

travis-ci

web-flow
feat: SQL firewall black/white list (#34798)

461 of 618 new or added lines in 4 files covered. (74.6%)

380 existing lines in 128 files now uncovered.

245359 of 340448 relevant lines covered (72.07%)

135732617.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.94
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include <stdint.h>
18
#include "cmdnodes.h"
19
#include "dataSinkInt.h"
20
#include "executil.h"
21
#include "executorInt.h"
22
#include "libs/new-stream/stream.h"
23
#include "operator.h"
24
#include "osMemPool.h"
25
#include "osMemory.h"
26
#include "planner.h"
27
#include "query.h"
28
#include "querytask.h"
29
#include "storageapi.h"
30
#include "streamexecutorInt.h"
31
#include "taosdef.h"
32
#include "tarray.h"
33
#include "tdatablock.h"
34
#include "tref.h"
35
#include "trpc.h"
36
#include "ttypes.h"
37
#include "tudf.h"
38
#include "wal.h"
39

40
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
41
int32_t             fetchObjRefPool = -1;
42
SGlobalExecInfo     gExecInfo = {0};
43

44
void setTaskScalarExtraInfo(qTaskInfo_t tinfo) {
746,921,988✔
45
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
746,921,988✔
46
  gTaskScalarExtra.pSubJobCtx = pTaskInfo->pSubJobCtx;
746,921,988✔
47
  gTaskScalarExtra.fp = qFetchRemoteNode;
746,960,947✔
48
}
746,911,695✔
49

50
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
626,163✔
51
  gExecInfo.dnode = pDnode;
626,163✔
52
  gExecInfo.getMnode = getMnode;
626,163✔
53
  gExecInfo.getDnodeId = getDnodeId;
626,163✔
54
  return;
626,163✔
55
}
56

57
int32_t getCurrentMnodeEpset(SEpSet* pEpSet) {
46,219✔
58
  if (gExecInfo.dnode == NULL || gExecInfo.getMnode == NULL) {
46,219✔
59
    qError("gExecInfo is not initialized");
270✔
60
    return TSDB_CODE_APP_ERROR;
×
61
  }
62
  gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
45,949✔
63
  return TSDB_CODE_SUCCESS;
46,219✔
64
}
65

66
void doDestroyFetchObj(void* param) {
144,610,431✔
67
  if (param == NULL) {
144,610,431✔
68
    return;
×
69
  }
70

71
  if (*(bool*)param) {
144,610,431✔
72
    doDestroyExchangeOperatorInfo(param);
105,624,941✔
73
  } else {
74
    destroySubJobCtx((STaskSubJobCtx *)param);
38,989,874✔
75
  }
76
}
77

78
static void cleanupRefPool() {
593,502✔
79
  int32_t ref = atomic_val_compare_exchange_32(&fetchObjRefPool, fetchObjRefPool, 0);
593,502✔
80
  taosCloseRef(ref);
593,502✔
81
}
593,502✔
82

83
static void initRefPool() {
593,502✔
84
  fetchObjRefPool = taosOpenRef(1024, doDestroyFetchObj);
593,502✔
85
  (void)atexit(cleanupRefPool);
593,502✔
86
}
593,502✔
87

88
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
×
89
  int32_t code = TSDB_CODE_SUCCESS;
×
90
  int32_t lino = 0;
×
91
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
92
    if (pOperator->numOfDownstream == 0) {
×
93
      qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
×
94
      return TSDB_CODE_APP_ERROR;
×
95
    }
96

97
    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
×
98
      qError("join not supported for stream block scan, %s" PRIx64, id);
×
99
      return TSDB_CODE_APP_ERROR;
×
100
    }
101
    pOperator->status = OP_NOT_OPENED;
×
102
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
×
103
  } else {
104
    pOperator->status = OP_NOT_OPENED;
×
105
    return TSDB_CODE_SUCCESS;
×
106
  }
107

108
_end:
109
  if (code != TSDB_CODE_SUCCESS) {
110
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
111
  }
112
  return code;
113
}
114

115
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
×
116
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
117
    if (pOperator->numOfDownstream == 0) {
×
118
      qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
×
119
      return TSDB_CODE_APP_ERROR;
×
120
    }
121

122
    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
×
123
      qError("join not supported for stream block scan, %s" PRIx64, id);
×
124
      return TSDB_CODE_APP_ERROR;
×
125
    }
126

127
    pOperator->status = OP_NOT_OPENED;
×
128
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
×
129
  }
130
  return 0;
×
131
}
132

133
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
52,172,241✔
134
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
52,172,241✔
135
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
52,172,907✔
136
    STmqQueryScanInfo* pTmqScanInfo = pOperator->info;
23,490,284✔
137
    if (pTmqScanInfo->pTableScanOp != NULL) {
23,489,863✔
138
      STableScanInfo* pScanInfo = pTmqScanInfo->pTableScanOp->info;
23,489,922✔
139
      if (pScanInfo->base.dataReader != NULL) {
23,489,538✔
140
        int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
277,572✔
141
        if (code) {
277,572✔
142
          qError("failed to set reader id for executor, code:%s", tstrerror(code));
×
143
          return code;
×
144
        }
145
      }
146
    }
147
  } else {
148
    if (pOperator->pDownstream) return doSetTaskId(pOperator->pDownstream[0], pAPI);
28,682,344✔
149
  }
150

151
  return 0;
27,929,710✔
152
}
153

154
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
27,929,686✔
155
  SExecTaskInfo* pTaskInfo = tinfo;
27,929,686✔
156
  pTaskInfo->id.queryId = queryId;
27,929,686✔
157
  buildTaskId(taskId, queryId, pTaskInfo->id.str, 64);
27,930,405✔
158

159
  // set the idstr for tsdbReader
160
  return doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
27,930,043✔
161
}
162

163
bool qTaskIsDone(qTaskInfo_t tinfo) {
×
164
  SExecTaskInfo* pTaskInfo = tinfo;
×
165
  return pTaskInfo->status == OP_EXEC_DONE;
×
166
}
167

168
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
×
169
  if (tinfo == NULL) {
×
170
    return TSDB_CODE_APP_ERROR;
×
171
  }
172

173
  if (pBlocks == NULL || numOfBlocks == 0) {
×
174
    return TSDB_CODE_SUCCESS;
×
175
  }
176

177
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
×
178

179
  int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
×
180
  if (code != TSDB_CODE_SUCCESS) {
×
181
    qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
×
182
  } else {
183
    qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
×
184
  }
185

186
  return code;
×
187
}
188

189
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, uint64_t id) {
449,615✔
190
  if (msg == NULL) {  // create raw scan
449,615✔
191
    SExecTaskInfo* pTaskInfo = NULL;
114,854✔
192

193
    int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo);
114,854✔
194
    if (NULL == pTaskInfo || code != 0) {
115,201✔
195
      return NULL;
×
196
    }
197

198
    code = createTmqRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
115,201✔
199
    if (NULL == pTaskInfo->pRoot || code != 0) {
115,201✔
200
      taosMemoryFree(pTaskInfo);
×
201
      return NULL;
×
202
    }
203

204
    pTaskInfo->storageAPI = pReaderHandle->api;
115,201✔
205
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
115,201✔
206
    return pTaskInfo;
115,201✔
207
  }
208

209
  SSubplan* pPlan = NULL;
334,761✔
210
  int32_t   code = qStringToSubplan(msg, &pPlan);
337,298✔
211
  if (code != TSDB_CODE_SUCCESS) {
337,298✔
212
    qError("failed to parse subplan from msg, msg:%s code:%s", (char*) msg, tstrerror(code));
×
213
    terrno = code;
×
214
    return NULL;
×
215
  }
216

217
  qTaskInfo_t pTaskInfo = NULL;
337,298✔
218
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE, NULL);
337,298✔
219
  if (code != TSDB_CODE_SUCCESS) {
337,298✔
220
    qDestroyTask(pTaskInfo);
×
221
    terrno = code;
×
222
    return NULL;
×
223
  }
224

225
  return pTaskInfo;
337,298✔
226
}
227

228
static int32_t checkInsertParam(SStreamInserterParam* streamInserterParam) {
845,969✔
229
  if (streamInserterParam == NULL) {
845,969✔
230
    return TSDB_CODE_SUCCESS;
575,189✔
231
  }
232

233
  if (streamInserterParam->tbType == TSDB_SUPER_TABLE && streamInserterParam->suid <= 0) {
270,780✔
234
    stError("insertParam: invalid suid:%" PRIx64 " for child table", streamInserterParam->suid);
×
235
    return TSDB_CODE_INVALID_PARA;
×
236
  }
237

238
  if (streamInserterParam->dbFName == NULL || strlen(streamInserterParam->dbFName) == 0) {
270,780✔
UNCOV
239
    stError("insertParam: invalid db/table name");
×
240
    return TSDB_CODE_INVALID_PARA;
×
241
  }
242

243
  if (streamInserterParam->suid <= 0 &&
270,780✔
244
      (streamInserterParam->tbname == NULL || strlen(streamInserterParam->tbname) == 0)) {
114,015✔
245
    stError("insertParam: invalid table name, suid:%" PRIx64 "", streamInserterParam->suid);
×
246
    return TSDB_CODE_INVALID_PARA;
×
247
  }
248

249
  return TSDB_CODE_SUCCESS;
270,780✔
250
}
251

252
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
845,969✔
253
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
254
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
255
  if (pSubplan == NULL || pTaskInfo == NULL) {
845,969✔
UNCOV
256
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p", pSubplan, pTaskInfo);
×
UNCOV
257
    nodesDestroyNode((SNode *)pSubplan);
×
258
    return TSDB_CODE_INVALID_PARA;
×
259
  }
260
  int32_t lino = 0;
845,969✔
261
  int32_t code = checkInsertParam(streamInserterParam);
845,969✔
262
  if (code != TSDB_CODE_SUCCESS) {
845,969✔
263
    qError("invalid stream inserter param, code:%s", tstrerror(code));
×
264
    nodesDestroyNode((SNode *)pSubplan);
×
265
    return code;
×
266
  }
267
  SInserterParam* pInserterParam = NULL;
845,969✔
268
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
845,969✔
269
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
845,969✔
270
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
845,969✔
271

272
  int32_t subTaskNum = (int32_t)LIST_LENGTH(pSubplan->pSubQ);
845,969✔
273
  SArray* subEndPoints = taosArrayInit(subTaskNum, POINTER_BYTES);
845,969✔
274
  SDownstreamSourceNode* pSource = NULL;
845,969✔
275
  for (int32_t i = 0; i < subTaskNum; ++i) {
1,088,972✔
276
    SNode* pVal = nodesListGetNode(pSubplan->pSubQ, i);
243,003✔
277

278
    TSDB_CHECK_CODE(nodesCloneNode(pVal, (SNode**)&pSource), lino, _error);
243,003✔
279

280
    if (NULL == taosArrayPush(subEndPoints, &pSource)) {
486,006✔
281
      nodesDestroyNode((SNode *)pSource);
×
282
      code = terrno;
×
283
      goto _error;
×
284
    }
285
  }
286

287
  code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, &subEndPoints);
845,969✔
288
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
845,476✔
289
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
×
290
    goto _error;
×
291
  }
292

293
  if (subTaskNum > 0) {
845,969✔
294
    SDownstreamSourceNode* pVal = (SDownstreamSourceNode*)nodesListGetNode(pSubplan->pSubQ, 0);
53,084✔
295
    (*pTask)->pSubJobCtx->queryId = pVal->clientId;
53,084✔
296
  }
297

298
  if (streamInserterParam) {
845,969✔
299
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
270,780✔
300
    void*           pSinkManager = NULL;
270,780✔
301
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
270,545✔
302
    if (code != TSDB_CODE_SUCCESS) {
270,780✔
303
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
304
      goto _error;
×
305
    }
306

307
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
270,780✔
308
    if (NULL == pInserterParam) {
270,780✔
309
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(terrno), (*pTask)->id.str);
×
310
      code = terrno;
×
311
      goto _error;
×
312
    }
313
    code = cloneStreamInserterParam(&pInserterParam->streamInserterParam, streamInserterParam);
270,780✔
314
    TSDB_CHECK_CODE(code, lino, _error);
270,546✔
315
    
316
    pInserterParam->readHandle = taosMemCalloc(1, sizeof(SReadHandle));
270,546✔
317
    pInserterParam->readHandle->pMsgCb = readHandle->pMsgCb;
270,780✔
318

319
    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
270,780✔
320
    if (code) {
270,780✔
321
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
322
    }
323
  }
324
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
845,969✔
325
         tstrerror(code));
326

327
_error:
844,251✔
328
  if (subEndPoints) {
845,969✔
329
    taosArrayDestroyP(subEndPoints, (FDelete)nodesDestroyNode);
792,885✔
330
  }
331

332
  if (code != TSDB_CODE_SUCCESS) {
845,969✔
333
    qError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
334
    if (pInserterParam != NULL) {
×
335
      taosMemoryFree(pInserterParam);
×
336
    }
337
  }
338
  return code;
845,969✔
339
}
340

341
bool qNeedReset(qTaskInfo_t pInfo) {
3,594,181✔
342
  if (pInfo == NULL) {
3,594,181✔
343
    return false;
×
344
  }
345
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
3,594,181✔
346
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;
3,594,181✔
347
  if (pOperator == NULL || pOperator->pPhyNode == NULL) {
3,594,181✔
348
    return false;
×
349
  }
350
  int32_t node = nodeType(pOperator->pPhyNode);
3,594,181✔
351
  return (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == node || 
3,219,086✔
352
          QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == node ||
640,434✔
353
          QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == node ||
6,813,267✔
354
          QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == node);
355
}
356

357
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {
2,953,747✔
358
  if (pHandle == NULL || pScanBaseInfo == NULL) {
2,953,747✔
359
    return;
×
360
  }
361

362
  pScanBaseInfo->readHandle.uid = pHandle->uid;
2,953,747✔
363
  pScanBaseInfo->readHandle.winRangeValid = pHandle->winRangeValid;
2,953,747✔
364
  pScanBaseInfo->readHandle.winRange = pHandle->winRange;
2,953,747✔
365
  pScanBaseInfo->readHandle.extWinRangeValid = pHandle->extWinRangeValid;
2,953,747✔
366
  pScanBaseInfo->readHandle.extWinRange = pHandle->extWinRange;
2,953,747✔
367
  pScanBaseInfo->readHandle.cacheSttStatis = pHandle->cacheSttStatis;
2,953,747✔
368
}
369

370
int32_t qResetTableScan(qTaskInfo_t pInfo, SReadHandle* handle) {
3,594,181✔
371
  if (pInfo == NULL) {
3,594,181✔
372
    return TSDB_CODE_INVALID_PARA;
×
373
  }
374
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
3,594,181✔
375
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;
3,594,181✔
376

377
  void*           info = pOperator->info;
3,594,181✔
378
  STableScanBase* pScanBaseInfo = NULL;
3,594,181✔
379

380
  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pOperator->pPhyNode)) {
3,594,181✔
381
    pScanBaseInfo = &((STableScanInfo*)info)->base;
375,095✔
382
    setReadHandle(handle, pScanBaseInfo);
375,095✔
383
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(pOperator->pPhyNode)) {
3,219,086✔
384
    pScanBaseInfo = &((STableMergeScanInfo*)info)->base;
2,578,652✔
385
    setReadHandle(handle, pScanBaseInfo);
2,578,652✔
386
  }
387

388
  qDebug("reset table scan, name:%s, id:%s, time range: [%" PRId64 ", %" PRId64 "]", pOperator->name, GET_TASKID(pTaskInfo), handle->winRange.skey,
3,594,181✔
389
  handle->winRange.ekey);
390
  return pOperator->fpSet.resetStateFn(pOperator);
3,594,181✔
391
}
392

393
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
845,361✔
394
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
395
  if (msg == NULL) {
845,361✔
396
    return TSDB_CODE_INVALID_PARA;
×
397
  }
398

399
  *pTaskInfo = NULL;
845,361✔
400

401
  SSubplan* pPlan = NULL;
845,969✔
402
  int32_t   code = qStringToSubplan(msg, &pPlan);
845,969✔
403
  if (code != TSDB_CODE_SUCCESS) {
845,969✔
404
    nodesDestroyNode((SNode *)pPlan);
×
405
    return code;
×
406
  }
407
  // todo: add stream inserter param
408
  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo,
845,969✔
409
                               pInserterParams ? &pInserterParams->pSinkHandle : NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM,
410
                               pInserterParams);
411
  if (code != TSDB_CODE_SUCCESS) {
845,969✔
412
    qDestroyTask(*pTaskInfo);
×
413
    return code;
×
414
  }
415

416
  return code;
845,969✔
417
}
418

419
typedef struct {
420
  tb_uid_t tableUid;
421
  tb_uid_t childUid;
422
  int8_t   check;
423
} STqPair;
424

425
static int32_t filterUnqualifiedTables(const STmqQueryScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
404,213✔
426
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
427
  int32_t code = TSDB_CODE_SUCCESS;
404,213✔
428
  int32_t lino = 0;
404,213✔
429
  int8_t  locked = 0;
404,213✔
430
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
404,213✔
431

432
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
404,213✔
433

434
  SArray* tUid = taosArrayInit(4, sizeof(STqPair));
404,213✔
435
  QUERY_CHECK_NULL(tUid, code, lino, _error, terrno);
404,213✔
436

437
  int32_t numOfUids = taosArrayGetSize(tableIdList);
404,213✔
438
  if (numOfUids == 0) {
404,213✔
439
    (*ppArrayRes) = qa;
×
440
    goto _error;
×
441
  }
442

443
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
404,213✔
444

445
  uint64_t suid = 0;
404,213✔
446
  uint64_t uid = 0;
404,213✔
447
  int32_t  type = 0;
404,213✔
448
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
404,213✔
449

450
  // let's discard the tables those are not created according to the queried super table.
451
  SMetaReader mr = {0};
404,213✔
452
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
404,213✔
453

454
  locked = 1;
404,213✔
455
  for (int32_t i = 0; i < numOfUids; ++i) {
876,538✔
456
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
472,325✔
457
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
472,325✔
458

459
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
472,325✔
460
    if (code != TSDB_CODE_SUCCESS) {
472,325✔
461
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
462
      continue;
×
463
    }
464

465
    tDecoderClear(&mr.coder);
472,325✔
466

467
    if (mr.me.type == TSDB_SUPER_TABLE) {
472,325✔
468
      continue;
×
469
    } else {
470
      if (type == TSDB_SUPER_TABLE) {
472,325✔
471
        // this new created child table does not belong to the scanned super table.
472
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
472,325✔
473
          continue;
×
474
        }
475
      } else {  // ordinary table
476
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
477
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
478
        if (mr.me.uid != uid) {
×
479
          continue;
×
480
        }
481
      }
482
    }
483

484
    STqPair item = {.tableUid = *id, .childUid = mr.me.uid, .check = 0};
472,325✔
485
    if (pScanInfo->pTagCond != NULL) {
472,325✔
486
      // tb_uid_t id = mr.me.uid;
487
      item.check = 1;
361,675✔
488
    }
489
    if (taosArrayPush(tUid, &item) == NULL) {
472,325✔
490
      QUERY_CHECK_NULL(NULL, code, lino, _end, terrno);
×
491
    }
492
  }
493

494
  pAPI->metaReaderFn.clearReader(&mr);
404,213✔
495
  locked = 0;
404,213✔
496

497
  for (int32_t j = 0; j < taosArrayGetSize(tUid); ++j) {
876,538✔
498
    bool     qualified = false;
472,325✔
499
    STqPair* t = (STqPair*)taosArrayGet(tUid, j);
472,325✔
500
    if (t == NULL) {
472,325✔
501
      continue;
×
502
    }
503

504
    if (t->check == 1) {
472,325✔
505
      code = isQualifiedTable(t->childUid, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
361,675✔
506
      if (code != TSDB_CODE_SUCCESS) {
361,675✔
507
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", t->childUid, idstr);
×
508
        continue;
×
509
      }
510

511
      if (!qualified) {
361,675✔
512
        qInfo("table uid:0x%" PRIx64 " is unqualified for tag condition, %s", t->childUid, idstr);
180,999✔
513
        continue;
180,999✔
514
      }
515
    }
516

517
    void* tmp = taosArrayPush(qa, &t->tableUid);
291,326✔
518
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
291,326✔
519
  }
520

521
  // handle multiple partition
522

523
_end:
404,213✔
524

525
  if (locked) {
404,213✔
526
    pAPI->metaReaderFn.clearReader(&mr);
×
527
  }
528
  (*ppArrayRes) = qa;
404,213✔
529
_error:
404,213✔
530

531
  taosArrayDestroy(tUid);
404,213✔
532
  if (code != TSDB_CODE_SUCCESS) {
404,213✔
533
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
534
  }
535
  return code;
404,213✔
536
}
537

538
int32_t qDeleteTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
1,484✔
539
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1,484✔
540
  const char*    id = GET_TASKID(pTaskInfo);
1,484✔
541
  int32_t        code = 0;
1,484✔
542

543
  // traverse to the stream scanner node to add this table id
544
  SOperatorInfo* pInfo = NULL;
1,484✔
545
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
1,484✔
546
  if (code != 0 || pInfo == NULL) {
1,484✔
547
    return code;
×
548
  }
549

550
  STmqQueryScanInfo* pScanInfo = pInfo->info;
1,484✔
551
  qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
1,484✔
552
  taosWLockLatch(&pTaskInfo->lock);
1,484✔
553
  pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
1,484✔
554
  taosWUnLockLatch(&pTaskInfo->lock);
1,484✔
555

556
  return code;
1,484✔
557
}
558

559
static int32_t filterTableForTmqQuery(STmqQueryScanInfo* pScanInfo, const SArray* tableIdList, const char* id, SStorageAPI* pAPI, SRWLatch* lock) {
404,213✔
560
  SArray* qa = NULL;
404,213✔
561
  int32_t code = filterUnqualifiedTables(pScanInfo, tableIdList, id, pAPI, &qa);
404,213✔
562
  if (code != TSDB_CODE_SUCCESS) {
404,213✔
563
    taosArrayDestroy(qa);
×
564
    return code;
×
565
  }
566
  int32_t numOfQualifiedTables = taosArrayGetSize(qa);
404,213✔
567
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
404,213✔
568
  pAPI->tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);
404,213✔
569

570
  bool   assignUid = false;
404,213✔
571
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
404,213✔
572
  char*  keyBuf = NULL;
404,213✔
573
  if (bufLen > 0) {
404,213✔
574
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
×
575
    keyBuf = taosMemoryMalloc(bufLen);
×
576
    if (keyBuf == NULL) {
×
577
      taosArrayDestroy(qa);
×
578
      return terrno;
×
579
    }
580
  }
581

582
  STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
404,213✔
583
  taosWLockLatch(lock);
404,213✔
584

585
  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
695,539✔
586
    uint64_t* uid = taosArrayGet(qa, i);
291,326✔
587
    if (!uid) {
291,326✔
588
      taosMemoryFree(keyBuf);
×
589
      taosArrayDestroy(qa);
×
590
      taosWUnLockLatch(lock);
×
591
      return terrno;
×
592
    }
593
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
291,326✔
594

595
    if (bufLen > 0) {
291,326✔
596
      if (assignUid) {
×
597
        keyInfo.groupId = keyInfo.uid;
×
598
      } else {
599
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
×
600
                                      &keyInfo.groupId, pAPI);
601
        if (code != TSDB_CODE_SUCCESS) {
×
602
          taosMemoryFree(keyBuf);
×
603
          taosArrayDestroy(qa);
×
604
          taosWUnLockLatch(lock);
×
605
          return code;
×
606
        }
607
      }
608
    }
609

610
    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
291,326✔
611
    if (code != TSDB_CODE_SUCCESS) {
291,326✔
612
      taosMemoryFree(keyBuf);
×
613
      taosArrayDestroy(qa);
×
614
      taosWUnLockLatch(lock);
×
615
      return code;
×
616
    }
617
  }
618

619
  taosWUnLockLatch(lock);
404,213✔
620
  if (keyBuf != NULL) {
404,213✔
621
    taosMemoryFree(keyBuf);
×
622
  }
623

624
  taosArrayDestroy(qa);
404,213✔
625
  return 0;
404,213✔
626
}
627

628
static void qUpdateTableTagCache(STmqQueryScanInfo* pScanInfo, const SArray* tableIdList, col_id_t cid, SStorageAPI* api) {
404,891✔
629
  STqReader*   tqReader = pScanInfo->tqReader;
404,891✔
630
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
877,894✔
631
    int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i);
473,003✔
632
    api->tqReaderFn.tqUpdateTableTagCache(pScanInfo->tqReader, pScanInfo->pPseudoExpr, pScanInfo->numOfPseudoExpr, *uid, cid);
473,003✔
633
  }
634
}
404,891✔
635

636
int32_t qAddTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
404,213✔
637
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
404,213✔
638
  const char*    id = GET_TASKID(pTaskInfo);
404,213✔
639
  int32_t        code = 0;
404,213✔
640

641
  qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
404,213✔
642

643
  // traverse to the stream scanner node to add this table id
644
  SOperatorInfo* pInfo = NULL;
404,213✔
645
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
404,213✔
646
  if (code != 0 || pInfo == NULL) {
404,213✔
647
    return code;
×
648
  }
649

650
  STmqQueryScanInfo* pScanInfo = pInfo->info;
404,213✔
651
  qUpdateTableTagCache(pScanInfo, tableIdList, 0, &pTaskInfo->storageAPI);
404,213✔
652

653
  return filterTableForTmqQuery(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &pTaskInfo->lock);
404,213✔
654
}
655

656
void qUpdateTableTagCacheForTmq(qTaskInfo_t tinfo, const SArray* tableIdList, SArray* cids) {
678✔
657
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
678✔
658
  const char*    id = GET_TASKID(pTaskInfo);
678✔
659
  int32_t        code = 0;
678✔
660

661
  // traverse to the stream scanner node to add this table id
662
  SOperatorInfo* pInfo = NULL;
678✔
663
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
678✔
664
  if (code != 0 || pInfo == NULL) {
678✔
665
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
666
    return;
×
667
  }
668

669
  STmqQueryScanInfo* pScanInfo = pInfo->info;
678✔
670
  for (int32_t i = 0; i < taosArrayGetSize(cids); ++i) {
1,356✔
671
    col_id_t* cid = (col_id_t*)taosArrayGet(cids, i);
678✔
672
    qUpdateTableTagCache(pScanInfo, tableIdList, *cid, &pTaskInfo->storageAPI);
678✔
673
  }
674
}
675

676
int32_t qUpdateTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
×
677
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
×
678
  const char*    id = GET_TASKID(pTaskInfo);
×
679
  int32_t        code = 0;
×
680

681
  // traverse to the stream scanner node to add this table id
682
  SOperatorInfo* pInfo = NULL;
×
683
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
×
684
  if (code != 0 || pInfo == NULL) {
×
685
    return code;
×
686
  }
687

688
  STmqQueryScanInfo* pScanInfo = pInfo->info;
×
689

690
  qDebug("%s %d remove child tables from the stream scanner, %s", __func__, (int32_t)taosArrayGetSize(tableIdList), id);
×
691
  taosWLockLatch(&pTaskInfo->lock);
×
692
  pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
×
693
  taosWUnLockLatch(&pTaskInfo->lock);
×
694
  
695
  return filterTableForTmqQuery(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &pTaskInfo->lock);
×
696
}
697

698
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
604,222,952✔
699
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t* rversion,
700
                                    int32_t idx, bool* tbGet) {
701
  *tbGet = false;
604,222,952✔
702

703
  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
604,246,180✔
704
    return TSDB_CODE_INVALID_PARA;
1,889✔
705
  }
706
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
604,251,712✔
707

708
  if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
604,251,712✔
709
    return TSDB_CODE_SUCCESS;
345,940,153✔
710
  }
711

712
  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
258,315,223✔
713
  if (!pSchemaInfo) {
258,324,716✔
714
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
715
    return terrno;
×
716
  }
717

718
  *sversion = pSchemaInfo->sw->version;
258,324,716✔
719
  *tversion = pSchemaInfo->tversion;
258,294,786✔
720
  *rversion = pSchemaInfo->rversion;
258,370,241✔
721
  if (pSchemaInfo->dbname) {
258,299,542✔
722
    tstrncpy(dbName, pSchemaInfo->dbname, dbNameBuffLen);
258,266,010✔
723
  } else {
724
    dbName[0] = 0;
×
725
  }
726
  if (pSchemaInfo->tablename) {
258,317,674✔
727
    tstrncpy(tableName, pSchemaInfo->tablename, tbaleNameBuffLen);
258,319,354✔
728
  } else {
729
    tableName[0] = 0;
×
730
  }
731

732
  *tbGet = true;
258,302,089✔
733

734
  return TSDB_CODE_SUCCESS;
258,279,903✔
735
}
736

737
bool qIsDynamicExecTask(qTaskInfo_t tinfo) { return ((SExecTaskInfo*)tinfo)->dynamicTask; }
345,935,294✔
738

739
void qDestroyOperatorParam(SOperatorParam* pParam) {
125,715✔
740
  if (NULL == pParam) {
125,715✔
741
    return;
×
742
  }
743
  freeOperatorParam(pParam, OP_GET_PARAM);
125,715✔
744
}
745

746
/**
747
  @brief Update the operator param for the task.
748
  @note  Unlike setOperatorParam, this function will destroy the new param when
749
         operator type mismatch.
750
*/
751
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
37,991,780✔
752
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
37,991,780✔
753
  SOperatorParam* pNewParam = (SOperatorParam*)pParam;
37,991,780✔
754
  if (pTask->pRoot && pTask->pRoot->operatorType != pNewParam->opType) {
37,991,780✔
755
    qError("%s, %s operator type mismatch, task operator type:%d, "
120,785✔
756
           "new param operator type:%d", GET_TASKID(pTask), __func__,
757
           pTask->pRoot->operatorType,
758
           pNewParam->opType);
759
    qDestroyOperatorParam((SOperatorParam*)pParam);
120,785✔
760
    return;
120,785✔
761
  }
762
  TSWAP(pParam, pTask->pOpParam);
37,876,169✔
763
  ((SExecTaskInfo*)tinfo)->paramSet = false;
37,875,093✔
764
}
765

766
int32_t qExecutorInit(void) {
4,651,613✔
767
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
4,651,613✔
768
  return TSDB_CODE_SUCCESS;
4,652,422✔
769
}
770

771
int32_t qSemWait(qTaskInfo_t task, tsem_t* pSem) {
36,487,845✔
772
  int32_t        code = TSDB_CODE_SUCCESS;
36,487,845✔
773
  SExecTaskInfo* pTask = (SExecTaskInfo*)task;
36,487,845✔
774
  if (pTask->pWorkerCb) {
36,487,845✔
775
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
36,488,941✔
776
    if (code != TSDB_CODE_SUCCESS) {
36,490,037✔
777
      pTask->code = code;
×
778
      return pTask->code;
×
779
    }
780
  }
781

782
  code = tsem_wait(pSem);
36,491,133✔
783
  if (code != TSDB_CODE_SUCCESS) {
36,489,489✔
784
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
785
    pTask->code = code;
×
786
    return pTask->code;
×
787
  }
788

789
  if (pTask->pWorkerCb) {
36,489,489✔
790
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
36,490,037✔
791
    if (code != TSDB_CODE_SUCCESS) {
36,490,037✔
792
      pTask->code = code;
×
793
      return pTask->code;
×
794
    }
795
  }
796
  return TSDB_CODE_SUCCESS;
36,490,037✔
797
}
798

799
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
348,517,146✔
800
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
801
                        EOPTR_EXEC_MODEL model, SArray** subEndPoints) {
802
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
348,517,146✔
803
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
348,517,146✔
804

805
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d, subEndPoinsNum:%d", 
348,494,698✔
806
    taskId, pSubplan->id.queryId, vgId, (int32_t)taosArrayGetSize(subEndPoints ? *subEndPoints : NULL));
807

808
  readHandle->uid = 0;
348,537,752✔
809
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, subEndPoints);
348,563,953✔
810
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
348,362,534✔
811
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
553,092✔
812
    goto _error;
468,782✔
813
  }
814
    
815
  if (handle) {
347,870,491✔
816
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
347,525,965✔
817
    void*           pSinkManager = NULL;
347,608,832✔
818
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
347,339,528✔
819
    if (code != TSDB_CODE_SUCCESS) {
347,423,167✔
820
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
821
      goto _error;
×
822
    }
823

824
    void* pSinkParam = NULL;
347,423,167✔
825
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
347,530,515✔
826
    if (code != TSDB_CODE_SUCCESS) {
347,371,504✔
827
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
828
      taosMemoryFree(pSinkManager);
×
829
      goto _error;
×
830
    }
831

832
    SDataSinkNode* pSink = NULL;
347,371,504✔
833
    if (readHandle->localExec) {
347,531,762✔
834
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
2,596✔
835
      if (code != TSDB_CODE_SUCCESS) {
2,596✔
836
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
×
837
               (*pTask)->id.str);
838
        taosMemoryFree(pSinkManager);
×
839
        goto _error;
×
840
      }
841
    }
842

843
    // pSinkParam has been freed during create sinker.
844
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
347,590,300✔
845
                              (*pTask)->id.str, pSubplan->processOneBlock);
347,607,961✔
846
    if (code) {
347,394,008✔
847
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
609✔
848
    }
849
  }
850

851
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s subEndPoints:%d", 
347,911,751✔
852
    taskId, pSubplan->id.queryId, tstrerror(code), (*pTask)->pSubJobCtx ? (int32_t)taosArrayGetSize((*pTask)->pSubJobCtx->subEndPoints) : 0);
853

854
_error:
124,920,524✔
855
  // if failed to add ref for all tables in this query, abort current query
856
  return code;
348,510,115✔
857
}
858

859
static void freeBlock(void* param) {
×
860
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
861
  blockDataDestroy(pBlock);
×
862
}
×
863

864
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
402,830,065✔
865
                     bool processOneBlock) {
866
  int32_t        code = TSDB_CODE_SUCCESS;
402,830,065✔
867
  int32_t        lino = 0;
402,830,065✔
868
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
402,830,065✔
869
  int64_t        threadId = taosGetSelfPthreadId();
402,830,065✔
870

871
  if (pLocal) {
402,796,721✔
872
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
397,567,235✔
873
  }
874

875
  taosArrayClear(pResList);
402,782,783✔
876

877
  int64_t curOwner = 0;
402,728,021✔
878
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
402,728,021✔
879
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
×
880
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
×
881
    return pTaskInfo->code;
×
882
  }
883

884
  if (pTaskInfo->cost.start == 0) {
402,729,201✔
885
    pTaskInfo->cost.start = taosGetTimestampUs();
341,566,318✔
886
  }
887

888
  if (isTaskKilled(pTaskInfo)) {
402,860,064✔
889
    atomic_store_64(&pTaskInfo->owner, 0);
1,199✔
890
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
1,199✔
891
    return pTaskInfo->code;
1,199✔
892
  }
893

894
  // error occurs, record the error code and return to client
895
  int32_t ret = setjmp(pTaskInfo->env);
402,753,140✔
896
  if (ret != TSDB_CODE_SUCCESS) {
404,928,895✔
897
    pTaskInfo->code = ret;
2,344,029✔
898
    (void)cleanUpUdfs();
2,344,029✔
899

900
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
2,344,029✔
901
    atomic_store_64(&pTaskInfo->owner, 0);
2,344,029✔
902

903
    return pTaskInfo->code;
2,344,029✔
904
  }
905

906
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
402,584,866✔
907

908
  int32_t      current = 0;
402,598,164✔
909
  SSDataBlock* pRes = NULL;
402,598,164✔
910
  int64_t      st = taosGetTimestampUs();
402,804,233✔
911

912
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
402,804,233✔
913
    pTaskInfo->paramSet = true;
37,870,465✔
914
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
37,865,932✔
915
  } else {
916
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
364,936,226✔
917
  }
918

919
  QUERY_CHECK_CODE(code, lino, _end);
400,501,436✔
920
  code = blockDataCheck(pRes);
400,501,436✔
921
  QUERY_CHECK_CODE(code, lino, _end);
400,514,415✔
922

923
  if (pRes == NULL) {
400,514,415✔
924
    st = taosGetTimestampUs();
82,573,881✔
925
  }
926

927
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
400,503,840✔
928
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
400,540,625✔
929
    rowsThreshold = 4096;
399,970,307✔
930
  }
931

932
  int32_t blockIndex = 0;
400,519,116✔
933
  while (pRes != NULL) {
1,142,923,284✔
934
    SSDataBlock* p = NULL;
793,670,364✔
935
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
793,628,485✔
936
      SSDataBlock* p1 = NULL;
649,348,413✔
937
      code = createOneDataBlock(pRes, true, &p1);
649,363,267✔
938
      QUERY_CHECK_CODE(code, lino, _end);
649,321,923✔
939

940
      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
649,321,923✔
941
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
649,369,928✔
942
      p = p1;
649,369,928✔
943
    } else if (processOneBlock) {
144,388,957✔
944
      SSDataBlock** tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
18,519,985✔
945
      if (tmp) {
18,519,985✔
946
        blockDataDestroy(*tmp);
18,519,985✔
947
        *tmp = NULL;
18,519,985✔
948
      }
949
      SSDataBlock* p1 = NULL;
18,519,985✔
950
      code = createOneDataBlock(pRes, true, &p1);
18,519,985✔
951
      QUERY_CHECK_CODE(code, lino, _end);
18,519,985✔
952

953
      *tmp = p1;
18,519,985✔
954
      p = p1;
18,519,985✔
955
    } else {
956
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
125,868,972✔
957
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
125,777,587✔
958

959
      p = *(SSDataBlock**)tmp;
125,777,587✔
960
      code = copyDataBlock(p, pRes);
125,866,288✔
961
      QUERY_CHECK_CODE(code, lino, _end);
125,868,205✔
962
    }
963

964
    blockIndex += 1;
793,735,962✔
965

966
    current += p->info.rows;
793,735,962✔
967
    QUERY_CHECK_CONDITION((p->info.rows > 0), code, lino, _end,
793,743,424✔
968
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
969
    void* tmp = taosArrayPush(pResList, &p);
793,733,270✔
970
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
793,733,270✔
971

972
    if (current >= rowsThreshold || processOneBlock) {
793,733,270✔
973
      break;
974
    }
975

976
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
742,454,454✔
977
    QUERY_CHECK_CODE(code, lino, _end);
742,410,216✔
978
    code = blockDataCheck(pRes);
742,410,216✔
979
    QUERY_CHECK_CODE(code, lino, _end);
742,482,252✔
980
  }
981

982
  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
400,531,724✔
983
    pTaskInfo->pSubplan->rowsThreshold -= current;
544,846✔
984
  }
985

986
  *hasMore = (pRes != NULL);
400,539,861✔
987
  uint64_t el = (taosGetTimestampUs() - st);
400,516,578✔
988

989
  pTaskInfo->cost.elapsedTime += el;
400,516,578✔
990
  if (NULL == pRes) {
400,536,804✔
991
    *useconds = pTaskInfo->cost.elapsedTime;
349,258,000✔
992
  }
993

994
_end:
400,541,808✔
995
  (void)cleanUpUdfs();
400,529,044✔
996

997
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
400,563,780✔
998
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
400,563,780✔
999
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
1000

1001
  atomic_store_64(&pTaskInfo->owner, 0);
400,563,780✔
1002
  if (code) {
400,563,780✔
1003
    pTaskInfo->code = code;
×
1004
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1005
  }
1006

1007
  return pTaskInfo->code;
400,563,780✔
1008
}
1009

1010
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
×
1011
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
×
1012
  SArray*        pList = pTaskInfo->pResultBlockList;
×
1013
  size_t         num = taosArrayGetSize(pList);
×
1014
  for (int32_t i = 0; i < num; ++i) {
×
1015
    SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
×
1016
    if (p) {
×
1017
      blockDataDestroy(*p);
×
1018
    }
1019
  }
1020

1021
  taosArrayClear(pTaskInfo->pResultBlockList);
×
1022
}
×
1023

1024
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
24,863,927✔
1025
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
24,863,927✔
1026
  int64_t        threadId = taosGetSelfPthreadId();
24,863,927✔
1027
  int64_t        curOwner = 0;
24,863,927✔
1028

1029
  *pRes = NULL;
24,863,927✔
1030

1031
  // todo extract method
1032
  taosRLockLatch(&pTaskInfo->lock);
24,863,927✔
1033
  bool isKilled = isTaskKilled(pTaskInfo);
24,863,927✔
1034
  if (isKilled) {
24,863,927✔
1035
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
×
1036

1037
    taosRUnLockLatch(&pTaskInfo->lock);
×
1038
    return pTaskInfo->code;
×
1039
  }
1040

1041
  if (pTaskInfo->owner != 0) {
24,863,927✔
1042
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
×
1043
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
×
1044

1045
    taosRUnLockLatch(&pTaskInfo->lock);
×
1046
    return pTaskInfo->code;
×
1047
  }
1048

1049
  pTaskInfo->owner = threadId;
24,863,927✔
1050
  taosRUnLockLatch(&pTaskInfo->lock);
24,863,927✔
1051

1052
  if (pTaskInfo->cost.start == 0) {
24,863,927✔
1053
    pTaskInfo->cost.start = taosGetTimestampUs();
232,218✔
1054
  }
1055

1056
  // error occurs, record the error code and return to client
1057
  int32_t ret = setjmp(pTaskInfo->env);
24,863,927✔
1058
  if (ret != TSDB_CODE_SUCCESS) {
24,863,927✔
1059
    pTaskInfo->code = ret;
×
1060
    (void)cleanUpUdfs();
×
1061
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
×
1062
    atomic_store_64(&pTaskInfo->owner, 0);
×
1063
    return pTaskInfo->code;
×
1064
  }
1065

1066
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
24,863,927✔
1067

1068
  int64_t st = taosGetTimestampUs();
24,863,927✔
1069
  int32_t code = TSDB_CODE_SUCCESS;
24,863,927✔
1070
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
24,863,927✔
1071
    pTaskInfo->paramSet = true;
×
1072
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, pRes);
×
1073
  } else {
1074
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
24,863,927✔
1075
  }
1076
  if (code) {
24,862,903✔
1077
    pTaskInfo->code = code;
×
1078
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
×
1079
  }
1080

1081
  code = blockDataCheck(*pRes);
24,862,903✔
1082
  if (code) {
24,863,927✔
1083
    pTaskInfo->code = code;
×
1084
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
×
1085
  }
1086

1087
  uint64_t el = (taosGetTimestampUs() - st);
24,863,563✔
1088

1089
  pTaskInfo->cost.elapsedTime += el;
24,863,563✔
1090
  if (NULL == *pRes) {
24,863,611✔
1091
    *useconds = pTaskInfo->cost.elapsedTime;
11,400,148✔
1092
  }
1093

1094
  (void)cleanUpUdfs();
24,863,611✔
1095

1096
  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
24,863,927✔
1097
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
24,863,927✔
1098

1099
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
24,863,927✔
1100
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
1101

1102
  atomic_store_64(&pTaskInfo->owner, 0);
24,863,611✔
1103
  return pTaskInfo->code;
24,863,927✔
1104
}
1105

1106
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
105,625,413✔
1107
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
105,625,413✔
1108
  void* tmp = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
105,625,433✔
1109
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
105,625,800✔
1110

1111
  if (!tmp) {
105,625,046✔
1112
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1113
    return terrno;
×
1114
  }
1115
  return TSDB_CODE_SUCCESS;
105,625,046✔
1116
}
1117

1118
int32_t stopInfoComp(void const* lp, void const* rp) {
×
1119
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
1120
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
1121

1122
  if (key->refId < pInfo->refId) {
×
1123
    return -1;
×
1124
  } else if (key->refId > pInfo->refId) {
×
1125
    return 1;
×
1126
  }
1127

1128
  return 0;
×
1129
}
1130

1131
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
×
1132
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
×
1133
  int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
×
1134
  if (idx >= 0) {
×
1135
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
×
1136
  }
1137
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
×
1138
}
×
1139

1140
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
88,076✔
1141
  if (pTaskInfo->pSubJobCtx) {
88,076✔
1142
    pTaskInfo->pSubJobCtx->code = pTaskInfo->code;
21,920✔
1143
    int32_t code = tsem_post(&pTaskInfo->pSubJobCtx->ready);
21,920✔
1144
  }
1145
  
1146
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
88,076✔
1147

1148
  int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
88,076✔
1149
  for (int32_t i = 0; i < num; ++i) {
90,358✔
1150
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
2,282✔
1151
    if (!pStop) {
2,282✔
1152
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1153
      continue;
×
1154
    }
1155
    SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pStop->refId);
2,282✔
1156
    if (pExchangeInfo) {
2,282✔
1157
      int32_t code = tsem_post(&pExchangeInfo->ready);
2,282✔
1158
      if (code != TSDB_CODE_SUCCESS) {
2,282✔
1159
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1160
      } else {
1161
        qDebug("post to exchange %" PRId64 " to stop", pStop->refId);
2,282✔
1162
      }
1163
      code = taosReleaseRef(fetchObjRefPool, pStop->refId);
2,282✔
1164
      if (code != TSDB_CODE_SUCCESS) {
2,282✔
1165
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1166
      }
1167
    }
1168
  }
1169

1170
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
88,076✔
1171
}
88,076✔
1172

1173
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
88,076✔
1174
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
88,076✔
1175
  if (pTaskInfo == NULL) {
88,076✔
1176
    return TSDB_CODE_QRY_INVALID_QHANDLE;
×
1177
  }
1178

1179
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
88,076✔
1180

1181
  setTaskKilled(pTaskInfo, rspCode);
88,076✔
1182
  qStopTaskOperators(pTaskInfo);
88,076✔
1183

1184
  return TSDB_CODE_SUCCESS;
88,076✔
1185
}
1186

1187
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
×
1188
  int64_t        st = taosGetTimestampMs();
×
1189
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
×
1190
  if (pTaskInfo == NULL) {
×
1191
    return TSDB_CODE_QRY_INVALID_QHANDLE;
×
1192
  }
1193

1194
  if (waitDuration > 0) {
×
1195
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
×
1196
  } else {
1197
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
×
1198
  }
1199

1200
  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
×
1201

1202
  if (waitDuration > 0) {
×
1203
    while (1) {
1204
      taosWLockLatch(&pTaskInfo->lock);
×
1205
      if (qTaskIsExecuting(pTaskInfo)) {  // let's wait for 100 ms and try again
×
1206
        taosWUnLockLatch(&pTaskInfo->lock);
×
1207

1208
        taosMsleep(200);
×
1209

1210
        int64_t d = taosGetTimestampMs() - st;
×
1211
        if (d >= waitDuration && waitDuration >= 0) {
×
1212
          qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
×
1213
          return TSDB_CODE_SUCCESS;
×
1214
        }
1215
      } else {  // not running now
1216
        pTaskInfo->code = rspCode;
×
1217
        taosWUnLockLatch(&pTaskInfo->lock);
×
1218
        return TSDB_CODE_SUCCESS;
×
1219
      }
1220
    }
1221
  }
1222

1223
  int64_t et = taosGetTimestampMs() - st;
×
1224
  if (et < waitDuration) {
×
1225
    qInfo("%s  waiting %.2fs for executor stopping", GET_TASKID(pTaskInfo), et / 1000.0);
×
1226
    return TSDB_CODE_SUCCESS;
×
1227
  }
1228
  return TSDB_CODE_SUCCESS;
×
1229
}
1230

1231
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
×
1232
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
×
1233
  if (NULL == pTaskInfo) {
×
1234
    return false;
×
1235
  }
1236

1237
  return 0 != atomic_load_64(&pTaskInfo->owner);
×
1238
}
1239

1240
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
349,064,139✔
1241
  STaskCostInfo* pSummary = &pTaskInfo->cost;
349,064,139✔
1242
  int64_t        idleTime = pSummary->start - pSummary->created;
349,073,181✔
1243

1244
  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
349,076,415✔
1245
  if (pSummary->pRecoder != NULL) {
349,055,149✔
1246
    qDebug(
257,600,147✔
1247
        "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
1248
        "createGroupIdMap:%.2f ms, total blocks:%d, "
1249
        "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
1250
        GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime,
1251
        pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
1252
        pRecorder->totalRows, pRecorder->totalCheckedRows);
1253
  } else {
1254
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
91,443,931✔
1255
           pSummary->elapsedTime / 1000.0);
1256
  }
1257
}
349,044,078✔
1258

1259

1260
void qDestroyTask(qTaskInfo_t qTaskHandle) {
356,372,467✔
1261
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
356,372,467✔
1262
  if (pTaskInfo == NULL) {
356,372,467✔
1263
    return;
7,309,683✔
1264
  }
1265

1266
  if (pTaskInfo->pRoot != NULL) {
349,062,784✔
1267
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
349,067,191✔
1268
  } else {
1269
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
×
1270
  }
1271

1272
  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
349,070,191✔
1273
  doDestroyTask(pTaskInfo);
349,063,917✔
1274
}
1275

1276
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
3,868,370✔
1277
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
3,868,370✔
1278
  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
3,868,370✔
1279
}
1280

1281
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
337,298✔
1282
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
337,298✔
1283
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
337,298✔
1284

1285
  while (1) {
330,911✔
1286
    uint16_t type = pOperator->operatorType;
668,209✔
1287
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
668,209✔
1288
      *scanner = pOperator->info;
337,298✔
1289
      break;
337,298✔
1290
    } else {
1291
      pOperator = pOperator->pDownstream[0];
330,911✔
1292
    }
1293
  }
1294
}
337,298✔
1295

1296
void* qExtractReaderFromTmqScanner(void* scanner) {
337,298✔
1297
  STmqQueryScanInfo* pInfo = scanner;
337,298✔
1298
  return (void*)pInfo->tqReader;
337,298✔
1299
}
1300

1301
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
756,933✔
1302
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
756,933✔
1303
  return pTaskInfo->tmqInfo.schema;
756,933✔
1304
}
1305

1306
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
756,933✔
1307
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
756,933✔
1308
  return pTaskInfo->tmqInfo.tbName;
756,933✔
1309
}
1310

1311
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
834,777✔
1312
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
834,777✔
1313
  return &pTaskInfo->tmqInfo.btMetaRsp;
834,777✔
1314
}
1315

1316
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
24,150,485✔
1317
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
24,150,485✔
1318
  tOffsetCopy(pOffset, &pTaskInfo->tmqInfo.currentOffset);
24,150,485✔
1319
  return 0;
24,150,485✔
1320
}
1321

1322
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
799,730✔
1323
  memset(pCond, 0, sizeof(SQueryTableDataCond));
799,730✔
1324
  pCond->order = TSDB_ORDER_ASC;
799,730✔
1325
  pCond->numOfCols = pMtInfo->schema->nCols;
800,362✔
1326
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
800,362✔
1327
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
799,730✔
1328
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
800,046✔
1329
    taosMemoryFreeClear(pCond->colList);
316✔
1330
    taosMemoryFreeClear(pCond->pSlotList);
×
1331
    return terrno;
×
1332
  }
1333

1334
  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
799,730✔
1335
  pCond->suid = pMtInfo->suid;
800,046✔
1336
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
800,046✔
1337
  pCond->startVersion = -1;
800,362✔
1338
  pCond->endVersion = sContext->snapVersion;
799,730✔
1339

1340
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
5,134,269✔
1341
    SColumnInfo* pColInfo = &pCond->colList[i];
4,333,004✔
1342
    pColInfo->type = pMtInfo->schema->pSchema[i].type;
4,333,320✔
1343
    pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
4,334,268✔
1344
    if (pMtInfo->pExtSchemas != NULL) {
4,333,636✔
1345
      decimalFromTypeMod(pMtInfo->pExtSchemas[i].typeMod, &pColInfo->precision, &pColInfo->scale);
26,640✔
1346
    }
1347
    pColInfo->colId = pMtInfo->schema->pSchema[i].colId;
4,333,636✔
1348
    pColInfo->pk = pMtInfo->schema->pSchema[i].flags & COL_IS_KEY;
4,333,907✔
1349

1350
    pCond->pSlotList[i] = i;
4,333,636✔
1351
  }
1352

1353
  return TSDB_CODE_SUCCESS;
800,362✔
1354
}
1355

1356
void qStreamSetOpen(qTaskInfo_t tinfo) {
23,296,892✔
1357
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
23,296,892✔
1358
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
23,296,892✔
1359
  pOperator->status = OP_NOT_OPENED;
23,299,809✔
1360
}
23,297,989✔
1361

1362
void qStreamSetParams(qTaskInfo_t tinfo, int8_t sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
23,337,353✔
1363
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
23,337,353✔
1364
  pTaskInfo->tmqInfo.sourceExcluded = sourceExcluded;
23,337,353✔
1365
  pTaskInfo->tmqInfo.minPollRows = minPollRows;
23,342,729✔
1366
  pTaskInfo->tmqInfo.timeout = timeout;
23,330,145✔
1367
  pTaskInfo->tmqInfo.enableReplay = enableReplay;
23,332,809✔
1368
}
23,335,304✔
1369

1370
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
24,332,004✔
1371
  int32_t        code = TSDB_CODE_SUCCESS;
24,332,004✔
1372
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
24,332,004✔
1373
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
24,332,004✔
1374

1375
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
24,334,125✔
1376
  const char*    id = GET_TASKID(pTaskInfo);
24,332,771✔
1377

1378
  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
24,332,722✔
1379
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
22,987,432✔
1380
    if (pOperator == NULL || code != 0) {
22,983,231✔
1381
      return code;
×
1382
    }
1383

1384
    STmqQueryScanInfo* pInfo = pOperator->info;
22,986,342✔
1385
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
22,986,140✔
1386
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
22,986,042✔
1387
    walReaderVerifyOffset(pWalReader, pOffset);
22,988,089✔
1388
  }
1389
  // if pOffset equal to current offset, means continue consume
1390
  if (tOffsetEqual(pOffset, &pTaskInfo->tmqInfo.currentOffset)) {
24,329,595✔
1391
    return 0;
22,349,598✔
1392
  }
1393

1394
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
1,980,038✔
1395
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
1,168,206✔
1396
    if (pOperator == NULL || code != 0) {
1,168,206✔
1397
      return code;
×
1398
    }
1399

1400
    STmqQueryScanInfo* pInfo = pOperator->info;
1,168,872✔
1401
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
1,167,532✔
1402
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
1,167,607✔
1403
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;
1,167,272✔
1404

1405
    if (pOffset->type == TMQ_OFFSET__LOG) {
1,167,591✔
1406
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
945,768✔
1407
      pScanBaseInfo->dataReader = NULL;
946,087✔
1408

1409
      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
945,492✔
1410
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
946,087✔
1411
      walReaderVerifyOffset(pWalReader, pOffset);
945,723✔
1412
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
945,723✔
1413
      if (code < 0) {
946,087✔
1414
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
13,006✔
1415
        return code;
13,006✔
1416
      }
1417
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
221,138✔
1418
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
1419
      // those data are from the snapshot in tsdb, besides the data in the wal file.
1420
      int64_t uid = pOffset->uid;
222,119✔
1421
      int64_t ts = pOffset->ts;
222,119✔
1422
      int32_t index = 0;
220,791✔
1423

1424
      // this value may be changed if new tables are created
1425
      taosRLockLatch(&pTaskInfo->lock);
220,791✔
1426
      int32_t numOfTables = 0;
222,785✔
1427
      code = tableListGetSize(pTableListInfo, &numOfTables);
222,785✔
1428
      if (code != TSDB_CODE_SUCCESS) {
222,115✔
1429
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1430
        taosRUnLockLatch(&pTaskInfo->lock);
×
1431
        return code;
×
1432
      }
1433

1434
      if (uid == 0) {
222,115✔
1435
        if (numOfTables != 0) {
216,679✔
1436
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
41,601✔
1437
          if (!tmp) {
41,601✔
1438
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1439
            taosRUnLockLatch(&pTaskInfo->lock);
×
1440
            return terrno;
×
1441
          }
1442
          if (tmp) uid = tmp->uid;
41,601✔
1443
          ts = INT64_MIN;
41,601✔
1444
          pScanInfo->currentTable = 0;
41,601✔
1445
        } else {
1446
          taosRUnLockLatch(&pTaskInfo->lock);
175,078✔
1447
          qError("no table in table list, %s", id);
175,744✔
1448
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
175,744✔
1449
        }
1450
      }
1451
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);
46,702✔
1452

1453
      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
47,041✔
1454
             pInfo->pTableScanOp->resultInfo.totalRows);
1455
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
47,041✔
1456

1457
      // start from current accessed position
1458
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
1459
      // position, let's find it from the beginning.
1460
      index = tableListFind(pTableListInfo, uid, 0);
47,041✔
1461
      taosRUnLockLatch(&pTaskInfo->lock);
47,041✔
1462

1463
      if (index >= 0) {
47,041✔
1464
        pScanInfo->currentTable = index;
47,041✔
1465
      } else {
1466
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
×
1467
               numOfTables, pScanInfo->currentTable, id);
1468
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
×
1469
      }
1470

1471
      STableKeyInfo keyInfo = {.uid = uid};
47,041✔
1472
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;
47,041✔
1473

1474
      // let's start from the next ts that returned to consumer.
1475
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
47,041✔
1476
        pScanBaseInfo->cond.twindows.skey = ts;
972✔
1477
      } else {
1478
        pScanBaseInfo->cond.twindows.skey = ts + 1;
46,069✔
1479
      }
1480
      pScanInfo->scanTimes = 0;
47,041✔
1481

1482
      if (pScanBaseInfo->dataReader == NULL) {
47,041✔
1483
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
82,940✔
1484
                                                             &keyInfo, 1, pScanInfo->pResBlock,
1485
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
41,470✔
1486
        if (code != TSDB_CODE_SUCCESS) {
41,470✔
1487
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
×
1488
          return code;
×
1489
        }
1490

1491
        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
41,470✔
1492
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1493
      } else {
1494
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
5,571✔
1495
        if (code != TSDB_CODE_SUCCESS) {
5,571✔
1496
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1497
          return code;
×
1498
        }
1499

1500
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
5,571✔
1501
        if (code != TSDB_CODE_SUCCESS) {
5,571✔
1502
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1503
          return code;
×
1504
        }
1505
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
5,571✔
1506
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1507
      }
1508

1509
      // restore the key value
1510
      pScanBaseInfo->cond.twindows.skey = oldSkey;
47,347✔
1511
    } else {
1512
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
×
1513
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1514
    }
1515

1516
  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
1517
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
811,832✔
1518
      STmqRawScanInfo* pInfo = pOperator->info;
800,994✔
1519
      SSnapContext*       sContext = pInfo->sContext;
800,994✔
1520
      SOperatorInfo*      p = NULL;
800,994✔
1521

1522
      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
800,994✔
1523
      if (code != 0) {
800,994✔
1524
        return code;
×
1525
      }
1526

1527
      STableListInfo* pTableListInfo = ((STmqRawScanInfo*)(p->info))->pTableListInfo;
800,994✔
1528

1529
      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
800,994✔
1530
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
×
1531
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1532
      }
1533

1534
      SMetaTableInfo mtInfo = {0};
800,994✔
1535
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
800,994✔
1536
      if (code != 0) {
800,031✔
1537
        destroyMetaTableInfo(&mtInfo);
1538
        return code;
×
1539
      }
1540
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
800,031✔
1541
      pInfo->dataReader = NULL;
799,369✔
1542

1543
      cleanupQueryTableDataCond(&pTaskInfo->tmqInfo.tableCond);
799,369✔
1544
      tableListClear(pTableListInfo);
800,016✔
1545

1546
      if (mtInfo.uid == 0) {
800,994✔
1547
        destroyMetaTableInfo(&mtInfo);
1548
        goto end;  // no data
632✔
1549
      }
1550

1551
      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
800,362✔
1552
      code = initQueryTableDataCondForTmq(&pTaskInfo->tmqInfo.tableCond, sContext, &mtInfo);
799,730✔
1553
      if (code != TSDB_CODE_SUCCESS) {
800,362✔
1554
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1555
        destroyMetaTableInfo(&mtInfo);
1556
        return code;
×
1557
      }
1558
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
800,362✔
1559
        pTaskInfo->tmqInfo.tableCond.twindows.skey = pOffset->ts;
1,385✔
1560
      } else {
1561
        pTaskInfo->tmqInfo.tableCond.twindows.skey = pOffset->ts + 1;
798,616✔
1562
      }
1563

1564
      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
800,362✔
1565
      if (code != TSDB_CODE_SUCCESS) {
800,362✔
1566
        destroyMetaTableInfo(&mtInfo);
1567
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1568
        return code;
×
1569
      }
1570

1571
      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
800,362✔
1572
      if (!pList) {
800,362✔
1573
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1574
        destroyMetaTableInfo(&mtInfo);
1575
        return code;
×
1576
      }
1577
      int32_t size = 0;
800,362✔
1578
      code = tableListGetSize(pTableListInfo, &size);
800,362✔
1579
      if (code != TSDB_CODE_SUCCESS) {
800,362✔
1580
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1581
        destroyMetaTableInfo(&mtInfo);
1582
        return code;
×
1583
      }
1584

1585
      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->tmqInfo.tableCond, pList, size,
1,600,724✔
1586
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
800,362✔
1587
      if (code != TSDB_CODE_SUCCESS) {
800,362✔
1588
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1589
        destroyMetaTableInfo(&mtInfo);
1590
        return code;
×
1591
      }
1592

1593
      cleanupQueryTableDataCond(&pTaskInfo->tmqInfo.tableCond);
800,362✔
1594
      tstrncpy(pTaskInfo->tmqInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
800,362✔
1595
      //      pTaskInfo->tmqInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
1596
      tDeleteSchemaWrapper(pTaskInfo->tmqInfo.schema);
800,362✔
1597
      pTaskInfo->tmqInfo.schema = mtInfo.schema;
800,362✔
1598
      taosMemoryFreeClear(mtInfo.pExtSchemas);
800,362✔
1599

1600
      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
800,362✔
1601
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
10,838✔
1602
      STmqRawScanInfo* pInfo = pOperator->info;
2,316✔
1603
      SSnapContext*       sContext = pInfo->sContext;
2,316✔
1604
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
2,316✔
1605
      if (code != 0) {
2,316✔
1606
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
×
1607
        return code;
×
1608
      }
1609
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
2,316✔
1610
             id);
1611
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
8,522✔
1612
      STmqRawScanInfo* pInfo = pOperator->info;
8,522✔
1613
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
8,522✔
1614
      pInfo->dataReader = NULL;
8,522✔
1615
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
8,522✔
1616
    }
1617
  }
1618

1619
end:
1,791,844✔
1620
  tOffsetCopy(&pTaskInfo->tmqInfo.currentOffset, pOffset);
1,791,265✔
1621
  return 0;
1,791,954✔
1622
}
1623

1624
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
268,208,021✔
1625
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
268,208,021✔
1626
  if (pMsg->info.ahandle == NULL) {
268,247,386✔
1627
    rpcFreeCont(pMsg->pCont);
×
1628
    qError("rsp msg got while pMsg->info.ahandle is NULL, 0x%" PRIx64 ":0x%" PRIx64, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
×
1629
    return;
×
1630
  }
1631

1632
  qDebug("rsp msg got, code:%x, len:%d, 0x%" PRIx64 ":0x%" PRIx64, 
268,176,900✔
1633
      pMsg->code, pMsg->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
1634

1635
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
268,186,695✔
1636

1637
  if (pMsg->contLen > 0) {
268,231,657✔
1638
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
268,199,466✔
1639
    if (buf.pData == NULL) {
268,152,901✔
1640
      pMsg->code = terrno;
×
1641
    } else {
1642
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
268,152,901✔
1643
    }
1644
  }
1645

1646
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
268,230,284✔
1647
  rpcFreeCont(pMsg->pCont);
268,276,310✔
1648
  destroySendMsgInfo(pSendInfo);
268,224,056✔
1649
}
1650

1651
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
×
1652
  int32_t        code = TSDB_CODE_SUCCESS;
×
1653
  int32_t        lino = 0;
×
1654
  SExecTaskInfo* pTaskInfo = tinfo;
×
1655
  SArray*        plist = NULL;
×
1656

1657
  code = getTableListInfo(pTaskInfo, &plist);
×
1658
  if (code || plist == NULL) {
×
1659
    return NULL;
×
1660
  }
1661

1662
  // only extract table in the first elements
1663
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
×
1664

1665
  SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
×
1666
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
×
1667

1668
  int32_t numOfTables = 0;
×
1669
  code = tableListGetSize(pTableListInfo, &numOfTables);
×
1670
  QUERY_CHECK_CODE(code, lino, _end);
×
1671

1672
  for (int32_t i = 0; i < numOfTables; ++i) {
×
1673
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
×
1674
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
×
1675
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
×
1676
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1677
  }
1678

1679
  taosArrayDestroy(plist);
×
1680

1681
_end:
×
1682
  if (code != TSDB_CODE_SUCCESS) {
×
1683
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1684
    T_LONG_JMP(pTaskInfo->env, code);
×
1685
  }
1686
  return pUidList;
×
1687
}
1688

1689
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
3,537,906✔
1690
  int32_t        code = TSDB_CODE_SUCCESS;
3,537,906✔
1691
  int32_t        lino = 0;
3,537,906✔
1692
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,537,906✔
1693

1694
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
3,537,906✔
1695
    STmqQueryScanInfo* pScanInfo = pOperator->info;
×
1696
    STableScanInfo*  pTableScanInfo = pScanInfo->pTableScanOp->info;
×
1697

1698
    void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
×
1699
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1700
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
3,537,336✔
1701
    STableScanInfo* pScanInfo = pOperator->info;
1,768,953✔
1702

1703
    void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
1,768,953✔
1704
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,769,523✔
1705
  } else {
1706
    if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
1,768,953✔
1707
      code = extractTableList(pList, pOperator->pDownstream[0]);
1,769,523✔
1708
    }
1709
  }
1710

1711
_end:
×
1712
  if (code != TSDB_CODE_SUCCESS) {
3,539,046✔
1713
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
×
1714
  }
1715
  return code;
3,538,476✔
1716
}
1717

1718
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
1,768,953✔
1719
  if (pList == NULL) {
1,768,953✔
1720
    return TSDB_CODE_INVALID_PARA;
×
1721
  }
1722

1723
  *pList = NULL;
1,768,953✔
1724
  SArray* pArray = taosArrayInit(0, POINTER_BYTES);
1,769,523✔
1725
  if (pArray == NULL) {
1,768,388✔
1726
    return terrno;
×
1727
  }
1728

1729
  int32_t code = extractTableList(pArray, pTaskInfo->pRoot);
1,768,388✔
1730
  if (code == 0) {
1,769,523✔
1731
    *pList = pArray;
1,769,523✔
1732
  } else {
1733
    taosArrayDestroy(pArray);
×
1734
  }
1735
  return code;
1,769,523✔
1736
}
1737

1738
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
×
1739
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
×
1740
  if (pTaskInfo->pRoot->fpSet.releaseStreamStateFn != NULL) {
×
1741
    pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot);
×
1742
  }
1743
  return 0;
×
1744
}
1745

1746
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
×
1747
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
×
1748
  if (pTaskInfo->pRoot->fpSet.reloadStreamStateFn != NULL) {
×
1749
    pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot);
×
1750
  }
1751
  return 0;
×
1752
}
1753

1754
void qResetTaskCode(qTaskInfo_t tinfo) {
×
1755
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
×
1756

1757
  int32_t code = pTaskInfo->code;
×
1758
  pTaskInfo->code = 0;
×
1759
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTaskInfo->id.taskId, tstrerror(code));
×
1760
}
×
1761

1762
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
×
1763
  int32_t code = 0;
×
1764
  return code;
×
1765
}
1766

1767
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
×
1768
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
×
1769
  int32_t        code = collectExprsToReplaceForStream(pTaskInfo->pRoot, pExprs);
×
1770
  return code;
×
1771
}
1772

1773
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
29,326,515✔
1774
  int32_t code = 0;
29,326,515✔
1775

1776
  freeResetOperatorParams(pOper, OP_GET_PARAM, true);
29,326,515✔
1777
  freeResetOperatorParams(pOper, OP_NOTIFY_PARAM, true);
29,324,824✔
1778

1779
  if (pOper->fpSet.resetStateFn) {
29,325,214✔
1780
    code = pOper->fpSet.resetStateFn(pOper);
29,326,091✔
1781
  }
1782
  pOper->status = OP_NOT_OPENED;
29,322,500✔
1783
  for (int32_t i = 0; i < pOper->numOfDownstream && code == 0; ++i) {
50,092,475✔
1784
    code = clearStatesForOperator(pOper->pDownstream[i]);
20,764,722✔
1785
  }
1786
  return code;
29,328,088✔
1787
}
1788

1789
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
8,563,058✔
1790
  int32_t        code = 0;
8,563,058✔
1791
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
8,563,058✔
1792
  SOperatorInfo* pOper = pTaskInfo->pRoot;
8,563,058✔
1793
  pTaskInfo->code = TSDB_CODE_SUCCESS;
8,563,058✔
1794
  code = clearStatesForOperator(pOper);
8,563,058✔
1795
  return code;
8,562,363✔
1796
}
1797

1798
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppRes, uint64_t* useconds, bool* finished) {
12,007,567✔
1799
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
12,007,567✔
1800
  int64_t        threadId = taosGetSelfPthreadId();
12,007,567✔
1801
  int64_t        curOwner = 0;
12,007,802✔
1802

1803
  *ppRes = NULL;
12,007,802✔
1804

1805
  // todo extract method
1806
  taosRLockLatch(&pTaskInfo->lock);
12,007,802✔
1807
  bool isKilled = isTaskKilled(pTaskInfo);
12,007,802✔
1808
  if (isKilled) {
12,007,802✔
1809
    // clearStreamBlock(pTaskInfo->pRoot);
1810
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
×
1811

1812
    taosRUnLockLatch(&pTaskInfo->lock);
×
1813
    return pTaskInfo->code;
×
1814
  }
1815

1816
  if (pTaskInfo->owner != 0) {
12,007,802✔
1817
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
×
1818
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
×
1819

1820
    taosRUnLockLatch(&pTaskInfo->lock);
×
1821
    return pTaskInfo->code;
×
1822
  }
1823

1824
  pTaskInfo->owner = threadId;
12,007,567✔
1825
  taosRUnLockLatch(&pTaskInfo->lock);
12,007,802✔
1826

1827
  if (pTaskInfo->cost.start == 0) {
12,007,332✔
1828
    pTaskInfo->cost.start = taosGetTimestampUs();
374,781✔
1829
  }
1830

1831
  // error occurs, record the error code and return to client
1832
  int32_t ret = setjmp(pTaskInfo->env);
12,007,332✔
1833
  if (ret != TSDB_CODE_SUCCESS) {
13,758,366✔
1834
    pTaskInfo->code = ret;
1,752,479✔
1835
    (void)cleanUpUdfs();
1,752,479✔
1836
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
1,752,479✔
1837
    atomic_store_64(&pTaskInfo->owner, 0);
1,752,479✔
1838
    return pTaskInfo->code;
1,752,479✔
1839
  }
1840

1841
  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
12,005,887✔
1842

1843
  int64_t st = taosGetTimestampUs();
12,007,567✔
1844

1845
  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, ppRes);
12,007,567✔
1846
  if (code) {
10,254,807✔
1847
    pTaskInfo->code = code;
×
1848
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
×
1849
  } else {
1850
    *finished = *ppRes == NULL;
10,254,807✔
1851
    code = blockDataCheck(*ppRes);
10,255,075✔
1852
  }
1853
  if (code) {
10,255,075✔
1854
    pTaskInfo->code = code;
×
1855
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
×
1856
  }
1857

1858
  uint64_t el = (taosGetTimestampUs() - st);
10,254,958✔
1859

1860
  pTaskInfo->cost.elapsedTime += el;
10,254,958✔
1861
  if (NULL == *ppRes) {
10,254,425✔
1862
    *useconds = pTaskInfo->cost.elapsedTime;
6,302,885✔
1863
  }
1864

1865
  (void)cleanUpUdfs();
10,254,177✔
1866

1867
  int32_t  current = (*ppRes != NULL) ? (*ppRes)->info.rows : 0;
10,255,323✔
1868
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
10,255,323✔
1869

1870
  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
10,255,323✔
1871
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
1872

1873
  atomic_store_64(&pTaskInfo->owner, 0);
10,255,323✔
1874
  return pTaskInfo->code;
10,255,323✔
1875
}
1876

1877
// void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
1878
//   SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1879
//   pTaskInfo->pStreamRuntimeInfo = pStreamRuntimeInfo;
1880
// }
1881

1882
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
281,283✔
1883
                                        SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
1884
                                        SStorageAPI* storageAPI, void** pTableListInfo, SHashObj* groupIdMap) {
1885
  int32_t code = 0;                                        
281,283✔
1886
  if (*pTableListInfo != NULL) {
281,283✔
1887
    qDebug("table list already exists, no need to create again");
×
1888
    goto end;
×
1889
  }
1890
  STableListInfo* pList = tableListCreate();
281,409✔
1891
  if (pList == NULL) {
281,204✔
1892
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1893
    code = terrno;
×
1894
    goto end;
×
1895
  }
1896

1897
  SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
281,204✔
1898
  SReadHandle    pHandle = {.vnode = pVnode};
281,204✔
1899
  SExecTaskInfo  pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
281,409✔
1900

1901
  code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo, groupIdMap);
281,204✔
1902
  if (code != 0) {
281,409✔
1903
    tableListDestroy(pList);
×
1904
    qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
1905
    goto end;
×
1906
  }
1907
  *pTableListInfo = pList;
281,409✔
1908

1909
end:
281,409✔
1910
  return 0;
281,409✔
1911
}
1912

1913
static int32_t doFilterTableByTagCond(void* pVnode, STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, SStorageAPI* pStorageAPI){
7,766✔
1914
  int32_t code = doFilterByTagCond(pListInfo->idInfo.suid, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, NULL);
7,766✔
1915
  if (code != 0) {
7,766✔
1916
    return code;
×
1917
  }
1918
  int32_t numOfTables = taosArrayGetSize(pUidList);
7,766✔
1919
  for (int i = 0; i < numOfTables; i++) {
15,532✔
1920
    void* tmp = taosArrayGet(pUidList, i);
7,766✔
1921
    if (tmp == NULL) {
7,766✔
1922
      return terrno;
×
1923
    }
1924
    STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
7,766✔
1925

1926
    void* p = taosArrayPush(((STableListInfo*)pListInfo)->pTableList, &info);
7,766✔
1927
    if (p == NULL) {
7,766✔
1928
      return terrno;
×
1929
    }
1930
  }
1931
  return code;
7,766✔
1932
}
1933

1934
int32_t qStreamFilterTableListForReader(void* pVnode, SArray* uidList,
7,766✔
1935
                                        SNodeList* pGroupTags, SNode* pTagCond, SNode* pTagIndexCond,
1936
                                        SStorageAPI* storageAPI, SHashObj* groupIdMap, uint64_t suid, SArray** tableList) {
1937
  int32_t code = TSDB_CODE_SUCCESS;
7,766✔
1938
  SArray* uidListCopy = NULL;
7,766✔
1939
  STableListInfo* pList = tableListCreate();
7,766✔
1940
  if (pList == NULL) {
7,766✔
1941
    code = terrno;
×
1942
    goto end;
×
1943
  }
1944
  uidListCopy = taosArrayDup(uidList, NULL);
7,766✔
1945
  if (uidListCopy == NULL) {
7,766✔
1946
    code = terrno;
×
1947
    goto end;
×
1948
  }
1949
  SScanPhysiNode pScanNode = {.suid = suid, .tableType = TD_SUPER_TABLE};
7,766✔
1950
  SReadHandle    pHandle = {.vnode = pVnode};
7,766✔
1951

1952
  pList->idInfo.suid = suid;
7,766✔
1953
  pList->idInfo.tableType = TD_SUPER_TABLE;
7,766✔
1954
  code = doFilterTableByTagCond(pVnode, pList, uidList, pTagCond, storageAPI);
7,766✔
1955
  if (code != TSDB_CODE_SUCCESS) {
7,766✔
1956
    goto end;
×
1957
  }                                              
1958
  code = buildGroupIdMapForAllTables(pList, &pHandle, &pScanNode, pGroupTags, false, NULL, storageAPI, groupIdMap);
7,766✔
1959
  if (code != TSDB_CODE_SUCCESS) {
7,766✔
1960
    goto end;
×
1961
  }
1962
  *tableList = pList->pTableList;
7,766✔
1963
  pList->pTableList = NULL;
7,766✔
1964

1965
  taosArrayClear(uidList);
7,766✔
1966
  for (int32_t i = 0; i < taosArrayGetSize(uidListCopy); i++){
15,532✔
1967
    void* tmp = taosArrayGet(uidListCopy, i);
7,766✔
1968
    if (tmp == NULL) {
7,766✔
1969
      continue;
×
1970
    }
1971
    int32_t* slot = taosHashGet(pList->map, tmp, LONG_BYTES);
7,766✔
1972
    if (slot == NULL) {
7,766✔
1973
      if (taosArrayPush(uidList, tmp) == NULL) {
×
1974
        code = terrno;
×
1975
        goto end;
×
1976
      }
1977
    }
1978
  }
1979
end:
7,766✔
1980
  taosArrayDestroy(uidListCopy);
7,766✔
1981
  tableListDestroy(pList);
7,766✔
1982
  return code;
7,766✔
1983
}
1984

1985
// int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid, TdThreadRwlock* lock) {
1986
//   int32_t index = -1;
1987
//   (void)taosThreadRwlockRdlock(lock);
1988
//   if (((STableListInfo*)pTableListInfo)->groupOffset == NULL){
1989
//     index = 0;
1990
//     goto end;
1991
//   }
1992
//   for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
1993
//     int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
1994

1995
//     STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
1996
//     if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
1997
//       index = i;
1998
//       goto end;
1999
//     }
2000
//   }
2001
// end:
2002
//   (void)taosThreadRwlockUnlock(lock);
2003
//   return index;
2004
// }
2005

2006
void qStreamDestroyTableList(void* pTableListInfo) { tableListDestroy(pTableListInfo); }
281,409✔
2007
SArray*  qStreamGetTableListArray(void* pTableListInfo) {
281,409✔
2008
  STableListInfo* pList = pTableListInfo;
281,409✔
2009
  return pList->pTableList;
281,409✔
2010
}
2011

2012
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo, SColumnInfoData** pRet) { return doFilter(pBlock, pFilterInfo, NULL, pRet); }
1,619,829✔
2013

2014
void streamDestroyExecTask(qTaskInfo_t tInfo) {
3,001,405✔
2015
  qDebug("streamDestroyExecTask called, task:%p", tInfo);
3,001,405✔
2016
  qDestroyTask(tInfo);
3,001,405✔
2017
}
3,001,405✔
2018

2019
int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
816,599✔
2020
  return streamCalcOneScalarExprInRange(pExpr, pDst, -1, -1, pExtraParams);
816,599✔
2021
}
2022

2023
int32_t streamCalcOneScalarExprInRange(SNode* pExpr, SScalarParam* pDst, int32_t rowStartIdx, int32_t rowEndIdx,
881,734✔
2024
                                       const SStreamRuntimeFuncInfo* pExtraParams) {
2025
  int32_t      code = 0;
881,734✔
2026
  SNode*       pNode = NULL;
881,734✔
2027
  SNodeList*   pList = NULL;
881,734✔
2028
  SExprInfo*   pExprInfo = NULL;
881,734✔
2029
  int32_t      numOfExprs = 1;
881,734✔
2030
  int32_t*     offset = NULL;
881,734✔
2031
  STargetNode* pTargetNode = NULL;
881,734✔
2032
  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
881,734✔
2033
  if (code == 0) {
881,734✔
2034
    code = nodesCloneNode(pExpr, &pNode);
881,734✔
2035
  }
2036

2037
  if (code == 0) {
881,734✔
2038
    pTargetNode->dataBlockId = 0;
881,734✔
2039
    pTargetNode->pExpr = pNode;
881,734✔
2040
    pTargetNode->slotId = 0;
881,734✔
2041
  }
2042
  if (code == 0) {
881,734✔
2043
    code = nodesMakeList(&pList);
881,734✔
2044
  }
2045
  if (code == 0) {
881,734✔
2046
    code = nodesListAppend(pList, (SNode*)pTargetNode);
881,734✔
2047
  }
2048
  if (code == 0) {
881,734✔
2049
    pNode = NULL;
881,734✔
2050
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
881,734✔
2051
  }
2052

2053
  if (code == 0) {
881,734✔
2054
    const char* pVal = NULL;
881,734✔
2055
    int32_t     len = 0;
881,734✔
2056
    SNode*      pSclNode = NULL;
881,734✔
2057
    switch (pExprInfo->pExpr->nodeType) {
881,734✔
2058
      case QUERY_NODE_FUNCTION:
881,734✔
2059
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
881,734✔
2060
        break;
881,734✔
2061
      case QUERY_NODE_OPERATOR:
×
2062
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
×
2063
        break;
×
2064
      default:
×
2065
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
2066
        break;
×
2067
    }
2068
    SArray*     pBlockList = taosArrayInit(2, POINTER_BYTES);
881,734✔
2069
    SSDataBlock block = {0};
881,499✔
2070
    block.info.rows = 1;
881,499✔
2071
    SSDataBlock* pBlock = &block;
881,499✔
2072
    void*        tmp = taosArrayPush(pBlockList, &pBlock);
881,734✔
2073
    if (tmp == NULL) {
881,734✔
2074
      code = terrno;
×
2075
    }
2076
    if (code == 0) {
881,734✔
2077
      gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
881,734✔
2078
      gTaskScalarExtra.pStreamRange = NULL;
881,734✔
2079
      code = scalarCalculateInRange(pSclNode, pBlockList, pDst, rowStartIdx, rowEndIdx, &gTaskScalarExtra);
881,734✔
2080
    }
2081
    taosArrayDestroy(pBlockList);
881,734✔
2082
  }
2083
  nodesDestroyList(pList);
881,734✔
2084
  destroyExprInfo(pExprInfo, numOfExprs);
881,608✔
2085
  taosMemoryFreeClear(pExprInfo);
881,734✔
2086
  return code;
881,734✔
2087
}
2088

2089
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes, int32_t winIdx) {
2,358,454✔
2090
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
2,358,454✔
2091
  const SArray*  pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
2,358,454✔
2092
  int32_t        code = 0;
2,358,454✔
2093
  SNode*         pNode = NULL;
2,358,454✔
2094
  if (!pForceOutputCols) return 0;
2,358,454✔
2095
  if (!*pRes) {
65,135✔
2096
    code = createDataBlock(pRes);
65,135✔
2097
  }
2098

2099
  if (code == 0 && (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0)) {
65,135✔
2100
    int32_t idx = 0;
65,135✔
2101
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
260,335✔
2102
      SStreamOutCol*  pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
195,200✔
2103
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, idx++);
195,200✔
2104
      colInfo.info.precision = pCol->type.precision;
195,200✔
2105
      colInfo.info.scale = pCol->type.scale;
195,200✔
2106
      code = blockDataAppendColInfo(*pRes, &colInfo);
195,200✔
2107
      if (code != 0) break;
195,200✔
2108
    }
2109
  }
2110

2111
  code = blockDataEnsureCapacity(*pRes, (*pRes)->info.rows + 1);
65,135✔
2112
  if (code != TSDB_CODE_SUCCESS) {
65,135✔
2113
    qError("failed to ensure capacity for force output, code:%s", tstrerror(code));
×
2114
    return code;
×
2115
  }
2116

2117
  // loop all exprs for force output, execute all exprs
2118
  int32_t idx = 0;
65,135✔
2119
  int32_t rowIdx = (*pRes)->info.rows;
65,135✔
2120
  int32_t tmpWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
65,135✔
2121
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
65,135✔
2122
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
260,335✔
2123
    SScalarParam   dst = {0};
195,200✔
2124
    SStreamOutCol* pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
195,200✔
2125
    code = nodesStringToNode(pCol->expr, &pNode);
195,200✔
2126
    if (code != 0) break;
195,200✔
2127
    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, idx);
195,200✔
2128
    if (nodeType(pNode) == QUERY_NODE_VALUE) {
195,200✔
2129
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
130,065✔
2130
      code = colDataSetVal(pInfo, rowIdx, p, ((SValueNode*)pNode)->isNull);
130,065✔
2131
    } else {
2132
      dst.columnData = pInfo;
65,135✔
2133
      dst.numOfRows = rowIdx;
65,135✔
2134
      dst.colAlloced = false;
65,135✔
2135
      code = streamCalcOneScalarExprInRange(pNode, &dst, rowIdx, rowIdx, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
65,135✔
2136
    }
2137
    ++idx;
195,200✔
2138
    // TODO sclFreeParam(&dst);
2139
    nodesDestroyNode(pNode);
195,200✔
2140
    if (code != 0) break;
195,200✔
2141
  }
2142
  if (code == TSDB_CODE_SUCCESS) {
65,135✔
2143
    (*pRes)->info.rows++;
65,135✔
2144
  }
2145
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = tmpWinIdx;
65,135✔
2146
  return code;
65,135✔
2147
}
2148

2149
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
280,515✔
2150
  int32_t      code = 0;
280,515✔
2151
  const char*  pVal = NULL;
280,515✔
2152
  SScalarParam dst = {0};
280,515✔
2153
  int32_t      len = 0;
280,515✔
2154
  int32_t      nextIdx = pStreamRuntimeInfo->curIdx;
280,515✔
2155
  pStreamRuntimeInfo->curIdx = 0;  // always use the first window to calc tbname
280,515✔
2156
  // execute the expr
2157
  switch (pExpr->type) {
280,515✔
2158
    case QUERY_NODE_VALUE: {
×
2159
      SValueNode* pValue = (SValueNode*)pExpr;
×
2160
      int32_t     type = pValue->node.resType.type;
×
2161
      if (!IS_STR_DATA_TYPE(type)) {
×
2162
        qError("invalid sub tb expr with non-str type");
×
2163
        code = TSDB_CODE_INVALID_PARA;
×
2164
        break;
×
2165
      }
2166
      void* pTmp = nodesGetValueFromNode((SValueNode*)pExpr);
×
2167
      if (pTmp == NULL) {
×
2168
        qError("invalid sub tb expr with null value");
×
2169
        code = TSDB_CODE_INVALID_PARA;
×
2170
        break;
×
2171
      }
2172
      pVal = varDataVal(pTmp);
×
2173
      len = varDataLen(pTmp);
×
2174
    } break;
×
2175
    case QUERY_NODE_FUNCTION: {
280,515✔
2176
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
280,515✔
2177
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
280,515✔
2178
        qError("invalid sub tb expr with non-str type func");
×
2179
        code = TSDB_CODE_INVALID_PARA;
×
2180
        break;
×
2181
      }
2182
      SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
280,515✔
2183
      if (!pCol) {
280,515✔
2184
        code = terrno;
×
2185
        qError("failed to allocate col info data at: %s, %d", __func__, __LINE__);
×
2186
        break;
×
2187
      }
2188

2189
      pCol->hasNull = true;
280,515✔
2190
      pCol->info.type = ((SExprNode*)pExpr)->resType.type;
280,515✔
2191
      pCol->info.colId = 0;
280,515✔
2192
      pCol->info.bytes = ((SExprNode*)pExpr)->resType.bytes;
280,515✔
2193
      pCol->info.precision = ((SExprNode*)pExpr)->resType.precision;
280,515✔
2194
      pCol->info.scale = ((SExprNode*)pExpr)->resType.scale;
280,515✔
2195
      code = colInfoDataEnsureCapacity(pCol, 1, true);
280,515✔
2196
      if (code != 0) {
280,515✔
2197
        qError("failed to ensure capacity for col info data at: %s, %d", __func__, __LINE__);
×
2198
        taosMemoryFree(pCol);
×
2199
        break;
×
2200
      }
2201
      dst.columnData = pCol;
280,515✔
2202
      dst.numOfRows = 1;
280,515✔
2203
      dst.colAlloced = true;
280,515✔
2204
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
280,515✔
2205
      if (colDataIsNull_var(dst.columnData, 0)) {
280,515✔
2206
        qInfo("invalid sub tb expr with null value");
1,638✔
2207
        code = TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED;
1,755✔
2208
      }
2209
      if (code == 0) {
280,515✔
2210
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
278,760✔
2211
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
278,332✔
2212
      }
2213
    } break;
280,515✔
2214
    default:
×
2215
      qError("wrong subtable expr with type: %d", pExpr->type);
×
2216
      code = TSDB_CODE_OPS_NOT_SUPPORT;
×
2217
      break;
×
2218
  }
2219
  if (code == 0) {
280,515✔
2220
    if (!pVal || len == 0) {
278,504✔
2221
      qError("tbname generated with no characters which is not allowed");
×
2222
      code = TSDB_CODE_INVALID_PARA;
×
2223
    }
2224
    if(len > TSDB_TABLE_NAME_LEN - 1) {
278,504✔
2225
      qError("tbname generated with too long characters, max allowed is %d, got %d, truncated.", TSDB_TABLE_NAME_LEN - 1, len);
470✔
2226
      len = TSDB_TABLE_NAME_LEN - 1;
470✔
2227
    }
2228

2229
    memcpy(tbname, pVal, len);
278,504✔
2230
    tbname[len] = '\0';  // ensure null terminated
278,504✔
2231
    if (NULL != strchr(tbname, '.')) {
278,332✔
2232
      code = TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME;
×
2233
      qError("tbname generated with invalid characters, '.' is not allowed");
×
2234
    }
2235
  }
2236
  // TODO free dst
2237
  sclFreeParam(&dst);
280,343✔
2238
  pStreamRuntimeInfo->curIdx = nextIdx; // restore
280,515✔
2239
  return code;
280,515✔
2240
}
2241

2242
void destroyStreamInserterParam(SStreamInserterParam* pParam) {
270,780✔
2243
  if (pParam) {
270,780✔
2244
    if (pParam->tbname) {
270,780✔
2245
      taosMemFree(pParam->tbname);
270,780✔
2246
      pParam->tbname = NULL;
270,780✔
2247
    }
2248
    if (pParam->stbname) {
270,780✔
2249
      taosMemFree(pParam->stbname);
270,780✔
2250
      pParam->stbname = NULL;
270,780✔
2251
    }
2252
    if (pParam->dbFName) {
270,780✔
2253
      taosMemFree(pParam->dbFName);
270,780✔
2254
      pParam->dbFName = NULL;
270,780✔
2255
    }
2256
    if (pParam->pFields) {
270,780✔
2257
      taosArrayDestroy(pParam->pFields);
270,780✔
2258
      pParam->pFields = NULL;
270,780✔
2259
    }
2260
    if (pParam->pTagFields) {
270,780✔
2261
      taosArrayDestroy(pParam->pTagFields);
156,765✔
2262
      pParam->pTagFields = NULL;
156,765✔
2263
    }
2264
    if (pParam->colCids) {
270,780✔
2265
      taosArrayDestroy(pParam->colCids);
1,888✔
2266
      pParam->colCids = NULL;
1,888✔
2267
    }
2268
    if (pParam->tagCids) {
270,780✔
2269
      taosArrayDestroy(pParam->tagCids);
1,212✔
2270
      pParam->tagCids = NULL;
1,212✔
2271
    }
2272
    taosMemFree(pParam);
270,780✔
2273
  }
2274
}
270,780✔
2275

2276
int32_t cloneStreamInserterParam(SStreamInserterParam** ppDst, SStreamInserterParam* pSrc) {
270,780✔
2277
  int32_t code = 0, lino = 0;
270,780✔
2278
  if (ppDst == NULL || pSrc == NULL) {
270,780✔
2279
    TAOS_CHECK_EXIT(TSDB_CODE_INVALID_PARA);
235✔
2280
  }
2281
  *ppDst = (SStreamInserterParam*)taosMemoryCalloc(1, sizeof(SStreamInserterParam));
270,780✔
2282
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
270,545✔
2283

2284
  (*ppDst)->suid = pSrc->suid;
270,545✔
2285
  (*ppDst)->sver = pSrc->sver;
270,780✔
2286
  (*ppDst)->tbType = pSrc->tbType;
270,780✔
2287
  (*ppDst)->tbname = taosStrdup(pSrc->tbname);
270,780✔
2288
  TSDB_CHECK_NULL((*ppDst)->tbname, code, lino, _exit, terrno);
270,780✔
2289

2290
  if (pSrc->stbname) {
270,780✔
2291
    (*ppDst)->stbname = taosStrdup(pSrc->stbname);
270,780✔
2292
    TSDB_CHECK_NULL((*ppDst)->stbname, code, lino, _exit, terrno);
270,780✔
2293
  }
2294

2295
  (*ppDst)->dbFName = taosStrdup(pSrc->dbFName);
270,780✔
2296
  TSDB_CHECK_NULL((*ppDst)->dbFName, code, lino, _exit, terrno);
270,780✔
2297

2298
  (*ppDst)->pSinkHandle = pSrc->pSinkHandle;  // don't need clone and free
270,545✔
2299

2300
  if (pSrc->pFields && pSrc->pFields->size > 0) {
270,780✔
2301
    (*ppDst)->pFields = taosArrayDup(pSrc->pFields, NULL);
270,545✔
2302
    TSDB_CHECK_NULL((*ppDst)->pFields, code, lino, _exit, terrno);
270,311✔
2303
  } else {
2304
    (*ppDst)->pFields = NULL;
470✔
2305
  }
2306
  
2307
  if (pSrc->pTagFields && pSrc->pTagFields->size > 0) {
270,311✔
2308
    (*ppDst)->pTagFields = taosArrayDup(pSrc->pTagFields, NULL);
156,765✔
2309
    TSDB_CHECK_NULL((*ppDst)->pTagFields, code, lino, _exit, terrno);
156,765✔
2310
  } else {
2311
    (*ppDst)->pTagFields = NULL;
113,781✔
2312
  }
2313

2314
  if (pSrc->colCids && pSrc->colCids->size > 0) {
270,780✔
2315
    (*ppDst)->colCids = taosArrayDup(pSrc->colCids, NULL);
1,888✔
2316
    TSDB_CHECK_NULL((*ppDst)->colCids, code, lino, _exit, terrno);
1,888✔
2317
  } else {
2318
    (*ppDst)->colCids = NULL;
268,892✔
2319
  }
2320

2321
  if (pSrc->tagCids && pSrc->tagCids->size > 0) {
270,780✔
2322
    (*ppDst)->tagCids = taosArrayDup(pSrc->tagCids, NULL);
1,212✔
2323
    TSDB_CHECK_NULL((*ppDst)->tagCids, code, lino, _exit, terrno);
1,212✔
2324
  } else {
2325
    (*ppDst)->tagCids = NULL;
269,568✔
2326
  }
2327

2328
_exit:
270,780✔
2329

2330
  if (code != 0) {
270,780✔
2331
    if (*ppDst) {
×
2332
      destroyStreamInserterParam(*ppDst);
×
2333
      *ppDst = NULL;
×
2334
    }
2335
    
2336
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2337
  }
2338
  return code;
270,546✔
2339
}
2340

2341
int32_t dropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq) {
×
2342
  return doDropStreamTable(pMsgCb, pOutput, pReq);
×
2343
}
2344

2345
int32_t dropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName) {
×
2346
  return doDropStreamTableByTbName(pMsgCb, pOutput, pReq, tbName);
×
2347
}
2348

2349
int32_t qFilterTableList(void* pVnode, SArray* uidList, SNode* node, void* pTaskInfo, uint64_t suid) {
12,083✔
2350
  int32_t         code = TSDB_CODE_SUCCESS;
12,083✔
2351

2352
  SNode* pTagCond = node == NULL ? NULL : ((SSubplan*)node)->pTagCond;
12,083✔
2353
  code = doFilterByTagCond(suid, uidList, pTagCond, pVnode, SFLT_NOT_INDEX, &((SExecTaskInfo*)pTaskInfo)->storageAPI, NULL);
12,083✔
2354
  if (code != TSDB_CODE_SUCCESS) {
12,083✔
2355
    goto end;
×
2356
  }
2357
end:
12,083✔
2358
  return code;
12,083✔
2359
}
2360

2361
bool isTrueForSatisfied(STrueForInfo* pTrueForInfo, int64_t skey, int64_t ekey, int64_t count) {
2,147,483,647✔
2362
  if (pTrueForInfo == NULL) {
2,147,483,647✔
2363
    return true;
2,147,483,647✔
2364
  }
2365

2366
  bool durationSatisfied = (pTrueForInfo->duration <= 0) || (llabs(ekey - skey) >= pTrueForInfo->duration);
2,147,483,647✔
2367
  bool countSatisfied = (pTrueForInfo->count <= 0) || (count >= pTrueForInfo->count);
2,147,483,647✔
2368
  switch (pTrueForInfo->trueForType) {
2,147,483,647✔
2369
    case TRUE_FOR_DURATION_ONLY:
2,147,483,647✔
2370
      return durationSatisfied;
2,147,483,647✔
2371
    case TRUE_FOR_COUNT_ONLY:
54,699,354✔
2372
      return countSatisfied;
54,699,354✔
2373
    case TRUE_FOR_AND:
54,699,354✔
2374
      return durationSatisfied && countSatisfied;
54,699,354✔
2375
    case TRUE_FOR_OR:
54,699,354✔
2376
      return durationSatisfied || countSatisfied;
54,699,354✔
2377
    default:
×
2378
      return true;
×
2379
  }
2380
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc