• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.78
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Build the runtime parts of a stream task:
 *  - every level except sink opens a state backend (a recalculate task additionally opens pRecalState);
 *  - source and agg levels create the executor and configure task id and notify info on it;
 *  - finally the schedule trigger is set up.
 *
 * When fillHistory is not STREAM_NORMAL_TASK, the main state is opened under the ids stored in
 * pTask->streamTaskId instead of the task's own ids.
 *
 * Returns 0 on success, terrno when a state backend cannot be opened, or the error code of the first
 * failing executor setup call.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();
  int32_t      level = pTask->info.taskLevel;
  bool         isRecalTask = (pTask->info.fillHistory == STREAM_RECALCUL_TASK);
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // ids under which the (main) stream state is opened
  int64_t stateStreamId = pTask->id.streamId;
  int32_t stateTaskId = pTask->id.taskId;
  if (pTask->info.fillHistory != STREAM_NORMAL_TASK) {
    stateStreamId = pTask->streamTaskId.streamId;
    stateTaskId = pTask->streamTaskId.taskId;
  }

  // a sink task does not need any state backend
  if (level != TASK_LEVEL__SINK) {
    if (isRecalTask) {
      pTask->pRecalState = streamStateRecalatedOpen(pMeta->path, pTask, pTask->id.streamId, pTask->id.taskId);
      if (pTask->pRecalState == NULL) {
        tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
        return terrno;
      }
      tqDebug("s-task:%s recal state:%p", pTask->id.idStr, pTask->pRecalState);
    }

    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s stream state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = NULL,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
      .pOtherBackend = NULL,
  };

  if (level == TASK_LEVEL__SOURCE) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (level == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  bool needExecutor = (level == TASK_LEVEL__SOURCE) || (level == TASK_LEVEL__AGG);
  if (needExecutor) {
    // a recalculate task uses its recal state as primary backend and passes the normal state as the other one
    if (isRecalTask) {
      handle.pStateBackend = pTask->pRecalState;
      handle.pOtherBackend = pTask->pState;
    } else {
      handle.pStateBackend = pTask->pState;
      handle.pOtherBackend = NULL;
    }

    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code != 0) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsedSec);

  return code;
}
125

126
/*
 * Derive the restore position of a task from its checkpoint info.
 *
 * When a checkpoint exists (checkpointId != 0), checkpointVer is the last version kept by that checkpoint,
 * so processedVer becomes checkpointVer and processing resumes at checkpointVer + 1. In every case,
 * execInfo.startCheckpointVer records the version processing (re)starts from.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  bool             hasCheckpoint = (pInfo->checkpointId != 0);

  if (hasCheckpoint) {
    pInfo->processedVer = pInfo->checkpointVer;
    pInfo->nextProcessVer = pInfo->checkpointVer + 1;
    pTask->execInfo.startCheckpointId = pInfo->checkpointId;

    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
           pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);
  }

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
141

142
/*
 * Schedule an asynchronous start (restart == false) or restart (restart == true) of all tasks in pMeta.
 * Returns 0 without scheduling anything when pMeta->pTaskList is empty, otherwise the result of
 * streamTaskSchedTask.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t taskCount = taosArrayGetSize(pMeta->pTaskList);

  if (taskCount != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, taskCount);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }
    return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
155

156
/*
 * Schedule an asynchronous start of the single task identified by (streamId, taskId).
 * Returns 0 without scheduling when pMeta->pTaskList is empty, otherwise the result of streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t taskCount = taosArrayGetSize(pMeta->pTaskList);

  if (taskCount != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
167

168
// this is to process request from transaction, always return true.
169
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
50✔
170
  int32_t      vgId = pMeta->vgId;
50✔
171
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
50✔
172
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
50✔
173
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
50✔
174
  int64_t      st = taosGetTimestampMs();
50✔
175
  bool         updated = false;
50✔
176
  int32_t      code = 0;
50✔
177
  SStreamTask* pTask = NULL;
50✔
178
  SStreamTask* pHTask = NULL;
50✔
179

180
  SStreamTaskNodeUpdateMsg req = {0};
50✔
181

182
  SDecoder decoder;
183
  tDecoderInit(&decoder, (uint8_t*)msg, len);
50✔
184
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
50!
185
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
186
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
187
    tDecoderClear(&decoder);
×
188
    return rsp.code;
×
189
  }
190

191
  tDecoderClear(&decoder);
50✔
192

193
  int32_t gError = streamGetFatalError(pMeta);
50✔
194
  if (gError != 0) {
50!
195
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
196
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
197
    return 0;
×
198
  }
199

200
  // update the nodeEpset when it exists
201
  streamMetaWLock(pMeta);
50✔
202

203
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
204
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
50✔
205
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
50✔
206
  if (code != 0) {
50!
207
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
208
            req.taskId);
209
    rsp.code = TSDB_CODE_SUCCESS;
×
210
    streamMetaWUnLock(pMeta);
×
211
    taosArrayDestroy(req.pNodeList);
×
212
    return rsp.code;
×
213
  }
214

215
  const char* idstr = pTask->id.idStr;
50✔
216

217
  if (req.transId <= 0) {
50!
218
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
219
    rsp.code = TSDB_CODE_SUCCESS;
×
220

221
    streamMetaReleaseTask(pMeta, pTask);
×
222
    streamMetaWUnLock(pMeta);
×
223

224
    taosArrayDestroy(req.pNodeList);
×
225
    return rsp.code;
×
226
  }
227

228
  // info needs to be kept till the new trans to update the nodeEp arrived.
229
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
50✔
230
  if (!update) {
50!
231
    rsp.code = TSDB_CODE_SUCCESS;
×
232

233
    streamMetaReleaseTask(pMeta, pTask);
×
234
    streamMetaWUnLock(pMeta);
×
235

236
    taosArrayDestroy(req.pNodeList);
×
237
    return rsp.code;
×
238
  }
239

240
  // duplicate update epset msg received, discard this redundant message
241
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
50✔
242

243
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
50✔
244
  if (pReqTask != NULL) {
50!
245
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
246
            req.transId);
247
    rsp.code = TSDB_CODE_SUCCESS;
×
248

249
    streamMetaReleaseTask(pMeta, pTask);
×
250
    streamMetaWUnLock(pMeta);
×
251

252
    taosArrayDestroy(req.pNodeList);
×
253
    return rsp.code;
×
254
  }
255

256
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
50✔
257

258
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
259
  code = streamTaskSendCheckpointsourceRsp(pTask);
50✔
260
  if (code) {
50!
261
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
262
  }
263
  streamTaskResetStatus(pTask);
50✔
264

265
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
50✔
266

267
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
50✔
268
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
19✔
269
    if (code != 0) {
19!
270
      tqError(
×
271
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
272
          "stream task:0x%x",
273
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
274
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
275
    } else {
276
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
19!
277
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
19✔
278
      if (updateEpSet) {
19!
279
        updated = updateEpSet;
19✔
280
      }
281

282
      streamTaskResetStatus(pHTask);
19✔
283
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
19✔
284
    }
285
  }
286

287
  // stream do update the nodeEp info, write it into stream meta.
288
  if (updated) {
50✔
289
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
26!
290
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
26✔
291
    if (code) {
26!
292
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
293
    }
294

295
    if (pHTask != NULL) {
26✔
296
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
19✔
297
      if (code) {
19!
298
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
299
      }
300
    }
301
  } else {
302
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
24!
303
  }
304

305
  code = streamTaskStop(pTask);
50✔
306
  if (code) {
50!
307
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
308
  }
309

310
  if (pHTask != NULL) {
50✔
311
    code = streamTaskStop(pHTask);
19✔
312
    if (code) {
19!
313
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
314
    }
315
  }
316

317
  // keep info
318
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
50✔
319
  streamMetaReleaseTask(pMeta, pTask);
50✔
320
  streamMetaReleaseTask(pMeta, pHTask);
50✔
321

322
  rsp.code = TSDB_CODE_SUCCESS;
50✔
323

324
  // possibly only handle the stream task.
325
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
50✔
326
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
50✔
327

328
  if (restored && isLeader) {
50!
329
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
26!
330
    pMeta->startInfo.tasksWillRestart = 1;
26✔
331
  }
332

333
  if (updateTasks < numOfTasks) {
50✔
334
    if (isLeader) {
23✔
335
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
11!
336
              updateTasks, (numOfTasks - updateTasks));
337
    } else {
338
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
12!
339
              (numOfTasks - updateTasks));
340
    }
341
  } else {
342
    if ((code = streamMetaCommit(pMeta)) < 0) {
27!
343
      // always return true
344
      streamMetaWUnLock(pMeta);
×
345
      taosArrayDestroy(req.pNodeList);
×
346
      return TSDB_CODE_SUCCESS;
×
347
    }
348

349
    streamMetaClearSetUpdateTaskListComplete(pMeta);
27✔
350

351
    if (isLeader) {
27✔
352
      if (!restored) {
15!
353
        tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
×
354
      } else {
355
        tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
15!
356
#if 0
357
      taosMSleep(5000);// for test purpose, to trigger the leader election
358
#endif
359
        code = tqStreamTaskStartAsync(pMeta, cb, true);
15✔
360
        if (code) {
15!
361
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
362
        }
363
      }
364
    } else {
365
      tqDebug("vgId:%d follower nodes not restart tasks", vgId);
12!
366
    }
367
  }
368

369
  streamMetaWUnLock(pMeta);
50✔
370
  taosArrayDestroy(req.pNodeList);
50✔
371
  return rsp.code;  // always return true
50✔
372
}
373

374
/*
 * Handle a dispatch request sent by an upstream task to task req.taskId.
 *
 * If the target task exists, the request is handed to streamProcessDispatchMsg. Otherwise an
 * SStreamDispatchRsp with code TSDB_CODE_STREAM_TASK_NOT_EXIST is sent back to the upstream node.
 *
 * Every path past a successful decode releases the decoded request (tCleanupStreamDispatchReq), and
 * releases the acquired task when one was acquired.
 *
 * Returns:
 *  - 0 when the request was processed, or when the error rsp was sent;
 *  - TSDB_CODE_MSG_DECODE_ERROR when the request cannot be decoded;
 *  - the streamProcessDispatchMsg error when processing fails;
 *  - terrno when the error rsp cannot be allocated;
 *  - TSDB_CODE_INVALID_MSG when the upstream node id is 0.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // release the request and the task reference on failure as well; the original leaked both here
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  } else {
    tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
            pMeta->vgId, req.taskId);

    SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
    if (pRspHead == NULL) {
      code = terrno;  // keep the allocation error before cleaning up
      tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
      tCleanupStreamDispatchReq(&req);
      return code;
    }

    pRspHead->vgId = htonl(req.upstreamNodeId);
    if (pRspHead->vgId == 0) {
      tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
      rpcFreeCont(pRspHead);
      tCleanupStreamDispatchReq(&req);
      return TSDB_CODE_INVALID_MSG;
    }

    // all rsp fields are written in network byte order
    SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
    pRsp->streamId = htobe64(req.streamId);
    pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
    pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
    pRsp->downstreamNodeId = htonl(pMeta->vgId);
    pRsp->downstreamTaskId = htonl(req.taskId);
    pRsp->msgId = htonl(req.msgId);
    pRsp->stage = htobe64(req.stage);
    pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

    int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
    SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
    tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

    tmsgSendRsp(&rsp);
    tCleanupStreamDispatchReq(&req);

    return 0;
  }
}
437

438
/*
 * Handle the dispatch rsp returned by a downstream task.
 *
 * The rsp body, which arrives in network byte order, is converted in place to host order. It is then
 * handed to the upstream task it belongs to, together with the rpc code (pMsg->code).
 * Returns the result of streamProcessDispatchRsp, or TSDB_CODE_STREAM_TASK_NOT_EXIST if the upstream
 * task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t             vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pUpTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pUpTask);
  if ((pUpTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pUpTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpTask);
  return code;
}
464

465
/*
 * Handle a retrieve request addressed to task req.dstTaskId.
 *
 * Any in-progress checkpoint of the task is set to failed first. A source task then processes the
 * request itself. A task of any other level becomes the new source of the request and broadcasts it
 * via streamTaskBroadcastRetrieveReq. The retrieve rsp is sent manually only when processing succeeds.
 *
 * Returns:
 *  - 0 on success;
 *  - the decode error;
 *  - the acquire error, or 0 if the task was not found but no error code was reported;
 *  - the processing error.
 * For a non-zero return no rsp has been sent by this function.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t  code = 0;
  SDecoder decoder;

  // zero-initialized so that cleanup is safe even if decoding stops half-way
  SStreamRetrieveReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    tCleanupStreamRetrieveReq(&req);  // release anything the decoder already allocated
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // on failure the error code is returned and no rsp was sent manually above;
  // presumably the caller's auto-rsp path reports it — TODO confirm
  return code;
}
520

521
/*
 * Handle a check request from an upstream task.
 * The request body (after the SMsgHead) is decoded and the check rsp is built by
 * streamTaskProcessCheckMsg. The rsp is then sent back to req.upstreamNodeId / req.upstreamTaskId.
 * Returns the decode error code, or the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
544

545
/*
 * Handle the check rsp returned by a downstream task to upstream task rsp.upstreamTaskId.
 *
 * Decode failure sets terrno to TSDB_CODE_INVALID_MSG and returns -1.
 * On a non-leader node, or when the upstream task cannot be acquired, the task is recorded via
 * streamMetaAddFailedTask and that result is returned.
 * Otherwise the rsp is handed to streamTaskProcessCheckRsp and its result is returned.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }
  tDecoderClear(&decoder);

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  SStreamTask* pUpTask = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pUpTask);
  if ((pUpTask != NULL) && (code == 0)) {
    code = streamTaskProcessCheckRsp(pUpTask, &rsp);
    streamMetaReleaseTask(pMeta, pUpTask);
    return code;
  }

  return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
}
583

584
/*
 * Handle a checkpoint-ready msg sent by a downstream task to upstream task req.upstreamTaskId.
 *
 * The msg is processed by streamProcessCheckpointReadyMsg. An SMStreamCheckpointReadyRspMsg is then
 * sent back and the auto rsp is disabled (pMsg->info.handle = NULL).
 *
 * Returns:
 *  - 0 on success;
 *  - TSDB_CODE_MSG_DECODE_ERROR when decoding fails;
 *  - the acquire error, or TSDB_CODE_STREAM_TASK_NOT_EXIST when no task is returned;
 *  - TSDB_CODE_INVALID_MSG when the target is a sink task;
 *  - the processing error;
 *  - terrno when the rsp cannot be allocated.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  // the msg is addressed to the upstream task, so that is the task looked up (and logged on failure)
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  } else {
    tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
  }

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
646

647
/*
 * Deploy a stream task from a serialized deploy msg at WAL version `sversion`.
 *
 * The task is decoded and registered into the stream meta under the meta write lock. On a restored
 * leader, a newly added task with fillHistory == 0 is then started asynchronously.
 *
 * Returns:
 *  - 0 when streams are disabled, or when nothing further needs to be launched;
 *  - terrno on allocation failure;
 *  - TSDB_CODE_INVALID_MSG when decoding fails;
 *  - the negative registration error;
 *  - otherwise the last acquire/start result.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1. deserialize the msg and build the task
  int32_t      allocSize = sizeof(SStreamTask);
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2. register the task, the latest commit version is used as the initial start version of the task.
  // The ids are copied now: once the task is registered, pTask must not be referenced any more, since
  // other threads may already have destroyed it.
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;
  bool    added = false;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // launching is only handled in the leader node
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pAcquired = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pAcquired);
  if ((pAcquired != NULL) && (code == 0) && (pAcquired->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (pAcquired != NULL) {
    streamMetaReleaseTask(pMeta, pAcquired);
  }

  return code;
}
726

727
/*
 * Drop the stream task named in an SVDropStreamTaskReq, together with its related fill-history task
 * (dropped first), and then commit the stream meta.
 *
 * The whole procedure runs under the meta write lock. Failures are only logged; the function always
 * returns 0. msgLen is not used.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    // remember the related fill-history task id before the relationship is cleared
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
  if (numOfTasks == 0) {
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
  }

  // commit the update; a failure was previously swallowed by an empty if-body, report it instead
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
789

790
/*
 * Apply the checkpoint info carried by an SVUpdateCheckpointInfoReq to the task it names.
 *
 * Runs under the meta write lock. The request is issued by mnode during a transaction, so
 * TSDB_CODE_SUCCESS is always returned. A missing task is logged. The result of
 * streamTaskUpdateTaskCheckpointInfo is not reported.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId taskKey = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  if (streamMetaAcquireTaskUnsafe(pMeta, &taskKey, &pTask) != 0) {
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  } else {
    (void)streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaWUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
817

818
/**
 * Clear the stream meta, reload all tasks and, on a leader with streams enabled,
 * start them all again.
 *
 * If a start-all procedure is already running (startAllTasks == 1):
 *  - in the START_MARK_REQ_CHKPID / START_WAIT_FOR_CHKPTID stages the pending
 *    consensus-checkpoint request is abandoned (its rsp will be discarded) and the
 *    restart proceeds;
 *  - in the START_CHECK_DOWNSTREAM stage restartCount is incremented so that the
 *    completion callback restarts the tasks later, and nothing else is done now;
 *  - in any other stage nothing is done.
 *
 * @param pMeta     stream meta of this vnode
 * @param isLeader  only a leader (with streams enabled) starts the reloaded tasks
 * @return the error of streamMetaStartAllTasks if it failed, otherwise terrno as left by
 *         the clear/reload/start sequence (reset to 0 before it begins).
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t         vgId = pMeta->vgId;
  int32_t         code = 0;
  int64_t         st = taosGetTimestampMs();
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  if (pStartInfo->startAllTasks == 1) {
    // wait for the checkpoint id rsp, this rsp will be expired
    if (pStartInfo->curStage == START_MARK_REQ_CHKPID) {
      // taosArrayGetLast() presumably yields NULL for an empty stage list; guard the dereference
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : 0;
      tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%" PRId64 " ignore and continue", vgId,
             reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) {
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : 0;
      tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64
             " rsp will be discarded",
             vgId, reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) {
      pStartInfo->restartCount += 1;
      tqDebug(
          "vgId:%d in start tasks procedure (check downstream), inc restartCounter by 1 and wait for it completes, "
          "remaining restart:%d",
          vgId, pStartInfo->restartCount);
    } else {
      tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId);
    }

    return TSDB_CODE_SUCCESS;
  }

_start:

  pStartInfo->startAllTasks = 1;
  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  if (isLeader && !tsDisableStream) {
    code = streamMetaStartAllTasks(pMeta);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pStartInfo->restartCount = 0;
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // terrno was reset to 0 before the meta was cleared and reloaded. Use it only as a
  // fallback so that an explicit error from streamMetaStartAllTasks is not overwritten.
  if (code == TSDB_CODE_SUCCESS) {
    code = terrno;
  }

  return code;
}
882

883
/**
 * Dispatch a stream "run" request.
 *
 * The request is either a meta-level command (start one/all tasks, restart all
 * tasks, stop one/all tasks, record a failed task, resume an idle task) or, for any
 * other reqType, a request to let task req.taskId process the data in its inputQ.
 *
 * @param pMeta     stream meta of this vnode
 * @param pMsg      rpc msg whose body (after SMsgHead) is an encoded SStreamTaskRunReq
 * @param isLeader  forwarded to restartStreamTasks for STREAM_EXEC_T_RESTART_ALL_TASKS
 * @return 0 for decode failures and for the start/restart/stop-all commands; the
 *         result of the underlying call for ADD_FAILED_TASK, STOP_ONE_TASK and
 *         RESUME_TASK; the acquire result when the target task cannot be found.
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = restartStreamTasks(pMeta, isLeader);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
    return code;
  } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // Keep the full 64-bit width. The elapsed time is not guaranteed to fit in int32_t
        // (e.g. presumably when lastExecTs was never set), and narrowing it would log garbage.
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
968

969
/**
 * Callback run after a start-all-tasks procedure completes.
 *
 * - If a start procedure is running in another thread (startAllTasks == 1), nothing
 *   is done.
 * - Otherwise, if a restart was requested meanwhile (restartCount > 0) and not all
 *   tasks are ready, restartCount is decremented and all tasks are restarted.
 * - If restartCount is non-zero but all tasks are ready, restartCount is reset to 0
 *   and no restart happens.
 *
 * NOTE(review): the original carried commented-out streamMetaWLock/streamMetaWUnLock
 * calls; they are removed here. This function takes no meta lock itself, so confirm
 * that callers provide whatever synchronization restartStreamTasks requires.
 *
 * @return TSDB_CODE_SUCCESS, or the result of restartStreamTasks when a restart is triggered.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
    return TSDB_CODE_SUCCESS;
  }

  // not in starting procedure
  bool allReady = streamMetaAllTasksReady(pMeta);

  if ((pStartInfo->restartCount > 0) && (!allReady)) {
    // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
    pStartInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pStartInfo->restartCount);
    return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
  }

  if (pStartInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
  } else if (allReady) {
    pStartInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  return TSDB_CODE_SUCCESS;
}
1006

1007
// Handle a task-reset msg from mnode. Under the task lock, record pReq->chkptId as the
// failed checkpoint and clear the task's check info. If the task is still in checkpoint
// status (TASK_STATUS__CK), move it back to ready; any other status is left untouched.
// Always returns TSDB_CODE_SUCCESS, including when the task can no longer be acquired.
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pResetReq = (SVResetStreamTaskReq*)pMsg;
  SStreamTask*          pStreamTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pResetReq->streamId, pResetReq->taskId, &pStreamTask);
  if ((code != 0) || (pStreamTask == NULL)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pResetReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing",
          pStreamTask->id.idStr);

  streamMutexLock(&pStreamTask->lock);

  streamTaskSetFailedCheckpointId(pStreamTask, pResetReq->chkptId);
  streamTaskClearCheckInfo(pStreamTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState curStatus = streamTaskGetStatus(pStreamTask);
  if (curStatus.state != TASK_STATUS__CK) {
    // every other status (TASK_STATUS__UNINIT included) is only reported
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pStreamTask->id.idStr,
            curStatus.name);
  } else {
    streamTaskSetStatusReady(pStreamTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pStreamTask->id.idStr);
  }

  streamMutexUnlock(&pStreamTask->lock);

  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}
1043

1044
/**
 * Handle a stop-tasks request.
 *
 * A request with streamId <= 0 stops every stream task of this vnode; this is the path
 * used before the vnode is dropped. Stopping the tasks of a single stream is not
 * implemented. Such a request is now logged and acknowledged without effect,
 * instead of being silently ignored.
 *
 * @param pMeta   stream meta of this vnode
 * @param pMsgCb  unused
 * @param pMsg    rpc msg whose body (after SMsgHead) is an encoded SStreamTaskStopReq
 * @return always TSDB_CODE_SUCCESS; decode/stop failures are only logged.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskStopReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  if (req.streamId > 0) {
    // stopping only the tasks of one stream is not supported yet; make the ignored request visible
    tqWarn("vgId:%d recv msg to stop tasks of stream:0x%" PRIx64 ", stop single stream not supported, ignore", vgId,
           req.streamId);
    return TSDB_CODE_SUCCESS;
  }

  // stop all stream tasks, only invoked when trying to drop db
  tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
  code = streamMetaStopAllTasks(pMeta);
  if (code) {
    tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
  }

  // always return success
  return TSDB_CODE_SUCCESS;
}
1076

1077
// Handle a downstream task's request to (re)send the checkpoint-trigger of
// req.checkpointId from upstream task req.upstreamTaskId.
//
// The answer goes back through streamTaskSendCheckpointTriggerMsg, with a rsp code of:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  - this task's downstream is not ready;
//   TSDB_CODE_SUCCESS                  - trigger already sent to that node, sent again;
//   TSDB_CODE_ACTION_IN_PROGRESS       - trigger not available yet, downstream must wait.
// Returns TSDB_CODE_INVALID_MSG for an undecodable request or a checkpointId mismatch,
// TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream task cannot be acquired, and
// otherwise the result of streamTaskSendCheckpointTriggerMsg.
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SDecoder                 decoder = {0};
  SStreamTask*             pTask = NULL;
  SStreamTaskState         taskState = {0};
  int32_t                  activeTransId = 0;
  int64_t                  activeChkptId = 0;
  int32_t                  code = TSDB_CODE_SUCCESS;
  char*                    pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  // the trigger cannot be served while this task's own downstream is not ready
  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    goto _release;
  }

  taskState = streamTaskGetStatus(pTask);
  if (taskState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
    if ((taskState.state != TASK_STATUS__READY) && (taskState.state != TASK_STATUS__HALT)) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_ACTION_IN_PROGRESS);
    goto _release;
  }

  // the checkpoint-source/trigger has been received already
  streamTaskGetActiveCheckpointInfo(pTask, &activeTransId, &activeChkptId);
  if (activeChkptId != req.checkpointId) {
    tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
            " req:%" PRId64,
            pTask->id.idStr, req.downstreamTaskId, activeChkptId, req.checkpointId);
    code = TSDB_CODE_INVALID_MSG;
    goto _release;
  }

  if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
    // re-send the lost checkpoint-trigger msg to downstream task
    tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
            (int32_t)req.downstreamTaskId, activeChkptId, activeTransId);
    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_SUCCESS);
  } else {  // not send checkpoint-trigger yet, wait
    int32_t numOfRecv = 0;
    int32_t numOfTotal = 0;
    streamTaskGetTriggerRecvStatus(pTask, &numOfRecv, &numOfTotal);

    if (numOfRecv != numOfTotal) {
      tqWarn(
          "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
          "sending checkpoint-source/trigger",
          pTask->id.idStr, numOfRecv, numOfTotal);
    } else {
      tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
    }

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_ACTION_IN_PROGRESS);
  }

_release:
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1164

1165
// Handle a checkpoint-trigger that the upstream re-sent through the retrieve rsp channel:
// decode it, acquire the local task rsp.taskId and hand the rsp to
// streamTaskProcessCheckpointTriggerRsp.
// Returns TSDB_CODE_INVALID_MSG on decode failure, the acquire result when the task is
// missing, otherwise the result of streamTaskProcessCheckpointTriggerRsp.
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp triggerRsp = {0};
  SDecoder              decoder = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&decoder, &triggerRsp);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  SStreamTask* pStreamTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, triggerRsp.streamId, triggerRsp.taskId, &pStreamTask);
  if ((code != 0) || (pStreamTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, triggerRsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pStreamTask->id.idStr, triggerRsp.upstreamTaskId, triggerRsp.checkpointId, triggerRsp.transId);

  code = streamTaskProcessCheckpointTriggerRsp(pStreamTask, &triggerRsp);
  streamMetaReleaseTask(pMeta, pStreamTask);
  return code;
}
1197

1198
// Pause a stream task and, if it has one, its related fill-history task.
// Always returns TSDB_CODE_SUCCESS. A task that cannot be acquired is considered to be in
// [STOP|DROPPING] state, so the pause is treated as already in effect.
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pStreamTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pPauseReq->streamId, pPauseReq->taskId, &pStreamTask);
  if ((code != 0) || (pStreamTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pPauseReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pStreamTask->id.idStr);
  streamTaskPause(pStreamTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pStreamTask)) {
    STaskId*     pHistId = &pStreamTask->hTaskInfo.id;
    SStreamTask* pFillHistoryTask = NULL;

    code = streamMetaAcquireTask(pMeta, pHistId->streamId, pHistId->taskId, &pFillHistoryTask);
    if ((code == 0) && (pFillHistoryTask != NULL)) {
      tqDebug("s-task:%s fill-history task handle paused along with related stream task", pFillHistoryTask->id.idStr);

      streamTaskPause(pFillHistoryTask);
      streamMetaReleaseTask(pMeta, pFillHistoryTask);
    } else {
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pHistId->taskId);
      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    }
  }

  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}
1236

1237
// Resume one paused task and, if its status allows it, kick off its execution.
//
// `handle` is an STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
// Only tasks in READY, SCAN_HISTORY or CK status after streamTaskResume() are scheduled:
//  - a source task with igUntreated set (and not a fill-history task) skips its WAL
//    reader to `sversion`, discarding data produced while it was paused;
//  - a fill-history source task in SCAN_HISTORY restarts its history scan asynchronously;
//  - a source task with an empty inputQ is left alone;
//  - every other task is scheduled via streamTrySchedExec.
// Returns 0, or the result of streamStartScanHistoryAsync / streamTrySchedExec.
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);
  ETaskStatus curStatus = streamTaskGetStatus(pTask).state;

  bool canRun = (curStatus == TASK_STATUS__READY) || (curStatus == TASK_STATUS__SCAN_HISTORY) ||
                (curStatus == TASK_STATUS__CK);
  if (!canRun) {
    return 0;
  }

  bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool isFillHistory = pTask->info.fillHistory;

  // no lock needs to secure the access of the version
  if (igUntreated && isSourceTask && !isFillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSourceTask && isFillHistory && (curStatus == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSourceTask && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // nothing queued for this source task: no scheduling here
    return 0;
  }

  return streamTrySchedExec(pTask, false);
}
1272

1273
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
1,238✔
1274
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
1,238✔
1275

1276
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
1,238✔
1277

1278
  SStreamTask* pTask = NULL;
1,238✔
1279
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
1,238✔
1280
  if (pTask == NULL || (code != 0)) {
1,246!
1281
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
1282
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1283
  }
1284

1285
  streamMutexLock(&pTask->lock);
1,246✔
1286
  SStreamTaskState pState = streamTaskGetStatus(pTask);
1,246✔
1287
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
1,246✔
1288
  streamMutexUnlock(&pTask->lock);
1,246✔
1289

1290
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
1,247✔
1291
  if (code != 0) {
1,246!
1292
    streamMetaReleaseTask(pMeta, pTask);
×
1293
    return code;
×
1294
  }
1295

1296
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
1,246✔
1297
  SStreamTask* pHTask = NULL;
1,246✔
1298
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
1,246✔
1299
  if (pHTask && (code == 0)) {
1,246!
1300
    streamMutexLock(&pHTask->lock);
×
1301
    SStreamTaskState p = streamTaskGetStatus(pHTask);
×
1302
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
×
1303
    streamMutexUnlock(&pHTask->lock);
×
1304

1305
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
×
1306
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
×
1307

1308
    streamMetaReleaseTask(pMeta, pHTask);
×
1309
  }
1310

1311
  streamMetaReleaseTask(pMeta, pTask);
1,246✔
1312
  return TSDB_CODE_SUCCESS;
1,245✔
1313
}
1314

1315
// Number of entries in pMeta->pTaskList, narrowed to int32_t.
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  size_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  return (int32_t)numOfTasks;
}
×
1316

1317
// Discard a response whose body needs no processing: free the payload and clear
// pMsg->pCont so it is not freed again. Always returns TSDB_CODE_SUCCESS.
static int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);
  return TSDB_CODE_SUCCESS;
}
1322

1323
// Decode the stream heartbeat rsp (body after SMsgHead) and pass it to
// streamProcessHeartbeatRsp. On decode failure, terrno is set to TSDB_CODE_INVALID_MSG
// and returned.
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         decoder;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeStreamHbRsp(&decoder, &hbRsp);
  tDecoderClear(&decoder);

  if (decodeRet >= 0) {
    return streamProcessHeartbeatRsp(pMeta, &hbRsp);
  }

  terrno = TSDB_CODE_INVALID_MSG;
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1342

1343
// The req-checkpoint rsp needs no handling here: only its payload is released.
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
4,268✔
1344

1345
// The checkpoint-report rsp needs no handling here: only its payload is released.
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
4,182✔
1346

1347
/**
 * Handle the rsp to a checkpoint-ready msg. Acquire the local task
 * pRsp->downstreamTaskId and forward (upstreamTaskId, checkpointId) to
 * streamTaskProcessCheckpointReadyRsp.
 *
 * The rsp body is read directly as a raw SMStreamCheckpointReadyRspMsg (no decoder), so
 * its presence and size are validated first. This prevents a missing or short body from
 * being read out of bounds.
 *
 * @return TSDB_CODE_INVALID_MSG for a missing or truncated body, the acquire result when
 *         the task cannot be found, otherwise the result of
 *         streamTaskProcessCheckpointReadyRsp.
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  if ((pMsg->pCont == NULL) || (pMsg->contLen < (int32_t)sizeof(SMStreamCheckpointReadyRspMsg))) {
    tqError("vgId:%d invalid checkpoint-ready rsp received, contLen:%d", pMeta->vgId, pMsg->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pRsp->downstreamNodeId, pRsp->downstreamTaskId);
    return code;
  }

  code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1362

1363
/**
 * Apply the consensus checkpointId chosen by mnode to a stream task, then start the task
 * asynchronously on the leader.
 *
 * - Task missing: on the leader, streamMetaAddFailedTask is called; on a follower, the
 *   request is only logged.
 * - Request older than the task (req.startTs < task createTs): discarded; on the leader,
 *   streamMetaAddFailedTaskSelf is called.
 * - Consensus checkpointId greater than the task's current checkpointId: reported with
 *   tqFatal and ignored.
 * - transId not newer than the last applied consensus transId: ignored.
 * - If the start procedure is in START_WAIT_FOR_CHKPTID, it advances to
 *   START_CHECK_DOWNSTREAM.
 *
 * @return always 0; failures are only logged.
 */
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    // ignore this code to avoid error code over writing
    if (pMeta->role == NODE_ROLE_LEADER) {
      tqError("vgId:%d process consensus checkpointId req:%" PRId64
              " transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already",
              pMeta->vgId, req.checkpointId, req.transId, req.taskId);

      int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
      if (ret) {
        tqError("s-task:0x%x failed add check downstream failed, code:%s", req.taskId, tstrerror(ret));
      }
    } else {
      tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
              pMeta->vgId, req.taskId, req.checkpointId, req.transId);
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    if (pMeta->role == NODE_ROLE_LEADER) {
      streamMetaAddFailedTaskSelf(pTask, now, true);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64
          " transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64,
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs,
          pTask->execInfo.created);

  streamMutexLock(&pTask->lock);
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;

  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
           pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  streamMetaWLock(pTask->pMeta);
  if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) {
    pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM;

    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
    if (taosArrayPush(pMeta->startInfo.pStagesList, &info) == NULL) {
      // report the failed append instead of silently ignoring it
      tqError("vgId:%d failed to record check_down_stream stage, code:%s", pMeta->vgId, tstrerror(terrno));
    }

    tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d",
            pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
  }

  streamMetaWUnLock(pTask->pMeta);

  if (pMeta->role == NODE_ROLE_LEADER) {
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc