• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4113

17 May 2025 06:43AM UTC coverage: 62.054% (-0.8%) from 62.857%
#4113

push

travis-ci

web-flow
Merge pull request #31115 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

154737 of 318088 branches covered (48.65%)

Branch coverage included in aggregate %.

175 of 225 new or added lines in 20 files covered. (77.78%)

5853 existing lines in 216 files now uncovered.

239453 of 317147 relevant lines covered (75.5%)

15121865.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.25
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
/*
 * Response body for a checkpoint-ready message; it is filled and sent by
 * tqStreamTaskProcessCheckpointReadyMsg() in this file.
 * Only head.vgId is converted with htonl() by that sender; the remaining fields are written in host byte order.
 * NOTE(review): transId is never assigned by the visible sender — confirm the receiver ignores it.
 */
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
// Forward declaration of a file-local (static) helper; its definition appears later in this translation unit.
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Build the runtime parts of a stream task:
 *  - every level except sink opens its state backend (recalculate tasks additionally open a recal state);
 *  - source/agg/merge levels create the stream executor from pTask->exec.qmsg and configure it;
 *  - finally the schedule trigger is set up.
 *
 * Returns 0 on success; terrno when a state backend cannot be opened; otherwise the code of the failing
 * executor setup call.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();
  const char*  idStr = pTask->id.idStr;
  int32_t      level = pTask->info.taskLevel;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", idStr, vgId);

  // non-normal tasks open their state under the ids of the related stream task (pTask->streamTaskId)
  bool    isNormalTask = (pTask->info.fillHistory == STREAM_NORMAL_TASK);
  int64_t stateStreamId = isNormalTask ? pTask->id.streamId : pTask->streamTaskId.streamId;
  int32_t stateTaskId = isNormalTask ? pTask->id.taskId : pTask->streamTaskId.taskId;

  if (level != TASK_LEVEL__SINK) {  // a sink task needs no pState
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      pTask->pRecalState = streamStateRecalatedOpen(pMeta->path, pTask, pTask->id.streamId, pTask->id.taskId);
      if (pTask->pRecalState == NULL) {
        tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", idStr, vgId);
        return terrno;
      }
      tqDebug("s-task:%s recal state:%p", idStr, pTask->pRecalState);
    }

    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", idStr, vgId);
      return terrno;
    }
    tqDebug("s-task:%s stream state:%p", idStr, pTask->pState);
  }

  // fields not listed here (including both backend pointers) start zeroed
  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
  };

  bool isSourceOrMerge = (level == TASK_LEVEL__SOURCE) || (level == TASK_LEVEL__MERGE);
  if (isSourceOrMerge) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (level == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  if (isSourceOrMerge || level == TASK_LEVEL__AGG) {
    // a recalculate task reads from its recal state and keeps the normal state as the secondary backend
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      handle.pStateBackend = pTask->pRecalState;
      handle.pOtherBackend = pTask->pState;
    } else {
      handle.pStateBackend = pTask->pState;
      handle.pOtherBackend = NULL;
    }

    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code != 0) {
      tqError("s-task:%s failed to set stream notify info, code:%s", idStr, tstrerror(code));
      return code;
    }

    qSetStreamMergeInfo(pTask->exec.pExecutor, pTask->pVTables);
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", idStr, vgId, elapsedSec);
  return code;
}
128

129
/*
 * Derive the version a task resumes from.
 * With a checkpoint (checkpointId != 0), the checkpoint version counts as processed and processing continues at the
 * next version; the start checkpoint id is recorded as well.
 * In every case execInfo.startCheckpointVer is set to the resulting nextProcessVer.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId != 0) {
    // the checkpoint version is the kept version; handled data should be the next version
    pInfo->processedVer = pInfo->checkpointVer;
    pInfo->nextProcessVer = pInfo->checkpointVer + 1;
    pTask->execInfo.startCheckpointId = pInfo->checkpointId;

    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
           pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);
  }

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
12,446✔
144

145
/*
 * Schedule an asynchronous start (or restart, when `restart` is true) of all stream tasks in pMeta->pTaskList.
 * Returns 0 without scheduling anything when the task list is empty; otherwise the result of streamTaskSchedTask().
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  if (numOfTasks != 0) {
    tqInfo("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }
    return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
158

159
/*
 * Schedule an asynchronous start of the single task (streamId, taskId).
 * Returns 0 without scheduling when the vnode has no stream tasks at all; otherwise the result of
 * streamTaskSchedTask().
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t totalTasks = taosArrayGetSize(pMeta->pTaskList);

  if (totalTasks != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
170

171
// this is to process request from transaction, always return true.
172
/*
 * Handle a request, issued by a transaction (req.transId), to update the node epsets of one stream task.
 *
 * Under the meta write lock, this function:
 *  - applies the new epsets to the task and to its related fill-history task (if any);
 *  - persists the tasks whose epset actually changed;
 *  - stops the tasks and records them in pMeta->updateInfo.
 * When the number of recorded tasks reaches the number of tasks listed in req.pTaskList, the meta is committed.
 * A restored leader then restarts all tasks asynchronously.
 *
 * Returns TSDB_CODE_MSG_DECODE_ERROR when the message cannot be decoded. All other paths return TSDB_CODE_SUCCESS;
 * errors are logged rather than propagated. The decoded request is released on every exit path.
 */
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
  int32_t                  vgId = pMeta->vgId;
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
  SRpcMsg                  rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
  int64_t                  st = taosGetTimestampMs();
  bool                     updated = false;
  int32_t                  code = 0;
  SStreamTask*             pTask = NULL;
  SStreamTask*             pHTask = NULL;
  SStreamTaskNodeUpdateMsg req = {0};
  SDecoder                 decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  code = tDecodeStreamTaskUpdateMsg(&decoder, &req);
  tDecoderClear(&decoder);

  if (code < 0) {
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code));
    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  int32_t gError = streamGetFatalError(pMeta);
  if (gError != 0) {
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
    tDestroyNodeUpdateMsg(&req);  // release the decoded request on this exit too, like every other path
    return 0;
  }

  // update the nodeEpset when it exists
  streamMetaWLock(pMeta);

  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
            req.taskId);
    rsp.code = TSDB_CODE_SUCCESS;
    streamMetaWUnLock(pMeta);
    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  const char* idstr = pTask->id.idStr;

  if (req.transId <= 0) {
    // report the offending transaction id, which is what the message describes
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.transId);
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  // info needs to be kept till the new trans to update the nodeEp arrived.
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId, req.pTaskList);
  if (!update) {
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  // duplicate update epset msg received, discard this redundant message
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};

  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
  if (pReqTask != NULL) {
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
            req.transId);
    rsp.code = TSDB_CODE_SUCCESS;

    streamMetaReleaseTask(pMeta, pTask);
    streamMetaWUnLock(pMeta);

    tDestroyNodeUpdateMsg(&req);
    return rsp.code;
  }

  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);

  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
  code = streamTaskSendCheckpointsourceRsp(pTask);
  if (code) {
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
  }
  streamTaskResetStatus(pTask);

  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
    if (code != 0) {
      tqError(
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
          "stream task:0x%x",
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
    } else {
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
      if (updateEpSet) {
        updated = updateEpSet;
      }

      streamTaskResetStatus(pHTask);
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
    }
  }

  // stream do update the nodeEp info, write it into stream meta.
  if (updated) {
    tqInfo("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
    }

    if (pHTask != NULL) {
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
      if (code) {
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
      }
    }
  } else {
    tqInfo("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
  }

  code = streamTaskStop(pTask);
  if (code) {
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
  }

  if (pHTask != NULL) {
    code = streamTaskStop(pHTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
    }
  }

  // keep info; pHTask is NULL when there is no related fill-history task
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, pHTask, req.transId, st);
  streamMetaReleaseTask(pMeta, pTask);
  streamMetaReleaseTask(pMeta, pHTask);

  rsp.code = TSDB_CODE_SUCCESS;

  // possibly only handle the stream task.
  int32_t reqUpdateTasks = taosArrayGetSize(req.pTaskList);
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);

  if (restored && isLeader) {
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
    pMeta->startInfo.tasksWillRestart = 1;
  }

  if (updateTasks < reqUpdateTasks) {
    if (isLeader) {
      tqInfo("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
             updateTasks, (reqUpdateTasks - updateTasks));
    } else {
      tqInfo("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
             (reqUpdateTasks - updateTasks));
    }
  } else {
    if ((code = streamMetaCommit(pMeta)) < 0) {
      // always return true
      streamMetaWUnLock(pMeta);
      tDestroyNodeUpdateMsg(&req);
      tqError("vgId:%d commit meta failed, code:%s not restart the stream tasks", vgId, tstrerror(code));
      return TSDB_CODE_SUCCESS;
    }

    streamMetaClearSetUpdateTaskListComplete(pMeta);

    if (isLeader) {
      if (!restored) {
        tqInfo("vgId:%d vnode restore not completed, not start all tasks", vgId);
      } else {
        tqInfo("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, reqUpdateTasks, req.transId);
        code = tqStreamTaskStartAsync(pMeta, cb, true);
        if (code) {
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
        }
      }
    } else {
      tqInfo("vgId:%d follower nodes not restart tasks", vgId);
    }
  }

  streamMetaWUnLock(pMeta);
  tDestroyNodeUpdateMsg(&req);
  return rsp.code;  // always return true
}
377

378
/*
 * Handle a dispatch request addressed to task req.taskId on this vnode.
 *
 * If the target task exists, the request is handed to streamProcessDispatchMsg(). Otherwise a dispatch response
 * carrying TSDB_CODE_STREAM_TASK_NOT_EXIST is sent back to the requesting vnode (req.upstreamNodeId).
 * After a successful decode, the decoded request and any acquired task reference are released on every path.
 *
 * Returns:
 *  - 0 on success;
 *  - TSDB_CODE_MSG_DECODE_ERROR if the message cannot be decoded;
 *  - TSDB_CODE_INVALID_MSG if the request names upstream vgId 0;
 *  - terrno if allocating the error response fails;
 *  - otherwise the error code returned by streamProcessDispatchMsg().
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // release the request and the task reference whether or not processing succeeded
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // vgId 0 is rejected as invalid; check it before allocating so that nothing needs to be freed on this path
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;  // capture before any further call can overwrite it
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
441

442
/*
 * Handle a dispatch response for an upstream task on this vnode.
 * The response fields are byte-swapped in place inside pMsg->pCont (htonl/htobe64), then passed, together with
 * pMsg->code, to streamProcessDispatchRsp() of task pRsp->upstreamTaskId.
 * Returns that function's result, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t             vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);
  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
468

469
/*
 * Handle a retrieve request addressed to task req.dstTaskId.
 *
 * Any checkpoint in progress on the task is marked as failed. A source-level task processes the request itself.
 * Tasks at other levels stamp the request with their own node/task id as the source and broadcast it through
 * streamTaskBroadcastRetrieveReq().
 *
 * A response is sent from here only on success. On failure nothing is sent here and the error code is returned to
 * the caller. Returns the decode/acquire/processing error, or 0 on success.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t  code = 0;
  SDecoder decoder;

  SStreamRetrieveReq req = {0};  // zeroed so no field is ever read uninitialized
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // 0 on success (response already sent above); otherwise the processing error for the caller to handle
  return code;
}
524

525
/*
 * Handle a task-check request from an upstream task.
 * The request is decoded, answered via streamTaskProcessCheckMsg(), and the response is sent back to
 * req.upstreamNodeId / req.upstreamTaskId.
 * Returns the decode error, or the result of streamTaskSendCheckRsp().
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
548

549
/*
 * Handle a task-check response for upstream task rsp.upstreamTaskId.
 * On a non-leader, or when the task cannot be acquired, the task is recorded through streamMetaAddFailedTask().
 * Otherwise the response is passed to streamTaskProcessCheckRsp().
 * Returns -1 (with terrno = TSDB_CODE_INVALID_MSG) when the response cannot be decoded.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t             vgId = pMeta->vgId;
  char*               pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
  if (pTask != NULL && code == 0) {
    code = streamTaskProcessCheckRsp(pTask, &rsp);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId, true);
}
587

588
/*
 * Handle a checkpoint-ready message sent by a downstream task to upstream task req.upstreamTaskId.
 *
 * The message is rejected with TSDB_CODE_INVALID_MSG when the target is a sink task. Otherwise it is passed to
 * streamProcessCheckpointReadyMsg(). On success, an SMStreamCheckpointReadyRspMsg is sent back and
 * pMsg->info.handle is cleared to disable the automatic response.
 *
 * Returns 0 on success. Otherwise returns one of: a decode error, the task-acquire error (or
 * TSDB_CODE_STREAM_TASK_NOT_EXIST), the processing error, or terrno when allocating the response fails.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0 || pTask == NULL) {
    // log the id that was actually looked up (the upstream task), not the sender
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  } else {
    tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
  }

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
650

651
/*
 * Deploy a stream task from an encoded deploy message.
 *
 * Steps:
 *  1. Allocate a task and decode the message into it.
 *  2. Register the task in the stream meta under the write lock.
 *  3. On a restored leader, asynchronously start a newly added task whose fillHistory is 0.
 *
 * Returns 0 without doing anything when streams are disabled (tsDisableStream). Returns terrno when the allocation
 * fails and TSDB_CODE_INVALID_MSG when decoding fails. Otherwise returns the registration code, or the code of the
 * last acquire/start step.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return 0;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1. deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2. register the task; copy the ids out first, since pTask must not be referenced once it is in the meta store
  // (it may be destroyed by other threads)
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;
  bool    added = false;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // 3. only the leader launches the task
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pAcquired = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pAcquired);
  if (pAcquired != NULL) {
    if (code == 0 && pAcquired->info.fillHistory == 0) {
      code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
    }
    streamMetaReleaseTask(pMeta, pAcquired);
  }

  return code;
}
730

731
/**
 * Handle a drop-task request (SVDropStreamTaskReq) for one stream task on this vnode.
 *
 * Under the stream-meta write lock:
 *   1. if the task exists, remember its related fill-history task id, mark its backend files for removal and
 *      clear the fill-history relationship;
 *   2. unregister the related fill-history task (if any) first, then the task itself;
 *   3. reset the start info when no task remains, and commit the meta store.
 *
 * @param pMeta  stream meta of this vnode
 * @param msg    request body, interpreted as SVDropStreamTaskReq
 * @param msgLen length of msg (not used by this handler)
 * @return always 0; every failure is only logged.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
  if (numOfTasks == 0) {
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
  }

  // persist to disk; a failure is not propagated (the handler always reports success) but must not go unnoticed
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping s-task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
793

794
/**
 * Apply an update-checkpoint-info request (SVUpdateCheckpointInfoReq) to the addressed stream task.
 *
 * The lookup and the update both happen while the stream-meta write lock is held. When the task cannot be
 * found, an error is logged and nothing else is done.
 *
 * @param pMeta    stream meta of this vnode
 * @param restored forwarded unchanged to streamTaskUpdateTaskCheckpointInfo()
 * @param msg      request body, interpreted as SVUpdateCheckpointInfoReq
 * @return always TSDB_CODE_SUCCESS, since the request is issued by mnode inside a transaction.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTarget = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId taskKey = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  if (streamMetaAcquireTaskUnsafe(pMeta, &taskKey, &pTarget) != 0) {
    // the task is not registered in the meta any more
    int32_t total = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, total);
  } else {
    // the result is deliberately not propagated: this handler always reports success to mnode
    (void)streamTaskUpdateTaskCheckpointInfo(pTarget, restored, pReq);
    streamMetaReleaseTask(pMeta, pTarget);
  }

  streamMetaWUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
821

822
/**
 * Restart all stream tasks of this vnode, or postpone the restart when a start-all procedure is already running.
 *
 * If pMeta->startInfo.startAllTasks is set, the current start stage decides what happens:
 *   - START_MARK_REQ_CHKPID / START_WAIT_FOR_CHKPTID: the pending stage is abandoned and the restart proceeds;
 *   - START_CHECK_DOWNSTREAM: restart now if all partial check-downstream responses arrived, otherwise bump
 *     restartCount and return;
 *   - any other stage: do nothing.
 * Otherwise (or after abandoning a stage) the meta is cleared, all tasks are reloaded, and on a leader with
 * streams enabled all tasks are started.
 *
 * NOTE(review): the RESTART_ALL_TASKS path in tqStreamTaskProcessRunReq calls this with the meta write lock
 * held; confirm other callers do the same.
 *
 * @param pMeta    stream meta of this vnode
 * @param isLeader whether this vnode is the leader; only a leader starts the reloaded tasks
 * @return TSDB_CODE_SUCCESS when the restart is postponed; otherwise the value of terrno after the restart.
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t         vgId = pMeta->vgId;
  int32_t         code = 0;
  int64_t         st = taosGetTimestampMs();
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  if (pStartInfo->startAllTasks == 1) {
    // wait for the checkpoint id rsp, this rsp will be expired
    if (pStartInfo->curStage == START_MARK_REQ_CHKPID) {
      // taosArrayGetLast() yields NULL for an empty stage list; report -1 instead of dereferencing NULL
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : -1;
      tqInfo("vgId:%d only mark the req consensus checkpointId flag, reqTs:%" PRId64 " ignore and continue", vgId,
             reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_WAIT_FOR_CHKPTID) {
      SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
      int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : -1;
      tqInfo("vgId:%d already sent consensus-checkpoint msg(waiting for chkptid) expired, reqTs:%" PRId64
             " rsp will be discarded",
             vgId, reqTs);

      taosArrayClear(pStartInfo->pStagesList);
      pStartInfo->curStage = 0;
      goto _start;

    } else if (pStartInfo->curStage == START_CHECK_DOWNSTREAM) {
      int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet);
      int32_t numOfFailed = taosHashGetSize(pStartInfo->pFailedTaskSet);
      int32_t newTotal = taosArrayGetSize(pStartInfo->pRecvChkptIdTasks);

      tqDebug(
          "vgId:%d start all tasks procedure is interrupted by transId:%d, wait for partial tasks rsp. recv check "
          "downstream results, received:%d results, failed:%d, total req tasks:%d",
          vgId, pMeta->updateInfo.activeTransId, numOfRecv, numOfFailed, newTotal);

      bool allRsp = allCheckDownstreamRspPartial(pStartInfo, newTotal, pMeta->vgId);
      if (allRsp) {
        tqDebug("vgId:%d all partial results received, continue the restart procedure", pMeta->vgId);
        streamMetaResetStartInfo(pStartInfo, vgId);
        goto _start;
      } else {
        pStartInfo->restartCount += 1;
        SStartTaskStageInfo* pCurStageInfo = taosArrayGetLast(pStartInfo->pStagesList);
        int64_t              reqTs = (pCurStageInfo != NULL) ? pCurStageInfo->ts : -1;

        tqDebug("vgId:%d in start tasks procedure (check downstream), reqTs:%" PRId64
                ", inc restartCounter by 1 and wait for it completes, "
                "remaining restart:%d",
                vgId, reqTs, pStartInfo->restartCount);
      }
    } else {
      tqInfo("vgId:%d in start procedure, but not start to do anything yet, do nothing", vgId);
    }

    return TSDB_CODE_SUCCESS;
  }

_start:

  pStartInfo->startAllTasks = 1;
  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d clear&close stream meta completed, elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  if (isLeader && !tsDisableStream) {
    code = streamMetaStartAllTasks(pMeta);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pStartInfo->restartCount = 0;
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // NOTE(review): the result is taken from terrno (reset to 0 above), not from streamMetaStartAllTasks()'s
  // return value; kept as-is to preserve the existing contract with callers.
  code = terrno;
  return code;
}
904

905
/**
 * Dispatch a decoded SStreamTaskRunReq (message body after the SMsgHead) by its reqType.
 *
 * Meta-level requests (start one/all, restart all, stop all/one, add failed task, resume) are handled by the
 * corresponding stream-meta helpers. Any other type runs the addressed task's execution loop, but only when the
 * task is ready to run; otherwise its schedule status is set inactive.
 *
 * @param pMeta    stream meta of this vnode
 * @param pMsg     RPC message whose content is an SMsgHead followed by the encoded run request
 * @param isLeader forwarded to the restart-all handling
 * @return TSDB_CODE_SUCCESS on decode failure and for start/restart/stop-all requests; otherwise the result of
 *         the addressed operation (or of acquiring the task).
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    streamMetaWLock(pMeta);
    code = restartStreamTasks(pMeta, isLeader);
    streamMetaWUnLock(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
    return code;
  } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // keep the full 64-bit difference: a gap above INT32_MAX ms (~24.8 days), or an unset lastExecTs, does not
        // fit in int32_t and the narrowing conversion would log a garbage idle time
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
990

991
/**
 * Callback invoked when a start-all-tasks procedure completes.
 *
 * If another start-all procedure is in progress (startAllTasks == 1), nothing is done. Otherwise, when
 * restartCount > 0 and not all tasks are ready, the counter is decremented and all tasks are restarted.
 * If every task is ready, the counter is reset to 0 instead of restarting again.
 *
 * NOTE(review): this function does not take the stream-meta lock itself; confirm that callers serialize access
 * to pMeta->startInfo.
 *
 * @param pMeta stream meta of this vnode
 * @return the result of restartStreamTasks() when a restart is triggered, TSDB_CODE_SUCCESS otherwise.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
    return TSDB_CODE_SUCCESS;
  }

  // not in starting procedure
  bool allReady = streamMetaAllTasksReady(pMeta);

  if ((pStartInfo->restartCount > 0) && (!allReady)) {
    pStartInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pStartInfo->restartCount);

    return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
  }

  if (pStartInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
  } else if (allReady) {
    // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
    pStartInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  return TSDB_CODE_SUCCESS;
}
1027

UNCOV
1028
/**
 * Handle a task-reset request (SVResetStreamTaskReq) from mnode.
 *
 * Under the task lock:
 *   - record pReq->chkptId as the failed checkpoint id;
 *   - clear the task's check info;
 *   - move a task in checkpoint status (TASK_STATUS__CK) back to ready.
 * A task in any other status is left unchanged.
 *
 * @param pMeta stream meta of this vnode
 * @param pMsg  request body, interpreted as SVResetStreamTaskReq
 * @return always TSDB_CODE_SUCCESS, including when the task cannot be acquired.
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // every other status, TASK_STATUS__UNINIT included, is left untouched; the task is not (re)started here
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1064

1065
/**
 * Handle a stop-tasks request (SStreamTaskStopReq, encoded after the SMsgHead).
 *
 * A request with streamId <= 0 stops all stream tasks of this vnode; per the original note this is only issued
 * when the database is being dropped. A request for a single stream is not acted on by this handler and is only
 * logged.
 *
 * @param pMeta  stream meta of this vnode
 * @param pMsgCb not used by this handler
 * @param pMsg   RPC message carrying the encoded stop request
 * @return always TSDB_CODE_SUCCESS; decode and stop failures are only logged.
 */
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskStopReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  if (req.streamId <= 0) {
    // stop all stream tasks, only invoked when trying to drop db
    tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
    code = streamMetaStopAllTasks(pMeta);
    if (code) {
      tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
    }
  } else {
    // stopping the tasks of a single stream is not implemented here; make the ignored request visible in the log
    tqDebug("vgId:%d recv msg to stop tasks of stream:0x%" PRIx64 ", not supported, ignore it", vgId, req.streamId);
  }

  // always return success
  return TSDB_CODE_SUCCESS;
}
1097

UNCOV
1098
/**
 * Handle a downstream task's request to (re-)obtain the checkpoint-trigger from its upstream task.
 *
 * The message body (after the SMsgHead) is decoded as SRetrieveChkptTriggerReq. The upstream task
 * (req.upstreamTaskId) is acquired, and in most paths a reply is sent through
 * streamTaskSendCheckpointTriggerMsg() on pMsg->info. The reply status depends on the upstream task:
 *   - downstream not ready            -> TSDB_CODE_STREAM_TASK_IVLD_STATUS
 *   - in checkpoint, trigger was sent -> TSDB_CODE_SUCCESS (re-send of the lost trigger)
 *   - in checkpoint, not sent yet     -> TSDB_CODE_ACTION_IN_PROGRESS
 *   - not in checkpoint yet           -> TSDB_CODE_ACTION_IN_PROGRESS
 *
 * @param pMeta stream meta of this vnode
 * @param pMsg  RPC message carrying the encoded request; its info is reused for the reply
 * @return TSDB_CODE_INVALID_MSG on decode failure or checkpoint id mismatch, TSDB_CODE_STREAM_TASK_NOT_EXIST when
 *         the upstream task cannot be acquired, otherwise the result of sending the reply.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder                 decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }
  tDecoderClear(&decoder);

  // the task addressed by this request is the upstream task that should have sent the trigger
  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  // the upstream task has not confirmed its downstream tasks yet: reply with an invalid-status code
  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {  // recv the checkpoint-source/trigger already
    int32_t transId = 0;
    int64_t checkpointId = 0;

    streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
    if (checkpointId != req.checkpointId) {
      // NOTE(review): unlike the other paths, no reply is sent on pMsg->info here; confirm the caller responds
      // when a non-zero code is returned. This log also prints downstreamTaskId as 64-bit hex, while the other
      // logs cast it to int32_t.
      tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
              " req:%" PRId64,
              pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
      streamMetaReleaseTask(pMeta, pTask);
      return TSDB_CODE_INVALID_MSG;
    }

    // queried with the downstream node id (not the task id)
    if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
      // re-send the lost checkpoint-trigger msg to downstream task
      tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
              (int32_t)req.downstreamTaskId, checkpointId, transId);
      code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                                TSDB_CODE_SUCCESS);
    } else {  // not send checkpoint-trigger yet, wait
      int32_t recv = 0, total = 0;
      streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

      if (recv == total) {  // add the ts info
        tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
      } else {
        tqWarn(
            "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
            "sending checkpoint-source/trigger",
            pTask->id.idStr, recv, total);
      }
      // tell the downstream task to retry later
      code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                                TSDB_CODE_ACTION_IN_PROGRESS);
    }
  } else {  // upstream not recv the checkpoint-source/trigger till now
    // only READY or HALT is expected here.
    // NOTE(review): whether tqFatal aborts depends on its definition; if it only logs, execution continues below.
    if (!(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT)) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, pState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_ACTION_IN_PROGRESS);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1185

UNCOV
1186
/**
 * Handle a checkpoint-trigger delivered through the retrieve/rsp channel (SCheckpointTriggerRsp).
 *
 * Decodes the response, acquires the addressed task, and hands the response to
 * streamTaskProcessCheckpointTriggerRsp().
 *
 * @param pMeta stream meta of this vnode
 * @param pMsg  RPC message whose content is an SMsgHead followed by the encoded response
 * @return TSDB_CODE_INVALID_MSG when decoding fails, the acquire error when the task is missing, otherwise
 *         the result of processing the response.
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SDecoder              dec = {0};
  SStreamTask*          pTask = NULL;
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  // decode, then release the decoder on both outcomes before reporting
  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&dec, &rsp);
  tDecoderClear(&dec);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  int32_t procRet = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return procRet;
}
1218

1219
/**
 * Handle a pause request (SVPauseStreamTaskReq) from mnode.
 *
 * Pauses the addressed stream task and, when it has a related fill-history task, pauses that task as well.
 * A task that cannot be acquired is treated as already paused.
 *
 * @param pMeta stream meta of this vnode
 * @param pMsg  request body, interpreted as SVPauseStreamTaskReq
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pStreamTask = NULL;

  int32_t ret = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pStreamTask);
  if ((ret != 0) || (pStreamTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pStreamTask->id.idStr);
  streamTaskPause(pStreamTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pStreamTask)) {
    SStreamTask* pFillHistTask = NULL;

    ret = streamMetaAcquireTask(pMeta, pStreamTask->hTaskInfo.id.streamId, pStreamTask->hTaskInfo.id.taskId,
                                &pFillHistTask);
    if ((ret != 0) || (pFillHistTask == NULL)) {
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pStreamTask->hTaskInfo.id.taskId);
      streamMetaReleaseTask(pMeta, pStreamTask);

      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
      return TSDB_CODE_SUCCESS;
    }

    tqDebug("s-task:%s fill-history task handle paused along with related stream task", pFillHistTask->id.idStr);

    streamTaskPause(pFillHistTask);
    streamMetaReleaseTask(pMeta, pFillHistTask);
  }

  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}
1257

1258
// Resume one paused task and, if it is now in a runnable state, get it executing again.
//   handle      : an STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
//   sversion    : vnode version the WAL reader skips to when igUntreated is set
//                 (source-level, non-fill-history tasks only).
//   igUntreated : discard the data that arrived while the task was paused.
// Returns the result of restarting the history scan or of scheduling execution; 0 if neither was attempted.
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;

  streamTaskResume(pTask);

  ETaskStatus state = streamTaskGetStatus(pTask).state;
  bool runnable = (state == TASK_STATUS__READY) || (state == TASK_STATUS__SCAN_HISTORY) || (state == TASK_STATUS__CK);
  if (!runnable) {
    return code;
  }

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory && state == TASK_STATUS__SCAN_HISTORY) {
    pTask->hTaskInfo.operatorOpen = false;
    code = streamStartScanHistoryAsync(pTask, igUntreated);
  } else if (!isSource || streamQueueGetNumOfItems(pTask->inputq.queue) != 0) {
    // A source task with an empty input queue is deliberately not scheduled here
    // (a tqScanWalAsync call used to sit in that branch and is disabled).
    code = streamTrySchedExec(pTask, false);
  }

  return code;
}
1293

1294
// Handle a resume request: resume the addressed stream task and then, if it can be acquired,
// its related fill-history task.
//   handle    : an STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
//   sversion  : vnode version forwarded to tqProcessTaskResumeImpl (used when igUntreated is set).
//   msg       : an SVResumeStreamTaskReq.
// Failures (task not found, resume failure) are logged only; the function always returns TSDB_CODE_SUCCESS.
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
  SStreamMeta*           pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  SStreamTask*           pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
  streamMutexUnlock(&pTask->lock);

  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
  if (code != 0) {
    // Log while the reference is still held: after streamMetaReleaseTask the task (and its idStr)
    // may no longer be valid, so it must not be touched past this point.
    tqError("s-task:%s failed to resume tasks, code:%s", pTask->id.idStr, tstrerror(code));
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // The related fill-history task is resumed only if it can be acquired; otherwise it is silently skipped.
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
  SStreamTask* pHTask = NULL;
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
  if (pHTask && (code == 0)) {
    streamMutexLock(&pHTask->lock);
    SStreamTaskState p = streamTaskGetStatus(pHTask);
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
    streamMutexUnlock(&pHTask->lock);

    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));

    streamMetaReleaseTask(pMeta, pHTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1336

UNCOV
1337
// Number of entries in the meta's task list, narrowed to int32_t.
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t numOfTasks = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  return numOfTasks;
}
×
1338

1339
// Shared handler for responses whose content needs no processing: the payload buffer is freed
// with rpcFreeCont and pMsg->pCont is cleared. Always returns TSDB_CODE_SUCCESS.
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);
  return TSDB_CODE_SUCCESS;
}
1344

1345
// Decode a stream heartbeat response (body follows an SMsgHead) and pass it to streamProcessHeartbeatRsp.
// On decode failure terrno is set to TSDB_CODE_INVALID_MSG and that value is returned.
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg rsp = {0};
  SDecoder         decoder;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamHbRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
    return terrno;
  }

  return streamProcessHeartbeatRsp(pMeta, &rsp);
}
1364

1365
// The request-checkpoint response needs no processing; its payload is simply released.
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
3,194✔
1366

1367
// The checkpoint-report response needs no processing; its payload is simply released.
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  return doProcessDummyRspMsg(pMeta, pMsg);
}
3,123✔
1368

1369
// Handle a checkpoint-ready response: acquire the downstream task named in the message from this meta and
// forward the upstream task id and checkpoint id to streamTaskProcessCheckpointReadyRsp.
// Returns that call's result, or the acquire code when the task cannot be acquired.
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId, &pTask);
  if (code == 0 && pTask != NULL) {
    code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
  } else {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pRsp->downstreamNodeId, pRsp->downstreamTaskId);
  }

  return code;
}
1384

1385
// Apply the consensus checkpointId sent by mnode to one stream task.
//
// The message body follows an SMsgHead and decodes into an SRestoreCheckpointInfo. The request is dropped when:
//   - it cannot be decoded;
//   - the task cannot be acquired (on the leader it is then passed to streamMetaAddFailedTask);
//   - req.startTs is older than the task's creation time (on the leader, streamMetaAddFailedTaskSelf is called);
//   - req.checkpointId is greater than the task's current checkpointId (logged via tqFatal);
//   - req.transId is not newer than the consensus transId already recorded on the task.
// Otherwise the task's checkpointId is set to req.checkpointId (restore-version info is refreshed when it changes),
// the consensus trans is recorded, the meta start stage advances from START_WAIT_FOR_CHKPTID to
// START_CHECK_DOWNSTREAM, and on the leader the task id is recorded in pRecvChkptIdTasks and the task is
// started asynchronously.
//
// Always returns 0: every failure is logged rather than propagated.
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    // ignore this code to avoid error code over writing
    if (pMeta->role == NODE_ROLE_LEADER) {
      tqError("vgId:%d process consensus checkpointId req:%" PRId64
              " transId:%d, failed to acquire task:0x%x, it may have been dropped/stopped already",
              pMeta->vgId, req.checkpointId, req.transId, req.taskId);

      int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId, true);
      if (ret) {
        tqError("s-task:0x%x failed add check downstream failed, code:%s", req.taskId, tstrerror(ret));
      }
    } else {
      tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
              pMeta->vgId, req.taskId, req.checkpointId, req.transId);
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d createTs:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    if (pMeta->role == NODE_ROLE_LEADER) {
      streamMetaAddFailedTaskSelf(pTask, now, true);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64
         " transId:%d from mnode, reqTs:%" PRId64 " task createTs:%" PRId64,
         pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId, req.transId, req.startTs,
         pTask->execInfo.created);

  streamMutexLock(&pTask->lock);
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;

  // the consensus checkpoint may only roll the task back (or keep it), never move it forward
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqWarn("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
           pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  streamMetaWLock(pTask->pMeta);
  if (pMeta->startInfo.curStage == START_WAIT_FOR_CHKPTID) {
    pMeta->startInfo.curStage = START_CHECK_DOWNSTREAM;

    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
    if (taosArrayPush(pMeta->startInfo.pStagesList, &info) == NULL) {
      // only the history record is lost; curStage has already been advanced above
      tqError("vgId:%d failed to record start stage into stage history list, code:%s", vgId, tstrerror(terrno));
    }

    tqDebug("vgId:%d wait_for_chkptId stage -> check_down_stream stage, reqTs:%" PRId64 " , numOfStageHist:%d",
            pMeta->vgId, info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
  }

  if (pMeta->role == NODE_ROLE_LEADER) {
    STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
    int32_t numOfRecv = (int32_t)taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
    bool    exist = false;

    for (int32_t i = 0; i < numOfRecv; ++i) {
      STaskId* pId = taosArrayGet(pMeta->startInfo.pRecvChkptIdTasks, i);
      if (id.streamId == pId->streamId && id.taskId == pId->taskId) {
        exist = true;
        break;
      }
    }

    if (!exist) {
      void* p = taosArrayPush(pMeta->startInfo.pRecvChkptIdTasks, &id);
      if (p == NULL) {  // todo handle error, not record the newly attached, start may dead-lock
        // `code` still holds the (successful) acquire result here, so report terrno instead
        tqError("s-task:0x%x failed to add into recv checkpointId task list, code:%s", req.taskId, tstrerror(terrno));
      } else {
        int32_t num = (int32_t)taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
        tqDebug("s-task:0x%x vgId:%d added into recv checkpointId task list, already recv %d", req.taskId, req.nodeId,
                num);
      }
    } else {
      tqDebug("s-task:0x%x vgId:%d already exist in recv consensus checkpointId, total existed:%d", req.taskId,
              req.nodeId, numOfRecv);
    }

    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code != 0) {
      // todo remove the newly added, otherwise, deadlock exist
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaWUnLock(pTask->pMeta);

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc