• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4976

06 Mar 2026 09:48AM UTC coverage: 68.446% (+0.08%) from 68.37%
#4976

push

travis-ci

web-flow
feat(TDgpt): support multiple input data columns for anomaly detection. (#34606)

0 of 93 new or added lines in 9 files covered. (0.0%)

5718 existing lines in 144 files now uncovered.

211146 of 308486 relevant lines covered (68.45%)

136170362.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.51
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include <stdint.h>
18
#include "cmdnodes.h"
19
#include "dataSinkInt.h"
20
#include "executil.h"
21
#include "executorInt.h"
22
#include "libs/new-stream/stream.h"
23
#include "operator.h"
24
#include "osMemPool.h"
25
#include "osMemory.h"
26
#include "planner.h"
27
#include "query.h"
28
#include "querytask.h"
29
#include "storageapi.h"
30
#include "streamexecutorInt.h"
31
#include "taosdef.h"
32
#include "tarray.h"
33
#include "tdatablock.h"
34
#include "tref.h"
35
#include "trpc.h"
36
#include "ttypes.h"
37
#include "tudf.h"
38
#include "wal.h"
39

40
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
41
int32_t             fetchObjRefPool = -1;
42
SGlobalExecInfo     gExecInfo = {0};
43

44
/**
 * Publish a task's sub-job context and the remote-node fetch callback
 * (qFetchRemoteNode) into the global gTaskScalarExtra.
 *
 * @param tinfo task handle; it is cast to SExecTaskInfo* and dereferenced
 *              without a NULL check.
 */
void setTaskScalarExtraInfo(qTaskInfo_t tinfo) {
  gTaskScalarExtra.pSubJobCtx = ((SExecTaskInfo*)tinfo)->pSubJobCtx;
  gTaskScalarExtra.fp = qFetchRemoteNode;
}
757,723,125✔
49

50
/**
 * Store the dnode handle and its two accessor callbacks in the global
 * gExecInfo so that later calls (e.g. getCurrentMnodeEpset) can use them.
 *
 * @param pDnode     opaque dnode handle, passed back to the callbacks
 * @param getDnodeId callback stored in gExecInfo.getDnodeId
 * @param getMnode   callback stored in gExecInfo.getMnode
 */
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
  gExecInfo.dnode = pDnode;
  gExecInfo.getDnodeId = getDnodeId;
  gExecInfo.getMnode = getMnode;
}
56

57
/**
 * Fill pEpSet through the mnode-epset callback registered in gExecInfo.
 *
 * @param pEpSet output, handed unchanged to gExecInfo.getMnode
 * @return TSDB_CODE_SUCCESS after the callback ran, or TSDB_CODE_APP_ERROR
 *         when gExecInfo has no dnode handle or no getMnode callback.
 */
int32_t getCurrentMnodeEpset(SEpSet* pEpSet) {
  bool initialized = (gExecInfo.dnode != NULL) && (gExecInfo.getMnode != NULL);
  if (initialized) {
    gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
    return TSDB_CODE_SUCCESS;
  }

  qError("gExecInfo is not initialized");
  return TSDB_CODE_APP_ERROR;
}
65

66
void doDestroyFetchObj(void* param) {
147,640,112✔
67
  if (param == NULL) {
147,640,112✔
UNCOV
68
    return;
×
69
  }
70

71
  if (*(bool*)param) {
147,640,112✔
72
    doDestroyExchangeOperatorInfo(param);
110,047,441✔
73
  } else {
74
    destroySubJobCtx((STaskSubJobCtx *)param);
37,594,815✔
75
  }
76
}
77

78
static void cleanupRefPool() {
568,478✔
79
  int32_t ref = atomic_val_compare_exchange_32(&fetchObjRefPool, fetchObjRefPool, 0);
568,478✔
80
  taosCloseRef(ref);
568,478✔
81
}
568,478✔
82

83
static void initRefPool() {
568,478✔
84
  fetchObjRefPool = taosOpenRef(1024, doDestroyFetchObj);
568,478✔
85
  (void)atexit(cleanupRefPool);
568,478✔
86
}
568,478✔
87

UNCOV
88
/**
 * Walk down the single-downstream operator chain to the stream scan operator
 * and append the given input to its pBlockLists.
 *
 * Every operator visited on the way (including the stream scan itself) has
 * its status reset to OP_NOT_OPENED.
 *
 * @param pOperator   operator to start from
 * @param input       input payload; its interpretation depends on `type`
 * @param numOfBlocks number of elements in `input` for the array-style types
 * @param type        one of the STREAM_INPUT__* values handled below; any
 *                    other value leaves the block list untouched
 * @param id          task id string, used in log messages only
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when no (or an ambiguous)
 *         path to a stream scan exists, or terrno when an array push fails.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // `id` is a string: the former trailing PRIx64 only appended stray characters to the message
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // NOTE(review): pushes from the address of the local `input` pointer; confirm the element size of
    // pBlockLists matches what is readable at that address.
    void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    SPackedData tmp = {.pDataBlock = input};
    void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
    QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__CHECKPOINT;
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
150

151
/**
 * Reset the status of every operator above the stream scan operator to
 * OP_NOT_OPENED, following the single-downstream chain from pOperator.
 *
 * The stream scan operator itself is not modified by this function.
 *
 * @param pOperator operator to start from
 * @param id        task id string, used in log messages only
 * @return 0 once a stream scan operator is reached, TSDB_CODE_APP_ERROR when
 *         an operator on the way has no downstream or more than one.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // `id` is a string: the former trailing PRIx64 only appended stray characters to the message
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
  }
  return 0;
}
168

169
/**
 * Follow the first-downstream chain from pOperator to the stream scan
 * operator and, if its inner table scan already owns a data reader, pass the
 * task id string to that reader.
 *
 * @param pOperator operator to start from
 * @param pAPI      storage API providing tsdReader.tsdSetReaderTaskId
 * @return 0 when nothing had to be done or the id was set; otherwise the
 *         error code returned by tsdSetReaderTaskId.
 */
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SOperatorInfo* pCur = pOperator;

  // descend until the stream scan operator is found; give up silently at an operator without downstream
  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (!pCur->pDownstream) {
      return 0;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScanInfo = pCur->info;
  if (pStreamScanInfo->pTableScanOp == NULL) {
    return 0;
  }

  STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
  if (pScanInfo->base.dataReader == NULL) {
    return 0;
  }

  SExecTaskInfo* pTaskInfo = pCur->pTaskInfo;
  int32_t        code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
  if (code) {
    qError("failed to set reader id for executor, code:%s", tstrerror(code));
    return code;
  }

  return 0;
}
189

190
/**
 * Record the query id in the task, rebuild the task id string from
 * (taskId, queryId) and propagate that string to the data reader below the
 * stream scan operator via doSetTaskId.
 *
 * @return the result of doSetTaskId.
 */
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str, 64);

  // set the idstr for tsdbReader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
198

UNCOV
199
/** Report whether the task's status field equals OP_EXEC_DONE. */
bool qTaskIsDone(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->status == OP_EXEC_DONE;
}
203

204
/**
 * Feed input blocks to the stream scan operator of a task (see doSetSMABlock).
 *
 * @param tinfo       task handle; NULL yields TSDB_CODE_APP_ERROR
 * @param pBlocks     input payload; NULL is treated as "nothing to do"
 * @param numOfBlocks number of blocks; 0 is treated as "nothing to do"
 * @param type        STREAM_INPUT__* value forwarded to doSetSMABlock
 * @return TSDB_CODE_SUCCESS or the error returned by doSetSMABlock.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (NULL == tinfo) {
    return TSDB_CODE_APP_ERROR;
  }

  if (NULL == pBlocks || 0 == numOfBlocks) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (TSDB_CODE_SUCCESS == code) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}
224

225
/**
 * Create an executor task running in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * With msg == NULL a raw-scan task is built (doCreateTask followed by
 * createTmqRawScanOperatorInfo); otherwise msg is parsed as a subplan string
 * and the task is created through qCreateExecTask.
 *
 * @param msg           serialized subplan, or NULL for a raw scan
 * @param pReaderHandle read handle; its storage API is copied into the raw-scan task
 * @param vgId          vgroup id
 * @param id            task id, used for the raw-scan task only
 * @return the new task, or NULL on failure. Whenever a failing call reported
 *         a non-zero code, that code is stored in terrno (the raw-scan
 *         branch previously returned NULL without setting terrno).
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, uint64_t id) {
  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pTaskInfo = NULL;

    int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo);
    if (NULL == pTaskInfo || code != 0) {
      // NOTE(review): a non-NULL pTaskInfo combined with a non-zero code is not released here - confirm
      // doCreateTask never reports failure that way.
      if (code != 0) {
        terrno = code;
      }
      return NULL;
    }

    code = createTmqRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
    if (NULL == pTaskInfo->pRoot || code != 0) {
      taosMemoryFree(pTaskInfo);
      if (code != 0) {
        terrno = code;
      }
      return NULL;
    }

    pTaskInfo->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
    return pTaskInfo;
  }

  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to parse subplan from msg, msg:%s code:%s", (char*) msg, tstrerror(code));
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  return pTaskInfo;
}
263

264
/**
 * Validate the optional stream inserter parameter.
 *
 * Rules, checked in this order:
 *  - a NULL parameter is accepted (no inserter requested);
 *  - tbType == TSDB_SUPER_TABLE requires suid > 0;
 *  - dbFName must be a non-empty string;
 *  - when suid <= 0, tbname must be a non-empty string.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_INVALID_PARA.
 */
static int32_t checkInsertParam(SStreamInserterParam* streamInserterParam) {
  SStreamInserterParam* p = streamInserterParam;
  if (p == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool noSuid = (p->suid <= 0);

  if (noSuid && p->tbType == TSDB_SUPER_TABLE) {
    stError("insertParam: invalid suid:%" PRIx64 " for child table", p->suid);
    return TSDB_CODE_INVALID_PARA;
  }

  if (p->dbFName == NULL || p->dbFName[0] == '\0') {
    stError("insertParam: invalid db/table name");
    return TSDB_CODE_INVALID_PARA;
  }

  if (noSuid && (p->tbname == NULL || p->tbname[0] == '\0')) {
    stError("insertParam: invalid table name, suid:%" PRIx64 "", p->suid);
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
287

288
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
759,838✔
289
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
290
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
291
  if (pSubplan == NULL || pTaskInfo == NULL) {
759,838✔
UNCOV
292
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p", pSubplan, pTaskInfo);
×
UNCOV
293
    nodesDestroyNode((SNode *)pSubplan);
×
UNCOV
294
    return TSDB_CODE_INVALID_PARA;
×
295
  }
296
  int32_t lino = 0;
759,838✔
297
  int32_t code = checkInsertParam(streamInserterParam);
759,838✔
298
  if (code != TSDB_CODE_SUCCESS) {
759,838✔
299
    qError("invalid stream inserter param, code:%s", tstrerror(code));
×
UNCOV
300
    nodesDestroyNode((SNode *)pSubplan);
×
UNCOV
301
    return code;
×
302
  }
303
  SInserterParam* pInserterParam = NULL;
759,838✔
304
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
759,838✔
305
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
759,838✔
306
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
759,838✔
307

308
  code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, NULL);
759,838✔
309
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
759,838✔
310
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
222✔
UNCOV
311
    goto _error;
×
312
  }
313

314
  if (streamInserterParam) {
759,616✔
315
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
270,610✔
316
    void*           pSinkManager = NULL;
270,610✔
317
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
270,123✔
318
    if (code != TSDB_CODE_SUCCESS) {
270,610✔
UNCOV
319
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
UNCOV
320
      goto _error;
×
321
    }
322

323
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
270,610✔
324
    if (NULL == pInserterParam) {
270,388✔
325
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(terrno), (*pTask)->id.str);
×
UNCOV
326
      code = terrno;
×
UNCOV
327
      goto _error;
×
328
    }
329
    code = cloneStreamInserterParam(&pInserterParam->streamInserterParam, streamInserterParam);
270,388✔
330
    TSDB_CHECK_CODE(code, lino, _error);
270,123✔
331
    
332
    pInserterParam->readHandle = taosMemCalloc(1, sizeof(SReadHandle));
270,123✔
333
    pInserterParam->readHandle->pMsgCb = readHandle->pMsgCb;
270,388✔
334

335
    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
270,610✔
336
    if (code) {
270,610✔
UNCOV
337
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
338
    }
339
  }
340
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
759,351✔
341
         tstrerror(code));
342

343
_error:
79,111✔
344

345
  if (code != TSDB_CODE_SUCCESS) {
759,838✔
UNCOV
346
    qError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
347
    if (pInserterParam != NULL) {
×
UNCOV
348
      taosMemoryFree(pInserterParam);
×
349
    }
350
  }
351
  return code;
759,838✔
352
}
353

354
/**
 * Tell whether the root operator of a task is one of the scan kinds listed
 * below (table scan, table merge scan, system-table scan, tag scan).
 *
 * @return false for a NULL task, a task without root operator, or a root
 *         operator without physical plan node; otherwise whether the node
 *         type is one of the four scan types.
 */
bool qNeedReset(qTaskInfo_t pInfo) {
  if (pInfo == NULL) {
    return false;
  }

  SOperatorInfo* pRoot = ((SExecTaskInfo*)pInfo)->pRoot;
  if (pRoot == NULL || pRoot->pPhyNode == NULL) {
    return false;
  }

  switch (nodeType(pRoot->pPhyNode)) {
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      return true;
    default:
      return false;
  }
}
369

370
/**
 * Copy the uid, the window range (and its validity flag), the external
 * window range (and its validity flag) and the cacheSttStatis setting from
 * pHandle into the scan base's own read handle. Does nothing when either
 * argument is NULL.
 */
static void setReadHandle(SReadHandle* pHandle, STableScanBase* pScanBaseInfo) {
  if (pHandle != NULL && pScanBaseInfo != NULL) {
    pScanBaseInfo->readHandle.uid = pHandle->uid;

    pScanBaseInfo->readHandle.winRangeValid = pHandle->winRangeValid;
    pScanBaseInfo->readHandle.winRange = pHandle->winRange;

    pScanBaseInfo->readHandle.extWinRangeValid = pHandle->extWinRangeValid;
    pScanBaseInfo->readHandle.extWinRange = pHandle->extWinRange;

    pScanBaseInfo->readHandle.cacheSttStatis = pHandle->cacheSttStatis;
  }
}
382

383
/**
 * Refresh the read handle of the root table scan / table merge scan operator
 * and reset the root operator's state through its resetStateFn.
 *
 * @param pInfo  task handle
 * @param handle new read-handle settings; may be NULL, in which case the
 *               operator's read handle is left as is
 * @return TSDB_CODE_INVALID_PARA when the task, its root operator or the
 *         root's physical plan node is missing; otherwise the result of
 *         resetStateFn.
 *
 * Fix over the previous version: the root operator / plan node are checked
 * before use (as qNeedReset does), and a NULL handle is no longer
 * dereferenced by the debug log.
 */
int32_t qResetTableScan(qTaskInfo_t pInfo, SReadHandle* handle) {
  if (pInfo == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SExecTaskInfo*  pTaskInfo = (SExecTaskInfo*)pInfo;
  SOperatorInfo*  pOperator = pTaskInfo->pRoot;
  if (pOperator == NULL || pOperator->pPhyNode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*           info = pOperator->info;
  STableScanBase* pScanBaseInfo = NULL;

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(pOperator->pPhyNode)) {
    pScanBaseInfo = &((STableMergeScanInfo*)info)->base;
    setReadHandle(handle, pScanBaseInfo);
  }

  if (handle != NULL) {
    qDebug("reset table scan, name:%s, id:%s, time range: [%" PRId64 ", %" PRId64 "]", pOperator->name,
           GET_TASKID(pTaskInfo), handle->winRange.skey, handle->winRange.ekey);
  } else {
    qDebug("reset table scan, name:%s, id:%s", pOperator->name, GET_TASKID(pTaskInfo));
  }
  // NOTE(review): resetStateFn is called unconditionally - confirm every operator type accepted by
  // qNeedReset installs it.
  return pOperator->fpSet.resetStateFn(pOperator);
}
405

406
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
759,838✔
407
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
408
  if (msg == NULL) {
759,838✔
UNCOV
409
    return TSDB_CODE_INVALID_PARA;
×
410
  }
411

412
  *pTaskInfo = NULL;
759,838✔
413

414
  SSubplan* pPlan = NULL;
759,838✔
415
  int32_t   code = qStringToSubplan(msg, &pPlan);
759,838✔
416
  if (code != TSDB_CODE_SUCCESS) {
759,838✔
UNCOV
417
    nodesDestroyNode((SNode *)pPlan);
×
UNCOV
418
    return code;
×
419
  }
420
  // todo: add stream inserter param
421
  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo,
759,838✔
422
                               pInserterParams ? &pInserterParams->pSinkHandle : NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM,
423
                               pInserterParams);
424
  if (code != TSDB_CODE_SUCCESS) {
759,838✔
UNCOV
425
    qDestroyTask(*pTaskInfo);
×
UNCOV
426
    return code;
×
427
  }
428

429
  return code;
759,838✔
430
}
431

432
typedef struct {
433
  tb_uid_t tableUid;
434
  tb_uid_t childUid;
435
  int8_t   check;
436
} STqPair;
437

438
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
416,748✔
439
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
440
  int32_t code = TSDB_CODE_SUCCESS;
416,748✔
441
  int32_t lino = 0;
416,748✔
442
  int8_t  locked = 0;
416,748✔
443
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
416,748✔
444

445
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
416,748✔
446

447
  SArray* tUid = taosArrayInit(4, sizeof(STqPair));
416,748✔
448
  QUERY_CHECK_NULL(tUid, code, lino, _error, terrno);
416,748✔
449

450
  int32_t numOfUids = taosArrayGetSize(tableIdList);
416,748✔
451
  if (numOfUids == 0) {
416,748✔
UNCOV
452
    (*ppArrayRes) = qa;
×
UNCOV
453
    goto _error;
×
454
  }
455

456
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
416,748✔
457

458
  uint64_t suid = 0;
416,748✔
459
  uint64_t uid = 0;
416,748✔
460
  int32_t  type = 0;
416,748✔
461
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
416,748✔
462

463
  // let's discard the tables those are not created according to the queried super table.
464
  SMetaReader mr = {0};
416,748✔
465
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
416,748✔
466

467
  locked = 1;
416,748✔
468
  for (int32_t i = 0; i < numOfUids; ++i) {
1,144,203✔
469
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
727,455✔
470
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
727,455✔
471

472
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
727,455✔
473
    if (code != TSDB_CODE_SUCCESS) {
727,455✔
UNCOV
474
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
UNCOV
475
      continue;
×
476
    }
477

478
    tDecoderClear(&mr.coder);
727,455✔
479

480
    if (mr.me.type == TSDB_SUPER_TABLE) {
727,455✔
UNCOV
481
      continue;
×
482
    } else {
483
      if (type == TSDB_SUPER_TABLE) {
727,455✔
484
        // this new created child table does not belong to the scanned super table.
485
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
727,455✔
UNCOV
486
          continue;
×
487
        }
488
      } else {  // ordinary table
489
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
490
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
491
        if (mr.me.uid != uid) {
×
UNCOV
492
          continue;
×
493
        }
494
      }
495
    }
496

497
    STqPair item = {.tableUid = *id, .childUid = mr.me.uid, .check = 0};
727,455✔
498
    if (pScanInfo->pTagCond != NULL) {
727,455✔
499
      // tb_uid_t id = mr.me.uid;
500
      item.check = 1;
348,256✔
501
    }
502
    if (taosArrayPush(tUid, &item) == NULL) {
727,455✔
UNCOV
503
      QUERY_CHECK_NULL(NULL, code, lino, _end, terrno);
×
504
    }
505
  }
506

507
  pAPI->metaReaderFn.clearReader(&mr);
416,748✔
508
  locked = 0;
416,748✔
509

510
  for (int32_t j = 0; j < taosArrayGetSize(tUid); ++j) {
1,144,203✔
511
    bool     qualified = false;
727,455✔
512
    STqPair* t = (STqPair*)taosArrayGet(tUid, j);
727,455✔
513
    if (t == NULL) {
727,455✔
UNCOV
514
      continue;
×
515
    }
516

517
    if (t->check == 1) {
727,455✔
518
      code = isQualifiedTable(t->childUid, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
348,256✔
519
      if (code != TSDB_CODE_SUCCESS) {
348,256✔
UNCOV
520
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", t->childUid, idstr);
×
UNCOV
521
        continue;
×
522
      }
523

524
      if (!qualified) {
348,256✔
525
        qInfo("table uid:0x%" PRIx64 " is unqualified for tag condition, %s", t->childUid, idstr);
174,424✔
526
        continue;
174,424✔
527
      }
528
    }
529

530
    void* tmp = taosArrayPush(qa, &t->tableUid);
553,031✔
531
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
553,031✔
532
  }
533

534
  // handle multiple partition
535

536
_end:
416,748✔
537

538
  if (locked) {
416,748✔
UNCOV
539
    pAPI->metaReaderFn.clearReader(&mr);
×
540
  }
541
  (*ppArrayRes) = qa;
416,748✔
542
_error:
416,748✔
543

544
  taosArrayDestroy(tUid);
416,748✔
545
  if (code != TSDB_CODE_SUCCESS) {
416,748✔
UNCOV
546
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
547
  }
548
  return code;
416,748✔
549
}
550

UNCOV
551
/**
 * Remove the given table uids from the tq reader of the task's stream scan
 * operator, holding the task's write latch during the removal.
 *
 * @return the code from extractOperatorInTree; when no stream scan operator
 *         is found nothing is removed and that code is returned as is.
 */
int32_t qDeleteTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const char*    idStr = GET_TASKID(pTask);

  // traverse to the stream scanner node to remove these table ids
  SOperatorInfo* pScanOp = NULL;
  int32_t        code = extractOperatorInTree(pTask->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, idStr, &pScanOp);

  if (code == 0 && pScanOp != NULL) {
    SStreamScanInfo* pScanInfo = pScanOp->info;
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), idStr);

    taosWLockLatch(&pTask->lock);
    pTask->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTask->lock);
  }

  return code;
}
571

572
/**
 * Filter tableIdList down to the qualified tables, register them with the
 * tq reader and add them (with their group id) to the table list of the
 * stream scan's inner table scan.
 *
 * The group id is 0 when the scan has no group tags, the table uid when the
 * grouping is by table name, and otherwise computed from the tag values.
 * The table list is updated while `lock` is write-held.
 *
 * @return 0 on success, otherwise the first error encountered.
 */
static int32_t filterTableForTmqQuery(SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* id, SStorageAPI* pAPI, SRWLatch* lock) {
  SArray* pQualified = NULL;
  int32_t code = filterUnqualifiedTables(pScanInfo, tableIdList, id, pAPI, &pQualified);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pQualified);
    return code;
  }

  int32_t numOfQualifiedTables = taosArrayGetSize(pQualified);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pAPI->tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, pQualified);

  bool   assignUid = false;
  char*  keyBuf = NULL;
  size_t bufLen = 0;
  if (pScanInfo->pGroupTags != NULL) {
    bufLen = getTableTagsBufLen(pScanInfo->pGroupTags);
  }
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(pQualified);
      return terrno;
    }
  }

  STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
  taosWLockLatch(lock);

  // any failure leaves the loop with `code` set; the shared cleanup below runs in every case
  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* uid = taosArrayGet(pQualified, i);
    if (!uid) {
      code = terrno;
      break;
    }

    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                      &keyInfo.groupId, pAPI);
        if (code != TSDB_CODE_SUCCESS) {
          break;
        }
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  taosWUnLockLatch(lock);
  taosMemoryFree(keyBuf);
  taosArrayDestroy(pQualified);
  return code;
}
640

641
/**
 * Call the storage API's tqUpdateTableTagCache once per table uid in
 * tableIdList, passing the scan's pseudo-column expressions and column id
 * `cid`.
 *
 * Entries that cannot be fetched from the list are logged and skipped
 * (previously they were dereferenced unchecked).
 */
static void qUpdateTableTagCache(SStreamScanInfo* pScanInfo, const SArray* tableIdList, col_id_t cid, SStorageAPI* api) {
  STqReader* tqReader = pScanInfo->tqReader;
  int32_t    numOfTables = (int32_t)taosArrayGetSize(tableIdList);

  for (int32_t i = 0; i < numOfTables; ++i) {
    int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i);
    if (uid == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }
    api->tqReaderFn.tqUpdateTableTagCache(tqReader, pScanInfo->pPseudoExpr, pScanInfo->numOfPseudoExpr, *uid, cid);
  }
}
417,402✔
648

649
/**
 * Add tables to the stream scan operator of a task: first update the tag
 * cache for every uid (column id 0), then filter the list and register the
 * qualified tables via filterTableForTmqQuery.
 *
 * @return the code from extractOperatorInTree when no stream scan operator
 *         is available, otherwise the result of filterTableForTmqQuery.
 */
int32_t qAddTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const char*    idStr = GET_TASKID(pTask);

  qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), idStr);

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pScanOp = NULL;
  int32_t        code = extractOperatorInTree(pTask->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, idStr, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStorageAPI*     pAPI = &pTask->storageAPI;
  SStreamScanInfo* pScanInfo = pScanOp->info;

  qUpdateTableTagCache(pScanInfo, tableIdList, 0, pAPI);
  return filterTableForTmqQuery(pScanInfo, tableIdList, idStr, pAPI, &pTask->lock);
}
668

669
/**
 * Update the tag cache of the task's stream scan operator for every
 * (table uid, column id) combination of tableIdList x cids.
 *
 * Failures are logged only; the function has no return value.
 *
 * Fixes over the previous version: the log reports the code returned by
 * extractOperatorInTree (falling back to terrno only when that code is 0),
 * and column-id entries that cannot be fetched are skipped instead of being
 * dereferenced unchecked.
 */
void qUpdateTableTagCacheForTmq(qTaskInfo_t tinfo, const SArray* tableIdList, SArray* cids) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  // traverse to the stream scanner node whose tag cache is updated
  SOperatorInfo* pInfo = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pInfo);
  if (code != 0 || pInfo == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code != 0 ? code : terrno));
    return;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;
  int32_t          numOfCids = (int32_t)taosArrayGetSize(cids);
  for (int32_t i = 0; i < numOfCids; ++i) {
    col_id_t* cid = (col_id_t*)taosArrayGet(cids, i);
    if (cid == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }
    qUpdateTableTagCache(pScanInfo, tableIdList, *cid, &pTaskInfo->storageAPI);
  }
}
688

689
/**
 * Refresh tables in the stream scan operator of a task: the given uids are
 * first removed from the tq reader (under the task's write latch) and then
 * re-filtered and re-added through filterTableForTmqQuery.
 *
 * @return the code from extractOperatorInTree when no stream scan operator
 *         is available, otherwise the result of filterTableForTmqQuery.
 */
int32_t qUpdateTableListForTmqScanner(qTaskInfo_t tinfo, const SArray* tableIdList) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const char*    idStr = GET_TASKID(pTask);

  // traverse to the stream scanner node to update these table ids
  SOperatorInfo* pScanOp = NULL;
  int32_t        code = extractOperatorInTree(pTask->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, idStr, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStreamScanInfo* pScanInfo = pScanOp->info;
  SStorageAPI*     pAPI = &pTask->storageAPI;

  qDebug("%s %d remove child tables from the stream scanner, %s", __func__, (int32_t)taosArrayGetSize(tableIdList), idStr);
  taosWLockLatch(&pTask->lock);
  pAPI->tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
  taosWUnLockLatch(&pTask->lock);

  return filterTableForTmqQuery(pScanInfo, tableIdList, idStr, pAPI, &pTask->lock);
}
710

711
/**
 * Read the idx-th schema info entry of a task.
 *
 * @param tinfo            task handle
 * @param dbName           output buffer for the database name
 * @param dbNameBuffLen    size passed to tstrncpy for dbName
 * @param tableName        output buffer for the table name
 * @param tbaleNameBuffLen size passed to tstrncpy for tableName
 * @param sversion         output: schema version (sw->version)
 * @param tversion         output: tversion of the entry
 * @param rversion         output: rversion of the entry
 * @param idx              index into the task's schemaInfos array
 * @param tbGet            output: set to true only when an entry was read;
 *                         written (false) before any argument is validated
 * @return TSDB_CODE_INVALID_PARA for a NULL task/name buffer,
 *         TSDB_CODE_SUCCESS with *tbGet == false when idx is past the end,
 *         terrno when the entry cannot be fetched, TSDB_CODE_SUCCESS with
 *         *tbGet == true otherwise.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t* rversion,
                                    int32_t idx, bool* tbGet) {
  *tbGet = false;

  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SArray* pSchemas = ((SExecTaskInfo*)tinfo)->schemaInfos;
  if (taosArrayGetSize(pSchemas) <= idx) {
    return TSDB_CODE_SUCCESS;  // no such entry: not an error, *tbGet stays false
  }

  SSchemaInfo* pEntry = taosArrayGet(pSchemas, idx);
  if (!pEntry) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pEntry->sw->version;
  *tversion = pEntry->tversion;
  *rversion = pEntry->rversion;

  if (!pEntry->dbname) {
    dbName[0] = 0;
  } else {
    tstrncpy(dbName, pEntry->dbname, dbNameBuffLen);
  }

  if (!pEntry->tablename) {
    tableName[0] = 0;
  } else {
    tstrncpy(tableName, pEntry->tablename, tbaleNameBuffLen);
  }

  *tbGet = true;
  return TSDB_CODE_SUCCESS;
}
749

750
/** @brief Return the dynamicTask flag of the task; tinfo is dereferenced without a NULL check. */
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->dynamicTask;
}
347,983,456✔
751

752
/** @brief Release an operator param through freeOperatorParam(OP_GET_PARAM); a NULL param is ignored. */
void qDestroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
}
758

759
/**
760
  @brief Update the operator param for the task.
761
  @note  Unlike setOperatorParam, this function will destroy the new param when
762
         operator type mismatch.
763
*/
764
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
40,491,388✔
765
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
40,491,388✔
766
  SOperatorParam* pNewParam = (SOperatorParam*)pParam;
40,491,388✔
767
  if (pTask->pRoot && pTask->pRoot->operatorType != pNewParam->opType) {
40,491,388✔
768
    qError("%s, %s operator type mismatch, task operator type:%d, "
119,064✔
769
           "new param operator type:%d", GET_TASKID(pTask), __func__,
770
           pTask->pRoot->operatorType,
771
           pNewParam->opType);
772
    qDestroyOperatorParam((SOperatorParam*)pParam);
119,064✔
773
    return;
119,064✔
774
  }
775
  TSWAP(pParam, pTask->pOpParam);
40,375,848✔
776
  ((SExecTaskInfo*)tinfo)->paramSet = false;
40,376,539✔
777
}
778

779
int32_t qExecutorInit(void) {
4,460,684✔
780
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
4,460,684✔
781
  return TSDB_CODE_SUCCESS;
4,461,052✔
782
}
783

784
/**
 * @brief Wait on pSem, bracketing the wait with the task's worker callbacks when present.
 *
 * The sequence is: pWorkerCb->beforeBlocking, tsem_wait, pWorkerCb->afterRecoverFromBlocking.
 * The first step that fails stops the sequence; its code is stored in pTask->code and returned.
 *
 * @return TSDB_CODE_SUCCESS when every step succeeded, otherwise the failing step's code.
 */
int32_t qSemWait(qTaskInfo_t task, tsem_t* pSem) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)task;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pTask->pWorkerCb != NULL) {
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = tsem_wait(pSem);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  // The recover callback only runs after a successful wait.
  if (code == TSDB_CODE_SUCCESS && pTask->pWorkerCb != NULL) {
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
  }

  if (code != TSDB_CODE_SUCCESS) {
    pTask->code = code;
    return pTask->code;
  }
  return TSDB_CODE_SUCCESS;
}
811

812
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
350,510,861✔
813
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
814
                        EOPTR_EXEC_MODEL model, SArray** subEndPoints) {
815
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
350,510,861✔
816
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
350,510,861✔
817

818
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d, subEndPoinsNum:%d", 
350,505,217✔
819
    taskId, pSubplan->id.queryId, vgId, (int32_t)taosArrayGetSize(subEndPoints ? *subEndPoints : NULL));
820

821
  readHandle->uid = 0;
350,555,972✔
822
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model, subEndPoints);
350,595,847✔
823
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
350,349,928✔
824
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
538,282✔
825
    goto _error;
457,416✔
826
  }
827
    
828
  if (handle) {
349,892,059✔
829
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
349,592,530✔
830
    void*           pSinkManager = NULL;
349,664,387✔
831
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
349,234,154✔
832
    if (code != TSDB_CODE_SUCCESS) {
349,346,388✔
UNCOV
833
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
UNCOV
834
      goto _error;
×
835
    }
836

837
    void* pSinkParam = NULL;
349,346,388✔
838
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
349,506,325✔
839
    if (code != TSDB_CODE_SUCCESS) {
349,292,438✔
840
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
UNCOV
841
      taosMemoryFree(pSinkManager);
×
UNCOV
842
      goto _error;
×
843
    }
844

845
    SDataSinkNode* pSink = NULL;
349,292,438✔
846
    if (readHandle->localExec) {
349,529,921✔
847
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
3,036✔
848
      if (code != TSDB_CODE_SUCCESS) {
3,036✔
UNCOV
849
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
×
850
               (*pTask)->id.str);
UNCOV
851
        taosMemoryFree(pSinkManager);
×
UNCOV
852
        goto _error;
×
853
      }
854
    }
855

856
    // pSinkParam has been freed during create sinker.
857
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
349,580,589✔
858
                              (*pTask)->id.str, pSubplan->processOneBlock);
349,622,473✔
859
    if (code) {
349,244,583✔
860
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
595✔
861
    }
862
  }
863

864
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s subEndPoints:%d", 
349,779,436✔
865
    taskId, pSubplan->id.queryId, tstrerror(code), (*pTask)->pSubJobCtx ? (int32_t)taosArrayGetSize((*pTask)->pSubJobCtx->subEndPoints) : 0);
866

867
_error:
127,338,021✔
868
  // if failed to add ref for all tables in this query, abort current query
869
  return code;
350,500,109✔
870
}
871

UNCOV
872
static void freeBlock(void* param) {
×
UNCOV
873
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
UNCOV
874
  blockDataDestroy(pBlock);
×
UNCOV
875
}
×
876

877
/**
 * @brief Execute the task's root operator and collect copies of its result blocks in pResList.
 *
 * The calling thread claims the task by a compare-and-swap on pTaskInfo->owner and releases
 * it again before returning. Result blocks are copied into pTaskInfo->pResultBlockList
 * (reusing existing slots when possible) and pointers to those copies are appended to
 * pResList. Collection stops when the operator returns no block, when the accumulated
 * row count reaches the row threshold (4096 unless the subplan uses a smaller dynamic
 * threshold), or after one block when processOneBlock is set.
 *
 * @param tinfo            task handle (an SExecTaskInfo)
 * @param pResList         cleared on entry, then filled with SSDataBlock* entries
 * @param useconds         receives the accumulated elapsed time once the operator is exhausted
 * @param hasMore          set to whether the last fetched block was non-NULL (success path only)
 * @param pLocal           optional local fetch info copied into the task
 * @param processOneBlock  return after the first block and rebuild the reused slot from scratch
 *
 * @return pTaskInfo->code: TSDB_CODE_QRY_IN_EXEC when another thread owns the task, the
 *         code delivered through longjmp, or the first failing step's code.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
                     bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  // Initialised up front: the failure paths jump to _end, where the value is logged.
  uint64_t       el = 0;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // Only one thread may run the task; the CAS returns the current owner when it is taken.
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();

  // A pending operator param is delivered exactly once, through the extended entry point.
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // No reusable slot left: append a fresh copy of the block.
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      if (tmp == NULL) {
        // The list did not take the copy; release it so it is not leaked.
        code = terrno;
        lino = __LINE__;
        blockDataDestroy(p1);
        goto _end;
      }
      p = p1;
    } else if (processOneBlock) {
      // Replace the block stored in the slot with a fresh copy.
      SSDataBlock** tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      blockDataDestroy(*tmp);
      *tmp = NULL;

      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      *tmp = p1;
      p = p1;
    } else {
      // Reuse the block stored in the slot and overwrite its content.
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
1022

1023
/** @brief Destroy every block stored in the task's pResultBlockList and then clear the list. */
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTaskInfo->pResultBlockList;
  size_t         total = taosArrayGetSize(pBlocks);

  int32_t idx = 0;
  while (idx < total) {
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, idx);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++idx;
  }

  taosArrayClear(pBlocks);
}
×
1036

1037
/**
 * @brief Execute the task's root operator once and hand back a single result block.
 *
 * The calling thread claims the task through pTaskInfo->owner while holding the read
 * latch and releases ownership before returning.
 *
 * @param tinfo     task handle (an SExecTaskInfo)
 * @param pRes      set to NULL on entry, then receives the block produced by the operator
 * @param useconds  receives the accumulated elapsed time when no block was produced
 *
 * @return pTaskInfo->code: the kill code when the task was already killed,
 *         TSDB_CODE_QRY_IN_EXEC when another thread owns the task, the code delivered
 *         through longjmp, or the code recorded from the operator / blockDataCheck.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // The read latch is shared, so a plain check-then-set could let two threads claim the
  // task. The compare-and-swap claims it atomically and yields the real current owner
  // for the log message.
  curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  int32_t code = TSDB_CODE_SUCCESS;
  // A pending operator param is delivered exactly once, through the extended entry point.
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  }
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  code = blockDataCheck(*pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1118

1119
/**
 * @brief Append pInfo to the task's stopInfo list while holding the stopInfo write latch.
 * @return TSDB_CODE_SUCCESS, or terrno when taosArrayPush fails.
 */
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  void* pPushed = NULL;

  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  pPushed = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pPushed != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
1130

UNCOV
1131
int32_t stopInfoComp(void const* lp, void const* rp) {
×
1132
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
1133
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
1134

1135
  if (key->refId < pInfo->refId) {
×
1136
    return -1;
×
UNCOV
1137
  } else if (key->refId > pInfo->refId) {
×
1138
    return 1;
×
1139
  }
1140

UNCOV
1141
  return 0;
×
1142
}
1143

UNCOV
1144
/**
 * @brief Remove the entry matching pInfo (compared with stopInfoComp) from the task's
 *        stopInfo list, under the stopInfo write latch. Nothing happens when no entry matches.
 */
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (!(pos < 0)) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
×
1152

1153
/**
 * @brief Post the semaphores the task's operators may be waiting on.
 *
 * When the task has a sub-job context, the task's code is copied into it and its
 * `ready` semaphore is posted. Then, under the stopInfo write latch, the `ready`
 * semaphore of every exchange registered in stopInfo.pStopInfo is posted, provided
 * its reference can still be acquired from fetchObjRefPool. Failures are logged and
 * do not stop the iteration.
 */
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->pSubJobCtx) {
    pTaskInfo->pSubJobCtx->code = pTaskInfo->code;
    int32_t code = tsem_post(&pTaskInfo->pSubJobCtx->ready);
    if (code != TSDB_CODE_SUCCESS) {
      // Report the failure like the exchange loop below does instead of dropping it.
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t i = 0; i < num; ++i) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
    if (!pStop) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }
    // Entries whose reference can no longer be acquired are skipped.
    SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pStop->refId);
    if (pExchangeInfo) {
      int32_t code = tsem_post(&pExchangeInfo->ready);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      } else {
        qDebug("post to exchange %" PRId64 " to stop", pStop->refId);
      }
      code = taosReleaseRef(fetchObjRefPool, pStop->refId);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      }
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
80,605✔
1185

1186
/**
 * @brief Mark the task as killed with rspCode and post its operators' semaphores, without waiting.
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  if (qinfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

  setTaskKilled(pTaskInfo, rspCode);
  qStopTaskOperators(pTaskInfo);

  return TSDB_CODE_SUCCESS;
}
1199

1200
/**
 * @brief Mark the task as killed and optionally wait until no thread is executing it.
 *
 * @param tinfo         task handle (an SExecTaskInfo)
 * @param rspCode       stored in pTaskInfo->code once the task is seen idle; only used
 *                      when waitDuration > 0
 * @param waitDuration  maximum time to wait in milliseconds; <= 0 returns right after
 *                      marking the task as killed
 *
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise
 *         (including when the wait times out).
 *
 * NOTE(review): the task is always marked with TSDB_CODE_TSC_QUERY_KILLED, so rspCode is
 * never applied on the non-waiting path - confirm this is intended.
 */
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        st = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (waitDuration > 0) {
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  // Nothing to wait for on the asynchronous path.
  if (waitDuration <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (;;) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still running: sleep 200 ms, then give up once the deadline has passed
    taosMsleep(200);

    int64_t d = taosGetTimestampMs() - st;
    if (d >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
1243

UNCOV
1244
/** @brief True when the handle is non-NULL and its owner field (read atomically) is non-zero. */
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;

  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}
1252

1253
/**
 * @brief Write the task's cost summary to the debug log.
 *
 * Idle time is cost.start - cost.created. When a block-load recorder is attached its
 * counters are included; otherwise only idle and elapsed time are printed.
 */
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  int64_t                 idleTime = pCost->start - pCost->created;
  SFileBlockLoadRecorder* pRecorder = pCost->pRecoder;

  if (pRecorder == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idleTime / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
      pRecorder->totalRows, pRecorder->totalCheckedRows);
}
351,012,799✔
1271

1272

1273
/** @brief Log the completion and cost summary of a task, then destroy it; a NULL handle is ignored. */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  if (qTaskHandle == NULL) {
    return;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo->pRoot == NULL) {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  }

  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}
1288

1289
/** @brief Forward to getOperatorExplainExecInfo for the task's root operator, filling pExecInfoList. */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}
1293

1294
/**
 * @brief Find the stream-scan operator of a task and return its info object.
 *
 * Starting at the root, the first downstream operator is followed until an operator of
 * type QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN is reached.
 *
 * @param tinfo    task handle (an SExecTaskInfo)
 * @param scanner  receives the scan operator's info, or NULL when the chain ends
 *                 without a stream-scan operator
 */
void qExtractTmqScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  // Stop at the end of the chain instead of dereferencing a missing downstream.
  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return;
    }
    if (pOperator->pDownstream == NULL) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s %s no stream scan operator found", GET_TASKID(pTaskInfo), __func__);
  *scanner = NULL;
}
333,686✔
1308

1309
void* qExtractReaderFromTmqScanner(void* scanner) {
333,686✔
1310
  SStreamScanInfo* pInfo = scanner;
333,686✔
1311
  return (void*)pInfo->tqReader;
333,686✔
1312
}
1313

1314
/** @brief Return the schema stored in the task's streamInfo. */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
1318

1319
/** @brief Return the table name stored in the task's streamInfo. */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
1323

1324
/** @brief Return the address of the btMetaRsp member embedded in the task's streamInfo. */
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
1328

1329
/**
 * @brief Copy the task's streamInfo.currentOffset into pOffset via tOffsetCopy.
 * @return always 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  tOffsetCopy(pOffset, &((SExecTaskInfo*)tinfo)->streamInfo.currentOffset);
  return 0;
}
1334

1335
/**
 * @brief Initialise a table-data query condition covering every column of pMtInfo's schema.
 *
 * pCond is zeroed first. The resulting condition is ascending, uses
 * TSWINDOW_INITIALIZER as its time window, has startVersion -1 and takes endVersion
 * from sContext->snapVersion. Column i of the schema maps to slot i.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when one of the two column arrays cannot be
 *         allocated (both are released in that case).
 */
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(*pCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    return terrno;
  }

  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t col = 0; col < pCond->numOfCols; ++col) {
    SColumnInfo* pDst = &pCond->colList[col];

    pDst->type = pMtInfo->schema->pSchema[col].type;
    pDst->bytes = pMtInfo->schema->pSchema[col].bytes;
    // Precision and scale are only filled in when extended schemas are present.
    if (pMtInfo->pExtSchemas != NULL) {
      decimalFromTypeMod(pMtInfo->pExtSchemas[col].typeMod, &pDst->precision, &pDst->scale);
    }
    pDst->colId = pMtInfo->schema->pSchema[col].colId;
    pDst->pk = pMtInfo->schema->pSchema[col].flags & COL_IS_KEY;

    pCond->pSlotList[col] = col;
  }

  return TSDB_CODE_SUCCESS;
}
1368

1369
/** @brief Set the status of the task's root operator to OP_NOT_OPENED. */
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  pRoot->status = OP_NOT_OPENED;
}
24,087,172✔
1374

1375
// Store the given poll parameters into the task's streamInfo, one field per argument.
void qStreamSetParams(qTaskInfo_t tinfo, int8_t sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  pTask->streamInfo.enableReplay = enableReplay;
  pTask->streamInfo.timeout = timeout;
  pTask->streamInfo.minPollRows = minPollRows;
  pTask->streamInfo.sourceExcluded = sourceExcluded;
}
24,131,871✔
1382

1383
// Position the scan operator(s) of a task at the consume offset pOffset.
//
// - For a column subscription with a log offset, the WAL offset is verified first.
// - If pOffset equals the task's current offset, nothing else is done and 0 is returned.
// - Otherwise the reader is re-seeked according to subType and pOffset->type (WAL seek, tsdb snapshot reader
//   open/reset, or snapshot-context setup for table/db subscriptions).
//
// On success pOffset is copied into pTaskInfo->streamInfo.currentOffset and 0 is returned; on failure the
// error code is returned and the current offset is left untouched.
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
  // if pOffset equal to current offset, means continue consume
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
            taosRUnLockLatch(&pTaskInfo->lock);
            return terrno;
          }
          // no uid in the offset: start from the first table, from the earliest timestamp
          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      if (pScanBaseInfo->dataReader == NULL) {
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
                                                             &keyInfo, 1, pScanInfo->pResBlock,
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          return code;
        }

        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      } else {
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }

        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      }

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }

      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_STREAM_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // code is still TSDB_CODE_SUCCESS at this point; report the lookup failure through terrno so the
        // caller does not see success (same convention as the tableListGetInfo check in the column branch)
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
      //      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
      // the schema pointer is handed over to streamInfo; only the ext schemas are released here
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;
      taosMemoryFreeClear(mtInfo.pExtSchemas);

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
1636

1637
// Deliver an rpc response to the callback registered in the message's send info.
// The response body is copied into a freshly allocated SDataBuf that is passed to the callback; the rpc
// content and the send info are released afterwards. A message without send info is logged and dropped.
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (NULL == pSendInfo) {
    rpcFreeCont(pMsg->pCont);
    qError("rsp msg got while pMsg->info.ahandle is NULL, 0x%" PRIx64 ":0x%" PRIx64, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));
    return;
  }

  qDebug("rsp msg got, code:%x, len:%d, 0x%" PRIx64 ":0x%" PRIx64, 
      pMsg->code, pMsg->contLen, TRACE_GET_ROOTID(&pMsg->info.traceId), TRACE_GET_MSGID(&pMsg->info.traceId));

  SDataBuf buf = {0};
  buf.len = pMsg->contLen;

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      // allocation failed: hand the failure to the callback through the message code
      pMsg->code = terrno;
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1663

1664
// Collect the uids of all tables in the first table list found in the task's operator tree.
// Returns a newly created array of uint64_t uids, or NULL when no table list could be extracted.
// On any later failure the temporary arrays are released and the error is raised through
// T_LONG_JMP(pTaskInfo->env, code).
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;
  int32_t        numOfTables = 0;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // the pointer array is only a temporary container; release it on success and on every error path
  taosArrayDestroy(plist);
  if (code != TSDB_CODE_SUCCESS) {
    // nothing is returned to the caller after the long jump, so the partial result must be freed here
    taosArrayDestroy(pUidList);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pUidList;
}
1701

1702
// Append the table-list pointer of the first stream-scan or table-scan operator reachable through
// pDownstream[0] to pList. Operators of other types are skipped by recursing into their first downstream
// operator (if any). Returns TSDB_CODE_SUCCESS or the error of the failed array push.
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      // a stream scan keeps its table list inside the nested table-scan operator
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pushed = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pushed = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    default: {
      if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
1730

1731
// Create a pointer array holding the table list(s) extracted from the task's operator tree.
// *pList receives the new array on success and is NULL otherwise.
// Returns TSDB_CODE_INVALID_PARA when pList is NULL, terrno when the array cannot be created, or the
// result of extractTableList.
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (NULL == pList) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pResult = taosArrayInit(0, POINTER_BYTES);
  if (NULL == pResult) {
    return terrno;
  }

  int32_t code = extractTableList(pResult, pTaskInfo->pRoot);
  if (code != 0) {
    taosArrayDestroy(pResult);
    return code;
  }

  *pList = pResult;
  return code;
}
1750

UNCOV
1751
// Invoke the root operator's releaseStreamStateFn callback when one is set. Always returns 0.
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  if (NULL == pTask->pRoot->fpSet.releaseStreamStateFn) {
    return 0;
  }

  pTask->pRoot->fpSet.releaseStreamStateFn(pTask->pRoot);
  return 0;
}
1758

UNCOV
1759
// Invoke the root operator's reloadStreamStateFn callback when one is set. Always returns 0.
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  if (NULL == pTask->pRoot->fpSet.reloadStreamStateFn) {
    return 0;
  }

  pTask->pRoot->fpSet.reloadStreamStateFn(pTask->pRoot);
  return 0;
}
1766

UNCOV
1767
// Clear the task's result code (set it to 0) and log the value it had before.
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  const int32_t prevCode = pTask->code;
  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
×
1774

1775
// Currently a no-op: neither argument is used and 0 is always returned.
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
  return 0;
}
1779

UNCOV
1780
// Forward to collectExprsToReplaceForStream with the task's root operator and return its result.
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  return collectExprsToReplaceForStream(pTask->pRoot, pExprs);
}
1785

1786
// Reset pOper and then its downstream operators, depth first.
// For each operator: both parameter sets are freed/reset, resetStateFn is called when set, and the status
// becomes OP_NOT_OPENED. Descent into the downstream operators stops at the first non-zero code, which is
// returned.
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
  freeResetOperatorParams(pOper, OP_GET_PARAM, true);
  freeResetOperatorParams(pOper, OP_NOTIFY_PARAM, true);

  int32_t code = 0;
  if (pOper->fpSet.resetStateFn != NULL) {
    code = pOper->fpSet.resetStateFn(pOper);
  }
  // the status is reset even when resetStateFn reported an error
  pOper->status = OP_NOT_OPENED;

  int32_t idx = 0;
  while (code == 0 && idx < pOper->numOfDownstream) {
    code = clearStatesForOperator(pOper->pDownstream[idx]);
    ++idx;
  }
  return code;
}
1801

1802
// Set the task code to TSDB_CODE_SUCCESS and reset the whole operator tree starting at the root.
// Returns the result of clearStatesForOperator.
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  SOperatorInfo* pRoot = pTask->pRoot;

  pTask->code = TSDB_CODE_SUCCESS;
  return clearStatesForOperator(pRoot);
}
1810

1811
// Run one step of the task: fetch the next result block from the root operator.
//
// *ppRes is cleared first and receives the produced block (NULL when none). *finished is set to
// (*ppRes == NULL) when the root operator returns without error. *useconds receives the accumulated
// elapsed time when no block is returned.
// The task is refused when it is already killed or currently owned by another execution; the owner field
// is set for the duration of the call and cleared before returning. Errors raised through longjmp on
// pTaskInfo->env are recorded in pTaskInfo->code.
// Returns pTaskInfo->code.
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppRes, uint64_t* useconds, bool* finished) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *ppRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    // clearStreamBlock(pTaskInfo->pRoot);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  if (pTaskInfo->owner != 0) {
    // report the thread that actually holds the task instead of a constant zero
    int64_t curOwner = pTaskInfo->owner;
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, ppRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  } else {
    *finished = *ppRes == NULL;
    code = blockDataCheck(*ppRes);
  }
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *ppRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*ppRes != NULL) ? (*ppRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
1889

1890
// void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
1891
//   SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1892
//   pTaskInfo->pStreamRuntimeInfo = pStreamRuntimeInfo;
1893
// }
1894

1895
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
272,401✔
1896
                                        SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
1897
                                        SStorageAPI* storageAPI, void** pTableListInfo, SHashObj* groupIdMap) {
1898
  int32_t code = 0;                                        
272,401✔
1899
  if (*pTableListInfo != NULL) {
272,401✔
1900
    qDebug("table list already exists, no need to create again");
×
UNCOV
1901
    goto end;
×
1902
  }
1903
  STableListInfo* pList = tableListCreate();
272,592✔
1904
  if (pList == NULL) {
273,328✔
UNCOV
1905
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1906
    code = terrno;
×
UNCOV
1907
    goto end;
×
1908
  }
1909

1910
  SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
273,328✔
1911
  SReadHandle    pHandle = {.vnode = pVnode};
273,328✔
1912
  SExecTaskInfo  pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
273,328✔
1913

1914
  code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo, groupIdMap);
273,328✔
1915
  if (code != 0) {
272,592✔
UNCOV
1916
    tableListDestroy(pList);
×
UNCOV
1917
    qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
×
UNCOV
1918
    goto end;
×
1919
  }
1920
  *pTableListInfo = pList;
272,592✔
1921

1922
end:
272,592✔
1923
  return 0;
273,328✔
1924
}
1925

1926
// Filter pUidList with pTagCond (via doFilterByTagCond, using the list's suid) and append one
// STableKeyInfo with groupId 0 to pListInfo->pTableList for every uid left in pUidList.
// Returns 0 on success, the filter's error code, or terrno when an array access/push fails.
static int32_t doFilterTableByTagCond(void* pVnode, STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, SStorageAPI* pStorageAPI){
  int32_t code = doFilterByTagCond(pListInfo->idInfo.suid, pUidList, pTagCond, pVnode, SFLT_NOT_INDEX, pStorageAPI, NULL);
  if (code != 0) {
    return code;
  }

  int32_t numOfTables = taosArrayGetSize(pUidList);
  for (int i = 0; i < numOfTables; i++) {
    uint64_t* pUid = taosArrayGet(pUidList, i);
    if (NULL == pUid) {
      return terrno;
    }

    STableKeyInfo info = {.uid = *pUid, .groupId = 0};
    if (NULL == taosArrayPush(pListInfo->pTableList, &info)) {
      return terrno;
    }
  }
  return code;
}
1946

1947
int32_t qStreamFilterTableListForReader(void* pVnode, SArray* uidList,
32,399✔
1948
                                        SNodeList* pGroupTags, SNode* pTagCond, SNode* pTagIndexCond,
1949
                                        SStorageAPI* storageAPI, SHashObj* groupIdMap, uint64_t suid, SArray** tableList) {
1950
  int32_t code = TSDB_CODE_SUCCESS;
32,399✔
1951
  SArray* uidListCopy = NULL;
32,399✔
1952
  STableListInfo* pList = tableListCreate();
32,399✔
1953
  if (pList == NULL) {
32,399✔
UNCOV
1954
    code = terrno;
×
UNCOV
1955
    goto end;
×
1956
  }
1957
  uidListCopy = taosArrayDup(uidList, NULL);
32,399✔
1958
  if (uidListCopy == NULL) {
32,399✔
UNCOV
1959
    code = terrno;
×
UNCOV
1960
    goto end;
×
1961
  }
1962
  SScanPhysiNode pScanNode = {.suid = suid, .tableType = TD_SUPER_TABLE};
32,399✔
1963
  SReadHandle    pHandle = {.vnode = pVnode};
32,399✔
1964

1965
  pList->idInfo.suid = suid;
32,399✔
1966
  pList->idInfo.tableType = TD_SUPER_TABLE;
32,399✔
1967
  code = doFilterTableByTagCond(pVnode, pList, uidList, pTagCond, storageAPI);
32,399✔
1968
  if (code != TSDB_CODE_SUCCESS) {
32,399✔
UNCOV
1969
    goto end;
×
1970
  }                                              
1971
  code = buildGroupIdMapForAllTables(pList, &pHandle, &pScanNode, pGroupTags, false, NULL, storageAPI, groupIdMap);
32,399✔
1972
  if (code != TSDB_CODE_SUCCESS) {
32,399✔
UNCOV
1973
    goto end;
×
1974
  }
1975
  *tableList = pList->pTableList;
32,399✔
1976
  pList->pTableList = NULL;
32,399✔
1977

1978
  taosArrayClear(uidList);
32,399✔
1979
  for (int32_t i = 0; i < taosArrayGetSize(uidListCopy); i++){
64,798✔
1980
    void* tmp = taosArrayGet(uidListCopy, i);
32,399✔
1981
    if (tmp == NULL) {
32,399✔
UNCOV
1982
      continue;
×
1983
    }
1984
    int32_t* slot = taosHashGet(pList->map, tmp, LONG_BYTES);
32,399✔
1985
    if (slot == NULL) {
32,399✔
1986
      if (taosArrayPush(uidList, tmp) == NULL) {
3,441✔
UNCOV
1987
        code = terrno;
×
UNCOV
1988
        goto end;
×
1989
      }
1990
    }
1991
  }
1992
end:
32,399✔
1993
  taosArrayDestroy(uidListCopy);
32,399✔
1994
  tableListDestroy(pList);
32,399✔
1995
  return code;
32,399✔
1996
}
1997

1998
// int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid, TdThreadRwlock* lock) {
1999
//   int32_t index = -1;
2000
//   (void)taosThreadRwlockRdlock(lock);
2001
//   if (((STableListInfo*)pTableListInfo)->groupOffset == NULL){
2002
//     index = 0;
2003
//     goto end;
2004
//   }
2005
//   for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
2006
//     int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
2007

2008
//     STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
2009
//     if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
2010
//       index = i;
2011
//       goto end;
2012
//     }
2013
//   }
2014
// end:
2015
//   (void)taosThreadRwlockUnlock(lock);
2016
//   return index;
2017
// }
2018

2019
// Destroy a table list previously handed out as an opaque pointer.
void qStreamDestroyTableList(void* pTableListInfo) {
  tableListDestroy(pTableListInfo);
}
273,328✔
2020
SArray*  qStreamGetTableListArray(void* pTableListInfo) {
272,592✔
2021
  STableListInfo* pList = pTableListInfo;
272,592✔
2022
  return pList->pTableList;
272,592✔
2023
}
2024

2025
// Apply pFilterInfo to pBlock through doFilter (third argument NULL) and return its result; pRet is passed through.
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo, SColumnInfoData** pRet) {
  int32_t code = doFilter(pBlock, pFilterInfo, NULL, pRet);
  return code;
}
1,492,122✔
2026

2027
// Logs the task handle at debug level and then destroys the task via qDestroyTask().
void streamDestroyExecTask(qTaskInfo_t tInfo) {
  qTaskInfo_t pTask = tInfo;

  qDebug("streamDestroyExecTask called, task:%p", pTask);
  qDestroyTask(pTask);
}
2,829,955✔
2031

2032
// Convenience form of streamCalcOneScalarExprInRange() that passes -1 for both row indexes.
// NOTE(review): -1/-1 presumably means "no row restriction" -- confirm against scalarCalculateInRange().
int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
  const int32_t rowStartIdx = -1;
  const int32_t rowEndIdx = -1;

  return streamCalcOneScalarExprInRange(pExpr, pDst, rowStartIdx, rowEndIdx, pExtraParams);
}
2035

2036
int32_t streamCalcOneScalarExprInRange(SNode* pExpr, SScalarParam* pDst, int32_t rowStartIdx, int32_t rowEndIdx,
832,024✔
2037
                                       const SStreamRuntimeFuncInfo* pExtraParams) {
2038
  int32_t      code = 0;
832,024✔
2039
  SNode*       pNode = NULL;
832,024✔
2040
  SNodeList*   pList = NULL;
832,024✔
2041
  SExprInfo*   pExprInfo = NULL;
832,024✔
2042
  int32_t      numOfExprs = 1;
832,024✔
2043
  int32_t*     offset = NULL;
832,024✔
2044
  STargetNode* pTargetNode = NULL;
832,024✔
2045
  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
832,024✔
2046
  if (code == 0) {
832,024✔
2047
    code = nodesCloneNode(pExpr, &pNode);
832,024✔
2048
  }
2049

2050
  if (code == 0) {
831,800✔
2051
    pTargetNode->dataBlockId = 0;
831,800✔
2052
    pTargetNode->pExpr = pNode;
831,800✔
2053
    pTargetNode->slotId = 0;
831,800✔
2054
  }
2055
  if (code == 0) {
831,800✔
2056
    code = nodesMakeList(&pList);
831,800✔
2057
  }
2058
  if (code == 0) {
831,800✔
2059
    code = nodesListAppend(pList, (SNode*)pTargetNode);
831,800✔
2060
  }
2061
  if (code == 0) {
832,024✔
2062
    pNode = NULL;
832,024✔
2063
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
832,024✔
2064
  }
2065

2066
  if (code == 0) {
831,802✔
2067
    const char* pVal = NULL;
831,802✔
2068
    int32_t     len = 0;
831,802✔
2069
    SNode*      pSclNode = NULL;
831,802✔
2070
    switch (pExprInfo->pExpr->nodeType) {
831,802✔
2071
      case QUERY_NODE_FUNCTION:
831,802✔
2072
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
831,802✔
2073
        break;
832,024✔
UNCOV
2074
      case QUERY_NODE_OPERATOR:
×
UNCOV
2075
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
×
UNCOV
2076
        break;
×
UNCOV
2077
      default:
×
UNCOV
2078
        code = TSDB_CODE_OPS_NOT_SUPPORT;
×
UNCOV
2079
        break;
×
2080
    }
2081
    SArray*     pBlockList = taosArrayInit(2, POINTER_BYTES);
832,024✔
2082
    SSDataBlock block = {0};
832,024✔
2083
    block.info.rows = 1;
832,024✔
2084
    SSDataBlock* pBlock = &block;
832,024✔
2085
    void*        tmp = taosArrayPush(pBlockList, &pBlock);
831,802✔
2086
    if (tmp == NULL) {
831,802✔
UNCOV
2087
      code = terrno;
×
2088
    }
2089
    if (code == 0) {
831,802✔
2090
      gTaskScalarExtra.pStreamInfo = (void*)pExtraParams;
831,802✔
2091
      gTaskScalarExtra.pStreamRange = NULL;
831,802✔
2092
      code = scalarCalculateInRange(pSclNode, pBlockList, pDst, rowStartIdx, rowEndIdx, &gTaskScalarExtra);
831,802✔
2093
    }
2094
    taosArrayDestroy(pBlockList);
831,686✔
2095
  }
2096
  nodesDestroyList(pList);
831,908✔
2097
  destroyExprInfo(pExprInfo, numOfExprs);
831,908✔
2098
  taosMemoryFreeClear(pExprInfo);
831,908✔
2099
  return code;
831,792✔
2100
}
2101

2102
// Appends one "force output" row to *pRes for the window identified by winIdx.
//
// The columns to emit come from pTaskInfo->pStreamRuntimeInfo->pForceOutputCols. When that array is NULL the
// function does nothing and returns 0. When *pRes is NULL a new data block is created; when the block has no
// columns yet, one column per force-output entry is added. Each entry's expression string is parsed into a node:
// value nodes are written directly, everything else is evaluated with streamCalcOneScalarExprInRange() for the
// single target row. funcInfo.curIdx is temporarily set to winIdx and restored before returning.
//
//   tInfo   task handle, used as SExecTaskInfo*
//   pRes    in/out result block pointer
//   winIdx  window index used while the expressions are evaluated
//
// Returns 0 on success (info.rows is incremented), otherwise the error code of the failing step.
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes, int32_t winIdx) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  const SArray*  pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
  int32_t        code = 0;
  SNode*         pNode = NULL;
  if (!pForceOutputCols) return 0;
  if (!*pRes) {
    code = createDataBlock(pRes);
  }

  if (code == 0 && (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0)) {
    int32_t idx = 0;
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
      SStreamOutCol*  pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, idx++);
      colInfo.info.precision = pCol->type.precision;
      colInfo.info.scale = pCol->type.scale;
      code = blockDataAppendColInfo(*pRes, &colInfo);
      if (code != 0) break;
    }
  }

  // Stop here if the block could not be created or populated. Continuing would discard the error
  // code and, when createDataBlock() failed, dereference a NULL *pRes below.
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to prepare result block for force output, code:%s", tstrerror(code));
    return code;
  }

  code = blockDataEnsureCapacity(*pRes, (*pRes)->info.rows + 1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to ensure capacity for force output, code:%s", tstrerror(code));
    return code;
  }

  // loop all exprs for force output, execute all exprs
  int32_t idx = 0;
  int32_t rowIdx = (*pRes)->info.rows;
  // Evaluate against the requested window; the previous index is restored before returning.
  int32_t tmpWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx;
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
    SScalarParam   dst = {0};
    SStreamOutCol* pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
    code = nodesStringToNode(pCol->expr, &pNode);
    if (code != 0) break;
    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, idx);
    if (nodeType(pNode) == QUERY_NODE_VALUE) {
      // Constant: copy the literal straight into the target row.
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
      code = colDataSetVal(pInfo, rowIdx, p, ((SValueNode*)pNode)->isNull);
    } else {
      // Expression: let the scalar engine write into the existing column (not owned by dst).
      dst.columnData = pInfo;
      dst.numOfRows = rowIdx;
      dst.colAlloced = false;
      code = streamCalcOneScalarExprInRange(pNode, &dst, rowIdx, rowIdx, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
    }
    ++idx;
    // TODO sclFreeParam(&dst);
    nodesDestroyNode(pNode);
    if (code != 0) break;
  }
  if (code == TSDB_CODE_SUCCESS) {
    (*pRes)->info.rows++;
  }
  pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = tmpWinIdx;
  return code;
}
2161

2162
// Computes the output (sub)table name from pExpr and copies it, NUL-terminated, into tbname.
//
// pExpr must be a string-typed value node or a string-typed function node. Function nodes are evaluated with
// streamCalcOneScalarExpr() into a temporary one-row column. While evaluating, pStreamRuntimeInfo->curIdx is set
// to 0 (the first window) and restored before returning.
//
//   pExpr               expression that yields the table name
//   tbname              destination buffer; at most TSDB_TABLE_NAME_LEN - 1 bytes plus the terminator are
//                       written, so it must hold at least TSDB_TABLE_NAME_LEN bytes
//   pStreamRuntimeInfo  runtime info forwarded to the scalar evaluation
//
// Returns 0 on success. Errors visible here:
//   TSDB_CODE_INVALID_PARA                   non-string expression, NULL constant, or empty result
//   TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED  the function evaluated to NULL
//   TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME    the name contains '.'
//   TSDB_CODE_OPS_NOT_SUPPORT                unsupported node type
// plus any allocation/evaluation error from the helpers. A name that is too long is truncated, not rejected.
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
  int32_t      code = 0;
  const char*  pVal = NULL;
  SScalarParam dst = {0};
  int32_t      len = 0;
  int32_t      nextIdx = pStreamRuntimeInfo->curIdx;
  pStreamRuntimeInfo->curIdx = 0;  // always use the first window to calc tbname
  // execute the expr
  switch (pExpr->type) {
    case QUERY_NODE_VALUE: {
      SValueNode* pValue = (SValueNode*)pExpr;
      int32_t     type = pValue->node.resType.type;
      if (!IS_STR_DATA_TYPE(type)) {
        qError("invalid sub tb expr with non-str type");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      void* pTmp = nodesGetValueFromNode((SValueNode*)pExpr);
      if (pTmp == NULL) {
        qError("invalid sub tb expr with null value");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      pVal = varDataVal(pTmp);
      len = varDataLen(pTmp);
    } break;
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
        qError("invalid sub tb expr with non-str type func");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
      if (!pCol) {
        code = terrno;
        qError("failed to allocate col info data at: %s, %d", __func__, __LINE__);
        break;
      }

      // Describe the temporary result column with the expression's result type.
      pCol->hasNull = true;
      pCol->info.type = ((SExprNode*)pExpr)->resType.type;
      pCol->info.colId = 0;
      pCol->info.bytes = ((SExprNode*)pExpr)->resType.bytes;
      pCol->info.precision = ((SExprNode*)pExpr)->resType.precision;
      pCol->info.scale = ((SExprNode*)pExpr)->resType.scale;
      code = colInfoDataEnsureCapacity(pCol, 1, true);
      if (code != 0) {
        qError("failed to ensure capacity for col info data at: %s, %d", __func__, __LINE__);
        taosMemoryFree(pCol);
        break;
      }
      // From here on the column is attached to dst and released by sclFreeParam() below.
      dst.columnData = pCol;
      dst.numOfRows = 1;
      dst.colAlloced = true;
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
      if (colDataIsNull_var(dst.columnData, 0)) {
        qInfo("invalid sub tb expr with null value");
        code = TSDB_CODE_MND_STREAM_TBNAME_CALC_FAILED;
      }
      if (code == 0) {
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
      }
    } break;
    default:
      qError("wrong subtable expr with type: %d", pExpr->type);
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }
  if (code == 0) {
    if (!pVal || len == 0) {
      qError("tbname generated with no characters which is not allowed");
      code = TSDB_CODE_INVALID_PARA;
      // Leave an empty string in the output, but do not call memcpy() with a possibly-NULL source.
      tbname[0] = '\0';
    } else {
      if (len > TSDB_TABLE_NAME_LEN - 1) {
        qError("tbname generated with too long characters, max allowed is %d, got %d, truncated.", TSDB_TABLE_NAME_LEN - 1, len);
        len = TSDB_TABLE_NAME_LEN - 1;
      }

      memcpy(tbname, pVal, len);
      tbname[len] = '\0';  // ensure null terminated
      if (NULL != strchr(tbname, '.')) {
        code = TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME;
        qError("tbname generated with invalid characters, '.' is not allowed");
      }
    }
  }
  // pVal may point into dst's column, so dst is released only after the copy above.
  sclFreeParam(&dst);
  pStreamRuntimeInfo->curIdx = nextIdx;  // restore
  return code;
}
2254

2255
// Frees a stream inserter parameter object together with the members it owns:
// the tbname/stbname/dbFName strings and the pFields/pTagFields/colCids/tagCids arrays.
// A NULL pParam is accepted and ignored. Other members (e.g. pSinkHandle) are not touched.
void destroyStreamInserterParam(SStreamInserterParam* pParam) {
  if (pParam == NULL) {
    return;
  }

  // strings
  if (pParam->tbname != NULL) {
    taosMemFree(pParam->tbname);
    pParam->tbname = NULL;
  }
  if (pParam->stbname != NULL) {
    taosMemFree(pParam->stbname);
    pParam->stbname = NULL;
  }
  if (pParam->dbFName != NULL) {
    taosMemFree(pParam->dbFName);
    pParam->dbFName = NULL;
  }

  // arrays
  if (pParam->pFields != NULL) {
    taosArrayDestroy(pParam->pFields);
    pParam->pFields = NULL;
  }
  if (pParam->pTagFields != NULL) {
    taosArrayDestroy(pParam->pTagFields);
    pParam->pTagFields = NULL;
  }
  if (pParam->colCids != NULL) {
    taosArrayDestroy(pParam->colCids);
    pParam->colCids = NULL;
  }
  if (pParam->tagCids != NULL) {
    taosArrayDestroy(pParam->tagCids);
    pParam->tagCids = NULL;
  }

  // finally the container itself
  taosMemFree(pParam);
}
270,610✔
2288

2289
// Creates a copy of pSrc and stores it in *ppDst.
//
// Scalars (suid, sver, tbType) are copied, the three name strings are duplicated, and the four arrays are
// duplicated when they are non-empty (an empty or NULL source array yields a NULL member in the copy).
// pSinkHandle is shared with the source, not cloned.
//
//   ppDst  receives the newly allocated copy; set to NULL on failure
//   pSrc   object to copy
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when ppDst or pSrc is NULL, or the terrno of the failed
// allocation. On failure any partially built copy is destroyed.
int32_t cloneStreamInserterParam(SStreamInserterParam** ppDst, SStreamInserterParam* pSrc) {
  int32_t code = 0, lino = 0;
  if (ppDst == NULL || pSrc == NULL) {
    // Return directly instead of jumping to _exit: the cleanup there dereferences ppDst and would
    // destroy whatever (possibly uninitialized) pointer the caller's *ppDst currently holds.
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    if (ppDst != NULL) {
      *ppDst = NULL;
    }
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
    return code;
  }
  *ppDst = (SStreamInserterParam*)taosMemoryCalloc(1, sizeof(SStreamInserterParam));
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);

  (*ppDst)->suid = pSrc->suid;
  (*ppDst)->sver = pSrc->sver;
  (*ppDst)->tbType = pSrc->tbType;
  (*ppDst)->tbname = taosStrdup(pSrc->tbname);
  TSDB_CHECK_NULL((*ppDst)->tbname, code, lino, _exit, terrno);

  if (pSrc->stbname) {
    (*ppDst)->stbname = taosStrdup(pSrc->stbname);
    TSDB_CHECK_NULL((*ppDst)->stbname, code, lino, _exit, terrno);
  }

  (*ppDst)->dbFName = taosStrdup(pSrc->dbFName);
  TSDB_CHECK_NULL((*ppDst)->dbFName, code, lino, _exit, terrno);

  (*ppDst)->pSinkHandle = pSrc->pSinkHandle;  // don't need clone and free

  // The copy was zero-allocated, so array members stay NULL unless a non-empty source is duplicated.
  if (pSrc->pFields && pSrc->pFields->size > 0) {
    (*ppDst)->pFields = taosArrayDup(pSrc->pFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pFields, code, lino, _exit, terrno);
  }

  if (pSrc->pTagFields && pSrc->pTagFields->size > 0) {
    (*ppDst)->pTagFields = taosArrayDup(pSrc->pTagFields, NULL);
    TSDB_CHECK_NULL((*ppDst)->pTagFields, code, lino, _exit, terrno);
  }

  if (pSrc->colCids && pSrc->colCids->size > 0) {
    (*ppDst)->colCids = taosArrayDup(pSrc->colCids, NULL);
    TSDB_CHECK_NULL((*ppDst)->colCids, code, lino, _exit, terrno);
  }

  if (pSrc->tagCids && pSrc->tagCids->size > 0) {
    (*ppDst)->tagCids = taosArrayDup(pSrc->tagCids, NULL);
    TSDB_CHECK_NULL((*ppDst)->tagCids, code, lino, _exit, terrno);
  }

_exit:

  if (code != 0) {
    // *ppDst is either NULL (allocation failed) or the partially built copy allocated above.
    if (*ppDst) {
      destroyStreamInserterParam(*ppDst);
      *ppDst = NULL;
    }

    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
2353

2354
// Public wrapper: forwards all arguments to doDropStreamTable() and returns its result code.
int32_t dropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq) {
  int32_t code = doDropStreamTable(pMsgCb, pOutput, pReq);
  return code;
}
2357

UNCOV
2358
// Public wrapper: forwards all arguments, including the table name, to doDropStreamTableByTbName()
// and returns its result code.
int32_t dropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName) {
  int32_t code = doDropStreamTableByTbName(pMsgCb, pOutput, pReq, tbName);
  return code;
}
2361

2362
// Filters uidList by the tag condition of a subplan.
//
//   pVnode     vnode handle forwarded to doFilterByTagCond()
//   uidList    table uid list forwarded to doFilterByTagCond()
//   node       subplan (used as SSubplan*) whose pTagCond is applied; NULL means no tag condition
//   pTaskInfo  task (used as SExecTaskInfo*) whose storageAPI is passed along
//   suid       super table uid
//
// Returns the result code of doFilterByTagCond(), which is called with SFLT_NOT_INDEX.
int32_t qFilterTableList(void* pVnode, SArray* uidList, SNode* node, void* pTaskInfo, uint64_t suid) {
  SNode* pTagCond = NULL;
  if (node != NULL) {
    pTagCond = ((SSubplan*)node)->pTagCond;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)pTaskInfo;
  return doFilterByTagCond(suid, uidList, pTagCond, pVnode, SFLT_NOT_INDEX, &pTask->storageAPI, NULL);
}
2373

2374
// Decides whether a window satisfies its TRUE_FOR condition.
//
//   pTrueForInfo  condition description; NULL means "no condition" and yields true
//   skey, ekey    window start/end keys; their absolute difference is compared with ->duration
//   count         row count, compared with ->count
//
// A duration or count limit that is <= 0 counts as satisfied. The two partial results are combined
// according to ->trueForType; an unrecognized type yields true.
bool isTrueForSatisfied(STrueForInfo* pTrueForInfo, int64_t skey, int64_t ekey, int64_t count) {
  if (pTrueForInfo == NULL) {
    return true;
  }

  const bool durationOk = !(pTrueForInfo->duration > 0) || (llabs(ekey - skey) >= pTrueForInfo->duration);
  const bool countOk = !(pTrueForInfo->count > 0) || (count >= pTrueForInfo->count);

  bool satisfied = true;
  switch (pTrueForInfo->trueForType) {
    case TRUE_FOR_DURATION_ONLY:
      satisfied = durationOk;
      break;
    case TRUE_FOR_COUNT_ONLY:
      satisfied = countOk;
      break;
    case TRUE_FOR_AND:
      satisfied = durationOk && countOk;
      break;
    case TRUE_FOR_OR:
      satisfied = durationOk || countOk;
      break;
    default:
      break;
  }
  return satisfied;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc