• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.28
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
// Expand a stream task so that it is ready to run:
//   1. open the state backend (skipped for sink tasks),
//   2. create the executor for source/agg tasks,
//   3. set up the schedule trigger.
// Returns 0 on success, terrno if the state cannot be opened, or the code reported by
// qCreateStreamExecTaskInfo()/qSetTaskId() on failure.
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta*  pMeta = pTask->pMeta;
  const int32_t vgId = pMeta->vgId;
  const int64_t beginTs = taosGetTimestampMs();
  int32_t       code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a fill-history task opens its state with the ids kept in streamTaskId, any other task uses its own ids
  int64_t stateStreamId = pTask->info.fillHistory ? pTask->streamTaskId.streamId : pTask->id.streamId;
  int32_t stateTaskId = pTask->info.fillHistory ? pTask->streamTaskId.taskId : pTask->id.taskId;

  // the sink task works without a state, so pState is left untouched for it
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }

    tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
  }

  // everything that is not set explicitly below stays zero
  SReadHandle readHandle = {0};
  readHandle.checkpointId = pTask->chkInfo.checkpointId;
  readHandle.pStateBackend = pTask->pState;
  readHandle.fillHistory = pTask->info.fillHistory;
  readHandle.winRange = pTask->dataRange.window;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    readHandle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    readHandle.initTqReader = 1;
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    readHandle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&readHandle.api);

  // only the source and the agg task own an executor
  bool needExecutor = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) || (pTask->info.taskLevel == TASK_LEVEL__AGG);
  if (needExecutor) {
    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &readHandle, vgId, pTask->id.taskId);
    if (code) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code) {
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - beginTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsedSec);

  return code;
}
98

99
// Initialize the version information a task starts with from its checkpoint info.
// When a checkpoint exists (checkpointId != 0) the processed/next versions are derived from the
// checkpoint version; in every case startCheckpointVer is set to the next version to process.
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  const bool       hasCheckpoint = (pInfo->checkpointId != 0);

  if (hasCheckpoint) {
    // the checkpoint version itself is the kept version, so processing continues with the one after it
    pInfo->processedVer = pInfo->checkpointVer;
    pInfo->nextProcessVer = pInfo->checkpointVer + 1;
    pTask->execInfo.startCheckpointId = pInfo->checkpointId;

    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
           pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);
  }

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
7,769✔
114

115
// Schedule an asynchronous start (or restart, when `restart` is set) of all stream tasks of this vnode.
// Returns 0 when no task exists, otherwise the result of streamTaskSchedTask().
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  const int32_t vgId = pMeta->vgId;
  const int32_t taskCount = taosArrayGetSize(pMeta->pTaskList);

  if (taskCount != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, taskCount);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }

    // stream id and task id are 0 since the request addresses all tasks
    return streamTaskSchedTask(cb, vgId, 0, 0, execType);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
128

129
// Schedule the asynchronous start of one stream task, identified by streamId/taskId.
// Returns 0 when the vnode holds no task at all, otherwise the result of streamTaskSchedTask().
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  const int32_t vgId = pMeta->vgId;
  const int32_t taskCount = taosArrayGetSize(pMeta->pTaskList);

  if (taskCount != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
140

141
// this is to process request from transaction, always return true.
142
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
5✔
143
  int32_t      vgId = pMeta->vgId;
5✔
144
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
5✔
145
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
5✔
146
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
5✔
147
  int64_t      st = taosGetTimestampMs();
5✔
148
  bool         updated = false;
5✔
149
  int32_t      code = 0;
5✔
150
  SStreamTask* pTask = NULL;
5✔
151
  SStreamTask* pHTask = NULL;
5✔
152

153
  SStreamTaskNodeUpdateMsg req = {0};
5✔
154

155
  SDecoder decoder;
156
  tDecoderInit(&decoder, (uint8_t*)msg, len);
5✔
157
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
5!
158
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
159
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
160
    tDecoderClear(&decoder);
×
161
    return rsp.code;
×
162
  }
163

164
  tDecoderClear(&decoder);
5✔
165

166
  int32_t gError = streamGetFatalError(pMeta);
5✔
167
  if (gError != 0) {
5!
168
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
169
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
170
    return 0;
×
171
  }
172

173
  // update the nodeEpset when it exists
174
  streamMetaWLock(pMeta);
5✔
175

176
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
177
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
5✔
178
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
5✔
179
  if (code != 0) {
5!
180
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
181
            req.taskId);
182
    rsp.code = TSDB_CODE_SUCCESS;
×
183
    streamMetaWUnLock(pMeta);
×
184
    taosArrayDestroy(req.pNodeList);
×
185
    return rsp.code;
×
186
  }
187

188
  const char* idstr = pTask->id.idStr;
5✔
189

190
  if (req.transId <= 0) {
5!
191
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
192
    rsp.code = TSDB_CODE_SUCCESS;
×
193

194
    streamMetaReleaseTask(pMeta, pTask);
×
195
    streamMetaWUnLock(pMeta);
×
196

197
    taosArrayDestroy(req.pNodeList);
×
198
    return rsp.code;
×
199
  }
200

201
  // info needs to be kept till the new trans to update the nodeEp arrived.
202
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
5✔
203
  if (!update) {
5!
204
    rsp.code = TSDB_CODE_SUCCESS;
×
205

206
    streamMetaReleaseTask(pMeta, pTask);
×
207
    streamMetaWUnLock(pMeta);
×
208

209
    taosArrayDestroy(req.pNodeList);
×
210
    return rsp.code;
×
211
  }
212

213
  // duplicate update epset msg received, discard this redundant message
214
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
5✔
215

216
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
5✔
217
  if (pReqTask != NULL) {
5!
218
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
219
            req.transId);
220
    rsp.code = TSDB_CODE_SUCCESS;
×
221

222
    streamMetaReleaseTask(pMeta, pTask);
×
223
    streamMetaWUnLock(pMeta);
×
224

225
    taosArrayDestroy(req.pNodeList);
×
226
    return rsp.code;
×
227
  }
228

229
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
5✔
230

231
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
232
  code = streamTaskSendCheckpointsourceRsp(pTask);
5✔
233
  if (code) {
5!
234
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
235
  }
236
  streamTaskResetStatus(pTask);
5✔
237

238
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
5✔
239

240
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
5!
241
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
×
242
    if (code != 0) {
×
243
      tqError(
×
244
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
245
          "stream task:0x%x",
246
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
247
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
248
    } else {
249
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
×
250
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
×
251
      if (updateEpSet) {
×
252
        updated = updateEpSet;
×
253
      }
254

255
      streamTaskResetStatus(pHTask);
×
256
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
×
257
    }
258
  }
259

260
  // stream do update the nodeEp info, write it into stream meta.
261
  if (updated) {
5!
262
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
×
263
    code = streamMetaSaveTask(pMeta, pTask);
×
264
    if (code) {
×
265
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
266
    }
267

268
    if (pHTask != NULL) {
×
269
      code = streamMetaSaveTask(pMeta, pHTask);
×
270
      if (code) {
×
271
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
272
      }
273
    }
274
  } else {
275
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
5!
276
  }
277

278
  code = streamTaskStop(pTask);
5✔
279
  if (code) {
5!
280
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
281
  }
282

283
  if (pHTask != NULL) {
5!
284
    code = streamTaskStop(pHTask);
×
285
    if (code) {
×
286
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
287
    }
288
  }
289

290
  // keep info
291
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
5✔
292
  streamMetaReleaseTask(pMeta, pTask);
5✔
293
  streamMetaReleaseTask(pMeta, pHTask);
5✔
294

295
  rsp.code = TSDB_CODE_SUCCESS;
5✔
296

297
  // possibly only handle the stream task.
298
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
5✔
299
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
5✔
300

301
  if (restored) {
5!
302
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
5!
303
    pMeta->startInfo.tasksWillRestart = 1;
5✔
304
  }
305

306
  if (updateTasks < numOfTasks) {
5✔
307
    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
2!
308
            updateTasks, (numOfTasks - updateTasks));
309
  } else {
310
    if ((code = streamMetaCommit(pMeta)) < 0) {
3!
311
      // always return true
312
      streamMetaWUnLock(pMeta);
×
313
      taosArrayDestroy(req.pNodeList);
×
314
      return TSDB_CODE_SUCCESS;
×
315
    }
316

317
    streamMetaClearSetUpdateTaskListComplete(pMeta);
3✔
318

319
    if (!restored) {
3!
320
      tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
×
321
    } else {
322
      tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
3!
323
#if 0
324
      taosMSleep(5000);// for test purpose, to trigger the leader election
325
#endif
326
      code = tqStreamTaskStartAsync(pMeta, cb, true);
3✔
327
      if (code) {
3!
328
        tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
329
      }
330
    }
331
  }
332

333
  streamMetaWUnLock(pMeta);
5✔
334
  taosArrayDestroy(req.pNodeList);
5✔
335
  return rsp.code;  // always return true
5✔
336
}
337

338
// Process a dispatch request that an upstream task sent to a task of this vnode.
//
// If the destination task can be acquired, the request is handed to streamProcessDispatchMsg(). If it
// cannot, a response carrying TSDB_CODE_STREAM_TASK_NOT_EXIST is built and sent back to the upstream.
// The decoded request and the acquired task are released on every path once decoding has succeeded.
//
// Returns 0 when the request was processed or the error response was sent, -1 when
// streamProcessDispatchMsg() failed, TSDB_CODE_MSG_DECODE_ERROR when the payload cannot be decoded,
// TSDB_CODE_INVALID_MSG when the request carries no upstream node id, or terrno when the response
// buffer cannot be allocated.
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // the request and the task reference are released no matter whether the processing succeeded
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return (code != 0) ? -1 : 0;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // htonl(0) == 0, so a request without upstream node id is rejected before the response buffer is
  // allocated; nothing but the decoded request has to be released then
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;  // keep the allocation error before any other call is made
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  // the response fields are sent in network byte order
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
401

402
// Process the response to a dispatch message. The response fields are byte-swapped in place, then the
// response is forwarded to the upstream task it addresses.
// Returns the result of streamProcessDispatchRsp(), or TSDB_CODE_STREAM_TASK_NOT_EXIST when the
// upstream task cannot be acquired.
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  const int32_t       vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

  // 64-bit members
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit members
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pUpTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pUpTask);
  if ((pUpTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pUpTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpTask);
  return code;
}
428

429
// Process a retrieve request. A source task handles the request itself, every other task broadcasts
// it with itself set as the source of the request.
// Returns the decode error, the code of the failed task lookup, or the code of the processing step;
// a response is sent from here only when the processing succeeded.
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  SStreamTask* pDstTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pDstTask);
  if (pDstTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pDstTask->id.idStr,
          pDstTask->pMeta->vgId, pDstTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // a checkpoint that is currently in progress for this task is marked as failed
  streamTaskSetCheckpointFailed(pDstTask);

  if (pDstTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    // pass the request on, with this task as its new source
    req.srcNodeId = pDstTask->info.nodeId;
    req.srcTaskId = pDstTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pDstTask, &req);
  } else {
    code = streamProcessRetrieveReq(pDstTask, &req);
  }

  if (code == TSDB_CODE_SUCCESS) {
    // the response is sent manually on success only
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  } else {
    // on failure no response is sent from here, the code is handed back to the caller
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  }

  streamMetaReleaseTask(pMeta, pDstTask);
  tCleanupStreamRetrieveReq(&req);

  // 0 if the response has been sent above, otherwise the error of the processing step
  return code;
}
484

485
// Process a task check request: decode it, let streamTaskProcessCheckMsg() fill in the response and
// send the response back to the upstream task.
// Returns the decode error, or the result of streamTaskSendCheckRsp().
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq checkReq;
  SStreamTaskCheckRsp checkRsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &checkReq);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &checkReq, &checkRsp);
  return streamTaskSendCheckRsp(pMeta, checkReq.upstreamNodeId, &checkRsp, &pMsg->info, checkReq.upstreamTaskId);
}
508

509
// Process the response to a task check request.
// On a non-leader node, or when the upstream task cannot be acquired, the task is added to the failed
// task set of the meta; otherwise the response is handed to streamTaskProcessCheckRsp().
// Returns -1 (with terrno set to TSDB_CODE_INVALID_MSG) if the payload cannot be decoded, otherwise
// the result of the step that handled the response.
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  const int32_t vgId = pMeta->vgId;
  char*         pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t       bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }
  tDecoderClear(&decoder);

  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  // the check response is handled on the leader only
  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  SStreamTask* pUpTask = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pUpTask);
  if ((code == 0) && (pUpTask != NULL)) {
    code = streamTaskProcessCheckRsp(pUpTask, &rsp);
    streamMetaReleaseTask(pMeta, pUpTask);
    return code;
  }

  // the task that issued the check is gone, record it as failed
  return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
547

548
// Process the checkpoint-ready message that a downstream task sent to an upstream task of this vnode,
// and reply with a SMStreamCheckpointReadyRspMsg once the message has been handled.
//
// Returns TSDB_CODE_MSG_DECODE_ERROR if the payload cannot be decoded, the lookup error if the
// upstream task cannot be acquired, TSDB_CODE_INVALID_MSG if the addressed task is a sink task, the
// error of streamProcessCheckpointReadyMsg(), terrno if the reply cannot be allocated, or 0 after the
// reply has been sent.
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0) {
    // the task that was looked up is the upstream task, so that is the id to report
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return code;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  } else {
    tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
  }

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);
    // NOTE(review): pReadyRsp->transId is not assigned here - confirm whether the receiver needs it

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
610

611
// Process the deploy request of a stream task: build the task from the message, register it in the
// stream meta and, on a restored leader, schedule the start of the task unless it is a fill-history
// task.
//
// pMeta     stream meta of the vnode
// cb        message callback used to schedule the async start
// sversion  version passed on to streamMetaRegisterTask()
// msg       encoded stream task
// msgLen    length of msg in bytes
// isLeader  whether this vnode is the leader, tasks are launched on the leader only
// restored  whether the vnode restore has completed
//
// Returns 0 if streams are disabled, terrno if the task cannot be allocated, TSDB_CODE_INVALID_MSG if
// the task cannot be decoded, the error of streamMetaRegisterTask(), or the code of the last step
// that was executed.
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t code = 0;
  int32_t vgId = pMeta->vgId;
  int32_t numOfTasks = 0;
  int32_t taskId = -1;
  int64_t streamId = -1;
  bool    added = false;
  int32_t size = sizeof(SStreamTask);

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1.deserialize msg and build task
  SStreamTask* pTask = taosMemoryCalloc(1, size);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2.save task, use the latest commit version as the initial start version of stream task.
  // the ids are copied since pTask must not be touched after the registration
  taskId = pTask->id.taskId;
  streamId = pTask->id.streamId;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    // the arguments follow the order of the format: task id first, then the vgId
    tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", taskId, vgId, numOfTasks,
            tstrerror(code));
    return code;
  }

  // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
  // it is added into the meta store
  if (added) {
    // only handled in the leader node
    if (isLeader) {
      tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

      if (restored) {
        SStreamTask* p = NULL;
        code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
        // a fill-history task is not started from here
        if ((p != NULL) && (code == 0) && (p->info.fillHistory == 0)) {
          code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
        }

        if (p != NULL) {
          streamMetaReleaseTask(pMeta, p);
        }
      } else {
        tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
      }

    } else {
      tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    }
  } else {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
  }

  return code;
}
690

691
// Process the request to drop a stream task. The related fill-history task, if there is one, is
// unregistered first, then the stream task itself, finally the stream meta is committed.
//
// pMeta   stream meta of the vnode
// msg     the request, interpreted as a SVDropStreamTaskReq
// msgLen  length of msg, not used here
//
// Always returns 0; failures of the individual steps are logged only.
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    // remember the related fill-history task before the relationship is cleared below
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  streamMetaWUnLock(pMeta);

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  streamMetaWLock(pMeta);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  // persist to disk; a failure does not change the result of the request but must not go unnoticed
  code = streamMetaCommit(pMeta);
  if (code < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping s-task:0x%x, code:%d", vgId, pReq->taskId, code);
  }

  streamMetaWUnLock(pMeta);
  return 0;  // always return success
}
751

752
// Process the request to update the checkpoint info of a stream task. The task is looked up and
// updated while the meta is write-locked.
// Always returns TSDB_CODE_SUCCESS: neither a missing task nor the result of the update is reported
// to the caller.
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  const int32_t              vgId = pMeta->vgId;
  SStreamTask*               pTarget = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  if (streamMetaAcquireTaskUnsafe(pMeta, &id, &pTarget) != 0) {
    // the task cannot be found, nothing to update
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  } else {
    // the result of the update is intentionally not propagated
    (void)streamTaskUpdateTaskCheckpointInfo(pTarget, restored, pReq);
    streamMetaReleaseTask(pMeta, pTarget);
  }

  streamMetaWUnLock(pMeta);

  // requests issued by the mnode during a transaction are always answered with success
  return TSDB_CODE_SUCCESS;
}
779

780
// Clear the stream meta, reload all tasks and, on a leader with streams enabled, start all tasks again.
//
// If a start-all-tasks procedure is already flagged as running (startAllTasks == 1), only the restart counter is
// incremented and TSDB_CODE_SUCCESS is returned. Otherwise the flag is set, the meta is cleared and reloaded under
// the write lock, and the ready/failed task sets of the start info are emptied.
//
//   pMeta     stream meta to restart
//   isLeader  when false (or when tsDisableStream is set) the tasks are reloaded but not started
//
// Returns the value of terrno at the end of the procedure (terrno is reset to 0 before the reload begins).
// NOTE(review): the return value of streamMetaStartAllTasks() is discarded in favour of terrno - confirm that this
// is intended.
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t         vgId = pMeta->vgId;
  int64_t         startTs = taosGetTimestampMs();
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  streamMetaWLock(pMeta);
  if (pStartInfo->startAllTasks == 1) {
    // another start procedure is in progress, only remember that one more restart is required
    pStartInfo->restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pStartInfo->restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pStartInfo->startAllTasks = 1;
  streamMetaWUnLock(pMeta);

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);

  int64_t elapsedMs = taosGetTimestampMs() - startTs;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, elapsedMs / 1000.);

  streamMetaLoadAllTasks(pMeta);

  // forget the outcome of the previous start procedure
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);
  pStartInfo->readyTs = 0;

  if (!isLeader || tsDisableStream) {
    streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
    pStartInfo->restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  } else {
    // the write lock is released before the tasks are started
    streamMetaWUnLock(pMeta);
    (void)streamMetaStartAllTasks(pMeta);
  }

  return terrno;
}
829

830
// Decode a task-run request and dispatch it according to its request type.
//
//   STREAM_EXEC_T_START_ONE_TASK     start the task named in the request, always returns 0
//   STREAM_EXEC_T_START_ALL_TASKS    start all tasks, always returns 0
//   STREAM_EXEC_T_RESTART_ALL_TASKS  restart all tasks, always returns 0
//   STREAM_EXEC_T_STOP_ALL_TASKS     stop all tasks, always returns 0
//   STREAM_EXEC_T_ADD_FAILED_TASK    returns the result of streamMetaAddFailedTask()
//   STREAM_EXEC_T_RESUME_TASK        resume the task if it is ready to run, returns the acquire/resume result
//   any other type                   execute the task if it is ready to run; returns 0 when the task was acquired,
//                                    otherwise the result of the failed acquire
//
//   pMeta     stream meta that owns the tasks
//   pMsg      rpc message; the payload follows an SMsgHead
//   isLeader  forwarded to restartStreamTasks()
//
// A payload that cannot be decoded is logged and reported as TSDB_CODE_SUCCESS.
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t           code = 0;
  int32_t           vgId = pMeta->vgId;
  char*             pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t           bodyLen = pMsg->contLen - sizeof(SMsgHead);
  SDecoder          decoder;
  SStreamTaskRunReq req = {0};

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  code = tDecodeStreamTaskRunReq(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;

  // meta level requests: the outcome of the operation is not reported to the caller
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    (void)streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  }

  if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    (void)streamMetaStartAllTasks(pMeta);
    return 0;
  }

  if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    (void)restartStreamTasks(pMeta, isLeader);
    return 0;
  }

  if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    (void)streamMetaStopAllTasks(pMeta);
    return 0;
  }

  if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    return streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
  }

  if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pResumeTask = NULL;
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pResumeTask);
    if ((pResumeTask == NULL) || (code != 0)) {
      return code;
    }

    char* pStatus = NULL;
    if (!streamTaskReadyToRun(pResumeTask, &pStatus)) {
      int8_t schedStatus = streamTaskSetSchedStatusInactive(pResumeTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pResumeTask->id.idStr, pStatus, schedStatus);
    } else {
      int64_t execTs = pResumeTask->status.lastExecTs;
      int32_t idle = taosGetTimestampMs() - execTs;
      tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pResumeTask->id.idStr, idle, execTs);

      code = streamResumeTask(pResumeTask);
    }

    streamMetaReleaseTask(pMeta, pResumeTask);
    return code;
  }

  // every remaining request type executes the task named in the request
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }

  // even in halt status, the data in inputQ must be processed
  char* pStatus = NULL;
  if (streamTaskReadyToRun(pTask, &pStatus)) {
    tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
            pTask->id.idStr, pStatus, pTask->chkInfo.nextProcessVer);
    (void)streamExecTask(pTask);
  } else {
    int8_t schedStatus = streamTaskSetSchedStatusInactive(pTask);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, pStatus, schedStatus);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
908

909
// Callback executed when a start-tasks procedure has completed.
//
// Under the meta write lock one of three things happens:
//   - a start procedure is flagged as running (startAllTasks == 1): nothing is done and 0 is returned;
//   - restarts are pending (restartCount > 0) and not all tasks are ready: the counter is decremented, the lock is
//     released and the result of restartStreamTasks() is returned;
//   - otherwise the restart counter is reset to 0 when all tasks are ready, and for every vgId other than
//     SNODE_HANDLE an asynchronous wal scan is requested, whose result is returned.
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  streamMetaWLock(pMeta);

  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // not in starting procedure
  bool allReady = streamMetaAllTasksReady(pMeta);

  if ((pStartInfo->restartCount > 0) && (!allReady)) {
    pStartInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pStartInfo->restartCount);
    streamMetaWUnLock(pMeta);

    return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
  }

  if (pStartInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
  } else if (allReady) {
    // if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
    pStartInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  streamMetaWUnLock(pMeta);

  if (vgId == SNODE_HANDLE) {
    return 0;
  }

  tqDebug("vgId:%d start scan wal for executing tasks", vgId);
  return tqScanWalAsync(pMeta->ahandle, true);
}
951

952
// Handle a task-reset request: clear the check info of the task, record the failed checkpoint id carried by the
// request and, when the task is in TASK_STATUS__CK, set its status back to ready.
//
//   pMeta  stream meta that owns the task
//   pMsg   request payload, interpreted as SVResetStreamTaskReq
//
// Returns TSDB_CODE_SUCCESS in every case, including when the task cannot be acquired.
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pResetReq = (SVResetStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pResetReq->streamId, pResetReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pResetReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskClearCheckInfo(pTask, true);
  streamTaskSetFailedCheckpointId(pTask, pResetReq->chkptId);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState curState = streamTaskGetStatus(pTask);
  if (curState.state != TASK_STATUS__CK) {
    // TASK_STATUS__UNINIT and every other status are treated alike: the request is only logged
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, curState.name);
  } else {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
988

UNCOV
989
// Handle a retrieve-checkpoint-trigger request sent by a downstream task to one of its upstream tasks.
//
// After the request is decoded and the upstream task acquired, a reply is sent to the downstream task through
// streamTaskSendCheckpointTriggerMsg() with one of the following status codes:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  the upstream task is not marked as downstream-ready
//   TSDB_CODE_SUCCESS                  the task is in TASK_STATUS__CK and the trigger was already sent to that node
//   TSDB_CODE_ACTION_IN_PROGRESS       in all remaining cases
//
//   pMeta  stream meta that owns the upstream task
//   pMsg   rpc message; the payload follows an SMsgHead, pMsg->info is handed to the reply function
//
// Returns TSDB_CODE_INVALID_MSG when the payload cannot be decoded or names another checkpoint id than the active
// one, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be acquired, otherwise the result of sending the reply.
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  SDecoder                 decoder = {0};
  char*                    pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeRetrieveChkptTriggerReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  // status code carried by the reply; only the re-send case below reports success
  int32_t          rspCode = TSDB_CODE_ACTION_IN_PROGRESS;
  SStreamTaskState curState = streamTaskGetStatus(pTask);

  if (curState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
    if ((curState.state != TASK_STATUS__READY) && (curState.state != TASK_STATUS__HALT)) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, curState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
  } else {  // recv the checkpoint-source/trigger already
    int32_t transId = 0;
    int64_t activeChkptId = 0;

    streamTaskGetActiveCheckpointInfo(pTask, &transId, &activeChkptId);
    if (activeChkptId != req.checkpointId) {
      tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
              " req:%" PRId64,
              pTask->id.idStr, req.downstreamTaskId, activeChkptId, req.checkpointId);
      streamMetaReleaseTask(pMeta, pTask);
      return TSDB_CODE_INVALID_MSG;
    }

    if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
      // re-send the lost checkpoint-trigger msg to downstream task
      tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
              (int32_t)req.downstreamTaskId, activeChkptId, transId);
      rspCode = TSDB_CODE_SUCCESS;
    } else {  // not send checkpoint-trigger yet, wait
      int32_t numOfRecv = 0;
      int32_t numOfTotal = 0;
      streamTaskGetTriggerRecvStatus(pTask, &numOfRecv, &numOfTotal);

      if (numOfRecv != numOfTotal) {
        tqWarn(
            "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
            "sending checkpoint-source/trigger",
            pTask->id.idStr, numOfRecv, numOfTotal);
      } else {  // add the ts info
        tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, rspCode);

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1076

1077
// Handle the response to a retrieve-checkpoint-trigger request: decode it, acquire the task it is addressed to and
// hand the response to streamTaskProcessCheckpointTriggerRsp().
//
//   pMeta  stream meta that owns the task
//   pMsg   rpc message; the payload follows an SMsgHead
//
// Returns TSDB_CODE_INVALID_MSG when the payload cannot be decoded, the result of the acquire call when the task is
// not available, otherwise the result of streamTaskProcessCheckpointTriggerRsp().
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SStreamTask*          pTask = NULL;
  SDecoder              decoder = {0};
  char*                 pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t decodeRet = tDecodeCheckpointTriggerRsp(&decoder, &rsp);
  tDecoderClear(&decoder);

  if (decodeRet < 0) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1109

1110
// Handle a pause request: pause the stream task named in the request and, when it has a related fill-history task,
// pause that task as well.
//
//   pMeta  stream meta that owns the task
//   pMsg   request payload, interpreted as SVPauseStreamTaskReq
//
// Returns TSDB_CODE_SUCCESS in every case; a task that cannot be acquired is only logged.
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pPauseReq->streamId, pPauseReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pPauseReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  bool hasHistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
  if (!hasHistoryTask) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  SStreamTask* pHistoryTask = NULL;
  code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
  if ((code != 0) || (pHistoryTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
            ", it may have been dropped already",
            pMeta->vgId, pTask->hTaskInfo.id.taskId);
    streamMetaReleaseTask(pMeta, pTask);

    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);

  streamTaskPause(pHistoryTask);
  streamMetaReleaseTask(pMeta, pHistoryTask);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1148

1149
// Resume one task and, when its status afterwards is READY, SCAN_HISTORY or CK, trigger its execution again.
//
//   handle       an STQ when fromVnode is true, otherwise the SStreamMeta itself
//   pTask        task to resume; it is neither acquired nor released here
//   sversion     version passed to the wal reader and printed in the log
//   igUntreated  when set for a source task that is not a fill-history task, the wal reader skips to sversion
//   fromVnode    selects how handle is interpreted
//
// Returns 0 when the task status requires no further action, otherwise the result of
// streamStartScanHistoryAsync(), tqScanWalAsync() or streamTrySchedExec(), whichever was invoked.
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;

  streamTaskResume(pTask);

  ETaskStatus status = streamTaskGetStatus(pTask).state;
  if ((status != TASK_STATUS__READY) && (status != TASK_STATUS__SCAN_HISTORY) && (status != TASK_STATUS__CK)) {
    return 0;
  }

  bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSourceTask && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (isSourceTask && pTask->info.fillHistory && (status == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (isSourceTask && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
    // NOTE(review): handle is cast to STQ* regardless of fromVnode - confirm this branch is only reached with a
    // vnode handle
    return tqScanWalAsync((STQ*)handle, false);
  }

  return streamTrySchedExec(pTask);
}
1184

1185
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
702✔
1186
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
702✔
1187

1188
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
702!
1189

1190
  SStreamTask* pTask = NULL;
702✔
1191
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
702✔
1192
  if (pTask == NULL || (code != 0)) {
702!
UNCOV
1193
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
UNCOV
1194
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1195
  }
1196

1197
  streamMutexLock(&pTask->lock);
702✔
1198
  SStreamTaskState pState = streamTaskGetStatus(pTask);
702✔
1199
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
702!
1200
  streamMutexUnlock(&pTask->lock);
702✔
1201

1202
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
702✔
1203
  if (code != 0) {
702!
UNCOV
1204
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
1205
    return code;
×
1206
  }
1207

1208
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
702✔
1209
  SStreamTask* pHTask = NULL;
702✔
1210
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
702✔
1211
  if (pHTask && (code == 0)) {
702!
UNCOV
1212
    streamMutexLock(&pHTask->lock);
×
UNCOV
1213
    SStreamTaskState p = streamTaskGetStatus(pHTask);
×
UNCOV
1214
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
×
UNCOV
1215
    streamMutexUnlock(&pHTask->lock);
×
1216

UNCOV
1217
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
×
UNCOV
1218
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
×
1219

UNCOV
1220
    streamMetaReleaseTask(pMeta, pHTask);
×
1221
  }
1222

1223
  return TSDB_CODE_SUCCESS;
702✔
1224
}
1225

UNCOV
1226
// Return the number of entries in the task list of the given stream meta.
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  return numOfTasks;
}
×
1227

1228
// Shared handler for responses that carry nothing to process: the message content is freed, the content pointer of
// the message is reset to NULL and TSDB_CODE_SUCCESS is returned. The meta parameter is not used.
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pCont = pMsg->pCont;

  // detach the buffer from the message first, so the message never refers to freed content
  pMsg->pCont = NULL;
  rpcFreeCont(pCont);

  return TSDB_CODE_SUCCESS;
}
1233

1234
// Decode a stream heartbeat response and pass it on to streamProcessHeartbeatRsp().
//
//   pMeta  stream meta the response is addressed to
//   pMsg   rpc message; the payload follows an SMsgHead
//
// On a decode failure terrno is set to TSDB_CODE_INVALID_MSG and the value of terrno is returned; otherwise the
// result of streamProcessHeartbeatRsp() is returned.
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg rsp = {0};
  SDecoder         decoder;
  char*            pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t          bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);

  if (tDecodeStreamHbRsp(&decoder, &rsp) >= 0) {
    tDecoderClear(&decoder);
    return streamProcessHeartbeatRsp(pMeta, &rsp);
  }

  // the payload could not be parsed
  terrno = TSDB_CODE_INVALID_MSG;
  tDecoderClear(&decoder);
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1253

1254
// Handle the response to a checkpoint request; it is delegated to doProcessDummyRspMsg() and the result of that
// call is returned.
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
2,668✔
1255

1256
// Handle the response to a checkpoint report; it is delegated to doProcessDummyRspMsg() and the result of that
// call is returned.
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
2,645✔
1257

1258
// Handle the response to a checkpoint-ready message: acquire the downstream task named in the response and let
// streamTaskProcessCheckpointReadyRsp() process it.
//
//   pMeta  stream meta that owns the downstream task
//   pMsg   rpc message whose content is read as an SMStreamCheckpointReadyRspMsg
//
// Returns the result of the acquire call when the task is not available, otherwise the result of
// streamTaskProcessCheckpointReadyRsp().
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pReadyRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReadyRsp->streamId, pReadyRsp->downstreamTaskId, &pTask);
  if ((code == 0) && (pTask != NULL)) {
    code = streamTaskProcessCheckpointReadyRsp(pTask, pReadyRsp->upstreamTaskId, pReadyRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
          pReadyRsp->downstreamNodeId, pReadyRsp->downstreamTaskId);
  return code;
}
1273

1274
// Handle a request that carries the consensus checkpoint id for one task.
//
// The request is discarded (with a log entry) when it cannot be decoded, when the task cannot be acquired, when its
// start timestamp is older than the creation time of the task, when its checkpoint id is greater than the one the
// task currently holds, or when its transaction id is not newer than the last consensus transaction recorded on
// the task. Otherwise the checkpoint id of the task is replaced if it differs, the reception of the consensus id is
// recorded, and on a leader node the task is started asynchronously.
//
//   pMeta  stream meta that owns the task
//   pMsg   rpc message; the payload follows an SMsgHead
//
// Returns 0 / TSDB_CODE_SUCCESS on every path; failures are only logged.
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                bodyLen = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  code = tDecodeRestoreCheckpointInfo(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.taskId);

    // ignore this code to avoid error code over write
    int32_t addRet = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    if (addRet) {
      tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(addRet));
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    streamMetaAddFailedTaskSelf(pTask, now);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

  streamMutexLock(&pTask->lock);

  // the consensus id must not be ahead of the checkpoint the task already holds
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // only a transaction newer than the last recorded one is accepted
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
            pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId == req.checkpointId) {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  } else {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  if (pMeta->role != NODE_ROLE_LEADER) {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  } else {
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc