• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.58
/source/libs/stream/src/streamExec.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17

18
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM         32
// flush accumulated result blocks once either the block-count or size limit is reached
#define STREAM_RESULT_DUMP_THRESHOLD      300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)  // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE     1000           // 1000 ms
#define MIN_INVOKE_INTERVAL               50             // 50ms
#define FILL_HISTORY_TASK_EXEC_INTERVAL   5000           // 5 sec

// forward declarations for helpers defined later in this file
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
                                  int32_t* totalBlocks);
29

30
bool streamTaskShouldStop(const SStreamTask* pTask) {
1,552,149✔
31
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,552,149✔
32
  return (pState.state == TASK_STATUS__STOP) || (pState.state == TASK_STATUS__DROPPING);
1,551,995✔
33
}
34

35
bool streamTaskShouldPause(const SStreamTask* pTask) {
558,901✔
36
  return (streamTaskGetStatus(pTask).state == TASK_STATUS__PAUSE);
558,901✔
37
}
38

39
// Route one result block to the task output: table sink, SMA sink, or the
// dispatch queue for downstream tasks. Ownership of pBlock is taken on every
// path (it is either consumed by the sink or queued/destroyed here).
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int32_t code = 0;
  int32_t type = pTask->outputInfo.type;

  switch (type) {
    case TASK_OUTPUT__TABLE:
      pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks);
      destroyStreamDataBlock(pBlock);
      break;

    case TASK_OUTPUT__SMA:
      pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId,
                                        pBlock->blocks);
      destroyStreamDataBlock(pBlock);
      break;

    case TASK_OUTPUT__FIXED_DISPATCH:
    case TASK_OUTPUT__SHUFFLE_DISPATCH:
      code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
        destroyStreamDataBlock(pBlock);
        return code;
      }

      // not handle error, if dispatch failed, try next time.
      // checkpoint trigger will be checked
      code = streamDispatchStreamBlock(pTask);
      break;

    default:
      stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, type);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  return code;
}
67

68
// Package the accumulated result blocks in pRes into one SStreamDataBlock and
// hand it to the task output (sink or downstream dispatch). Ownership of pRes
// is taken on every path. On success, *totalSize and *totalBlocks are
// increased by the dumped amount.
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks == 0) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pStreamBlocks = NULL;

  int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks);
  if (code) {
    // propagate the actual failure code instead of assuming out-of-memory and
    // logging the (possibly stale) thread-local terrno
    stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(code));
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return code;
  }

  stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
          SIZE_IN_MiB(size));

  code = doOutputResultBlockImpl(pTask, pStreamBlocks);
  if (code != TSDB_CODE_SUCCESS) {  // back pressure and record position
    return code;
  }

  *totalSize += size;
  *totalBlocks += numOfBlocks;

  return code;
}
98

99
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
560✔
100
                                     SArray* pRes) {
101
  SSDataBlock block = {0};
560✔
102
  int32_t     num = taosArrayGetSize(pRetrieveBlock->blocks);
560✔
103
  if (num != 1) {
560!
104
    stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
×
105
    return TSDB_CODE_INVALID_PARA;
×
106
  }
107

108
  void*   p = taosArrayGet(pRetrieveBlock->blocks, 0);
560✔
109
  int32_t code = assignOneDataBlock(&block, p);
560✔
110
  if (code) {
560!
111
    stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code));
×
112
    return code;
×
113
  }
114

115
  block.info.type = STREAM_PULL_OVER;
560✔
116
  block.info.childId = pTask->info.selfChildId;
560✔
117

118
  p = taosArrayPush(pRes, &block);
560✔
119
  if (p != NULL) {
560!
120
    (*pNumOfBlocks) += 1;
560✔
121
    stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr,
560✔
122
            pTask->info.selfChildId, pRetrieveBlock->reqId);
123
  } else {
124
    code = terrno;
×
125
    stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr,
×
126
            pRetrieveBlock->reqId, tstrerror(code));
127
  }
128

129
  return code;
560✔
130
}
131

132
// Drive the query executor over the input item that the caller already bound
// via doSetStreamInputBlock(), collecting result blocks into a local array and
// flushing them to the task output whenever STREAM_RESULT_DUMP_THRESHOLD
// blocks or STREAM_RESULT_DUMP_SIZE_THRESHOLD bytes have accumulated.
// On return, *totalSize and *totalBlocks describe everything dumped.
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t size = 0;
  int32_t numOfBlocks = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
  SArray* pRes = NULL;

  *totalBlocks = 0;
  *totalSize = 0;

  while (1) {
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;

    // (re)allocate the result array; ownership passes to doDumpResult() on flush
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }

    // quit when the task is stopping, or if the allocation above failed
    if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return code;
    }

    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        qResetTaskInfoCode(pExecutor);
      }

      // these failures are fatal for the batch; anything else resets the
      // executor error code and retries the loop
      if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
        stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return code;
      } else {
        qResetTaskCode(pExecutor);
        continue;
      }
    }

    // executor drained; for retrieve input, append the PULL_OVER marker before leaving
    if (output == NULL) {
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
         code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes);
         if (code) {
           taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
           return code;
         }
      }

      break;
    }

    // retrieve requests are broadcast back to upstream tasks, not kept as results
    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToUpTasks(pTask, output) < 0) {
        // TODO
      }
      continue;
    } else if (output->info.type == STREAM_CHECKPOINT) {
      continue;  // checkpoint block not dispatch to downstream tasks
    }

    SSDataBlock block = {.info.childId = pTask->info.selfChildId};
    code = assignOneDataBlock(&block, output);
    if (code) {
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
      continue;
    }

    // estimated in-memory footprint of the copied block
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    } else {
      stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
              pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
    }

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      // todo: here we need continue retry to put it into output buffer
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // doDumpResult() took ownership of pRes; start a fresh accumulation round
      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  // flush whatever is left; otherwise just release the (empty) array
  if (numOfBlocks > 0) {
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
  } else {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
  }

  return code;
}
231

232
// TODO: continuously retry to create result blocks
233
// Dispatch the scan-history result blocks collected in pRes to the task
// output; an empty result array is simply released. Ownership of pRes is
// taken on all paths.
static int32_t handleScanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
  if (taosArrayGetSize(pRes) == 0) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pStreamBlocks = NULL;
  int32_t           code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks);
  if (code) {
    stError("s-task:%s failed to build history result blocks", pTask->id.idStr);
    return code;
  }

  code = doOutputResultBlockImpl(pTask, pStreamBlocks);
  if (code != TSDB_CODE_SUCCESS) {  // should not have error code
    stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
  }

  return code;
}
252

253
// Run the executor over historical data, appending result blocks to pRes until
// one of: the task is stopping, the inputQ of a downstream task is blocked,
// the executor is drained (sets *pFinish from qStreamScanhistoryFinished), or
// the dump threshold (block count or *pSize bytes) is reached.
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   exec = pTask->exec.pExecutor;
  int32_t numOfBlocks = 0;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      break;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
      break;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    code = qExecTask(exec, &output, &ts);
    if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {  // if out of memory occurs, quit
      stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
              tstrerror(code));
      qResetTaskCode(exec);
      continue;
    }

    // the generated results before fill-history task been paused, should be dispatched to sink node
    if (output == NULL) {
      (*pFinish) = qStreamScanhistoryFinished(exec);
      break;
    }

    SSDataBlock block = {0};
    code = assignOneDataBlock(&block, output);
    if (code) {
      // NOTE(review): on failure the (possibly partially assigned) block is
      // still pushed into pRes below — confirm this is intentional
      stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
    }

    block.info.childId = pTask->info.selfChildId;
    void* p = taosArrayPush(pRes, &block);
    if (p == NULL) {
      stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
    }

    // estimated in-memory footprint of the copied block
    (*pSize) +=
        blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
              pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
              SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
      break;
    }
  }
}
308

309
// Convenience constructor for the scan-history execution result pair.
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
  SScanhistoryDataInfo ret = {code, idleTime};
  return ret;
}
312

313
// Scan historical data for a source fill-history task, starting at wall-clock
// time `st` (ms). Repeatedly collects and dispatches result batches until the
// task pauses/stops, the scan finishes, a queue is full/blocked, or the
// STREAM_SCAN_HISTORY_TIMESLICE budget is exhausted; the returned
// SScanhistoryDataInfo tells the caller whether to quit, continue, or
// re-schedule after an idle interval.
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
  void*       exec = pTask->exec.pExecutor;
  bool        finished = false;
  const char* id = pTask->id.idStr;

  // only source-level tasks scan history data
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
    return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
  }

  // open the stream operator exactly once per task
  if (!pTask->hTaskInfo.operatorOpen) {
    // NOTE(review): the return value of qSetStreamOpOpen() is ignored here —
    // confirm failures are benign
    int32_t code = qSetStreamOpOpen(exec);
    pTask->hTaskInfo.operatorOpen = true;
  }

  while (1) {
    if (streamTaskShouldPause(pTask)) {
      stDebug("s-task:%s paused from the scan-history task", id);
      // quit from step1, not continue to handle the step2
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // output queue is full, idle for 5 sec.
    if (streamQueueIsFull(pTask->outputq.queue)) {
      stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
      continue;
    }

    // collect one batch of history results (ownership of pRes passes below)
    int32_t size = 0;
    streamScanHistoryDataImpl(pTask, pRes, &size, &finished);

    if (streamTaskShouldStop(pTask)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
    }

    // dispatch the generated results, todo fix error
    int32_t code = handleScanhistoryResultBlocks(pTask, pRes, size);
    if (code) {
      stError("s-task:%s failed to handle scan result block, code:%s", pTask->id.idStr, tstrerror(code));
    }

    if (finished) {
      return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
    }

    // yield the time slice so other fill-history tasks can run; retry shortly
    int64_t el = taosGetTimestampMs() - st;
    if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
      stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
              pTask->info.fillHistory, el / 1000.0);
      return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
    }
  }
}
379

380
// Prepare transferring the fill-history task's executor state to its related
// stream task: validate the stream task's status, halt it (non-source levels),
// reset the source task's time-window filter, request a checkpoint, and reopen
// its upstream inputQ. Returns an error code when the related stream task is
// missing or in an invalid state.
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  id = pTask->id.idStr;

  SStreamTask* pStreamTask = NULL;
  int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
  if (pStreamTask == NULL || code != TSDB_CODE_SUCCESS) {
    stError(
        "s-task:%s failed to find related stream task:0x%x, may have been destroyed or closed, destroy related "
        "fill-history task",
        id, (int32_t)pTask->streamTaskId.taskId);

    // 1. free it and remove fill-history task from disk meta-store
    // todo: this function should never be failed.
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);

    // 2. save to disk
    streamMetaWLock(pMeta);
    if (streamMetaCommit(pMeta) < 0) {
      // persist to disk
    }
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  } else {
    double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
    stDebug(
        "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
        "info, prepare transfer exec state",
        id, streamTaskGetStatus(pTask).name, el, pStreamTask->id.idStr);
  }

  ETaskStatus  status = streamTaskGetStatus(pStreamTask).state;
  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;

  // It must be halted for a source stream task, since when the related scan-history-data task start scan the history
  // for the step 2.
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      // release the reference acquired above before bailing out
      streamMetaReleaseTask(pMeta, pStreamTask);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  } else {
    if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
          status == TASK_STATUS__STOP)) {
      stError("s-task:%s invalid task status:%d", id, status);
      // release the reference acquired above before bailing out
      streamMetaReleaseTask(pMeta, pStreamTask);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
              pStreamTask->id.idStr, tstrerror(code));
      streamMetaReleaseTask(pMeta, pStreamTask);
      return code;
    } else {
      stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
    }
  }

  // In case of sink tasks, no need to halt them.
  // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
  // start the task state transfer procedure.
  SStreamTaskState pState = streamTaskGetStatus(pStreamTask);
  status = pState.state;
  char* p = pState.name;
  if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
    stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
    streamMetaReleaseTask(pMeta, pStreamTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 1. expand the query time window for stream task of WAL scanner
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // update the scan data range for source task.
    stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
            ", status:%s, sched-status:%d",
            pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
            pTimeWindow->ekey, p, pStreamTask->status.schedStatus);

    code = streamTaskResetTimewindowFilter(pStreamTask);
  } else {
    stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
  }

  // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
  // 2. send msg to mnode to launch a checkpoint to keep the state for current stream
  code = streamTaskSendCheckpointReq(pStreamTask);

  // 3. assign the status to the value that will be kept in disk
  pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state;

  // 4. open the inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pStreamTask);

  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
476

477
// Callback invoked once the related stream task has been halted: reopen its
// upstream inputQ and request a checkpoint to persist the transferred state.
static int32_t haltCallback(SStreamTask* pTask, void* param) {
  streamTaskOpenAllUpstreamInput(pTask);
  int32_t code = streamTaskSendCheckpointReq(pTask);
  return code;
}
481

482
// Kick off transferring the fill-history task's state to its related stream
// task. Source/agg tasks go through streamTransferStateDoPrepare(); sink
// tasks only need the related stream task halted asynchronously (haltCallback
// finishes the job). Requires status.appendTranstateBlock to be set.
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SStreamMeta* pMeta = pTask->pMeta;

  if (pTask->status.appendTranstateBlock != 1) {
    stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {  // do transfer task operator states.
    code = streamTransferStateDoPrepare(pTask);
  } else {
    // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
    SStreamTask* pStreamTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
    if (pStreamTask != NULL) {
      // halt the related stream sink task
      code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
                pStreamTask->id.idStr, tstrerror(code));
        streamMetaReleaseTask(pMeta, pStreamTask);
        return code;
      } else {
        stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
      }
      streamMetaReleaseTask(pMeta, pStreamTask);
    }
  }

  return code;
}
515

516
// Bind one queued input item to the query executor according to its type, and
// advance *pVer where the item carries a version: submit/merged-submit blocks
// (never moving the version backwards), or — under the force-window-close
// trigger — the window start key of a GET_RES trigger block.
static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
  void*   pExecutor = pTask->exec.pExecutor;
  int32_t code = 0;

  const SStreamQueueItem* pItem = pInput;
  if (pItem->type == STREAM_INPUT__GET_RES) {
    // trigger-generated result request: feed its block as a data block
    const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
    code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey);
      (*pVer) = pTrigger->pBlock->info.window.skey;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    // single submit message from the WAL
    const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
    code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
    stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
            pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
    if ((*pVer) > pSubmit->submit.ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pSubmit->submit.ver);
    } else {
      (*pVer) = pSubmit->submit.ver;
    }
  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
    // data blocks received from upstream tasks (or a retrieve request)
    const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

    SArray* pBlockList = pBlock->blocks;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    // several submit messages merged into one batch
    const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

    SArray* pBlockList = pMerged->submits;
    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
    stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
            pMerged->ver);
    code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);

    if ((*pVer) > pMerged->ver) {
      stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
              *pVer, pMerged->ver);
    } else {
      (*pVer) = pMerged->ver;
    }

  } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
    // referenced (not owned) data block
    const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);

  } else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
    // checkpoint control blocks are passed through with their original type
    const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput;
    code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);

  } else {
    stError("s-task:%s invalid input block type:%d, discard", id, pItem->type);
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  return code;
}
579

580
// Handle a trans-state block arriving at this task. AGG/SINK tasks first wait
// until every upstream task has delivered its trans-state block; the last
// arrival proceeds. Dispatch-type outputs forward the block downstream (so the
// sink flushes all pending data); non-dispatch tasks start the state transfer
// directly. Ownership of pBlock is taken on every path.
void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char* id = pTask->id.idStr;
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     level = pTask->info.taskLevel;
  // dispatch the tran-state block to downstream task immediately
  int32_t type = pTask->outputInfo.type;

  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
    // wait until trans-state blocks from all upstream tasks have arrived
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
      return;
    }
  }

  // transfer the ownership of executor state
  if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (level == TASK_LEVEL__SOURCE) {
      stDebug("s-task:%s add transfer-state block into outputQ", id);
    } else {
      stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
    }

    // agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
    if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
      pBlock->srcVgId = pTask->pMeta->vgId;
      code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
      if (code == 0) {
        code = streamDispatchStreamBlock(pTask);
        if (code) {
          stError("s-task:%s failed to dispatch stream block, code:%s", id, tstrerror(code));
        }
      } else {  // todo put into queue failed, retry
        streamFreeQitem((SStreamQueueItem*)pBlock);
      }
    } else {  // level == TASK_LEVEL__SINK
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else {  // non-dispatch task, do task state transfer directly
    streamFreeQitem((SStreamQueueItem*)pBlock);
    stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);

    code = streamTransferStatePrepare(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to prepare transfer state, code:%s", id, tstrerror(code));
      int8_t status = streamTaskSetSchedStatusInactive(pTask);  // let's ignore this return status
    }
  }
}
630

631
// static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
632
// Record the timestamp (ms) of the most recent execution round for this task.
static void setLastExecTs(SStreamTask* pTask, int64_t ts) {
  pTask->status.lastExecTs = ts;
}
633

634
// Accumulate per-batch execution statistics (output block/size totals) and
// refresh the processing/output throughput figures. An elapsed time of ~0
// yields zero throughput to avoid dividing by zero.
static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
                               double st, const char* id) {
  double elapsedSec = (taosGetTimestampMs() - st) / 1000.0;

  stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
          elapsedSec, SIZE_IN_MiB(totalSize), totalBlocks);

  pInfo->outputDataBlocks += totalBlocks;
  pInfo->outputDataSize += totalSize;

  if (fabs(elapsedSec) > DBL_EPSILON) {
    pInfo->outputThroughput = (totalSize / elapsedSec);
    pInfo->procsThroughput = (blockSize / elapsedSec);
  } else {
    pInfo->procsThroughput = 0;
    pInfo->outputThroughput = 0;
  }
}
651

652
// Run one merged batch of input blocks (`num` blocks folded into pBlock)
// through the task's executor, record throughput statistics, and advance the
// unsaved processedVer. Returns the error code from binding the input block or
// from execution; a version-consistency failure is logged but still returns
// the current (success) code.
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
  const char*      id = pTask->id.idStr;
  int32_t          blockSize = 0;  // NOTE(review): never assigned, so procsThroughput below is always 0 — confirm intended
  int64_t          st = taosGetTimestampMs();
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  int64_t          ver = pInfo->processedVer;
  int64_t          totalSize = 0;
  int32_t          totalBlocks = 0;
  int32_t          code = 0;

  stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));

  // bind pBlock as the executor's input; `ver` is updated from the block's version
  code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
  if (code) {
    stError("s-task:%s failed to set input block, not exec for these blocks", id);
    return code;
  }

  // drive the executor; totalSize/totalBlocks describe the generated output
  code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
  if (code) {
    return code;
  }

  doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

  // update the currentVer if processing the submitted blocks.
  // sanity check: checkpointVer <= ver and checkpointVer <= nextProcessVer must hold
  if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
    stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
            pInfo->checkpointVer, pInfo->nextProcessVer, ver);
    return code;
  }

  // persisting happens later; this only tracks the in-memory (unsaved) progress
  if (ver != pInfo->processedVer) {
    stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
            " ckpt:%" PRId64,
            id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
    pInfo->processedVer = ver;
  }

  return code;
}

694
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
695
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
2,099✔
696
  int32_t     code = 0;
2,099✔
697
  const char* id = pTask->id.idStr;
2,099✔
698

699
  streamMutexLock(&pTask->lock);
2,099✔
700
  SStreamTaskState pState = streamTaskGetStatus(pTask);
2,103✔
701
  if (pState.state == TASK_STATUS__CK) {  // todo other thread may change the status
2,102!
702
    stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
2,102✔
703
    code = streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
2,102✔
704
  } else {                                    // todo refactor
705
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
×
706
      code = streamTaskSendCheckpointSourceRsp(pTask);
×
707
    } else {
708
      code = streamTaskSendCheckpointReadyMsg(pTask);
×
709
    }
710

711
    if (code != TSDB_CODE_SUCCESS) {
×
712
      // todo: let's retry send rsp to upstream/mnode
713
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
×
714
              tstrerror(code));
715
    }
716
  }
717

718
  streamMutexUnlock(&pTask->lock);
2,105✔
719
  return code;
2,105✔
720
}
721

722
// Flush in-memory executor state ahead of a checkpoint. If the previous task
// status was HALT, the related fill-history task's executor state is first
// transferred to this task (release on the history task, then reload here).
// Afterwards the checkpoint block is run through the executor so that state
// reaches the K/V store before the checkpoint itself is built.
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
  const char* id = pTask->id.idStr;

  // 1. transfer the ownership of executor state
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  if (dropRelHTask) {
    STaskId*     pHTaskId = &pTask->hTaskInfo.id;
    SStreamTask* pHTask = NULL;
    int32_t      code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
    if (code == TSDB_CODE_SUCCESS) {  // ignore the error code.
      // release the fill-history task's executor state first ...
      code = streamTaskReleaseState(pHTask);
      if (code) {
        stError("s-task:%s failed to release query state, code:%s", pHTask->id.idStr, tstrerror(code));
      }

      // ... then reload it into this (the related stream) task
      if (code == TSDB_CODE_SUCCESS) {
        code = streamTaskReloadState(pTask);
        if (code) {
          stError("s-task:%s failed to reload query state, code:%s", pTask->id.idStr, tstrerror(code));
        }
      }

      // NOTE(review): "completed" is logged even when release/reload failed above — confirm intended
      stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
              streamTaskGetStatus(pHTask).name);
      // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
      /*
       * while(1) {
       * qExecTask()
       * }
       * // put into the output queue.
       */
      streamMetaReleaseTask(pTask->pMeta, pHTask);
    } else {
      stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
              (int32_t)pHTaskId->taskId);
    }
  } else {
    stDebug("s-task:%s no transfer-state needed", id);
  }

  // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
  int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
  if (code) {
    stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
  }

  return code;
}
771
/**
772
 * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
773
 * appropriate batch of blocks should be handled in 5 to 10 sec.
774
 */
775
static int32_t doStreamExecTask(SStreamTask* pTask) {
66,848✔
776
  const char* id = pTask->id.idStr;
66,848✔
777
  int32_t     code = 0;
66,848✔
778

779
  // merge multiple input data if possible in the input queue.
780
  stDebug("s-task:%s start to extract data block from inputQ", id);
66,848✔
781

782
  while (1) {
69,619✔
783
    int32_t           blockSize = 0;
136,487✔
784
    int32_t           numOfBlocks = 0;
136,487✔
785
    SStreamQueueItem* pInput = NULL;
136,487✔
786

787
    if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask).state == TASK_STATUS__UNINIT)) {
136,487✔
788
      stDebug("s-task:%s stream task is stopped", id);
33✔
789
      return 0;
66,902✔
790
    }
791

792
    if (streamQueueIsFull(pTask->outputq.queue)) {
136,448!
UNCOV
793
      stTrace("s-task:%s outputQ is full, idle for 500ms and retry", id);
×
UNCOV
794
      streamTaskSetIdleInfo(pTask, 1000);
×
795
      return 0;
×
796
    }
797

798
    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
136,485!
799
      stTrace("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
×
800
      streamTaskSetIdleInfo(pTask, 1000);
×
801
      return 0;
×
802
    }
803

804
    if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
136,451✔
805
      stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
10,735✔
806
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
10,735✔
807
      return 0;
10,736✔
808
    }
809

810
    EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
125,716✔
811
    if (ret == EXEC_AFTER_IDLE) {
125,718!
812
      streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
×
813
      return 0;
×
814
    } else {
815
      if (pInput == NULL) {
125,719✔
816
        return 0;
54,024✔
817
      }
818
    }
819

820
    pTask->execInfo.inputDataBlocks += numOfBlocks;
71,695✔
821
    pTask->execInfo.inputDataSize += blockSize;
71,695✔
822

823
    // dispatch checkpoint msg to all downstream tasks
824
    int32_t type = pInput->type;
71,695✔
825
    if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
71,695✔
826
      code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
8,449✔
827
      if (code != 0) {
8,456!
828
        stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
×
829
      }
830
      continue;
31,416✔
831
    }
832

833
    if (type == STREAM_INPUT__TRANS_STATE) {
63,246✔
834
      streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
8,723✔
835
      continue;
8,728✔
836
    }
837

838
    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
54,523✔
839
      if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__CHECKPOINT) {
14,233!
840
        stError("s-task:%s invalid block type:%d for sink task, discard", id, type);
×
841
        continue;
×
842
      }
843

844
      int64_t st = taosGetTimestampMs();
14,233✔
845

846
      // here only handle the data block sink operation
847
      if (type == STREAM_INPUT__DATA_BLOCK) {
14,233!
848
        pTask->execInfo.sink.dataSize += blockSize;
14,233✔
849
        stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
14,233✔
850
        code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
14,233✔
851
        if (code != TSDB_CODE_SUCCESS) {
14,232!
852
          return code;
×
853
        }
854

855
        double el = (taosGetTimestampMs() - st) / 1000.0;
14,232✔
856
        if (fabs(el - 0.0) <= DBL_EPSILON) {
14,232✔
857
          pTask->execInfo.procsThroughput = 0;
7,798✔
858
        } else {
859
          pTask->execInfo.procsThroughput = (blockSize / el);
6,434✔
860
        }
861

862
        continue;
14,232✔
863
      }
864
    }
865

866
    if (type == STREAM_INPUT__CHECKPOINT) {
40,290✔
867
      code = doHandleChkptBlock(pTask);
2,099✔
868
      streamFreeQitem(pInput);
2,105✔
869
      return code;
2,100✔
870
    } else {
871
      code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
38,191✔
872
      streamFreeQitem(pInput);
38,214✔
873
      if (code) {
38,212✔
874
        return code;
9✔
875
      }
876
    }
877
  }
878
}
879

880
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
881
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
882
bool streamTaskIsIdle(const SStreamTask* pTask) {
2,635✔
883
  ETaskStatus status = streamTaskGetStatus(pTask).state;
2,635✔
884
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP ||
2,635!
885
          status == TASK_STATUS__DROPPING);
886
}
887

888
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
63,287✔
889
  SStreamTaskState pState = streamTaskGetStatus(pTask);
63,287✔
890

891
  ETaskStatus st = pState.state;
63,299✔
892
  if (pStatus != NULL) {
63,299!
893
    *pStatus = pState.name;
63,306✔
894
  }
895

896
  // pause & halt will still run for sink tasks.
897
  if (streamTaskIsSinkTask(pTask)) {
63,299✔
898
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
13,055!
899
            st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
41,427!
900
  } else {
901
    return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
34,916✔
902
            st == TASK_STATUS__HALT);
903
  }
904
}
905

906
// Drive the exec loop for a task whose sched-status is already ACTIVE.
// Repeatedly runs doStreamExecTask until the input queue drains or the task is
// asked to stop/pause (sched-status reset to INACTIVE), or an idle period was
// requested during exec (re-scheduled via streamTaskResumeInFuture).
int32_t streamResumeTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  int32_t     code = 0;

  // caller (streamExecTask) must have flipped the sched-status to ACTIVE first
  if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
    stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
    return code;
  }

  while (1) {
    code = doStreamExecTask(pTask);
    if (code) {
      stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
      return code;
    }
    // check if continue; the decision is made under the task lock so the
    // queue length and status cannot change between check and status update
    streamMutexLock(&pTask->lock);

    int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
    if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
      // nothing left to do: mark inactive and record the completion time
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
      streamTaskClearSchedIdleInfo(pTask);
      streamMutexUnlock(&pTask->lock);

      setLastExecTs(pTask, taosGetTimestampMs());

      char* p = streamTaskGetStatus(pTask).name;
      stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
              pTask->status.schedStatus, pTask->status.lastExecTs);

      return code;
    } else {
      // check if this task needs to be idle for a while
      if (pTask->status.schedIdleTime > 0) {
        // defer the remaining work; the task will be re-scheduled later
        streamTaskResumeInFuture(pTask);

        streamMutexUnlock(&pTask->lock);
        setLastExecTs(pTask, taosGetTimestampMs());
        return code;
      }
    }

    streamMutexUnlock(&pTask->lock);
  }

  // unreachable: the loop above always returns
  return code;
}
954
// Entry point for running a task. May be invoked concurrently from multiple
// threads; only the caller that transitions the sched-status from WAITING to
// ACTIVE actually drives the exec loop, everyone else just logs and returns.
int32_t streamExecTask(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  int8_t prevSchedStatus = streamTaskSetSchedStatusActive(pTask);
  if (prevSchedStatus != TASK_SCHED_STATUS__WAITING) {
    char* name = streamTaskGetStatus(pTask).name;
    stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, name,
            pTask->status.schedStatus);
    return 0;
  }

  return streamResumeTask(pTask);
}
971
// Release the executor's operator state (no-op when the task has no executor).
int32_t streamTaskReleaseState(SStreamTask* pTask) {
  stDebug("s-task:%s release exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReleaseState(pExec);
}
983
// Reload the executor's operator state (no-op when the task has no executor).
int32_t streamTaskReloadState(SStreamTask* pTask) {
  stDebug("s-task:%s reload exec state", pTask->id.idStr);

  void* pExec = pTask->exec.pExecutor;
  if (pExec == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return qStreamOperatorReloadState(pExec);
}
995
// Countdown barrier for transfer-state alignment across upstream tasks.
// The first caller atomically initializes the counter to the number of
// upstream tasks; every caller then decrements it. Returns the remaining
// count, so 0 means all upstream tasks have arrived.
int32_t streamAlignTransferState(SStreamTask* pTask) {
  int32_t upstreamNum = taosArrayGetSize(pTask->upstreamInfo.pList);

  int32_t prev = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, upstreamNum);
  if (prev == 0) {
    stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, upstreamNum);
  }

  return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc