• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.0
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Build the runtime parts of a stream task.
 *
 * For every non-sink task the state backend is opened (plus a separate recalculate state for
 * STREAM_RECALCUL_TASK). For source/agg/merge tasks the query executor is created and configured.
 * Finally the schedule trigger is set up.
 *
 * Returns 0 on success, otherwise the error code of the first failing step. Every failing step is logged.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      st = taosGetTimestampMs();
  int64_t      streamId = 0;
  int32_t      taskId = 0;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a non-normal (fill-history / recalculate) task opens its state under the id of its related stream task
  if (pTask->info.fillHistory != STREAM_NORMAL_TASK) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  // sink task does not need the pState
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      pTask->pRecalState = streamStateRecalatedOpen(pMeta->path, pTask, pTask->id.streamId, pTask->id.taskId);
      if (pTask->pRecalState == NULL) {
        // distinct from the normal-state failure below so the two cases can be told apart in the log
        tqError("s-task:%s (vgId:%d) failed to open recalculate state for task, expand task failed", pTask->id.idStr,
                vgId);
        return terrno;
      }
      tqDebug("s-task:%s recal state:%p", pTask->id.idStr, pTask->pRecalState);
    }

    pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s stream state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = NULL,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
      .pOtherBackend = NULL,
  };

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG ||
      pTask->info.taskLevel == TASK_LEVEL__MERGE) {
    // a recalculate task uses its recal state as the primary backend and the normal state as the secondary one
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      handle.pStateBackend = pTask->pRecalState;
      handle.pOtherBackend = pTask->pState;
    } else {
      handle.pStateBackend = pTask->pState;
      handle.pOtherBackend = NULL;
    }

    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code) {
      tqError("s-task:%s failed to set task id for executor, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    qSetStreamMergeInfo(pTask->exec.pExecutor, pTask->pVTables);
  }

  streamSetupScheduleTrigger(pTask);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);

  return code;
}

/*
 * Derive where a task resumes processing from its checkpoint info.
 *
 * If a checkpoint exists (checkpointId != 0), the checkpoint version is treated as already processed and
 * processing resumes at the following version. In all cases execInfo.startCheckpointVer is set to the
 * resulting nextProcessVer.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId == 0) {
    // no checkpoint: nextProcessVer is kept as-is
    pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
    return;
  }

  // checkpoint ver is the kept version, handled data should be the next version.
  pInfo->processedVer = pInfo->checkpointVer;
  pInfo->nextProcessVer = pInfo->checkpointVer + 1;
  pTask->execInfo.startCheckpointId = pInfo->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}

/*
 * Schedule an asynchronous start (or restart, when `restart` is true) of all stream tasks in pMeta.
 *
 * Returns 0 without scheduling anything when pMeta holds no task; otherwise returns the result of
 * streamTaskSchedTask.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqInfo("vgId:%d start all %d stream task(s) async", vgId, total);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }
    return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}

/*
 * Schedule an asynchronous start of the single task identified by (streamId, taskId).
 *
 * Returns 0 without scheduling anything when pMeta holds no task at all; otherwise returns the result
 * of streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  if (total != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}

// this is to process request from transaction, always return true.
172
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
43✔
173
  int32_t                  vgId = pMeta->vgId;
43✔
174
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
43✔
175
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
43✔
176
  SRpcMsg                  rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
43✔
177
  int64_t                  st = taosGetTimestampMs();
43✔
178
  bool                     updated = false;
43✔
179
  int32_t                  code = 0;
43✔
180
  SStreamTask*             pTask = NULL;
43✔
181
  SStreamTask*             pHTask = NULL;
43✔
182
  SStreamTaskNodeUpdateMsg req = {0};
43✔
183
  SDecoder                 decoder;
184

185
  tDecoderInit(&decoder, (uint8_t*)msg, len);
43✔
186
  code = tDecodeStreamTaskUpdateMsg(&decoder, &req);
43✔
187
  tDecoderClear(&decoder);
43✔
188

189
  if (code < 0) {
43!
190
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
191
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code));
×
192
    tDestroyNodeUpdateMsg(&req);
×
193
    return rsp.code;
×
194
  }
195

196
  int32_t gError = streamGetFatalError(pMeta);
43✔
197
  if (gError != 0) {
43!
198
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
199
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
200
    return 0;
×
201
  }
202

203
  // update the nodeEpset when it exists
204
  streamMetaWLock(pMeta);
43✔
205

206
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
207
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
43✔
208
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
43✔
209
  if (code != 0) {
43!
210
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
211
            req.taskId);
212
    rsp.code = TSDB_CODE_SUCCESS;
×
213
    streamMetaWUnLock(pMeta);
×
214
    tDestroyNodeUpdateMsg(&req);
×
215
    return rsp.code;
×
216
  }
217

218
  const char* idstr = pTask->id.idStr;
43✔
219

220
  if (req.transId <= 0) {
43!
221
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
222
    rsp.code = TSDB_CODE_SUCCESS;
×
223

224
    streamMetaReleaseTask(pMeta, pTask);
×
225
    streamMetaWUnLock(pMeta);
×
226

227
    tDestroyNodeUpdateMsg(&req);
×
228
    return rsp.code;
×
229
  }
230

231
  // info needs to be kept till the new trans to update the nodeEp arrived.
232
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId, req.pTaskList);
43✔
233
  if (!update) {
43!
234
    rsp.code = TSDB_CODE_SUCCESS;
×
235

236
    streamMetaReleaseTask(pMeta, pTask);
×
237
    streamMetaWUnLock(pMeta);
×
238

239
    tDestroyNodeUpdateMsg(&req);
×
240
    return rsp.code;
×
241
  }
242

243
  // duplicate update epset msg received, discard this redundant message
244
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
43✔
245

246
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
43✔
247
  if (pReqTask != NULL) {
43!
248
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
249
            req.transId);
250
    rsp.code = TSDB_CODE_SUCCESS;
×
251

252
    streamMetaReleaseTask(pMeta, pTask);
×
253
    streamMetaWUnLock(pMeta);
×
254

255
    tDestroyNodeUpdateMsg(&req);
×
256
    return rsp.code;
×
257
  }
258

259
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
43✔
260

261
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
262
  code = streamTaskSendCheckpointsourceRsp(pTask);
43✔
263
  if (code) {
43!
264
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
265
  }
266
  streamTaskResetStatus(pTask);
43✔
267

268
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
43✔
269

270
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
43✔
271
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
31✔
272
    if (code != 0) {
31!
273
      tqError(
×
274
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
275
          "stream task:0x%x",
276
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
277
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
278
    } else {
279
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
31!
280
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
31✔
281
      if (updateEpSet) {
31!
282
        updated = updateEpSet;
31✔
283
      }
284

285
      streamTaskResetStatus(pHTask);
31✔
286
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
31✔
287
    }
288
  }
289

290
  // stream do update the nodeEp info, write it into stream meta.
291
  if (updated) {
43✔
292
    tqInfo("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
38!
293
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
38✔
294
    if (code) {
38!
295
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
296
    }
297

298
    if (pHTask != NULL) {
38✔
299
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
31✔
300
      if (code) {
31!
301
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
302
      }
303
    }
304
  } else {
305
    tqInfo("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
5!
306
  }
307

308
  code = streamTaskStop(pTask);
43✔
309
  if (code) {
43!
310
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
311
  }
312

313
  if (pHTask != NULL) {
43✔
314
    code = streamTaskStop(pHTask);
31✔
315
    if (code) {
31!
316
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
317
    }
318
  }
319

320
  // keep info
321
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, req.transId, st);
43✔
322
  streamMetaReleaseTask(pMeta, pTask);
43✔
323
  streamMetaReleaseTask(pMeta, pHTask);
43✔
324

325
  rsp.code = TSDB_CODE_SUCCESS;
43✔
326

327
  // possibly only handle the stream task.
328
  int32_t reqUpdateTasks = taosArrayGetSize(req.pTaskList);
43✔
329
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
43✔
330
  bool    hasUnupdated = false;
43✔
331

332
  for(int32_t i = 0; i < taosArrayGetSize(req.pTaskList); ++i) {
126✔
333

334
    int32_t* pTaskId = (int32_t*) taosArrayGet(req.pTaskList, i);
83✔
335
    if (pTaskId != NULL) {
83!
336
      int32_t index = -1;
83✔
337

338
      for(int32_t j = 0; j < taosArrayGetSize(pMeta->pTaskList); ++j) {
219!
339
        SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, j);
219✔
340
        if (*pTaskId == pId->taskId) {  // task id exist in vnode
219✔
341
          index = j;
83✔
342
          break;
83✔
343
        }
344
      }
345

346
      if (index != -1) {
83!
347
        SStreamTaskId*   pId = taosArrayGet(pMeta->pTaskList, index);
83✔
348
        STaskUpdateEntry uEntry = {.streamId = pId->streamId, .taskId = pId->taskId, .transId = req.transId};
83✔
349
        void*            p = taosHashGet(pMeta->updateInfo.pTasks, &uEntry, sizeof(uEntry));
83✔
350
        if (p == NULL) {
83✔
351
          tqInfo("vgId:%d s-task:0x%x not updated yet, wait for it to be updated", vgId, uEntry.taskId);
20!
352
          hasUnupdated = true;
20✔
353
        }
354
      } else {
355
        tqError("vgId:%d s-task:0x%x not exists, ignore update", vgId, *pTaskId);
×
356
      }
357
    }
358
  }
359

360
  int32_t numOfActualTasks = streamMetaGetNumOfTasks(pMeta);
43✔
361
  if (numOfActualTasks < reqUpdateTasks) {
43!
362
    tqInfo("vgId:%d req updated tasks from mnode-side:%d to vnode-side:%d", vgId, updateTasks, numOfActualTasks);
×
363
    reqUpdateTasks = numOfActualTasks;
×
364
  }
365

366
  if (restored && isLeader) {
43!
367
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
31!
368
    pMeta->startInfo.tasksWillRestart = 1;
31✔
369
  }
370

371
  if (hasUnupdated) {
43✔
372
    if (isLeader) {
20✔
373
      tqInfo("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
14!
374
              updateTasks, (reqUpdateTasks - updateTasks));
375
    } else {
376
      tqInfo("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
6!
377
              (reqUpdateTasks - updateTasks));
378
    }
379
  } else {
380
    if ((code = streamMetaCommit(pMeta)) < 0) {
23!
381
      // always return true
382
      streamMetaWUnLock(pMeta);
×
383
      tDestroyNodeUpdateMsg(&req);
×
384
      tqError("vgId:%d commit meta failed, code:%s not restart the stream tasks", vgId, tstrerror(code));
×
385
      return TSDB_CODE_SUCCESS;
×
386
    }
387

388
    streamMetaClearSetUpdateTaskListComplete(pMeta);
23✔
389

390
    if (isLeader) {
23✔
391
      if (!restored) {
17!
392
        tqInfo("vgId:%d vnode restore not completed, not start all tasks", vgId);
×
393
      } else {
394
        tqInfo("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, reqUpdateTasks, req.transId);
17!
395
#if 0
396
      taosMSleep(5000);// for test purpose, to trigger the leader election
397
#endif
398
        code = tqStreamTaskStartAsync(pMeta, cb, true);
17✔
399
        if (code) {
17!
400
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
401
        }
402
      }
403
    } else {
404
      tqInfo("vgId:%d follower nodes not restart tasks", vgId);
6!
405
    }
406
  }
407

408
  streamMetaWUnLock(pMeta);
43✔
409
  tDestroyNodeUpdateMsg(&req);
43✔
410
  return rsp.code;  // always return true
43✔
411
}
412

413
/*
 * Handle a dispatch request sent by an upstream task to task req.taskId of this vnode.
 *
 * When the target task can be acquired, the request is handed to streamProcessDispatchMsg. Otherwise an
 * error rsp with code TSDB_CODE_STREAM_TASK_NOT_EXIST is sent back to the upstream node; it carries the
 * request's identifiers in network byte order.
 *
 * The decoded request and the acquired task reference are released on every exit path.
 * Returns:
 *   - TSDB_CODE_MSG_DECODE_ERROR when the request cannot be decoded;
 *   - -1 when streamProcessDispatchMsg fails;
 *   - TSDB_CODE_INVALID_MSG when the upstream vgId is 0;
 *   - terrno when the rsp buffer cannot be allocated;
 *   - 0 otherwise.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg, const SMsgCb* msgcb) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    tCleanupStreamDispatchReq(&req);  // drop whatever was decoded before the failure
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp, msgcb);

    // release the request and the task reference on failure as well; the old early return leaked both
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return (code != 0) ? -1 : 0;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // validate before allocating so that this error path has no rsp buffer to leak (htonl(x) == 0 iff x == 0)
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}

/*
 * Handle the rsp to a dispatch request previously sent by one of this vnode's tasks.
 *
 * The fixed rsp fields are converted in place from network to host byte order. The upstream task (the
 * one that sent the dispatch) is then looked up and handed the rsp.
 *
 * Returns the result of streamProcessDispatchRsp, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream
 * task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             vgId = pMeta->vgId;

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);
  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pUpstream = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pUpstream);
  if (pUpstream == NULL || code != 0) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pUpstream, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpstream);
  return code;
}

/*
 * Handle a retrieve request addressed to task req.dstTaskId of this vnode.
 *
 * Any in-progress checkpoint of the task is marked failed first. A source task then processes the
 * request itself; any other task re-broadcasts it with itself as the new source.
 *
 * On success the retrieve rsp is sent manually from here and 0 is returned. On failure the error code is
 * returned and no rsp is sent manually. NOTE(review): presumably the caller's auto-rsp answers in that case;
 * confirm.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t  code = 0;
  SDecoder decoder;

  // zero-initialized so the request can be cleaned up even if decoding stops half way
  SStreamRetrieveReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // 0 after the rsp was sent manually above; otherwise the processing error, with no rsp sent from here
  return code;
}

/*
 * Handle a task check request from an upstream task.
 *
 * The request is decoded and streamTaskProcessCheckMsg fills in the check rsp. That rsp is then sent
 * back to req.upstreamTaskId on req.upstreamNodeId.
 *
 * Returns the decode error when the msg cannot be decoded, otherwise the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}

/*
 * Handle the rsp to a task check request sent by one of this vnode's tasks.
 *
 * On a follower, or when the upstream task cannot be acquired, the task is recorded via
 * streamMetaAddFailedTask and that call's result is returned. Otherwise the rsp is handed to
 * streamTaskProcessCheckRsp.
 *
 * Returns -1 (with terrno = TSDB_CODE_INVALID_MSG) when the rsp cannot be decoded.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t             vgId = pMeta->vgId;
  char*               pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  SStreamTask* pUpstream = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pUpstream);
  if (code != 0 || pUpstream == NULL) {
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  code = streamTaskProcessCheckRsp(pUpstream, &rsp);
  streamMetaReleaseTask(pMeta, pUpstream);
  return code;
}

/*
 * Handle a checkpoint-ready msg sent by a downstream task to its upstream task on this vnode.
 *
 * The upstream task (req.upstreamTaskId) processes the msg. On success a checkpoint-ready rsp is sent
 * back manually, and the auto rsp is disabled by clearing pMsg->info.handle.
 *
 * Returns:
 *   - TSDB_CODE_MSG_DECODE_ERROR on decode failure;
 *   - the acquire error, or TSDB_CODE_STREAM_TASK_NOT_EXIST, when the task is missing;
 *   - TSDB_CODE_INVALID_MSG for a sink task;
 *   - the processing error;
 *   - terrno when the rsp buffer cannot be allocated;
 *   - 0 on success.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0 || pTask == NULL) {
    // the task looked up here is the upstream one, so report its id (not the downstream sender's)
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
          pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    // NOTE(review): transId is not assigned here; this relies on rpcMallocCont returning zeroed memory — confirm
    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}

/*
 * Deploy a stream task from its serialized form in msg/msgLen.
 *
 * The task is decoded and registered into the meta store at `sversion` (under the meta write lock).
 * The task is started asynchronously only when all of the following hold:
 *   - the registration actually added it;
 *   - this node is the leader;
 *   - the vnode is restored;
 *   - the task is not a fill-history task.
 *
 * Returns:
 *   - 0 when streams are disabled;
 *   - terrno on allocation failure;
 *   - TSDB_CODE_INVALID_MSG on decode failure;
 *   - the registration error;
 *   - otherwise the code of the last step performed.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return 0;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2.save task, use the latest commit version as the initial start version of stream task.
  // The ids are copied out first: once registered, pTask may be destroyed by other threads at any time.
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;
  bool    added = false;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // only handled in the leader node
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pDeployed = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pDeployed);
  if ((pDeployed != NULL) && (code == 0) && (pDeployed->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (pDeployed != NULL) {
    streamMetaReleaseTask(pMeta, pDeployed);
  }

  return code;
}

/*
 * Handle a request to drop a stream task (and its related fill-history task, if any).
 *
 * The meta write-lock is held for the whole operation. The related fill-history task is
 * unregistered first, then the stream task itself, and finally the meta store is committed.
 * Failures are logged but never propagated: this handler always returns success.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      // a failed drop should be visible outside debug logging
      tqWarn("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed, code:%s", pReq->taskId, vgId,
             (int32_t)hTaskId.taskId, tstrerror(code));
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqWarn("s-task:0x%x vgId:%d drop task failed, code:%s", pReq->taskId, vgId, tstrerror(code));
  }

  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
  if (numOfTasks == 0) {
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
  }

  // persist the update to disk; a failure is reported but does not fail the drop request
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
828

829
/*
 * Apply an update-checkpoint-info request issued by mnode to the target stream task.
 *
 * The lookup and update run under the meta write-lock. Because the request is part of an
 * mnode transaction, this handler always returns success; failures are only logged.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);

    // the error is deliberately not propagated (see below), but it must not vanish silently
    if (code != 0) {
      tqError("vgId:%d s-task:0x%x failed to update checkpoint info, code:%s", vgId, pReq->taskId, tstrerror(code));
    }
  } else {  // failed to get the task.
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  }

  streamMetaWUnLock(pMeta);
  // always return success when handling the requirement issued by mnode during transaction.
  return TSDB_CODE_SUCCESS;
}
856

857
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
17✔
858
  int32_t         vgId = pMeta->vgId;
17✔
859
  int32_t         code = 0;
17✔
860
  int64_t         st = taosGetTimestampMs();
17✔
861
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
17✔
862

863
  if (pStartInfo->startAllTasks == 1) {
17✔
864
    // wait for the checkpoint id rsp, this rsp will be expired
865
    if (pStartInfo->curStage == START_MARK_REQ_CHKPID) {
4!
866
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
×
867
      tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%"PRId64 " ignore and continue", vgId, pCurStageInfo->ts);
×
868

869
      taosArrayClear(pStartInfo->pStagesList);
×
870
      pStartInfo->curStage = 0;
×
871
      goto _start;
×
872

873
    } else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) {
4✔
874
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
3✔
875
      tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64
3!
876
             " rsp will be discarded",
877
             vgId, pCurStageInfo->ts);
878

879
      taosArrayClear(pStartInfo->pStagesList);
3✔
880
      pStartInfo->curStage = 0;
3✔
881
      goto _start;
3✔
882

883
    } else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) {
1!
884
      int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet);
×
885
      taosHashGetSize(pStartInfo->pFailedTaskSet);
×
886

887
      int32_t newTotal = taosArrayGetSize(pStartInfo->pRecvChkptIdTasks);
×
888
      tqDebug(
×
889
          "vgId:%d start all tasks procedure is interrupted by transId:%d, wait for partial tasks rsp. recv check "
890
          "downstream results, received:%d results, total req tasks:%d",
891
          vgId, pMeta->updateInfo.activeTransId, numOfRecv, newTotal);
892

893
      bool allRsp = allCheckDownstreamRspPartial(pStartInfo, newTotal, pMeta->vgId);
×
894
      if (allRsp) {
×
895
        tqDebug("vgId:%d all partial results received, continue the restart procedure", pMeta->vgId);
×
896
        streamMetaResetStartInfo(pStartInfo, vgId);
×
897
        goto _start;
×
898
      } else {
899
        pStartInfo->restartCount += 1;
×
900
        SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
×
901

902
        tqDebug("vgId:%d in start tasks procedure (check downstream), reqTs:%" PRId64
×
903
                ", inc restartCounter by 1 and wait for it completes, "
904
                "remaining restart:%d",
905
                vgId, pCurStageInfo->ts, pStartInfo->restartCount);
906
      }
907
    } else {
908
      tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId);
1!
909
    }
910

911
    return TSDB_CODE_SUCCESS;
1✔
912
  }
913

914
_start:
13✔
915

916
  pStartInfo->startAllTasks = 1;
16✔
917
  terrno = 0;
16✔
918
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
16!
919
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);
920

921
  streamMetaClear(pMeta);
16✔
922

923
  int64_t el = taosGetTimestampMs() - st;
16✔
924
  tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.);
16!
925

926
  streamMetaLoadAllTasks(pMeta);
16✔
927

928
  if (isLeader && !tsDisableStream) {
16!
929
    code = streamMetaStartAllTasks(pMeta);
16✔
930
  } else {
931
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
×
932
    pStartInfo->restartCount = 0;
×
933
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
×
934
  }
935

936
  code = terrno;
16✔
937
  return code;
16✔
938
}
939

940
/*
 * Dispatch a stream run request decoded from pMsg.
 *
 * Control request types (start/restart/stop all tasks, start/stop one task, add failed task,
 * resume idle task) are handled by dedicated cases. Any other type executes the addressed
 * task on the data already in its inputQ. A request that fails to decode is logged and
 * reported as success.
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t           vgId = pMeta->vgId;
  char*             pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t           bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskRunReq req = {0};
  SDecoder          decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskRunReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = req.reqType;
  switch (type) {
    case STREAM_EXEC_T_START_ONE_TASK:
      (void)streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
      return 0;

    case STREAM_EXEC_T_START_ALL_TASKS:
      streamMetaWLock(pMeta);
      (void)streamMetaStartAllTasks(pMeta);
      streamMetaWUnLock(pMeta);
      return 0;

    case STREAM_EXEC_T_RESTART_ALL_TASKS:
      streamMetaWLock(pMeta);
      (void)restartStreamTasks(pMeta, isLeader);
      streamMetaWUnLock(pMeta);
      return 0;

    case STREAM_EXEC_T_STOP_ALL_TASKS:
      (void)streamMetaStopAllTasks(pMeta);
      return 0;

    case STREAM_EXEC_T_ADD_FAILED_TASK:
      return streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);

    case STREAM_EXEC_T_STOP_ONE_TASK:
      return streamMetaStopOneTask(pMeta, req.streamId, req.taskId);

    case STREAM_EXEC_T_RESUME_TASK: {  // task resume to run after idle for a while
      SStreamTask* pIdleTask = NULL;
      int32_t      ret = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pIdleTask);
      if ((pIdleTask == NULL) || (ret != 0)) {
        return ret;
      }

      char* pStatus = NULL;
      if (streamTaskReadyToRun(pIdleTask, &pStatus)) {
        int64_t execTs = pIdleTask->status.lastExecTs;
        int32_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pIdleTask->id.idStr, idle, execTs);

        ret = streamResumeTask(pIdleTask);
      } else {
        int8_t schedStatus = streamTaskSetSchedStatusInactive(pIdleTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pIdleTask->id.idStr, pStatus, schedStatus);
      }

      streamMetaReleaseTask(pMeta, pIdleTask);
      return ret;
    }

    default:
      break;
  }

  // ordinary run request: process the blocks queued in the task's inputQ
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }

  // even in halt status, the data in inputQ must be processed
  char* pStatus = NULL;
  if (streamTaskReadyToRun(pTask, &pStatus)) {
    tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
            pTask->id.idStr, pStatus, pTask->chkInfo.nextProcessVer);
    (void)streamExecTask(pTask);
  } else {
    int8_t schedStatus = streamTaskSetSchedStatusInactive(pTask);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, pStatus, schedStatus);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1025

1026
/*
 * Called when a start-all-tasks round has completed.
 *
 * If another start procedure is already running, nothing is done. Otherwise, when restarts
 * are pending (restartCount > 0) and not every task is ready, restartCount is decremented
 * and all tasks are restarted again; if every task is ready, restartCount is reset to 0.
 *
 * NOTE(review): pMeta's lock is not taken here (a lock/unlock pair used to be commented out
 * in this function); presumably the caller provides the needed protection — confirm.
 *
 * Returns the result of restartStreamTasks when a restart is triggered, otherwise success.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
    return TSDB_CODE_SUCCESS;
  }

  // not in starting procedure
  bool allReady = streamMetaAllTasksReady(pMeta);

  if ((pStartInfo->restartCount > 0) && (!allReady)) {
    // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
    pStartInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pStartInfo->restartCount);

    return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
  }

  if (pStartInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
  } else if (allReady) {
    pStartInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  return TSDB_CODE_SUCCESS;
}
1062

1063
/*
 * Handle a task-reset request from mnode.
 *
 * Under the task lock, records the failed checkpoint id, clears the check info and, if the
 * task is in checkpoint (CK) status, sets it back to ready. A missing task is logged and
 * treated as success; this handler always returns success.
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState state = streamTaskGetStatus(pTask);
  if (state.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // UNINIT and every other status: nothing to reset
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, state.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1099

1100
/*
 * Handle a stop-tasks request.
 *
 * A request with streamId <= 0 stops every stream task of this vnode (only used when the
 * database is being dropped). Stopping a single stream is not implemented: such a request
 * is logged and ignored. Decode and stop failures are logged; this handler always returns
 * success. pMsgCb is currently unused.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskStopReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  if (req.streamId > 0) {
    // stopping only one stream is not implemented; previously this request vanished silently
    tqDebug("vgId:%d recv msg to stop stream:0x%" PRIx64 ", stop single stream not supported, ignore it", vgId,
            (uint64_t)req.streamId);
    return TSDB_CODE_SUCCESS;
  }

  // stop all stream tasks, only invoked when trying to drop db
  tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
  code = streamMetaStopAllTasks(pMeta);
  if (code) {
    tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
  }

  // always return success
  return TSDB_CODE_SUCCESS;
}
1132

1133
/*
 * Handle a downstream task's request to retrieve the checkpoint-trigger from this
 * (upstream) task.
 *
 * Always answers the downstream with a checkpoint-trigger message whose code says:
 *  - TSDB_CODE_STREAM_TASK_IVLD_STATUS : downstream of this task is not ready yet;
 *  - TSDB_CODE_SUCCESS                 : the trigger was already sent, it is re-sent now;
 *  - TSDB_CODE_ACTION_IN_PROGRESS      : the trigger is not available yet, retry later.
 * A request for a checkpoint id different from the active one gets no answer and returns
 * TSDB_CODE_INVALID_MSG.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SDecoder                 decoder = {0};
  char*                    pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  int32_t rspCode;  // result code carried by the checkpoint-trigger msg sent back to the downstream

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);
    rspCode = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  } else {
    SStreamTaskState state = streamTaskGetStatus(pTask);

    if (state.state != TASK_STATUS__CK) {
      // upstream not recv the checkpoint-source/trigger till now
      if (!(state.state == TASK_STATUS__READY || state.state == TASK_STATUS__HALT)) {
        tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, state.name);
      }

      tqWarn(
          "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
          "upstream sending checkpoint-source/trigger",
          pTask->id.idStr);
      rspCode = TSDB_CODE_ACTION_IN_PROGRESS;
    } else {
      // recv the checkpoint-source/trigger already
      int32_t transId = 0;
      int64_t checkpointId = 0;
      streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);

      if (checkpointId != req.checkpointId) {
        tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
                " req:%" PRId64,
                pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
        streamMetaReleaseTask(pMeta, pTask);
        return TSDB_CODE_INVALID_MSG;
      }

      if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
        // re-send the lost checkpoint-trigger msg to downstream task
        tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
                (int32_t)req.downstreamTaskId, checkpointId, transId);
        rspCode = TSDB_CODE_SUCCESS;
      } else {
        // not send checkpoint-trigger yet, wait
        int32_t recv = 0, total = 0;
        streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

        if (recv == total) {
          tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
        } else {
          tqWarn(
              "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
              "sending checkpoint-source/trigger",
              pTask->id.idStr, recv, total);
        }
        rspCode = TSDB_CODE_ACTION_IN_PROGRESS;
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, rspCode);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1220

1221
/*
 * Handle a checkpoint-trigger re-sent by an upstream task through the retrieve/rsp channel.
 *
 * Returns TSDB_CODE_INVALID_MSG if the response cannot be decoded, the acquire error if the
 * target task cannot be found, otherwise the result of processing the trigger.
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SDecoder              decoder = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  SStreamTask* pTask = NULL;
  int32_t      acquireRet = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((acquireRet != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return acquireRet;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  int32_t result = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return result;
}
1253

1254
/*
 * Handle a pause request from mnode: pause the stream task and, if it has a related
 * fill-history task, pause that one too.
 *
 * Always returns success. If a task cannot be acquired it is assumed to be in
 * [STOP|DROPPING] state, where treating the pause as applied is safe.
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    SStreamTask* pHistoryTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);

    if ((code == 0) && (pHistoryTask != NULL)) {
      tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
      streamTaskPause(pHistoryTask);
      streamMetaReleaseTask(pMeta, pHistoryTask);
    } else {
      // fill-history task already gone; the pause is still considered successful
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pTask->hTaskInfo.id.taskId);
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1292

1293
/*
 * Resume one (previously paused) task and reschedule it when appropriate.
 *
 * handle is an STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
 * After streamTaskResume(), only tasks whose state is READY, SCAN_HISTORY or CK are rescheduled:
 *  - a non-fill-history source task with igUntreated set makes its WAL reader skip to sversion;
 *  - a fill-history source task in SCAN_HISTORY restarts its history scan asynchronously;
 *  - a source task with an empty input queue is not scheduled here;
 *  - any other task is handed to streamTrySchedExec().
 * Returns the result of the scheduling call, or 0 when nothing was scheduled.
 */
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : (SStreamMeta*)handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);

  ETaskStatus status = streamTaskGetStatus(pTask).state;
  bool        reschedulable =
      (status == TASK_STATUS__READY) || (status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__CK);
  if (!reschedulable) {
    return 0;
  }

  bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needed to read the version here
  if (igUntreated && isSourceTask && !pTask->info.fillHistory) {
    // drop everything that arrived while the task was paused
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {
    // continue from the version at which the task was paused
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSourceTask && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSourceTask && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // nothing queued for this source task; the tqScanWalAsync() kick-off is disabled in this path
    return 0;
  }

  return streamTrySchedExec(pTask, false);
}
1328

1329
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
728✔
1330
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
728✔
1331

1332
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
728✔
1333

1334
  SStreamTask* pTask = NULL;
728✔
1335
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
728✔
1336
  if (pTask == NULL || (code != 0)) {
734!
1337
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
1338
    return TSDB_CODE_SUCCESS;
×
1339
  }
1340

1341
  streamMutexLock(&pTask->lock);
734✔
1342
  SStreamTaskState pState = streamTaskGetStatus(pTask);
737✔
1343
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
735✔
1344
  streamMutexUnlock(&pTask->lock);
735✔
1345

1346
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
740✔
1347
  if (code != 0) {
742!
1348
    streamMetaReleaseTask(pMeta, pTask);
×
1349
    tqError("s-task:%s failed to resume tasks, code:%s", pTask->id.idStr, tstrerror(code));
×
1350
    return TSDB_CODE_SUCCESS;
×
1351
  }
1352

1353
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
742✔
1354
  SStreamTask* pHTask = NULL;
742✔
1355
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
742✔
1356
  if (pHTask && (code == 0)) {
743!
1357
    streamMutexLock(&pHTask->lock);
×
1358
    SStreamTaskState p = streamTaskGetStatus(pHTask);
×
1359
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
×
1360
    streamMutexUnlock(&pHTask->lock);
×
1361

1362
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
×
1363
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
×
1364

1365
    streamMetaReleaseTask(pMeta, pHTask);
×
1366
  }
1367

1368
  streamMetaReleaseTask(pMeta, pTask);
743✔
1369
  return TSDB_CODE_SUCCESS;
742✔
1370
}
1371

1372
/* Number of entries in pMeta->pTaskList, narrowed to int32_t. */
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  size_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  return (int32_t)numOfTasks;
}
1373

1374
/*
 * Handler for responses that need no processing: release the RPC payload and clear the pointer so it
 * cannot be freed twice. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);
  return TSDB_CODE_SUCCESS;
}
1379

1380
/*
 * Decode a stream heartbeat response (payload after the SMsgHead) and hand it to streamProcessHeartbeatRsp().
 * On a decode failure, terrno is set to TSDB_CODE_INVALID_MSG and that code is returned.
 */
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg rsp = {0};
  SDecoder         decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamHbRsp(&decoder, &rsp);
  if (code >= 0) {
    tDecoderClear(&decoder);
    return streamProcessHeartbeatRsp(pMeta, &rsp);
  }

  terrno = TSDB_CODE_INVALID_MSG;
  tDecoderClear(&decoder);
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1399

1400
/* The req-checkpoint response carries nothing to act on; only its payload is released. */
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
1401

1402
/* The checkpoint-report response carries nothing to act on; only its payload is released. */
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
1403

1404
/*
 * Forward a checkpoint-ready response to the downstream task it is addressed to.
 *
 * pMsg->pCont is read directly as an SMStreamCheckpointReadyRspMsg. Returns the acquire error when the task
 * cannot be acquired, otherwise the result of streamTaskProcessCheckpointReadyRsp().
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
  if ((code == 0) && (pTask != NULL)) {
    code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
  } else {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pRsp->downstreamNodeId, pRsp->downstreamTaskId);
  }

  return code;
}
1419

1420
/*
 * Apply the consensus checkpointId that mnode decided for one stream task.
 *
 * The SRestoreCheckpointInfo request is decoded from pMsg->pCont past the SMsgHead. Then:
 *  - if the task cannot be acquired, the leader records it via streamMetaAddFailedTask(); a follower only logs;
 *  - a request whose startTs is older than the task's creation time is discarded (on the leader the task is also
 *    recorded via streamMetaAddFailedTaskSelf());
 *  - a consensus id larger than the task's current checkpointId is logged at fatal level and ignored;
 *  - a request whose transId is not newer than the last applied consensus transId is ignored;
 *  - otherwise the task's checkpointId is reset (when it differs) and the consensus transId is recorded.
 * Under the meta write lock, the start stage moves from START_WAIT_FOR_CHKPTID to START_CHECK_DOWNSTREAM, and on the
 * leader the task is added (once) to pRecvChkptIdTasks and started asynchronously.
 *
 * Always returns 0: every failure path is logged and swallowed.
 */
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    // the acquire error is deliberately not returned, to avoid overwriting the error code
    if (pMeta->role == NODE_ROLE_LEADER) {
      tqError("vgId:%d process consensus checkpointId req:%" PRId64
              " transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already",
              pMeta->vgId, req.checkpointId, req.transId, req.taskId);

      int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
      if (ret) {
        tqError("s-task:0x%x failed to add into failed task list, code:%s", req.taskId, tstrerror(ret));
      }
    } else {
      tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
              pMeta->vgId, req.taskId, req.checkpointId, req.transId);
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    if (pMeta->role == NODE_ROLE_LEADER) {
      streamMetaAddFailedTaskSelf(pTask, now, true);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64
         " transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64,
         pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs,
         pTask->execInfo.created);

  streamMutexLock(&pTask->lock);
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;

  // the consensus id must never be ahead of the checkpoint this task already has
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // an older (or repeated) consensus transaction must not override the latest one
  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
           pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  streamMetaWLock(pTask->pMeta);
  if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) {
    pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM;

    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
    if (taosArrayPush(pMeta->startInfo.pStagesList, &info) == NULL) {
      // only the stage history is lost; the stage transition itself has already been applied above
      tqError("vgId:%d failed to record start stage history, code:%s", vgId, tstrerror(terrno));
    }

    tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d",
            pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
  }

  if (pMeta->role == NODE_ROLE_LEADER) {
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};

    bool exist = false;
    for (int32_t i = 0; i < taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks); ++i) {
      STaskId* pId = taosArrayGet(pMeta->startInfo.pRecvChkptIdTasks, i);
      if (id.streamId == pId->streamId && id.taskId == pId->taskId) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      void* p = taosArrayPush(pMeta->startInfo.pRecvChkptIdTasks, &id);
      if (p == NULL) {  // todo handle error, not record the newly attached, start may dead-lock
        // `code` is still 0 at this point, so report terrno instead
        // NOTE(review): assumes taosArrayPush sets terrno on failure — confirm
        tqError("s-task:0x%x failed to add into recv checkpointId task list, code:%s", req.taskId, tstrerror(terrno));
      } else {
        int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
        tqDebug("s-task:0x%x vgId:%d added into recv checkpointId task list, already recv %d", req.taskId, req.nodeId,
                num);
      }
    } else {
      int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
      tqDebug("s-task:0x%x vgId:%d already exist in recv consensus checkpontId, total existed:%d", req.taskId,
              req.nodeId, num);
    }

    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code != 0) {
      // todo remove the newly added, otherwise, deadlock exist
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaWUnLock(pTask->pMeta);

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc