• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3629

04 Mar 2025 01:45PM UTC coverage: 63.692% (-0.1%) from 63.79%
#3629

push

travis-ci

web-flow
Merge pull request #30007 from taosdata/revert-29951-docs/update-exception-handling-strategy

Revert "docs: update exception handling strategy"

149369 of 300378 branches covered (49.73%)

Branch coverage included in aggregate %.

233614 of 300930 relevant lines covered (77.63%)

18792670.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.62
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
// Build the runtime parts of a stream task:
//  - non-sink tasks open their state backend;
//  - source/agg tasks create the stream executor and configure task id and notify info;
//  - every task gets its schedule trigger set up.
// Returns 0 on success, otherwise the error code of the failing step (the task is then left partially expanded).
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int32_t      level = pTask->info.taskLevel;
  int64_t      startTs = taosGetTimestampMs();
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a fill-history task opens its state under the ids of its related stream task (streamTaskId)
  int64_t stateStreamId = pTask->info.fillHistory ? pTask->streamTaskId.streamId : pTask->id.streamId;
  int32_t stateTaskId = pTask->info.fillHistory ? pTask->streamTaskId.taskId : pTask->id.taskId;

  // sink tasks need no state backend
  if (level != TASK_LEVEL__SINK) {
    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = pTask->pState,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
  };

  if (level == TASK_LEVEL__SOURCE) {
    // source tasks read from the vnode through a tq reader
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (level == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  bool needExecutor = (level == TASK_LEVEL__SOURCE) || (level == TASK_LEVEL__AGG);
  if (needExecutor) {
    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code != 0) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsedSec);

  return code;
}
106

107
// Derive the task's processing versions from its checkpoint info.
// When a checkpoint exists (checkpointId != 0), the checkpoint version counts as already processed,
// so processing resumes at checkpointVer + 1. In every case, the resulting nextProcessVer is recorded
// as the start version in the exec info.
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId != 0) {
    pInfo->processedVer = pInfo->checkpointVer;
    pInfo->nextProcessVer = pInfo->checkpointVer + 1;
    pTask->execInfo.startCheckpointId = pInfo->checkpointId;

    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
           pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);
  }

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
14,338✔
122

123
// Schedule an asynchronous start of all stream tasks in this vnode.
// When `restart` is true, a restart-all job is scheduled instead of a start-all job.
// Returns 0 without scheduling anything when the vnode has no tasks; otherwise returns the
// result of streamTaskSchedTask().
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, total);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }
    return streamTaskSchedTask(cb, vgId, 0, 0, execType);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
136

137
// Schedule an asynchronous start of the single task identified by (streamId, taskId).
// Returns 0 without scheduling anything when the vnode has no tasks; otherwise returns the
// result of streamTaskSchedTask().
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
148

149
// this is to process request from transaction, always return true.
150
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
24✔
151
  int32_t      vgId = pMeta->vgId;
24✔
152
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
24✔
153
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
24✔
154
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
24✔
155
  int64_t      st = taosGetTimestampMs();
24✔
156
  bool         updated = false;
24✔
157
  int32_t      code = 0;
24✔
158
  SStreamTask* pTask = NULL;
24✔
159
  SStreamTask* pHTask = NULL;
24✔
160

161
  SStreamTaskNodeUpdateMsg req = {0};
24✔
162

163
  SDecoder decoder;
164
  tDecoderInit(&decoder, (uint8_t*)msg, len);
24✔
165
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
24!
166
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
167
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
168
    tDecoderClear(&decoder);
×
169
    return rsp.code;
×
170
  }
171

172
  tDecoderClear(&decoder);
24✔
173

174
  int32_t gError = streamGetFatalError(pMeta);
24✔
175
  if (gError != 0) {
24!
176
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
177
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
178
    return 0;
×
179
  }
180

181
  // update the nodeEpset when it exists
182
  streamMetaWLock(pMeta);
24✔
183

184
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
185
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
24✔
186
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
24✔
187
  if (code != 0) {
24!
188
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
189
            req.taskId);
190
    rsp.code = TSDB_CODE_SUCCESS;
×
191
    streamMetaWUnLock(pMeta);
×
192
    taosArrayDestroy(req.pNodeList);
×
193
    return rsp.code;
×
194
  }
195

196
  const char* idstr = pTask->id.idStr;
24✔
197

198
  if (req.transId <= 0) {
24!
199
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
200
    rsp.code = TSDB_CODE_SUCCESS;
×
201

202
    streamMetaReleaseTask(pMeta, pTask);
×
203
    streamMetaWUnLock(pMeta);
×
204

205
    taosArrayDestroy(req.pNodeList);
×
206
    return rsp.code;
×
207
  }
208

209
  // info needs to be kept till the new trans to update the nodeEp arrived.
210
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
24✔
211
  if (!update) {
24!
212
    rsp.code = TSDB_CODE_SUCCESS;
×
213

214
    streamMetaReleaseTask(pMeta, pTask);
×
215
    streamMetaWUnLock(pMeta);
×
216

217
    taosArrayDestroy(req.pNodeList);
×
218
    return rsp.code;
×
219
  }
220

221
  // duplicate update epset msg received, discard this redundant message
222
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
24✔
223

224
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
24✔
225
  if (pReqTask != NULL) {
24!
226
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
227
            req.transId);
228
    rsp.code = TSDB_CODE_SUCCESS;
×
229

230
    streamMetaReleaseTask(pMeta, pTask);
×
231
    streamMetaWUnLock(pMeta);
×
232

233
    taosArrayDestroy(req.pNodeList);
×
234
    return rsp.code;
×
235
  }
236

237
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
24✔
238

239
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
240
  code = streamTaskSendCheckpointsourceRsp(pTask);
24✔
241
  if (code) {
24!
242
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
243
  }
244
  streamTaskResetStatus(pTask);
24✔
245

246
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
24✔
247

248
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
24✔
249
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
6✔
250
    if (code != 0) {
6!
251
      tqError(
×
252
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
253
          "stream task:0x%x",
254
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
255
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
256
    } else {
257
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
6!
258
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
6✔
259
      if (updateEpSet) {
6!
260
        updated = updateEpSet;
6✔
261
      }
262

263
      streamTaskResetStatus(pHTask);
6✔
264
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
6✔
265
    }
266
  }
267

268
  // stream do update the nodeEp info, write it into stream meta.
269
  if (updated) {
24✔
270
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
13!
271
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
13✔
272
    if (code) {
13!
273
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
274
    }
275

276
    if (pHTask != NULL) {
13✔
277
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
6✔
278
      if (code) {
6!
279
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
280
      }
281
    }
282
  } else {
283
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
11!
284
  }
285

286
  code = streamTaskStop(pTask);
24✔
287
  if (code) {
24!
288
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
289
  }
290

291
  if (pHTask != NULL) {
24✔
292
    code = streamTaskStop(pHTask);
6✔
293
    if (code) {
6!
294
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
295
    }
296
  }
297

298
  // keep info
299
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
24✔
300
  streamMetaReleaseTask(pMeta, pTask);
24✔
301
  streamMetaReleaseTask(pMeta, pHTask);
24✔
302

303
  rsp.code = TSDB_CODE_SUCCESS;
24✔
304

305
  // possibly only handle the stream task.
306
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
24✔
307
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
24✔
308

309
  if (restored && isLeader) {
24!
310
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
24!
311
    pMeta->startInfo.tasksWillRestart = 1;
24✔
312
  }
313

314
  if (updateTasks < numOfTasks) {
24✔
315
    if (isLeader) {
11!
316
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
11!
317
              updateTasks, (numOfTasks - updateTasks));
318
    } else {
319
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
×
320
              (numOfTasks - updateTasks));
321
    }
322
  } else {
323
    if ((code = streamMetaCommit(pMeta)) < 0) {
13!
324
      // always return true
325
      streamMetaWUnLock(pMeta);
×
326
      taosArrayDestroy(req.pNodeList);
×
327
      return TSDB_CODE_SUCCESS;
×
328
    }
329

330
    streamMetaClearSetUpdateTaskListComplete(pMeta);
13✔
331

332
    if (isLeader) {
13!
333
      if (!restored) {
13!
334
        tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
×
335
      } else {
336
        tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
13!
337
#if 0
338
      taosMSleep(5000);// for test purpose, to trigger the leader election
339
#endif
340
        code = tqStreamTaskStartAsync(pMeta, cb, true);
13✔
341
        if (code) {
13!
342
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
343
        }
344
      }
345
    } else {
346
      tqDebug("vgId:%d follower nodes not restart tasks", vgId);
×
347
    }
348
  }
349

350
  streamMetaWUnLock(pMeta);
24✔
351
  taosArrayDestroy(req.pNodeList);
24✔
352
  return rsp.code;  // always return true
24✔
353
}
354

355
// Handle a dispatch request sent by an upstream task to a task hosted on this vnode.
//
// If the target task exists, the request is handed to streamProcessDispatchMsg(), and that call's
// result is returned. If the task does not exist, this function sends an error response itself: the
// response has code TSDB_CODE_STREAM_TASK_NOT_EXIST and carries an SStreamDispatchRsp in network byte
// order. In that case 0 is returned.
//
// On every path after a successful decode, the decoded request is cleaned up and any acquired task
// reference is released.
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // Release the task and the decoded req on failure too; the early return used to leak both.
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // Validate before allocating, so that an invalid msg cannot leak the rsp buffer.
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t   len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SMsgHead* pRspHead = rpcMallocCont(len);
  if (pRspHead == NULL) {
    code = terrno;  // capture before cleanup can touch terrno
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
418

419
// Handle the response to a dispatch request previously sent by a task on this vnode (the upstream task).
// The rsp body is converted in place from network to host byte order, then forwarded to the upstream
// task. Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when that task can no longer be acquired.
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             vgId = pMeta->vgId;
  SStreamTask*        pTask = NULL;

  // network -> host byte order, in place (the byte-swap is symmetric)
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
445

446
// Handle a retrieve request addressed to task `dstTaskId` on this vnode.
// Any checkpoint in progress on that task is marked failed. A source task then processes the request
// itself. Any other task rebroadcasts it, after setting itself as the request's source.
// Returns the processing result. A rsp is sent manually only on success; on failure nothing is sent
// here (presumably the caller's automatic rsp reports the error — NOTE(review): confirm).
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*              body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t            bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamRetrieveReq req;
  SDecoder           decoder;
  SStreamTask*       pTask = NULL;

  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (code != 0 || pTask == NULL) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if the task is in checkpoint status, the current checkpoint is set failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    // non-source tasks rebroadcast the req with themselves as its source
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  } else {
    code = streamProcessRetrieveReq(pTask, &req);
  }

  if (code == TSDB_CODE_SUCCESS) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  } else {
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  return code;
}
501

502
// Handle a downstream-check request from an upstream task.
// The request is decoded, evaluated by streamTaskProcessCheckMsg(), and answered through
// streamTaskSendCheckRsp(), whose result is returned. A decode failure returns the decoder's code.
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*               body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
525

526
// Handle the response to a downstream-check request sent by a local (upstream) task.
// On a non-leader vnode, or when the upstream task cannot be acquired, the task is recorded as failed
// via streamMetaAddFailedTask(). Otherwise the response is processed by streamTaskProcessCheckRsp().
// A decode failure sets terrno to TSDB_CODE_INVALID_MSG and returns -1.
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t             vgId = pMeta->vgId;
  char*               body = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;
  SStreamTask*        pTask = NULL;

  tDecoderInit(&decoder, (uint8_t*)body, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
  if (code != 0 || pTask == NULL) {
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  code = streamTaskProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
564

565
// Handle a checkpoint-ready msg sent by a downstream task to the upstream task hosted on this vnode.
//
// Sink tasks have no downstream, so they reject the msg with TSDB_CODE_INVALID_MSG. For any other
// task, the msg is processed by streamProcessCheckpointReadyMsg(). On success, an
// SMStreamCheckpointReadyRspMsg is sent back manually, and the automatic rsp is disabled by clearing
// pMsg->info.handle. Only head.vgId of that rsp is written in network byte order.
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  // the msg targets the upstream task hosted on this vnode
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0) {
    // log the id that was actually looked up (previously the downstream task id was printed)
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return code;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
          pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  // send checkpoint ready rsp back to the downstream task's vnode
  SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
  if (pReadyRsp == NULL) {
    return terrno;
  }

  pReadyRsp->upstreamTaskId = req.upstreamTaskId;
  pReadyRsp->upstreamNodeId = req.upstreamNodeId;
  pReadyRsp->downstreamTaskId = req.downstreamTaskId;
  pReadyRsp->downstreamNodeId = req.downstreamNodeId;
  pReadyRsp->checkpointId = req.checkpointId;
  pReadyRsp->streamId = req.streamId;
  pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

  SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
  tmsgSendRsp(&rsp);

  pMsg->info.handle = NULL;  // disable auto rsp

  return code;
}
627

628
// Deploy a new stream task from the serialized task in `msg`.
//
// The task is decoded and registered into the stream meta at version `sversion`. If it was newly added,
// this vnode is the leader, the vnode is restored, and the task is not a fill-history task, then an
// asynchronous start of the task is scheduled.
// Returns 0 when streams are disabled, TSDB_CODE_INVALID_MSG on a decode failure, and otherwise the
// code of the last step performed.
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t size = sizeof(SStreamTask);
  int32_t code = 0;
  int32_t numOfTasks = 0;
  bool    added = false;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1. deserialize the msg into a new task object
  SStreamTask* pTask = taosMemoryCalloc(1, size);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2. register the task; the latest commit version is the initial start version of the stream task.
  // Copy the ids first: once the task is in the meta store, other threads may destroy pTask at any time.
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // launching is only handled on the leader node
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  // re-acquire through the meta, since pTask itself must no longer be referenced
  SStreamTask* pDeployed = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pDeployed);

  bool launch = (pDeployed != NULL) && (code == 0) && (pDeployed->info.fillHistory == 0);
  if (launch) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (pDeployed != NULL) {
    streamMetaReleaseTask(pMeta, pDeployed);
  }

  return code;
}
707

708
// Drop the stream task described by the SVDropStreamTaskReq in `msg`.
// If the task has a related fill-history task, that task is unregistered first. The task's backend
// files are marked for removal, and the meta is committed afterwards.
// Failures are logged, but 0 is always returned (`msgLen` is unused).
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // Clear the relationship before releasing the tasks, to avoid accessing an already freed
    // related stream (history) task.
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  // Persist to disk. A failure was previously swallowed by an empty branch; it is now reported.
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
767

768
// Apply the checkpoint-info update in `msg` (an SVUpdateCheckpointInfoReq) to the target task, under
// the meta write lock. A missing task is only logged. The request is issued by the mnode during a
// transaction, so TSDB_CODE_SUCCESS is always returned, whatever the update's own result is.
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  STaskId                    id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  if (streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask) != 0) {
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  } else {
    // the update result is deliberately not propagated to the transaction
    (void)streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaWUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
795

796
/*
 * Restart every stream task held by pMeta.
 *
 * If a start-all procedure is already running (startInfo.startAllTasks == 1), only restartCount is
 * incremented and the function returns immediately. Otherwise it does the following:
 *   - marks the start-all procedure;
 *   - clears and reloads all tasks (streamMetaClear / streamMetaLoadAllTasks);
 *   - resets the ready/failed task sets;
 *   - on a leader with streams enabled, starts all tasks. Otherwise it resets the start info and
 *     leaves the tasks stopped.
 *
 * @param pMeta     stream meta to restart
 * @param isLeader  whether this node currently acts as leader
 * @return the error reported by streamMetaStartAllTasks() if it failed, otherwise the value of terrno
 *         (which is reset to 0 before the restart begins).
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  streamMetaWLock(pMeta);
  if (pMeta->startInfo.startAllTasks == 1) {
    pMeta->startInfo.restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pMeta->startInfo.startAllTasks = 1;
  streamMetaWUnLock(pMeta);

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  {
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    taosHashClear(pStartInfo->pReadyTaskSet);
    taosHashClear(pStartInfo->pFailedTaskSet);
    pStartInfo->readyTs = 0;
  }

  if (isLeader && !tsDisableStream) {
    streamMetaWUnLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pMeta->startInfo.restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // Previously the result of streamMetaStartAllTasks() was unconditionally overwritten by terrno.
  // Keep the explicit failure and only fall back to terrno when starting succeeded (or was skipped).
  if (code == TSDB_CODE_SUCCESS) {
    code = terrno;
  }
  return code;
}
845

846
/*
 * Dispatch a stream run request (SStreamTaskRunReq) according to its reqType.
 *
 * The following request types are forwarded to the matching stream-meta routine:
 *   - start one / start all
 *   - restart all
 *   - stop all / stop one
 *   - add failed task
 * STREAM_EXEC_T_RESUME_TASK resumes a task that has been idle. Any other type runs the task so
 * that the blocks in its inputQ get processed.
 *
 * @param pMeta     stream meta that owns the tasks
 * @param pMsg      rpc message; the encoded request follows the SMsgHead
 * @param isLeader  only consulted for STREAM_EXEC_T_RESTART_ALL_TASKS
 * @return 0 in these cases, regardless of outcome:
 *           - undecodable requests;
 *           - start-one / start-all / restart-all / stop-all requests;
 *           - an executed task.
 *         Otherwise, the result of the invoked routine or of the task lookup.
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    code = streamMetaStartAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    code = restartStreamTasks(pMeta, isLeader);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // Keep the full 64-bit difference. Narrowing to int32_t (as before) gives an
        // implementation-defined value once the gap exceeds ~24.8 days (e.g. lastExecTs still 0).
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
927

928
/*
 * Callback invoked when a start-all-tasks round has completed.
 *
 * If another start-all procedure is already running (startInfo.startAllTasks == 1), nothing is done.
 * Otherwise:
 *   - if restarts are pending (restartCount > 0) and not all tasks are ready, one pending restart is
 *     consumed and restartStreamTasks() is invoked;
 *   - if all tasks are ready, the pending restart counter is reset to 0.
 *
 * @param pMeta  stream meta whose start-up just finished
 * @return the result of restartStreamTasks() when a restart is triggered, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  streamMetaWLock(pMeta);
  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
  } else {  // not in starting procedure
    bool allReady = streamMetaAllTasksReady(pMeta);

    if ((pStartInfo->restartCount > 0) && (!allReady)) {
      // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
      pStartInfo->restartCount -= 1;
      tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
              pStartInfo->restartCount);
      // restartStreamTasks() takes the meta lock itself, so it must be released first
      streamMetaWUnLock(pMeta);

      return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
    }

    if (pStartInfo->restartCount == 0) {
      tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
    } else if (allReady) {
      pStartInfo->restartCount = 0;
      tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
    }
  }

  streamMetaWUnLock(pMeta);

  // NOTE: scanning the WAL after start-up (tqScanWalAsync) used to be triggered here for non-snode
  // handles; it is currently disabled, so the former `scanWal` flag was dead state and has been removed.
  return TSDB_CODE_SUCCESS;
}
970

971
/*
 * Handle a task-reset request from mnode.
 *
 * Under the task lock, this does the following:
 *   - records pReq->chkptId as the failed checkpoint id;
 *   - clears the downstream-check info;
 *   - if the task is in TASK_STATUS__CK, moves it back to ready.
 * Any other status is left untouched.
 *
 * @param pMeta  stream meta owning the task
 * @param pMsg   raw request body, interpreted as SVResetStreamTaskReq
 * @return always TSDB_CODE_SUCCESS, including when the task no longer exists.
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // Every other status, TASK_STATUS__UNINIT included, is left as is. The UNINIT case used to be a
    // separate branch with the identical log line; restoring the checkpoint there is not performed.
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1007

1008
/*
 * Handle a stop-all-tasks request (SStreamTaskStopReq).
 *
 * Only a non-positive streamId is acted upon; it means "stop every task" and is sent in sync before
 * the vnode is dropped. A positive streamId (stop a single stream) is currently ignored. pMsgCb is
 * not used.
 *
 * @return always TSDB_CODE_SUCCESS; decode and stop failures are only logged.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t            vgId = pMeta->vgId;
  char*              pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t            bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskStopReq stopReq = {0};
  SDecoder           dec;

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskStopReq(&dec, &stopReq);
  if (code < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&dec);
    return TSDB_CODE_SUCCESS;
  }
  tDecoderClear(&dec);

  if (stopReq.streamId > 0) {
    // stopping the tasks of a single stream is not handled here
    return TSDB_CODE_SUCCESS;
  }

  // stop all stream tasks, only invoked when trying to drop db
  tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
  code = streamMetaStopAllTasks(pMeta);
  if (code != 0) {
    tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
  }

  return TSDB_CODE_SUCCESS;
}
1040

1041
/*
 * Handle a downstream task's request to (re)retrieve the checkpoint-trigger from this upstream task.
 *
 * A trigger message is always sent back to the downstream task, carrying one of these codes:
 *   - TSDB_CODE_STREAM_TASK_IVLD_STATUS: the upstream's downstream tasks are not ready yet;
 *   - TSDB_CODE_SUCCESS: the trigger was already sent to that node, so it is re-sent;
 *   - TSDB_CODE_ACTION_IN_PROGRESS: the checkpoint-source/trigger has not been fully processed yet.
 * The one exception is a checkpointId mismatch while checkpointing: then no reply is sent and
 * TSDB_CODE_INVALID_MSG is returned.
 *
 * @return TSDB_CODE_INVALID_MSG on decode failure or checkpointId mismatch;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST if the upstream task cannot be acquired;
 *         otherwise the result of sending the trigger message.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  SDecoder                 dec = {0};
  char*                    pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&dec, &req);
  tDecoderClear(&dec);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  // result code carried by the trigger message sent back to the downstream task
  int32_t replyCode = TSDB_CODE_SUCCESS;

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);
    replyCode = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  } else {
    SStreamTaskState taskState = streamTaskGetStatus(pTask);

    if (taskState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
      if (!(taskState.state == TASK_STATUS__READY || taskState.state == TASK_STATUS__HALT)) {
        tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
      }

      tqWarn(
          "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
          "upstream sending checkpoint-source/trigger",
          pTask->id.idStr);
      replyCode = TSDB_CODE_ACTION_IN_PROGRESS;
    } else {  // recv the checkpoint-source/trigger already
      int32_t activeTransId = 0;
      int64_t activeChkptId = 0;
      streamTaskGetActiveCheckpointInfo(pTask, &activeTransId, &activeChkptId);

      if (activeChkptId != req.checkpointId) {
        tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
                " req:%" PRId64,
                pTask->id.idStr, req.downstreamTaskId, activeChkptId, req.checkpointId);
        streamMetaReleaseTask(pMeta, pTask);
        return TSDB_CODE_INVALID_MSG;
      }

      if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
        // re-send the lost checkpoint-trigger msg to downstream task
        tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
                (int32_t)req.downstreamTaskId, activeChkptId, activeTransId);
        replyCode = TSDB_CODE_SUCCESS;
      } else {  // not send checkpoint-trigger yet, wait
        int32_t numRecv = 0;
        int32_t numTotal = 0;
        streamTaskGetTriggerRecvStatus(pTask, &numRecv, &numTotal);

        if (numRecv != numTotal) {
          tqWarn(
              "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
              "sending checkpoint-source/trigger",
              pTask->id.idStr, numRecv, numTotal);
        } else {  // add the ts info
          tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
        }
        replyCode = TSDB_CODE_ACTION_IN_PROGRESS;
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, replyCode);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1128

1129
/*
 * Handle the checkpoint-trigger that an upstream task re-sent through the retrieve/rsp channel.
 *
 * The rsp is decoded, the target task is acquired and the rsp is handed to
 * streamTaskProcessCheckpointTriggerRsp().
 *
 * @return TSDB_CODE_INVALID_MSG on decode failure, the acquire error if the task is gone, otherwise
 *         the result of streamTaskProcessCheckpointTriggerRsp().
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SStreamTask*          pTask = NULL;
  SDecoder              dec = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&dec, &rsp);
  tDecoderClear(&dec);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1161

1162
/*
 * Handle a pause request from mnode: pause the task and, when it has a related fill-history task,
 * pause that one as well.
 *
 * @param pMeta  stream meta owning the task
 * @param pMsg   raw request body, interpreted as SVPauseStreamTaskReq
 * @return always TSDB_CODE_SUCCESS. A task (or fill-history task) that cannot be acquired is
 *         treated as already paused.
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;
  SStreamTask*          pFillHistTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pFillHistTask);
    if ((code == 0) && (pFillHistTask != NULL)) {
      tqDebug("s-task:%s fill-history task handle paused along with related stream task", pFillHistTask->id.idStr);
      streamTaskPause(pFillHistTask);
      streamMetaReleaseTask(pMeta, pFillHistTask);
    } else {
      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pTask->hTaskInfo.id.taskId);
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1200

1201
/*
 * Resume a single paused task and, if it ends up in a runnable status (READY, SCAN_HISTORY or CK),
 * reschedule it.
 *
 * @param handle       STQ* when fromVnode is true, otherwise the SStreamMeta* itself
 * @param pTask        task to resume
 * @param sversion     vnode version; a non-fill-history source task skips to it when igUntreated is set
 * @param igUntreated  discard data accumulated while paused (source, non-fill-history tasks only)
 * @param fromVnode    selects how handle is interpreted
 * @return the scheduling result:
 *           - the result of streamStartScanHistoryAsync() for a fill-history source task that is
 *             scanning history;
 *           - 0 for a source task with an empty inputQ (WAL scanning is not triggered here);
 *           - otherwise the result of streamTrySchedExec().
 *         Returns 0 if the task is not in a runnable status.
 */
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);

  ETaskStatus taskStatus = streamTaskGetStatus(pTask).state;
  bool        runnable = (taskStatus == TASK_STATUS__READY) || (taskStatus == TASK_STATUS__SCAN_HISTORY) ||
                  (taskStatus == TASK_STATUS__CK);
  if (!runnable) {
    return 0;
  }

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory && (taskStatus == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSource && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // nothing queued: the former tqScanWalAsync() call here is disabled, so there is nothing to schedule
    return 0;
  }

  return streamTrySchedExec(pTask);
}
1236

1237
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
2,542✔
1238
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
2,542✔
1239

1240
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
2,542✔
1241

1242
  SStreamTask* pTask = NULL;
2,542✔
1243
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,542✔
1244
  if (pTask == NULL || (code != 0)) {
2,545!
1245
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
1246
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1247
  }
1248

1249
  streamMutexLock(&pTask->lock);
2,545✔
1250
  SStreamTaskState pState = streamTaskGetStatus(pTask);
2,547✔
1251
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
2,547✔
1252
  streamMutexUnlock(&pTask->lock);
2,547✔
1253

1254
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
2,548✔
1255
  if (code != 0) {
2,546!
1256
    streamMetaReleaseTask(pMeta, pTask);
×
1257
    return code;
×
1258
  }
1259

1260
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
2,546✔
1261
  SStreamTask* pHTask = NULL;
2,546✔
1262
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
2,546✔
1263
  if (pHTask && (code == 0)) {
2,547!
1264
    streamMutexLock(&pHTask->lock);
54✔
1265
    SStreamTaskState p = streamTaskGetStatus(pHTask);
54✔
1266
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
54!
1267
    streamMutexUnlock(&pHTask->lock);
54✔
1268

1269
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
54✔
1270
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
54!
1271

1272
    streamMetaReleaseTask(pMeta, pHTask);
54✔
1273
  }
1274

1275
  streamMetaReleaseTask(pMeta, pTask);
2,547✔
1276
  return TSDB_CODE_SUCCESS;
2,547✔
1277
}
1278

1279
/* Return the number of entries in pMeta->pTaskList, narrowed to int32_t. */
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t total = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  return total;
}
×
1280

1281
/*
 * Consume a response that needs no handling: free its rpc body and clear the pointer so it cannot
 * be freed twice.
 *
 * Declared `static` at the top of the file. The definition now repeats the storage class so that
 * the linkage is obvious from the definition itself.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return TSDB_CODE_SUCCESS;
}
1286

1287
/*
 * Decode a stream heartbeat response from mnode and hand it to streamProcessHeartbeatRsp().
 *
 * @return TSDB_CODE_INVALID_MSG (also stored in terrno) when decoding fails, otherwise the result
 *         of streamProcessHeartbeatRsp().
 */
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         dec;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  if (tDecodeStreamHbRsp(&dec, &hbRsp) >= 0) {
    tDecoderClear(&dec);
    return streamProcessHeartbeatRsp(pMeta, &hbRsp);
  }

  terrno = TSDB_CODE_INVALID_MSG;
  tDecoderClear(&dec);
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1306

1307
/* The request-checkpoint rsp carries nothing to act upon: just release its rpc body. */
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
4,398✔
1308

1309
/* The checkpoint-report rsp carries nothing to act upon: just release its rpc body. */
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
6,469✔
1310

1311
/*
 * Handle the rsp to a checkpoint-ready msg.
 *
 * The rpc body is read directly as SMStreamCheckpointReadyRspMsg (no decoder is used). The task
 * identified by downstreamTaskId is acquired and notified via streamTaskProcessCheckpointReadyRsp().
 *
 * @return the acquire error if the task is gone, otherwise the result of
 *         streamTaskProcessCheckpointReadyRsp().
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pReadyRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReadyRsp->streamId, pReadyRsp->downstreamTaskId, &pTask);
  if ((code == 0) && (pTask != NULL)) {
    code = streamTaskProcessCheckpointReadyRsp(pTask, pReadyRsp->upstreamTaskId, pReadyRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
          pReadyRsp->downstreamNodeId, pReadyRsp->downstreamTaskId);
  return code;
}
1326

1327
/*
 * Apply the consensus checkpointId decided by mnode to a task, then (on the leader) start the task
 * asynchronously.
 *
 * The request is discarded in these cases:
 *   - it predates the task's creation (req.startTs < task createTs); the task is then also recorded
 *     as failed;
 *   - it belongs to a transaction no newer than the last one applied.
 * A consensus id greater than the task's current checkpointId is reported as fatal and ignored.
 *
 * @param pMeta  stream meta owning the task
 * @param pMsg   rpc message; the encoded SRestoreCheckpointInfo follows the SMsgHead
 * @return always 0; every failure is logged instead.
 */
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.taskId);
    // ignore this code to avoid error code over write
    int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    if (ret) {
      // previous text ("failed add check downstream failed, core:%s") described the wrong operation
      tqError("s-task:0x%x failed to add into failed-task records, code:%s", req.taskId, tstrerror(ret));
    }

    return 0;
  }

  // discard the req, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    // the fifth argument is req.startTs; it was previously mislabelled as "task createTs"
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " with req startTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    streamMetaAddFailedTaskSelf(pTask, now);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

  streamMutexLock(&pTask->lock);
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
            pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  if (pMeta->role == NODE_ROLE_LEADER) {
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc