• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3858

17 Apr 2025 01:40PM UTC coverage: 62.968% (+0.5%) from 62.513%
#3858

push

travis-ci

web-flow
docs(opc): add perssit data support (#30783)

156194 of 316378 branches covered (49.37%)

Branch coverage included in aggregate %.

242021 of 316027 relevant lines covered (76.58%)

19473613.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.15
/source/libs/stream/src/streamExec.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM         32
20
#define STREAM_RESULT_DUMP_THRESHOLD      300
21
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)  // 1MiB result data
22
#define STREAM_SCAN_HISTORY_TIMESLICE     1000           // 1000 ms
23
#define MIN_INVOKE_INTERVAL               50             // 50ms
24
#define FILL_HISTORY_TASK_EXEC_INTERVAL   5000           // 5 sec
25

26
static int32_t streamAlignRecalculateStart(SStreamTask* pTask);
27
static int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask);
28
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
29
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
30
                                  int32_t* totalBlocks);
31

32
bool streamTaskShouldStop(const SStreamTask* pTask) {
1,770,643✔
33
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,770,643✔
34
  return (pState.state == TASK_STATUS__STOP) || (pState.state == TASK_STATUS__DROPPING);
1,770,120!
35
}
36

37
bool streamTaskShouldPause(const SStreamTask* pTask) {
27,933✔
38
  return (streamTaskGetStatus(pTask).state == TASK_STATUS__PAUSE);
27,933✔
39
}
40

41
// Deliver one result block through the task's output: table/SMA sinks consume
// (and free) the block directly; dispatch-type outputs enqueue it into the
// output queue and then trigger downstream dispatching. Ownership of pBlock is
// always taken by this function.
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int32_t code = 0;
  int32_t outputType = pTask->outputInfo.type;

  switch (outputType) {
    case TASK_OUTPUT__TABLE:
      pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
      destroyStreamDataBlock(pBlock);
      break;

    case TASK_OUTPUT__SMA:
      pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId,
                                        pBlock->blocks);
      destroyStreamDataBlock(pBlock);
      break;

    case TASK_OUTPUT__FIXED_DISPATCH:
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
    case TASK_OUTPUT__VTABLE_MAP:
      code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
        destroyStreamDataBlock(pBlock);
        return code;
      }

      // not handle error, if dispatch failed, try next time.
      // checkpoint trigger will be checked
      code = streamDispatchStreamBlock(pTask);
      break;

    default:
      stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, outputType);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  return code;
}
70

71
// Wrap the accumulated result blocks (pRes) into a single stream data block
// and push it to the task output via doOutputResultBlockImpl(). Ownership of
// pRes is always consumed here (transferred on success, destroyed on failure
// or when empty). *totalSize/*totalBlocks accumulate output statistics.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks == 0) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pStreamBlocks = NULL;

  int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks);
  if (code) {
    // fix: report and propagate the actual failure code instead of logging
    // terrno and returning a hard-coded TSDB_CODE_OUT_OF_MEMORY
    stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(code));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
          SIZE_IN_MiB(size));

  code = doOutputResultBlockImpl(pTask, pStreamBlocks);
  if (code != TSDB_CODE_SUCCESS) {  // back pressure and record position
    return code;
  }

  *totalSize += size;
  *totalBlocks += numOfBlocks;

  return code;
}
101

102
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
514✔
103
                                     SArray* pRes) {
104
  SSDataBlock block = {0};
514✔
105
  int32_t     num = taosArrayGetSize(pRetrieveBlock->blocks);
514✔
106
  if (num != 1) {
514!
107
    stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
×
108
    return TSDB_CODE_INVALID_PARA;
×
109
  }
110

111
  void*   p = taosArrayGet(pRetrieveBlock->blocks, 0);
514✔
112
  int32_t code = assignOneDataBlock(&block, p);
514✔
113
  if (code) {
514!
114
    stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
×
115
    return code;
×
116
  }
117

118
  block.info.type = STREAM_PULL_OVER;
514✔
119
  block.info.childId = pTask->info.selfChildId;
514✔
120

121
  p = taosArrayPush(pRes, &block);
514✔
122
  if (p != NULL) {
514!
123
    (*pNumOfBlocks) += 1;
514✔
124
    stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
514✔
125
            pTask->info.selfChildId, pRetrieveBlock->reqId);
126
  } else {
127
    code = terrno;
×
128
    stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64 " code:%s", pTask->id.idStr,
×
129
            pRetrieveBlock->reqId, tstrerror(code));
130
  }
131

132
  return code;
514✔
133
}
134

135
static int32_t doAppendRecalBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamTrigger* pRecalculateBlock,
×
136
                                  SArray* pRes) {
137
  int32_t code = 0;
×
138
  SSDataBlock block = {0};
×
139

140
  void* p = taosArrayPush(pRes, pRecalculateBlock->pBlock);
×
141
  if (p != NULL) {
×
142
    (*pNumOfBlocks) += 1;
×
143
    stDebug("s-task:%s(child %d) recalculate from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
×
144
            pTask->info.selfChildId, /*pRecalculateBlock->reqId*/ (int64_t)0);
145
  } else {
146
    code = terrno;
×
147
    stError("s-task:%s failed to append recalculate block for downstream, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
×
148
            /*pRecalculateBlock->reqId*/(int64_t)0, tstrerror(code));
149
  }
150

151
  return code;
×
152
}
153

154
// Core execution loop for one queued input item: repeatedly drives the query
// executor (qExecTask) and accumulates produced blocks into pRes. The batch is
// flushed downstream (doDumpResult) whenever it reaches
// STREAM_RESULT_DUMP_THRESHOLD blocks or STREAM_RESULT_DUMP_SIZE_THRESHOLD
// bytes, and once more after the executor is drained.
// *totalSize/*totalBlocks report the overall output volume for this item.
// Returns 0, or the first fatal executor error (OOM/invalid-para/corruption).
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t size = 0;         // bytes currently accumulated in pRes
  int32_t numOfBlocks = 0;  // blocks currently accumulated in pRes
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
  SArray* pRes = NULL;

  *totalBlocks = 0;
  *totalSize = 0;

  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;

    // (re)allocate the batch array after each dump transferred ownership away
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }

    // bail out on stop/dropping, or if the array allocation above failed
    if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return code;
    }

    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        qResetTaskInfoCode(pExecutor);
      }

      // only these errors are fatal; anything else is reset and retried
      if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
        stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return code;
      } else {
        qResetTaskCode(pExecutor);
        continue;
      }
    }

    // executor drained: for retrieve inputs, append the pull-over marker first
    if (output == NULL) {
      if (pItem != NULL && (pItem->type == STREAM_INPUT__DATA_RETRIEVE)) {
        code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*)pItem, pRes);
        if (code) {
          taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
          return code;
        }
      }

      break;
    }

    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK && pTask->info.taskLevel == TASK_LEVEL__AGG) {
      stDebug("s-task:%s exec output type:%d", pTask->id.idStr, output->info.type);
    }

    // retrieve blocks go back upstream; checkpoint blocks are not forwarded
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToUpTasks(pTask, output) < 0) {
        // TODO
      }
      continue;
    } else if (output->info.type == STREAM_CHECKPOINT) {
      continue;  // checkpoint block not dispatch to downstream tasks
    }

    SSDataBlock block = {.info.childId = pTask->info.selfChildId};
    code = assignOneDataBlock(&block, output);
    if (code) {
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
      continue;
    }

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    } else {
      stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
              pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
    }

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      // todo: here we need continue retry to put it into output buffer
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // pRes ownership was consumed by doDumpResult; start a fresh batch
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  // flush whatever remains in the final (partial) batch
  if (numOfBlocks > 0) {
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}
257

258
// todo contiuous try to create result blocks
259
static int32_t handleScanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
2,956✔
260
  int32_t code = TSDB_CODE_SUCCESS;
2,956✔
261
  if (taosArrayGetSize(pRes) > 0) {
2,956✔
262
    SStreamDataBlock* pStreamBlocks = NULL;
1,803✔
263
    code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks);
1,803✔
264
    if (code) {
1,803!
265
      stError("s-task:%s failed to build history result blocks", pTask->id.idStr);
×
266
      return code;
×
267
    }
268

269
    code = doOutputResultBlockImpl(pTask, pStreamBlocks);
1,803✔
270
    if (code != TSDB_CODE_SUCCESS) {  // should not have error code
1,803!
271
      stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
×
272
    }
273
  } else {
274
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
1,153✔
275
  }
276
  return code;
2,956✔
277
}
278

279
// Inner scan-history loop: pulls blocks from the executor into pRes until the
// scan finishes, the task must stop, the input queue is blocked, or the batch
// reaches the dump thresholds. *pSize accumulates the byte size of collected
// blocks; *pFinish is set only when the executor reports end-of-scan.
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   exec = pTask->exec.pExecutor;
  int32_t numOfBlocks = 0;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      break;
    }

    // downstream back-pressure: stop collecting, caller will retry later
    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
      break;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    code = qExecTask(exec, &output, &ts);
    if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {  // if out of memory occurs, quit
      stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
              tstrerror(code));
      qResetTaskCode(exec);
      continue;
    }

    // the generated results before fill-history task been paused, should be dispatched to sink node
    if (output == NULL) {
      (*pFinish) = qStreamScanhistoryFinished(exec);
      break;
    }

    SSDataBlock block = {0};
    code = assignOneDataBlock(&block, output);
    if (code) {
      // NOTE(review): on assign failure the (partially copied) block is still
      // pushed below — confirm this is intended rather than a skipped block
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
    }

    block.info.childId = pTask->info.selfChildId;
    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    }

    (*pSize) +=
        blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    // batch full: hand the collected blocks back to the caller for dispatch
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
              pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
              SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
      break;
    }
  }
}
334

335
// Pack a (scan-history result code, idle time in ms) pair into the return struct.
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
  SScanhistoryDataInfo ret = {code, idleTime};
  return ret;
}
338

339
// Scan-history driver for a SOURCE task (step 1 of fill-history). Repeatedly
// collects a batch via streamScanHistoryDataImpl() and dispatches it, until
// the scan finishes, the task pauses/stops, back-pressure is hit, or the
// STREAM_SCAN_HISTORY_TIMESLICE budget (relative to `st`, ms) is exhausted.
// Returns a (code, idle-time) pair telling the scheduler whether to quit,
// continue, or re-execute after the given idle time.
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
  void*       exec = pTask->exec.pExecutor;
  bool        finished = false;
  const char* id = pTask->id.idStr;

  // only SOURCE tasks perform the history scan
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
    return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
  }

  if ((!pTask->hTaskInfo.operatorOpen) || (pTask->info.fillHistory == STREAM_RECALCUL_TASK)) {
    // NOTE(review): the return value of qSetStreamOpOpen is never checked —
    // confirm failures here are benign
    int32_t code = qSetStreamOpOpen(exec);
    pTask->hTaskInfo.operatorOpen = true;
  }

  while (1) {
    if (streamTaskShouldPause(pTask)) {
      stDebug("s-task:%s paused from the scan-history task", id);
      // quit from step1, not continue to handle the step2
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // output queue is full, idle for 1 sec (STREAM_SCAN_HISTORY_TIMESLICE) and retry
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
    }

    // downstream back-pressure: idle for FILL_HISTORY_TASK_EXEC_INTERVAL (5s)
    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
      continue;
    }

    int32_t size = 0;
    streamScanHistoryDataImpl(pTask, pRes, &size, &finished);

    if (streamTaskShouldStop(pTask)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // dispatch the generated results, todo fix error
    int32_t code = handleScanhistoryResultBlocks(pTask, pRes, size);
    if (code) {
      stError("s-task:%s failed to handle scan result block, code:%s", pTask->id.idStr, tstrerror(code));
    }

    if (finished) {
      return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
    }

    // yield the time slice so other fill-history tasks can run
    int64_t el = taosGetTimestampMs() - st;
    if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
      stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
              pTask->info.fillHistory, el / 1000.0);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
    }
  }
}
405

406
// Prepare transferring executor state from a fill-history task to its related
// stream task: acquire the stream task, validate/halt its status, reset the
// source-task time-window filter, request a checkpoint from mnode, and reopen
// the upstream input queues. The acquired stream-task reference is released on
// every exit path (fix: the original leaked it on the invalid-status returns).
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  id = pTask->id.idStr;

  SStreamTask* pStreamTask = NULL;
  int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
  if (pStreamTask == NULL || code != TSDB_CODE_SUCCESS) {
    stError(
        "s-task:%s failed to find related stream task:0x%x, may have been destroyed or closed, destroy related "
        "fill-history task",
        id, (int32_t)pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    // todo: this function should never be failed.
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);

    // 2. save to disk
    streamMetaWLock(pMeta);
    if (streamMetaCommit(pMeta) < 0) {
      // persist to disk
    }
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
    stDebug(
        "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
        "info, prepare transfer exec state",
        id, streamTaskGetStatus(pTask).name, el, pStreamTask->id.idStr);
  }

  ETaskStatus  status = streamTaskGetStatus(pStreamTask).state;
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
  // for the step 2.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      streamMetaReleaseTask(pMeta, pStreamTask);  // fix: release acquired ref
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  } else {
    if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
          status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      streamMetaReleaseTask(pMeta, pStreamTask);  // fix: release acquired ref
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
              pStreamTask->id.idStr, tstrerror(code));
      streamMetaReleaseTask(pMeta, pStreamTask);
      return code;
    } else {
      stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
    }
  }

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  SStreamTaskState pState = streamTaskGetStatus(pStreamTask);
  status = pState.state;
  char* p = pState.name;
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
    stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
    streamMetaReleaseTask(pMeta, pStreamTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 1. expand the query time window for stream task of WAL scanner
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
            ", status:%s, sched-status:%d",
            pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
            pTimeWindow->ekey, p, pStreamTask->status.schedStatus);

    code = streamTaskResetTimewindowFilter(pStreamTask);
  } else {
    stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
  }

  // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
  // 2. send msg to mnode to launch a checkpoint to keep the state for current stream
  code = streamTaskSendCheckpointReq(pStreamTask);

  // 3. the default task status should be ready or something, not halt.
  // status to the value that will be kept in disk

  // 4. open the inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pStreamTask);

  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
502

503
// Callback run once the related stream task has been halted: reopen all of its
// upstream input queues, then request a checkpoint to persist current state.
static int32_t haltCallback(SStreamTask* pTask, void* param) {
  streamTaskOpenAllUpstreamInput(pTask);
  int32_t code = streamTaskSendCheckpointReq(pTask);
  return code;
}
507

508
// Entry point for fill-history -> stream task state transfer, invoked after
// the trans-state block has been appended (status.appendTranstateBlock == 1).
// SOURCE/AGG tasks run the full preparation (streamTransferStateDoPrepare);
// sink-level tasks only halt the related stream task asynchronously, with
// haltCallback reopening its input queues and requesting a checkpoint.
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SStreamMeta* pMeta = pTask->pMeta;

  // the caller must have appended the trans-state block first
  if (pTask->status.appendTranstateBlock != 1) {
    stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {  // do transfer task operator states.
    code = streamTransferStateDoPrepare(pTask);
  } else {
    // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
    SStreamTask* pStreamTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
    if (pStreamTask != NULL) {
      // halt the related stream sink task
      code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
                pStreamTask->id.idStr, tstrerror(code));
        streamMetaReleaseTask(pMeta, pStreamTask);
        return code;
      } else {
        stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
      }
      streamMetaReleaseTask(pMeta, pStreamTask);
    }
  }

  return code;
}
541

542
// Bind one queued input item to the query executor as its next input batch.
// Dispatches on pItem->type to the matching qSetMultiStreamInput() form and,
// for submit / merged-submit inputs (and force-window-close triggers),
// advances *pVer to the version (or window start key) carried by the block.
// Returns 0 on success, or TSDB_CODE_STREAM_INTERNAL_ERROR for unknown types.
static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
  void*   pExecutor = pTask->exec.pExecutor;
  int32_t code = 0;

  const SStreamQueueItem* pItem = pInput;
  if (pItem->type == STREAM_INPUT__GET_RES) {
    const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
    code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      // for force-window-close the recorded "version" is the window start key
      TSKEY k = pTrigger->pBlock->info.window.skey;
      stDebug("s-task:%s set force_window_close as source block, skey:%" PRId64, id, k);
      (*pVer) = k;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
    code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
    stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
            pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
    // the processed version must never move backwards
    if ((*pVer) > pSubmit->submit.ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pSubmit->submit.ver);
    } else {
      (*pVer) = pSubmit->submit.ver;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

    SArray* pBlockList = pBlock->blocks;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

    SArray* pBlockList = pMerged->submits;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
            pMerged->ver);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);

    // the processed version must never move backwards
    if ((*pVer) > pMerged->ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pMerged->ver);
    } else {
      (*pVer) = pMerged->ver;
    }

  } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
    const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             pItem->type == STREAM_INPUT__RECALCULATE) {
    const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);

    if (pItem->type == STREAM_INPUT__RECALCULATE) {
      int32_t t = ((SStreamDataBlock*) pCheckpoint)->type;
      int32_t tId = (int32_t)pTask->hTaskInfo.id.taskId;
      if (t == STREAM_RECALCULATE_START) {
        stDebug("s-task:%s set recalculate block to start related recalculate task:0x%x", id, tId);
      } else {
        stDebug("s-task:%s set recalculate block:%d, task:0x%x", id, t, tId);
      }
    }
  } else {
    stError("s-task:%s invalid input block type:%d, discard", id, pItem->type);
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  return code;
}
616

617
// Handle a trans-state block arriving during fill-history -> stream-task state
// transfer. AGG/SINK tasks first wait until every upstream task has delivered
// its trans-state block (streamAlignTransferState); earlier arrivals are
// freed. Dispatch-type outputs forward the block downstream so sinks flush
// first; non-dispatch tasks start the state-transfer preparation directly.
// Ownership of pBlock is always consumed here (forwarded or freed).
void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char* id = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     level = pTask->info.taskLevel;

  // dispatch the tran-state block to downstream task immediately
  int32_t type = pTask->outputInfo.type;

  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      // still waiting for trans-state blocks from `remain` upstream tasks
      streamFreeQitem((SStreamQueueItem*)pBlock);
      stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
      return;
    }
  }

  // transfer the ownership of executor state
  if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__VTABLE_MAP) {
    if (level == TASK_LEVEL__SOURCE) {
      stDebug("s-task:%s add transfer-state block into outputQ", id);
    } else {
      stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    }

    // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
    if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
      pBlock->srcVgId = pTask->pMeta->vgId;
      code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
      if (code == 0) {
        code = streamDispatchStreamBlock(pTask);
        if (code) {
          stError("s-task:%s failed to dispatch stream block, code:%s", id, tstrerror(code));
        }
      } else {  // todo put into queue failed, retry
        streamFreeQitem((SStreamQueueItem*)pBlock);
      }
    } else {  // level == TASK_LEVEL__SINK
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else {  // non-dispatch task, do task state transfer directly
    streamFreeQitem((SStreamQueueItem*)pBlock);
    stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);

    code = streamTransferStatePrepare(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to prepare transfer state, code:%s", id, tstrerror(code));
      int8_t status = streamTaskSetSchedStatusInactive(pTask);  // let's ignore this return status
    }
  }
}
668

669
// static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
670
// Record the timestamp (in ms) of the most recent execution round; read by
// doStreamExecTask to throttle re-invocation via MIN_INVOKE_INTERVAL.
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
671

672
// Accumulate per-batch execution statistics and derive throughput figures.
//
//  pInfo:       task execution statistics, updated in place
//  totalBlocks: number of result blocks produced by this batch
//  totalSize:   total size (bytes) of the produced result blocks
//  blockSize:   total size (bytes) of the consumed input blocks
//  st:          batch start timestamp in milliseconds
//  id:          task id string, used for logging only
static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
                               double st, const char* id) {
  double el = (taosGetTimestampMs() - st) / 1000.0;

  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
          el, SIZE_IN_MiB(totalSize), totalBlocks);

  pInfo->outputDataBlocks += totalBlocks;
  pInfo->outputDataSize += totalSize;

  // Guard against zero OR negative elapsed time. The previous check,
  // fabs(el - 0.0) <= DBL_EPSILON, only caught the exact-zero case: if the
  // wall clock stepped backwards between the two taosGetTimestampMs() calls,
  // `el` would be negative and both throughput values would turn negative.
  if (el <= DBL_EPSILON) {
    pInfo->procsThroughput = 0;
    pInfo->outputThroughput = 0;
  } else {
    pInfo->outputThroughput = (totalSize / el);
    pInfo->procsThroughput = (blockSize / el);
  }
}
43,908✔
689

690
// Execute one batch of input blocks (`pBlock`, merged from `num` queue items):
// feed the batch into the query executor, record throughput, and advance the
// unsaved processedVer watermark. Returns 0 on success or the first error code
// encountered; on invalid version bookkeeping it logs and returns the current
// (success) code without updating processedVer.
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
  const char*      id = pTask->id.idStr;
  // NOTE(review): blockSize is never assigned in this function, so the
  // procsThroughput recorded below is always computed from 0 — confirm whether
  // the input batch size should be threaded through from the caller.
  int32_t          blockSize = 0;
  int64_t          st = taosGetTimestampMs();
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  int64_t          ver = pInfo->processedVer;  // may be advanced by doSetStreamInputBlock
  int64_t          totalSize = 0;
  int32_t          totalBlocks = 0;
  int32_t          code = 0;

  stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
  code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
  if (code) {
    stError("s-task:%s failed to set input block, not exec for these blocks", id);
    return code;
  }

  code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
  if (code) {
    return code;
  }

  doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

  // update the currentVer if processing the submitted blocks.
  // Sanity check: the new version must not fall behind the checkpoint version,
  // and checkpointVer must not exceed nextProcessVer. On violation we log and
  // keep the old processedVer (code is still 0 here).
  if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
    stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
            pInfo->checkpointVer, pInfo->nextProcessVer, ver);
    return code;
  }

  if (ver != pInfo->processedVer) {
    stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
            " ckpt:%" PRId64,
            id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
    pInfo->processedVer = ver;
  }

  return code;
}
730

731
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
732
// Handle a STREAM_INPUT__CHECKPOINT block: when the task is in checkpoint
// state, build the checkpoint; otherwise acknowledge upstream/mnode so the
// checkpoint procedure is not blocked. Do nothing after sync executor state to
// storage backend, until checkpoint is completed.
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  streamMutexLock(&pTask->lock);
  SStreamTaskState curState = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (curState.state == TASK_STATUS__CK) {  // todo other thread may change the status
    stDebug("s-task:%s checkpoint block received, set status:%s", id, curState.name);
    return streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
  }

  // todo refactor: not in checkpoint state, just ack the checkpoint request
  int32_t code = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) ? streamTaskSendCheckpointSourceRsp(pTask)
                                                               : streamTaskSendCheckpointReadyMsg(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    // todo: let's retry send rsp to upstream/mnode
    stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
            tstrerror(code));
  }

  return code;
}
760

761
// Flush executor state ahead of a checkpoint. When the previous task status was
// HALT, first transfer ownership of the fill-history task's executor state into
// this task; then execute the checkpoint block so all in-executor data reaches
// the K/V store before the checkpoint is taken.
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
  const char* id = pTask->id.idStr;

  // 1. transfer the ownership of executor state
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  if (!dropRelHTask) {
    stDebug("s-task:%s no transfer-state needed", id);
  } else {
    STaskId*     pHTaskId = &pTask->hTaskInfo.id;
    SStreamTask* pHTask = NULL;
    int32_t      ret = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
    if (ret != TSDB_CODE_SUCCESS) {
      stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
              (int32_t)pHTaskId->taskId);
    } else {  // ignore the error code.
      ret = streamTaskReleaseState(pHTask);
      if (ret) {
        stError("s-task:%s failed to release query state, code:%s", pHTask->id.idStr, tstrerror(ret));
      }

      // only reload into this task when the release succeeded
      if (ret == TSDB_CODE_SUCCESS) {
        ret = streamTaskReloadState(pTask);
        if (ret) {
          stError("s-task:%s failed to reload query state, code:%s", pTask->id.idStr, tstrerror(ret));
        }
      }

      stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
              streamTaskGetStatus(pHTask).name);
      // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
      /*
       * while(1) {
       * qExecTask()
       * }
       * // put into the output queue.
       */
      streamMetaReleaseTask(pTask->pMeta, pHTask);
    }
  }

  // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
  int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
  if (code) {
    stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
  }

  return code;
}
809

810
/**
811
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
812
 * appropriate batch of blocks should be handled in 5 to 10 sec.
813
 */
814
/**
 * Main per-task execution loop: repeatedly extract a merged batch from the
 * input queue and dispatch it by block type (checkpoint-trigger, trans-state,
 * checkpoint, data for sink, recalculate, ordinary data). Returns 0 when the
 * task yields (stopped, queue full/blocked, throttled, no input, or time-slice
 * exhausted), or the first fatal error code.
 *
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
static int32_t doStreamExecTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;  // NOTE(review): unused in this function
  int32_t     taskLevel = pTask->info.taskLevel;
  int32_t     taskType = pTask->info.fillHistory;

  // merge multiple input data if possible in the input queue.
  int64_t st = taosGetTimestampMs();
  stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st);

  while (1) {
    int32_t           blockSize = 0;
    int32_t           numOfBlocks = 0;
    SStreamQueueItem* pInput = NULL;

    // yield when the task is stopped or not yet initialized
    if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask).state == TASK_STATUS__UNINIT)) {
      stDebug("s-task:%s stream task is stopped", id);
      return 0;
    }

    // back-pressure: do not consume input while the output queue is full
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stTrace("s-task:%s outputQ is full, idle for 500ms and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    // back-pressure from downstream task's input queue
    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stTrace("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    // throttle: avoid re-invoking exec more often than MIN_INVOKE_INTERVAL
    if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
      stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    }

    EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
    if (ret == EXEC_AFTER_IDLE) {
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    } else {
      if (pInput == NULL) {  // nothing to process
        return 0;
      }
    }

    pTask->execInfo.inputDataBlocks += numOfBlocks;
    pTask->execInfo.inputDataSize += blockSize;

    // dispatch checkpoint msg to all downstream tasks
    int32_t type = pInput->type;
    if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
#if 0
      // Injection error: for automatic kill long trans test
      taosMsleep(50*1000);
#endif
      code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
      if (code != 0) {
        stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
      }
      continue;
    }

    if (type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    // checkpoint block ends this execution round
    if (type == STREAM_INPUT__CHECKPOINT) {
      code = doHandleChkptBlock(pTask);
      streamFreeQitem(pInput);
      return code;
    }

    if (taskLevel == TASK_LEVEL__SINK) {
      if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__RECALCULATE) {
        stError("s-task:%s invalid block type:%d for sink task, discard", id, type);
        continue;
      }

      // here only handle the data block sink operation
      if (type == STREAM_INPUT__DATA_BLOCK) {
        pTask->execInfo.sink.dataSize += blockSize;
        stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
        code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        double el = (taosGetTimestampMs() - st) / 1000.0;
        pTask->execInfo.procsThroughput = (fabs(el - 0.0) <= DBL_EPSILON) ? 0 : (blockSize / el);
      } else {
        streamFreeQitem((SStreamQueueItem*)pInput);
      }

      continue;
    }

    if (type == STREAM_INPUT__RECALCULATE) {
      // agg task of a normal stream waits until every upstream sent recalculate
      if (taskType == STREAM_NORMAL_TASK && taskLevel == TASK_LEVEL__AGG) {
        int32_t remain = streamAlignRecalculateStart(pTask);
        if (remain > 0) {
          streamFreeQitem((SStreamQueueItem*)pInput);
          stDebug("s-task:%s receive upstream recalculate msg, not sent remain:%d", id, remain);
          return code;
        }

        stDebug("s-task:%s all upstream send recalculate msg, continue", id);
      }

      // 1. generate the recalculating snapshot for related recalculate tasks.
      if ((taskType == STREAM_NORMAL_TASK) &&
          ((taskLevel == TASK_LEVEL__AGG) || (taskLevel == TASK_LEVEL__SOURCE && (!pTask->info.hasAggTasks)))) {
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      } else if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG) {
        // send retrieve to upstream tasks (source tasks, to start to recalculate procedure.
        stDebug("s-task:%s recalculate agg task send retrieve to upstream source tasks", id);
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      }
    }

    // ordinary data block: execute and free; recalculate blocks are freed later
    if (type != STREAM_INPUT__RECALCULATE) {
      code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      streamFreeQitem(pInput);
      if (code) {
        return code;
      }
    }

    // for stream with only 1 task, start related re-calculate stream task directly.
    // We only start the re-calculate agg task here, and do NOT start the source task, for streams with agg-tasks.
    if ((type == STREAM_INPUT__RECALCULATE) && (taskType == STREAM_NORMAL_TASK)) {
      SSDataBlock* pb = taosArrayGet(((SStreamDataBlock*)pInput)->blocks, 0);

      if ((taskLevel == TASK_LEVEL__AGG) || ((taskLevel == TASK_LEVEL__SOURCE) && (!pTask->info.hasAggTasks))) {
        EStreamType blockType = pb->info.type;

        if (pTask->hTaskInfo.id.streamId == 0) {
          stError("s-task:%s related re-calculate stream task is dropping, failed to start re-calculate", id);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
          stError("s-task:%s invalid trigger model, expect:%d, actually:%d, not exec tasks", id,
                  STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE, pTask->info.trigger);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        SStreamTask* pHTask = NULL;
        code = streamMetaAcquireTask(pTask->pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHTask);
        if (code != 0) {
          stError("s-task:%s failed to acquire related recalculate task:0x%x, not start the recalculation, code:%s", id,
                  (int32_t)pTask->hTaskInfo.id.taskId, tstrerror(code));
          streamFreeQitem(pInput);
          return code;
        }

        if (blockType == STREAM_RECALCULATE_START) {
          // start the related recalculate task to do recalculate
          stDebug("s-task:%s start recalculate task to do recalculate:0x%x", id, pHTask->id.taskId);

          if (taskLevel == TASK_LEVEL__SOURCE) {
            code = streamStartScanHistoryAsync(pHTask, 0);
          } else {  // for agg task, in normal stream queue to execute
            SStreamDataBlock* pRecalBlock = NULL;
            code = streamCreateRecalculateBlock(pTask, &pRecalBlock, STREAM_RECALCULATE_START);
            if (code) {
              stError("s-task:%s failed to generate recalculate block, code:%s", id, tstrerror(code));
            } else {
              code = streamTaskPutDataIntoInputQ(pHTask, (SStreamQueueItem*)pRecalBlock);
              if (code != TSDB_CODE_SUCCESS) {
                stError("s-task:%s failed to put recalculate block into q, code:%s", pTask->id.idStr, tstrerror(code));
              } else {
                stDebug("s-task:%s put recalculate block into inputQ", pHTask->id.idStr);
              }
              code = streamTrySchedExec(pHTask, false);
            }
          }
        }
        streamMetaReleaseTask(pTask->pMeta, pHTask);
      } else if ((taskLevel == TASK_LEVEL__SOURCE) && pTask->info.hasAggTasks) {
        // ownership of pInput moved to the output queue; cleared so the free
        // below operates on NULL
        code = continueDispatchRecalculateStart((SStreamDataBlock*)pInput, pTask);
        pInput = NULL;
      }
    }

    // NOTE(review): when the dispatch branch above ran, pInput is NULL here —
    // presumably streamFreeQitem tolerates NULL; confirm.
    if (type == STREAM_INPUT__RECALCULATE) {
      streamFreeQitem(pInput);
    }

    if (code) {
      return code;
    }

    // recalculate agg task: stop once the scan-history phase reports finished
    if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG && type != STREAM_INPUT__RECALCULATE) {
      bool complete = qStreamScanhistoryFinished(pTask->exec.pExecutor);
      if (complete) {
        stDebug("s-task:%s recalculate agg task complete recalculate procedure", id);
        return 0;
      }
    }

    double el = (taosGetTimestampMs() - st) / 1000.0;
    if (el > 2.0) {  // elapsed more than 2 sec, not occupy the CPU anymore
      stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id);
      streamTaskSetIdleInfo(pTask, 500);
      return code;
    }
  }
}
1031

1032
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
1033
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
1034
bool streamTaskIsIdle(const SStreamTask* pTask) {
3,117✔
1035
  ETaskStatus status = streamTaskGetStatus(pTask).state;
3,117✔
1036
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
3,117!
1037
          status == TASK_STATUS__DROPPING);
1038
}
1039

1040
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
69,464✔
1041
  SStreamTaskState pState = streamTaskGetStatus(pTask);
69,464✔
1042

1043
  ETaskStatus st = pState.state;
69,489✔
1044
  if (pStatus != NULL) {
69,489!
1045
    *pStatus = pState.name;
69,507✔
1046
  }
1047

1048
  // pause & halt will still run for sink tasks.
1049
  if (streamTaskIsSinkTask(pTask)) {
69,489✔
1050
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
11,905✔
1051
            st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
41,553✔
1052
  } else {
1053
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
39,829✔
1054
            st == TASK_STATUS__HALT);
1055
  }
1056
}
1057

1058
// Decide whether the exec loop should stop pulling from the input queues:
// true when the task is in a terminal/paused state, or when the relevant
// queue(s) are drained.
static bool shouldNotCont(SStreamTask* pTask) {
  SStreamQueue* pInputQ = pTask->inputq.queue;
  ETaskStatus   status = streamTaskGetStatus(pTask).state;

  // 1. task should jump out
  bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);

  // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
  bool noChkptItems = (taosQueueItemSize(pInputQ->pChkptQueue) == 0);

  // 3. no data in ordinary queue
  bool noBlockItems = (streamQueueGetNumOfItems(pInputQ) == 0);

  if (quit) {
    return true;
  }

  // in checkpoint procedure, a source task only consults the controller queue
  if (status == TASK_STATUS__CK && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    return noChkptItems;
  }

  // otherwise stop only when both queues are drained
  return noBlockItems && noChkptItems;
}
1083

1084
// Drive the task's exec loop until no more work is available (sched status is
// then flipped back to INACTIVE) or the task needs to idle (resume is scheduled
// for the future). Caller must have already set sched status to ACTIVE via
// streamTaskSetSchedStatusActive. Returns the last exec error code (0 on clean
// exit).
int32_t streamResumeTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     level = pTask->info.taskLevel;  // NOTE(review): unused
  int32_t     code = 0;

  if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
    stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
    return code;
  }

  while (1) {
    code = doStreamExecTask(pTask);
    if (code) {
      stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
    }

    // the lock guards the sched-status transition and idle-time check against
    // concurrent schedulers
    streamMutexLock(&pTask->lock);

    if (shouldNotCont(pTask)) {
      // no more input to process: hand the sched slot back before unlocking
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
      streamTaskClearSchedIdleInfo(pTask);
      streamMutexUnlock(&pTask->lock);

      setLastExecTs(pTask, taosGetTimestampMs());

      char* p = streamTaskGetStatus(pTask).name;
      stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
              pTask->status.schedStatus, pTask->status.lastExecTs);

      return code;
    } else {
      // check if this task needs to be idle for a while
      if (pTask->status.schedIdleTime > 0) {
        streamTaskResumeInFuture(pTask);

        streamMutexUnlock(&pTask->lock);
        setLastExecTs(pTask, taosGetTimestampMs());
        return code;
      }
    }

    streamMutexUnlock(&pTask->lock);
  }

  // unreachable: the loop above only exits via return
  return code;
}
1130

1131
// Entry point for executing a stream task. Atomically claims the sched slot;
// only the thread that observes the WAITING->ACTIVE transition runs the task,
// any other caller just logs and returns.
int32_t streamExecTask(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  const char* id = pTask->id.idStr;

  int8_t prevSchedStatus = streamTaskSetSchedStatusActive(pTask);
  if (prevSchedStatus != TASK_SCHED_STATUS__WAITING) {
    stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
            streamTaskGetStatus(pTask).name, pTask->status.schedStatus);
    return 0;
  }

  return streamResumeTask(pTask);
}
1147

1148
// Ask the query executor (if any) to release its operator state so ownership
// can be transferred to another task.
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  stDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExec);
}
1159

1160
// Ask the query executor (if any) to reload operator state previously released
// by a related task (counterpart of streamTaskReleaseState).
int32_t streamTaskReloadState(SStreamTask* pTask) {
  stDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExec);
}
1171

1172
// Barrier for trans-state blocks arriving from multiple upstream tasks: the
// first caller seeds the counter with the upstream count, every caller then
// consumes one slot. Returns the number of upstream tasks still outstanding
// (0 means all have arrived).
int32_t streamAlignTransferState(SStreamTask* pTask) {
  int32_t upstreamCnt = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, upstreamCnt) == 0) {
    stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, upstreamCnt);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}
1181

1182
// Barrier for recalculate-start blocks, mirroring streamAlignTransferState:
// the first caller seeds the counter with the upstream count, every caller
// consumes one slot. Returns the number of upstream tasks still outstanding.
int32_t streamAlignRecalculateStart(SStreamTask* pTask) {
  int32_t upstreamCnt = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (atomic_val_compare_exchange_32(&pTask->recalculateAlignCnt, 0, upstreamCnt) == 0) {
    stDebug("s-task:%s set start recalculate state aligncnt %d", pTask->id.idStr, upstreamCnt);
  }

  return atomic_sub_fetch_32(&pTask->recalculateAlignCnt, 1);
}
1191

1192
// Forward a recalculate-start block downstream: stamp it with this task's
// id/vgroup, enqueue it on the output queue and trigger dispatch. On enqueue
// failure the block is freed here (caller must not touch it afterwards).
int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code != 0) {
    stError("s-task:%s failed to put recalculate start block into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  return streamDispatchStreamBlock(pTask);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc