• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3842

07 Apr 2025 11:21AM UTC coverage: 62.696% (-0.3%) from 63.027%
#3842

push

travis-ci

web-flow
merge: from main to 3.0 branch (#30679)

154855 of 315075 branches covered (49.15%)

Branch coverage included in aggregate %.

6 of 8 new or added lines in 5 files covered. (75.0%)

2309 existing lines in 130 files now uncovered.

240176 of 314995 relevant lines covered (76.25%)

19119980.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.3
/source/libs/stream/src/streamExec.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
// maximum allowed processed block batches. One block may include several submit blocks
19
#define MAX_STREAM_EXEC_BATCH_NUM         32
20
#define STREAM_RESULT_DUMP_THRESHOLD      300
21
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)  // 1MiB result data
22
#define STREAM_SCAN_HISTORY_TIMESLICE     1000           // 1000 ms
23
#define MIN_INVOKE_INTERVAL               50             // 50ms
24
#define FILL_HISTORY_TASK_EXEC_INTERVAL   5000           // 5 sec
25

26
static int32_t streamAlignRecalculateStart(SStreamTask* pTask);
27
static int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask);
28
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
29
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
30
                                  int32_t* totalBlocks);
31

32
bool streamTaskShouldStop(const SStreamTask* pTask) {
1,813,243✔
33
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,813,243✔
34
  return (pState.state == TASK_STATUS__STOP) || (pState.state == TASK_STATUS__DROPPING);
1,812,867!
35
}
36

37
bool streamTaskShouldPause(const SStreamTask* pTask) {
28,123✔
38
  return (streamTaskGetStatus(pTask).state == TASK_STATUS__PAUSE);
28,123✔
39
}
40

41
// Deliver one result block bundle to this task's output target.
// - TABLE / SMA sinks: the data is handed to the sink callback synchronously
//   and the block is destroyed here.
// - dispatch-type outputs (fixed/shuffle/vtable-map): ownership of pBlock is
//   transferred to the output queue on success, then a dispatch round is
//   triggered; on enqueue failure the block is destroyed here.
// Any other output type is an internal error.
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int32_t code = 0;
  int32_t type = pTask->outputInfo.type;
  if (type == TASK_OUTPUT__TABLE) {
    // write into a table in the local vnode, then release the block
    pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
    destroyStreamDataBlock(pBlock);
  } else if (type == TASK_OUTPUT__SMA) {
    // write into the SMA sink, then release the block
    pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks);
    destroyStreamDataBlock(pBlock);
  } else {
    if (type != TASK_OUTPUT__FIXED_DISPATCH && type != TASK_OUTPUT__SHUFFLE_DISPATCH &&
        type != TASK_OUTPUT__VTABLE_MAP) {
      stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, type);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    // on success the output queue owns pBlock from here on
    code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
    if (code != TSDB_CODE_SUCCESS) {
      destroyStreamDataBlock(pBlock);
      return code;
    }

    // not handle error, if dispatch failed, try next time.
    // checkpoint trigger will be checked
    code = streamDispatchStreamBlock(pTask);
  }

  return code;
}
70

71
// Package the accumulated result blocks in pRes into a single stream data
// block and push it to the task's output (sink or dispatch queue).
// On success, ownership of pRes is consumed and *totalSize / *totalBlocks are
// incremented by the amount dumped. On failure pRes is destroyed (except for
// the output back-pressure path, where the caller retries later).
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks == 0) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pStreamBlocks = NULL;

  int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks);
  if (code) {
    // FIX: log the actual failure code (the old code printed tstrerror(terrno),
    // which may be stale) and propagate it unchanged instead of collapsing
    // every failure into TSDB_CODE_OUT_OF_MEMORY.
    stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(code));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
          SIZE_IN_MiB(size));

  code = doOutputResultBlockImpl(pTask, pStreamBlocks);
  if (code != TSDB_CODE_SUCCESS) {  // back pressure and record position
    return code;
  }

  *totalSize += size;
  *totalBlocks += numOfBlocks;

  return code;
}
101

102
// Append a STREAM_PULL_OVER marker block to pRes, signalling that the
// retrieve request identified by pRetrieveBlock->reqId has been fully served.
// The retrieve block is expected to carry exactly one data block; its content
// is copied (assignOneDataBlock) and re-tagged before being appended.
// Increments *pNumOfBlocks on success.
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
                                     SArray* pRes) {
  SSDataBlock block = {0};
  int32_t     num = taosArrayGetSize(pRetrieveBlock->blocks);
  if (num != 1) {
    // a retrieve item must contain a single block; anything else is malformed
    stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
    return TSDB_CODE_INVALID_PARA;
  }

  void*   p = taosArrayGet(pRetrieveBlock->blocks, 0);
  int32_t code = assignOneDataBlock(&block, p);
  if (code) {
    stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  // mark the copy as the pull-over sentinel and tag it with this task's child id
  block.info.type = STREAM_PULL_OVER;
  block.info.childId = pTask->info.selfChildId;

  p = taosArrayPush(pRes, &block);
  if (p != NULL) {
    (*pNumOfBlocks) += 1;
    stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
            pTask->info.selfChildId, pRetrieveBlock->reqId);
  } else {
    code = terrno;
    stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64 " code:%s", pTask->id.idStr,
            pRetrieveBlock->reqId, tstrerror(code));
  }

  return code;
}
134

135
// Append the recalculate trigger's data block to pRes so it gets dispatched
// downstream with the other results. Increments *pNumOfBlocks on success.
// NOTE(review): the QID printed in both log lines is a hard-coded 0 because
// pRecalculateBlock->reqId is commented out — presumably the trigger carries
// no request id yet; confirm before relying on these log messages.
static int32_t doAppendRecalBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamTrigger* pRecalculateBlock,
                                  SArray* pRes) {
  int32_t code = 0;
  SSDataBlock block = {0};

  void* p = taosArrayPush(pRes, pRecalculateBlock->pBlock);
  if (p != NULL) {
    (*pNumOfBlocks) += 1;
    stDebug("s-task:%s(child %d) recalculate from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
            pTask->info.selfChildId, /*pRecalculateBlock->reqId*/ (int64_t)0);
  } else {
    code = terrno;
    stError("s-task:%s failed to append recalculate block for downstream, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
            /*pRecalculateBlock->reqId*/(int64_t)0, tstrerror(code));
  }

  return code;
}
153

154
// Core execution loop for one input item: after the input has been set on the
// executor (doSetStreamInputBlock), repeatedly pull result blocks from
// qExecTask, accumulate them into pRes, and flush to the output via
// doDumpResult whenever the block-count or byte-size threshold is reached.
// Returns when the executor is drained (output == NULL), on stop request, or
// on a fatal executor error. *totalSize / *totalBlocks report what was dumped.
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t size = 0;         // bytes accumulated in pRes since the last dump
  int32_t numOfBlocks = 0;  // blocks accumulated in pRes since the last dump
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
  SArray* pRes = NULL;

  *totalBlocks = 0;
  *totalSize = 0;

  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;

    // (re)allocate the accumulator after each dump handed it off
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }

    // bail out on stop request or accumulator allocation failure
    if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return code;
    }

    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        qResetTaskInfoCode(pExecutor);
      }

      // only these three errors are fatal; everything else resets and retries
      if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
        stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return code;
      } else {
        qResetTaskCode(pExecutor);
        continue;
      }
    }

    // executor drained: optionally append the pull-over sentinel for a
    // retrieve request, then leave the loop to flush whatever accumulated
    if (output == NULL) {
      if (pItem != NULL && (pItem->type == STREAM_INPUT__DATA_RETRIEVE)) {
        code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*)pItem, pRes);
        if (code) {
          taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
          return code;
        }
      }

      break;
    }

    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK && pTask->info.taskLevel == TASK_LEVEL__AGG) {
      stDebug("s-task:%s exec output type:%d", pTask->id.idStr, output->info.type);
    }

    if (output->info.type == STREAM_RETRIEVE) {
      // retrieve results go back to upstream tasks, not downstream
      if (streamBroadcastToUpTasks(pTask, output) < 0) {
        // TODO
      }
      continue;
    } else if (output->info.type == STREAM_CHECKPOINT) {
      continue;  // checkpoint block not dispatch to downstream tasks
    }

    // deep-copy the executor's output block; the executor keeps ownership of
    // `output` and may reuse it on the next iteration
    SSDataBlock block = {.info.childId = pTask->info.selfChildId};
    code = assignOneDataBlock(&block, output);
    if (code) {
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
      continue;
    }

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    } else {
      stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
              pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
    }

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      // todo: here we need continue retry to put it into output buffer
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // pRes was consumed by doDumpResult; start a fresh accumulation round
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  // flush the final partial batch, or free the empty accumulator
  if (numOfBlocks > 0) {
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}
257

258
// todo contiuous try to create result blocks
259
static int32_t handleScanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
2,956✔
260
  int32_t code = TSDB_CODE_SUCCESS;
2,956✔
261
  if (taosArrayGetSize(pRes) > 0) {
2,956✔
262
    SStreamDataBlock* pStreamBlocks = NULL;
1,803✔
263
    code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks);
1,803✔
264
    if (code) {
1,803!
265
      stError("s-task:%s failed to build history result blocks", pTask->id.idStr);
×
266
      return code;
×
267
    }
268

269
    code = doOutputResultBlockImpl(pTask, pStreamBlocks);
1,803✔
270
    if (code != TSDB_CODE_SUCCESS) {  // should not have error code
1,803!
271
      stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
×
272
    }
273
  } else {
274
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
1,153✔
275
  }
276
  return code;
2,956✔
277
}
278

279
// Pull one batch of history-scan result blocks from the executor into pRes.
// Stops when: the task must stop, the input queue is blocked, the executor is
// drained (sets *pFinish from qStreamScanhistoryFinished), or the accumulated
// batch reaches the block-count / byte-size dump thresholds.
// *pSize is incremented by the estimated byte size of the appended blocks.
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   exec = pTask->exec.pExecutor;
  int32_t numOfBlocks = 0;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      break;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
      break;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    code = qExecTask(exec, &output, &ts);
    if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {  // if out of memory occurs, quit
      stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
              tstrerror(code));
      qResetTaskCode(exec);
      continue;
    }

    // the generated results before fill-history task been paused, should be dispatched to sink node
    if (output == NULL) {
      // executor drained; record whether the whole history scan has completed
      (*pFinish) = qStreamScanhistoryFinished(exec);
      break;
    }

    // deep-copy the output block; the executor may reuse its buffer
    SSDataBlock block = {0};
    code = assignOneDataBlock(&block, output);
    if (code) {
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
    }

    block.info.childId = pTask->info.selfChildId;
    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    }

    (*pSize) +=
        blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    // batch is large enough — let the caller dump it before continuing
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
              pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
              SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
      break;
    }
  }
}
334

335
// Build the scan-history result descriptor: outcome code plus the idle time
// (in ms) before the caller should re-schedule the task.
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
  SScanhistoryDataInfo ret = {code, idleTime};
  return ret;
}
338

339
// Drive the history-data scan for a source task, starting at timestamp st.
// Runs batches via streamScanHistoryDataImpl and dispatches each batch, until
// the scan finishes (TASK_SCANHISTORY_CONT), the task is paused/stopped
// (TASK_SCANHISTORY_QUIT), back-pressure is hit or the time slice is exhausted
// (TASK_SCANHISTORY_REXEC with the retry delay in ms).
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
  void*       exec = pTask->exec.pExecutor;
  bool        finished = false;
  const char* id = pTask->id.idStr;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
    return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
  }

  // open the stream operator on first entry (or always for recalculate tasks)
  if ((!pTask->hTaskInfo.operatorOpen) || (pTask->info.fillHistory == STREAM_RECALCUL_TASK)) {
    int32_t code = qSetStreamOpOpen(exec);
    if (code != TSDB_CODE_SUCCESS) {
      // FIX: the return value used to be stored and silently ignored; surface
      // the failure in the log so a broken operator open is diagnosable
      stError("s-task:%s failed to open stream operator, code:%s", id, tstrerror(code));
    }
    pTask->hTaskInfo.operatorOpen = true;
  }

  while (1) {
    if (streamTaskShouldPause(pTask)) {
      stDebug("s-task:%s paused from the scan-history task", id);
      // quit from step1, not continue to handle the step2
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // output queue is full, idle for 5 sec.
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
      continue;
    }

    int32_t size = 0;
    streamScanHistoryDataImpl(pTask, pRes, &size, &finished);

    if (streamTaskShouldStop(pTask)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // dispatch the generated results, todo fix error
    int32_t code = handleScanhistoryResultBlocks(pTask, pRes, size);
    if (code) {
      stError("s-task:%s failed to handle scan result block, code:%s", pTask->id.idStr, tstrerror(code));
    }

    if (finished) {
      return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
    }

    // yield after the time slice so other fill-history tasks can run
    int64_t el = taosGetTimestampMs() - st;
    if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
      stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
              pTask->info.fillHistory, el / 1000.0);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
    }
  }
}
405

406
// Prepare the hand-over of executor state from a fill-history task to its
// related stream task: validate/halt the stream task, reset its time-window
// filter (source tasks), request a checkpoint to persist the state, and
// reopen its upstream input queues. If the related stream task no longer
// exists, the fill-history task drops itself and commits the meta store.
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  id = pTask->id.idStr;

  SStreamTask* pStreamTask = NULL;
  int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
  if (pStreamTask == NULL || code != TSDB_CODE_SUCCESS) {
    stError(
        "s-task:%s failed to find related stream task:0x%x, may have been destroyed or closed, destroy related "
        "fill-history task",
        id, (int32_t)pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    // todo: this function should never be failed.
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);

    // 2. save to disk
    streamMetaWLock(pMeta);
    if (streamMetaCommit(pMeta) < 0) {
      // persist to disk
    }
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
    stDebug(
        "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
        "info, prepare transfer exec state",
        id, streamTaskGetStatus(pTask).name, el, pStreamTask->id.idStr);
  }

  ETaskStatus  status = streamTaskGetStatus(pStreamTask).state;
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
  // for the step 2.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      // FIX: release the task acquired above; the original leaked the
      // reference on this early-return path
      streamMetaReleaseTask(pMeta, pStreamTask);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  } else {
    if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
          status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      // FIX: same missing release as the source-task branch
      streamMetaReleaseTask(pMeta, pStreamTask);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
              pStreamTask->id.idStr, tstrerror(code));
      streamMetaReleaseTask(pMeta, pStreamTask);
      return code;
    } else {
      stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
    }
  }

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  SStreamTaskState pState = streamTaskGetStatus(pStreamTask);
  status = pState.state;
  char* p = pState.name;
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
    stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
    streamMetaReleaseTask(pMeta, pStreamTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 1. expand the query time window for stream task of WAL scanner
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
            ", status:%s, sched-status:%d",
            pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
            pTimeWindow->ekey, p, pStreamTask->status.schedStatus);

    code = streamTaskResetTimewindowFilter(pStreamTask);
  } else {
    stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
  }

  // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
  // 2. send msg to mnode to launch a checkpoint to keep the state for current stream
  code = streamTaskSendCheckpointReq(pStreamTask);

  // 3. the default task status should be ready or something, not halt.
  // status to the value that will be kept in disk

  // 4. open the inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pStreamTask);

  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
502

503
// Callback invoked once the HALT event has been processed: reopen every
// upstream input queue and ask for a checkpoint to persist the state.
static int32_t haltCallback(SStreamTask* pTask, void* param) {
  (void)param;  // unused callback context
  streamTaskOpenAllUpstreamInput(pTask);
  return streamTaskSendCheckpointReq(pTask);
}
507

508
// Entry point for transferring a fill-history task's state to its related
// stream task, invoked after the trans-state block reached this task.
// Source/agg tasks go through the full synchronous prepare
// (streamTransferStateDoPrepare); sink tasks only halt the related stream
// sink asynchronously — haltCallback then reopens inputs and requests the
// checkpoint. Requires status.appendTranstateBlock to have been set.
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SStreamMeta* pMeta = pTask->pMeta;

  if (pTask->status.appendTranstateBlock != 1) {
    stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {  // do transfer task operator states.
    code = streamTransferStateDoPrepare(pTask);
  } else {
    // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
    SStreamTask* pStreamTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
    if (pStreamTask != NULL) {
      // halt the related stream sink task
      code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
                pStreamTask->id.idStr, tstrerror(code));
        streamMetaReleaseTask(pMeta, pStreamTask);
        return code;
      } else {
        stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
      }
      streamMetaReleaseTask(pMeta, pStreamTask);
    }
  }

  return code;
}
541

542
// set input
543
static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
44,601✔
544
  void*   pExecutor = pTask->exec.pExecutor;
44,601✔
545
  int32_t code = 0;
44,601✔
546

547
  const SStreamQueueItem* pItem = pInput;
44,601✔
548
  if (pItem->type == STREAM_INPUT__GET_RES) {
44,601✔
549
    const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
5,929✔
550
    code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
5,929✔
551
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
5,929✔
552
      TSKEY k = pTrigger->pBlock->info.window.skey;
4,254✔
553
      stDebug("s-task:%s set force_window_close as source block, skey:%" PRId64, id, k);
4,254!
554
      (*pVer) = k;
4,254✔
555
    }
556
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
38,672✔
557
    const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
6,771✔
558
    code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
6,771✔
559
    stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
6,773✔
560
            pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
561
    if ((*pVer) > pSubmit->submit.ver) {
6,766!
562
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
×
563
              *pVer, pSubmit->submit.ver);
564
    } else {
565
      (*pVer) = pSubmit->submit.ver;
6,766✔
566
    }
567
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
34,063✔
568
    const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
2,152✔
569

570
    SArray* pBlockList = pBlock->blocks;
2,152✔
571
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
2,152✔
572
    stDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
2,163✔
573
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
2,163✔
574

575
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
29,749✔
576
    const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
23,758✔
577

578
    SArray* pBlockList = pMerged->submits;
23,758✔
579
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
23,758✔
580
    stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
23,762✔
581
            pMerged->ver);
582
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
23,762✔
583

584
    if ((*pVer) > pMerged->ver) {
23,755✔
585
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
2!
586
              *pVer, pMerged->ver);
587
    } else {
588
      (*pVer) = pMerged->ver;
23,753✔
589
    }
590

591
  } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
5,991✔
592
    const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
2,409✔
593
    code = qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
2,409✔
594

595
  } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
3,582!
596
             pItem->type == STREAM_INPUT__RECALCULATE) {
3,594!
597
    const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput;
3,582✔
598
    code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);
3,582✔
599

600
    if (pItem->type == STREAM_INPUT__RECALCULATE) {
3,582✔
601
      int32_t t = ((SStreamDataBlock*) pCheckpoint)->type;
12✔
602
      int32_t tId = (int32_t)pTask->hTaskInfo.id.taskId;
12✔
603
      if (t == STREAM_RECALCULATE_START) {
12!
604
        stDebug("s-task:%s set recalculate block to start related recalculate task:0x%x", id, tId);
×
605
      } else {
606
        stDebug("s-task:%s set recalculate block:%d, task:0x%x", id, t, tId);
12!
607
      }
608
    }
609
  } else {
610
    stError("s-task:%s invalid input block type:%d, discard", id, pItem->type);
×
611
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
612
  }
613

614
  return code;
44,591✔
615
}
616

617
// Handle a trans-state (state-transfer) block arriving at this task.
// Agg/sink tasks first wait until every upstream task has sent its trans-state
// block (streamAlignTransferState); the block is then either forwarded to the
// downstream tasks via the output queue (dispatch-type outputs) or, for
// non-dispatch tasks, consumed locally by starting the state transfer directly.
// Ownership of pBlock is always consumed here: freed or moved to the outputQ.
void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char* id = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     level = pTask->info.taskLevel;

  // dispatch the tran-state block to downstream task immediately
  int32_t type = pTask->outputInfo.type;

  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
    // wait for trans-state blocks from all upstream tasks before acting
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
      return;
    }
  }

  // transfer the ownership of executor state
  if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__VTABLE_MAP) {
    if (level == TASK_LEVEL__SOURCE) {
      stDebug("s-task:%s add transfer-state block into outputQ", id);
    } else {
      stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    }

    // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
    if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
      pBlock->srcVgId = pTask->pMeta->vgId;
      code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
      if (code == 0) {
        code = streamDispatchStreamBlock(pTask);
        if (code) {
          stError("s-task:%s failed to dispatch stream block, code:%s", id, tstrerror(code));
        }
      } else {  // todo put into queue failed, retry
        streamFreeQitem((SStreamQueueItem*)pBlock);
      }
    } else {  // level == TASK_LEVEL__SINK
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else {  // non-dispatch task, do task state transfer directly
    streamFreeQitem((SStreamQueueItem*)pBlock);
    stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);

    code = streamTransferStatePrepare(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to prepare transfer state, code:%s", id, tstrerror(code));
      int8_t status = streamTaskSetSchedStatusInactive(pTask);  // let's ignore this return status
    }
  }
}
668

669
// static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
670
// Record the timestamp (in ms) of the most recent execution on this task.
static void setLastExecTs(SStreamTask* pTask, int64_t ts) {
  pTask->status.lastExecTs = ts;
}
671

672
// Log the elapsed time of one executed batch and refresh the task-level
// output/processing throughput statistics. A (near-)zero elapsed interval
// yields zero throughput to avoid division by zero.
static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
                               double st, const char* id) {
  double elapsedSec = (taosGetTimestampMs() - st) / 1000.0;

  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
          elapsedSec, SIZE_IN_MiB(totalSize), totalBlocks);

  pInfo->outputDataBlocks += totalBlocks;
  pInfo->outputDataSize += totalSize;

  bool zeroInterval = (fabs(elapsedSec - 0.0) <= DBL_EPSILON);
  pInfo->outputThroughput = zeroInterval ? 0 : (totalSize / elapsedSec);
  pInfo->procsThroughput = zeroInterval ? 0 : (blockSize / elapsedSec);
}
44,632✔
689

690
/**
 * Execute one merged batch of input data on the task's query executor.
 *
 * Steps: bind the input item to the executor, run it, record throughput
 * statistics, and advance the unsaved processedVer when the input moved the
 * version forward.
 *
 * @param pTask  stream task owning the executor and checkpoint info
 * @param pBlock merged input item extracted from the input queue
 * @param num    number of original items merged into pBlock (logging only)
 * @return TSDB_CODE_SUCCESS, or the error from the set-input/exec steps
 */
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
  const char*      id = pTask->id.idStr;
  int32_t          blockSize = 0;  // NOTE(review): never updated below, so procsThroughput is computed from 0 — confirm whether the real batch size should be supplied
  int64_t          st = taosGetTimestampMs();
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  int64_t          ver = pInfo->processedVer;
  int64_t          totalSize = 0;
  int32_t          totalBlocks = 0;
  int32_t          code = 0;

  stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
  // bind the input to the executor; `ver` may be advanced through the pointer
  code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
  if (code) {
    stError("s-task:%s failed to set input block, not exec for these blocks", id);
    return code;
  }

  code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
  if (code) {
    return code;
  }

  doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

  // update the currentVer if processing the submitted blocks.
  // sanity check: processed version must stay within [checkpointVer, nextProcessVer];
  // on violation only log and return (note: returns the current `code`, i.e. success here)
  if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
    stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
            pInfo->checkpointVer, pInfo->nextProcessVer, ver);
    return code;
  }

  if (ver != pInfo->processedVer) {
    stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
            " ckpt:%" PRId64,
            id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
    pInfo->processedVer = ver;
  }

  return code;
}
730

731
// do nothing after sync executor state to storage backend, until checkpoint is completed.
732
/**
 * Handle a STREAM_INPUT__CHECKPOINT block after executor state has been synced.
 *
 * If the task is still in checkpoint status, build the checkpoint. Otherwise
 * the checkpoint status was left elsewhere, so only acknowledge: source tasks
 * send the checkpoint-source response, other tasks send checkpoint-ready to
 * upstream peers.
 *
 * @return result of the checkpoint-build or ack-send step
 */
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
  int32_t     code = 0;
  const char* id = pTask->id.idStr;

  // snapshot the status under the lock; the status may still change after unlock
  streamMutexLock(&pTask->lock);
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (pState.state == TASK_STATUS__CK) {  // todo other thread may change the status
    stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
    code = streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
  } else {                                    // todo refactor
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to upstream/mnode
      // checkpointId is logged as a literal 0 here (the real id is not threaded through)
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
              tstrerror(code));
    }
  }

//  streamMutexUnlock(&pTask->lock);
  return code;
}
760

761
/**
 * Flush executor state before a checkpoint is taken.
 *
 * If the previous task status was HALT (i.e. a related fill-history task was
 * halted), first transfer the fill-history task's executor state into this
 * task (release on the history task, reload here). Then run the checkpoint
 * block through the executor so all pending data reaches the K/V store.
 * Errors in the transfer step are logged but do not prevent the flush step.
 *
 * @param pTask            the stream task to flush
 * @param pCheckpointBlock the checkpoint input item driving the flush
 * @return result of the final executor flush
 */
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
  const char* id = pTask->id.idStr;

  // 1. transfer the ownership of executor state
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  if (dropRelHTask) {
    STaskId*     pHTaskId = &pTask->hTaskInfo.id;
    SStreamTask* pHTask = NULL;
    int32_t      code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
    if (code == TSDB_CODE_SUCCESS) {  // ignore the error code.
      // release state on the fill-history task, then reload it into this task
      code = streamTaskReleaseState(pHTask);
      if (code) {
        stError("s-task:%s failed to release query state, code:%s", pHTask->id.idStr, tstrerror(code));
      }

      if (code == TSDB_CODE_SUCCESS) {
        code = streamTaskReloadState(pTask);
        if (code) {
          stError("s-task:%s failed to reload query state, code:%s", pTask->id.idStr, tstrerror(code));
        }
      }

      stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
              streamTaskGetStatus(pHTask).name);
      // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
      /*
       * while(1) {
       * qExecTask()
       * }
       * // put into the output queue.
       */
      streamMetaReleaseTask(pTask->pMeta, pHTask);
    } else {
      stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
              (int32_t)pHTaskId->taskId);
    }
  } else {
    stDebug("s-task:%s no transfer-state needed", id);
  }

  // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
  int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
  if (code) {
    stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
  }

  return code;
}
809

810
/**
811
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
812
 * appropriate batch of blocks should be handled in 5 to 10 sec.
813
 */
814
/**
 * Core per-task execution loop: repeatedly extract merged batches from the
 * input queue and dispatch them by block type (checkpoint-trigger,
 * trans-state, checkpoint, data for sink, recalculate, ordinary data).
 *
 * The loop yields (returns 0 after setting idle info) when: the task is
 * stopped/uninit, the output queue is full, the downstream input queue is
 * blocked, exec was invoked too recently, the input queue is drained, or the
 * loop has occupied the thread for more than 2 seconds.
 *
 * @return 0 on a normal yield, or the first fatal error encountered
 */
static int32_t doStreamExecTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;  // currently unused in this function
  int32_t     taskLevel = pTask->info.taskLevel;
  int32_t     taskType = pTask->info.fillHistory;

  // merge multiple input data if possible in the input queue.
  int64_t st = taosGetTimestampMs();
  stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st);

  while (1) {
    int32_t           blockSize = 0;
    int32_t           numOfBlocks = 0;
    SStreamQueueItem* pInput = NULL;

    // stop conditions checked before every batch
    if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask).state == TASK_STATUS__UNINIT)) {
      stDebug("s-task:%s stream task is stopped", id);
      return 0;
    }

    if (streamQueueIsFull(pTask->outputq.queue)) {
      stTrace("s-task:%s outputQ is full, idle for 500ms and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stTrace("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
      streamTaskSetIdleInfo(pTask, 1000);
      return 0;
    }

    // rate limit: do not re-enter exec within MIN_INVOKE_INTERVAL ms
    if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
      stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    }

    EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
    if (ret == EXEC_AFTER_IDLE) {
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
      return 0;
    } else {
      if (pInput == NULL) {  // input queue drained
        return 0;
      }
    }

    pTask->execInfo.inputDataBlocks += numOfBlocks;
    pTask->execInfo.inputDataSize += blockSize;

    // dispatch checkpoint msg to all downstream tasks
    int32_t type = pInput->type;
    if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
#if 0
      // Injection error: for automatic kill long trans test
      taosMsleep(50*1000);
#endif
      code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
      if (code != 0) {
        stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
      }
      continue;
    }

    if (type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    if (type == STREAM_INPUT__CHECKPOINT) {
      code = doHandleChkptBlock(pTask);
      streamFreeQitem(pInput);
      return code;
    }

    if (taskLevel == TASK_LEVEL__SINK) {
      if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__RECALCULATE) {
        stError("s-task:%s invalid block type:%d for sink task, discard", id, type);
        continue;
      }

      // here only handle the data block sink operation
      if (type == STREAM_INPUT__DATA_BLOCK) {
        pTask->execInfo.sink.dataSize += blockSize;
        stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
        code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        double el = (taosGetTimestampMs() - st) / 1000.0;
        pTask->execInfo.procsThroughput = (fabs(el - 0.0) <= DBL_EPSILON) ? 0 : (blockSize / el);
      } else {
        streamFreeQitem((SStreamQueueItem*)pInput);
      }

      continue;
    }

    if (type == STREAM_INPUT__RECALCULATE) {
      // for normal agg tasks, wait until every upstream task has sent its recalc msg
      if (taskType == STREAM_NORMAL_TASK && taskLevel == TASK_LEVEL__AGG) {
        int32_t remain = streamAlignRecalculateStart(pTask);
        if (remain > 0) {
          streamFreeQitem((SStreamQueueItem*)pInput);
          stDebug("s-task:%s receive upstream recalculate msg, not sent remain:%d", id, remain);
          return code;
        }

        stDebug("s-task:%s all upstream send recalculate msg, continue", id);
      }

      // 1. generate the recalculating snapshot for related recalculate tasks.
      if ((taskType == STREAM_NORMAL_TASK) &&
          ((taskLevel == TASK_LEVEL__AGG) || (taskLevel == TASK_LEVEL__SOURCE && (!pTask->info.hasAggTasks)))) {
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      } else if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG) {
        // send retrieve to upstream tasks (source tasks, to start to recalculate procedure.
        stDebug("s-task:%s recalculate agg task send retrieve to upstream source tasks", id);
        code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      }
    }

    if (type != STREAM_INPUT__RECALCULATE) {
      code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
      streamFreeQitem(pInput);
      if (code) {
        return code;
      }
    }

    // for stream with only 1 task, start related re-calculate stream task directly.
    // We only start the re-calculate agg task here, and do NOT start the source task, for streams with agg-tasks.
    if ((type == STREAM_INPUT__RECALCULATE) && (taskType == STREAM_NORMAL_TASK)) {
      SSDataBlock* pb = taosArrayGet(((SStreamDataBlock*)pInput)->blocks, 0);

      if ((taskLevel == TASK_LEVEL__AGG) || ((taskLevel == TASK_LEVEL__SOURCE) && (!pTask->info.hasAggTasks))) {
        EStreamType blockType = pb->info.type;

        // streamId == 0 marks a dropped/invalid related recalc task
        if (pTask->hTaskInfo.id.streamId == 0) {
          stError("s-task:%s related re-calculate stream task is dropping, failed to start re-calculate", id);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
          stError("s-task:%s invalid trigger model, expect:%d, actually:%d, not exec tasks", id,
                  STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE, pTask->info.trigger);
          streamFreeQitem(pInput);
          return TSDB_CODE_STREAM_INTERNAL_ERROR;
        }

        SStreamTask* pHTask = NULL;
        code = streamMetaAcquireTask(pTask->pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHTask);
        if (code != 0) {
          stError("s-task:%s failed to acquire related recalculate task:0x%x, not start the recalculation, code:%s", id,
                  (int32_t)pTask->hTaskInfo.id.taskId, tstrerror(code));
          streamFreeQitem(pInput);
          return code;
        }

        if (blockType == STREAM_RECALCULATE_START) {
          // start the related recalculate task to do recalculate
          stDebug("s-task:%s start recalculate task to do recalculate:0x%x", id, pHTask->id.taskId);

          if (taskLevel == TASK_LEVEL__SOURCE) {
            code = streamStartScanHistoryAsync(pHTask, 0);
          } else {  // for agg task, in normal stream queue to execute
            SStreamDataBlock* pRecalBlock = NULL;
            code = streamCreateRecalculateBlock(pTask, &pRecalBlock, STREAM_RECALCULATE_START);
            if (code) {
              stError("s-task:%s failed to generate recalculate block, code:%s", id, tstrerror(code));
            } else {
              code = streamTaskPutDataIntoInputQ(pHTask, (SStreamQueueItem*)pRecalBlock);
              if (code != TSDB_CODE_SUCCESS) {
                stError("s-task:%s failed to put recalculate block into q, code:%s", pTask->id.idStr, tstrerror(code));
              } else {
                stDebug("s-task:%s put recalculate block into inputQ", pHTask->id.idStr);
              }
              code = streamTrySchedExec(pHTask, false);
            }
          }
        }
        streamMetaReleaseTask(pTask->pMeta, pHTask);
      } else if ((taskLevel == TASK_LEVEL__SOURCE) && pTask->info.hasAggTasks) {
        // ownership of pInput moves to the dispatcher; NULL it to skip the free below
        code = continueDispatchRecalculateStart((SStreamDataBlock*)pInput, pTask);
        pInput = NULL;
      }
    }

    if (type == STREAM_INPUT__RECALCULATE) {
      // NOTE(review): pInput may be NULL here after the dispatch branch above —
      // assumes streamFreeQitem(NULL) is a no-op; confirm
      streamFreeQitem(pInput);
    }

    if (code) {
      return code;
    }

    if (taskType == STREAM_RECALCUL_TASK && taskLevel == TASK_LEVEL__AGG && type != STREAM_INPUT__RECALCULATE) {
      bool complete = qStreamScanhistoryFinished(pTask->exec.pExecutor);
      if (complete) {
        stDebug("s-task:%s recalculate agg task complete recalculate procedure", id);
        return 0;
      }
    }

    double el = (taosGetTimestampMs() - st) / 1000.0;
    if (el > 2.0) {  // elapsed more than 2 sec, not occupy the CPU anymore
      stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id);
      streamTaskSetIdleInfo(pTask, 500);
      return code;
    }
  }

  
}
1031

1032
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
1033
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
1034
bool streamTaskIsIdle(const SStreamTask* pTask) {
3,913✔
1035
  ETaskStatus status = streamTaskGetStatus(pTask).state;
3,913✔
1036
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
3,915!
1037
          status == TASK_STATUS__DROPPING);
1038
}
1039

1040
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
66,331✔
1041
  SStreamTaskState pState = streamTaskGetStatus(pTask);
66,331✔
1042

1043
  ETaskStatus st = pState.state;
66,359✔
1044
  if (pStatus != NULL) {
66,359!
1045
    *pStatus = pState.name;
66,372✔
1046
  }
1047

1048
  // pause & halt will still run for sink tasks.
1049
  if (streamTaskIsSinkTask(pTask)) {
66,359✔
1050
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
12,317✔
1051
            st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
38,846!
1052
  } else {
1053
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
39,790✔
1054
            st == TASK_STATUS__HALT);
1055
  }
1056
}
1057

1058
// Decide whether the exec loop should stop pulling from the input queues.
// Terminal states always stop the loop; during checkpoint a source task only
// watches its checkpoint control queue; otherwise the loop stops when both
// the data queue and the control queue are drained.
static bool shouldNotCont(SStreamTask* pTask) {
  int32_t       level = pTask->info.taskLevel;
  SStreamQueue* pQueue = pTask->inputq.queue;
  ETaskStatus   status = streamTaskGetStatus(pTask).state;

  // 1. task should jump out
  bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING);

  // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue
  bool ckQueueEmpty = (taosQueueItemSize(pQueue->pChkptQueue) == 0);

  // 3. no data in ordinary queue
  bool blockQueueEmpty = (streamQueueGetNumOfItems(pQueue) == 0);

  if (quit) {
    return true;
  }

  if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) {
    // in checkpoint procedure, only the controller queue matters
    return ckQueueEmpty;
  }

  // otherwise, continue only while either queue still holds items
  return blockQueueEmpty && ckQueueEmpty;
}
1083

1084
/**
 * Drive the exec loop of a task whose sched-status is already ACTIVE.
 *
 * Repeatedly calls doStreamExecTask(); after each round, under the task lock,
 * it either (a) marks the task sched-inactive and returns when there is
 * nothing left to do (see shouldNotCont), (b) schedules a delayed resume and
 * returns when the task asked to idle, or (c) loops again immediately.
 *
 * @return the last execution code (per-round errors are logged and the loop
 *         continues)
 */
int32_t streamResumeTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     level = pTask->info.taskLevel;
  int32_t     code = 0;

  // caller must have won the sched-status CAS (see streamExecTask)
  if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
    stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
    return code;
  }

  while (1) {
    code = doStreamExecTask(pTask);
    if (code) {
      stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
    }

    streamMutexLock(&pTask->lock);

    if (shouldNotCont(pTask)) {
      // nothing left to do: hand the sched slot back before unlocking
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
      streamTaskClearSchedIdleInfo(pTask);
      streamMutexUnlock(&pTask->lock);

      setLastExecTs(pTask, taosGetTimestampMs());

      char* p = streamTaskGetStatus(pTask).name;
      stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
              pTask->status.schedStatus, pTask->status.lastExecTs);

      return code;
    } else {
      // check if this task needs to be idle for a while
      if (pTask->status.schedIdleTime > 0) {
        streamTaskResumeInFuture(pTask);

        streamMutexUnlock(&pTask->lock);
        setLastExecTs(pTask, taosGetTimestampMs());
        return code;
      }
    }

    streamMutexUnlock(&pTask->lock);
  }

  // unreachable: the loop above only exits via return
  return code;
}
1130

1131
// Entry point for one scheduled execution of a stream task. Several threads
// may schedule the same task concurrently; the sched-status transition decides
// which caller actually runs the exec loop.
int32_t streamExecTask(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  int8_t prevSchedStatus = streamTaskSetSchedStatusActive(pTask);
  if (prevSchedStatus != TASK_SCHED_STATUS__WAITING) {
    // another thread already activated the task; just log and leave
    char* p = streamTaskGetStatus(pTask).name;
    stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
            pTask->status.schedStatus);
    return code;
  }

  code = streamResumeTask(pTask);
  return code;
}
1147

1148
// Ask the task's query executor (if any) to release its operator state.
// Returns success when the task has no executor attached.
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  stDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExecutor);
}
1159

1160
// Ask the task's query executor (if any) to reload its operator state.
// Returns success when the task has no executor attached.
int32_t streamTaskReloadState(SStreamTask* pTask) {
  stDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExecutor = pTask->exec.pExecutor;
  if (pExecutor == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExecutor);
}
1171

1172
// Countdown barrier for trans-state messages across all upstream tasks. The
// first arriving call initializes the counter to the upstream count (CAS from
// 0); every call then decrements it. Returns the number of upstream tasks
// whose trans-state message is still outstanding.
int32_t streamAlignTransferState(SStreamTask* pTask) {
  int32_t upstreamNum = taosArrayGetSize(pTask->upstreamInfo.pList);

  int32_t prev = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, upstreamNum);
  if (prev == 0) {  // this caller initialized the barrier
    stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, upstreamNum);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}
1181

1182
// Countdown barrier for recalculate-start messages across all upstream tasks,
// mirroring streamAlignTransferState. Returns the number of upstream tasks
// whose recalculate-start message is still outstanding.
int32_t streamAlignRecalculateStart(SStreamTask* pTask) {
  int32_t upstreamNum = taosArrayGetSize(pTask->upstreamInfo.pList);

  int32_t prev = atomic_val_compare_exchange_32(&pTask->recalculateAlignCnt, 0, upstreamNum);
  if (prev == 0) {  // this caller initialized the barrier
    stDebug("s-task:%s set start recalculate state aligncnt %d", pTask->id.idStr, upstreamNum);
  }

  return atomic_sub_fetch_32(&pTask->recalculateAlignCnt, 1);
}
1191

1192
// Stamp the recalculate-start block with this task's identity and forward it
// downstream via the output queue. On enqueue failure the block is freed here;
// on success its ownership passes to the dispatcher.
int32_t continueDispatchRecalculateStart(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code != 0) {
    stError("s-task:%s failed to put recalculate start block into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  return streamDispatchStreamBlock(pTask);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc