• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3651

13 Mar 2025 05:26AM UTC coverage: 63.126% (+9.6%) from 53.496%
#3651

push

travis-ci

web-flow
Merge pull request #30158 from taosdata/docs/anchor-caps-30

docs: lowercase anchors for 3.0

148245 of 301793 branches covered (49.12%)

Branch coverage included in aggregate %.

233046 of 302220 relevant lines covered (77.11%)

6452173.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.98
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Prepare a deployed stream task for execution.
 *
 * Steps:
 *   - Non-sink tasks get a state backend from streamStateOpen. A fill-history task opens it
 *     with the ids of its related stream task (pTask->streamTaskId).
 *   - Source and agg tasks then get an executor built from pTask->exec.qmsg. The executor is
 *     tagged with the task/stream ids and configured with the task's notify settings.
 *   - Finally the schedule trigger is set up.
 *
 * Returns 0 on success. If the state cannot be opened, returns terrno; otherwise returns the
 * error code of the first failing executor setup call.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();
  int32_t      level = pTask->info.taskLevel;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a fill-history task opens its state with the ids of the related stream task
  bool    useRelatedId = pTask->info.fillHistory ? true : false;
  int64_t stateStreamId = useRelatedId ? pTask->streamTaskId.streamId : pTask->id.streamId;
  int32_t stateTaskId = useRelatedId ? pTask->streamTaskId.taskId : pTask->id.taskId;

  if (level != TASK_LEVEL__SINK) {  // sink tasks do not need a state backend
    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = pTask->pState,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
  };

  bool needExecutor = (level == TASK_LEVEL__SOURCE) || (level == TASK_LEVEL__AGG);
  if (level == TASK_LEVEL__SOURCE) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (level == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  if (needExecutor) {
    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code != 0) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsedSec);

  return code;
}
106

107
/*
 * Derive the restore position of a task from its checkpoint info.
 *
 * When a checkpoint exists (checkpointId != 0), checkpointVer is the version kept by that
 * checkpoint. Processing therefore resumes at checkpointVer + 1, and the checkpoint id is
 * recorded as the start checkpoint.
 *
 * In every case, the resulting nextProcessVer becomes execInfo.startCheckpointVer.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId == 0) {
    pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
    return;
  }

  pInfo->processedVer = pInfo->checkpointVer;
  pInfo->nextProcessVer = pInfo->checkpointVer + 1;
  pTask->execInfo.startCheckpointId = pInfo->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
122

123
/*
 * Schedule an asynchronous start of all stream tasks registered in pMeta. When restart is
 * true, a restart is scheduled instead of a start.
 *
 * Returns 0 without scheduling anything when no task exists; otherwise returns the result of
 * streamTaskSchedTask.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, total);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }
    return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
136

137
/*
 * Schedule an asynchronous start of the single task identified by (streamId, taskId).
 *
 * Returns 0 without scheduling anything when the meta holds no tasks at all; otherwise returns
 * the result of streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
148

149
// this is to process request from transaction, always return true.
150
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
36✔
151
  int32_t      vgId = pMeta->vgId;
36✔
152
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
36✔
153
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
36✔
154
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
36✔
155
  int64_t      st = taosGetTimestampMs();
36✔
156
  bool         updated = false;
36✔
157
  int32_t      code = 0;
36✔
158
  SStreamTask* pTask = NULL;
36✔
159
  SStreamTask* pHTask = NULL;
36✔
160

161
  SStreamTaskNodeUpdateMsg req = {0};
36✔
162

163
  SDecoder decoder;
164
  tDecoderInit(&decoder, (uint8_t*)msg, len);
36✔
165
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
36!
166
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
167
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
168
    tDecoderClear(&decoder);
×
169
    return rsp.code;
×
170
  }
171

172
  tDecoderClear(&decoder);
36✔
173

174
  int32_t gError = streamGetFatalError(pMeta);
36✔
175
  if (gError != 0) {
36!
176
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
177
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
178
    return 0;
×
179
  }
180

181
  // update the nodeEpset when it exists
182
  streamMetaWLock(pMeta);
36✔
183

184
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
185
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
36✔
186
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
36✔
187
  if (code != 0) {
36!
188
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
189
            req.taskId);
190
    rsp.code = TSDB_CODE_SUCCESS;
×
191
    streamMetaWUnLock(pMeta);
×
192
    taosArrayDestroy(req.pNodeList);
×
193
    return rsp.code;
×
194
  }
195

196
  const char* idstr = pTask->id.idStr;
36✔
197

198
  if (req.transId <= 0) {
36!
199
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
200
    rsp.code = TSDB_CODE_SUCCESS;
×
201

202
    streamMetaReleaseTask(pMeta, pTask);
×
203
    streamMetaWUnLock(pMeta);
×
204

205
    taosArrayDestroy(req.pNodeList);
×
206
    return rsp.code;
×
207
  }
208

209
  // info needs to be kept till the new trans to update the nodeEp arrived.
210
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
36✔
211
  if (!update) {
36!
212
    rsp.code = TSDB_CODE_SUCCESS;
×
213

214
    streamMetaReleaseTask(pMeta, pTask);
×
215
    streamMetaWUnLock(pMeta);
×
216

217
    taosArrayDestroy(req.pNodeList);
×
218
    return rsp.code;
×
219
  }
220

221
  // duplicate update epset msg received, discard this redundant message
222
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
36✔
223

224
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
36✔
225
  if (pReqTask != NULL) {
36!
226
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
227
            req.transId);
228
    rsp.code = TSDB_CODE_SUCCESS;
×
229

230
    streamMetaReleaseTask(pMeta, pTask);
×
231
    streamMetaWUnLock(pMeta);
×
232

233
    taosArrayDestroy(req.pNodeList);
×
234
    return rsp.code;
×
235
  }
236

237
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
36✔
238

239
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
240
  code = streamTaskSendCheckpointsourceRsp(pTask);
36✔
241
  if (code) {
36!
242
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
243
  }
244
  streamTaskResetStatus(pTask);
36✔
245

246
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
36✔
247

248
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
36✔
249
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
19✔
250
    if (code != 0) {
19!
251
      tqError(
×
252
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
253
          "stream task:0x%x",
254
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
255
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
256
    } else {
257
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
19!
258
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
19✔
259
      if (updateEpSet) {
19!
260
        updated = updateEpSet;
19✔
261
      }
262

263
      streamTaskResetStatus(pHTask);
19✔
264
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
19✔
265
    }
266
  }
267

268
  // stream do update the nodeEp info, write it into stream meta.
269
  if (updated) {
36✔
270
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
31✔
271
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
31✔
272
    if (code) {
31!
273
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
274
    }
275

276
    if (pHTask != NULL) {
31✔
277
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
19✔
278
      if (code) {
19!
279
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
280
      }
281
    }
282
  } else {
283
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
5!
284
  }
285

286
  code = streamTaskStop(pTask);
36✔
287
  if (code) {
36!
288
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
289
  }
290

291
  if (pHTask != NULL) {
36✔
292
    code = streamTaskStop(pHTask);
19✔
293
    if (code) {
19!
294
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
295
    }
296
  }
297

298
  // keep info
299
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
36✔
300
  streamMetaReleaseTask(pMeta, pTask);
36✔
301
  streamMetaReleaseTask(pMeta, pHTask);
36✔
302

303
  rsp.code = TSDB_CODE_SUCCESS;
36✔
304

305
  // possibly only handle the stream task.
306
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
36✔
307
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
36✔
308

309
  if (restored && isLeader) {
36!
310
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
24✔
311
    pMeta->startInfo.tasksWillRestart = 1;
24✔
312
  }
313

314
  if (updateTasks < numOfTasks) {
36✔
315
    if (isLeader) {
16✔
316
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
10✔
317
              updateTasks, (numOfTasks - updateTasks));
318
    } else {
319
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
6!
320
              (numOfTasks - updateTasks));
321
    }
322
  } else {
323
    if ((code = streamMetaCommit(pMeta)) < 0) {
20!
324
      // always return true
325
      streamMetaWUnLock(pMeta);
×
326
      taosArrayDestroy(req.pNodeList);
×
327
      return TSDB_CODE_SUCCESS;
×
328
    }
329

330
    streamMetaClearSetUpdateTaskListComplete(pMeta);
20✔
331

332
    if (isLeader) {
20✔
333
      if (!restored) {
14!
334
        tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
×
335
      } else {
336
        tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
14✔
337
#if 0
338
      taosMSleep(5000);// for test purpose, to trigger the leader election
339
#endif
340
        code = tqStreamTaskStartAsync(pMeta, cb, true);
14✔
341
        if (code) {
14!
342
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
343
        }
344
      }
345
    } else {
346
      tqDebug("vgId:%d follower nodes not restart tasks", vgId);
6!
347
    }
348
  }
349

350
  streamMetaWUnLock(pMeta);
36✔
351
  taosArrayDestroy(req.pNodeList);
36✔
352
  return rsp.code;  // always return true
36✔
353
}
354

355
/*
 * Handle a dispatch request sent by an upstream task.
 *
 * If the target task (req.streamId, req.taskId) can be acquired, the request is handed to
 * streamProcessDispatchMsg together with an rsp bound to pMsg->info. Otherwise a
 * SStreamDispatchRsp carrying TSDB_CODE_STREAM_TASK_NOT_EXIST is sent back to the upstream node.
 *
 * Returns:
 *   - TSDB_CODE_MSG_DECODE_ERROR for a malformed request;
 *   - TSDB_CODE_INVALID_MSG when the request names no upstream node (upstreamNodeId == 0);
 *   - terrno when the error rsp cannot be allocated;
 *   - the error of streamProcessDispatchMsg when it fails;
 *   - 0 otherwise.
 *
 * The acquired task and the decoded request are released on every path.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    tCleanupStreamDispatchReq(&req);  // release whatever was decoded before the failure
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // previously the failure path returned -1 here, leaking both the task reference and req
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // validate before allocating, so this error path cannot leak the rsp buffer
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t   len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SMsgHead* pRspHead = rpcMallocCont(len);
  if (pRspHead == NULL) {
    code = terrno;
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);  // ownership of pRspHead passes to the rpc layer
  tCleanupStreamDispatchReq(&req);

  return 0;
}
418

419
/*
 * Handle the dispatch rsp returned by a downstream task.
 *
 * The rsp fields arrive in network byte order and are converted in place before the upstream
 * (sending) task is looked up.
 *
 * Returns the result of streamProcessDispatchRsp, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the
 * upstream task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t             vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamTask*        pTask = NULL;

  // byte-order conversion, done in place on the received buffer
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
445

446
/*
 * Handle a retrieve request addressed to task req.dstTaskId.
 *
 * Any checkpoint in progress on the task is marked failed first. A source task then processes
 * the request itself. Any other task stamps itself as the request's source and broadcasts the
 * request further.
 *
 * On success, a rsp is sent explicitly via streamTaskSendRetrieveRsp. On failure, no rsp is
 * sent here and the error code is returned to the caller.
 *
 * Returns the decode error, the task-acquire error, or the processing result (0 on success).
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t            code = 0;
  SDecoder           decoder;
  SStreamRetrieveReq req = {0};  // zeroed so that cleanup is safe after a partial decode

  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    tCleanupStreamRetrieveReq(&req);  // release anything decoded before the failure
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // the processing result is returned as-is; non-zero means no rsp was sent by this function
  return code;
}
501

502
/*
 * Handle a check request from an upstream task.
 *
 * The request is decoded and passed to streamTaskProcessCheckMsg, which fills in the check
 * rsp. That rsp is then sent back to the requesting upstream task.
 *
 * Returns the decode error, or the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
525

526
/*
 * Handle the check rsp that a downstream task returned to one of this vnode's upstream tasks.
 *
 * On a follower, or when the upstream task can no longer be acquired, the task is recorded
 * through streamMetaAddFailedTask instead of processing the rsp.
 *
 * Returns -1 (with terrno = TSDB_CODE_INVALID_MSG) for a malformed rsp. Otherwise returns the
 * result of streamTaskProcessCheckRsp or streamMetaAddFailedTask.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t             vgId = pMeta->vgId;
  char*               pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }
  tDecoderClear(&decoder);

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (isLeader) {
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
    if ((pTask != NULL) && (code == 0)) {
      code = streamTaskProcessCheckRsp(pTask, &rsp);
      streamMetaReleaseTask(pMeta, pTask);
      return code;
    }
  } else {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
  }

  // follower node, or the upstream task is gone: record it as a failed task
  return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
564

565
/*
 * Handle a checkpoint-ready msg that a downstream task sent to task req.upstreamTaskId on this
 * vnode.
 *
 * If the target task is a sink task, the msg is rejected with TSDB_CODE_INVALID_MSG. Otherwise
 * the msg is passed to streamProcessCheckpointReadyMsg. On success, a
 * SMStreamCheckpointReadyRspMsg is sent back explicitly and pMsg->info.handle is cleared to
 * disable the automatic rsp.
 *
 * Returns the decode, acquire, or processing error; terrno if the rsp cannot be allocated;
 * otherwise 0.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0) {
    // report the id that was actually looked up (the receiving upstream task), not the sender
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return code;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
          pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  // send checkpoint ready rsp
  SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(*pReadyRsp));
  if (pReadyRsp == NULL) {
    return terrno;
  }

  pReadyRsp->upstreamTaskId = req.upstreamTaskId;
  pReadyRsp->upstreamNodeId = req.upstreamNodeId;
  pReadyRsp->downstreamTaskId = req.downstreamTaskId;
  pReadyRsp->downstreamNodeId = req.downstreamNodeId;
  pReadyRsp->checkpointId = req.checkpointId;
  pReadyRsp->streamId = req.streamId;
  pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

  SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(*pReadyRsp)};
  tmsgSendRsp(&rsp);

  pMsg->info.handle = NULL;  // disable auto rsp
  return TSDB_CODE_SUCCESS;
}
627

628
/*
 * Deploy a new stream task from a serialized task in msg (msgLen bytes).
 *
 * The task is decoded and registered into the meta store with sversion. On a restored leader,
 * a newly added non fill-history task is then started asynchronously.
 *
 * Returns:
 *   - 0 when streams are disabled;
 *   - terrno on allocation failure;
 *   - TSDB_CODE_INVALID_MSG on decode failure;
 *   - the registration error;
 *   - otherwise the last acquire/start result.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);
  int32_t code = 0;
  bool    added = false;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1. deserialize the msg into a freshly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2. register the task. The ids are copied out first: once the task is added into the meta
  // store, pTask may be destroyed by other threads and must not be referenced again.
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  if (!isLeader) {  // only handled in the leader node
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  // re-acquire by id, since pTask itself may no longer be valid
  SStreamTask* p = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
  if ((p != NULL) && (code == 0) && (p->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (p != NULL) {
    streamMetaReleaseTask(pMeta, p);
  }

  return code;
}
707

708
/*
 * Drop a stream task, together with its related fill-history task if one exists, from the meta
 * store, then commit the meta.
 *
 * msg is interpreted directly as SVDropStreamTaskReq; msgLen is not used. The related
 * fill-history task is unregistered first. All failures are logged, and the function always
 * returns 0.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  code = streamMetaCommit(pMeta);
  if (code < 0) {
    // previously swallowed by an empty if-body; still best-effort, but now visible in the log
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
767

768
/*
 * Apply a checkpoint-info update to the task named in the request, under the meta write lock.
 *
 * msg is interpreted directly as SVUpdateCheckpointInfoReq. A missing task or a failed update
 * is only logged. The function always returns TSDB_CODE_SUCCESS, because the request is issued
 * by mnode during a transaction.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    code = 0;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);
    if (code != 0) {
      // previously discarded silently; the transaction still gets success
      tqError("vgId:%d s-task:0x%x failed to update checkpoint info, code:%s", vgId, pReq->taskId, tstrerror(code));
    }
  } else {  // failed to get the task.
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  }

  streamMetaWUnLock(pMeta);
  // always return success when handling the requirement issued by mnode during transaction.
  return TSDB_CODE_SUCCESS;
}
795

796
/*
 * Close, reload and (when allowed) restart all stream tasks of this vnode.
 *
 * If a start-all-tasks procedure is already running (startInfo.startAllTasks == 1), it only
 * increments startInfo.restartCount and returns success. Otherwise it marks the procedure as
 * started, clears the meta, reloads all tasks, resets the ready/failed task sets, and then:
 *   - leader with streams enabled: starts all tasks via streamMetaStartAllTasks();
 *   - otherwise: resets the start info and does not start any task.
 *
 * Returns the error of streamMetaStartAllTasks() if it fails. Otherwise it returns the value of
 * terrno, which is reset to 0 before the reload.
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  streamMetaWLock(pMeta);
  if (pMeta->startInfo.startAllTasks == 1) {
    pMeta->startInfo.restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pMeta->startInfo.startAllTasks = 1;
  streamMetaWUnLock(pMeta);

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  {
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    taosHashClear(pStartInfo->pReadyTaskSet);
    taosHashClear(pStartInfo->pFailedTaskSet);
    pStartInfo->readyTs = 0;
  }

  if (isLeader && !tsDisableStream) {
    streamMetaWUnLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    if (code != TSDB_CODE_SUCCESS) {
      tqError("vgId:%d failed to start all tasks after restart, code:%s", vgId, tstrerror(code));
    }
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pMeta->startInfo.restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // Previously the result of streamMetaStartAllTasks() was overwritten unconditionally by terrno;
  // keep a start failure and only fall back to terrno when starting succeeded (or was skipped).
  if (code == TSDB_CODE_SUCCESS) {
    code = terrno;
  }
  return code;
}
845

846
/*
 * Dispatch a decoded SStreamTaskRunReq according to its reqType.
 *
 * Meta-level requests (start one/all, restart all, stop all, add failed task, stop one) are
 * forwarded to the corresponding streamMeta* routine. STREAM_EXEC_T_RESUME_TASK resumes an idle
 * task. Any other type executes the addressed task if it is ready to run; if it is not ready,
 * its sched status is set to inactive.
 *
 * A decode failure is logged and reported as success.
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    code = streamMetaStartAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    code = restartStreamTasks(pMeta, isLeader);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // keep the difference in 64 bits: truncating to int32_t overflows for long idle periods
        // (e.g. when lastExecTs is still 0) and would print a garbage value
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
927

928
/*
 * Completion callback for the start-all-tasks procedure.
 *
 * If another start procedure is still in progress (startAllTasks == 1), it does nothing. If
 * restarts were requested (restartCount > 0) and not all tasks are ready, it consumes one restart
 * and calls restartStreamTasks(), returning that result. Otherwise it resets restartCount when
 * all tasks are ready and returns 0.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  int32_t         code = 0;

  streamMetaWLock(pMeta);
  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
  } else {  // not in starting procedure
    bool allReady = streamMetaAllTasksReady(pMeta);

    if ((pStartInfo->restartCount > 0) && (!allReady)) {
      // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
      pStartInfo->restartCount -= 1;
      tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
              pStartInfo->restartCount);

      // read the role while still holding the lock, instead of after releasing it
      bool isLeader = (pMeta->role == NODE_ROLE_LEADER);
      streamMetaWUnLock(pMeta);

      return restartStreamTasks(pMeta, isLeader);
    }

    if (pStartInfo->restartCount == 0) {
      tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
    } else if (allReady) {
      pStartInfo->restartCount = 0;
      tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
    }
  }

  streamMetaWUnLock(pMeta);

  return code;
}
965

966
/*
 * Handle a task-reset request (SVResetStreamTaskReq) from mnode.
 *
 * Under the task lock, it records pReq->chkptId as the failed checkpoint id, clears the check info,
 * and moves a task in TASK_STATUS__CK back to ready. Tasks in any other status are left as they are.
 * Always returns TSDB_CODE_SUCCESS, including when the task cannot be acquired.
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // every other status (TASK_STATUS__UNINIT included) is left unchanged; the former UNINIT branch
    // was an exact duplicate of this one and has been merged into it
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1002

1003
/*
 * Handle a stop-tasks request (SStreamTaskStopReq).
 *
 * A request whose streamId is <= 0 stops every stream task of this vnode (used before dropping the
 * vnode). A request for a specific stream is currently a no-op. The function always reports
 * success, including on a decode failure or a failure to stop the tasks; both are only logged.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  SStreamTaskStopReq stopReq = {0};
  SDecoder           dec;
  int32_t            vgId = pMeta->vgId;
  char*              pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t            bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t ret = tDecodeStreamTaskStopReq(&dec, &stopReq);
  if (ret < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(ret));
    tDecoderClear(&dec);
    return TSDB_CODE_SUCCESS;
  }
  tDecoderClear(&dec);

  // stopping a single stream is not handled here
  if (stopReq.streamId > 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stop all stream tasks, only invoked when trying to drop db
  tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
  ret = streamMetaStopAllTasks(pMeta);
  if (ret != 0) {
    tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(ret));
  }

  return TSDB_CODE_SUCCESS;
}
1035

1036
/*
 * Handle a downstream task's request to retrieve the checkpoint-trigger from its upstream task.
 *
 * It replies via streamTaskSendCheckpointTriggerMsg() with one of these codes:
 *   - TSDB_CODE_STREAM_TASK_IVLD_STATUS: the upstream task's downstream is not ready;
 *   - TSDB_CODE_SUCCESS: the trigger was already sent to that downstream node and is re-sent;
 *   - TSDB_CODE_ACTION_IN_PROGRESS: the trigger has not been produced/sent yet.
 *
 * Returns TSDB_CODE_INVALID_MSG on a decode failure or a checkpointId mismatch (no reply is sent
 * for either), TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be acquired, and otherwise
 * the result of sending the reply.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SDecoder                 decoder = {0};
  SStreamTask*             pTask = NULL;
  uint8_t*                 pBody = (uint8_t*)POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  // the reply code defaults to "in progress"; the branches below narrow it down
  int32_t replyCode = TSDB_CODE_ACTION_IN_PROGRESS;

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);
    replyCode = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  } else {
    SStreamTaskState taskState = streamTaskGetStatus(pTask);

    if (taskState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
      if (taskState.state != TASK_STATUS__READY && taskState.state != TASK_STATUS__HALT) {
        tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
      }

      tqWarn(
          "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
          "upstream sending checkpoint-source/trigger",
          pTask->id.idStr);
    } else {  // recv the checkpoint-source/trigger already
      int32_t activeTransId = 0;
      int64_t activeChkptId = 0;
      streamTaskGetActiveCheckpointInfo(pTask, &activeTransId, &activeChkptId);

      if (activeChkptId != req.checkpointId) {
        tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
                " req:%" PRId64,
                pTask->id.idStr, req.downstreamTaskId, activeChkptId, req.checkpointId);
        streamMetaReleaseTask(pMeta, pTask);
        return TSDB_CODE_INVALID_MSG;
      }

      if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
        // re-send the lost checkpoint-trigger msg to downstream task
        tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
                (int32_t)req.downstreamTaskId, activeChkptId, activeTransId);
        replyCode = TSDB_CODE_SUCCESS;
      } else {  // not send checkpoint-trigger yet, wait
        int32_t numRecv = 0;
        int32_t numTotal = 0;
        streamTaskGetTriggerRecvStatus(pTask, &numRecv, &numTotal);

        if (numRecv != numTotal) {
          tqWarn(
              "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
              "sending checkpoint-source/trigger",
              pTask->id.idStr, numRecv, numTotal);
        } else {
          tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait",
                 pTask->id.idStr);
        }
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                            replyCode);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1123

1124
/*
 * Handle a checkpoint-trigger re-sent through the retrieve/rsp channel (SCheckpointTriggerRsp).
 *
 * It decodes the response, acquires the target task and passes the response to
 * streamTaskProcessCheckpointTriggerRsp().
 * Returns TSDB_CODE_INVALID_MSG on a decode failure, the acquire error when the task is gone,
 * and otherwise the result of processing the response.
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp triggerRsp = {0};
  SDecoder              dec = {0};
  SStreamTask*          pTask = NULL;
  uint8_t*              pBody = (uint8_t*)POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&dec, &triggerRsp);
  tDecoderClear(&dec);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t ret = streamMetaAcquireTask(pMeta, triggerRsp.streamId, triggerRsp.taskId, &pTask);
  if ((ret != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, triggerRsp.taskId);
    return ret;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, triggerRsp.upstreamTaskId, triggerRsp.checkpointId, triggerRsp.transId);

  ret = streamTaskProcessCheckpointTriggerRsp(pTask, &triggerRsp);
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}
1156

1157
/*
 * Handle a pause request (SVPauseStreamTaskReq) from mnode.
 *
 * It pauses the addressed task and, if it has a related fill-history task, that task too.
 * Always returns TSDB_CODE_SUCCESS: a task that cannot be acquired is treated as already
 * inactive (see the comments below).
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t ret = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((ret != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTask* pFillHistTask = NULL;
  ret = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pFillHistTask);
  if ((ret != 0) || (pFillHistTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
            ", it may have been dropped already",
            pMeta->vgId, pTask->hTaskInfo.id.taskId);
    streamMetaReleaseTask(pMeta, pTask);

    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s fill-history task handle paused along with related stream task", pFillHistTask->id.idStr);
  streamTaskPause(pFillHistTask);
  streamMetaReleaseTask(pMeta, pFillHistTask);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1195

1196
/*
 * Resume a single paused task and, if it is in a runnable status, schedule its next step.
 *
 * handle is an STQ* when fromVnode is true, otherwise an SStreamMeta*.
 * After streamTaskResume():
 *   - if the status is not READY / SCAN_HISTORY / CK, nothing else is done;
 *   - a source task (not fill-history) with igUntreated set skips its WAL reader to sversion;
 *   - a fill-history source task in SCAN_HISTORY restarts the history scan asynchronously;
 *   - a source task with an empty input queue is not scheduled;
 *   - any other task is scheduled via streamTrySchedExec().
 * Returns the result of the scheduling call, or 0 when nothing was scheduled.
 */
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;
  int32_t      ret = 0;

  streamTaskResume(pTask);
  ETaskStatus curStatus = streamTaskGetStatus(pTask).state;

  int32_t taskLevel = pTask->info.taskLevel;
  bool    runnable =
      (curStatus == TASK_STATUS__READY) || (curStatus == TASK_STATUS__SCAN_HISTORY) || (curStatus == TASK_STATUS__CK);
  if (!runnable) {
    return ret;
  }

  bool isSource = (taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory && (curStatus == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    ret = streamStartScanHistoryAsync(pTask, igUntreated);
  } else if (isSource && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // source task with nothing queued: deliberately not scheduled here
    // NOTE(review): an async WAL scan (tqScanWalAsync) used to be triggered here and is disabled
  } else {
    ret = streamTrySchedExec(pTask, false);
  }

  return ret;
}
1231

1232
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
2,564✔
1233
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
2,564✔
1234

1235
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
2,564✔
1236

1237
  SStreamTask* pTask = NULL;
2,564✔
1238
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,564✔
1239
  if (pTask == NULL || (code != 0)) {
2,577!
1240
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
3!
1241
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1242
  }
1243

1244
  streamMutexLock(&pTask->lock);
2,574✔
1245
  SStreamTaskState pState = streamTaskGetStatus(pTask);
2,578✔
1246
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
2,577✔
1247
  streamMutexUnlock(&pTask->lock);
2,577✔
1248

1249
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
2,580✔
1250
  if (code != 0) {
2,583!
1251
    streamMetaReleaseTask(pMeta, pTask);
×
1252
    return code;
×
1253
  }
1254

1255
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
2,583✔
1256
  SStreamTask* pHTask = NULL;
2,583✔
1257
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
2,583✔
1258
  if (pHTask && (code == 0)) {
2,583!
1259
    streamMutexLock(&pHTask->lock);
66✔
1260
    SStreamTaskState p = streamTaskGetStatus(pHTask);
66✔
1261
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
66!
1262
    streamMutexUnlock(&pHTask->lock);
66✔
1263

1264
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
66✔
1265
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
66!
1266

1267
    streamMetaReleaseTask(pMeta, pHTask);
66✔
1268
  }
1269

1270
  streamMetaReleaseTask(pMeta, pTask);
2,583✔
1271
  return TSDB_CODE_SUCCESS;
2,581✔
1272
}
1273

1274
/* Number of entries in pMeta->pTaskList, narrowed to int32_t (no lock is taken here). */
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t total = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  return total;
}
×
1275

1276
/*
 * Consume a response whose content is not needed: free its payload and clear the pointer so
 * that it cannot be freed twice. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);
  return TSDB_CODE_SUCCESS;
}
1281

1282
/*
 * Decode an mnode heartbeat response (SMStreamHbRspMsg) and hand it to streamProcessHeartbeatRsp().
 * On a decode failure, it sets terrno to TSDB_CODE_INVALID_MSG and returns that value.
 */
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeStreamHbRsp(&dec, &hbRsp);

  if (decodeRet >= 0) {
    tDecoderClear(&dec);
    return streamProcessHeartbeatRsp(pMeta, &hbRsp);
  }

  terrno = TSDB_CODE_INVALID_MSG;
  tDecoderClear(&dec);
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1301

1302
/* The response to a request-checkpoint message carries nothing to act on; just release it. */
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t ret = doProcessDummyRspMsg(pMeta, pMsg);
  return ret;
}
4,338✔
1303

1304
/* The response to a checkpoint-report message carries nothing to act on; just release it. */
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t ret = doProcessDummyRspMsg(pMeta, pMsg);
  return ret;
}
4,244✔
1305

1306
/*
 * Handle a checkpoint-ready response addressed to pRsp->downstreamTaskId.
 *
 * The payload is read directly as an SMStreamCheckpointReadyRspMsg (no decoder). It is therefore
 * validated first: a missing or too-short payload is rejected with TSDB_CODE_INVALID_MSG instead
 * of being dereferenced.
 * Returns the acquire error when the task is gone, and otherwise the result of
 * streamTaskProcessCheckpointReadyRsp().
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMStreamCheckpointReadyRspMsg)) {
    tqError("vgId:%d invalid checkpoint-ready rsp received, contLen:%d", pMeta->vgId, pMsg->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pRsp->downstreamNodeId, pRsp->downstreamTaskId);
    return code;
  }

  code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1321

1322
/*
 * Apply a consensus checkpointId (SRestoreCheckpointInfo) sent by mnode to the addressed task.
 *
 * The request is discarded in these cases:
 *   - the task cannot be acquired: it is recorded as failed via streamMetaAddFailedTask();
 *   - req.startTs is older than the task creation time: it is recorded via
 *     streamMetaAddFailedTaskSelf();
 *   - the consensus id is greater than the task's checkpointId (fatal log);
 *   - the transaction is not newer than the last one accepted.
 * Otherwise, under the task lock, the task's checkpointId is aligned with the consensus id and the
 * receipt is recorded. On the leader the task is then started asynchronously.
 * Always returns 0.
 */
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  char*                  pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                bodyLen = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SStreamTask*           pTask = NULL;
  SRestoreCheckpointInfo req = {0};
  SDecoder               dec;
  int32_t                ret = 0;

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  ret = tDecodeRestoreCheckpointInfo(&dec, &req);
  if (ret < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(ret));
    tDecoderClear(&dec);
    return TSDB_CODE_SUCCESS;
  }
  tDecoderClear(&dec);

  ret = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((ret != 0) || (pTask == NULL)) {
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.taskId);
    // the result of recording the failure is only logged, so it cannot overwrite the reply code
    int32_t addRet = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    if (addRet != 0) {
      tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(addRet));
    }
    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    streamMetaAddFailedTaskSelf(pTask, now);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

  streamMutexLock(&pTask->lock);

  SConsenChkptInfo* pConsen = &pTask->status.consenChkptInfo;
  bool              accepted = false;

  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
  } else if (pConsen->consenChkptTransId >= req.transId) {
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
            pConsen->consenChkptTransId, req.transId);
  } else {
    accepted = true;

    if (pTask->chkInfo.checkpointId == req.checkpointId) {
      tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
              pTask->id.idStr, vgId, req.checkpointId, req.transId);
    } else {
      tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr,
              vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
      pTask->chkInfo.checkpointId = req.checkpointId;
      tqSetRestoreVersionInfo(pTask);
    }

    streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  }

  streamMutexUnlock(&pTask->lock);

  if (!accepted) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pMeta->role != NODE_ROLE_LEADER) {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  } else {
    ret = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (ret != 0) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(ret));
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc