• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.96
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
// Body of the checkpoint-ready response that tqStreamTaskProcessCheckpointReadyMsg() allocates with
// rpcMallocCont() and hands to tmsgSendRsp(). The struct itself is used as the message content, so the
// order and width of the fields must stay as they are.
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;  // head.vgId is filled with htonl(downstreamNodeId) by the sender in this file
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;  // NOTE(review): not assigned by the sender visible in this file -- confirm who sets it
29
} SMStreamCheckpointReadyRspMsg;
30

31
// Forward declaration of a file-local helper so that it can be referenced before its definition.
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
// Expand a stream task so that it is ready to run:
//   1. open the state backend for every task that is not a sink task;
//   2. build the read handle from the task's checkpoint/window info;
//   3. for source and agg tasks, create the executor and bind the task/stream id to it.
//
// Returns 0 on success, terrno when the state cannot be opened, or the error code
// reported by qCreateStreamExecTaskInfo()/qSetTaskId().
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a fill-history task opens the state with the ids kept in streamTaskId, any other task uses its own ids
  int64_t streamId = pTask->info.fillHistory ? pTask->streamTaskId.streamId : pTask->id.streamId;
  int32_t taskId = pTask->info.fillHistory ? pTask->streamTaskId.taskId : pTask->id.taskId;

  // sink task does not need the pState
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }

    tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {0};
  handle.checkpointId = pTask->chkInfo.checkpointId;
  handle.pStateBackend = pTask->pState;
  handle.fillHistory = pTask->info.fillHistory;
  handle.winRange = pTask->dataRange.window;

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool isAgg = (pTask->info.taskLevel == TASK_LEVEL__AGG);

  if (isSource) {
    handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
    handle.initTqReader = 1;
  } else if (isAgg) {
    handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
  }

  initStorageAPI(&handle.api);

  // only source and agg tasks own an executor
  if (isSource || isAgg) {
    int32_t code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }
  }

  double elapsed = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsed);

  return 0;
}
96

97
// Initialize the version bookkeeping of a task from its checkpoint info.
// When a checkpoint exists (checkpointId != 0), processing resumes right after the checkpointed
// version; in every case execInfo.startCheckpointVer is set to the next version to process.
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  bool             hasCheckpoint = (pInfo->checkpointId != 0);

  if (hasCheckpoint) {
    // checkpoint ver is the kept version, handled data should be the next version.
    pInfo->processedVer = pInfo->checkpointVer;
    pInfo->nextProcessVer = pInfo->checkpointVer + 1;
    pTask->execInfo.startCheckpointId = pInfo->checkpointId;

    tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
           pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);
  }

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
5,465✔
112

UNCOV
113
// Schedule an asynchronous (re)start of all stream tasks of this vnode.
// Returns 0 without scheduling anything when the task list is empty, otherwise the result of
// streamTaskSchedTask(). `restart` selects the RESTART_ALL_TASKS exec type instead of START_ALL_TASKS.
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t nTasks = taosArrayGetSize(pMeta->pTaskList);

  if (nTasks != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, nTasks);

    int32_t execType;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    } else {
      execType = STREAM_EXEC_T_START_ALL_TASKS;
    }

    return streamTaskSchedTask(cb, vgId, 0, 0, execType);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
126

127
// Schedule an asynchronous start of one stream task, identified by streamId/taskId.
// Returns 0 without scheduling anything when the task list is empty, otherwise the result of
// streamTaskSchedTask().
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t nTasks = taosArrayGetSize(pMeta->pTaskList);

  if (nTasks != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
138

139
// Processes the nodeEp update request issued by a transaction; returns success unless the message cannot be decoded.
UNCOV
140
// Handle a nodeEp (epset) update request for one stream task.
//
// Steps:
//   1. decode the request; a decode failure is the only error reported to the caller;
//   2. under the meta write lock, acquire the task (and its related fill-history task, if any),
//      apply the new node list, reset and stop the task(s), and persist them when the epset changed;
//   3. record the task in the update list; once every task of this vnode has been updated, commit the
//      meta and, if the vnode is restored, schedule an async restart of all tasks.
//
// Parameters:
//   pMeta    - stream meta of this vnode
//   cb       - message callbacks, used to schedule the async restart
//   pMsg     - rpc message; the payload follows an SMsgHead
//   restored - whether the vnode restore has completed
//
// Returns TSDB_CODE_MSG_DECODE_ERROR when the message cannot be decoded, TSDB_CODE_SUCCESS otherwise
// (failures after decoding are logged but not reported).
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
  int32_t      vgId = pMeta->vgId;
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
  int64_t      st = taosGetTimestampMs();
  bool         updated = false;
  bool         locked = false;  // whether the meta write lock is currently held by this function
  int32_t      code = 0;
  int32_t      numOfTasks = 0;
  int32_t      updateTasks = 0;
  const char*  idstr = NULL;
  SStreamTask* pTask = NULL;
  SStreamTask* pHTask = NULL;

  SStreamTaskNodeUpdateMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(TSDB_CODE_MSG_DECODE_ERROR));
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }

  tDecoderClear(&decoder);

  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};

  int32_t gError = streamGetFatalError(pMeta);
  if (gError != 0) {
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
    goto _end;  // the decoded node list still has to be destroyed
  }

  // update the nodeEpset when it exists
  streamMetaWLock(pMeta);
  locked = true;

  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
            req.taskId);
    pTask = NULL;  // nothing was acquired, nothing to release
    goto _end;
  }

  idstr = pTask->id.idStr;

  if (req.transId <= 0) {
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.transId);
    goto _end;
  }

  // info needs to be kept till the new trans to update the nodeEp arrived.
  if (!streamMetaInitUpdateTaskList(pMeta, req.transId)) {
    goto _end;
  }

  {
    // duplicate update epset msg received, discard this redundant message
    STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};

    void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
    if (pReqTask != NULL) {
      tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
              req.transId);
      goto _end;
    }
  }

  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);

  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
  code = streamTaskSendCheckpointsourceRsp(pTask);
  if (code) {
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
  }
  streamTaskResetStatus(pTask);

  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
    if (code != 0) {
      tqError(
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
          "stream task:0x%x",
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
      pHTask = NULL;
    } else {
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
      if (updateEpSet) {
        updated = updateEpSet;
      }

      streamTaskResetStatus(pHTask);
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
    }
  }

  // stream do update the nodeEp info, write it into stream meta.
  if (updated) {
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
    code = streamMetaSaveTask(pMeta, pTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
    }

    if (pHTask != NULL) {
      code = streamMetaSaveTask(pMeta, pHTask);
      if (code) {
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
      }
    }
  } else {
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
  }

  code = streamTaskStop(pTask);
  if (code) {
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
  }

  if (pHTask != NULL) {
    code = streamTaskStop(pHTask);
    if (code) {
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
    }
  }

  // keep info
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, pHTask, req.transId, st);

  // release the tasks here, before counting/committing, exactly where the references were dropped before;
  // idstr must not be used past this point.
  streamMetaReleaseTask(pMeta, pTask);
  pTask = NULL;
  if (pHTask != NULL) {
    streamMetaReleaseTask(pMeta, pHTask);
    pHTask = NULL;
  }

  // possibly only handle the stream task.
  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);

  if (restored) {
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
    pMeta->startInfo.tasksWillRestart = 1;
  }

  if (updateTasks < numOfTasks) {
    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
            updateTasks, (numOfTasks - updateTasks));
  } else {
    code = streamMetaCommit(pMeta);
    if (code < 0) {
      // still reported as success to the caller, but leave a trace of the failure
      tqError("vgId:%d failed to commit stream meta after nodeEp update, code:%s", vgId, tstrerror(code));
      goto _end;
    }

    streamMetaClearSetUpdateTaskListComplete(pMeta);

    if (!restored) {
      tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
    } else {
      tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
      code = tqStreamTaskStartAsync(pMeta, cb, true);
      if (code) {
        tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
      }
    }
  }

_end:
  // single exit path: drop whatever is still held, in the order release -> unlock -> destroy
  if (pTask != NULL) {
    streamMetaReleaseTask(pMeta, pTask);
  }
  if (pHTask != NULL) {
    streamMetaReleaseTask(pMeta, pHTask);
  }
  if (locked) {
    streamMetaWUnLock(pMeta);
  }

  taosArrayDestroy(req.pNodeList);
  return TSDB_CODE_SUCCESS;
}
335

336
// Handle a dispatch request sent by an upstream task.
//
// When the destination task exists, the request is passed to streamProcessDispatchMsg(). When it does
// not, an error response with code TSDB_CODE_STREAM_TASK_NOT_EXIST is built and sent back to the
// upstream node. The decoded request is cleaned up and the acquired task is released on every path
// that follows a successful decode.
//
// Returns:
//   0                           - request processed, or the "no task" response has been sent
//   -1                          - streamProcessDispatchMsg() failed
//   TSDB_CODE_MSG_DECODE_ERROR  - the message cannot be decoded
//   TSDB_CODE_INVALID_MSG       - the task is missing and the request carries upstreamNodeId == 0
//   terrno                      - the task is missing and the response buffer cannot be allocated
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    code = streamProcessDispatchMsg(pTask, &req, &rsp);

    // the request and the task reference are dropped whether or not the processing succeeded
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return (code != 0) ? -1 : 0;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // htonl(x) is 0 only when x is 0, so validate the destination before allocating the response buffer
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;  // capture before any further call can overwrite it
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
399

400
// Handle the response to a dispatch message previously sent by one of the local (upstream) tasks.
// The byte order of the response fields is converted in place inside the message buffer, then the
// response is handed to the upstream task identified by streamId/upstreamTaskId.
//
// Returns the result of streamProcessDispatchRsp(), or TSDB_CODE_STREAM_TASK_NOT_EXIST when the task
// cannot be acquired.
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit fields
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pUpTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pUpTask);
  if ((pUpTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", pMeta->vgId,
            pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pUpTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pUpTask);
  return code;
}
426

427
// Handle a retrieve request addressed to the task req.dstTaskId.
// A source task processes the request itself; any other task rewrites the source ids to its own and
// broadcasts the request further. The retrieve response is sent manually, and only when the
// processing succeeded.
//
// Returns the decode error, the result of the task acquisition when it fails, or the result of
// processing/broadcasting the request.
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // if task is in ck status, set current ck failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    // pass the request on, with this task recorded as its source
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  } else {
    code = streamProcessRetrieveReq(pTask, &req);
  }

  if (code == TSDB_CODE_SUCCESS) {  // send rsp manually only on success.
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  } else {  // return error not send rsp manually
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  // the processing result is handed back to the caller as is
  return code;
}
482

483
// Handle a task-check request from an upstream task: decode it, let streamTaskProcessCheckMsg()
// fill in the response, and send that response back to the upstream node.
//
// Returns the decode error, or the result of streamTaskSendCheckRsp().
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
506

507
// Handle the response to a task-check request sent by a local (upstream) task.
// When this node is not the leader, or the task cannot be acquired, the task is recorded through
// streamMetaAddFailedTask(); otherwise the response is passed to streamTaskProcessCheckRsp().
//
// Returns -1 (with terrno set to TSDB_CODE_INVALID_MSG) when the message cannot be decoded, otherwise
// the result of streamMetaAddFailedTask() or streamTaskProcessCheckRsp().
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tDecoderClear(&decoder);
  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (!isLeader) {
    tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
            rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  SStreamTask* pUpTask = NULL;
  code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pUpTask);

  bool acquired = (pUpTask != NULL) && (code == 0);
  if (!acquired) {
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  code = streamTaskProcessCheckRsp(pUpTask, &rsp);
  streamMetaReleaseTask(pMeta, pUpTask);
  return code;
}
545

546
// Handle a checkpoint-ready message sent by a downstream task to the local upstream task
// req.upstreamTaskId, then answer it with an SMStreamCheckpointReadyRspMsg.
//
// Returns:
//   TSDB_CODE_MSG_DECODE_ERROR - the message cannot be decoded
//   the acquire error (or TSDB_CODE_STREAM_TASK_NOT_EXIST) - the upstream task cannot be acquired
//   TSDB_CODE_INVALID_MSG      - the addressed task is a sink task
//   the error of streamProcessCheckpointReadyMsg(), or terrno when the response cannot be allocated
//   0                          - the message was processed and the response was sent
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    // report the id that was actually looked up, i.e. the upstream task
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return (code != 0) ? code : TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
          pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  // send checkpoint ready rsp
  SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
  if (pReadyRsp == NULL) {
    return terrno;
  }

  // NOTE(review): transId is not assigned here; whether rpcMallocCont() zeroes the buffer cannot be
  // told from this file -- confirm.
  pReadyRsp->upstreamTaskId = req.upstreamTaskId;
  pReadyRsp->upstreamNodeId = req.upstreamNodeId;
  pReadyRsp->downstreamTaskId = req.downstreamTaskId;
  pReadyRsp->downstreamNodeId = req.downstreamNodeId;
  pReadyRsp->checkpointId = req.checkpointId;
  pReadyRsp->streamId = req.streamId;
  pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

  SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
  tmsgSendRsp(&rsp);

  pMsg->info.handle = NULL;  // disable auto rsp

  return code;
}
608

609
// Deploy a new stream task on this vnode.
//
// The task is decoded from `msg`, registered in the stream meta under the write lock, and -- only on a
// restored leader, and only when the task is not a fill-history task -- started asynchronously.
//
// Parameters:
//   pMeta    - stream meta of this vnode
//   cb       - message callbacks used to schedule the async start
//   sversion - version passed through to streamMetaRegisterTask()
//   msg      - encoded SStreamTask, msgLen bytes long
//   isLeader - whether this vnode is the leader
//   restored - whether the vnode restore has completed
//
// Returns 0 when stream processing is disabled, terrno on allocation failure, TSDB_CODE_INVALID_MSG when
// the task cannot be decoded, otherwise the result of registering/acquiring/starting the task.
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t code = 0;
  int32_t vgId = pMeta->vgId;
  int32_t numOfTasks = 0;
  int32_t taskId = -1;
  int64_t streamId = -1;
  bool    added = false;

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // 1.deserialize msg and build task
  int32_t      size = sizeof(SStreamTask);
  SStreamTask* pTask = taosMemoryCalloc(1, size);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // 2.save task, use the latest commit version as the initial start version of stream task.
  // keep the ids locally: pTask must not be touched once it has been handed to the meta store
  taskId = pTask->id.taskId;
  streamId = pTask->id.streamId;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    // arguments follow the order of the format: task id first, then vgId
    tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", taskId, vgId, numOfTasks,
            tstrerror(code));
    return code;
  }

  // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
  // it is added into the meta store
  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // only handled in the leader node
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* p = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
  if ((p != NULL) && (code == 0) && (p->info.fillHistory == 0)) {
    code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
  }

  if (p != NULL) {
    streamMetaReleaseTask(pMeta, p);
  }

  return code;
}
688

689
// Drop a stream task, together with its related fill-history task when there is one.
//
// `msg` is interpreted as an SVDropStreamTaskReq; `msgLen` is not used. The relation between the
// stream task and its fill-history task is cleared first (under the meta write lock), then the
// fill-history task and the stream task are unregistered, and finally the meta is committed.
//
// Always returns 0; failures are only logged.
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  streamMetaWUnLock(pMeta);

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  // commit the update
  streamMetaWLock(pMeta);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  // persist to disk; a failure does not change the result, but it must not pass unnoticed
  if (streamMetaCommit(pMeta) < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping task:0x%x", vgId, pReq->taskId);
  }

  streamMetaWUnLock(pMeta);
  return 0;  // always return success
}
749

750
// Apply an update-checkpoint-info request to the addressed task.
// `msg` is interpreted as an SVUpdateCheckpointInfoReq. The task is looked up and updated under the
// meta write lock; a missing task is only logged.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTarget = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTarget);
  if (code != 0) {  // failed to get the task.
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  } else {
    code = streamTaskUpdateTaskCheckpointInfo(pTarget, restored, pReq);
    streamMetaReleaseTask(pMeta, pTarget);
  }

  streamMetaWUnLock(pMeta);

  // always return success when handling the requirement issued by mnode during transaction.
  return TSDB_CODE_SUCCESS;
}
777

UNCOV
778
// Clear the stream meta, reload all tasks and, on a leader with streams enabled, start them again.
//
// pMeta:    stream meta to restart.
// isLeader: when false (or when tsDisableStream is set) the tasks are reloaded but not started.
//
// If a start-all-tasks procedure is already running (startInfo.startAllTasks == 1) the request is only recorded by
// incrementing startInfo.restartCount and TSDB_CODE_SUCCESS is returned.
//
// Returns terrno when it was set after being cleared at the beginning of the restart; otherwise the result of
// streamMetaStartAllTasks() (0 on the non-leader path).
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  streamMetaWLock(pMeta);
  if (pMeta->startInfo.startAllTasks == 1) {
    pMeta->startInfo.restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pMeta->startInfo.startAllTasks = 1;
  streamMetaWUnLock(pMeta);

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  {
    // forget the outcome of the previous start round
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    taosHashClear(pStartInfo->pReadyTaskSet);
    taosHashClear(pStartInfo->pFailedTaskSet);
    pStartInfo->readyTs = 0;
  }

  if (isLeader && !tsDisableStream) {
    // the lock is released before starting the tasks
    streamMetaWUnLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
    if (code != 0) {
      tqError("vgId:%d failed to start all tasks after restart, code:%s", vgId, tstrerror(code));
    }
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pMeta->startInfo.restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // terrno keeps priority as before; the start result is no longer discarded when terrno is clear
  return (terrno != 0) ? terrno : code;
}
827

828
// Dispatch a stream-task run request according to pReq->reqType.
//
// pMeta:    stream meta that owns the tasks.
// pMsg:     rpc message; pMsg->pCont is interpreted as SStreamTaskRunReq.
// isLeader: forwarded to restartStreamTasks() for STREAM_EXEC_T_RESTART_ALL_TASKS.
//
// Returns 0 for the start-one/start-all/restart-all/stop-all requests regardless of their outcome, the result of
// streamMetaAddFailedTask() for STREAM_EXEC_T_ADD_FAILED_TASK, the acquire/resume result for
// STREAM_EXEC_T_RESUME_TASK, and for every other type 0 when the task was found or the acquire error otherwise.
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  SStreamTaskRunReq* pReq = pMsg->pCont;

  int32_t type = pReq->reqType;
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;

  // the results of the following four requests are deliberately not reported to the caller
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    (void)streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
    return 0;
  } else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    (void)streamMetaStartAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    (void)restartStreamTasks(pMeta, isLeader);
    return 0;
  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    (void)streamMetaStopAllTasks(pMeta);
    return 0;
  } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
    return code;
  } else if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);

    if (pTask != NULL && (code == 0)) {
      char* pStatus = NULL;
      if (streamTaskReadyToRun(pTask, &pStatus)) {
        int64_t execTs = pTask->status.lastExecTs;
        // keep the difference in 64 bits: a millisecond timestamp minus a zero/old lastExecTs does not fit in int32_t
        int64_t idle = taosGetTimestampMs() - execTs;
        tqDebug("s-task:%s task resume to run after idle for:%" PRId64 "ms from:%" PRId64, pTask->id.idStr, idle,
                execTs);

        code = streamResumeTask(pTask);
      } else {
        int8_t status = streamTaskSetSchedStatusInactive(pTask);
        tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
                pTask->id.idStr, pStatus, status);
      }
      streamMetaReleaseTask(pMeta, pTask);
    }

    return code;
  }

  // any other request type: execute the task on the data in its inputQ
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((pTask != NULL) && (code == 0)) {  // even in halt status, the data in inputQ must be processed
    char* p = NULL;
    if (streamTaskReadyToRun(pTask, &p)) {
      tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
              pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
      (void)streamExecTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, p, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
    return code;
  }
}
895

896
// Callback run when a start-tasks round completes: either restart all tasks again or kick off a wal scan.
//
// Under the meta write lock:
//   - if another start procedure is in progress (startAllTasks == 1), only log and return 0;
//   - if restarts are pending (restartCount > 0) and not all tasks are ready, consume one pending restart and
//     return the result of restartStreamTasks();
//   - otherwise reset restartCount when all tasks are ready and fall through to the wal scan.
//
// Returns the result of tqScanWalAsync() for a vnode, and 0 when vgId equals SNODE_HANDLE.
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;

  streamMetaWLock(pMeta);

  if (pInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pInfo->restartCount);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // not in starting procedure
  bool ready = streamMetaAllTasksReady(pMeta);

  if ((!ready) && (pInfo->restartCount > 0)) {
    pInfo->restartCount -= 1;
    tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
            pInfo->restartCount);
    streamMetaWUnLock(pMeta);

    bool leader = (pMeta->role == NODE_ROLE_LEADER);
    return restartStreamTasks(pMeta, leader);
  }

  if (pInfo->restartCount == 0) {
    tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
  } else if (ready) {
    // all tasks are ready now, do NOT restart again, and drop the pending restart requests
    pInfo->restartCount = 0;
    tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
  }

  streamMetaWUnLock(pMeta);

  if (vgId == SNODE_HANDLE) {
    return 0;
  }

  tqDebug("vgId:%d start scan wal for executing tasks", vgId);
  return tqScanWalAsync(pMeta->ahandle, true);
}
938

UNCOV
939
// Handle a task-reset message: clear the task's check info and, when the task is in the checkpoint state,
// set it back to ready.
//
// pMeta: stream meta that owns the task.
// pMsg:  request payload, read through the SVPauseStreamTaskReq layout (streamId/taskId).
//
// Returns TSDB_CODE_SUCCESS in every case, including when the task cannot be acquired.
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pResetReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t acquireCode = streamMetaAcquireTask(pMeta, pResetReq->streamId, pResetReq->taskId, &pTask);
  if ((acquireCode != 0) || (pTask == NULL)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pResetReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);
  streamTaskClearCheckInfo(pTask, true);

  // clear flag set during do checkpoint, and open inputQ for all upstream tasks
  SStreamTaskState current = streamTaskGetStatus(pTask);
  if (current.state != TASK_STATUS__CK) {
    // every non-checkpoint state, TASK_STATUS__UNINIT included, is left untouched
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, current.name);
  } else {
    int64_t chkptId = 0;
    int32_t transId = 0;
    streamTaskGetActiveCheckpointInfo(pTask, &transId, &chkptId);

    tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
            pTask->id.idStr, chkptId, transId);

    streamTaskSetStatusReady(pTask);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
979

UNCOV
980
// Answer a downstream task that asks the upstream task to (re-)send its checkpoint-trigger.
//
// pMeta: stream meta that owns the upstream task.
// pMsg:  rpc message; pMsg->pCont is interpreted as SRetrieveChkptTriggerReq and pMsg->info is handed to the sender.
//
// The response code sent to the downstream task is:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  - the task's downstreamReady flag is not 1
//   TSDB_CODE_SUCCESS                  - task is in checkpoint state and the trigger was already sent once
//   TSDB_CODE_ACTION_IN_PROGRESS       - every other case in which a response is sent
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be acquired, TSDB_CODE_INVALID_MSG when the requested
// checkpointId differs from the active one (no response is sent in these two cases), otherwise the result of
// streamTaskSendCheckpointTriggerMsg().
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
  SStreamTask*              pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)pReq->downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  // unless the trigger was already sent, the downstream task is told to keep waiting
  int32_t rspCode = TSDB_CODE_ACTION_IN_PROGRESS;

  SStreamTaskState taskState = streamTaskGetStatus(pTask);
  if (taskState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
    if ((taskState.state != TASK_STATUS__READY) && (taskState.state != TASK_STATUS__HALT)) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
  } else {  // recv the checkpoint-source/trigger already
    int64_t activeChkptId = 0;
    int32_t activeTransId = 0;
    streamTaskGetActiveCheckpointInfo(pTask, &activeTransId, &activeChkptId);

    if (activeChkptId != pReq->checkpointId) {
      tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
              " req:%" PRId64,
              pTask->id.idStr, pReq->downstreamTaskId, activeChkptId, pReq->checkpointId);
      streamMetaReleaseTask(pMeta, pTask);
      return TSDB_CODE_INVALID_MSG;
    }

    if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
      // re-send the lost checkpoint-trigger msg to downstream task
      tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
              (int32_t)pReq->downstreamTaskId, activeChkptId, activeTransId);
      rspCode = TSDB_CODE_SUCCESS;
    } else {  // not send checkpoint-trigger yet, wait
      int32_t numRecv = 0;
      int32_t numTotal = 0;
      streamTaskGetTriggerRecvStatus(pTask, &numRecv, &numTotal);

      if (numRecv != numTotal) {
        tqWarn(
            "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
            "sending checkpoint-source/trigger",
            pTask->id.idStr, numRecv, numTotal);
      } else {  // add the ts info
        tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
                                            rspCode);

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1056

UNCOV
1057
// Handle the response of a checkpoint-trigger retrieve request.
//
// pMeta: stream meta that owns the receiving task.
// pMsg:  rpc message; the SCheckpointTriggerRsp body starts right after the SMsgHead in pMsg->pCont.
//
// Returns the acquire result when the task cannot be acquired, otherwise the result of
// streamTaskProcessCheckpointTriggerRsp().
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp* pTriggerRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  SStreamTask*           pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pTriggerRsp->streamId, pTriggerRsp->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, pTriggerRsp->taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
      "checkpointId:%" PRId64 ", transId:%d",
      pTask->id.idStr, pTriggerRsp->upstreamTaskId, pTriggerRsp->checkpointId, pTriggerRsp->transId);

  int32_t result = streamTaskProcessCheckpointTriggerRsp(pTask, pTriggerRsp);

  streamMetaReleaseTask(pMeta, pTask);
  return result;
}
1078

1079
// Pause a stream task and, when HAS_RELATED_FILLHISTORY_TASK() holds for it, its fill-history task too.
//
// pMeta: stream meta that owns the tasks.
// pMsg:  request payload, interpreted as SVPauseStreamTaskReq.
//
// Returns TSDB_CODE_SUCCESS in every case; a task that cannot be acquired is treated as already paused.
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pPauseReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pPauseReq->streamId, pPauseReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pPauseReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    SStreamTask* pFillTask = NULL;

    code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pFillTask);
    if ((code != 0) || (pFillTask == NULL)) {
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pTask->hTaskInfo.id.taskId);
      streamMetaReleaseTask(pMeta, pTask);

      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
      return TSDB_CODE_SUCCESS;
    }

    tqDebug("s-task:%s fill-history task handle paused along with related stream task", pFillTask->id.idStr);

    streamTaskPause(pFillTask);
    streamMetaReleaseTask(pMeta, pFillTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1117

1118
// Resume one task and, when it ends up in a runnable state, schedule the follow-up work.
//
// handle:      STQ* when fromVnode is true, otherwise used directly as SStreamMeta*.
// pTask:       task to resume; it is not released here.
// sversion:    version passed to walReaderSetSkipToVersion() when untreated data is to be skipped.
// igUntreated: non-zero to discard the data accumulated while a non-fill-history source task was paused.
// fromVnode:   selects how handle is interpreted.
//
// Returns 0 when the task is not in READY/SCAN_HISTORY/CK state after resuming; otherwise the result of
// streamStartScanHistoryAsync(), tqScanWalAsync() or streamTrySchedExec(), whichever is invoked.
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = handle;
  if (fromVnode) {
    pMeta = ((STQ*)handle)->pStreamMeta;
  }

  int32_t vgId = pMeta->vgId;

  streamTaskResume(pTask);
  ETaskStatus state = streamTaskGetStatus(pTask).state;

  bool runnable = (state == TASK_STATUS__READY) || (state == TASK_STATUS__SCAN_HISTORY) || (state == TASK_STATUS__CK);
  if (!runnable) {
    return 0;
  }

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  // no lock needs to secure the access of the version
  if (igUntreated && isSource && !pTask->info.fillHistory) {
    // discard all the data  when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (!isSource) {
    return streamTrySchedExec(pTask);
  }

  if (pTask->info.fillHistory && (state == TASK_STATUS__SCAN_HISTORY)) {
    pTask->hTaskInfo.operatorOpen = false;
    return streamStartScanHistoryAsync(pTask, igUntreated);
  }

  if (streamQueueGetNumOfItems(pTask->inputq.queue) == 0) {
    // NOTE(review): handle is cast to STQ* even when fromVnode is false - confirm source tasks never take this path
    // with a non-vnode handle
    return tqScanWalAsync((STQ*)handle, false);
  }

  return streamTrySchedExec(pTask);
}
1153

1154
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
114✔
1155
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
114✔
1156

1157
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
114!
1158

1159
  SStreamTask* pTask = NULL;
114✔
1160
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
114✔
1161
  if (pTask == NULL || (code != 0)) {
119!
UNCOV
1162
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
UNCOV
1163
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1164
  }
1165

1166
  streamMutexLock(&pTask->lock);
120✔
1167
  SStreamTaskState pState = streamTaskGetStatus(pTask);
119✔
1168
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
120✔
1169
  streamMutexUnlock(&pTask->lock);
120✔
1170

1171
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
120✔
1172
  if (code != 0) {
120!
UNCOV
1173
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
1174
    return code;
×
1175
  }
1176

1177
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
120✔
1178
  SStreamTask* pHTask = NULL;
120✔
1179
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
120✔
1180
  if (pHTask && (code == 0)) {
120!
UNCOV
1181
    streamMutexLock(&pHTask->lock);
×
UNCOV
1182
    SStreamTaskState p = streamTaskGetStatus(pHTask);
×
UNCOV
1183
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
×
UNCOV
1184
    streamMutexUnlock(&pHTask->lock);
×
1185

UNCOV
1186
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
×
UNCOV
1187
    streamMetaReleaseTask(pMeta, pHTask);
×
1188
  }
1189

1190
  return code;
120✔
1191
}
1192

UNCOV
1193
// Return the number of entries in pMeta->pTaskList, as reported by taosArrayGetSize().
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t numOfTasks = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  return numOfTasks;
}
×
1194

1195
// Handler for responses that carry nothing to process: free the message content and clear the pointer.
// The meta argument is unused. Always returns TSDB_CODE_SUCCESS.
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pPayload = pMsg->pCont;

  // detach the content from the message so the freed pointer is not left behind in it
  pMsg->pCont = NULL;
  rpcFreeCont(pPayload);

  return TSDB_CODE_SUCCESS;
}
1200

1201
// Forward the content of a stream heartbeat response to streamProcessHeartbeatRsp() and return its result.
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t result = streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
  return result;
}
1204

1205
// The request-checkpoint response needs no processing; it is handed to doProcessDummyRspMsg().
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t result = doProcessDummyRspMsg(pMeta, pMsg);
  return result;
}
1,370✔
1206

1207
// The checkpoint-report response needs no processing; it is handed to doProcessDummyRspMsg().
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t result = doProcessDummyRspMsg(pMeta, pMsg);
  return result;
}
2,682✔
1208

1209
// Handle the response to a checkpoint-ready message for the task named by the response's downstreamTaskId.
//
// pMeta: stream meta that owns the task.
// pMsg:  rpc message; pMsg->pCont is interpreted as SMStreamCheckpointReadyRspMsg.
//
// Returns the acquire result when the task cannot be acquired, otherwise the result of
// streamTaskProcessCheckpointReadyRsp().
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pReadyRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReadyRsp->streamId, pReadyRsp->downstreamTaskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pReadyRsp->downstreamNodeId, pReadyRsp->downstreamTaskId);
    return code;
  }

  int32_t result = streamTaskProcessCheckpointReadyRsp(pTask, pReadyRsp->upstreamTaskId, pReadyRsp->checkpointId);

  streamMetaReleaseTask(pMeta, pTask);
  return result;
}
1224

1225
// Handle a set-consensus-checkpointId request: decode it, validate it against the task's current state, update the
// task's checkpointId if it differs, and on a leader start the task asynchronously.
//
// pMeta: stream meta that owns the task.
// pMsg:  rpc message; the encoded SRestoreCheckpointInfo starts right after the SMsgHead in pMsg->pCont.
//
// The request is discarded (after logging) when it cannot be decoded, when the task cannot be acquired, when
// req.startTs is older than the task's creation time, when req.checkpointId is greater than the task's current
// checkpointId, or when req.transId is not newer than the last consensus transId seen by the task.
//
// Returns TSDB_CODE_SUCCESS (0) in every case.
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  // keep the decoder's result so that the failure log reports the real error instead of code 0
  code = tDecodeRestoreCheckpointInfo(&decoder, &req);
  tDecoderClear(&decoder);

  if (code < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.taskId);
    // ignore this code to avoid error code over write
    int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    if (ret) {
      tqError("s-task:0x%x failed add check downstream failed, code:%s", req.taskId, tstrerror(ret));
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    streamMetaAddFailedTaskSelf(pTask, now);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

  streamMutexLock(&pTask->lock);

  // the consensus checkpointId must not be ahead of the checkpointId held by the task
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // a request whose transId is not newer than the last recorded one is stale
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
            pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  if (pMeta->role == NODE_ROLE_LEADER) {
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc