• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.96
/source/libs/executor/src/executor.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "executorInt.h"
18
#include "operator.h"
19
#include "planner.h"
20
#include "querytask.h"
21
#include "tdatablock.h"
22
#include "tref.h"
23
#include "trpc.h"
24
#include "tudf.h"
25
#include "wal.h"
26

27
#include "storageapi.h"
28

29
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
30
int32_t             exchangeObjRefPool = -1;
31

32
static void cleanupRefPool() {
2,232✔
33
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
2,232✔
34
  taosCloseRef(ref);
2,232✔
35
}
2,232✔
36

37
static void initRefPool() {
2,232✔
38
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
2,232✔
39
  (void)atexit(cleanupRefPool);
2,232✔
40
}
2,232✔
41

42
// Follows the single-downstream operator chain from pOperator down to the stream scan operator and appends the
// given input to that operator's pBlockLists. Every operator visited on the way is reset to OP_NOT_OPENED.
//
// pOperator    operator to start the walk from
// input        payload whose layout depends on type (array of SPackedData, array of SSDataBlock, or one pointer)
// numOfBlocks  element count of input for the array-style types
// type         STREAM_INPUT__* selector; a value not listed below queues nothing and still succeeds
// id           task id string, only used for logging
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when an operator on the path has zero or more than one
// downstream, or the code set by QUERY_CHECK_NULL (terrno is passed as the error) when an append fails.
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string: the format must be a plain "%s" (a trailing PRIx64 only appends a literal "lx")
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      // input is read as a contiguous array of SPackedData
      for (int32_t i = 0; i < numOfBlocks; i++) {
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
        void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      // input is read as a contiguous array of SSDataBlock; each element is wrapped in an SPackedData
      for (int32_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
        SPackedData  tmp = {.pDataBlock = pDataBlock};
        void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
        QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else if (type == STREAM_INPUT__CHECKPOINT) {
      SPackedData tmp = {.pDataBlock = input};
      void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
      pInfo->blockType = STREAM_INPUT__CHECKPOINT;
    } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
      for (int32_t i = 0; i < numOfBlocks; ++i) {
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
        void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    }

    return TSDB_CODE_SUCCESS;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
104

105
// Resets every operator above the stream scan operator to OP_NOT_OPENED by following the single-downstream
// chain that starts at pOperator. The stream scan operator itself is left untouched.
//
// pOperator  operator to start the walk from
// id         task id string, only used for logging
//
// Returns 0 once the stream scan operator is reached, or TSDB_CODE_APP_ERROR when an operator on the path has
// zero or more than one downstream.
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string: the format must be a plain "%s" (a trailing PRIx64 only appends a literal "lx")
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
  }
  return 0;
}
122

123
// Finds the stream scan operator below pOperator and clears its buffered blocks via doClearBufferedBlocks.
// Only a single-downstream chain is followed: an operator with zero or several downstreams ends the walk
// without clearing anything.
static void clearStreamBlock(SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pInfo = pOperator->info;
    doClearBufferedBlocks(pInfo);
    return;
  }

  if (pOperator->numOfDownstream == 1) {
    // plain call: a void function must not "return" an expression
    clearStreamBlock(pOperator->pDownstream[0]);
  }
}
133

134
// Resets the task's error code to 0 and clears the blocks buffered by the stream scan operator found under the
// task's root operator. tinfo is dereferenced without a NULL check.
void qResetTaskInfoCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  pTask->code = 0;
  clearStreamBlock(pTask->pRoot);
}
×
139

140
// Follows the single-downstream operator chain from pOperator down to the first stream scan or virtual table
// scan operator and queues the given input on that operator's pBlockLists. Every operator visited on the way is
// reset to OP_NOT_OPENED.
//
// pOperator    operator to start the walk from
// input        payload whose layout depends on type (array of SPackedData, array of SSDataBlock, or one pointer)
// numOfBlocks  element count of input for the array-style types
// type         STREAM_INPUT__* selector; an unhandled value is reported as an internal error
// id           task id string, only used for logging
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR when an operator on the path has zero or more than one
// downstream, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the target still holds pending input or type is not
// handled, or the code set by QUERY_CHECK_NULL (terrno is passed as the error) when an append fails.
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type,
                                const char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN &&
      pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // id is a string: the format must be a plain "%s" (a trailing PRIx64 only appends a literal "lx")
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;
    SStreamScanInfo* pInfo = pOperator->info;

    qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
    // the previous batch must have been fully consumed before a new one is queued
    QUERY_CHECK_CONDITION((pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      // input is read as a contiguous array of SPackedData
      for (int32_t i = 0; i < numOfBlocks; i++) {
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
        void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }

      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      void* tmp = taosArrayPush(pInfo->pBlockLists, input);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      // input is read as a contiguous array of SSDataBlock; each element is wrapped in an SPackedData
      for (int32_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
        SPackedData  tmp = {.pDataBlock = pDataBlock};
        void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
        QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
      }

      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__RECALCULATE) {
      SPackedData tmp = {.pDataBlock = input};
      void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);

      // a checkpoint trigger is recorded as STREAM_INPUT__CHECKPOINT; recalculate keeps its own type
      pInfo->blockType =
          (type == STREAM_INPUT__CHECKPOINT_TRIGGER) ? STREAM_INPUT__CHECKPOINT : STREAM_INPUT__RECALCULATE;
    } else {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    return TSDB_CODE_SUCCESS;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
208

209
// Descends through the first downstream of each operator until the stream scan operator is found, then hands
// the task id string to the data reader of its table-scan operator (when both exist).
//
// pOperator  operator to start the walk from
// pAPI       storage API providing tsdReader.tsdSetReaderTaskId
//
// Returns 0 on success (including when there is nothing to update), or the code returned by
// tsdSetReaderTaskId.
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    // a leaf that is not a stream scan has no downstream to descend into: nothing to update
    if (pOperator->numOfDownstream == 0 || pOperator->pDownstream == NULL) {
      return 0;
    }
    return doSetTaskId(pOperator->pDownstream[0], pAPI);
  }

  SStreamScanInfo* pStreamScanInfo = pOperator->info;
  if (pStreamScanInfo->pTableScanOp != NULL) {
    STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
    if (pScanInfo->base.dataReader != NULL) {
      int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
      if (code) {
        qError("failed to set reader id for executor, code:%s", tstrerror(code));
        return code;
      }
    }
  }

  return 0;
}
229

230
// Stores queryId in the task, rebuilds the task id string from (taskId, queryId) and pushes the new string to
// the reader under the task's stream scan operator. Returns the result of doSetTaskId.
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str);

  // propagate the refreshed id string to the tsdb reader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
238

239
// Resets the operators above the stream scan operator of the given task to OP_NOT_OPENED.
// Returns TSDB_CODE_APP_ERROR for a NULL task, otherwise the result of doSetStreamOpOpen.
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamOpOpen(pTask->pRoot, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return code;
}
254

255
// Stores the event-notification settings in the task's stream info.
//
// tinfo             task to configure
// eventTypes        notification event mask; 0 means nothing is configured
// pSchemaWrapper    result schema, cloned into the stream info
// stbFullName       super table full name, duplicated into the stream info
// newSubTableRule   copied as is
// pNotifyEventStat  pointer stored as is (not copied)
//
// Returns TSDB_CODE_SUCCESS when the settings are stored, and also when tinfo, eventTypes, pSchemaWrapper or
// stbFullName is 0/NULL (the call is then a no-op). Returns terrno when cloning the schema or duplicating the
// name fails; in that case the remaining fields are left unchanged.
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
                             const char* stbFullName, bool newSubTableRule, STaskNotifyEventStat* pNotifyEventStat) {
  int32_t code = TSDB_CODE_SUCCESS;
  SStreamTaskInfo *pStreamInfo = NULL;

  if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) {
    goto _end;
  }

  pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
  pStreamInfo->eventTypes = eventTypes;

  pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper);
  if (pStreamInfo->notifyResultSchema == NULL) {
    // stop here instead of continuing with a half-initialized notify configuration
    code = terrno;
    goto _end;
  }

  pStreamInfo->stbFullName = taosStrdup(stbFullName);
  if (pStreamInfo->stbFullName == NULL) {
    // the duplicated name is required; report the allocation failure to the caller
    code = terrno;
    goto _end;
  }

  pStreamInfo->newSubTableRule = newSubTableRule;
  pStreamInfo->pNotifyEventStat = pNotifyEventStat;

_end:
  return code;
}
277

278
// Queues a batch of stream input on the scan operator of the given task.
// Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there is no
// input, otherwise the result of doSetStreamBlock.
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  // empty input is accepted and ignored
  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return code;
}
298

299
// Queues a batch of SMA input on the stream scan operator of the given task.
// Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there is no
// input, otherwise the result of doSetSMABlock.
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  // empty input is accepted and ignored
  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}
319

320
// Creates an execution task running in OPTR_EXEC_MODEL_QUEUE mode.
//
// msg            serialized subplan; NULL requests a task whose root is a raw scan operator
// pReaderHandle  read handle used to build the operators
// vgId           vnode group id
// numOfCols      out: number of output slots of the plan's root node (only written on the subplan path)
// id             task id used on the raw scan path
//
// Returns the new task, or NULL on failure. On the subplan path terrno is set to the failing code.
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pRawTask = NULL;

    int32_t rc = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pRawTask);
    if (rc != 0 || pRawTask == NULL) {
      return NULL;
    }

    rc = createRawScanOperatorInfo(pReaderHandle, pRawTask, &pRawTask->pRoot);
    if (rc != 0 || pRawTask->pRoot == NULL) {
      taosMemoryFree(pRawTask);
      return NULL;
    }

    pRawTask->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pRawTask));
    return pRawTask;
  }

  // a plan message was supplied: decode it and build the task from the subplan
  SSubplan* pSubplan = NULL;
  int32_t   code = qStringToSubplan(msg, &pSubplan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pNewTask = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pSubplan, &pNewTask, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pNewTask);
    terrno = code;
    return NULL;
  }

  // count the slots of the root node's output descriptor that are flagged as output
  SDataBlockDescNode* pOutputDesc = pSubplan->pNode->pOutputDataBlockDesc;
  *numOfCols = 0;

  SNode* pNode;
  FOREACH(pNode, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pNode)->output) {
      ++(*numOfCols);
    }
  }

  return pNewTask;
}
370

371
// Creates an execution task running in OPTR_EXEC_MODEL_STREAM mode from a serialized subplan.
//
// pTaskInfo  out: the new task; reset to NULL whenever an error is returned after it was written
// msg        serialized subplan; must not be NULL
// readers    read handle used to build the operators
// vgId       vnode group id
// taskId     task id
//
// Returns TSDB_CODE_INVALID_PARA when msg is NULL (pTaskInfo is not touched in that case), otherwise the
// first failing code of qStringToSubplan, qCreateExecTask or qStreamInfoResetTimewindowFilter, or
// TSDB_CODE_SUCCESS.
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
  if (msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTaskInfo = NULL;

  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qCreateExecTask(readers, vgId, taskId, pPlan, pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // never hand a destroyed task back to the caller
    return code;
  }

  code = qStreamInfoResetTimewindowFilter(*pTaskInfo);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // never hand a destroyed task back to the caller
  }

  return code;
}
397

398
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
21,555✔
399
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
400
  int32_t code = TSDB_CODE_SUCCESS;
21,555✔
401
  int32_t lino = 0;
21,555✔
402
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
21,555✔
403
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
21,557!
404
  int32_t numOfUids = taosArrayGetSize(tableIdList);
21,557✔
405
  if (numOfUids == 0) {
21,556✔
406
    (*ppArrayRes) = qa;
1,802✔
407
    goto _error;
1,802✔
408
  }
409

410
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
19,754✔
411

412
  uint64_t suid = 0;
19,754✔
413
  uint64_t uid = 0;
19,754✔
414
  int32_t  type = 0;
19,754✔
415
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
19,754✔
416

417
  // let's discard the tables those are not created according to the queried super table.
418
  SMetaReader mr = {0};
19,755✔
419
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
19,755✔
420
  for (int32_t i = 0; i < numOfUids; ++i) {
59,207✔
421
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
39,455✔
422
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);
39,455!
423

424
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
39,455✔
425
    if (code != TSDB_CODE_SUCCESS) {
39,451!
426
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
×
UNCOV
427
      continue;
×
428
    }
429

430
    tDecoderClear(&mr.coder);
39,451✔
431

432
    if (mr.me.type == TSDB_SUPER_TABLE) {
39,455!
UNCOV
433
      continue;
×
434
    } else {
435
      if (type == TSDB_SUPER_TABLE) {
39,455✔
436
        // this new created child table does not belong to the scanned super table.
437
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
32,097!
438
          continue;
30,573✔
439
        }
440
      } else {  // ordinary table
441
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
442
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
443
        if (mr.me.uid != uid) {
7,358✔
444
          continue;
7,356✔
445
        }
446
      }
447
    }
448

449
    if (pScanInfo->pTagCond != NULL) {
1,526✔
450
      bool          qualified = false;
1,025✔
451
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
1,025✔
452
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
1,025✔
453
      if (code != TSDB_CODE_SUCCESS) {
1,025!
UNCOV
454
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
×
455
        continue;
513✔
456
      }
457

458
      if (!qualified) {
1,025✔
459
        continue;
513✔
460
      }
461
    }
462

463
    // handle multiple partition
464
    void* tmp = taosArrayPush(qa, id);
1,011✔
465
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,011!
466
  }
467

468
_end:
19,752✔
469

470
  pAPI->metaReaderFn.clearReader(&mr);
19,752✔
471
  (*ppArrayRes) = qa;
19,753✔
472

473
_error:
21,555✔
474
  if (code != TSDB_CODE_SUCCESS) {
21,555!
UNCOV
475
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
476
  }
477
  return code;
21,554✔
478
}
479

480
// Adds tables to, or removes tables from, the stream scan operator of the given task.
//
// tinfo        task that owns the stream scan operator
// tableIdList  uids to add or remove
// isAdd        true to add (after filtering out unqualified tables), false to remove
//
// Returns 0 when the update is applied and also when no stream scan operator is found without an error;
// otherwise the failing code (or terrno for allocation / array access failures).
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  if (isAdd) {
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
  }

  // locate the stream scan operator that receives the table ids
  SOperatorInfo* pScanOp = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStreamScanInfo* pScanInfo = pScanOp->info;
  if (pScanOp->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {  // clear meta cache for subscription if tag is changed
    for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
      int64_t*        pUid = (int64_t*)taosArrayGet(tableIdList, i);
      STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
      taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, pUid, LONG_BYTES);
    }
  }

  if (!isAdd) {  // remove the table id in current list
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
    taosWLockLatch(&pTaskInfo->lock);
    pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
    return code;
  }

  // add new table id: keep only the tables that qualify for this scan
  SArray* pQualified = NULL;
  code = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &pQualified);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pQualified);
    return code;
  }

  int32_t numOfQualifiedTables = taosArrayGetSize(pQualified);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, pQualified);

  // a key buffer is only needed when the scan groups by tags
  bool   assignUid = false;
  char*  keyBuf = NULL;
  size_t bufLen = 0;
  if (pScanInfo->pGroupTags != NULL) {
    bufLen = getTableTagsBufLen(pScanInfo->pGroupTags);
  }

  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(pQualified);
      return terrno;
    }
  }

  STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
  taosWLockLatch(&pTaskInfo->lock);

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* pUid = taosArrayGet(pQualified, i);
    if (pUid == NULL) {
      taosMemoryFree(keyBuf);
      taosArrayDestroy(pQualified);
      taosWUnLockLatch(&pTaskInfo->lock);
      return terrno;
    }

    STableKeyInfo keyInfo = {.uid = *pUid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        // grouping by table name: the uid doubles as the group id
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId, &pTaskInfo->storageAPI);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(keyBuf);
          taosArrayDestroy(pQualified);
          taosWUnLockLatch(&pTaskInfo->lock);
          return code;
        }
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(keyBuf);
      taosArrayDestroy(pQualified);
      taosWUnLockLatch(&pTaskInfo->lock);
      return code;
    }
  }

  taosWUnLockLatch(&pTaskInfo->lock);
  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(pQualified);
  return code;
}
580

581
// Reports the schema version, tag version, database name and table name of the idx-th schema entry recorded
// in the task.
//
// *tbGet is set to false on entry and to true only when an entry was copied out. When idx is not below the
// number of recorded entries the call succeeds with *tbGet left false.
// Returns TSDB_CODE_INVALID_PARA when tinfo, dbName or tableName is NULL, terrno when the entry cannot be
// fetched, otherwise TSDB_CODE_SUCCESS.
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t idx,
                                    bool* tbGet) {
  *tbGet = false;

  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (taosArrayGetSize(pTask->schemaInfos) <= idx) {
    // no entry at this index
    return TSDB_CODE_SUCCESS;
  }

  SSchemaInfo* pEntry = taosArrayGet(pTask->schemaInfos, idx);
  if (pEntry == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pEntry->sw->version;
  *tversion = pEntry->tversion;

  // a missing name is reported as an empty string
  if (!pEntry->dbname) {
    dbName[0] = 0;
  } else {
    tstrncpy(dbName, pEntry->dbname, dbNameBuffLen);
  }

  if (!pEntry->tablename) {
    tableName[0] = 0;
  } else {
    tstrncpy(tableName, pEntry->tablename, tbaleNameBuffLen);
  }

  *tbGet = true;
  return TSDB_CODE_SUCCESS;
}
618

619
// Returns the task's dynamicTask flag. tinfo is dereferenced without a NULL check.
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  return pTask->dynamicTask;
}
7,977,817✔
620

621
// Placeholder for releasing an operator parameter: currently nothing is freed, for NULL and non-NULL input
// alike.
void destroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    // TODO
  }
}
628

629
// Replaces the task's operator parameter with pParam: the previous parameter is passed to
// destroyOperatorParam first, and paramSet is cleared.
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  destroyOperatorParam(pTask->pOpParam);
  pTask->pOpParam = pParam;
  pTask->paramSet = false;
}
974✔
634

635
int32_t qExecutorInit(void) {
14,291✔
636
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
14,291✔
637
  return TSDB_CODE_SUCCESS;
14,305✔
638
}
639

640
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
8,129,714✔
641
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
642
                        EOPTR_EXEC_MODEL model) {
643
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
8,129,714✔
644
  (void)taosThreadOnce(&initPoolOnce, initRefPool);
8,129,714✔
645

646
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
8,131,828✔
647

648
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
8,131,899✔
649
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
8,126,002!
UNCOV
650
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
×
651
    goto _error;
328✔
652
  }
653

654
  if (handle) {
8,128,484✔
655
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
8,120,348✔
656
    void*           pSinkManager = NULL;
8,120,348✔
657
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
8,120,348✔
658
    if (code != TSDB_CODE_SUCCESS) {
8,115,744!
659
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
×
UNCOV
660
      goto _error;
×
661
    }
662

663
    void* pSinkParam = NULL;
8,115,744✔
664
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
8,115,744✔
665
    if (code != TSDB_CODE_SUCCESS) {
8,111,693!
666
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
×
667
      taosMemoryFree(pSinkManager);
×
UNCOV
668
      goto _error;
×
669
    }
670

671
    SDataSinkNode* pSink = NULL;
8,111,693✔
672
    if (readHandle->localExec) {
8,111,693✔
673
      code = nodesCloneNode((SNode *)pSubplan->pDataSink, (SNode **)&pSink);
71,648✔
674
      if (code != TSDB_CODE_SUCCESS) {
71,663✔
675
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code), (*pTask)->id.str);
482!
676
        taosMemoryFree(pSinkManager);
482!
UNCOV
677
        goto _error;
×
678
      }
679
    }
680

681
    // pSinkParam has been freed during create sinker.
682
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str, pSubplan->processOneBlock);
8,111,226✔
683
    if (code) {
8,112,655!
UNCOV
684
      qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
×
685
    }
686
  }
687

688
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
8,115,089✔
689
         tstrerror(code));
690

691
_error:
7,869,085✔
692
  // if failed to add ref for all tables in this query, abort current query
693
  return code;
8,121,282✔
694
}
695

696
static void freeBlock(void* param) {
×
697
  SSDataBlock* pBlock = *(SSDataBlock**)param;
×
698
  blockDataDestroy(pBlock);
×
UNCOV
699
}
×
700

701
// Execute the task's operator tree and collect the produced blocks into pResList.
//
// tinfo            task handle (SExecTaskInfo*)
// pResList         cleared on entry; receives pointers to the task-owned copies kept in
//                  pTaskInfo->pResultBlockList (slots of that list are reused across calls)
// useconds         set to the accumulated elapsed time only when the operator tree is exhausted
// hasMore          set to true when the last pull returned a block (i.e. more data may follow);
//                  NOT written on the error path
// pLocal           optional; copied into pTaskInfo->localFetch when non-NULL
// processOneBlock  stop after the first block instead of filling up to the row threshold
//
// Returns pTaskInfo->code: TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task,
// the kill code if the task was already killed, the longjmp'ed code if an operator aborted,
// or the first error hit while pulling/copying blocks.
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal, bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  // Declared and zeroed up front: the QUERY_CHECK_* macros below jump to _end, where `el` is
  // logged. Declaring it after those jumps left it indeterminate on every error path.
  uint64_t       el = 0;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  // claim exclusive ownership of the task; only one thread may execute it at a time
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();

  // the operator parameter is consumed only once, on the first pull
  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  // 4096 rows per call unless the subplan carries a smaller dynamic threshold
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // no cached slot left: clone the block and append it to the task's block list
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      p = p1;
    } else {
      // reuse the cached block at this position
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
833

834
// Destroy every data block held in the task's pResultBlockList, then empty the list.
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTaskInfo->pResultBlockList;

  size_t  total = taosArrayGetSize(pBlocks);
  int32_t idx = 0;
  while (idx < total) {
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, idx);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++idx;
  }

  taosArrayClear(pBlocks);
}
724✔
847

848
// Pull a single result block from the task's root operator.
//
// tinfo     task handle (SExecTaskInfo*)
// pRes      reset to NULL on entry; receives the block returned by the root operator
// useconds  set to the accumulated elapsed time only when no block was returned
//
// Returns pTaskInfo->code: the kill code if the task was already killed,
// TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task, the longjmp'ed code if an
// operator aborted, or the error reported by getNextFn / blockDataCheck.
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    clearStreamBlock(pTaskInfo->pRoot);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // Claim ownership atomically (same scheme as qExecTaskOpt). A plain check-then-set is not
  // safe under a shared (read) latch, and it also left `curOwner` at 0 so the message below
  // never reported the real owner.
  curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  code = blockDataCheck(*pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void) cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
925

926
// Append `pInfo` to the task's stop-info array while holding the stop-info write latch.
// Returns TSDB_CODE_SUCCESS, or terrno when the push fails.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pPushed = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pPushed != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
937

938
int32_t stopInfoComp(void const* lp, void const* rp) {
×
939
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
×
UNCOV
940
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
×
941

942
  if (key->refId < pInfo->refId) {
×
943
    return -1;
×
944
  } else if (key->refId > pInfo->refId) {
×
UNCOV
945
    return 1;
×
946
  }
947

UNCOV
948
  return 0;
×
949
}
950

951
// Look up `pInfo` (matched by refId through stopInfoComp) in the task's stop-info array and
// remove it when present. The whole search-and-remove runs under the stop-info write latch.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (!(pos < 0)) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
×
959

960
// For every registered stop-info entry, acquire the referenced exchange object, post its
// `ready` semaphore and release the reference again. Entries whose reference can no longer
// be acquired are skipped. Failures are logged only. Runs under the stop-info write latch.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t total = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pEntry = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, idx);
    if (pEntry == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }

    SExchangeInfo* pExchange = taosAcquireRef(exchangeObjRefPool, pEntry->refId);
    if (pExchange == NULL) {
      continue;
    }

    int32_t rc = tsem_post(&pExchange->ready);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }

    rc = taosReleaseRef(exchangeObjRefPool, pEntry->refId);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
3,098✔
985

986
// Mark the task as killed with `rspCode` and wake its registered exchange operators,
// without waiting for the task to stop.
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

    setTaskKilled(pTaskInfo, rspCode);
    qStopTaskOperators(pTaskInfo);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}
999

1000
// Mark the task as killed (with TSDB_CODE_TSC_QUERY_KILLED) and, when waitDuration > 0,
// poll until the task is no longer executing or waitDuration milliseconds have elapsed.
//
// rspCode is stored into pTaskInfo->code only when the task is observed idle during the wait.
// Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle; TSDB_CODE_SUCCESS in every other
// case, including when the wait times out.
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        startMs = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  bool waitForExit = (waitDuration > 0);
  if (waitForExit) {
    qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  if (!waitForExit) {
    return TSDB_CODE_SUCCESS;
  }

  for (;;) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still running: back off for 200 ms, then re-check unless the deadline has passed
    taosMsleep(200);

    int64_t waitedMs = taosGetTimestampMs() - startMs;
    if (waitedMs >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
1038

1039
// True when the handle is non-NULL and the task's `owner` field is non-zero,
// i.e. some thread currently holds the task for execution.
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}
1047

1048
// Emit the task's cost summary at debug level. When a file-block load recorder is attached
// the block/row statistics are included; otherwise only idle and elapsed time are printed.
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pRec = pCost->pRecoder;
  int64_t                 idle = pCost->start - pCost->created;

  if (pRec == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idle / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idle / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pRec->totalBlocks, pRec->loadBlockStatis, pRec->loadBlocks,
      pRec->totalRows, pRec->totalCheckedRows);
}
8,136,954✔
1066

1067
// Log completion and the cost summary of the task, then destroy it. A NULL handle is a no-op.
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  if (pTaskInfo->pRoot == NULL) {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  }

  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}
1082

1083
// Collect explain/exec info for the task by delegating to getOperatorExplainExecInfo
// on the task's root operator; its return code is passed through.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}
1087

1088
// Walk the operator chain from the task's root through pDownstream[0] and store the `info`
// of the first QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN operator into *scanner.
//
// If the chain ends without a stream-scan operator, *scanner is set to NULL and an error is
// logged. Previously the walk had no termination check and would dereference past the leaf.
void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  *scanner = NULL;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return;
    }

    // leaf reached without finding the scanner: stop instead of reading pDownstream[0]
    if (pOperator->numOfDownstream < 1) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed at line %d since no stream scan operator found, %s", __func__, __LINE__, GET_TASKID(pTaskInfo));
}
1,237✔
1102

1103
// Step 1 of the scan-history setup: record the version range and time window to scan in the
// task's stream info and move the recover step to STREAM_RECOVER_STEP__PREPARE1.
// Fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the task is not a stream-model task.
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  SStreamTaskInfo* pInfo = &pTaskInfo->streamInfo;

  pInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  pInfo->fillHistoryWindow = *pWindow;
  pInfo->fillHistoryVer = *pVerRange;

  qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
         ", window:%" PRId64 " - %" PRId64,
         GET_TASKID(pTaskInfo), pInfo->fillHistoryVer.minVer, pInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1126

1127
// Step 2 of the scan-history setup: record the version range and time window for the WAL
// scan in the task's stream info and move the recover step to STREAM_RECOVER_STEP__PREPARE2.
// Fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the task is not a stream-model task.
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  SStreamTaskInfo* pInfo = &pTaskInfo->streamInfo;

  pInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pInfo->fillHistoryWindow = *pWindow;
  pInfo->fillHistoryVer = *pVerRange;

  qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64
         "-%" PRId64,
         GET_TASKID(pTaskInfo), pInfo->fillHistoryVer.minVer, pInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1150

1151
// Reset the task's stream recover step to STREAM_RECOVER_STEP__NONE.
// Fails with TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the task is not a stream-model task.
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1165

1166
// Follow pDownstream[0] from `pOperator` down to the first stream-scan operator and copy its
// watermark, interval, last scan range and recalculate interval into the output parameters.
//
// Returns TSDB_CODE_SUCCESS when a stream-scan operator was found. If the chain ends without
// one, returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR and leaves the outputs untouched. The
// previous recursive form had no leaf/NULL guard and would dereference past the chain's end.
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow, TSKEY* pRecInteral) {
  while (pOperator != NULL && pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream < 1) {
      pOperator = NULL;  // leaf reached, no stream scan below
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  if (pOperator == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SStreamScanInfo* pScanOp = (SStreamScanInfo*)pOperator->info;
  *pWaterMark = pScanOp->twAggSup.waterMark;
  *pInterval = pScanOp->interval;
  *pLastWindow = pScanOp->lastScanRange;
  *pRecInteral = pScanOp->recalculateInterval;
  return TSDB_CODE_SUCCESS;
}
1177

1178
// Fetch watermark / interval / last scan range / recalculate interval for the task by
// delegating to getOpratorIntervalInfo, starting at the task's root operator.
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow, TSKEY* pRecInteral) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOpratorIntervalInfo(pRoot, pWaterMark, pInterval, pLastWindow, pRecInteral);
}
1183

1184
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
2,417✔
1185
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
2,417✔
1186
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
2,417✔
1187

1188
  while (1) {
3,368✔
1189
    int32_t type = pOperator->operatorType;
5,785✔
1190
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
5,785✔
1191
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL ||
4,496✔
1192
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
1,299✔
1193
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
1,299✔
1194
      STimeWindowAggSupp*          pSup = &pInfo->twAggSup;
1,299✔
1195

1196
      qInfo("save stream param for interval: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
1,299!
1197

1198
      pSup->calTriggerSaved = pSup->calTrigger;
1,299✔
1199
      pSup->deleteMarkSaved = pSup->deleteMark;
1,299✔
1200
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
1,299✔
1201
      pSup->deleteMark = INT64_MAX;
1,299✔
1202
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
1,299✔
1203
      pInfo->ignoreExpiredData = false;
1,299✔
1204
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
4,486✔
1205
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
4,300✔
1206
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
191✔
1207
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
191✔
1208
      STimeWindowAggSupp*            pSup = &pInfo->twAggSup;
191✔
1209

1210
      qInfo("save stream param for session: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
191!
1211

1212
      pSup->calTriggerSaved = pSup->calTrigger;
191✔
1213
      pSup->deleteMarkSaved = pSup->deleteMark;
191✔
1214
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
191✔
1215
      pSup->deleteMark = INT64_MAX;
191✔
1216
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
191✔
1217
      pInfo->ignoreExpiredData = false;
191✔
1218
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
4,295✔
1219
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
87✔
1220
      STimeWindowAggSupp*          pSup = &pInfo->twAggSup;
87✔
1221

1222
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
87!
1223

1224
      pSup->calTriggerSaved = pSup->calTrigger;
87✔
1225
      pSup->deleteMarkSaved = pSup->deleteMark;
87✔
1226
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
87✔
1227
      pSup->deleteMark = INT64_MAX;
87✔
1228
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
87✔
1229
      pInfo->ignoreExpiredData = false;
87✔
1230
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) {
4,208✔
1231
      SStreamEventAggOperatorInfo* pInfo = pOperator->info;
73✔
1232
      STimeWindowAggSupp*          pSup = &pInfo->twAggSup;
73✔
1233

1234
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
73!
1235

1236
      pSup->calTriggerSaved = pSup->calTrigger;
73✔
1237
      pSup->deleteMarkSaved = pSup->deleteMark;
73✔
1238
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
73✔
1239
      pSup->deleteMark = INT64_MAX;
73✔
1240
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
73✔
1241
      pInfo->ignoreExpiredData = false;
73✔
1242
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
4,135✔
1243
      SStreamCountAggOperatorInfo* pInfo = pOperator->info;
79✔
1244
      STimeWindowAggSupp*          pSup = &pInfo->twAggSup;
79✔
1245

1246
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
79!
1247

1248
      pSup->calTriggerSaved = pSup->calTrigger;
79✔
1249
      pSup->deleteMarkSaved = pSup->deleteMark;
79✔
1250
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
79✔
1251
      pSup->deleteMark = INT64_MAX;
79✔
1252
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
79✔
1253
      pInfo->ignoreExpiredData = false;
79✔
1254
      qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData);
79!
1255
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC) {
4,056✔
1256
      SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
8✔
1257
      STimeWindowAggSupp*           pSup = &pInfo->twAggSup;
8✔
1258

1259
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
8!
1260

1261
      pSup->calTriggerSaved = pSup->calTrigger;
8✔
1262
      pSup->deleteMarkSaved = pSup->deleteMark;
8✔
1263
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
8✔
1264
      pSup->deleteMark = INT64_MAX;
8✔
1265
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
8✔
1266
      pInfo->ignoreExpiredData = false;
8✔
1267
      qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData);
8!
1268
    }
1269

1270
    // iterate operator tree
1271
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
5,785!
1272
      if (pOperator->numOfDownstream > 1) {
2,417!
UNCOV
1273
        qError("unexpected stream, multiple downstream");
×
UNCOV
1274
        return -1;
×
1275
      }
1276
      return 0;
2,417✔
1277
    } else {
1278
      pOperator = pOperator->pDownstream[0];
3,368✔
1279
    }
1280
  }
1281

1282
  return 0;
1283
}
1284

1285
// Report the task's streamInfo.recoverScanFinished flag.
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.recoverScanFinished;
}
1289

1290
// Widen the task's fill-history time window to [INT64_MIN, INT64_MAX], logging the window
// that is being replaced. Always returns 0.
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  STimeWindow*   pFilter = &pTaskInfo->streamInfo.fillHistoryWindow;

  qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
         GET_TASKID(pTaskInfo), pFilter->skey, pFilter->ekey, INT64_MIN, INT64_MAX);

  pFilter->ekey = INT64_MAX;
  pFilter->skey = INT64_MIN;
  return 0;
}
1301

1302
void* qExtractReaderFromStreamScanner(void* scanner) {
1,242✔
1303
  SStreamScanInfo* pInfo = scanner;
1,242✔
1304
  return (void*)pInfo->tqReader;
1,242✔
1305
}
1306

1307
// Return the schema pointer stored in the task's stream info.
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
1311

1312
// Return the table name stored in the task's stream info.
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
1316

1317
// Return the address of the batch meta response embedded in the task's stream info.
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
1321

1322
// Copy the task's current stream offset into *pOffset via tOffsetCopy. Always returns 0.
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  tOffsetCopy(pOffset, &((SExecTaskInfo*)tinfo)->streamInfo.currentOffset);
  return 0;
}
1332

1333
// Initialise *pCond from the table schema in pMtInfo: ascending order, one SColumnInfo and
// one identity slot entry per schema column, the table's suid, start version -1 and the
// snapshot version of sContext as end version.
//
// pCond is zeroed first. colList and pSlotList are allocated here.
// Returns TSDB_CODE_SUCCESS, or terrno when either allocation fails (both are released).
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(*pCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = pMtInfo->schema->nCols;
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);

  bool allocated = (pCond->colList != NULL) && (pCond->pSlotList != NULL);
  if (!allocated) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
    return terrno;
  }

  TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  int32_t col = 0;
  while (col < pCond->numOfCols) {
    SColumnInfo* pDst = pCond->colList + col;

    pDst->colId = pMtInfo->schema->pSchema[col].colId;
    pDst->type = pMtInfo->schema->pSchema[col].type;
    pDst->bytes = pMtInfo->schema->pSchema[col].bytes;
    pDst->pk = pMtInfo->schema->pSchema[col].flags & COL_IS_KEY;

    // slot index equals the column's position in the schema
    pCond->pSlotList[col] = col;
    ++col;
  }

  return TSDB_CODE_SUCCESS;
}
1363

1364
// Put the task's root operator back into the OP_NOT_OPENED state.
void qStreamSetOpen(qTaskInfo_t tinfo) {
  ((SExecTaskInfo*)tinfo)->pRoot->status = OP_NOT_OPENED;
}
593,998✔
1369

1370
// Store the caller-provided sourceExcluded value in the task's stream info.
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
  ((SExecTaskInfo*)tinfo)->streamInfo.sourceExcluded = sourceExcluded;
}
98,015✔
1374

1375
// Position the scan operator(s) of a task at the offset requested by the consumer.
//
// tinfo    task handle (an SExecTaskInfo*).
// pOffset  requested offset; its type selects WAL (TMQ_OFFSET__LOG), tsdb snapshot data
//          (TMQ_OFFSET__SNAPSHOT_DATA) or snapshot meta (TMQ_OFFSET__SNAPSHOT_META).
// subType  subscription type; TOPIC_SUB_TYPE__COLUMN uses the stream-scan operator, any other
//          value is handled through the raw-scan info of the root operator.
//
// Behaviour:
//  - column subscription + log offset: the offset is verified against the WAL reader first.
//  - if pOffset equals streamInfo.currentOffset nothing else is done and 0 is returned.
//  - otherwise the matching reader is closed / (re)opened / repositioned, and on success
//    pOffset is copied into streamInfo.currentOffset.
//
// Returns 0 on success, otherwise the error code of the failing step. On error the current
// offset is left unchanged.
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
  // if pOffset equal to current offset, means continue consume
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      // switching to WAL consumption: the tsdb reader is no longer needed
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        // no table given in the offset: start from the first table of the list
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
            taosRUnLockLatch(&pTaskInfo->lock);
            return terrno;
          }
          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      if (pScanBaseInfo->dataReader == NULL) {
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
                                                             &keyInfo, 1, pScanInfo->pResBlock,
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          return code;
        }

        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      } else {
        // reuse the existing reader: point it at the single target table and reset its status
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }

        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          return code;
        }
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      }

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_PAR_INTERNAL_ERROR;
    }

  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }

      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_PAR_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // `code` is still TSDB_CODE_SUCCESS here, so it must not be returned as-is: take the
        // error from terrno, and fall back to an internal error in case terrno was not set,
        // so this failure is never reported to the caller as success.
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_PAR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }
      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
//      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
      // the schema pointer from mtInfo is handed over to streamInfo (the previous one is deleted),
      // which is why mtInfo is not destroyed on this success path
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  // remember the offset that the scan is now positioned at
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
1627

1628
// Handle an RPC response: hand a private copy of the payload to the callback
// registered in the message's send-info handle.
//
// parent  not used by this function.
// pMsg    response message; pMsg->info.ahandle is expected to be the SMsgSendInfo of the
//         request. pMsg->pCont is released here on every path.
// pEpSet  not used by this function.
//
// If the payload copy cannot be allocated, pMsg->code is replaced with terrno and the callback
// is still invoked (with a NULL pData) so that it can observe the failure.
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pSendInfo == NULL) {
    qError("pMsg->info.ahandle is NULL");
    // the normal path below always frees the payload; do the same here so it is not leaked
    rpcFreeCont(pMsg->pCont);
    return;
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1650

1651
// Collect the uids of the tables held by the first table list found in the task's operator tree.
//
// Returns a newly created array of uint64_t uids, or NULL when the table list info cannot be
// obtained. Any later failure is logged and raised through T_LONG_JMP on pTaskInfo->env; both
// temporary arrays are released before jumping so nothing is leaked on that path.
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;
  int32_t        numOfTables = 0;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // plist is only a temporary holder; release it on the success and the failure path alike
  taosArrayDestroy(plist);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the long jump never returns here, so the partial result must be freed first
    taosArrayDestroy(pUidList);
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pUidList;
}
1688

1689
// Append to pList the table-list pointer of the scan operator reached from pOperator.
//
// A stream-scan operator contributes the list of its inner table-scan operator, a table-scan
// operator contributes its own list; any other operator delegates to its first downstream
// operator, if it has one. Returns TSDB_CODE_SUCCESS or the error of the failed push.
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pushed = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pushed = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    default: {
      // not a scan operator: keep descending along the first downstream operator only
      if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTaskInfo->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
1717

1718
// Create an array of table-list pointers gathered from the task's operator tree.
//
// On success *pList receives the new array (the caller destroys it). On any failure *pList is
// NULL and an error code is returned; TSDB_CODE_INVALID_PARA is returned when pList is NULL.
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (pList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pCollected = taosArrayInit(0, POINTER_BYTES);
  if (pCollected == NULL) {
    return terrno;
  }

  int32_t code = extractTableList(pCollected, pTaskInfo->pRoot);
  if (code != 0) {
    // nothing is handed out on failure, so drop the partially filled array
    taosArrayDestroy(pCollected);
    return code;
  }

  *pList = pCollected;
  return code;
}
1737

1738
// Invoke the root operator's releaseStreamStateFn callback when one is installed.
// Always returns 0.
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;

  if (pTask->pRoot->fpSet.releaseStreamStateFn == NULL) {
    return 0;
  }

  pTask->pRoot->fpSet.releaseStreamStateFn(pTask->pRoot);
  return 0;
}
1745

1746
// Invoke the root operator's reloadStreamStateFn callback when one is installed.
// Always returns 0.
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;

  if (pTask->pRoot->fpSet.reloadStreamStateFn == NULL) {
    return 0;
  }

  pTask->pRoot->fpSet.reloadStreamStateFn(pTask->pRoot);
  return 0;
}
1753

1754
// Clear the task's stored result code (set it to 0) and log the value it had before.
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const int32_t  prevCode = pTask->code;

  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
24✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc