• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4155

24 May 2025 03:30AM UTC coverage: 63.028% (+0.8%) from 62.238%
#4155

push

travis-ci

web-flow
test: migrate stream cases (#31164)

157592 of 318118 branches covered (49.54%)

Branch coverage included in aggregate %.

242817 of 317165 relevant lines covered (76.56%)

18939592.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.84
/source/libs/stream/src/streamExec.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM         32
20
#define STREAM_RESULT_DUMP_THRESHOLD      300
21
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)  // 1MiB result data
22
#define STREAM_SCAN_HISTORY_TIMESLICE     1000           // 1000 ms
23
#define MIN_INVOKE_INTERVAL               50             // 50ms
24
#define FILL_HISTORY_TASK_EXEC_INTERVAL   5000           // 5 sec
25

26
static int32_t streamAlignRecalculateStart(SStreamTask* pTask);
27
static int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask);
28
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
29
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
30
                                  int32_t* totalBlocks);
31

32
bool streamTaskShouldStop(const SStreamTask* pTask) {
1,224,770✔
33
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,224,770✔
34
  return (pState.state == TASK_STATUS__STOP) || (pState.state == TASK_STATUS__DROPPING);
1,225,140!
35
}
36

37
bool streamTaskShouldPause(const SStreamTask* pTask) {
22,852✔
38
  return (streamTaskGetStatus(pTask).state == TASK_STATUS__PAUSE);
22,852✔
39
}
40

41
// Routes one result block to the task's output target: table sink, SMA sink,
// or the dispatch output queue. Ownership of pBlock is always consumed: the
// sinks destroy it after use, the output queue takes it over, and every error
// path destroys it here.
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int32_t code = 0;
  int32_t type = pTask->outputInfo.type;
  if (type == TASK_OUTPUT__TABLE) {
    pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
    destroyStreamDataBlock(pBlock);
  } else if (type == TASK_OUTPUT__SMA) {
    pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks);
    destroyStreamDataBlock(pBlock);
  } else {
    if (type != TASK_OUTPUT__FIXED_DISPATCH && type != TASK_OUTPUT__SHUFFLE_DISPATCH &&
        type != TASK_OUTPUT__VTABLE_MAP) {
      stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, type);
      destroyStreamDataBlock(pBlock);  // fix: do not leak the block on invalid output type
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
    if (code != TSDB_CODE_SUCCESS) {
      destroyStreamDataBlock(pBlock);
      return code;
    }

    // not handle error, if dispatch failed, try next time.
    // checkpoint trigger will be checked
    code = streamDispatchStreamBlock(pTask);
  }

  return code;
}
70

71
// Wraps the accumulated result blocks in pRes into a single SStreamDataBlock
// and pushes it to the task output via doOutputResultBlockImpl. On success,
// *totalSize and *totalBlocks are incremented by the dumped amount. Ownership
// of pRes is always consumed: transferred into the created stream block, or
// destroyed here when empty or on creation failure.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks == 0) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pStreamBlocks = NULL;

  int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks);
  if (code) {
    // fix: report and propagate the actual failure code instead of logging
    // terrno and returning a hard-coded TSDB_CODE_OUT_OF_MEMORY
    stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(code));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
          SIZE_IN_MiB(size));

  code = doOutputResultBlockImpl(pTask, pStreamBlocks);
  if (code != TSDB_CODE_SUCCESS) {  // back pressure and record position
    return code;
  }

  *totalSize += size;
  *totalBlocks += numOfBlocks;

  return code;
}
101

102
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
512✔
103
                                     SArray* pRes) {
104
  SSDataBlock block = {0};
512✔
105
  int32_t     num = taosArrayGetSize(pRetrieveBlock->blocks);
512✔
106
  if (num != 1) {
512!
107
    stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
×
108
    return TSDB_CODE_INVALID_PARA;
×
109
  }
110

111
  void*   p = taosArrayGet(pRetrieveBlock->blocks, 0);
512✔
112
  int32_t code = assignOneDataBlock(&block, p);
512✔
113
  if (code) {
512!
114
    stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
×
115
    return code;
×
116
  }
117

118
  block.info.type = STREAM_PULL_OVER;
512✔
119
  block.info.childId = pTask->info.selfChildId;
512✔
120

121
  p = taosArrayPush(pRes, &block);
512✔
122
  if (p != NULL) {
512!
123
    (*pNumOfBlocks) += 1;
512✔
124
    stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
512✔
125
            pTask->info.selfChildId, pRetrieveBlock->reqId);
126
  } else {
127
    code = terrno;
×
128
    stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64 " code:%s", pTask->id.idStr,
×
129
            pRetrieveBlock->reqId, tstrerror(code));
130
  }
131

132
  return code;
512✔
133
}
134

135
static int32_t doAppendRecalBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamTrigger* pRecalculateBlock,
×
136
                                  SArray* pRes) {
137
  int32_t code = 0;
×
138
  SSDataBlock block = {0};
×
139

140
  void* p = taosArrayPush(pRes, pRecalculateBlock->pBlock);
×
141
  if (p != NULL) {
×
142
    (*pNumOfBlocks) += 1;
×
143
    stDebug("s-task:%s(child %d) recalculate from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
×
144
            pTask->info.selfChildId, /*pRecalculateBlock->reqId*/ (int64_t)0);
145
  } else {
146
    code = terrno;
×
147
    stError("s-task:%s failed to append recalculate block for downstream, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
×
148
            /*pRecalculateBlock->reqId*/(int64_t)0, tstrerror(code));
149
  }
150

151
  return code;
×
152
}
153

154
// Executes the stream operator for one input item and drains every result
// block from the executor. Results are collected into pRes and dumped to the
// task output whenever STREAM_RESULT_DUMP_THRESHOLD blocks or
// STREAM_RESULT_DUMP_SIZE_THRESHOLD bytes accumulate; the remaining tail is
// dumped after the executor is exhausted. *totalSize/*totalBlocks report the
// overall dumped amount.
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t size = 0;
  int32_t numOfBlocks = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
  SArray* pRes = NULL;

  *totalBlocks = 0;
  *totalSize = 0;

  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;

    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }

    // NOTE(review): when taosArrayInit fails (pRes == NULL) this returns `code`,
    // which is still TSDB_CODE_SUCCESS at this point — the OOM is silently
    // reported as success; confirm that is intentional.
    if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return code;
    }

    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        qResetTaskInfoCode(pExecutor);
      }

      // unrecoverable errors abort the whole batch; any other failure resets
      // the executor's error code and retries the loop
      if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
        stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return code;
      } else {
        qResetTaskCode(pExecutor);
        continue;
      }
    }

    // executor exhausted: for retrieve requests append the pull-over marker,
    // then leave the loop to flush whatever results remain
    if (output == NULL) {
      if (pItem != NULL && (pItem->type == STREAM_INPUT__DATA_RETRIEVE)) {
        code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*)pItem, pRes);
        if (code) {
          taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
          return code;
        }
      }

      break;
    }

    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK && pTask->info.taskLevel == TASK_LEVEL__AGG) {
      stDebug("s-task:%s exec output type:%d", pTask->id.idStr, output->info.type);
    }

    // retrieve blocks are broadcast upstream rather than dispatched downstream
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToUpTasks(pTask, output) < 0) {
        // TODO
      }
      continue;
    } else if (output->info.type == STREAM_CHECKPOINT) {
      continue;  // checkpoint block not dispatch to downstream tasks
    }

    SSDataBlock block = {.info.childId = pTask->info.selfChildId};
    code = assignOneDataBlock(&block, output);
    if (code) {
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
      continue;
    }

    // estimated in-memory footprint of this result block
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    } else {
      stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
              pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
    }

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      // todo: here we need continue retry to put it into output buffer
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pRes = NULL;  // ownership of pRes was consumed by doDumpResult
      size = 0;
      numOfBlocks = 0;
    }
  }

  // flush the tail batch collected after the executor was drained
  if (numOfBlocks > 0) {
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}
257

258
// todo contiuous try to create result blocks
259
static int32_t handleScanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
2,370✔
260
  int32_t code = TSDB_CODE_SUCCESS;
2,370✔
261
  if (taosArrayGetSize(pRes) > 0) {
2,370✔
262
    SStreamDataBlock* pStreamBlocks = NULL;
1,305✔
263
    code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks);
1,305✔
264
    if (code) {
1,305!
265
      stError("s-task:%s failed to build history result blocks", pTask->id.idStr);
×
266
      return code;
×
267
    }
268

269
    code = doOutputResultBlockImpl(pTask, pStreamBlocks);
1,305✔
270
    if (code != TSDB_CODE_SUCCESS) {  // should not have error code
1,305!
271
      stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
×
272
    }
273
  } else {
274
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
1,065✔
275
  }
276
  return code;
2,370✔
277
}
278

279
// Drains one batch of history-scan results from the task executor into pRes.
// The loop stops when: the task must stop, the inputQ is blocked, the executor
// has no more output (*pFinish is then set from qStreamScanhistoryFinished),
// or the per-batch dump thresholds are reached. *pSize accumulates the
// estimated byte size of the collected blocks.
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   exec = pTask->exec.pExecutor;
  int32_t numOfBlocks = 0;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      break;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
      break;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    code = qExecTask(exec, &output, &ts);
    if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {  // if out of memory occurs, quit
      stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
              tstrerror(code));
      qResetTaskCode(exec);
      continue;
    }

    // the generated results before fill-history task been paused, should be dispatched to sink node
    if (output == NULL) {
      (*pFinish) = qStreamScanhistoryFinished(exec);
      break;
    }

    SSDataBlock block = {0};
    code = assignOneDataBlock(&block, output);
    if (code) {
      // NOTE(review): on assign failure the (possibly empty) block is still
      // appended below — confirm this best-effort behavior is intended
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
    }

    block.info.childId = pTask->info.selfChildId;
    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    }

    // estimated in-memory footprint of this result block
    (*pSize) +=
        blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    // stop collecting once the per-batch block-count or byte-size limit is hit
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
              pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
              SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
      break;
    }
  }
}
334

335
// Builds the scan-history result struct returned to the scheduler: the action
// to take (quit / re-exec / continue) plus the idle time before re-execution.
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
  SScanhistoryDataInfo info = {code, idleTime};
  return info;
}
338

339
// Scans history data for a source task in time slices (measured from `st`).
// Returns an SScanhistoryDataInfo telling the caller whether to quit,
// re-execute after an idle interval, or continue to the next phase.
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
  void*       exec = pTask->exec.pExecutor;
  bool        finished = false;
  const char* id = pTask->id.idStr;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
    return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
  }

  // open the stream operator once; recalculate tasks re-open on every call
  if ((!pTask->hTaskInfo.operatorOpen) || (pTask->info.fillHistory == STREAM_RECALCUL_TASK)) {
    // NOTE(review): the return value of qSetStreamOpOpen is never checked —
    // confirm that failures here are safe to ignore
    int32_t code = qSetStreamOpOpen(exec);
    pTask->hTaskInfo.operatorOpen = true;
  }

  while (1) {
    if (streamTaskShouldPause(pTask)) {
      stDebug("s-task:%s paused from the scan-history task", id);
      // quit from step1, not continue to handle the step2
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // output queue is full, idle for 5 sec.
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
      continue;
    }

    int32_t size = 0;
    streamScanHistoryDataImpl(pTask, pRes, &size, &finished);

    if (streamTaskShouldStop(pTask)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // dispatch the generated results, todo fix error
    int32_t code = handleScanhistoryResultBlocks(pTask, pRes, size);
    if (code) {
      stError("s-task:%s failed to handle scan result block, code:%s", pTask->id.idStr, tstrerror(code));
    }

    if (finished) {
      return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
    }

    // yield once the time slice is used up so other tasks can be scheduled
    int64_t el = taosGetTimestampMs() - st;
    if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
      stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
              pTask->info.fillHistory, el / 1000.0);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
    }
  }
}
405

406
// Prepares transferring the executor state of a completed fill-history task
// to its related stream task: validates the stream task's status, halts
// non-source stream tasks, resets the time-window filter for source tasks,
// requests a checkpoint, and re-opens all upstream inputQs.
// The acquired stream task reference is released on every return path.
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  id = pTask->id.idStr;

  SStreamTask* pStreamTask = NULL;
  int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
  if (pStreamTask == NULL || code != TSDB_CODE_SUCCESS) {
    stError(
        "s-task:%s failed to find related stream task:0x%x, may have been destroyed or closed, destroy related "
        "fill-history task",
        id, (int32_t)pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    // todo: this function should never be failed.
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);

    // 2. save to disk
    streamMetaWLock(pMeta);
    if (streamMetaCommit(pMeta) < 0) {
      // persist to disk
    }
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
    stDebug(
        "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
        "info, prepare transfer exec state",
        id, streamTaskGetStatus(pTask).name, el, pStreamTask->id.idStr);
  }

  ETaskStatus  status = streamTaskGetStatus(pStreamTask).state;
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
  // for the step 2.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      streamMetaReleaseTask(pMeta, pStreamTask);  // fix: release the acquired task before bailing out
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  } else {
    if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
          status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      streamMetaReleaseTask(pMeta, pStreamTask);  // fix: release the acquired task before bailing out
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
              pStreamTask->id.idStr, tstrerror(code));
      streamMetaReleaseTask(pMeta, pStreamTask);
      return code;
    } else {
      stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
    }
  }

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  SStreamTaskState pState = streamTaskGetStatus(pStreamTask);
  status = pState.state;
  char* p = pState.name;
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
    stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
    streamMetaReleaseTask(pMeta, pStreamTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 1. expand the query time window for stream task of WAL scanner
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
            ", status:%s, sched-status:%d",
            pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
            pTimeWindow->ekey, p, pStreamTask->status.schedStatus);

    code = streamTaskResetTimewindowFilter(pStreamTask);
  } else {
    stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
  }

  // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
  // 2. send msg to mnode to launch a checkpoint to keep the state for current stream
  code = streamTaskSendCheckpointReq(pStreamTask);

  // 3. the default task status should be ready or something, not halt.
  // status to the value that will be kept in disk

  // 4. open the inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pStreamTask);

  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
502

503
// Callback run once the stream task is halted: re-open every upstream inputQ
// and request a checkpoint so the transferred state is persisted.
static int32_t haltCallback(SStreamTask* pTask, void* param) {
  (void)param;  // unused
  streamTaskOpenAllUpstreamInput(pTask);
  int32_t code = streamTaskSendCheckpointReq(pTask);
  return code;
}
507

508
// Kicks off the state-transfer procedure for a fill-history task once its
// trans-state block has been appended. Source/agg/merge tasks transfer their
// operator state via streamTransferStateDoPrepare; sink tasks only halt the
// related stream sink task asynchronously (haltCallback re-opens its inputQ).
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  if (pTask->status.appendTranstateBlock != 1) {
    stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__MERGE) {
    // do transfer task operator states.
    return streamTransferStateDoPrepare(pTask);
  }

  // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
  SStreamTask* pStreamTask = NULL;
  int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
  if (pStreamTask == NULL) {
    return code;
  }

  // halt the related stream sink task
  code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
            pStreamTask->id.idStr, tstrerror(code));
    streamMetaReleaseTask(pMeta, pStreamTask);
    return code;
  }

  stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
541

542
// set input
// Feeds one queue item into the stream executor as its next input batch,
// choosing the executor input type from the item type. *pVer is advanced to
// the version carried by the input (or the window skey for force-window-close
// triggers); it is never moved backwards.
static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
  void*   pExecutor = pTask->exec.pExecutor;
  int32_t code = 0;

  const SStreamQueueItem* pItem = pInput;
  if (pItem->type == STREAM_INPUT__GET_RES) {
    // trigger block generated by the task itself
    const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
    code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      TSKEY k = pTrigger->pBlock->info.window.skey;
      stDebug("s-task:%s set force_window_close as source block, skey:%" PRId64, id, k);
      (*pVer) = k;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    // single submit block from the WAL
    const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
    code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
    stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
            pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
    if ((*pVer) > pSubmit->submit.ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pSubmit->submit.ver);
    } else {
      (*pVer) = pSubmit->submit.ver;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    // plain data blocks (including retrieve payloads) from upstream tasks
    const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

    SArray* pBlockList = pBlock->blocks;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    // a batch of merged submit blocks
    const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

    SArray* pBlockList = pMerged->submits;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
            pMerged->ver);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);

    if ((*pVer) > pMerged->ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pMerged->ver);
    } else {
      (*pVer) = pMerged->ver;
    }

  } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
    // referenced (not owned) data block
    const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
             pItem->type == STREAM_INPUT__RECALCULATE) {
    // control blocks: forwarded to the executor with their own input type
    const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);

    if (pItem->type == STREAM_INPUT__RECALCULATE) {
      int32_t t = ((SStreamDataBlock*) pCheckpoint)->type;
      int32_t tId = (int32_t)pTask->hTaskInfo.id.taskId;
      if (t == STREAM_RECALCULATE_START) {
        stDebug("s-task:%s set recalculate block to start related recalculate task:0x%x", id, tId);
      } else {
        stDebug("s-task:%s set recalculate block:%d, task:0x%x", id, t, tId);
      }
    }
  } else {
    stError("s-task:%s invalid input block type:%d, discard", id, pItem->type);
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  return code;
}
616

617
// Handles a trans-state block arriving at this task. Agg/sink/merge tasks
// wait until every upstream task has delivered its trans-state block; only
// the final arrival proceeds. Dispatch-type tasks then forward the block via
// the output queue so downstream flushes first, while non-dispatch tasks
// start the state transfer immediately. Ownership of pBlock is consumed on
// every path (freed here or handed to the output queue).
void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char* id = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     level = pTask->info.taskLevel;

  // dispatch the tran-state block to downstream task immediately
  int32_t type = pTask->outputInfo.type;

  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK || level == TASK_LEVEL__MERGE) {
    // wait for the trans-state block from every upstream task before acting
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
      return;
    }
  }

  // transfer the ownership of executor state
  if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__VTABLE_MAP) {
    if (level == TASK_LEVEL__SOURCE) {
      stDebug("s-task:%s add transfer-state block into outputQ", id);
    } else {
      stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    }

    // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
    if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__MERGE) {
      pBlock->srcVgId = pTask->pMeta->vgId;
      code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
      if (code == 0) {
        code = streamDispatchStreamBlock(pTask);
        if (code) {
          stError("s-task:%s failed to dispatch stream block, code:%s", id, tstrerror(code));
        }
      } else {  // todo put into queue failed, retry
        streamFreeQitem((SStreamQueueItem*)pBlock);
      }
    } else {  // level == TASK_LEVEL__SINK
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else {  // non-dispatch task, do task state transfer directly
    streamFreeQitem((SStreamQueueItem*)pBlock);
    stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);

    code = streamTransferStatePrepare(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to prepare transfer state, code:%s", id, tstrerror(code));
      int8_t status = streamTaskSetSchedStatusInactive(pTask);  // let's ignore this return status
    }
  }
}
668

669
// static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
670
// Record the timestamp (ms) of the most recent execution of this task.
static void setLastExecTs(SStreamTask* pTask, int64_t ts) {
  pTask->status.lastExecTs = ts;
}

672
// Accumulate output statistics for one batch of executed blocks and refresh the
// derived throughput figures (bytes per second). When the measured elapsed time
// is effectively zero, both throughput values are reset to 0 to avoid dividing
// by (near-)zero.
static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
                               double st, const char* id) {
  double elapsedSec = (taosGetTimestampMs() - st) / 1000.0;

  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
          elapsedSec, SIZE_IN_MiB(totalSize), totalBlocks);

  pInfo->outputDataBlocks += totalBlocks;
  pInfo->outputDataSize += totalSize;

  bool zeroElapsed = (fabs(elapsedSec - 0.0) <= DBL_EPSILON);
  if (zeroElapsed) {
    // too fast to measure: avoid a division by (near-)zero
    pInfo->outputThroughput = 0;
    pInfo->procsThroughput = 0;
  } else {
    pInfo->outputThroughput = (totalSize / elapsedSec);
    pInfo->procsThroughput = (blockSize / elapsedSec);
  }
}
689

690
// Execute one batch of input blocks (pBlock, containing `num` merged items) through the
// task's executor, record throughput statistics, and advance the in-memory processedVer
// watermark when the executed data carries a newer version.
// Returns 0 on success, or the first error code encountered; on an invalid version window
// the error is logged but the stored code (0 at that point) is returned unchanged.
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
  const char*      id = pTask->id.idStr;
  // NOTE(review): blockSize is never assigned after this initialization, so the
  // procsThroughput recorded below is always computed from 0 — confirm whether the
  // caller was meant to supply the batch size here.
  int32_t          blockSize = 0;
  int64_t          st = taosGetTimestampMs();
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  int64_t          ver = pInfo->processedVer;  // may be advanced by doSetStreamInputBlock
  int64_t          totalSize = 0;
  int32_t          totalBlocks = 0;
  int32_t          code = 0;

  stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
  // bind the input to the executor; `ver` is updated to the version of the submitted data
  code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
  if (code) {
    stError("s-task:%s failed to set input block, not exec for these blocks", id);
    return code;
  }

  // run the executor; totalSize/totalBlocks describe the produced (output) result
  code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
  if (code) {
    return code;
  }

  doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

  // update the currentVer if processing the submitted blocks.
  // sanity check: the new version must lie inside [checkpointVer, nextProcessVer];
  // otherwise log and bail out without touching processedVer (code is still 0 here).
  if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
    stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
            pInfo->checkpointVer, pInfo->nextProcessVer, ver);
    return code;
  }

  // persistedVer is not updated here — "unsaved" means it is only the in-memory watermark
  if (ver != pInfo->processedVer) {
    stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
            " ckpt:%" PRId64,
            id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
    pInfo->processedVer = ver;
  }

  return code;
}
730

731
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
732
// Handle a checkpoint block popped from the input queue.
// While the task is in checkpoint state, build the checkpoint; otherwise the
// checkpoint was aborted or already finished, so only acknowledge the
// upstream tasks (or mnode for a source task).
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  // snapshot the state under lock; it may still change afterwards (see todo below)
  streamMutexLock(&pTask->lock);
  SStreamTaskState curState = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (curState.state != TASK_STATUS__CK) {  // todo refactor; other thread may change the status
    // not in checkpoint state anymore: only send the response/ready message back
    code = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) ? streamTaskSendCheckpointSourceRsp(pTask)
                                                         : streamTaskSendCheckpointReadyMsg(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to upstream/mnode
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
              tstrerror(code));
    }
  } else {
    stDebug("s-task:%s checkpoint block received, set status:%s", id, curState.name);
    code = streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
  }

  return code;
}
760

761
// Before taking a checkpoint: (1) if the related fill-history task was halted,
// take over its executor state; (2) execute the checkpoint block so that all
// in-memory executor state is flushed down to the K/V store.
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
  const char* id = pTask->id.idStr;

  // step 1: transfer the ownership of executor state from the halted fill-history task
  if (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT) {
    STaskId*     pHId = &pTask->hTaskInfo.id;
    SStreamTask* pHistoryTask = NULL;

    int32_t ret = streamMetaAcquireTask(pTask->pMeta, pHId->streamId, pHId->taskId, &pHistoryTask);
    if (ret != TSDB_CODE_SUCCESS) {  // ignore the error code.
      stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
              (int32_t)pHId->taskId);
    } else {
      ret = streamTaskReleaseState(pHistoryTask);
      if (ret) {
        stError("s-task:%s failed to release query state, code:%s", pHistoryTask->id.idStr, tstrerror(ret));
      } else {
        // only reload into this task when the release above succeeded
        ret = streamTaskReloadState(pTask);
        if (ret) {
          stError("s-task:%s failed to reload query state, code:%s", pTask->id.idStr, tstrerror(ret));
        }
      }

      stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHistoryTask->id.idStr,
              streamTaskGetStatus(pHistoryTask).name);
      // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
      /*
       * while(1) {
       * qExecTask()
       * }
       * // put into the output queue.
       */
      streamMetaReleaseTask(pTask->pMeta, pHistoryTask);
    }
  } else {
    stDebug("s-task:%s no transfer-state needed", id);
  }

  // step 2: flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
  int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
  if (code) {
    stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
  }

  return code;
}
809

810
/**
811
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
812
 * appropriate batch of blocks should be handled in 5 to 10 sec.
813
 */
814
/**
 * Main execution loop of a stream task: repeatedly extract merged batches from the
 * input queue and dispatch them by block type (checkpoint-trigger, trans-state,
 * checkpoint, data, recalculate). Returns 0 when the task should go idle/stop, or
 * an error code from a failed processing step.
 *
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
 * appropriate batch of blocks should be handled in 5 to 10 sec.
 */
static int32_t doStreamExecTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;
  int32_t     taskLevel = pTask->info.taskLevel;
  int32_t     taskType = pTask->info.fillHistory;  // normal / fill-history / recalculate task

  // merge multiple input data if possible in the input queue.
  int64_t st = taosGetTimestampMs();
  stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st);

  while (1) {
    int32_t           blockSize = 0;
    int32_t           numOfBlocks = 0;
    SStreamQueueItem* pInput = NULL;

    // task is stopping or not yet initialized: quit the loop
    if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask).state == TASK_STATUS__UNINIT)) {
      stDebug("s-task:%s stream task is stopped", id);
      return 0;
    }

    // backpressure: downstream output queue is full, go idle and retry later
    // NOTE(review): the log message says 500ms but the idle time set is 1000ms — confirm which is intended
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stTrace("s-task:%s outputQ is full, idle for 500ms and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stTrace("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    // rate limit: do not re-enter the executor more often than MIN_INVOKE_INTERVAL
    if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
      stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    }

    EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
    if (ret == EXEC_AFTER_IDLE) {
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    } else {
      if (pInput == NULL) {  // input queue drained
        return 0;
      }
    }

    pTask->execInfo.inputDataBlocks += numOfBlocks;
    pTask->execInfo.inputDataSize += blockSize;

    // dispatch checkpoint msg to all downstream tasks
    int32_t type = pInput->type;
    if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
#if 0
      // Injection error: for automatic kill long trans test
      taosMsleep(50*1000);
#endif
      code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
      if (code != 0) {
        stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
      }
      continue;
    }

    if (type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    if (type == STREAM_INPUT__CHECKPOINT) {
      code = doHandleChkptBlock(pTask);
      streamFreeQitem(pInput);
      return code;
    }

    if (taskLevel == TASK_LEVEL__SINK) {
      if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__RECALCULATE) {
        stError("s-task:%s invalid block type:%d for sink task, discard", id, type);
        continue;
      }

      // here only handle the data block sink operation
      if (type == STREAM_INPUT__DATA_BLOCK) {
        pTask->execInfo.sink.dataSize += blockSize;
        stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
        code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        double el = (taosGetTimestampMs() - st) / 1000.0;
        pTask->execInfo.procsThroughput = (fabs(el - 0.0) <= DBL_EPSILON) ? 0 : (blockSize / el);
      } else {
        streamFreeQitem((SStreamQueueItem*)pInput);
      }

      continue;
    }

    if (type == STREAM_INPUT__RECALCULATE) {
      // an agg task waits until all upstream tasks have sent their recalculate msg
      if (taskType == STREAM_NORMAL_TASK && taskLevel == TASK_LEVEL__AGG) {
        int32_t remain = streamAlignRecalculateStart(pTask);
        if (remain > 0) {
          streamFreeQitem((SStreamQueueItem*)pInput);
          stDebug("s-task:%s receive upstream recalculate msg, not sent remain:%d", id, remain);
          return code;
        }

        stDebug("s-task:%s all upstream send recalculate msg, continue", id);
      }

      // 1. generate the recalculating snapshot for related recalculate tasks.
      if ((taskType == STREAM_NORMAL_TASK) &&
          ((taskLevel == TASK_LEVEL__AGG) || (taskLevel == TASK_LEVEL__SOURCE && (!pTask->info.hasAggTasks)))) {
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      } else if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG) {
        // send retrieve to upstream tasks (source tasks), to start the recalculate procedure.
        stDebug("s-task:%s recalculate agg task send retrieve to upstream source tasks", id);
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      }
    }

    if (type != STREAM_INPUT__RECALCULATE) {
      code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      streamFreeQitem(pInput);
      if (code) {
        return code;
      }
    }

    // for stream with only 1 task, start related re-calculate stream task directly.
    // We only start the re-calculate agg task here, and do NOT start the source task, for streams with agg-tasks.
    if ((type == STREAM_INPUT__RECALCULATE) && (taskType == STREAM_NORMAL_TASK)) {
      SSDataBlock* pb = taosArrayGet(((SStreamDataBlock*)pInput)->blocks, 0);

      if ((taskLevel == TASK_LEVEL__AGG) || ((taskLevel == TASK_LEVEL__SOURCE) && (!pTask->info.hasAggTasks))) {
        EStreamType blockType = pb->info.type;

        if (pTask->hTaskInfo.id.streamId == 0) {
          stError("s-task:%s related re-calculate stream task is dropping, failed to start re-calculate", id);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
          stError("s-task:%s invalid trigger model, expect:%d, actually:%d, not exec tasks", id,
                  STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE, pTask->info.trigger);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        SStreamTask* pHTask = NULL;
        code = streamMetaAcquireTask(pTask->pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHTask);
        if (code != 0) {
          stError("s-task:%s failed to acquire related recalculate task:0x%x, not start the recalculation, code:%s", id,
                  (int32_t)pTask->hTaskInfo.id.taskId, tstrerror(code));
          streamFreeQitem(pInput);
          return code;
        }

        if (blockType == STREAM_RECALCULATE_START) {
          // start the related recalculate task to do recalculate
          stDebug("s-task:%s start recalculate task to do recalculate:0x%x", id, pHTask->id.taskId);

          if (taskLevel == TASK_LEVEL__SOURCE) {
            code = streamStartScanHistoryAsync(pHTask, 0);
          } else {  // for agg task, in normal stream queue to execute
            SStreamDataBlock* pRecalBlock = NULL;
            code = streamCreateRecalculateBlock(pTask, &pRecalBlock, STREAM_RECALCULATE_START);
            if (code) {
              stError("s-task:%s failed to generate recalculate block, code:%s", id, tstrerror(code));
            } else {
              code = streamTaskPutDataIntoInputQ(pHTask, (SStreamQueueItem*)pRecalBlock);
              if (code != TSDB_CODE_SUCCESS) {
                stError("s-task:%s failed to put recalculate block into q, code:%s", pTask->id.idStr, tstrerror(code));
              } else {
                stDebug("s-task:%s put recalculate block into inputQ", pHTask->id.idStr);
              }
              code = streamTrySchedExec(pHTask, false);
            }
          }
        }
        streamMetaReleaseTask(pTask->pMeta, pHTask);
      } else if ((taskLevel == TASK_LEVEL__SOURCE) && pTask->info.hasAggTasks) {
        // ownership of pInput is handed to the dispatch path
        code = continueDispatchRecalculateStart((SStreamDataBlock*)pInput, pTask);
        pInput = NULL;
      }
    }

    if (type == STREAM_INPUT__RECALCULATE) {
      // NOTE(review): pInput may be NULL here (set above after dispatch) — presumably
      // streamFreeQitem(NULL) is a no-op; confirm.
      streamFreeQitem(pInput);
    }

    if (code) {
      return code;
    }

    if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG && type != STREAM_INPUT__RECALCULATE) {
      bool complete = qStreamScanhistoryFinished(pTask->exec.pExecutor);
      if (complete) {
        stDebug("s-task:%s recalculate agg task complete recalculate procedure", id);
        return 0;
      }
    }

    double el = (taosGetTimestampMs() - st) / 1000.0;
    if (el > 2.0) {  // elapsed more than 2 sec, not occupy the CPU anymore
      stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id);
      streamTaskSetIdleInfo(pTask, 500);
      return code;
    }
  }
}
1031

1032
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
1033
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
1034
bool streamTaskIsIdle(const SStreamTask* pTask) {
2,998✔
1035
  ETaskStatus status = streamTaskGetStatus(pTask).state;
2,998✔
1036
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
2,998!
1037
          status == TASK_STATUS__DROPPING);
1038
}
1039

1040
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
53,147✔
1041
  SStreamTaskState pState = streamTaskGetStatus(pTask);
53,147✔
1042

1043
  ETaskStatus st = pState.state;
53,155✔
1044
  if (pStatus != NULL) {
53,155!
1045
    *pStatus = pState.name;
53,172✔
1046
  }
1047

1048
  // pause & halt will still run for sink tasks.
1049
  if (streamTaskIsSinkTask(pTask)) {
53,155✔
1050
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
9,602✔
1051
            st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
28,737!
1052
  } else {
1053
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
34,002✔
1054
            st == TASK_STATUS__HALT);
1055
  }
1056
}
1057

1058
// Decide whether the execution loop should stop pulling from the input queues:
// true when the task is quitting, or when the relevant queue(s) are drained.
static bool shouldNotCont(SStreamTask* pTask) {
  SStreamQueue* pInputQ = pTask->inputq.queue;
  ETaskStatus   st = streamTaskGetStatus(pTask).state;

  // 1. task should jump out
  if (st == TASK_STATUS__STOP || st == TASK_STATUS__PAUSE || st == TASK_STATUS__DROPPING) {
    return true;
  }

  // 2. checkpoint-controller queue state
  bool ckQueueEmpty = (taosQueueItemSize(pInputQ->pChkptQueue) == 0);
  // 3. ordinary data-block queue state
  bool blockQueueEmpty = (streamQueueGetNumOfItems(pInputQ) == 0);

  if (st == TASK_STATUS__CK && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // in checkpoint procedure, a source task only checks the controller queue
    return ckQueueEmpty;
  }

  // otherwise both queues must be empty to stop
  return blockQueueEmpty && ckQueueEmpty;
}
1083

1084
// Drive the task's execution loop while it holds the ACTIVE sched status.
// Loops doStreamExecTask() until shouldNotCont() says the queues are drained
// (then flips sched status to INACTIVE) or until an idle period was requested
// (then schedules a future resume). Locking: the sched-status transition and
// the idle check are made under pTask->lock; the lock is released before the
// (potentially slow) logging/timestamp updates.
// Removed: unused local `level` (pTask->info.taskLevel was read but never used).
int32_t streamResumeTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  // only an ACTIVE task may be resumed; anything else indicates a scheduling bug
  if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
    stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
    return code;
  }

  while (1) {
    code = doStreamExecTask(pTask);
    if (code) {
      stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
    }

    streamMutexLock(&pTask->lock);

    if (shouldNotCont(pTask)) {
      // no more work: become INACTIVE under the lock, then finish bookkeeping
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
      streamTaskClearSchedIdleInfo(pTask);
      streamMutexUnlock(&pTask->lock);

      setLastExecTs(pTask, taosGetTimestampMs());

      char* p = streamTaskGetStatus(pTask).name;
      stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
              pTask->status.schedStatus, pTask->status.lastExecTs);

      return code;
    } else {
      // check if this task needs to be idle for a while
      if (pTask->status.schedIdleTime > 0) {
        streamTaskResumeInFuture(pTask);

        streamMutexUnlock(&pTask->lock);
        setLastExecTs(pTask, taosGetTimestampMs());
        return code;
      }
    }

    streamMutexUnlock(&pTask->lock);
  }

  return code;
}
1130

1131
// Entry point to execute a stream task. May be invoked concurrently from
// several threads: only the thread that wins the WAITING->ACTIVE sched-status
// transition actually runs the task; the others just log and return.
int32_t streamExecTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  if (streamTaskSetSchedStatusActive(pTask) == TASK_SCHED_STATUS__WAITING) {
    code = streamResumeTask(pTask);
  } else {
    char* p = streamTaskGetStatus(pTask).name;
    stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
            pTask->status.schedStatus);
  }

  return code;
}
1147

1148
// Release the executor's operator state so it can be handed over to another task.
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  stDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExec);
}
1159

1160
// Reload previously released operator state into this task's executor.
int32_t streamTaskReloadState(SStreamTask* pTask) {
  stDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExec);
}
1171

1172
// Count down the trans-state alignment barrier: the first arriving upstream
// initializes the counter to the number of upstream tasks, then every arrival
// (including the first) decrements it. Returns the remaining count.
int32_t streamAlignTransferState(SStreamTask* pTask) {
  int32_t upstreamNum = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, upstreamNum) == 0) {
    stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, upstreamNum);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}
1181

1182
// Count down the recalculate-start alignment barrier, mirroring
// streamAlignTransferState: first arriver seeds the counter with the number
// of upstream tasks, every arrival decrements. Returns the remaining count.
int32_t streamAlignRecalculateStart(SStreamTask* pTask) {
  int32_t upstreamNum = taosArrayGetSize(pTask->upstreamInfo.pList);

  if (atomic_val_compare_exchange_32(&pTask->recalculateAlignCnt, 0, upstreamNum) == 0) {
    stDebug("s-task:%s set start recalculate state aligncnt %d", pTask->id.idStr, upstreamNum);
  }

  return atomic_sub_fetch_32(&pTask->recalculateAlignCnt, 1);
}
1191

1192
// Forward a recalculate-start block downstream: stamp it with this task's
// identity, enqueue it into the output queue and trigger the dispatch. On
// enqueue failure the block is freed here (ownership is consumed either way).
int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code != 0) {
    stError("s-task:%s failed to put recalculate start block into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  return streamDispatchStreamBlock(pTask);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc