• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3593

24 Jan 2025 08:57AM UTC coverage: 63.239% (-0.3%) from 63.546%
#3593

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

140619 of 285630 branches covered (49.23%)

Branch coverage included in aggregate %.

218877 of 282844 relevant lines covered (77.38%)

19647377.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.83
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/*
 * Prepare a stream task for execution.
 *
 * - Every non-sink task opens its state backend into pTask->pState. A fill-history task opens it
 *   under the ids of its related stream task (pTask->streamTaskId).
 * - Source and agg tasks create their stream executor, then set its task id and notify info.
 * - Finally, the schedule trigger is set up for every task level.
 *
 * Returns 0 on success, or a non-zero error code. The function never returns 0 when the state
 * backend could not be opened.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      st = taosGetTimestampMs();
  int64_t      streamId = 0;
  int32_t      taskId = 0;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a fill-history task opens its state with the ids of the related stream task
  if (pTask->info.fillHistory) {
    streamId = pTask->streamTaskId.streamId;
    taskId = pTask->streamTaskId.taskId;
  } else {
    streamId = pTask->id.streamId;
    taskId = pTask->id.taskId;
  }

  // sink task does not need the pState
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
    if (pTask->pState == NULL) {
      // terrno may be left at 0 by the failing call; never report success with a NULL state
      code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed, code:%s", pTask->id.idStr,
              vgId, tstrerror(code));
      return code;
    }

    tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = pTask->pState,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
  };

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code) {
      tqError("s-task:%s failed to set task id for executor, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask));
    if (code) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);

  return code;
}
106

107
/*
 * Derive the version a task resumes from after being loaded.
 *
 * If a checkpoint exists (checkpointId != 0), checkpointVer is the last version already kept.
 * In that case processing restarts at checkpointVer + 1, and the checkpoint id is recorded as the
 * start checkpoint. In every case, the resulting nextProcessVer is recorded as
 * execInfo.startCheckpointVer.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId == 0) {
    // no checkpoint: keep nextProcessVer untouched
    pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
    return;
  }

  pInfo->processedVer = pInfo->checkpointVer;
  pInfo->nextProcessVer = pInfo->checkpointVer + 1;
  pTask->execInfo.startCheckpointId = pInfo->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
122

123
/*
 * Schedule an asynchronous start of all stream tasks of this vnode, or a restart when `restart`
 * is true. The task and stream ids passed to the scheduler are 0 because the request covers every
 * task. Returns 0 without scheduling anything when the meta holds no task.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t taskCount = taosArrayGetSize(pMeta->pTaskList);

  if (taskCount == 0) {
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
    return 0;
  }

  tqDebug("vgId:%d start all %d stream task(s) async", vgId, taskCount);

  int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
  if (restart) {
    execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
  }

  return streamTaskSchedTask(cb, vgId, 0, 0, execType);
}
136

137
/*
 * Schedule an asynchronous start of a single task identified by (streamId, taskId).
 * Returns 0 without scheduling anything when the meta holds no task. Otherwise, returns the
 * result of streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;

  if (taosArrayGetSize(pMeta->pTaskList) == 0) {
    tqDebug("vgId:%d no stream tasks existed to run", vgId);
    return 0;
  }

  tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
  return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
}
148

149
// this is to process request from transaction, always return true.
150
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
87✔
151
  int32_t      vgId = pMeta->vgId;
87✔
152
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
87✔
153
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
87✔
154
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
87✔
155
  int64_t      st = taosGetTimestampMs();
87✔
156
  bool         updated = false;
87✔
157
  int32_t      code = 0;
87✔
158
  SStreamTask* pTask = NULL;
87✔
159
  SStreamTask* pHTask = NULL;
87✔
160

161
  SStreamTaskNodeUpdateMsg req = {0};
87✔
162

163
  SDecoder decoder;
164
  tDecoderInit(&decoder, (uint8_t*)msg, len);
87✔
165
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
87!
166
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
167
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
168
    tDecoderClear(&decoder);
×
169
    return rsp.code;
×
170
  }
171

172
  tDecoderClear(&decoder);
87✔
173

174
  int32_t gError = streamGetFatalError(pMeta);
87✔
175
  if (gError != 0) {
87!
176
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
177
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
178
    return 0;
×
179
  }
180

181
  // update the nodeEpset when it exists
182
  streamMetaWLock(pMeta);
87✔
183

184
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
185
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
87✔
186
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
87✔
187
  if (code != 0) {
87!
188
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
189
            req.taskId);
190
    rsp.code = TSDB_CODE_SUCCESS;
×
191
    streamMetaWUnLock(pMeta);
×
192
    taosArrayDestroy(req.pNodeList);
×
193
    return rsp.code;
×
194
  }
195

196
  const char* idstr = pTask->id.idStr;
87✔
197

198
  if (req.transId <= 0) {
87!
199
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
200
    rsp.code = TSDB_CODE_SUCCESS;
×
201

202
    streamMetaReleaseTask(pMeta, pTask);
×
203
    streamMetaWUnLock(pMeta);
×
204

205
    taosArrayDestroy(req.pNodeList);
×
206
    return rsp.code;
×
207
  }
208

209
  // info needs to be kept till the new trans to update the nodeEp arrived.
210
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
87✔
211
  if (!update) {
87!
212
    rsp.code = TSDB_CODE_SUCCESS;
×
213

214
    streamMetaReleaseTask(pMeta, pTask);
×
215
    streamMetaWUnLock(pMeta);
×
216

217
    taosArrayDestroy(req.pNodeList);
×
218
    return rsp.code;
×
219
  }
220

221
  // duplicate update epset msg received, discard this redundant message
222
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
87✔
223

224
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
87✔
225
  if (pReqTask != NULL) {
87!
226
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
227
            req.transId);
228
    rsp.code = TSDB_CODE_SUCCESS;
×
229

230
    streamMetaReleaseTask(pMeta, pTask);
×
231
    streamMetaWUnLock(pMeta);
×
232

233
    taosArrayDestroy(req.pNodeList);
×
234
    return rsp.code;
×
235
  }
236

237
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
87✔
238

239
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
240
  code = streamTaskSendCheckpointsourceRsp(pTask);
87✔
241
  if (code) {
87!
242
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
243
  }
244
  streamTaskResetStatus(pTask);
87✔
245

246
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
87✔
247

248
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
87✔
249
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
70✔
250
    if (code != 0) {
70!
251
      tqError(
×
252
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
253
          "stream task:0x%x",
254
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
255
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
256
    } else {
257
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
70!
258
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
70✔
259
      if (updateEpSet) {
70✔
260
        updated = updateEpSet;
66✔
261
      }
262

263
      streamTaskResetStatus(pHTask);
70✔
264
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
70✔
265
    }
266
  }
267

268
  // stream do update the nodeEp info, write it into stream meta.
269
  if (updated) {
87✔
270
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
78✔
271
    code = streamMetaSaveTask(pMeta, pTask);
78✔
272
    if (code) {
78!
273
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
274
    }
275

276
    if (pHTask != NULL) {
78✔
277
      code = streamMetaSaveTask(pMeta, pHTask);
66✔
278
      if (code) {
66!
279
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
280
      }
281
    }
282
  } else {
283
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
9!
284
  }
285

286
  code = streamTaskStop(pTask);
87✔
287
  if (code) {
87!
288
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
289
  }
290

291
  if (pHTask != NULL) {
87✔
292
    code = streamTaskStop(pHTask);
70✔
293
    if (code) {
70!
294
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
295
    }
296
  }
297

298
  // keep info
299
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
87✔
300
  streamMetaReleaseTask(pMeta, pTask);
87✔
301
  streamMetaReleaseTask(pMeta, pHTask);
87✔
302

303
  rsp.code = TSDB_CODE_SUCCESS;
87✔
304

305
  // possibly only handle the stream task.
306
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
87✔
307
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
87✔
308

309
  if (restored) {
87✔
310
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
79✔
311
    pMeta->startInfo.tasksWillRestart = 1;
79✔
312
  }
313

314
  if (updateTasks < numOfTasks) {
87✔
315
    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
42✔
316
            updateTasks, (numOfTasks - updateTasks));
317
  } else {
318
    if ((code = streamMetaCommit(pMeta)) < 0) {
45!
319
      // always return true
320
      streamMetaWUnLock(pMeta);
×
321
      taosArrayDestroy(req.pNodeList);
×
322
      return TSDB_CODE_SUCCESS;
×
323
    }
324

325
    streamMetaClearSetUpdateTaskListComplete(pMeta);
45✔
326

327
    if (!restored) {
45✔
328
      tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
4!
329
    } else {
330
      tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
41✔
331
#if 0
332
      taosMSleep(5000);// for test purpose, to trigger the leader election
333
#endif
334
      code = tqStreamTaskStartAsync(pMeta, cb, true);
41✔
335
      if (code) {
41!
336
        tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
337
      }
338
    }
339
  }
340

341
  streamMetaWUnLock(pMeta);
87✔
342
  taosArrayDestroy(req.pNodeList);
87✔
343
  return rsp.code;  // always return true
87✔
344
}
345

346
/*
 * Handle a dispatch request sent by an upstream task.
 *
 * If the target task exists, the request is passed to streamProcessDispatchMsg. The decoded
 * request is cleaned up here afterwards on both the success and failure paths, because the
 * success path already did so in the original contract.
 *
 * If the task is missing, a TSDB_CODE_STREAM_TASK_NOT_EXIST response is sent back to the upstream
 * vnode. Multi-byte fields of that response are converted with htonl/htobe64.
 *
 * Returns:
 *   - 0 when the request was handled or the error response was sent;
 *   - the non-zero code from streamProcessDispatchMsg when it fails;
 *   - TSDB_CODE_MSG_DECODE_ERROR or TSDB_CODE_INVALID_MSG for malformed requests;
 *   - terrno when the response buffer cannot be allocated.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // release the request and the task reference on the failure path as well, to avoid leaking them
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // validate before allocating, so that rejecting the message cannot leak the response buffer
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return terrno;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
409

410
/*
 * Handle the response to a dispatch request previously sent by a local (upstream) task.
 *
 * The response body is converted in place from network to host byte order. It is then forwarded
 * to streamProcessDispatchRsp together with the rpc code.
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t             vgId = pMeta->vgId;

  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      ret = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);

  if (pTask == NULL || ret != 0) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  ret = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}
436

437
/*
 * Handle a retrieve request addressed to a local task (req.dstTaskId).
 *
 * Any in-progress checkpoint of that task is marked as failed. A source task then processes the
 * request itself. Any other level re-broadcasts it, with itself as the new source.
 *
 * On success, the response is sent from here and 0 is returned. On failure, the non-zero error
 * code is returned and no response is sent from here.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*    msgStr = pMsg->pCont;
  char*    msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t  msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t  code = 0;
  SDecoder decoder;

  SStreamRetrieveReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    // never report success for a request that was not handled
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamProcessRetrieveReq(pTask, &req);
  } else {
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  }

  if (code != TSDB_CODE_SUCCESS) {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  } else {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // 0 when the response was sent above; otherwise the error code (no manual response was sent)
  return code;
}
492

493
/*
 * Handle a readiness check request from an upstream task.
 *
 * The request is decoded, evaluated by streamTaskProcessCheckMsg, and the resulting check
 * response is sent back to the upstream task. Returns the decode error if the message is
 * malformed. Otherwise, returns the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t ret = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (ret != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return ret;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
516

517
/*
 * Handle the response to a readiness check sent by a local (upstream) task.
 *
 * On a non-leader vnode, or when the upstream task cannot be acquired, the task is recorded as
 * failed through streamMetaAddFailedTask. Otherwise, the response is passed to
 * streamTaskProcessCheckRsp.
 *
 * Returns TSDB_CODE_INVALID_MSG (and sets terrno to it) for an undecodable message, instead of a
 * bare -1, so that callers get a meaningful error code like the other handlers in this file.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t vgId = pMeta->vgId;
  int32_t code = TSDB_CODE_SUCCESS;

  SStreamTaskCheckRsp rsp;

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)pReq, len);
  code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return TSDB_CODE_INVALID_MSG;
  }

  tDecoderClear(&decoder);
  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  code = streamTaskProcessCheckRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
555

556
/*
 * Handle a checkpoint-ready message sent by a downstream task to a local (upstream) task.
 *
 * The message is processed by streamProcessCheckpointReadyMsg. On success, an
 * SMStreamCheckpointReadyRspMsg is sent back explicitly, and the automatic rpc response is
 * disabled by clearing pMsg->info.handle.
 *
 * Returns:
 *   - TSDB_CODE_MSG_DECODE_ERROR for an undecodable message;
 *   - TSDB_CODE_INVALID_MSG when the target is a sink task;
 *   - the lookup error, or TSDB_CODE_STREAM_TASK_NOT_EXIST, when the upstream task is missing.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0 || pTask == NULL) {
    // the lookup is done by the upstream task id, so that is the id worth reporting
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  } else {
    tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
  }

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
618

619
/*
 * Deploy a stream task described by an encoded deploy message.
 *
 * The task is decoded into a freshly allocated SStreamTask and registered in the stream meta.
 * On a leader vnode that has already been restored, a newly added non-fill-history task is
 * started asynchronously.
 *
 * Returns 0 when stream processing is disabled. Returns terrno when allocation fails, and
 * TSDB_CODE_INVALID_MSG when decoding fails. A negative registration code is returned as-is.
 * Otherwise, returns the result of the last step taken: registration, task acquisition, or the
 * async start.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);
  int32_t code = 0;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1. deserialize the message into a new task object
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2. register the task. Its ids are copied out first, because once it is in the meta store
  //    pTask may be destroyed by other threads and must not be dereferenced any more.
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;
  bool    added = false;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // only the leader node launches the task
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pAcquired = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pAcquired);
  if ((pAcquired != NULL) && (code == 0) && (pAcquired->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (pAcquired != NULL) {
    streamMetaReleaseTask(pMeta, pAcquired);
  }

  return code;
}
698

699
/*
 * Drop a stream task, and its related fill-history task if present, as requested by mnode.
 *
 * While holding the meta write lock, the function does the following:
 *   - marks the task's backend files for removal;
 *   - clears its relationship with the fill-history task;
 *   - unregisters the fill-history task first, then the stream task;
 *   - commits the meta.
 *
 * Failures are logged and the request still reports success. Returns 0 in all cases.
 */
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  code = streamMetaCommit(pMeta);
  if (code < 0) {
    // the drop request still reports success, but the failed commit must not go unnoticed
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x, code:%s", vgId, pReq->taskId,
            tstrerror(code));
  }

  streamMetaWUnLock(pMeta);
  return 0;  // always return success
}
756

757
/*
 * Apply the checkpoint-info update issued by mnode to the addressed task, under the meta write lock.
 *
 * A missing task or a failed update is logged, not propagated. This handler always returns
 * TSDB_CODE_SUCCESS, because the request is part of an mnode transaction.
 */
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    code = 0;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);

    if (code != 0) {
      // the result is still not propagated (see below), but it should not be silently dropped
      tqError("vgId:%d s-task:0x%x failed to update the checkpoint info, code:%s", vgId, pReq->taskId,
              tstrerror(code));
    }
  } else {  // failed to get the task.
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  }

  streamMetaWUnLock(pMeta);
  // always return success when handling the requirement issued by mnode during transaction.
  return TSDB_CODE_SUCCESS;
}
784

785
/**
 * Reload every stream task of this node from storage and, on a leader with streams enabled, start them all.
 *
 * If a start-all procedure is already running (startInfo.startAllTasks == 1), nothing is reloaded. Instead,
 * startInfo.restartCount is incremented; tqStartTaskCompleteCallback consumes that counter when the running
 * procedure completes.
 *
 * @param pMeta    stream meta of this node.
 * @param isLeader only a leader (and only when tsDisableStream is off) starts the reloaded tasks.
 * @return the error of streamMetaStartAllTasks if starting fails; otherwise terrno, which is reset to 0
 *         before reloading.
 */
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  streamMetaWLock(pMeta);
  if (pMeta->startInfo.startAllTasks == 1) {
    pMeta->startInfo.restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pMeta->startInfo.startAllTasks = 1;
  streamMetaWUnLock(pMeta);

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  {
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    taosHashClear(pStartInfo->pReadyTaskSet);
    taosHashClear(pStartInfo->pFailedTaskSet);
    pStartInfo->readyTs = 0;
  }

  if (isLeader && !tsDisableStream) {
    streamMetaWUnLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    if (code != TSDB_CODE_SUCCESS) {
      // this result used to be overwritten by terrno below and silently lost
      tqError("vgId:%d failed to start all tasks after reload, code:%s", vgId, tstrerror(code));
      return code;
    }
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pMeta->startInfo.restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  code = terrno;
  return code;
}
834

835
/**
 * Dispatch a stream "run" request decoded from pMsg (the body follows the SMsgHead).
 *
 * Control request types (STREAM_EXEC_T_*) are routed to the matching stream-meta operation. Any other type
 * executes the addressed task over the data queued in its inputQ.
 *
 * @param pMeta    stream meta of this node.
 * @param pMsg     rpc message carrying an encoded SStreamTaskRunReq.
 * @param isLeader forwarded to restartStreamTasks for STREAM_EXEC_T_RESTART_ALL_TASKS.
 * @return the following, by case:
 *         - 0 when the request cannot be decoded;
 *         - 0 for start-one/start-all/restart-all/stop-all (their results are not propagated);
 *         - for add-failed-task and resume, the result of those operations;
 *         - for a task run request, 0 once the task was acquired, or the acquire code when it was not found.
 */
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t  code = 0;
  int32_t  vgId = pMeta->vgId;
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder decoder;

  SStreamTaskRunReq req = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    code = streamMetaStartAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    code = restartStreamTasks(pMeta, isLeader);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    code = streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // keep the idle time in 64 bits: a millisecond difference larger than INT32_MAX (~24.8 days)
        // does not fit in int32_t
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }
}
913

914
/**
 * Invoked when a start-all-tasks procedure has finished.
 *
 * Behavior depends on the start state:
 * - If another start-all procedure is running (startInfo.startAllTasks == 1), nothing is done.
 * - If restarts were requested meanwhile (restartCount > 0) and not all tasks are ready, one pending restart
 *   is consumed and all tasks are restarted via restartStreamTasks.
 * - Otherwise an asynchronous WAL scan is started, except on the snode (vgId == SNODE_HANDLE).
 *
 * @return result of restartStreamTasks or tqScanWalAsync when one of them is called, otherwise 0.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  bool            needScanWal = false;

  streamMetaWLock(pMeta);

  if (pInfo->startAllTasks != 1) {  // not in starting procedure
    bool allReady = streamMetaAllTasksReady(pMeta);
    bool needRestart = (pInfo->restartCount > 0) && (!allReady);

    if (needRestart) {
      pInfo->restartCount -= 1;
      tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
              pInfo->restartCount);
      streamMetaWUnLock(pMeta);

      // restartStreamTasks takes the meta lock on its own, so it must be called unlocked
      return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
    }

    if (pInfo->restartCount == 0) {
      tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
    } else if (allReady) {
      // pending restarts are dropped: every task is ready already
      pInfo->restartCount = 0;
      tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
    }

    needScanWal = true;
  } else {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
  }

  streamMetaWUnLock(pMeta);

  if (!needScanWal || (vgId == SNODE_HANDLE)) {
    return 0;
  }

  tqDebug("vgId:%d start scan wal for executing tasks", vgId);
  return tqScanWalAsync(pMeta->ahandle, true);
}
956

957
/**
 * Handle a task-reset request from mnode.
 *
 * While holding pTask->lock, the function:
 * - clears the task's check info (streamTaskClearCheckInfo);
 * - records pReq->chkptId as the failed checkpoint id;
 * - moves a task that is in checkpoint status (TASK_STATUS__CK) back to ready.
 *
 * Tasks in any other status, including TASK_STATUS__UNINIT, are left as they are.
 *
 * @param pMeta stream meta of this node.
 * @param pMsg  points at an SVResetStreamTaskReq.
 * @return always TSDB_CODE_SUCCESS, also when the task cannot be acquired.
 */
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);
  streamTaskClearCheckInfo(pTask, true);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState pState = streamTaskGetStatus(pTask);
  if (pState.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    // every other status, TASK_STATUS__UNINIT included, is intentionally left unchanged here; restarting an
    // UNINIT task from this handler was considered but is disabled
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
993

994
/**
 * Handle a downstream task's request to (re-)obtain the checkpoint-trigger from this upstream task.
 *
 * A checkpoint-trigger response is always sent back to the downstream task, unless decoding fails, the
 * upstream task cannot be acquired, or the checkpointIds do not match. The response code is chosen as follows:
 * - TSDB_CODE_STREAM_TASK_IVLD_STATUS: the downstream of this task is not ready yet;
 * - TSDB_CODE_SUCCESS: the task is in checkpoint status and the trigger was already sent to that downstream
 *   node (it is re-sent);
 * - TSDB_CODE_ACTION_IN_PROGRESS: in all remaining cases (trigger not sent yet, or no checkpoint started).
 *
 * @return TSDB_CODE_INVALID_MSG on decode failure or checkpointId mismatch;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST if the task cannot be acquired;
 *         otherwise the result of streamTaskSendCheckpointTriggerMsg.
 */
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  SDecoder                 decoder = {0};
  char*                    pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&decoder, &req);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  int32_t rspCode = TSDB_CODE_ACTION_IN_PROGRESS;

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);
    rspCode = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  } else {
    SStreamTaskState taskState = streamTaskGetStatus(pTask);

    if (taskState.state == TASK_STATUS__CK) {  // recv the checkpoint-source/trigger already
      int32_t transId = 0;
      int64_t checkpointId = 0;
      streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);

      if (checkpointId != req.checkpointId) {
        tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
                " req:%" PRId64,
                pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
        streamMetaReleaseTask(pMeta, pTask);
        return TSDB_CODE_INVALID_MSG;
      }

      if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
        // re-send the lost checkpoint-trigger msg to downstream task
        tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
                (int32_t)req.downstreamTaskId, checkpointId, transId);
        rspCode = TSDB_CODE_SUCCESS;
      } else {  // not send checkpoint-trigger yet, wait
        int32_t recv = 0;
        int32_t total = 0;
        streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

        if (recv == total) {
          tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait",
                 pTask->id.idStr);
        } else {
          tqWarn(
              "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
              "sending checkpoint-source/trigger",
              pTask->id.idStr, recv, total);
        }
      }
    } else {  // upstream not recv the checkpoint-source/trigger till now
      if ((taskState.state != TASK_STATUS__READY) && (taskState.state != TASK_STATUS__HALT)) {
        tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
      }

      tqWarn(
          "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
          "upstream sending checkpoint-source/trigger",
          pTask->id.idStr);
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, rspCode);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1081

1082
/**
 * Handle a checkpoint-trigger re-sent by an upstream task through the retrieve/rsp channel.
 *
 * The response is decoded, the addressed task is acquired, and the response is passed to
 * streamTaskProcessCheckpointTriggerRsp.
 *
 * @return TSDB_CODE_INVALID_MSG if decoding fails; the acquire code if the task cannot be found;
 *         otherwise the result of streamTaskProcessCheckpointTriggerRsp.
 */
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SStreamTask*          pTask = NULL;
  SDecoder              decoder = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&decoder, &rsp);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  int32_t ret = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
  streamMetaReleaseTask(pMeta, pTask);
  return ret;
}
1114

1115
/**
 * Pause a stream task on request of mnode, together with its related fill-history task if it has one.
 *
 * @param pMeta stream meta of this node.
 * @param pMsg  points at an SVPauseStreamTaskReq.
 * @return always TSDB_CODE_SUCCESS. A task (or fill-history task) that cannot be acquired is treated as
 *         already paused.
 */
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId*     pHId = &pTask->hTaskInfo.id;
    SStreamTask* pFillHistoryTask = NULL;

    code = streamMetaAcquireTask(pMeta, pHId->streamId, pHId->taskId, &pFillHistoryTask);
    if ((code == 0) && (pFillHistoryTask != NULL)) {
      tqDebug("s-task:%s fill-history task handle paused along with related stream task",
              pFillHistoryTask->id.idStr);
      streamTaskPause(pFillHistoryTask);
      streamMetaReleaseTask(pMeta, pFillHistoryTask);
    } else {
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pHId->taskId);
      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1153

1154
/**
 * Resume a single (already acquired) task and re-schedule it if its status allows execution.
 *
 * After streamTaskResume, a task in READY, SCAN_HISTORY or CK status is re-scheduled:
 * - a source fill-history task still scanning history restarts the history scan asynchronously;
 * - a source task with an empty inputQ triggers an asynchronous WAL scan;
 * - any other task is handed to streamTrySchedExec.
 * If igUntreated is set on a (non-fill-history) source task, its WAL reader skips ahead to sversion, so data
 * written while it was paused is discarded.
 *
 * The caller keeps ownership of the task reference; this function does not release it.
 *
 * @param handle    STQ* when fromVnode is true, otherwise the SStreamMeta* itself.
 * @param sversion  vnode version used as skip target when igUntreated is set.
 * @return 0 when nothing is scheduled, otherwise the result of the scheduling call.
 */
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);

  ETaskStatus status = streamTaskGetStatus(pTask).state;
  bool        runnable =
      (status == TASK_STATUS__READY) || (status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__CK);
  if (!runnable) {
    return 0;
  }

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSource && pTask->info.fillHistory && (status == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSource && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // NOTE(review): handle is cast to STQ* here even when fromVnode is false (then it is an SStreamMeta*).
    // Presumably source tasks only live on vnodes, so this path is never hit with fromVnode == false — confirm.
    return tqScanWalAsync((STQ*)handle, false);
  }

  return streamTrySchedExec(pTask);
}
1189

1190
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
2,549✔
1191
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
2,549✔
1192

1193
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
2,549✔
1194

1195
  SStreamTask* pTask = NULL;
2,549✔
1196
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,549✔
1197
  if (pTask == NULL || (code != 0)) {
2,567!
1198
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
1!
1199
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1200
  }
1201

1202
  streamMutexLock(&pTask->lock);
2,566✔
1203
  SStreamTaskState pState = streamTaskGetStatus(pTask);
2,565✔
1204
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
2,565✔
1205
  streamMutexUnlock(&pTask->lock);
2,565✔
1206

1207
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
2,562✔
1208
  if (code != 0) {
2,565!
1209
    streamMetaReleaseTask(pMeta, pTask);
×
1210
    return code;
×
1211
  }
1212

1213
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
2,565✔
1214
  SStreamTask* pHTask = NULL;
2,565✔
1215
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
2,565✔
1216
  if (pHTask && (code == 0)) {
2,568!
1217
    streamMutexLock(&pHTask->lock);
60✔
1218
    SStreamTaskState p = streamTaskGetStatus(pHTask);
60✔
1219
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
60!
1220
    streamMutexUnlock(&pHTask->lock);
60✔
1221

1222
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
60✔
1223
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
60!
1224

1225
    streamMetaReleaseTask(pMeta, pHTask);
60✔
1226
  }
1227

1228
  return TSDB_CODE_SUCCESS;
2,568✔
1229
}
1230

1231
/**
 * @return the number of entries in pMeta->pTaskList, narrowed to int32_t.
 */
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  size_t numOfEntries = taosArrayGetSize(pMeta->pTaskList);
  return (int32_t)numOfEntries;
}
×
1232

1233
/**
 * Handle a response that carries nothing to process: free its payload and clear the pointer.
 *
 * Declared `static` at the top of this file; the definition now repeats the storage class so the two agree.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;  // avoid a dangling payload pointer in the caller's message
  return TSDB_CODE_SUCCESS;
}
1238

1239
/**
 * Decode the stream heartbeat response from mnode and hand it to streamProcessHeartbeatRsp.
 *
 * @return TSDB_CODE_INVALID_MSG (also stored in terrno) if decoding fails; otherwise the result of
 *         streamProcessHeartbeatRsp.
 */
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg rsp = {0};
  SDecoder         decoder;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeStreamHbRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
    return terrno;
  }

  return streamProcessHeartbeatRsp(pMeta, &rsp);
}
1258

1259
/**
 * Response to a checkpoint request: there is nothing to process, only the payload is released.
 */
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t ret = doProcessDummyRspMsg(pMeta, pMsg);
  return ret;
}
4,518✔
1260

1261
/**
 * Response to a checkpoint report: there is nothing to process, only the payload is released.
 */
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t ret = doProcessDummyRspMsg(pMeta, pMsg);
  return ret;
}
6,136✔
1262

1263
/**
 * Handle the response to a checkpoint-ready message.
 *
 * The payload is read in place as an SMStreamCheckpointReadyRspMsg (no decoding step). The task is looked up by
 * the response's downstreamTaskId and then passed to streamTaskProcessCheckpointReadyRsp.
 * NOTE(review): pMsg->contLen is not checked against sizeof(SMStreamCheckpointReadyRspMsg) here.
 *
 * @return the acquire code if the task cannot be found; otherwise the result of
 *         streamTaskProcessCheckpointReadyRsp.
 */
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  const SMStreamCheckpointReadyRspMsg* pReadyRsp = pMsg->pCont;
  SStreamTask*                         pTarget = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReadyRsp->streamId, pReadyRsp->downstreamTaskId, &pTarget);
  if ((code != 0) || (pTarget == NULL)) {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pReadyRsp->downstreamNodeId, pReadyRsp->downstreamTaskId);
    return code;
  }

  int32_t ret = streamTaskProcessCheckpointReadyRsp(pTarget, pReadyRsp->upstreamTaskId, pReadyRsp->checkpointId);
  streamMetaReleaseTask(pMeta, pTarget);
  return ret;
}
1278

1279
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
184✔
1280
  int32_t                vgId = pMeta->vgId;
184✔
1281
  int32_t                code = 0;
184✔
1282
  SStreamTask*           pTask = NULL;
184✔
1283
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
184✔
1284
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
184✔
1285
  int64_t                now = taosGetTimestampMs();
184✔
1286
  SDecoder               decoder;
1287
  SRestoreCheckpointInfo req = {0};
184✔
1288

1289
  tDecoderInit(&decoder, (uint8_t*)msg, len);
184✔
1290
  if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
184!
1291
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
×
1292
    tDecoderClear(&decoder);
×
1293
    return TSDB_CODE_SUCCESS;
×
1294
  }
1295

1296
  tDecoderClear(&decoder);
184✔
1297

1298
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
184✔
1299
  if (pTask == NULL || (code != 0)) {
184!
1300
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
×
1301
            pMeta->vgId, req.taskId);
1302
    // ignore this code to avoid error code over write
1303
    int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
×
1304
    if (ret) {
×
1305
      tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
×
1306
    }
1307

1308
    return 0;
×
1309
  }
1310

1311
  // discard the rsp, since it is expired.
1312
  if (req.startTs < pTask->execInfo.created) {
184✔
1313
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
6!
1314
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
1315
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
1316
           pTask->execInfo.created);
1317
    streamMetaAddFailedTaskSelf(pTask, now);
6✔
1318
    streamMetaReleaseTask(pMeta, pTask);
6✔
1319
    return TSDB_CODE_SUCCESS;
6✔
1320
  }
1321

1322
  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
178✔
1323
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
1324

1325
  streamMutexLock(&pTask->lock);
178✔
1326
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
178!
1327
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
×
1328
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
1329

1330
    streamMutexUnlock(&pTask->lock);
×
1331
    streamMetaReleaseTask(pMeta, pTask);
×
1332
    return 0;
×
1333
  }
1334

1335
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
178✔
1336
  if (pConsenInfo->consenChkptTransId >= req.transId) {
178!
1337
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
×
1338
            pConsenInfo->consenChkptTransId, req.transId);
1339
    streamMutexUnlock(&pTask->lock);
×
1340
    streamMetaReleaseTask(pMeta, pTask);
×
1341
    return TSDB_CODE_SUCCESS;
×
1342
  }
1343

1344
  if (pTask->chkInfo.checkpointId != req.checkpointId) {
178!
1345
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
×
1346
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
1347
    pTask->chkInfo.checkpointId = req.checkpointId;
×
1348
    tqSetRestoreVersionInfo(pTask);
×
1349
  } else {
1350
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
178✔
1351
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
1352
  }
1353

1354
  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
178✔
1355
  streamMutexUnlock(&pTask->lock);
178✔
1356

1357
  if (pMeta->role == NODE_ROLE_LEADER) {
178!
1358
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
178✔
1359
    if (code) {
178!
1360
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
×
1361
    }
1362
  } else {
1363
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
×
1364
  }
1365

1366
  streamMetaReleaseTask(pMeta, pTask);
178✔
1367
  return 0;
178✔
1368
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc