• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3840

04 Apr 2025 03:35PM UTC coverage: 63.027% (+0.6%) from 62.382%
#3840

push

travis-ci

web-flow
Merge pull request #30653 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

155471 of 315065 branches covered (49.35%)

Branch coverage included in aggregate %.

241637 of 314991 relevant lines covered (76.71%)

18825079.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.02
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Turn a decoded stream task into a runnable one.
 *  - Non-sink tasks open their state backend. Tasks with fillHistory == STREAM_RECALCUL_TASK
 *    additionally open a "recal" state under their own id.
 *  - Source/agg/merge tasks get an executor built from pTask->exec.qmsg.
 *  - Finally the schedule trigger is installed.
 *
 * Tasks whose fillHistory is not STREAM_NORMAL_TASK open the main state under the id of the related
 * stream task (pTask->streamTaskId); normal tasks use their own id.
 *
 * Returns 0 on success. Otherwise returns terrno when a state backend could not be opened, or the
 * code of the failing executor setup call.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      st = taosGetTimestampMs();
  int64_t      streamId = 0;
  int32_t      taskId = 0;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  if (pTask->info.fillHistory != STREAM_NORMAL_TASK) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  // sink task does not need the pState
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      pTask->pRecalState = streamStateRecalatedOpen(pMeta->path, pTask, pTask->id.streamId, pTask->id.taskId);
      if (pTask->pRecalState == NULL) {
        // use a distinct message so a recal-state failure can be told apart from a main-state failure
        tqError("s-task:%s (vgId:%d) failed to open recal state for task, expand task failed", pTask->id.idStr,
                vgId);
        return terrno;
      }
      tqDebug("s-task:%s recal state:%p", pTask->id.idStr, pTask->pRecalState);
    }

    pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s stream state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = NULL,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
      .pOtherBackend = NULL,
  };

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG ||
      pTask->info.taskLevel == TASK_LEVEL__MERGE) {
    // a recalculate task writes into its recal state and reads the related task's state as the "other" backend
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      handle.pStateBackend = pTask->pRecalState;
      handle.pOtherBackend = pTask->pState;
    } else {
      handle.pStateBackend = pTask->pState;
      handle.pOtherBackend = NULL;
    }

    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code) {
      tqError("s-task:%s failed to set task id for executor, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    qSetStreamMergeInfo(pTask->exec.pExecutor, pTask->pVTables);
  }

  streamSetupScheduleTrigger(pTask);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);

  return code;
}
128

129
/*
 * Set up the replay position of a task from its checkpoint info.
 *
 * When a checkpoint exists, the checkpoint version is the last version already kept. Processing
 * therefore resumes at checkpointVer + 1, and the checkpoint id is recorded as the start checkpoint.
 * In every case execInfo.startCheckpointVer ends up equal to chkInfo.nextProcessVer.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId == 0) {
    // no checkpoint: keep whatever nextProcessVer already holds
    pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
    return;
  }

  pInfo->processedVer = pInfo->checkpointVer;
  pInfo->nextProcessVer = pInfo->checkpointVer + 1;
  pTask->execInfo.startCheckpointId = pInfo->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
144

145
/*
 * Schedule an asynchronous start (or restart when `restart` is true) of all tasks in pMeta.
 * Returns 0 without scheduling anything when the meta holds no tasks. Otherwise returns the result
 * of streamTaskSchedTask.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total == 0) {
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
    return 0;
  }

  tqDebug("vgId:%d start all %d stream task(s) async", vgId, total);

  int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
  if (restart) {
    execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
  }

  return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
}
158

159
/*
 * Schedule an asynchronous start of the single task (streamId, taskId).
 * Returns 0 without scheduling when the meta holds no tasks at all. Otherwise returns the result of
 * streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;

  if (taosArrayGetSize(pMeta->pTaskList) == 0) {
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
    return 0;
  }

  tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
  return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
}
170

171
// this is to process request from transaction, always return true.
172
/*
 * Handle a node-epset update request issued by a transaction.
 *
 * The target task (and its related fill-history task, if any) has its epset updated, is saved into
 * the meta when something actually changed, and is stopped. The task is then recorded in the
 * update list of req.transId. Once every task of the meta is in that list, the meta is committed.
 * On a restored leader, all tasks are then restarted asynchronously.
 *
 * Returns TSDB_CODE_MSG_DECODE_ERROR when the message cannot be decoded. Every other path returns
 * TSDB_CODE_SUCCESS (0) so that the transaction is not blocked. That includes a missing task, a
 * duplicated msg, an invalid transId and a commit failure.
 * req.pNodeList is released on every path.
 */
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
  int32_t                  vgId = pMeta->vgId;
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
  SRpcMsg                  rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
  int64_t                  st = taosGetTimestampMs();
  bool                     updated = false;
  int32_t                  code = 0;
  SStreamTask*             pTask = NULL;
  SStreamTask*             pHTask = NULL;
  SStreamTaskNodeUpdateMsg req = {0};
  SDecoder                 decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  code = tDecodeStreamTaskUpdateMsg(&decoder, &req);
  tDecoderClear(&decoder);

  if (code < 0) {
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  int32_t gError = streamGetFatalError(pMeta);
  if (gError != 0) {
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
    taosArrayDestroy(req.pNodeList);  // the decoded node list was leaked on this path before
    return 0;
  }

  // update the nodeEpset when it exists
  streamMetaWLock(pMeta);

  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
            req.taskId);
    rsp.code = TSDB_CODE_SUCCESS;
    streamMetaWUnLock(pMeta);
    taosArrayDestroy(req.pNodeList);
    return rsp.code;
  }

  const char* idstr = pTask->id.idStr;

  if (req.transId <= 0) {
    // log the offending transId (previously req.taskId was printed here by mistake)
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.transId);
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    taosArrayDestroy(req.pNodeList);
    return rsp.code;
  }

  // info needs to be kept till the new trans to update the nodeEp arrived.
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
  if (!update) {
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    taosArrayDestroy(req.pNodeList);
    return rsp.code;
  }

  // duplicate update epset msg received, discard this redundant message
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};

  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
  if (pReqTask != NULL) {
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
            req.transId);
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    taosArrayDestroy(req.pNodeList);
    return rsp.code;
  }

  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);

  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
  code = streamTaskSendCheckpointsourceRsp(pTask);
  if (code) {
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
  }
  streamTaskResetStatus(pTask);

  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
    if (code != 0) {
      tqError(
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
          "stream task:0x%x",
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
    } else {
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
      if (updateEpSet) {
        updated = updateEpSet;
      }

      streamTaskResetStatus(pHTask);
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
    }
  }

  // stream do update the nodeEp info, write it into stream meta.
  if (updated) {
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
    }

    if (pHTask != NULL) {
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
      if (code) {
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
      }
    }
  } else {
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
  }

  code = streamTaskStop(pTask);
  if (code) {
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
  }

  if (pHTask != NULL) {
    code = streamTaskStop(pHTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
    }
  }

  // keep info; pHTask is NULL when there is no (acquirable) related fill-history task
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, pHTask, req.transId, st);
  streamMetaReleaseTask(pMeta, pTask);
  if (pHTask != NULL) {
    streamMetaReleaseTask(pMeta, pHTask);
  }

  rsp.code = TSDB_CODE_SUCCESS;

  // possibly only handle the stream task.
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);

  if (restored && isLeader) {
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
    pMeta->startInfo.tasksWillRestart = 1;
  }

  if (updateTasks < numOfTasks) {
    if (isLeader) {
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
              updateTasks, (numOfTasks - updateTasks));
    } else {
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
              (numOfTasks - updateTasks));
    }
  } else {
    if ((code = streamMetaCommit(pMeta)) < 0) {
      // always return true
      streamMetaWUnLock(pMeta);
      taosArrayDestroy(req.pNodeList);
      return TSDB_CODE_SUCCESS;
    }

    streamMetaClearSetUpdateTaskListComplete(pMeta);

    if (isLeader) {
      if (!restored) {
        tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
      } else {
        tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0
      taosMSleep(5000);// for test purpose, to trigger the leader election
#endif
        code = tqStreamTaskStartAsync(pMeta, cb, true);
        if (code) {
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
        }
      }
    } else {
      tqDebug("vgId:%d follower nodes not restart tasks", vgId);
    }
  }

  streamMetaWUnLock(pMeta);
  taosArrayDestroy(req.pNodeList);
  return rsp.code;  // always return true
}
376

377
/*
 * Handle a dispatch request from an upstream task.
 *
 * If the target task exists, the request is handed to streamProcessDispatchMsg. If it does not,
 * an SStreamDispatchRsp carrying TSDB_CODE_STREAM_TASK_NOT_EXIST is sent back to the upstream node.
 *
 * The decoded request, the task reference and the rsp buffer are released on every path, including
 * the error paths, which previously leaked them.
 *
 * Returns:
 *  - 0 when the request was processed or the not-exist rsp was sent;
 *  - -1 when streamProcessDispatchMsg fails;
 *  - TSDB_CODE_MSG_DECODE_ERROR or TSDB_CODE_INVALID_MSG for a malformed request;
 *  - terrno when the rsp buffer cannot be allocated.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    tCleanupStreamDispatchReq(&req);  // drop anything decoded before the failure; req was zero-initialized
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // release the request and the task reference on failure too (both leaked before)
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return (code != 0) ? -1 : 0;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // validate before allocating, so that the rsp buffer cannot leak on this path
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;  // keep the allocation error before the cleanup call can touch terrno
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
440

441
/*
 * Handle the rsp to a dispatch msg sent by a local (upstream) task.
 *
 * The rsp header fields are converted to host byte order in place. htonl and htobe64 are their own
 * inverses, so they decode as well as encode. The rsp is then passed to the owning task.
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when that task cannot be acquired, otherwise the result
 * of streamProcessDispatchRsp.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t             vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamTask*        pTask = NULL;

  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
467

468
/*
 * Handle a retrieve request addressed to req.dstTaskId.
 *
 * Any in-progress checkpoint of the task is marked as failed first. A source task then processes
 * the request itself; other levels re-stamp the request with their own node/task id and broadcast
 * it further.
 *
 * The rsp is sent manually only when processing succeeds. On failure the error code is returned so
 * that no manual rsp is sent.
 * Returns 0 on success, otherwise the decode, acquire or processing error.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t            code = 0;
  SDecoder           decoder;
  SStreamRetrieveReq req = {0};  // zeroed so that cleanup is safe even after a partial decode

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // 0 when the rsp was sent manually above; otherwise the processing error
  return code;
}
523

524
/*
 * Handle a downstream-check request from an upstream task.
 * The request is evaluated by streamTaskProcessCheckMsg, and the resulting rsp is sent back to
 * req.upstreamNodeId / req.upstreamTaskId.
 * Returns the decode error when the msg is malformed, otherwise the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*               pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
547

548
/*
 * Handle the rsp to a downstream-check request sent by local task rsp.upstreamTaskId.
 *
 * Returns -1 with terrno set to TSDB_CODE_INVALID_MSG when the rsp cannot be decoded. On a
 * non-leader vnode, or when the task cannot be acquired, the task is recorded via
 * streamMetaAddFailedTask and that result is returned. Otherwise returns the result of
 * streamTaskProcessCheckRsp.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t             vgId = pMeta->vgId;
  char*               pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bufLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;
  SStreamTask*        pTask = NULL;

  tDecoderInit(&decoder, (uint8_t*)pBuf, bufLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (isLeader == false) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
  if (!((pTask != NULL) && (code == 0))) {
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  code = streamTaskProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
586

587
/*
 * Handle a checkpoint-ready msg sent by a downstream task to local task req.upstreamTaskId.
 *
 * The msg is rejected with TSDB_CODE_INVALID_MSG when the target is a sink task. Otherwise it is
 * passed to streamProcessCheckpointReadyMsg. On success an SMStreamCheckpointReadyRspMsg is sent
 * back manually, and pMsg->info.handle is cleared to disable the automatic rsp.
 *
 * Returns 0 on success, otherwise the decode, acquire, processing or allocation error.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    // the task looked up here is the upstream (local) one, so report that id
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
          pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    // NOTE(review): transId is not assigned here; confirm rpcMallocCont zero-fills the buffer.
    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
649

650
/*
 * Deploy a new stream task from a serialized deploy msg.
 *
 * The task is decoded and registered in the meta (sversion is passed to streamMetaRegisterTask).
 * When this vnode is a restored leader and the task was newly added, a task with fillHistory == 0
 * is also started asynchronously.
 *
 * Returns:
 *  - 0 when streams are disabled;
 *  - terrno on allocation failure;
 *  - TSDB_CODE_INVALID_MSG on decode failure;
 *  - the registration error, or else the result of the acquire/start step (0 when nothing was started).
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);
  int32_t numOfTasks = 0;
  bool    added = false;
  int32_t code = 0;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // step 1: deserialize the msg into a freshly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // copy the ids out now: once registered, pTask must not be dereferenced, since other threads may
  // already have destroyed it
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;

  // step 2: register the task in the meta
  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // step 3: only the leader launches the task
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pAcquired = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pAcquired);
  if ((pAcquired != NULL) && (code == 0) && (pAcquired->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (pAcquired != NULL) {
    streamMetaReleaseTask(pMeta, pAcquired);
  }

  return code;
}
729

730
/*
 * Drop stream task (pReq->streamId, pReq->taskId) and, first, its related fill-history task.
 *
 * msg is an SVDropStreamTaskReq; msgLen is not used. The meta is committed afterwards, and the start
 * info is reset when no tasks remain. Failures are logged only, and the function always returns 0.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
  if (numOfTasks == 0) {
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
  }

  // persist to disk; a failure was silently ignored before, now it is at least reported
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
792

793
/*
 * Apply an update-checkpoint-info request (msg is an SVUpdateCheckpointInfoReq) to its task, under
 * the meta write lock.
 *
 * A missing task or a failed update is logged. The update failure was silently dropped before.
 * The function always returns TSDB_CODE_SUCCESS, because the request is issued by mnode during a
 * transaction.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    code = 0;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);
    if (code != 0) {
      tqError("s-task:0x%x vgId:%d failed to update checkpoint info, code:%s", pReq->taskId, vgId, tstrerror(code));
    }
  } else {  // failed to get the task.
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  }

  streamMetaWUnLock(pMeta);
  // always return success when handling the requirement issued by mnode during transaction.
  return TSDB_CODE_SUCCESS;
}
820

821
/*
 * Reload every stream task of this vnode from the meta store and restart them.
 *
 * If a start-all-tasks procedure is already in progress (pStartInfo->startAllTasks == 1), the action depends on
 * its current stage:
 *   - START_MARK_REQ_CHKPID / START_WAIT_FOR_CHKPTID: the outstanding consensus-checkpointId rsp is treated as
 *     expired; the stage history is cleared and the restart proceeds right away.
 *   - START_CHECK_DOWNSTREAM: restart right away if allCheckDownstreamRspPartial() reports that all partial
 *     rsps have arrived; otherwise only increase restartCount and return.
 *   - any other stage: nothing is done.
 *
 * The restart clears the meta (streamMetaClear), reloads the tasks (streamMetaLoadAllTasks) and, on the leader
 * with streams enabled, starts all tasks; otherwise the start info is reset instead.
 *
 * tqStreamTaskProcessRunReq() calls this with the meta write lock held.
 *
 * @param pMeta    stream meta of this vnode.
 * @param isLeader whether the reloaded tasks should be started.
 * @return TSDB_CODE_SUCCESS when only the in-progress procedure is updated; after a restart, the error of
 *         streamMetaStartAllTasks() if it failed, otherwise terrno as left by the reload/start (terrno is reset
 *         to 0 before the reload).
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t         vgId = pMeta->vgId;
  int32_t         code = 0;
  int64_t         st = taosGetTimestampMs();
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  if (pStartInfo->startAllTasks == 1) {
    // wait for the checkpoint id rsp, this rsp will be expired
    if (pStartInfo->curStage == START_MARK_REQ_CHKPID) {
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      // the stage history may be empty: do not dereference a missing entry just to print its timestamp
      int64_t reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : 0;
      tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%"PRId64 " ignore and continue", vgId, reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) {
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : 0;
      tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64
             " rsp will be discarded",
             vgId, reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) {
      // both the successful and the failed check-downstream rsps are received results
      int32_t numOfRecv =
          (int32_t)(taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet));

      int32_t newTotal = taosArrayGetSize(pStartInfo->pRecvChkptIdTasks);
      tqDebug(
          "vgId:%d start all tasks procedure is interrupted by transId:%d, wait for partial tasks rsp. recv check "
          "downstream results, received:%d results, total req tasks:%d",
          vgId, pMeta->updateInfo.activeTransId, numOfRecv, newTotal);

      bool allRsp = allCheckDownstreamRspPartial(pStartInfo, newTotal, pMeta->vgId);
      if (allRsp) {
        tqDebug("vgId:%d all partial results received, continue the restart procedure", pMeta->vgId);
        streamMetaResetStartInfo(pStartInfo, vgId);
        goto _start;
      } else {
        pStartInfo->restartCount += 1;
        SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
        int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : 0;

        tqDebug("vgId:%d in start tasks procedure (check downstream), reqTs:%" PRId64
                ", inc restartCounter by 1 and wait for it completes, "
                "remaining restart:%d",
                vgId, reqTs, pStartInfo->restartCount);
      }
    } else {
      tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId);
    }

    return TSDB_CODE_SUCCESS;
  }

_start:

  pStartInfo->startAllTasks = 1;
  // streamMetaLoadAllTasks() reports failures only through terrno, so clear it before the reload
  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  if (isLeader && !tsDisableStream) {
    code = streamMetaStartAllTasks(pMeta);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pStartInfo->restartCount = 0;
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // an explicit error from streamMetaStartAllTasks() must not be overwritten by terrno
  if (code == TSDB_CODE_SUCCESS) {
    code = terrno;
  }
  return code;
}
903

904
/*
 * Decode a stream-task run req and dispatch it by req.reqType.
 *
 * Start-one/start-all/restart-all/stop-all always return 0. Add-failed-task and stop-one return the result of
 * the corresponding meta call. Resume-task resumes the task if it is ready to run. Any other type executes the
 * task's inputQ if the task is ready to run.
 *
 * @param pMeta    stream meta of this vnode.
 * @param pMsg     rpc msg whose payload (after SMsgHead) is an encoded SStreamTaskRunReq.
 * @param isLeader forwarded to restartStreamTasks() for STREAM_EXEC_T_RESTART_ALL_TASKS.
 * @return TSDB_CODE_SUCCESS if the req cannot be decoded; otherwise as described above (for the resume and the
 *         default path, the acquire/resume code).
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = restartStreamTasks(pMeta, isLeader);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
    return code;
  } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // keep the full 64-bit difference: an int32_t overflows once the gap exceeds INT32_MAX ms (~24.8 days)
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
989

990
/*
 * Callback invoked when a start-all-tasks procedure completes.
 *
 * If another start-all-tasks procedure is still running, nothing is done. Otherwise, when restarts are pending
 * (restartCount > 0) and not every task is ready, one pending restart is consumed and restartStreamTasks() is
 * run again; it starts tasks only if this node is the leader. When restarts are pending but all tasks are ready,
 * the counter is reset to 0 instead.
 *
 * NOTE(review): no meta lock is taken here (the lock calls around this body were commented out); confirm that
 * the caller serializes access to pMeta->startInfo.
 *
 * @return the result of restartStreamTasks() when a restart is triggered, otherwise 0.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pStartInfo->restartCount);
    return 0;
  }

  // not in starting procedure
  bool allReady = streamMetaAllTasksReady(pMeta);

  if ((pStartInfo->restartCount > 0) && (!allReady)) {
    pStartInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pStartInfo->restartCount);

    return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
  }

  if (pStartInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", vgId);
  } else if (allReady) {
    // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
    pStartInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  return 0;
}
1026

1027
/*
 * Handle a task-reset req from mnode.
 *
 * Under the task lock, this records the failed checkpoint id and clears the check info. If the task is in
 * checkpoint status (TASK_STATUS__CK), it is moved back to ready; any other status is left as is.
 *
 * @param pMeta stream meta of this vnode.
 * @param pMsg  req body, interpreted as SVResetStreamTaskReq.
 * @return always TSDB_CODE_SUCCESS, also when the task cannot be acquired (it may have been dropped already).
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // every other status, TASK_STATUS__UNINIT included, is left untouched
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1063

1064
/*
 * Handle a stop-tasks req.
 *
 * A non-positive streamId means "stop all stream tasks of this vnode" (used when dropping the db). Stopping
 * the tasks of a single stream is not implemented; such reqs are logged and ignored.
 *
 * @param pMeta  stream meta of this vnode.
 * @param pMsgCb not used by this handler.
 * @param pMsg   rpc msg whose payload (after SMsgHead) is an encoded SStreamTaskStopReq.
 * @return always TSDB_CODE_SUCCESS; decode and stop failures are only logged.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskStopReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  // stop all stream tasks, only invoked when trying to drop db
  if (req.streamId <= 0) {
    tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
    code = streamMetaStopAllTasks(pMeta);
    if (code) {
      tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
    }
  } else {
    // stopping the tasks of a single stream is not implemented: make the ignored req visible instead of
    // dropping it silently
    tqDebug("vgId:%d recv msg to stop tasks of stream:0x%" PRIx64 ", single stream stop not supported, ignore",
            vgId, req.streamId);
  }

  // always return success
  return TSDB_CODE_SUCCESS;
}
1096

1097
/*
 * Handle a retrieve checkpoint-trigger req sent by a downstream task to this (upstream) task.
 *
 * A checkpoint-trigger rsp is always sent back through streamTaskSendCheckpointTriggerMsg(), except when the
 * checkpoint ids do not match. The rsp code is:
 *   - TSDB_CODE_STREAM_TASK_IVLD_STATUS if this task's downstream is not ready;
 *   - TSDB_CODE_SUCCESS if the task is in checkpoint status and already sent the trigger to that downstream
 *     node (the lost trigger is re-sent);
 *   - TSDB_CODE_ACTION_IN_PROGRESS if the trigger has not been sent yet.
 *
 * @return TSDB_CODE_INVALID_MSG on decode failure or checkpoint id mismatch, TSDB_CODE_STREAM_TASK_NOT_EXIST if
 *         the upstream task cannot be acquired, otherwise the result of sending the rsp.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder                 decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {  // recv the checkpoint-source/trigger already
    int32_t transId = 0;
    int64_t checkpointId = 0;

    streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
    if (checkpointId != req.checkpointId) {
      // print the downstream task id like every other log in this handler: as a 32-bit task id in hex
      tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%x, current checkpointId:%" PRId64
              " req:%" PRId64,
              pTask->id.idStr, (int32_t)req.downstreamTaskId, checkpointId, req.checkpointId);
      streamMetaReleaseTask(pMeta, pTask);
      return TSDB_CODE_INVALID_MSG;
    }

    if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
      // re-send the lost checkpoint-trigger msg to downstream task
      tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
              (int32_t)req.downstreamTaskId, checkpointId, transId);
      code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                                TSDB_CODE_SUCCESS);
    } else {  // not send checkpoint-trigger yet, wait
      int32_t recv = 0, total = 0;
      streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

      if (recv == total) {  // add the ts info
        tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
      } else {
        tqWarn(
            "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
            "sending checkpoint-source/trigger",
            pTask->id.idStr, recv, total);
      }
      code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                                TSDB_CODE_ACTION_IN_PROGRESS);
    }
  } else {  // upstream not recv the checkpoint-source/trigger till now
    if (!(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT)) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, pState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_ACTION_IN_PROGRESS);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1184

1185
/*
 * Handle the rsp to a retrieve checkpoint-trigger req. The upstream task re-sends the checkpoint-trigger
 * through this rsp channel, and it is handed to the local task identified by rsp.streamId/rsp.taskId.
 *
 * @return TSDB_CODE_INVALID_MSG if the rsp cannot be decoded, the acquire result if the task cannot be found,
 *         otherwise the result of streamTaskProcessCheckpointTriggerRsp().
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SDecoder              decoder = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  int32_t ret = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}
1217

1218
/*
 * Pause the task named in a pause req from mnode and, if it has a related fill-history task, pause that one too.
 *
 * @param pMeta stream meta of this vnode.
 * @param pMsg  req body, interpreted as SVPauseStreamTaskReq.
 * @return always TSDB_CODE_SUCCESS; a task that cannot be acquired is treated as already paused.
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTask* pHistoryTask = NULL;
  code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
  if ((code == 0) && (pHistoryTask != NULL)) {
    tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
    streamTaskPause(pHistoryTask);
    streamMetaReleaseTask(pMeta, pHistoryTask);
  } else {
    tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
            ", it may have been dropped already",
            pMeta->vgId, pTask->hTaskInfo.id.taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1256

1257
/*
 * Resume one task and, if it is in a runnable status (ready, scan-history or checkpoint), get it going again.
 *
 * The behaviour for a runnable task depends on the task type:
 *   - Source task (not fill-history) with igUntreated set: its wal reader skips to sversion, so data written
 *     while paused is discarded.
 *   - Fill-history source task in scan-history: the history scan is restarted asynchronously.
 *   - Source task with an empty inputQ: nothing is scheduled.
 *   - Any other case: streamTrySchedExec() is called.
 *
 * @param handle    STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
 * @return the result of the async scan-history start / scheduling, or 0 when nothing is scheduled.
 */
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);
  ETaskStatus status = streamTaskGetStatus(pTask).state;
  int32_t     level = pTask->info.taskLevel;

  bool runnable = (status == TASK_STATUS__READY) || (status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__CK);
  if (!runnable) {
    return 0;
  }

  bool isSource = (level == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory && (status == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSource && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // nothing queued for this source task; the wal scan that used to be triggered here is disabled
    return 0;
  }

  return streamTrySchedExec(pTask, false);
}
1292

1293
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
1,230✔
1294
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
1,230✔
1295

1296
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
1,230✔
1297

1298
  SStreamTask* pTask = NULL;
1,230✔
1299
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
1,230✔
1300
  if (pTask == NULL || (code != 0)) {
1,244!
1301
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
1302
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1303
  }
1304

1305
  streamMutexLock(&pTask->lock);
1,244✔
1306
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,247✔
1307
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
1,246✔
1308
  streamMutexUnlock(&pTask->lock);
1,246✔
1309

1310
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
1,243✔
1311
  if (code != 0) {
1,246!
1312
    streamMetaReleaseTask(pMeta, pTask);
×
1313
    return code;
×
1314
  }
1315

1316
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
1,246✔
1317
  SStreamTask* pHTask = NULL;
1,246✔
1318
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
1,246✔
1319
  if (pHTask && (code == 0)) {
1,246!
1320
    streamMutexLock(&pHTask->lock);
×
1321
    SStreamTaskState p = streamTaskGetStatus(pHTask);
×
1322
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
×
1323
    streamMutexUnlock(&pHTask->lock);
×
1324

1325
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
×
1326
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
×
1327

1328
    streamMetaReleaseTask(pMeta, pHTask);
×
1329
  }
1330

1331
  streamMetaReleaseTask(pMeta, pTask);
1,246✔
1332
  return TSDB_CODE_SUCCESS;
1,245✔
1333
}
1334

1335
/* Number of entries in pMeta->pTaskList, returned as int32_t. */
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t numOfTasks = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  return numOfTasks;
}
×
1336

1337
/*
 * Handle a rsp whose content is not inspected: release its payload and clear the pointer.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);
  return TSDB_CODE_SUCCESS;
}
1342

1343
/*
 * Decode a stream heartbeat rsp (payload after SMsgHead) and hand it to streamProcessHeartbeatRsp().
 *
 * @return TSDB_CODE_INVALID_MSG (also stored in terrno) if decoding fails, otherwise the result of
 *         streamProcessHeartbeatRsp().
 */
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg rsp = {0};
  SDecoder         decoder;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  bool decoded = (tDecodeStreamHbRsp(&decoder, &rsp) >= 0);
  if (!decoded) {
    terrno = TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  if (!decoded) {
    tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
    return terrno;
  }

  return streamProcessHeartbeatRsp(pMeta, &rsp);
}
1362

1363
/* The rsp to a req-checkpoint msg is not inspected; only its payload is released. */
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
4,355✔
1364

1365
/* The rsp to a checkpoint-report msg is not inspected; only its payload is released. */
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
6,534✔
1366

1367
/*
 * Handle a checkpoint-ready rsp.
 *
 * The rpc payload is read directly as SMStreamCheckpointReadyRspMsg, and the downstream task it names is
 * notified through streamTaskProcessCheckpointReadyRsp().
 *
 * @return the acquire error if the task is gone, otherwise the result of streamTaskProcessCheckpointReadyRsp().
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
  if ((code == 0) && (pTask != NULL)) {
    int32_t ret = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
    return ret;
  }

  tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
          pRsp->downstreamNodeId, pRsp->downstreamTaskId);
  return code;
}
1382

1383
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
275✔
1384
  int32_t                vgId = pMeta->vgId;
275✔
1385
  int32_t                code = 0;
275✔
1386
  SStreamTask*           pTask = NULL;
275✔
1387
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
275✔
1388
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
275✔
1389
  int64_t                now = taosGetTimestampMs();
275✔
1390
  SDecoder               decoder;
1391
  SRestoreCheckpointInfo req = {0};
275✔
1392

1393
  tDecoderInit(&decoder, (uint8_t*)msg, len);
275✔
1394
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
275!
1395
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
×
1396
    tDecoderClear(&decoder);
×
1397
    return TSDB_CODE_SUCCESS;
×
1398
  }
1399

1400
  tDecoderClear(&decoder);
274✔
1401

1402
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
275✔
1403
  if (pTask == NULL || (code != 0)) {
275!
1404
    // ignore this code to avoid error code over writing
1405
    if (pMeta->role == NODE_ROLE_LEADER) {
×
1406
      tqError("vgId:%d process consensus checkpointId req:%" PRId64
×
1407
              " transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already",
1408
              pMeta->vgId, req.checkpointId, req.transId, req.taskId);
1409

1410
      int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
×
1411
      if (ret) {
×
1412
        tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
×
1413
      }
1414
    } else {
1415
      tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
×
1416
              pMeta->vgId, req.taskId, req.checkpointId, req.transId);
1417
    }
1418

1419
    return 0;
×
1420
  }
1421

1422
  // discard the rsp, since it is expired.
1423
  if (req.startTs < pTask->execInfo.created) {
275!
1424
    tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64
×
1425
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
1426
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
1427
           pTask->execInfo.created);
1428
    if (pMeta->role == NODE_ROLE_LEADER) {
×
1429
      streamMetaAddFailedTaskSelf(pTask, now, true);
×
1430
    }
1431

1432
    streamMetaReleaseTask(pMeta, pTask);
×
1433
    return TSDB_CODE_SUCCESS;
×
1434
  }
1435

1436
  tqInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64
275!
1437
          " transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64,
1438
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs,
1439
          pTask->execInfo.created);
1440

1441
  streamMutexLock(&pTask->lock);
276✔
1442
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
276✔
1443

1444
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
276!
1445
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
×
1446
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
1447

1448
    streamMutexUnlock(&pTask->lock);
×
1449
    streamMetaReleaseTask(pMeta, pTask);
×
1450
    return 0;
×
1451
  }
1452

1453
  if (pConsenInfo->consenChkptTransId >= req.transId) {
276!
1454
    tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
×
1455
            pConsenInfo->consenChkptTransId, req.transId);
1456
    streamMutexUnlock(&pTask->lock);
×
1457
    streamMetaReleaseTask(pMeta, pTask);
×
1458
    return TSDB_CODE_SUCCESS;
×
1459
  }
1460

1461
  if (pTask->chkInfo.checkpointId != req.checkpointId) {
276!
1462
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
×
1463
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
1464
    pTask->chkInfo.checkpointId = req.checkpointId;
×
1465
    tqSetRestoreVersionInfo(pTask);
×
1466
  } else {
1467
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
276✔
1468
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
1469
  }
1470

1471
  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
276✔
1472
  streamMutexUnlock(&pTask->lock);
276✔
1473

1474
  streamMetaWLock(pTask->pMeta);
276✔
1475
  if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) {
276✔
1476
    pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM;
101✔
1477

1478
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
101✔
1479
    taosArrayPush(pMeta->startInfo.pStagesList, &info);
101✔
1480

1481
    tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d",
101✔
1482
            pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
1483
  }
1484

1485
  if (pMeta->role == NODE_ROLE_LEADER) {
276!
1486
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
276✔
1487

1488
    bool exist = false;
276✔
1489
    for (int32_t i = 0; i < taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks); ++i) {
661✔
1490
      STaskId* pId = taosArrayGet(pMeta->startInfo.pRecvChkptIdTasks, i);
385✔
1491
      if (id.streamId == pId->streamId && id.taskId == pId->taskId) {
385!
1492
        exist = true;
×
1493
        break;
×
1494
      }
1495
    }
1496

1497
    if (!exist) {
276!
1498
      void* p = taosArrayPush(pMeta->startInfo.pRecvChkptIdTasks, &id);
276✔
1499
      if (p == NULL) {  // todo handle error, not record the newly attached, start may dead-lock
276!
1500
        tqError("s-task:0x%x failed to add into recv checkpointId task list, code:%s", req.taskId, tstrerror(code));
×
1501
      } else {
1502
        int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
276✔
1503
        tqDebug("s-task:0x%x vgId:%d added into recv checkpointId task list, already recv %d", req.taskId, req.nodeId,
276✔
1504
                num);
1505
      }
1506
    } else {
1507
      int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
×
1508
      tqDebug("s-task:0x%x vgId:%d already exist in recv consensus checkpontId, total existed:%d", req.taskId,
×
1509
              req.nodeId, num);
1510
    }
1511

1512
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
276✔
1513
    if (code != 0) {
275!
1514
      // todo remove the newly added, otherwise, deadlock exist
1515
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
×
1516
    }
1517
  } else {
1518
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
×
1519
  }
1520

1521
  streamMetaWUnLock(pTask->pMeta);
275✔
1522

1523
  streamMetaReleaseTask(pMeta, pTask);
276✔
1524
  return 0;
276✔
1525
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc