• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4980

10 Mar 2026 08:57AM UTC coverage: 68.492% (-0.02%) from 68.512%
#4980

push

travis-ci

web-flow
fix: add retry while exec ci case test_stable_keep_compact.py. (#34729)

211901 of 309380 relevant lines covered (68.49%)

135171938.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.9
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include <stdint.h>
18
#include "cmdnodes.h"
19
#include "dataSinkInt.h"
20
#include "executil.h"
21
#include "executorInt.h"
22
#include "libs/new-stream/stream.h"
23
#include "operator.h"
24
#include "osMemPool.h"
25
#include "osMemory.h"
26
#include "planner.h"
27
#include "query.h"
28
#include "querytask.h"
29
#include "storageapi.h"
30
#include "streamexecutorInt.h"
31
#include "taosdef.h"
32
#include "tarray.h"
33
#include "tdatablock.h"
34
#include "tref.h"
35
#include "trpc.h"
36
#include "ttypes.h"
37
#include "tudf.h"
38
#include "wal.h"
39

40
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
41
int32_t             fetchObjRefPool = -1;
42
SGlobalExecInfo     gExecInfo = {0};
43

44
// Copy the task's sub-job context into gTaskScalarExtra and install qFetchRemoteNode
// as its fetch callback.
void setTaskScalarExtraInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  gTaskScalarExtra.fp = qFetchRemoteNode;
  gTaskScalarExtra.pSubJobCtx = pTask->pSubJobCtx;
}
727,673,183✔
49

50
// Record the dnode handle and the two dnode callbacks in the global gExecInfo.
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
  gExecInfo.getDnodeId = getDnodeId;
  gExecInfo.getMnode = getMnode;
  gExecInfo.dnode = pDnode;
}
56

57
// Fill pEpSet through the getMnode callback registered in gExecInfo.
// Returns TSDB_CODE_APP_ERROR when gExecInfo has no dnode or no callback yet.
int32_t getCurrentMnodeEpset(SEpSet* pEpSet) {
  if (gExecInfo.dnode != NULL && gExecInfo.getMnode != NULL) {
    gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
    return TSDB_CODE_SUCCESS;
  }

  qError("gExecInfo is not initialized");
  return TSDB_CODE_APP_ERROR;
}
65

66
void doDestroyFetchObj(void* param) {
142,365,292✔
67
  if (param == NULL) {
142,365,292✔
68
    return;
×
69
  }
70

71
  if (*(bool*)param) {
142,365,292✔
72
    doDestroyExchangeOperatorInfo(param);
104,958,532✔
73
  } else {
74
    destroySubJobCtx((STaskSubJobCtx *)param);
37,409,421✔
75
  }
76
}
77

78
static void cleanupRefPool() {
563,465✔
79
  int32_t ref = atomic_val_compare_exchange_32(&fetchObjRefPool, fetchObjRefPool, 0);
563,465✔
80
  taosCloseRef(ref);
563,465✔
81
}
563,465✔
82

83
static void initRefPool() {
563,465✔
84
  fetchObjRefPool = taosOpenRef(1024, doDestroyFetchObj);
563,465✔
85
  (void)atexit(cleanupRefPool);
563,465✔
86
}
563,465✔
87

88
/**
 * Descend a single-child operator chain to the stream-scan operator and append the supplied
 * input to its block list (pInfo->pBlockLists), setting pInfo->blockType accordingly.
 *
 * Every operator visited on the way down (and the stream scan itself) is reset to OP_NOT_OPENED.
 *
 * @param pOperator   operator to start from
 * @param input       input payload; how it is interpreted depends on `type`
 * @param numOfBlocks number of elements in `input` for the array-style input types
 * @param type        one of the STREAM_INPUT__* values handled below; other values leave the
 *                    block list untouched and still return success
 * @param id          task id string, used only for logging
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when no single-child path to a stream scan
 *         exists, or the terrno captured when pushing into the block list fails
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string: the format must be a plain "%s" (a trailing PRIx64 only prints junk)
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // input is treated as an array of SPackedData; each element is copied into the list
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // the pointer value itself is pushed as one element
    void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    // input is treated as an array of SSDataBlock; each one is wrapped in an SPackedData
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    SPackedData tmp = {.pDataBlock = input};
    void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
    QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__CHECKPOINT;
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
150

151
/**
 * Reset every operator on the single-child path from pOperator down to (but not including)
 * the stream-scan operator to OP_NOT_OPENED.
 *
 * @param pOperator operator to start from
 * @param id        task id string, used only for logging
 * @return 0 once a stream-scan operator is reached, TSDB_CODE_APP_ERROR when the chain ends
 *         without one or an operator has more than one downstream
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return 0;
  }

  if (pOperator->numOfDownstream == 0) {
    // id is a string: the format must be a plain "%s" (a trailing PRIx64 only prints junk)
    qError("failed to find stream scan operator to set the input data block, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  if (pOperator->numOfDownstream > 1) {  // not handle this in join query
    qError("join not supported for stream block scan, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  pOperator->status = OP_NOT_OPENED;
  return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
168

169
// Walk down the first-downstream chain to the stream-scan operator and, if its table-scan
// child already owns a data reader, hand the task id string to that reader.
// Returns 0 when there is nothing to update, otherwise the result of tsdSetReaderTaskId.
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->pDownstream) {
      return doSetTaskId(pOperator->pDownstream[0], pAPI);
    }
    return 0;
  }

  SStreamScanInfo* pStreamScan = pOperator->info;
  if (pStreamScan->pTableScanOp == NULL) {
    return 0;
  }

  STableScanInfo* pTableScan = pStreamScan->pTableScanOp->info;
  if (pTableScan->base.dataReader == NULL) {
    return 0;
  }

  int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pTableScan->base.dataReader, pTaskInfo->id.str);
  if (code) {
    qError("failed to set reader id for executor, code:%s", tstrerror(code));
    return code;
  }

  return 0;
}
189

190
// Store the query id on the task, rebuild the task's id string from (taskId, queryId),
// then propagate the new id string to the reader via doSetTaskId.
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str, 64);

  // set the idstr for tsdbReader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
198

199
// True when the task's status field equals OP_EXEC_DONE.
bool qTaskIsDone(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  bool                 done = (pTask->status == OP_EXEC_DONE);
  return done;
}
203

204
// Feed SMA input blocks to the task's stream-scan operator.
// A NULL task is an error; a NULL or empty block set is accepted as a no-op.
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  bool nothingToSet = (pBlocks == NULL) || (numOfBlocks == 0);
  if (nothingToSet) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}
224

225
/**
 * Create a queue-model execution task.
 *
 * With msg == NULL a raw-scan task is built directly (doCreateTask + createTmqRawScanOperatorInfo).
 * Otherwise msg is parsed as a subplan string and handed to qCreateExecTask.
 *
 * @return the new task, or NULL on failure; whenever a failure carries a non-zero error code,
 *         that code is stored in terrno on every path
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, uint64_t id) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pRawTask = NULL;

    code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pRawTask);
    if (NULL == pRawTask || code != 0) {
      if (code != 0) {
        terrno = code;  // keep the error visible to the caller, like the plan path below
      }
      return NULL;
    }

    code = createTmqRawScanOperatorInfo(pReaderHandle, pRawTask, &pRawTask->pRoot);
    if (NULL == pRawTask->pRoot || code != 0) {
      // NOTE(review): only the task struct itself is released here; confirm doCreateTask
      // leaves nothing else attached that would need a full task destroy.
      taosMemoryFree(pRawTask);
      if (code != 0) {
        terrno = code;
      }
      return NULL;
    }

    pRawTask->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pRawTask));
    return pRawTask;
  }

  SSubplan* pPlan = NULL;
  code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to parse subplan from msg, msg:%s code:%s", (char*) msg, tstrerror(code));
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  return pTaskInfo;
}
263

264
// Validate an optional stream inserter parameter block.
// NULL is accepted. Otherwise: a super-table target needs a positive suid, the db name must
// be non-empty, and a target without a positive suid needs a non-empty table name.
static int32_t checkInsertParam(SStreamInserterParam* streamInserterParam) {
  const SStreamInserterParam* p = streamInserterParam;
  if (p == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool noSuid = (p->suid <= 0);

  if (noSuid && p->tbType == TSDB_SUPER_TABLE) {
    stError("insertParam: invalid suid:%" PRIx64 " for child table", p->suid);
    return TSDB_CODE_INVALID_PARA;
  }

  if (p->dbFName == NULL || p->dbFName[0] == '\0') {
    stError("insertParam: invalid db/table name");
    return TSDB_CODE_INVALID_PARA;
  }

  if (noSuid) {
    bool missingTbName = (p->tbname == NULL) || (p->tbname[0] == '\0');
    if (missingTbName) {
      stError("insertParam: invalid table name, suid:%" PRIx64 "", p->suid);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}
287

288
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
710,104✔
289
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
290
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
291
  if (pSubplan == NULL || pTaskInfo == NULL) {
710,104✔
292
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p", pSubplan, pTaskInfo);
×
293
    nodesDestroyNode((SNode *)pSubplan);
×
294
    return TSDB_CODE_INVALID_PARA;
×
295
  }
296
  int32_t lino = 0;
710,328✔
297
  int32_t code = checkInsertParam(streamInserterParam);
710,328✔
298
  if (code != TSDB_CODE_SUCCESS) {
710,328✔
299
    qError("invalid stream inserter param, code:%s", tstrerror(code));
×
300
    nodesDestroyNode((SNode *)pSubplan);
×
301
    return code;
×
302
  }
303
  SInserterParam* pInserterParam = NULL;
710,328✔
304
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
710,328✔
305
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
710,328✔
306
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
710,104✔
307

308
  code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, NULL);
710,104✔
309
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
710,328✔
310
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
337✔
311
    goto _error;
224✔
312
  }
313

314
  if (streamInserterParam) {
709,991✔
315
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
260,698✔
316
    void*           pSinkManager = NULL;
260,698✔
317
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
260,698✔
318
    if (code != TSDB_CODE_SUCCESS) {
260,698✔
319
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
320
      goto _error;
×
321
    }
322

323
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
260,698✔
324
    if (NULL == pInserterParam) {
260,698✔
325
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(terrno), (*pTask)->id.str);
×
326
      code = terrno;
×
327
      goto _error;
×
328
    }
329
    code = cloneStreamInserterParam(&pInserterParam->streamInserterParam, streamInserterParam);
260,698✔
330
    TSDB_CHECK_CODE(code, lino, _error);
260,464✔
331
    
332
    pInserterParam->readHandle = taosMemCalloc(1, sizeof(SReadHandle));
260,464✔
333
    pInserterParam->readHandle->pMsgCb = readHandle->pMsgCb;
260,698✔
334

335
    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
260,698✔
336
    if (code) {
260,474✔
337
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
338
    }
339
  }
340
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
710,104✔
341
         tstrerror(code));
342

343
_error:
74,670✔
344

345
  if (code != TSDB_CODE_SUCCESS) {
710,104✔
346
    qError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
224✔
347
    if (pInserterParam != NULL) {
224✔
348
      taosMemoryFree(pInserterParam);
×
349
    }
350
  }
351
  return code;
710,104✔
352
}
353

354
// Report whether the task's root operator is one of the scan kinds listed below
// (table scan, table merge scan, system-table scan, tag scan).
// A NULL task, NULL root, or root without a physical node yields false.
bool qNeedReset(qTaskInfo_t pInfo) {
  if (pInfo == NULL) {
    return false;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)pInfo;
  SOperatorInfo* pRoot = pTask->pRoot;
  if (pRoot == NULL || pRoot->pPhyNode == NULL) {
    return false;
  }

  int32_t node = nodeType(pRoot->pPhyNode);
  switch (node) {
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      return true;
    default:
      return false;
  }
}
369

370
// Copy uid, both window ranges with their valid flags, and cacheSttStatis from pHandle into
// the scan base's own read handle. Does nothing if either pointer is NULL.
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {
  if (pHandle != NULL && pScanBaseInfo != NULL) {
    SReadHandle* pDst = &pScanBaseInfo->readHandle;

    pDst->uid = pHandle->uid;
    pDst->winRangeValid = pHandle->winRangeValid;
    pDst->winRange = pHandle->winRange;
    pDst->extWinRangeValid = pHandle->extWinRangeValid;
    pDst->extWinRange = pHandle->extWinRange;
    pDst->cacheSttStatis = pHandle->cacheSttStatis;
  }
}
382

383
/**
 * Refresh the read-handle fields of a table scan / table merge scan root operator from
 * `handle`, then invoke the root operator's resetStateFn.
 *
 * @param pInfo  task whose root operator is reset
 * @param handle source of the new read-handle fields; a NULL handle skips the copy
 *               (setReadHandle ignores it) and is reported as such in the debug log
 * @return TSDB_CODE_INVALID_PARA when the task, its root operator, or the root's physical
 *         node is NULL; otherwise the result of resetStateFn
 */
int32_t qResetTableScan(qTaskInfo_t pInfo, SReadHandle* handle) {
  if (pInfo == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;

  // same preconditions qNeedReset checks; both pointers are dereferenced below
  if (pOperator == NULL || pOperator->pPhyNode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*           info = pOperator->info;
  STableScanBase* pScanBaseInfo = NULL;

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableMergeScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  }

  if (handle != NULL) {
    qDebug("reset table scan, name:%s, id:%s, time range: [%" PRId64 ", %" PRId64 "]", pOperator->name, GET_TASKID(pTaskInfo), handle->winRange.skey,
           handle->winRange.ekey);
  } else {
    // the time range cannot be logged without a handle
    qDebug("reset table scan, name:%s, id:%s, no read handle supplied", pOperator->name, GET_TASKID(pTaskInfo));
  }
  return pOperator->fpSet.resetStateFn(pOperator);
}
405

406
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
710,104✔
407
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
408
  if (msg == NULL) {
710,104✔
409
    return TSDB_CODE_INVALID_PARA;
×
410
  }
411

412
  *pTaskInfo = NULL;
710,104✔
413

414
  SSubplan* pPlan = NULL;
710,328✔
415
  int32_t   code = qStringToSubplan(msg, &pPlan);
710,328✔
416
  if (code != TSDB_CODE_SUCCESS) {
710,328✔
417
    nodesDestroyNode((SNode *)pPlan);
×
418
    return code;
×
419
  }
420
  // todo: add stream inserter param
421
  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo,
710,328✔
422
                               pInserterParams ? &pInserterParams->pSinkHandle : NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM,
423
                               pInserterParams);
424
  if (code != TSDB_CODE_SUCCESS) {
710,328✔
425
    qDestroyTask(*pTaskInfo);
224✔
426
    return code;
224✔
427
  }
428

429
  return code;
710,104✔
430
}
431

432
typedef struct {
433
  tb_uid_t tableUid;
434
  tb_uid_t childUid;
435
  int8_t   check;
436
} STqPair;
437

438
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
397,899✔
439
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
440
  int32_t code = TSDB_CODE_SUCCESS;
397,899✔
441
  int32_t lino = 0;
397,899✔
442
  int8_t  locked = 0;
397,899✔
443
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
397,899✔
444

445
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
397,899✔
446

447
  SArray* tUid = taosArrayInit(4, sizeof(STqPair));
397,899✔
448
  QUERY_CHECK_NULL(tUid, code, lino, _error, terrno);
397,899✔
449

450
  int32_t numOfUids = taosArrayGetSize(tableIdList);
397,899✔
451
  if (numOfUids == 0) {
397,899✔
452
    (*ppArrayRes) = qa;
×
453
    goto _error;
×
454
  }
455

456
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
397,899✔
457

458
  uint64_t suid = 0;
397,899✔
459
  uint64_t uid = 0;
397,899✔
460
  int32_t  type = 0;
397,899✔
461
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
397,899✔
462

463
  // let's discard the tables those are not created according to the queried super table.
464
  SMetaReader mr = {0};
397,899✔
465
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
397,899✔
466

467
  locked = 1;
397,899✔
468
  for (int32_t i = 0; i < numOfUids; ++i) {
1,002,150✔
469
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
604,251✔
470
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
604,251✔
471

472
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
604,251✔
473
    if (code != TSDB_CODE_SUCCESS) {
604,251✔
474
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
475
      continue;
×
476
    }
477

478
    tDecoderClear(&mr.coder);
604,251✔
479

480
    if (mr.me.type == TSDB_SUPER_TABLE) {
604,251✔
481
      continue;
×
482
    } else {
483
      if (type == TSDB_SUPER_TABLE) {
604,251✔
484
        // this new created child table does not belong to the scanned super table.
485
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
604,251✔
486
          continue;
×
487
        }
488
      } else {  // ordinary table
489
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
490
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
491
        if (mr.me.uid != uid) {
×
492
          continue;
×
493
        }
494
      }
495
    }
496

497
    STqPair item = {.tableUid = *id, .childUid = mr.me.uid, .check = 0};
604,251✔
498
    if (pScanInfo->pTagCond != NULL) {
604,251✔
499
      // tb_uid_t id = mr.me.uid;
500
      item.check = 1;
342,907✔
501
    }
502
    if (taosArrayPush(tUid, &item) == NULL) {
604,251✔
503
      QUERY_CHECK_NULL(NULL, code, lino, _end, terrno);
×
504
    }
505
  }
506

507
  pAPI->metaReaderFn.clearReader(&mr);
397,899✔
508
  locked = 0;
397,899✔
509

510
  for (int32_t j = 0; j < taosArrayGetSize(tUid); ++j) {
1,002,150✔
511
    bool     qualified = false;
604,251✔
512
    STqPair* t = (STqPair*)taosArrayGet(tUid, j);
604,251✔
513
    if (t == NULL) {
604,251✔
514
      continue;
×
515
    }
516

517
    if (t->check == 1) {
604,251✔
518
      code = isQualifiedTable(t->childUid, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
342,907✔
519
      if (code != TSDB_CODE_SUCCESS) {
342,907✔
520
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", t->childUid, idstr);
×
521
        continue;
×
522
      }
523

524
      if (!qualified) {
342,907✔
525
        qInfo("table uid:0x%" PRIx64 " is unqualified for tag condition, %s", t->childUid, idstr);
171,462✔
526
        continue;
171,462✔
527
      }
528
    }
529

530
    void* tmp = taosArrayPush(qa, &t->tableUid);
432,789✔
531
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
432,789✔
532
  }
533

534
  // handle multiple partition
535

536
_end:
397,899✔
537

538
  if (locked) {
397,899✔
539
    pAPI->metaReaderFn.clearReader(&mr);
×
540
  }
541
  (*ppArrayRes) = qa;
397,899✔
542
_error:
397,899✔
543

544
  taosArrayDestroy(tUid);
397,899✔
545
  if (code != TSDB_CODE_SUCCESS) {
397,899✔
546
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
547
  }
548
  return code;
397,899✔
549
}
550

551
// Locate the stream-scan operator of the task and remove the given table ids from its
// tq reader while holding the task's write latch.
// Returns the lookup code unchanged when no stream-scan operator is found.
int32_t qDeleteTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  SOperatorInfo* pScanOp = NULL;

  // traverse to the stream scanner node to add this table id
  int32_t code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code == 0 && pScanOp != NULL) {
    SStreamScanInfo* pScanInfo = pScanOp->info;
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);

    taosWLockLatch(&pTaskInfo->lock);
    pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
  }

  return code;
}
571

572
/**
 * Filter tableIdList down to the qualified tables, register them with the tq reader, and add
 * each one (with its group id) to the table-scan operator's table list under `lock`.
 *
 * The group id is 0 when the scan has no group tags, the table uid when grouping by table
 * name, and otherwise the value computed by getGroupIdFromTagsVal.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered. All temporaries are released and
 *         the latch is dropped on every path through the single exit below.
 */
static int32_t filterTableForTmqQuery(SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* id, SStorageAPI* pAPI, SRWLatch* lock) {
  SArray*         qa = NULL;
  char*           keyBuf = NULL;
  bool            assignUid = false;
  bool            latched = false;
  size_t          bufLen = 0;
  int32_t         numOfQualifiedTables = 0;
  STableListInfo* pTableListInfo = NULL;

  int32_t code = filterUnqualifiedTables(pScanInfo, tableIdList, id, pAPI, &qa);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  numOfQualifiedTables = taosArrayGetSize(qa);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pAPI->tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);

  bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      code = terrno;
      goto _end;
    }
  }

  pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
  taosWLockLatch(lock);
  latched = true;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* uid = taosArrayGet(qa, i);
    if (!uid) {
      code = terrno;  // captured before any cleanup call can overwrite it
      goto _end;
    }
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                      &keyInfo.groupId, pAPI);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

_end:
  if (latched) {
    taosWUnLockLatch(lock);
  }
  taosMemoryFree(keyBuf);
  taosArrayDestroy(qa);
  return code;
}
640

641
/**
 * Call the storage API's tqUpdateTableTagCache once per table uid in tableIdList, passing the
 * scan's pseudo expressions and the given column id.
 *
 * Entries that taosArrayGet cannot return are skipped instead of being dereferenced.
 */
static void qUpdateTableTagCache(SStreamScanInfo* pScanInfo, const SArray* tableIdList, col_id_t cid, SStorageAPI* api) {
  int32_t numOfTables = (int32_t)taosArrayGetSize(tableIdList);

  for (int32_t i = 0; i < numOfTables; ++i) {
    int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i);
    if (uid == NULL) {
      continue;
    }
    api->tqReaderFn.tqUpdateTableTagCache(pScanInfo->tqReader, pScanInfo->pPseudoExpr, pScanInfo->numOfPseudoExpr, *uid, cid);
  }
}
398,551✔
648

649
// Locate the task's stream-scan operator, refresh its tag cache for the given tables
// (column id 0), then filter the tables and add the qualified ones to the scanner.
// Returns the lookup code unchanged when no stream-scan operator is found.
int32_t qAddTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  SOperatorInfo* pScanOp = NULL;

  qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);

  // traverse to the stream scanner node to add this table id
  int32_t code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pScanInfo = pScanOp->info;

  qUpdateTableTagCache(pScanInfo, tableIdList, 0, pAPI);
  return filterTableForTmqQuery(pScanInfo, tableIdList, id, pAPI, &pTaskInfo->lock);
}
668

669
/**
 * Refresh the stream scanner's tag cache for every (table, column id) combination of
 * tableIdList x cids.
 *
 * Logs and returns when the task has no stream-scan operator. Column-id entries that
 * taosArrayGet cannot return are skipped.
 */
void qUpdateTableTagCacheForTmq(qTaskInfo_t tinfo, const SArray* tableIdList, SArray* cids) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
  if (code != 0 || pInfo == NULL) {
    // report the lookup's own error code when it has one; fall back to terrno otherwise
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code != 0 ? code : terrno));
    return;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;
  int32_t          numOfCids = (int32_t)taosArrayGetSize(cids);
  for (int32_t i = 0; i < numOfCids; ++i) {
    col_id_t* cid = (col_id_t*)taosArrayGet(cids, i);
    if (cid == NULL) {
      continue;
    }
    qUpdateTableTagCache(pScanInfo, tableIdList, *cid, &pTaskInfo->storageAPI);
  }
}
688

689
// Locate the task's stream-scan operator, remove the given tables from its tq reader under
// the task's write latch, then re-run the filter-and-add step for the same tables.
// Returns the lookup code unchanged when no stream-scan operator is found.
int32_t qUpdateTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  SOperatorInfo* pScanOp = NULL;

  // traverse to the stream scanner node to add this table id
  int32_t code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pScanInfo = pScanOp->info;

  qDebug("%s %d remove child tables from the stream scanner, %s", __func__, (int32_t)taosArrayGetSize(tableIdList), id);
  taosWLockLatch(&pTaskInfo->lock);
  pAPI->tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
  taosWUnLockLatch(&pTaskInfo->lock);

  return filterTableForTmqQuery(pScanInfo, tableIdList, id, pAPI, &pTaskInfo->lock);
}
710

711
/**
 * Fetch db name, table name and the three version numbers of the idx-th schema entry
 * recorded on the task.
 *
 * @param dbName / dbNameBuffLen       destination buffer and size for the database name
 * @param tableName / tbaleNameBuffLen destination buffer and size for the table name
 * @param sversion, tversion, rversion out: version numbers copied from the schema entry
 * @param idx                          index into the task's schemaInfos array
 * @param tbGet                        out: true only when an entry was found and copied
 * @return TSDB_CODE_INVALID_PARA when any pointer argument is NULL; TSDB_CODE_SUCCESS with
 *         *tbGet == false when idx is beyond the recorded entries; terrno when the entry
 *         cannot be read; TSDB_CODE_SUCCESS with *tbGet == true otherwise
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t* rversion,
                                    int32_t idx, bool* tbGet) {
  // validate tbGet before writing through it
  if (tbGet == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *tbGet = false;

  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (sversion == NULL || tversion == NULL || rversion == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
    return TSDB_CODE_SUCCESS;
  }

  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
  if (!pSchemaInfo) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pSchemaInfo->sw->version;
  *tversion = pSchemaInfo->tversion;
  *rversion = pSchemaInfo->rversion;

  // a missing name is reported as an empty string
  if (pSchemaInfo->dbname) {
    tstrncpy(dbName, pSchemaInfo->dbname, dbNameBuffLen);
  } else {
    dbName[0] = 0;
  }
  if (pSchemaInfo->tablename) {
    tstrncpy(tableName, pSchemaInfo->tablename, tbaleNameBuffLen);
  } else {
    tableName[0] = 0;
  }

  *tbGet = true;

  return TSDB_CODE_SUCCESS;
}
749

750
// Returns the dynamicTask flag stored on the task. tinfo is dereferenced without a NULL check.
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  return pTask->dynamicTask;
}
337,534,764✔
751

752
// Releases an operator param through freeOperatorParam(..., OP_GET_PARAM); a NULL param is a no-op.
void qDestroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
}
758

759
/**
760
  @brief Update the operator param for the task.
761
  @note  Unlike setOperatorParam, this function will destroy the new param when
762
         operator type mismatch.
763
*/
764
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
35,816,437✔
765
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
35,816,437✔
766
  SOperatorParam* pNewParam = (SOperatorParam*)pParam;
35,816,437✔
767
  if (pTask->pRoot && pTask->pRoot->operatorType != pNewParam->opType) {
35,816,437✔
768
    qError("%s, %s operator type mismatch, task operator type:%d, "
117,600✔
769
           "new param operator type:%d", GET_TASKID(pTask), __func__,
770
           pTask->pRoot->operatorType,
771
           pNewParam->opType);
772
    qDestroyOperatorParam((SOperatorParam*)pParam);
117,600✔
773
    return;
117,600✔
774
  }
775
  TSWAP(pParam, pTask->pOpParam);
35,704,097✔
776
  ((SExecTaskInfo*)tinfo)->paramSet = false;
35,697,691✔
777
}
778

779
int32_t qExecutorInit(void) {
4,428,158✔
780
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
4,428,158✔
781
  return TSDB_CODE_SUCCESS;
4,429,944✔
782
}
783

784
/**
  @brief Wait on pSem, bracketing the wait with the task's worker callbacks
         (beforeBlocking / afterRecoverFromBlocking) when a callback set is
         attached to the task.
  @return TSDB_CODE_SUCCESS, or the first failing step's code, which is also
          stored in the task's code field.
  @note  When tsem_wait itself fails, afterRecoverFromBlocking is not invoked.
*/
int32_t qSemWait(qTaskInfo_t task, tsem_t* pSem) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)task;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pTask->pWorkerCb != NULL) {
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != code) {
      goto _failed;
    }
  }

  code = tsem_wait(pSem);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    goto _failed;
  }

  // pWorkerCb is re-read on purpose: the original checks it again after the wait.
  if (pTask->pWorkerCb != NULL) {
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != code) {
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  pTask->code = code;
  return pTask->code;
}
811

812
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
340,047,349✔
813
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
814
                        EOPTR_EXEC_MODEL model, SArray** subEndPoints) {
815
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
340,047,349✔
816
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
340,047,349✔
817

818
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d, subEndPoinsNum:%d", 
340,044,252✔
819
    taskId, pSubplan->id.queryId, vgId, (int32_t)taosArrayGetSize(subEndPoints ? *subEndPoints : NULL));
820

821
  readHandle->uid = 0;
340,086,826✔
822
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, subEndPoints);
340,121,815✔
823
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
339,878,445✔
824
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
631,286✔
825
    goto _error;
458,840✔
826
  }
827
    
828
  if (handle) {
339,325,434✔
829
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
339,140,636✔
830
    void*           pSinkManager = NULL;
339,135,389✔
831
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
338,831,698✔
832
    if (code != TSDB_CODE_SUCCESS) {
338,882,458✔
833
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
834
      goto _error;
×
835
    }
836

837
    void* pSinkParam = NULL;
338,882,458✔
838
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
338,993,383✔
839
    if (code != TSDB_CODE_SUCCESS) {
338,860,064✔
840
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
841
      taosMemoryFree(pSinkManager);
×
842
      goto _error;
×
843
    }
844

845
    SDataSinkNode* pSink = NULL;
338,860,064✔
846
    if (readHandle->localExec) {
338,930,502✔
847
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
2,860✔
848
      if (code != TSDB_CODE_SUCCESS) {
2,860✔
849
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
13✔
850
               (*pTask)->id.str);
851
        taosMemoryFree(pSinkManager);
13✔
852
        goto _error;
×
853
      }
854
    }
855

856
    // pSinkParam has been freed during create sinker.
857
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
339,117,151✔
858
                              (*pTask)->id.str, pSubplan->processOneBlock);
338,999,936✔
859
    if (code) {
338,801,373✔
860
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
594✔
861
    }
862
  }
863

864
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s subEndPoints:%d", 
339,415,272✔
865
    taskId, pSubplan->id.queryId, tstrerror(code), (*pTask)->pSubJobCtx ? (int32_t)taosArrayGetSize((*pTask)->pSubJobCtx->subEndPoints) : 0);
866

867
_error:
120,334,836✔
868
  // if failed to add ref for all tables in this query, abort current query
869
  return code;
340,044,279✔
870
}
871

872
static void freeBlock(void* param) {
×
873
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
874
  blockDataDestroy(pBlock);
×
875
}
×
876

877
/**
  @brief Run the task's root operator and collect its output blocks into
         pResList, stopping once the row threshold is reached, a single block
         was requested (processOneBlock), or the operator is exhausted.
  @param pResList         cleared on entry; receives pointers to blocks that
                          are kept in the task's pResultBlockList.
  @param useconds         set to the task's accumulated elapsed time, only when
                          the operator returned no further block.
  @param hasMore          set to whether the last fetched block was non-NULL.
  @param pLocal           when non-NULL, copied into the task's localFetch.
  @param processOneBlock  return after the first block, replacing the cached
                          block in the slot rather than copying into it.
  @return the task's code field: TSDB_CODE_QRY_IN_EXEC when another thread owns
          the task, the longjmp value when an operator aborts through
          pTaskInfo->env, or the first error hit while collecting blocks.
  @note  hasMore and useconds are not written on the error paths.
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
                     bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // Claim exclusive ownership of the task; a non-zero previous value is the current owner thread.
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();
  // Declared and zeroed before the first "goto _end": the summary log at _end reads it,
  // and a jump over a later declaration would leave it indeterminate.
  uint64_t     el = 0;

  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    // A freshly installed operator param is handed to the root operator exactly once.
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  // Cap the per-call row budget at 4096; a smaller dynamic threshold from the subplan is honoured.
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // No cached block at this position yet: clone the result and append it to the cache.
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      if (NULL == tmp) {
        // The cache did not take the clone, so it has to be released here.
        code = terrno;
        lino = __LINE__;
        blockDataDestroy(p1);
        goto _end;
      }
      p = p1;
    } else if (processOneBlock) {
      // Replace the cached block at this position with a fresh clone.
      SSDataBlock** ppSlot = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(ppSlot, code, lino, _end, terrno);

      blockDataDestroy(*ppSlot);
      *ppSlot = NULL;

      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      *ppSlot = p1;
      p = p1;
    } else {
      // Reuse the cached block at this position and copy the new result into it.
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
1022

1023
// Destroys every block cached in the task's pResultBlockList and then empties the list.
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  size_t         total = taosArrayGetSize(pTask->pResultBlockList);

  int32_t pos = 0;
  while (pos < total) {
    SSDataBlock** ppBlock = taosArrayGet(pTask->pResultBlockList, pos);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++pos;
  }

  taosArrayClear(pTask->pResultBlockList);
}
×
1036

1037
/**
  @brief Fetch the next result block from the task's root operator.
  @param pRes      cleared on entry; receives the block produced by the root
                   operator (NULL when there is none).
  @param useconds  set to the task's accumulated elapsed time, only when no
                   block was produced.
  @return the task's code field: the kill code when the task was already
          killed, TSDB_CODE_QRY_IN_EXEC when another thread owns the task, the
          longjmp value when an operator aborts through pTaskInfo->env, or the
          code recorded from the operator / blockDataCheck.
  @note  Ownership is claimed with the same atomic compare-and-exchange that
         qExecTaskOpt uses. The task latch is only held in shared (read) mode
         here, so a plain check-then-store would let two callers both see
         owner == 0 and both proceed.
*/
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // A non-zero previous value is the thread that currently owns the task; it is what gets logged.
  curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  int32_t code = TSDB_CODE_SUCCESS;
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    // A freshly installed operator param is handed to the root operator exactly once.
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  }
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  // The block check runs even when the fetch above failed; either failure is recorded on the task.
  code = blockDataCheck(*pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1118

1119
// Appends *pInfo to the task's stop-info array while holding the stop-info write latch.
// Returns terrno when the push fails, TSDB_CODE_SUCCESS otherwise.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pAdded = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pAdded != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
1130

1131
int32_t stopInfoComp(void const* lp, void const* rp) {
×
1132
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
1133
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
1134

1135
  if (key->refId < pInfo->refId) {
×
1136
    return -1;
×
1137
  } else if (key->refId > pInfo->refId) {
×
1138
    return 1;
×
1139
  }
1140

1141
  return 0;
×
1142
}
1143

1144
// Removes the stop-info entry whose refId equals pInfo's (matched with stopInfoComp / TD_EQ),
// under the stop-info write latch. No matching entry means nothing is removed.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (!(pos < 0)) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
×
1152

1153
/**
  @brief Wake up everything that may be blocked on behalf of this task: the
         sub-job context's "ready" semaphore (after publishing the task code
         to it) and the "ready" semaphore of every exchange operator
         registered in the task's stop-info array.
  @note  All failures are logged and otherwise ignored so that the remaining
         waiters are still posted. An exchange whose ref can no longer be
         acquired is skipped silently.
*/
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->pSubJobCtx) {
    pTaskInfo->pSubJobCtx->code = pTaskInfo->code;
    int32_t code = tsem_post(&pTaskInfo->pSubJobCtx->ready);
    if (code != TSDB_CODE_SUCCESS) {
      // Report the failure the same way the exchange post below does instead of dropping it.
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t i = 0; i < num; ++i) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
    if (!pStop) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }

    // The exchange is looked up through the ref pool by id; the ref is released right after the post.
    SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pStop->refId);
    if (pExchangeInfo) {
      int32_t code = tsem_post(&pExchangeInfo->ready);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      } else {
        qDebug("post to exchange %" PRId64 " to stop", pStop->refId);
      }
      code = taosReleaseRef(fetchObjRefPool, pStop->refId);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      }
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
73,699✔
1185

1186
// Marks the task as killed with rspCode and posts every waiter registered on it, without
// waiting for the task to stop. Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle.
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));

  setTaskKilled(pTask, rspCode);
  qStopTaskOperators(pTask);

  return TSDB_CODE_SUCCESS;
}
1199

1200
/**
  @brief Mark the task as killed (TSDB_CODE_TSC_QUERY_KILLED) and, when
         waitDuration is positive, poll until the task is no longer executing
         or waitDuration milliseconds have elapsed.
  @param rspCode       stored into the task's code field once the task is
                       observed idle during a synchronous wait.
  @param waitDuration  maximum wait in milliseconds; a value <= 0 returns
                       right after marking the task killed.
  @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
          TSDB_CODE_SUCCESS (also when the wait times out).
  @note  The original trailing "et < waitDuration" branch could only be
         reached with waitDuration <= 0 and a non-negative elapsed time, so it
         never fired; it has been removed. Both outcomes returned success.
*/
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        st = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (waitDuration > 0) {
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  if (waitDuration <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  while (1) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still executing: sleep 200 ms with the latch released, then try again
    taosMsleep(200);

    int64_t d = taosGetTimestampMs() - st;
    if (d >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
1243

1244
// True when the task handle is non-NULL and its owner field (read atomically) is non-zero.
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  return (pTask != NULL) && (atomic_load_64(&pTask->owner) != 0);
}
1252

1253
// Writes the task's cost summary to the debug log. The long form, with block-load
// statistics, is used when a file-block load recorder is attached to the cost info.
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  int64_t                 idleUs = pCost->start - pCost->created;
  SFileBlockLoadRecorder* pLoadStat = pCost->pRecoder;

  if (NULL == pLoadStat) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleUs / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idleUs / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pLoadStat->totalBlocks, pLoadStat->loadBlockStatis, pLoadStat->loadBlocks,
      pLoadStat->totalRows, pLoadStat->totalCheckedRows);
}
340,446,284✔
1271

1272

1273
// Logs the completion line and the cost summary for the task, then destroys it through
// doDestroyTask. A NULL handle is ignored.
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qTaskHandle;
  if (NULL == pTask) {
    return;
  }

  if (NULL == pTask->pRoot) {
    qDebug("%s execTask completed", GET_TASKID(pTask));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTask), pTask->pRoot->resultInfo.totalRows);
  }

  printTaskExecCostInLog(pTask);  // print the query cost summary
  doDestroyTask(pTask);
}
1288

1289
// Collects explain/execution info starting at the task's root operator into pExecInfoList;
// the result of getOperatorExplainExecInfo is returned unchanged.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pRootOp = pTask->pRoot;
  return getOperatorExplainExecInfo(pRootOp, pExecInfoList);
}
1293

1294
/**
  @brief Walk down the first-child chain from the task's root operator and
         store the info of the first stream-scan operator
         (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) in *scanner.
  @note  When the chain ends without a stream-scan operator, *scanner is set
         to NULL instead of dereferencing a NULL operator.
         NOTE(review): the end of the chain is detected through a NULL
         operator or a NULL pDownstream pointer -- confirm that is how leaf
         operators are represented.
*/
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return;
    }

    if (pOperator->pDownstream == NULL) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  // No stream-scan operator on the first-child path.
  *scanner = NULL;
}
333,166✔
1308

1309
void* qExtractReaderFromTmqScanner(void* scanner) {
333,166✔
1310
  SStreamScanInfo* pInfo = scanner;
333,166✔
1311
  return (void*)pInfo->tqReader;
333,166✔
1312
}
1313

1314
// Returns the schema pointer held in the task's streamInfo; no copy is made.
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
1318

1319
// Returns the table name held in the task's streamInfo; no copy is made.
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
1323

1324
// Returns the address of the batch meta response embedded in the task's streamInfo.
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
1328

1329
// Copies the task's current stream offset into *pOffset via tOffsetCopy; always returns 0.
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  tOffsetCopy(pOffset, &pTask->streamInfo.currentOffset);
  return 0;
}
1334

1335
/**
  @brief Fill a zeroed query condition that reads every column of the table
         described by pMtInfo, in ascending order, up to the snapshot version
         of sContext.
  @return terrno when either the column list or the slot list cannot be
          allocated (both are released and cleared), TSDB_CODE_SUCCESS
          otherwise.
  @note  On success the caller owns pCond->colList and pCond->pSlotList.
*/
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(*pCond));
  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (NULL == pCond->colList || NULL == pCond->pSlotList) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    return terrno;
  }

  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  // One output column per schema column, mapped to the slot with the same index.
  int32_t col = 0;
  while (col < pCond->numOfCols) {
    SColumnInfo* pDst = pCond->colList + col;

    pDst->type = pMtInfo->schema->pSchema[col].type;
    pDst->bytes = pMtInfo->schema->pSchema[col].bytes;
    if (NULL != pMtInfo->pExtSchemas) {
      // Precision and scale are derived from the extended schema's type modifier when one is present.
      decimalFromTypeMod(pMtInfo->pExtSchemas[col].typeMod, &pDst->precision, &pDst->scale);
    }
    pDst->colId = pMtInfo->schema->pSchema[col].colId;
    pDst->pk = pMtInfo->schema->pSchema[col].flags & COL_IS_KEY;

    pCond->pSlotList[col] = col;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}
1368

1369
// Resets the status of the task's root operator to OP_NOT_OPENED.
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  pTask->pRoot->status = OP_NOT_OPENED;
}
22,511,926✔
1374

1375
/*
 * Store the per-poll stream parameters on the task.
 *
 * Each argument is copied verbatim into the field of the same name in
 * pTaskInfo->streamInfo; nothing else is touched.
 */
void qStreamSetParams(qTaskInfo_t tinfo, int8_t sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  pTask->streamInfo.enableReplay = enableReplay;
  pTask->streamInfo.timeout = timeout;
  pTask->streamInfo.minPollRows = minPollRows;
  pTask->streamInfo.sourceExcluded = sourceExcluded;
}
22,556,915✔
1382

1383
/*
 * Position the scan of a TMQ task at the given offset.
 *
 * The work done depends on the subscription type and on pOffset->type:
 *
 *   TOPIC_SUB_TYPE__COLUMN
 *     - TMQ_OFFSET__LOG:           the offset is verified against the WAL reader, the tsdb
 *                                  reader is closed and the tq reader seeks to pOffset->version.
 *     - TMQ_OFFSET__SNAPSHOT_DATA: the table identified by pOffset->uid (or the first table of
 *                                  the list when uid is 0) is looked up in the table list and
 *                                  the tsdb reader is opened/reset for that single table.
 *     - anything else:             TSDB_CODE_STREAM_INTERNAL_ERROR.
 *
 *   any other subscription type (table / db)
 *     - TMQ_OFFSET__SNAPSHOT_DATA: the snapshot context is moved to pOffset->uid, the meta of
 *                                  the table found there is loaded and a fresh tsdb reader is
 *                                  opened for it.
 *     - TMQ_OFFSET__SNAPSHOT_META: only the snapshot context is moved.
 *     - TMQ_OFFSET__LOG:           the tsdb reader is closed.
 *
 * If pOffset already equals streamInfo.currentOffset nothing is re-positioned. On success the
 * offset is copied into streamInfo.currentOffset.
 *
 * tinfo    task handle (SExecTaskInfo)
 * pOffset  requested offset
 * subType  subscription type
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    // done before the "same offset" shortcut below, so it also runs when the offset did not change
    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pAPI->tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }

  // if pOffset equal to current offset, means continue consume
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      pAPI->tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pAPI->tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        // no table given: start from the first table of the list, from the very beginning
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            // capture terrno before logging/unlocking so that it is the value actually returned
            code = terrno;
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
            taosRUnLockLatch(&pTaskInfo->lock);
            return code;
          }
          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pAPI->tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      if (pScanBaseInfo->dataReader == NULL) {
        code = pAPI->tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
                                             pScanInfo->pResBlock, (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
        } else {
          qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
                 uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
        }
      } else {
        code = pAPI->tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        } else {
          code = pAPI->tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
          if (code != TSDB_CODE_SUCCESS) {
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          } else {
            qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
                   uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
          }
        }
      }

      // restore the key value; done on the failure paths too, so that the temporary start key
      // never stays behind in the shared query condition
      pScanBaseInfo->cond.twindows.skey = oldSkey;
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }

      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      pAPI->tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // 'code' still holds success at this point; report terrno instead, otherwise the
        // caller would be told the scan is ready although no reader was opened
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pAPI->tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL,
                                           (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
      //      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;

      // the schema pointer is handed over to streamInfo; only the extended schemas are freed here
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;
      taosMemoryFreeClear(mtInfo.pExtSchemas);

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pAPI->tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
1636

1637
/*
 * Dispatch an rpc response to the callback registered in its send-info handle.
 *
 * The response body is copied into a freshly allocated buffer that is passed to the
 * callback together with pMsg->code; afterwards the rpc content and the send-info
 * are released. When the message carries no handle, only the rpc content is freed.
 *
 * parent  not used
 * pMsg    response message; info.ahandle is expected to hold the SMsgSendInfo
 * pEpSet  not used
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (NULL == pSendInfo) {
    rpcFreeCont(pMsg->pCont);
    qError("rsp msg got while pMsg->info.ahandle is NULL, 0x%" PRIx64 ":0x%" PRIx64, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
    return;
  }

  qDebug("rsp msg got, code:%x, len:%d, 0x%" PRIx64 ":0x%" PRIx64, 
      pMsg->code, pMsg->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  SDataBuf buf = {0};
  buf.len = pMsg->contLen;

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      // out of memory: the callback still runs, but is handed the allocation error
      pMsg->code = terrno;
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1663

1664
/*
 * Collect the uids of the tables referenced by the first table list of the task.
 *
 * tinfo  task handle (SExecTaskInfo)
 *
 * Returns a new array of uint64_t uids that the caller must destroy, or NULL when the
 * table lists of the task cannot be obtained. Any failure after that point does not
 * return: it is reported through T_LONG_JMP on pTaskInfo->env.
 */
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;
  int32_t        numOfTables = 0;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // plist only holds pointers to the table lists; the container itself is released on
  // every path, including the error paths that jump here
  taosArrayDestroy(plist);

  if (code != TSDB_CODE_SUCCESS) {
    // the long jump below never returns, so the partial result has to be dropped first
    taosArrayDestroy(pUidList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pUidList;
}
1701

1702
/*
 * Walk down the first-downstream chain starting at pOperator and append the table list
 * pointer of the first stream-scan or table-scan operator found to pList.
 *
 * Operators of any other type are skipped by recursing into pDownstream[0]; when there is
 * no such downstream nothing is appended and success is returned.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the pointer cannot be appended.
 */
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      // a stream scan keeps its table list inside the wrapped table-scan operator
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pushed = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pushed = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    default: {
      const bool hasChild = (pOperator->pDownstream != NULL) && (pOperator->pDownstream[0] != NULL);
      if (hasChild) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
1730

1731
/*
 * Build an array with the table-list pointers found below the root operator of the task.
 *
 * pTaskInfo  task whose operator tree is searched
 * pList      out: the new array on success (owned by the caller), NULL otherwise
 *
 * Returns TSDB_CODE_INVALID_PARA when pList is NULL, terrno when the array cannot be
 * created, otherwise the result of extractTableList().
 */
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (NULL == pList) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pResult = taosArrayInit(0, POINTER_BYTES);
  if (NULL == pResult) {
    return terrno;
  }

  int32_t code = extractTableList(pResult, pTaskInfo->pRoot);
  if (code != 0) {
    // nothing is handed out on failure
    taosArrayDestroy(pResult);
    return code;
  }

  *pList = pResult;
  return code;
}
1750

1751
/*
 * Invoke the release-stream-state hook of the root operator, if one is installed.
 *
 * Always returns 0.
 */
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tInfo)->pRoot;

  if (NULL == pRootOp->fpSet.releaseStreamStateFn) {
    return 0;
  }

  pRootOp->fpSet.releaseStreamStateFn(pRootOp);
  return 0;
}
1758

1759
/*
 * Invoke the reload-stream-state hook of the root operator, if one is installed.
 *
 * Always returns 0.
 */
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tInfo)->pRoot;

  if (NULL == pRootOp->fpSet.reloadStreamStateFn) {
    return 0;
  }

  pRootOp->fpSet.reloadStreamStateFn(pRootOp);
  return 0;
}
1766

1767
/*
 * Clear the error code stored on the task and log the value it had before.
 */
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const int32_t  prevCode = pTask->code;

  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
×
1774

1775
/*
 * Placeholder: nothing is collected, neither argument is touched, and 0 is returned.
 */
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
  return 0;
}
1779

1780
/*
 * Task-level entry point: forwards the root operator of the task and pExprs to
 * collectExprsToReplaceForStream() and returns its result.
 */
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
  SOperatorInfo* pRootOp = ((SExecTaskInfo*)tInfo)->pRoot;
  return collectExprsToReplaceForStream(pRootOp, pExprs);
}
1785

1786
/*
 * Reset an operator and, recursively, its downstream operators.
 *
 * For each visited operator the get/notify params are freed and reset, the optional
 * resetStateFn hook is run and the status is set back to OP_NOT_OPENED. The walk over the
 * downstream operators stops at the first non-zero code, which is then returned; the status
 * of the current operator is reset even when its own hook failed.
 */
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
  freeResetOperatorParams(pOper, OP_GET_PARAM, true);
  freeResetOperatorParams(pOper, OP_NOTIFY_PARAM, true);

  int32_t code = 0;
  if (pOper->fpSet.resetStateFn != NULL) {
    code = pOper->fpSet.resetStateFn(pOper);
  }
  pOper->status = OP_NOT_OPENED;

  int32_t child = 0;
  while (code == 0 && child < pOper->numOfDownstream) {
    code = clearStatesForOperator(pOper->pDownstream[child]);
    ++child;
  }

  return code;
}
1801

1802
/*
 * Reset the task's error code to success and clear the state of its whole operator tree.
 *
 * Returns the result of clearStatesForOperator() on the root operator.
 */
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  SOperatorInfo* pRootOp = pTask->pRoot;

  pTask->code = TSDB_CODE_SUCCESS;
  return clearStatesForOperator(pRootOp);
}
1810

1811
/*
 * Run the task once: fetch the next result block from the root operator.
 *
 * tInfo     task handle (SExecTaskInfo)
 * ppRes     out: the block produced, or NULL when there is none; always reset to NULL first
 * useconds  out: accumulated elapsed time of the task; written only when no block was produced
 * finished  out: true when no block was produced; written only when getNextFn itself succeeded
 *
 * Returns pTaskInfo->code, which is TSDB_CODE_QRY_IN_EXEC when the task is already owned by
 * a thread, the long-jump value when an operator aborted through pTaskInfo->env, or the
 * failure of getNextFn()/blockDataCheck().
 */
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppRes, uint64_t* useconds, bool* finished) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *ppRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    // clearStreamBlock(pTaskInfo->pRoot);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // remember who owns the task so that the message below names the real thread
  curOwner = pTaskInfo->owner;
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, ppRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  } else {
    *finished = *ppRes == NULL;
    code = blockDataCheck(*ppRes);
  }
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *ppRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*ppRes != NULL) ? (*ppRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // 'total' is unsigned, hence PRIu64
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  // release the ownership taken above
  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1889

1890
// void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
1891
//   SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1892
//   pTaskInfo->pStreamRuntimeInfo = pStreamRuntimeInfo;
1893
// }
1894

1895
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
270,365✔
1896
                                        SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
1897
                                        SStorageAPI* storageAPI, void** pTableListInfo, SHashObj* groupIdMap) {
1898
  int32_t code = 0;                                        
270,365✔
1899
  if (*pTableListInfo != NULL) {
270,365✔
1900
    qDebug("table list already exists, no need to create again");
×
1901
    goto end;
×
1902
  }
1903
  STableListInfo* pList = tableListCreate();
270,365✔
1904
  if (pList == NULL) {
270,365✔
1905
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1906
    code = terrno;
×
1907
    goto end;
×
1908
  }
1909

1910
  SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
270,365✔
1911
  SReadHandle    pHandle = {.vnode = pVnode};
270,365✔
1912
  SExecTaskInfo  pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
270,365✔
1913

1914
  code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo, groupIdMap);
270,365✔
1915
  if (code != 0) {
270,365✔
1916
    tableListDestroy(pList);
×
1917
    qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
1918
    goto end;
×
1919
  }
1920
  *pTableListInfo = pList;
270,365✔
1921

1922
end:
270,365✔
1923
  return 0;
270,365✔
1924
}
1925

1926
/*
 * Filter pUidList by the tag condition and append every remaining uid to the table list.
 *
 * pUidList is passed to doFilterByTagCond() (using the suid stored in pListInfo); each uid
 * it holds afterwards is added to pListInfo->pTableList with groupId 0.
 *
 * Returns 0 on success, the error of doFilterByTagCond(), or terrno when an array access
 * or push fails.
 */
static int32_t doFilterTableByTagCond(void* pVnode, STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, SStorageAPI* pStorageAPI){
  int32_t code = doFilterByTagCond(pListInfo->idInfo.suid, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, NULL);
  if (code != 0) {
    return code;
  }

  int32_t numOfTables = taosArrayGetSize(pUidList);
  for (int idx = 0; idx < numOfTables; idx++) {
    void* pUid = taosArrayGet(pUidList, idx);
    if (NULL == pUid) {
      return terrno;
    }

    STableKeyInfo keyInfo = {.uid = *(uint64_t*)pUid, .groupId = 0};
    if (NULL == taosArrayPush(pListInfo->pTableList, &keyInfo)) {
      return terrno;
    }
  }

  return code;
}
1946

1947
int32_t qStreamFilterTableListForReader(void* pVnode, SArray* uidList,
32,350✔
1948
                                        SNodeList* pGroupTags, SNode* pTagCond, SNode* pTagIndexCond,
1949
                                        SStorageAPI* storageAPI, SHashObj* groupIdMap, uint64_t suid, SArray** tableList) {
1950
  int32_t code = TSDB_CODE_SUCCESS;
32,350✔
1951
  SArray* uidListCopy = NULL;
32,350✔
1952
  STableListInfo* pList = tableListCreate();
32,350✔
1953
  if (pList == NULL) {
32,350✔
1954
    code = terrno;
×
1955
    goto end;
×
1956
  }
1957
  uidListCopy = taosArrayDup(uidList, NULL);
32,350✔
1958
  if (uidListCopy == NULL) {
32,350✔
1959
    code = terrno;
×
1960
    goto end;
×
1961
  }
1962
  SScanPhysiNode pScanNode = {.suid = suid, .tableType = TD_SUPER_TABLE};
32,350✔
1963
  SReadHandle    pHandle = {.vnode = pVnode};
32,350✔
1964

1965
  pList->idInfo.suid = suid;
32,350✔
1966
  pList->idInfo.tableType = TD_SUPER_TABLE;
32,350✔
1967
  code = doFilterTableByTagCond(pVnode, pList, uidList, pTagCond, storageAPI);
32,350✔
1968
  if (code != TSDB_CODE_SUCCESS) {
32,350✔
1969
    goto end;
×
1970
  }                                              
1971
  code = buildGroupIdMapForAllTables(pList, &pHandle, &pScanNode, pGroupTags, false, NULL, storageAPI, groupIdMap);
32,350✔
1972
  if (code != TSDB_CODE_SUCCESS) {
32,350✔
1973
    goto end;
×
1974
  }
1975
  *tableList = pList->pTableList;
32,350✔
1976
  pList->pTableList = NULL;
32,350✔
1977

1978
  taosArrayClear(uidList);
32,350✔
1979
  for (int32_t i = 0; i < taosArrayGetSize(uidListCopy); i++){
64,700✔
1980
    void* tmp = taosArrayGet(uidListCopy, i);
32,350✔
1981
    if (tmp == NULL) {
32,350✔
1982
      continue;
×
1983
    }
1984
    int32_t* slot = taosHashGet(pList->map, tmp, LONG_BYTES);
32,350✔
1985
    if (slot == NULL) {
32,350✔
1986
      if (taosArrayPush(uidList, tmp) == NULL) {
3,452✔
1987
        code = terrno;
×
1988
        goto end;
×
1989
      }
1990
    }
1991
  }
1992
end:
32,350✔
1993
  taosArrayDestroy(uidListCopy);
32,350✔
1994
  tableListDestroy(pList);
32,350✔
1995
  return code;
32,350✔
1996
}
1997

1998
// int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid, TdThreadRwlock* lock) {
1999
//   int32_t index = -1;
2000
//   (void)taosThreadRwlockRdlock(lock);
2001
//   if (((STableListInfo*)pTableListInfo)->groupOffset == NULL){
2002
//     index = 0;
2003
//     goto end;
2004
//   }
2005
//   for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
2006
//     int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
2007

2008
//     STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
2009
//     if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
2010
//       index = i;
2011
//       goto end;
2012
//     }
2013
//   }
2014
// end:
2015
//   (void)taosThreadRwlockUnlock(lock);
2016
//   return index;
2017
// }
2018

2019
/* Destroy a table list through tableListDestroy(). */
void qStreamDestroyTableList(void* pTableListInfo) {
  tableListDestroy(pTableListInfo);
}
270,365✔
2020
SArray*  qStreamGetTableListArray(void* pTableListInfo) {
270,365✔
2021
  STableListInfo* pList = pTableListInfo;
270,365✔
2022
  return pList->pTableList;
270,365✔
2023
}
2024

2025
/*
 * Apply pFilterInfo to pBlock by forwarding to doFilter() with a NULL third argument.
 * Returns whatever doFilter() returns; pRet is passed through unchanged.
 */
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo, SColumnInfoData** pRet) {
  int32_t code = doFilter(pBlock, pFilterInfo, NULL, pRet);
  return code;
}
1,362,865✔
2026

2027
// Logs the task handle at debug level, then hands it to qDestroyTask().
void streamDestroyExecTask(qTaskInfo_t tInfo) {
  qDebug("streamDestroyExecTask called, task:%p", tInfo);

  qDestroyTask(tInfo);
}
2,783,205✔
2031

2032
// Convenience wrapper around streamCalcOneScalarExprInRange() that passes -1 for both row indexes.
// NOTE(review): -1/-1 presumably means "no row range restriction" -- confirm in scalarCalculateInRange().
int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
  const int32_t idxUnset = -1;
  return streamCalcOneScalarExprInRange(pExpr, pDst, idxUnset, idxUnset, pExtraParams);
}
2035

2036
int32_t streamCalcOneScalarExprInRange(SNode* pExpr, SScalarParam* pDst, int32_t rowStartIdx, int32_t rowEndIdx,
813,184✔
2037
                                       const SStreamRuntimeFuncInfo* pExtraParams) {
2038
  int32_t      code = 0;
813,184✔
2039
  SNode*       pNode = NULL;
813,184✔
2040
  SNodeList*   pList = NULL;
813,375✔
2041
  SExprInfo*   pExprInfo = NULL;
813,889✔
2042
  int32_t      numOfExprs = 1;
813,889✔
2043
  int32_t*     offset = NULL;
813,632✔
2044
  STargetNode* pTargetNode = NULL;
813,632✔
2045
  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
813,632✔
2046
  if (code == 0) {
813,889✔
2047
    code = nodesCloneNode(pExpr, &pNode);
813,889✔
2048
  }
2049

2050
  if (code == 0) {
813,889✔
2051
    pTargetNode->dataBlockId = 0;
813,889✔
2052
    pTargetNode->pExpr = pNode;
813,889✔
2053
    pTargetNode->slotId = 0;
813,889✔
2054
  }
2055
  if (code == 0) {
813,889✔
2056
    code = nodesMakeList(&pList);
813,889✔
2057
  }
2058
  if (code == 0) {
813,889✔
2059
    code = nodesListAppend(pList, (SNode*)pTargetNode);
813,889✔
2060
  }
2061
  if (code == 0) {
813,889✔
2062
    pNode = NULL;
813,889✔
2063
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
813,889✔
2064
  }
2065

2066
  if (code == 0) {
812,966✔
2067
    const char* pVal = NULL;
812,966✔
2068
    int32_t     len = 0;
812,966✔
2069
    SNode*      pSclNode = NULL;
812,966✔
2070
    switch (pExprInfo->pExpr->nodeType) {
812,966✔
2071
      case QUERY_NODE_FUNCTION:
812,966✔
2072
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
812,966✔
2073
        break;
813,223✔
2074
      case QUERY_NODE_OPERATOR:
×
2075
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
×
2076
        break;
×
2077
      default:
×
2078
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
2079
        break;
×
2080
    }
2081
    SArray*     pBlockList = taosArrayInit(2, POINTER_BYTES);
813,223✔
2082
    SSDataBlock block = {0};
813,717✔
2083
    block.info.rows = 1;
813,717✔
2084
    SSDataBlock* pBlock = &block;
813,717✔
2085
    void*        tmp = taosArrayPush(pBlockList, &pBlock);
813,889✔
2086
    if (tmp == NULL) {
813,889✔
2087
      code = terrno;
×
2088
    }
2089
    if (code == 0) {
813,889✔
2090
      gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
813,889✔
2091
      gTaskScalarExtra.pStreamRange = NULL;
813,889✔
2092
      code = scalarCalculateInRange(pSclNode, pBlockList, pDst, rowStartIdx, rowEndIdx, &gTaskScalarExtra);
812,966✔
2093
    }
2094
    taosArrayDestroy(pBlockList);
813,698✔
2095
  }
2096
  nodesDestroyList(pList);
813,889✔
2097
  destroyExprInfo(pExprInfo, numOfExprs);
813,665✔
2098
  taosMemoryFreeClear(pExprInfo);
813,889✔
2099
  return code;
813,889✔
2100
}
2101

2102
// Appends one "force output" row to *pRes.
//
// The column definitions and their expressions come from pTaskInfo->pStreamRuntimeInfo->pForceOutputCols.
// When that array is NULL the function is a no-op and returns 0. If *pRes is NULL a new data block is
// created; if the block has no columns yet, one column per force-output entry is appended. Each expression
// string is parsed into a node: value nodes are written directly, anything else is evaluated through
// streamCalcOneScalarExprInRange() for the new row only.
//
// funcInfo.curIdx is temporarily switched to winIdx while the expressions run and restored afterwards.
// info.rows is incremented only when every column was filled successfully.
//
// Returns 0 on success or the first error code encountered.
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes, int32_t winIdx) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  const SArray*  pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
  int32_t        code = 0;
  SNode*         pNode = NULL;
  if (!pForceOutputCols) return 0;

  if (!*pRes) {
    code = createDataBlock(pRes);
    if (code != TSDB_CODE_SUCCESS) {
      // *pRes is not usable; bail out before anything dereferences it.
      qError("failed to create data block for force output, code:%s", tstrerror(code));
      return code;
    }
  }

  if (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0) {
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
      SStreamOutCol* pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
      if (pCol == NULL) {
        return TSDB_CODE_INVALID_PARA;
      }
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, i);
      colInfo.info.precision = pCol->type.precision;
      colInfo.info.scale = pCol->type.scale;
      code = blockDataAppendColInfo(*pRes, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        // Do not let the capacity call below mask this failure.
        qError("failed to append column for force output, code:%s", tstrerror(code));
        return code;
      }
    }
  }

  code = blockDataEnsureCapacity(*pRes, (*pRes)->info.rows + 1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to ensure capacity for force output, code:%s", tstrerror(code));
    return code;
  }

  // loop all exprs for force output, execute all exprs
  int32_t rowIdx = (*pRes)->info.rows;
  int32_t tmpWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
    SStreamOutCol*   pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, i);
    if (pCol == NULL || pInfo == NULL) {
      // A caller-supplied block with fewer columns than force-output entries ends up here.
      code = TSDB_CODE_INVALID_PARA;
      break;
    }

    pNode = NULL;
    code = nodesStringToNode(pCol->expr, &pNode);
    if (code != 0) break;

    if (nodeType(pNode) == QUERY_NODE_VALUE) {
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
      code = colDataSetVal(pInfo, rowIdx, p, ((SValueNode*)pNode)->isNull);
    } else {
      // The destination column belongs to the block, so the scalar param must not free it.
      SScalarParam dst = {0};
      dst.columnData = pInfo;
      dst.numOfRows = rowIdx;
      dst.colAlloced = false;
      code = streamCalcOneScalarExprInRange(pNode, &dst, rowIdx, rowIdx, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
    }

    nodesDestroyNode(pNode);
    pNode = NULL;
    if (code != 0) break;
  }
  if (code == TSDB_CODE_SUCCESS) {
    (*pRes)->info.rows++;
  }
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = tmpWinIdx;
  return code;
}
2161

2162
// Computes the output (sub)table name from pExpr and writes it, NUL-terminated, into tbname.
//
// pExpr must be either a string-typed value node or a string-typed function node; any other node type
// yields TSDB_CODE_OPS_NOT_SUPPORT. The expression is always evaluated with curIdx forced to 0 (first
// window); the caller's curIdx is restored before returning.
//
// Names longer than TSDB_TABLE_NAME_LEN - 1 bytes are truncated (and logged). tbname must therefore have
// room for at least TSDB_TABLE_NAME_LEN bytes.
//
// Returns 0 on success; TSDB_CODE_INVALID_PARA for non-string / NULL / empty results (tbname is set to the
// empty string in the empty case); TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED when the function result is
// NULL; TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME when the name contains '.'; or an allocation/evaluation error.
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
  int32_t      code = 0;
  const char*  pVal = NULL;
  SScalarParam dst = {0};
  int32_t      len = 0;
  int32_t      nextIdx = pStreamRuntimeInfo->curIdx;
  pStreamRuntimeInfo->curIdx = 0;  // always use the first window to calc tbname
  // execute the expr
  switch (pExpr->type) {
    case QUERY_NODE_VALUE: {
      SValueNode* pValue = (SValueNode*)pExpr;
      int32_t     type = pValue->node.resType.type;
      if (!IS_STR_DATA_TYPE(type)) {
        qError("invalid sub tb expr with non-str type");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      void* pTmp = nodesGetValueFromNode((SValueNode*)pExpr);
      if (pTmp == NULL) {
        qError("invalid sub tb expr with null value");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      pVal = varDataVal(pTmp);
      len = varDataLen(pTmp);
    } break;
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
        qError("invalid sub tb expr with non-str type func");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
      if (!pCol) {
        code = terrno;
        qError("failed to allocate col info data at: %s, %d", __func__, __LINE__);
        break;
      }

      pCol->hasNull = true;
      pCol->info.type = ((SExprNode*)pExpr)->resType.type;
      pCol->info.colId = 0;
      pCol->info.bytes = ((SExprNode*)pExpr)->resType.bytes;
      pCol->info.precision = ((SExprNode*)pExpr)->resType.precision;
      pCol->info.scale = ((SExprNode*)pExpr)->resType.scale;
      code = colInfoDataEnsureCapacity(pCol, 1, true);
      if (code != 0) {
        qError("failed to ensure capacity for col info data at: %s, %d", __func__, __LINE__);
        taosMemoryFree(pCol);
        break;
      }
      // From here on the column is owned by dst and released by sclFreeParam() below.
      dst.columnData = pCol;
      dst.numOfRows = 1;
      dst.colAlloced = true;
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
      if (colDataIsNull_var(dst.columnData, 0)) {
        qInfo("invalid sub tb expr with null value");
        code = TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED;
      }
      if (code == 0) {
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
      }
    } break;
    default:
      qError("wrong subtable expr with type: %d", pExpr->type);
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }
  if (code == 0) {
    if (!pVal || len == 0) {
      qError("tbname generated with no characters which is not allowed");
      code = TSDB_CODE_INVALID_PARA;
      // Nothing to copy: leave an empty name instead of handing a possibly NULL source to memcpy().
      tbname[0] = '\0';
    } else {
      if (len > TSDB_TABLE_NAME_LEN - 1) {
        qError("tbname generated with too long characters, max allowed is %d, got %d, truncated.", TSDB_TABLE_NAME_LEN - 1, len);
        len = TSDB_TABLE_NAME_LEN - 1;
      }

      memcpy(tbname, pVal, len);
      tbname[len] = '\0';  // ensure null terminated
      if (NULL != strchr(tbname, '.')) {
        code = TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME;
        qError("tbname generated with invalid characters, '.' is not allowed");
      }
    }
  }
  // pVal may point into dst's column, so dst is released only after the copy above.
  sclFreeParam(&dst);
  pStreamRuntimeInfo->curIdx = nextIdx;  // restore
  return code;
}
2254

2255
// Frees a SStreamInserterParam together with the members it owns: the three name strings and the four
// arrays (pFields, pTagFields, colCids, tagCids). pSinkHandle is not touched. NULL is accepted.
void destroyStreamInserterParam(SStreamInserterParam* pParam) {
  if (NULL == pParam) {
    return;
  }

  // name strings
  if (NULL != pParam->tbname) {
    taosMemFree(pParam->tbname);
    pParam->tbname = NULL;
  }
  if (NULL != pParam->stbname) {
    taosMemFree(pParam->stbname);
    pParam->stbname = NULL;
  }
  if (NULL != pParam->dbFName) {
    taosMemFree(pParam->dbFName);
    pParam->dbFName = NULL;
  }

  // schema / column-id arrays
  if (NULL != pParam->pFields) {
    taosArrayDestroy(pParam->pFields);
    pParam->pFields = NULL;
  }
  if (NULL != pParam->pTagFields) {
    taosArrayDestroy(pParam->pTagFields);
    pParam->pTagFields = NULL;
  }
  if (NULL != pParam->colCids) {
    taosArrayDestroy(pParam->colCids);
    pParam->colCids = NULL;
  }
  if (NULL != pParam->tagCids) {
    taosArrayDestroy(pParam->tagCids);
    pParam->tagCids = NULL;
  }

  taosMemFree(pParam);
}
260,698✔
2288

2289
// Deep-copies pSrc into a newly allocated SStreamInserterParam returned through *ppDst.
//
// Strings (tbname, stbname when present, dbFName) and non-empty arrays (pFields, pTagFields, colCids,
// tagCids) are duplicated; empty or missing arrays become NULL in the copy. pSinkHandle is shared, not
// cloned, and is never freed by destroyStreamInserterParam().
//
// Returns 0 on success. On any failure the partially built copy is destroyed, *ppDst is set to NULL (when
// ppDst itself is non-NULL) and the error code is returned: TSDB_CODE_INVALID_PARA for NULL arguments,
// otherwise terrno of the failing allocation.
int32_t cloneStreamInserterParam(SStreamInserterParam** ppDst, SStreamInserterParam* pSrc) {
  int32_t code = 0, lino = 0;
  if (ppDst == NULL || pSrc == NULL) {
    // Handle this before the shared exit path: that path dereferences ppDst and would destroy whatever
    // the caller's *ppDst currently holds.
    if (ppDst != NULL) {
      *ppDst = NULL;
    }
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
    return code;
  }

  // calloc leaves every member NULL/0, so members that are not cloned below need no explicit reset.
  *ppDst = (SStreamInserterParam*)taosMemoryCalloc(1, sizeof(SStreamInserterParam));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  (*ppDst)->suid = pSrc->suid;
  (*ppDst)->sver = pSrc->sver;
  (*ppDst)->tbType = pSrc->tbType;
  (*ppDst)->tbname = taosStrdup(pSrc->tbname);
  TSDB_CHECK_NULL((*ppDst)->tbname, code, lino, _exit, terrno);

  if (pSrc->stbname) {
    (*ppDst)->stbname = taosStrdup(pSrc->stbname);
    TSDB_CHECK_NULL((*ppDst)->stbname, code, lino, _exit, terrno);
  }

  (*ppDst)->dbFName = taosStrdup(pSrc->dbFName);
  TSDB_CHECK_NULL((*ppDst)->dbFName, code, lino, _exit, terrno);

  (*ppDst)->pSinkHandle = pSrc->pSinkHandle;  // don't need clone and free

  if (pSrc->pFields && pSrc->pFields->size > 0) {
    (*ppDst)->pFields = taosArrayDup(pSrc->pFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pFields, code, lino, _exit, terrno);
  }

  if (pSrc->pTagFields && pSrc->pTagFields->size > 0) {
    (*ppDst)->pTagFields = taosArrayDup(pSrc->pTagFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pTagFields, code, lino, _exit, terrno);
  }

  if (pSrc->colCids && pSrc->colCids->size > 0) {
    (*ppDst)->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL((*ppDst)->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids && pSrc->tagCids->size > 0) {
    (*ppDst)->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL((*ppDst)->tagCids, code, lino, _exit, terrno);
  }

_exit:

  if (code != 0) {
    // Only reached with a non-NULL ppDst; *ppDst is either NULL (calloc failed) or our own partial copy.
    if (*ppDst) {
      destroyStreamInserterParam(*ppDst);
      *ppDst = NULL;
    }

    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
2353

2354
// Public entry point that forwards unchanged to doDropStreamTable() and returns its result code.
int32_t dropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq) {
  int32_t code = doDropStreamTable(pMsgCb, pOutput, pReq);
  return code;
}
2357

2358
// Public entry point that forwards unchanged to doDropStreamTableByTbName() and returns its result code.
int32_t dropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName) {
  int32_t code = doDropStreamTableByTbName(pMsgCb, pOutput, pReq, tbName);
  return code;
}
2361

2362
// Filters uidList by the tag condition of the given sub-plan.
//
// node is treated as an SSubplan; when it is NULL no tag condition is passed. pTaskInfo is treated as an
// SExecTaskInfo whose storageAPI is handed to doFilterByTagCond(), so it must not be NULL. The call always
// uses SFLT_NOT_INDEX, and the doFilterByTagCond() result code is returned unchanged.
int32_t qFilterTableList(void* pVnode, SArray* uidList, SNode* node, void* pTaskInfo, uint64_t suid) {
  SNode* pTagCond = NULL;
  if (node != NULL) {
    pTagCond = ((SSubplan*)node)->pTagCond;
  }

  SExecTaskInfo* pExecTask = (SExecTaskInfo*)pTaskInfo;
  return doFilterByTagCond(suid, uidList, pTagCond, pVnode, SFLT_NOT_INDEX, &pExecTask->storageAPI, NULL);
}
2373

2374
// Decides whether a window [skey, ekey] holding `count` rows satisfies a TRUE_FOR constraint.
//
// A NULL pTrueForInfo means "no constraint" and yields true. The duration part passes when the configured
// duration is <= 0 or |ekey - skey| reaches it; the count part passes when the configured count is <= 0 or
// `count` reaches it. trueForType selects which part(s) apply; unknown types yield true.
bool isTrueForSatisfied(STrueForInfo* pTrueForInfo, int64_t skey, int64_t ekey, int64_t count) {
  if (NULL == pTrueForInfo) {
    return true;
  }

  bool durationOk = true;
  if (pTrueForInfo->duration > 0) {
    durationOk = llabs(ekey - skey) >= pTrueForInfo->duration;
  }

  bool countOk = true;
  if (pTrueForInfo->count > 0) {
    countOk = count >= pTrueForInfo->count;
  }

  bool satisfied = true;
  switch (pTrueForInfo->trueForType) {
    case TRUE_FOR_DURATION_ONLY:
      satisfied = durationOk;
      break;
    case TRUE_FOR_COUNT_ONLY:
      satisfied = countOk;
      break;
    case TRUE_FOR_AND:
      satisfied = durationOk && countOk;
      break;
    case TRUE_FOR_OR:
      satisfied = durationOk || countOk;
      break;
    default:
      break;
  }
  return satisfied;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc