• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.25
/source/dnode/vnode/src/tqCommon/tqCommon.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsgcb.h"
17
#include "tq.h"
18
#include "tstream.h"
19

20
typedef struct SMStreamCheckpointReadyRspMsg {
21
  SMsgHead head;
22
  int64_t  streamId;
23
  int32_t  upstreamTaskId;
24
  int32_t  upstreamNodeId;
25
  int32_t  downstreamTaskId;
26
  int32_t  downstreamNodeId;
27
  int64_t  checkpointId;
28
  int32_t  transId;
29
} SMStreamCheckpointReadyRspMsg;
30

31
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
32

33
/**
 * Build the runtime resources of a stream task.
 *
 * Every non-sink task gets a stream state opened (plus a recalculate state for
 * STREAM_RECALCUL_TASK tasks). Source and agg tasks additionally get an executor
 * created from pTask->exec.qmsg, which is then given the task ids and the notify
 * settings. Finally the schedule trigger is set up.
 *
 * @param pTask  task to expand; pTask->pMeta must be valid.
 * @return 0 on success, terrno when a state cannot be opened, or the code
 *         returned by the failing executor set-up call.
 */
int32_t tqExpandStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;
  int64_t      startTs = taosGetTimestampMs();
  int64_t      stateStreamId = 0;
  int32_t      stateTaskId = 0;
  int32_t      code = 0;

  tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);

  // a normal task uses its own id for the state, any other kind uses the id kept in streamTaskId
  if (pTask->info.fillHistory == STREAM_NORMAL_TASK) {
    stateStreamId = pTask->id.streamId;
    stateTaskId = pTask->id.taskId;
  } else {
    stateStreamId = pTask->streamTaskId.streamId;
    stateTaskId = pTask->streamTaskId.taskId;
  }

  // only non-sink tasks own a state
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      pTask->pRecalState = streamStateRecalatedOpen(pMeta->path, pTask, pTask->id.streamId, pTask->id.taskId);
      if (pTask->pRecalState == NULL) {
        tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
        return terrno;
      }

      tqDebug("s-task:%s recal state:%p", pTask->id.idStr, pTask->pRecalState);
    }

    pTask->pState = streamStateOpen(pMeta->path, pTask, stateStreamId, stateTaskId);
    if (pTask->pState == NULL) {
      tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
      return terrno;
    }

    tqDebug("s-task:%s stream state:%p", pTask->id.idStr, pTask->pState);
  }

  SReadHandle handle = {
      .checkpointId = pTask->chkInfo.checkpointId,
      .pStateBackend = NULL,
      .fillHistory = pTask->info.fillHistory,
      .winRange = pTask->dataRange.window,
      .pOtherBackend = NULL,
  };

  // level specific part of the read handle
  switch (pTask->info.taskLevel) {
    case TASK_LEVEL__SOURCE:
      handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
      handle.initTqReader = 1;
      break;
    case TASK_LEVEL__AGG:
      handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
      break;
    default:
      break;
  }

  initStorageAPI(&handle.api);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
    // by default the task state is the only backend; a recalculate task puts the
    // recalculate state first and keeps the task state as the other backend
    handle.pStateBackend = pTask->pState;
    handle.pOtherBackend = NULL;
    if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
      handle.pStateBackend = pTask->pRecalState;
      handle.pOtherBackend = pTask->pState;
    }

    code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
    if (code != 0) {
      tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }

    code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
    if (code != 0) {
      return code;
    }

    code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
                                pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName,
                                IS_NEW_SUBTB_RULE(pTask), &pTask->notifyEventStat);
    if (code != 0) {
      tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
      return code;
    }
  }

  streamSetupScheduleTrigger(pTask);

  double elapsedSec = (taosGetTimestampMs() - startTs) / 1000.0;
  tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, elapsedSec);

  return code;
}
125

126
/**
 * Initialize the version bookkeeping of a task from its checkpoint info.
 *
 * When a checkpoint exists (checkpointId != 0) processing resumes right after the
 * checkpointed version. In every case execInfo.startCheckpointVer is set to the
 * version that will be processed next.
 *
 * @param pTask  task whose chkInfo/execInfo fields are updated in place.
 */
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  if (pInfo->checkpointId == 0) {
    // no checkpoint: keep nextProcessVer as it is
    pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
    return;
  }

  // the checkpoint version has been handled already, continue with the one after it
  pInfo->nextProcessVer = pInfo->checkpointVer + 1;
  pInfo->processedVer = pInfo->checkpointVer;
  pTask->execInfo.startCheckpointId = pInfo->checkpointId;

  tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
         pInfo->checkpointId, pInfo->checkpointVer, pInfo->nextProcessVer);

  pTask->execInfo.startCheckpointVer = pInfo->nextProcessVer;
}
14,728✔
141

142
/**
 * Schedule an async "start all tasks" (or "restart all tasks") action for this vnode.
 *
 * @param pMeta    stream meta of the vnode; its task list decides whether anything is scheduled.
 * @param cb       message callback handed to streamTaskSchedTask.
 * @param restart  true to schedule STREAM_EXEC_T_RESTART_ALL_TASKS, false for STREAM_EXEC_T_START_ALL_TASKS.
 * @return 0 when the task list is empty, otherwise the result of streamTaskSchedTask.
 */
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
  int32_t vgId = pMeta->vgId;
  int32_t nTasks = taosArrayGetSize(pMeta->pTaskList);

  if (nTasks != 0) {
    tqDebug("vgId:%d start all %d stream task(s) async", vgId, nTasks);

    int32_t execType = STREAM_EXEC_T_START_ALL_TASKS;
    if (restart) {
      execType = STREAM_EXEC_T_RESTART_ALL_TASKS;
    }

    return streamTaskSchedTask(cb, vgId, 0, 0, execType, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
155

156
/**
 * Schedule an async STREAM_EXEC_T_START_ONE_TASK action for a single task.
 *
 * @param pMeta     stream meta of the vnode; nothing is scheduled when its task list is empty.
 * @param cb        message callback handed to streamTaskSchedTask.
 * @param streamId  stream id of the task to start.
 * @param taskId    id of the task to start.
 * @return 0 when the task list is empty, otherwise the result of streamTaskSchedTask.
 */
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
  int32_t vgId = pMeta->vgId;
  int32_t nTasks = taosArrayGetSize(pMeta->pTaskList);

  if (nTasks != 0) {
    tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
    return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
  }

  tqDebug("vgId:%d no stream tasks existed to run", vgId);
  return 0;
}
167

168
// this is to process request from transaction, always return true.
169
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
89✔
170
  int32_t      vgId = pMeta->vgId;
89✔
171
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
89✔
172
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
89✔
173
  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
89✔
174
  int64_t      st = taosGetTimestampMs();
89✔
175
  bool         updated = false;
89✔
176
  int32_t      code = 0;
89✔
177
  SStreamTask* pTask = NULL;
89✔
178
  SStreamTask* pHTask = NULL;
89✔
179

180
  SStreamTaskNodeUpdateMsg req = {0};
89✔
181

182
  SDecoder decoder;
183
  tDecoderInit(&decoder, (uint8_t*)msg, len);
89✔
184
  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
89!
UNCOV
185
    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
×
UNCOV
186
    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
×
UNCOV
187
    tDecoderClear(&decoder);
×
188
    return rsp.code;
×
189
  }
190

191
  tDecoderClear(&decoder);
89✔
192

193
  int32_t gError = streamGetFatalError(pMeta);
89✔
194
  if (gError != 0) {
89!
UNCOV
195
    tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
×
196
            pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
UNCOV
197
    return 0;
×
198
  }
199

200
  // update the nodeEpset when it exists
201
  streamMetaWLock(pMeta);
89✔
202

203
  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
204
  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
89✔
205
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
89✔
206
  if (code != 0) {
89!
UNCOV
207
    tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
×
208
            req.taskId);
UNCOV
209
    rsp.code = TSDB_CODE_SUCCESS;
×
UNCOV
210
    streamMetaWUnLock(pMeta);
×
UNCOV
211
    taosArrayDestroy(req.pNodeList);
×
212
    return rsp.code;
×
213
  }
214

215
  const char* idstr = pTask->id.idStr;
89✔
216

217
  if (req.transId <= 0) {
89!
218
    tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
×
UNCOV
219
    rsp.code = TSDB_CODE_SUCCESS;
×
220

UNCOV
221
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
222
    streamMetaWUnLock(pMeta);
×
223

UNCOV
224
    taosArrayDestroy(req.pNodeList);
×
UNCOV
225
    return rsp.code;
×
226
  }
227

228
  // info needs to be kept till the new trans to update the nodeEp arrived.
229
  bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
89✔
230
  if (!update) {
89!
231
    rsp.code = TSDB_CODE_SUCCESS;
×
232

233
    streamMetaReleaseTask(pMeta, pTask);
×
234
    streamMetaWUnLock(pMeta);
×
235

UNCOV
236
    taosArrayDestroy(req.pNodeList);
×
UNCOV
237
    return rsp.code;
×
238
  }
239

240
  // duplicate update epset msg received, discard this redundant message
241
  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
89✔
242

243
  void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
89✔
244
  if (pReqTask != NULL) {
89!
UNCOV
245
    tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId,
×
246
            req.transId);
UNCOV
247
    rsp.code = TSDB_CODE_SUCCESS;
×
248

UNCOV
249
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
250
    streamMetaWUnLock(pMeta);
×
251

UNCOV
252
    taosArrayDestroy(req.pNodeList);
×
UNCOV
253
    return rsp.code;
×
254
  }
255

256
  updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
89✔
257

258
  // send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
259
  code = streamTaskSendCheckpointsourceRsp(pTask);
89✔
260
  if (code) {
89!
UNCOV
261
    tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
262
  }
263
  streamTaskResetStatus(pTask);
89✔
264

265
  streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
89✔
266

267
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
89✔
268
    code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
29✔
269
    if (code != 0) {
29!
UNCOV
270
      tqError(
×
271
          "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
272
          "stream task:0x%x",
273
          vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
UNCOV
274
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
×
275
    } else {
276
      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
29!
277
      bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
29✔
278
      if (updateEpSet) {
29✔
279
        updated = updateEpSet;
25✔
280
      }
281

282
      streamTaskResetStatus(pHTask);
29✔
283
      streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
29✔
284
    }
285
  }
286

287
  // stream do update the nodeEp info, write it into stream meta.
288
  if (updated) {
89✔
289
    tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId);
32!
290
    code = streamMetaSaveTaskInMeta(pMeta, pTask);
32✔
291
    if (code) {
32!
UNCOV
292
      tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
×
293
    }
294

295
    if (pHTask != NULL) {
32✔
296
      code = streamMetaSaveTaskInMeta(pMeta, pHTask);
25✔
297
      if (code) {
25!
UNCOV
298
        tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
×
299
      }
300
    }
301
  } else {
302
    tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
57!
303
  }
304

305
  code = streamTaskStop(pTask);
89✔
306
  if (code) {
89!
UNCOV
307
    tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
×
308
  }
309

310
  if (pHTask != NULL) {
89✔
311
    code = streamTaskStop(pHTask);
29✔
312
    if (code) {
29!
UNCOV
313
      tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
×
314
    }
315
  }
316

317
  // keep info
318
  streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
89✔
319
  streamMetaReleaseTask(pMeta, pTask);
89✔
320
  streamMetaReleaseTask(pMeta, pHTask);
89✔
321

322
  rsp.code = TSDB_CODE_SUCCESS;
89✔
323

324
  // possibly only handle the stream task.
325
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
89✔
326
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
89✔
327

328
  if (restored && isLeader) {
89✔
329
    tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
53!
330
    pMeta->startInfo.tasksWillRestart = 1;
53✔
331
  }
332

333
  if (updateTasks < numOfTasks) {
89✔
334
    if (isLeader) {
43✔
335
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
37!
336
              updateTasks, (numOfTasks - updateTasks));
337
    } else {
338
      tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
6!
339
              (numOfTasks - updateTasks));
340
    }
341
  } else {
342
    if ((code = streamMetaCommit(pMeta)) < 0) {
46!
343
      // always return true
UNCOV
344
      streamMetaWUnLock(pMeta);
×
UNCOV
345
      taosArrayDestroy(req.pNodeList);
×
UNCOV
346
      return TSDB_CODE_SUCCESS;
×
347
    }
348

349
    streamMetaClearSetUpdateTaskListComplete(pMeta);
46✔
350

351
    if (isLeader) {
46✔
352
      if (!restored) {
40✔
353
        tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
12!
354
      } else {
355
        tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
28!
356
#if 0
357
      taosMSleep(5000);// for test purpose, to trigger the leader election
358
#endif
359
        code = tqStreamTaskStartAsync(pMeta, cb, true);
28✔
360
        if (code) {
28!
UNCOV
361
          tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
×
362
        }
363
      }
364
    } else {
365
      tqDebug("vgId:%d follower nodes not restart tasks", vgId);
6!
366
    }
367
  }
368

369
  streamMetaWUnLock(pMeta);
89✔
370
  taosArrayDestroy(req.pNodeList);
89✔
371
  return rsp.code;  // always return true
89✔
372
}
373

374
/**
 * Handle a dispatch request sent by an upstream task.
 *
 * When the destination task exists the request is handed to streamProcessDispatchMsg.
 * Otherwise an error response carrying TSDB_CODE_STREAM_TASK_NOT_EXIST is built and sent
 * back to the upstream task.
 *
 * The decoded request and the acquired task reference are released on every path taken
 * after a successful decode.
 *
 * @param pMeta  stream meta of the vnode.
 * @param pMsg   rpc message; the encoded request follows an SMsgHead in pMsg->pCont.
 * @return 0 when the request was processed or the "no task" response was sent;
 *         TSDB_CODE_MSG_DECODE_ERROR when decoding fails; -1 when streamProcessDispatchMsg
 *         fails; TSDB_CODE_INVALID_MSG when the upstream node id is 0; terrno when the
 *         response buffer cannot be allocated.
 */
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamDispatchReq req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
    tDecoderClear(&decoder);
    return TSDB_CODE_MSG_DECODE_ERROR;
  }
  tDecoderClear(&decoder);

  tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask && (code == 0)) {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    int32_t ret = (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) ? -1 : 0;

    // release the request and the task reference on failure as well as on success
    tCleanupStreamDispatchReq(&req);
    streamMetaReleaseTask(pMeta, pTask);
    return ret;
  }

  tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
          pMeta->vgId, req.taskId);

  // validate the destination before allocating the response, so nothing needs to be freed on rejection
  if (req.upstreamNodeId == 0) {
    tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
    tCleanupStreamDispatchReq(&req);
    return TSDB_CODE_INVALID_MSG;
  }

  SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (pRspHead == NULL) {
    code = terrno;  // keep the allocation error before any further call
    tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
    tCleanupStreamDispatchReq(&req);
    return code;
  }

  pRspHead->vgId = htonl(req.upstreamNodeId);

  // all multi-byte fields of the response are converted to network byte order
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
  pRsp->streamId = htobe64(req.streamId);
  pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
  pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
  pRsp->downstreamNodeId = htonl(pMeta->vgId);
  pRsp->downstreamTaskId = htonl(req.taskId);
  pRsp->msgId = htonl(req.msgId);
  pRsp->stage = htobe64(req.stage);
  pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;

  int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
  SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
  tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId);

  tmsgSendRsp(&rsp);
  tCleanupStreamDispatchReq(&req);

  return 0;
}
437

438
/**
 * Handle the response to a dispatch request previously sent by an upstream task of this vnode.
 *
 * The multi-byte fields of the response are byte-swapped in place inside pMsg->pCont
 * before the response is passed on to streamProcessDispatchRsp.
 *
 * @param pMeta  stream meta of the vnode.
 * @param pMsg   rpc message; an SStreamDispatchRsp follows the SMsgHead in pMsg->pCont.
 * @return result of streamProcessDispatchRsp, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the
 *         upstream task cannot be acquired.
 */
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t             vgId = pMeta->vgId;
  SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));

  // 64-bit fields
  pRsp->streamId = htobe64(pRsp->streamId);
  pRsp->stage = htobe64(pRsp->stage);

  // 32-bit fields
  pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
  pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
  pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
  pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
  pRsp->msgId = htonl(pRsp->msgId);

  tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
          pRsp->downstreamTaskId, pRsp->downstreamNodeId);

  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
464

465
/**
 * Handle a retrieve request addressed to a task of this vnode.
 *
 * A source task processes the request itself; any other task rewrites the source ids of
 * the request to its own and broadcasts it further. The response is sent manually only
 * when processing succeeded.
 *
 * @param pMeta  stream meta of the vnode.
 * @param pMsg   rpc message; the encoded request follows an SMsgHead in pMsg->pCont.
 * @return the decode error, the code returned by the task acquisition when the task is not
 *         available, or the result of processing/broadcasting the request.
 */
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamRetrieveReq req;
  SDecoder           decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamRetrieveReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
    return code;
  }

  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            req.dstTaskId);
    tCleanupStreamRetrieveReq(&req);
    return code;
  }

  // enqueue
  tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
          pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);

  // a running checkpoint of this task, if any, is marked as failed
  streamTaskSetCheckpointFailed(pTask);

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    // forward the request with this task as the new sender
    req.srcNodeId = pTask->info.nodeId;
    req.srcTaskId = pTask->id.taskId;
    code = streamTaskBroadcastRetrieveReq(pTask, &req);
  } else {
    code = streamProcessRetrieveReq(pTask, &req);
  }

  if (code == TSDB_CODE_SUCCESS) {
    // the rsp is sent manually only on success
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    streamTaskSendRetrieveRsp(&req, &rsp);
  } else {
    // on failure no rsp is sent from here, the error code is returned instead
    tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
            req.srcTaskId, tstrerror(code));
  }

  streamMetaReleaseTask(pMeta, pTask);
  tCleanupStreamRetrieveReq(&req);

  return code;
}
520

521
/**
 * Handle a task check request from an upstream task and send the check response back.
 *
 * @param pMeta  stream meta of the vnode.
 * @param pMsg   rpc message; the encoded request follows an SMsgHead in pMsg->pCont.
 * @return the decode error, or the result of streamTaskSendCheckRsp.
 */
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskCheckReq req;
  SStreamTaskCheckRsp rsp = {0};
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskCheckReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("vgId:%d decode check msg failed, not handle this msg", pMeta->vgId);
    return code;
  }

  // fill the rsp from the request, then deliver it to the upstream task
  streamTaskProcessCheckMsg(pMeta, &req, &rsp);
  return streamTaskSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
544

545
/**
 * Handle the response to a task check request sent by an upstream task of this vnode.
 *
 * On a non-leader vnode, or when the upstream task cannot be acquired, the task is
 * recorded through streamMetaAddFailedTask instead of being processed.
 *
 * @param pMeta     stream meta of the vnode.
 * @param pMsg      rpc message; the encoded response follows an SMsgHead in pMsg->pCont.
 * @param isLeader  whether this vnode is the leader.
 * @return -1 (with terrno set to TSDB_CODE_INVALID_MSG) when decoding fails, otherwise the
 *         result of streamMetaAddFailedTask or streamTaskProcessCheckRsp.
 */
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = TSDB_CODE_SUCCESS;

  SStreamTaskCheckRsp rsp;
  SDecoder            decoder;

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
  if (code < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    tDecoderClear(&decoder);
    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
    return -1;
  }

  tDecoderClear(&decoder);
  tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
          rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);

  if (isLeader) {
    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
    if ((pTask != NULL) && (code == 0)) {
      code = streamTaskProcessCheckRsp(pTask, &rsp);
      streamMetaReleaseTask(pMeta, pTask);
      return code;
    }

    // task not available any more
    return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
  }

  tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
          rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
  return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
}
583

584
/**
 * Handle a checkpoint-ready message sent by a downstream task to one of the upstream
 * tasks of this vnode, then acknowledge it with an SMStreamCheckpointReadyRspMsg.
 *
 * The message is rejected when the receiving task is a sink task. After the response has
 * been sent manually, pMsg->info.handle is cleared to disable the automatic response.
 *
 * @param pMeta  stream meta of the vnode.
 * @param pMsg   rpc message; the encoded message follows an SMsgHead in pMsg->pCont.
 * @return 0 on success; TSDB_CODE_MSG_DECODE_ERROR when decoding fails; the acquisition
 *         error when the task is not found; TSDB_CODE_INVALID_MSG for a sink task; the
 *         error of streamProcessCheckpointReadyMsg; or terrno when the response buffer
 *         cannot be allocated.
 */
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  int32_t code = 0;

  SStreamCheckpointReadyMsg req = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
    code = TSDB_CODE_MSG_DECODE_ERROR;
    tDecoderClear(&decoder);
    return code;
  }
  tDecoderClear(&decoder);

  // the receiver of this message is the upstream task
  SStreamTask* pTask = NULL;
  code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (code != 0) {
    // log the id that was actually looked up
    tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.upstreamTaskId);
    return code;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_INVALID_MSG;
  } else {
    tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
            pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
  }

  code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
  streamMetaReleaseTask(pMeta, pTask);
  if (code) {
    return code;
  }

  {  // send checkpoint ready rsp
    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
    if (pReadyRsp == NULL) {
      return terrno;
    }

    pReadyRsp->upstreamTaskId = req.upstreamTaskId;
    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
    pReadyRsp->checkpointId = req.checkpointId;
    pReadyRsp->streamId = req.streamId;
    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);

    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
    tmsgSendRsp(&rsp);

    pMsg->info.handle = NULL;  // disable auto rsp
  }

  return code;
}
646

647
/**
 * Deploy a new stream task on this vnode: decode it, register it in the stream meta and,
 * on a restored leader, start it asynchronously unless it is a fill-history task.
 *
 * @param pMeta     stream meta of the vnode.
 * @param cb        message callback used to schedule the async start.
 * @param sversion  version passed to streamMetaRegisterTask.
 * @param msg       encoded task.
 * @param msgLen    length of msg in bytes.
 * @param isLeader  whether this vnode is the leader.
 * @param restored  whether the vnode restore has completed.
 * @return 0 when streams are disabled or the task was handled; terrno on allocation failure;
 *         TSDB_CODE_INVALID_MSG when decoding fails; otherwise the code of the failing
 *         register/acquire/start call.
 */
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
                                     bool isLeader, bool restored) {
  int32_t code = 0;
  int32_t vgId = pMeta->vgId;
  int32_t allocSize = sizeof(SStreamTask);

  if (tsDisableStream) {
    tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
    return code;
  }

  tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);

  // step 1: deserialize the message into a freshly allocated task
  SStreamTask* pTask = taosMemoryCalloc(1, allocSize);
  if (pTask == NULL) {
    tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, allocSize);
    return terrno;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  code = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pTask);
    return TSDB_CODE_INVALID_MSG;
  }

  // step 2: register the task; the ids are copied first because pTask must not be
  // touched once it has been added into the meta store
  int32_t taskId = pTask->id.taskId;
  int64_t streamId = pTask->id.streamId;
  bool    added = false;

  streamMetaWLock(pMeta);
  code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  streamMetaWUnLock(pMeta);

  if (code < 0) {
    tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
            tstrerror(code));
    return code;
  }

  if (!added) {
    tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
    return code;
  }

  // the task is launched on the leader node only
  if (!isLeader) {
    tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
    return code;
  }

  tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);

  if (!restored) {
    tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
    return code;
  }

  SStreamTask* pAcquired = NULL;
  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pAcquired);
  if (pAcquired != NULL) {
    // only a task with fillHistory == 0 is started from here
    if ((code == 0) && (pAcquired->info.fillHistory == 0)) {
      code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
    }

    streamMetaReleaseTask(pMeta, pAcquired);
  }

  return code;
}
726

727
// Handle a drop-task request: unregister the stream task named in the request and, if it has a
// related fill-history task, unregister that one first. The whole procedure runs under the meta
// write lock and ends with a commit of the stream meta.
//
// `msg` is interpreted as a SVDropStreamTaskReq; `msgLen` is not used.
// Always returns 0: individual failures are logged but not reported to the caller.
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
  SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
  int32_t              code = 0;
  int32_t              vgId = pMeta->vgId;
  STaskId              hTaskId = {0};
  SStreamTask*         pTask = NULL;

  tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    // remember the related fill-history task id before the relationship is cleared below
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      hTaskId.streamId = pTask->hTaskInfo.id.streamId;
      hTaskId.taskId = pTask->hTaskInfo.id.taskId;
    }

    // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed
    // related stream(history) task
    streamTaskSetRemoveBackendFiles(pTask);
    code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
    streamMetaReleaseTask(pMeta, pTask);

    if (code) {
      tqError("s-task:0x%x failed to clear related fill-history info, still exists", pReq->taskId);
    }
  }

  // drop the related fill-history task firstly
  if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
    tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
    code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
    if (code) {
      tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId,
              (int32_t)hTaskId.taskId);
    }
  }

  // drop the stream task now
  code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
  if (code) {
    tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId);
  }

  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

  // commit the update; a failure is reported in the log instead of being silently ignored,
  // but it does not change the result of this request
  if (streamMetaCommit(pMeta) < 0) {
    tqError("vgId:%d failed to commit stream meta after dropping s-task:0x%x", vgId, pReq->taskId);
  }

  streamMetaWUnLock(pMeta);
  tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId);

  return 0;  // always return success
}
786

787
// Apply an update-checkpoint-info request (a SVUpdateCheckpointInfoReq carried in `msg`) to the
// task it names, under the meta write lock. If the task cannot be located the failure is only
// logged. The request is issued by mnode during a transaction, so the function always returns
// TSDB_CODE_SUCCESS.
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
  SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
  int32_t                    vgId = pMeta->vgId;
  SStreamTask*               pTask = NULL;

  tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);

  streamMetaWLock(pMeta);

  STaskId taskKey = {.streamId = pReq->streamId, .taskId = pReq->taskId};
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &taskKey, &pTask);
  if (code != 0) {
    // the task could not be acquired from the meta store
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    tqError(
        "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
        "dropped already",
        vgId, pReq->taskId, numOfTasks);
  } else {
    // the result of the update is intentionally not propagated to the caller
    code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaWUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
814

815
// Restart all stream tasks of this vnode: clear the stream meta, reload all tasks, reset the
// start-info bookkeeping and, when this node is the leader and streams are enabled, start all
// tasks again.
//
// If a start-all procedure is already flagged as running (startInfo.startAllTasks == 1) the
// restart counter is incremented instead and TSDB_CODE_SUCCESS is returned immediately.
//
// Returns the code reported by streamMetaStartAllTasks() when that call fails; otherwise the
// value of terrno (which is reset to 0 at the beginning of the restart).
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  streamMetaWLock(pMeta);
  if (pMeta->startInfo.startAllTasks == 1) {
    pMeta->startInfo.restartCount += 1;
    tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
            pMeta->startInfo.restartCount);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  pMeta->startInfo.startAllTasks = 1;

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
         pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);

  streamMetaClear(pMeta);

  int64_t el = taosGetTimestampMs() - st;
  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);

  streamMetaLoadAllTasks(pMeta);

  {
    // forget the outcome of the previous start round
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    taosHashClear(pStartInfo->pReadyTaskSet);
    taosHashClear(pStartInfo->pFailedTaskSet);
    pStartInfo->readyTs = 0;
  }

  if (isLeader && !tsDisableStream) {
    // the write lock is released before the tasks are started
    streamMetaWUnLock(pMeta);
    code = streamMetaStartAllTasks(pMeta);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
    pMeta->startInfo.restartCount = 0;
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
  }

  // keep an explicit failure of streamMetaStartAllTasks(); previously it was always overwritten
  // by terrno. Fall back to terrno only when no explicit error was reported.
  if (code == TSDB_CODE_SUCCESS) {
    code = terrno;
  }

  return code;
}
862

863
// Dispatch a task-run request. The payload following the SMsgHead is decoded into a
// SStreamTaskRunReq whose reqType selects the action: start one/all tasks, restart or stop all
// tasks, add a failed task, stop one task, resume an idle task, or (for any other type) execute
// the task named in the request.
//
// A decode failure is logged and reported as TSDB_CODE_SUCCESS. The start/restart/stop-all
// actions always return 0 regardless of their own result; the remaining actions return the code
// of the operation they performed.
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
  int32_t code = 0;
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskRunReq req = {0};
  SDecoder          decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);
  code = tDecodeStreamTaskRunReq(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  int32_t type = req.reqType;

  // meta-level actions whose own result is not reported back
  if (type == STREAM_EXEC_T_START_ONE_TASK) {
    (void)streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
    return 0;
  }

  if (type == STREAM_EXEC_T_START_ALL_TASKS) {
    (void)streamMetaStartAllTasks(pMeta);
    return 0;
  }

  if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
    (void)restartStreamTasks(pMeta, isLeader);
    return 0;
  }

  if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
    (void)streamMetaStopAllTasks(pMeta);
    return 0;
  }

  // meta-level actions whose result is returned to the caller
  if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
    return streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
  }

  if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
    return streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
  }

  SStreamTask* pTask = NULL;
  char*        pStatus = NULL;

  if (type == STREAM_EXEC_T_RESUME_TASK) {  // task resume to run after idle for a while
    code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
    if (pTask == NULL || code != 0) {
      return code;
    }

    if (streamTaskReadyToRun(pTask, &pStatus)) {
      int64_t execTs = pTask->status.lastExecTs;
      int32_t idle = taosGetTimestampMs() - execTs;
      tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);

      code = streamResumeTask(pTask);
    } else {
      int8_t status = streamTaskSetSchedStatusInactive(pTask);
      tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
              pTask->id.idStr, pStatus, status);
    }

    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  // any other request type: execute the task on the data in its inputQ
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    // NOTE: pTask->status.schedStatus is not updated since it is not handled by the run exec.
    // todo add one function to handle this
    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
    return code;
  }

  // even in halt status, the data in inputQ must be processed
  if (streamTaskReadyToRun(pTask, &pStatus)) {
    tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
            pTask->id.idStr, pStatus, pTask->chkInfo.nextProcessVer);
    (void)streamExecTask(pTask);
  } else {
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
            pTask->id.idStr, pStatus, status);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
944

945
// Callback executed when a start-tasks round has completed. Under the meta write lock it checks
// whether restarts were requested while the round was running (startInfo.restartCount > 0):
//  - another start round already in progress: only log, do nothing;
//  - pending restarts and not all tasks ready: consume one restart request and restart all tasks;
//  - pending restarts but all tasks ready: drop the pending requests, no restart;
//  - no pending restarts: only log.
//
// Returns the result of restartStreamTasks() when a restart is performed, otherwise 0.
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  int32_t         code = 0;

  streamMetaWLock(pMeta);
  if (pStartInfo->startAllTasks == 1) {
    tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
            pMeta->startInfo.restartCount);
  } else {  // not in starting procedure
    bool allReady = streamMetaAllTasksReady(pMeta);

    if ((pStartInfo->restartCount > 0) && (!allReady)) {
      // a restart was requested and some tasks are still not ready: restart once more.
      // The lock is released first, since restartStreamTasks() takes it again.
      pStartInfo->restartCount -= 1;
      tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
              pStartInfo->restartCount);
      streamMetaWUnLock(pMeta);

      return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
    }

    if (pStartInfo->restartCount == 0) {
      tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
    } else if (allReady) {
      // all tasks are ready now, do NOT restart again, and reset the value of restartCount
      pStartInfo->restartCount = 0;
      tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
    }
  }

  streamMetaWUnLock(pMeta);

  return code;
}
982

UNCOV
983
// Handle a task-reset request (a SVResetStreamTaskReq carried in `pMsg`): record the failed
// checkpoint id on the task, clear its check info and, if the task is currently in checkpoint
// status, set it back to ready. For every other status nothing further is done.
//
// Always returns TSDB_CODE_SUCCESS, also when the task cannot be acquired.
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
  SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, pReq->taskId);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);

  streamMutexLock(&pTask->lock);

  streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
  streamTaskClearCheckInfo(pTask, true);

  // only a task in checkpoint status is moved back to ready; any other status is left untouched
  SStreamTaskState cur = streamTaskGetStatus(pTask);
  if (cur.state == TASK_STATUS__CK) {
    streamTaskSetStatusReady(pTask);
    tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
  } else {
    tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, cur.name);
  }

  streamMutexUnlock(&pTask->lock);

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1019

1020
// Handle a stop-task request. The payload following the SMsgHead is decoded into a
// SStreamTaskStopReq; when its streamId is <= 0 all stream tasks of this vnode are stopped.
// A request naming a single stream (streamId > 0) is currently not acted upon.
//
// `pMsgCb` is not used. Always returns TSDB_CODE_SUCCESS; failures are only logged.
int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) {
  int32_t vgId = pMeta->vgId;
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SStreamTaskStopReq req = {0};
  SDecoder           decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, len);

  int32_t code = tDecodeStreamTaskStopReq(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode stop all streams, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  // stopping the tasks of one specific stream is not handled here
  if (req.streamId > 0) {
    return TSDB_CODE_SUCCESS;
  }

  // stop all stream tasks, only invoked when trying to drop db
  tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId);
  code = streamMetaStopAllTasks(pMeta);
  if (code) {
    tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code));
  }

  // always return success
  return TSDB_CODE_SUCCESS;
}
1052

1053
// Handle a retrieve-checkpoint-trigger request sent by a downstream task to the upstream task
// hosted here. After decoding the request and acquiring the upstream task, a checkpoint-trigger
// message is sent back whose status code depends on the state of the upstream task:
//  - downstream not ready yet                     -> TSDB_CODE_STREAM_TASK_IVLD_STATUS
//  - in checkpoint status, trigger already sent   -> TSDB_CODE_SUCCESS (trigger is re-sent)
//  - in checkpoint status, trigger not sent yet   -> TSDB_CODE_ACTION_IN_PROGRESS
//  - not in checkpoint status                     -> TSDB_CODE_ACTION_IN_PROGRESS
//
// Returns TSDB_CODE_INVALID_MSG when the request cannot be decoded or names a checkpointId
// different from the active one, TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be
// acquired, otherwise the result of sending the trigger message.
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SRetrieveChkptTriggerReq req = {0};
  SStreamTask*             pTask = NULL;
  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder                 decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  bool decodeFailed = (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0);
  tDecoderClear(&decoder);
  if (decodeFailed) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
            " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
            pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
          req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);

  if (pTask->status.downstreamReady != 1) {
    tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
            pTask->id.idStr, (int32_t)req.downstreamTaskId);

    code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
                                              TSDB_CODE_STREAM_TASK_IVLD_STATUS);
    streamMetaReleaseTask(pMeta, pTask);
    return code;
  }

  // status code carried by the reply; "in progress" unless the trigger has already been sent
  int32_t rspCode = TSDB_CODE_ACTION_IN_PROGRESS;

  SStreamTaskState taskState = streamTaskGetStatus(pTask);
  if (taskState.state != TASK_STATUS__CK) {  // upstream not recv the checkpoint-source/trigger till now
    if (taskState.state != TASK_STATUS__READY && taskState.state != TASK_STATUS__HALT) {
      tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, taskState.name);
    }

    tqWarn(
        "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
        "upstream sending checkpoint-source/trigger",
        pTask->id.idStr);
  } else {  // recv the checkpoint-source/trigger already
    int32_t transId = 0;
    int64_t checkpointId = 0;

    streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
    if (checkpointId != req.checkpointId) {
      tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
              " req:%" PRId64,
              pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
      streamMetaReleaseTask(pMeta, pTask);
      return TSDB_CODE_INVALID_MSG;
    }

    if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
      // re-send the lost checkpoint-trigger msg to downstream task
      tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
              (int32_t)req.downstreamTaskId, checkpointId, transId);
      rspCode = TSDB_CODE_SUCCESS;
    } else {  // not send checkpoint-trigger yet, wait
      int32_t recv = 0;
      int32_t total = 0;
      streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

      if (recv != total) {
        tqWarn(
            "s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
            "sending checkpoint-source/trigger",
            pTask->id.idStr, recv, total);
      } else {
        tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
      }
    }
  }

  code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info, rspCode);

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1140

1141
// Handle the response to a retrieve-checkpoint-trigger request: decode the SCheckpointTriggerRsp
// that follows the SMsgHead, acquire the task it names and pass the response on to
// streamTaskProcessCheckpointTriggerRsp().
//
// Returns TSDB_CODE_INVALID_MSG when decoding fails, the acquire code when the task cannot be
// acquired, otherwise the result of processing the response.
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SCheckpointTriggerRsp rsp = {0};
  SStreamTask*          pTask = NULL;
  char*                 msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t               len = pMsg->contLen - sizeof(SMsgHead);
  SDecoder              decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  bool decodeFailed = (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0);
  tDecoderClear(&decoder);
  if (decodeFailed) {
    tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError(
        "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
        pMeta->vgId, rsp.taskId);
    return code;
  }

  tqDebug(
      "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
      ", transId:%d",
      pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);

  code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
1173

1174
// Handle a pause request (a SVPauseStreamTaskReq carried in `pMsg`): pause the named task and,
// if it has a related fill-history task, pause that one as well.
//
// Always returns TSDB_CODE_SUCCESS; a task that cannot be acquired is treated as already paused.
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
  SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
  SStreamTask*          pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
            pReq->taskId);
    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
  streamTaskPause(pTask);

  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    SStreamTask* pHTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHTask);

    if ((code != 0) || (pHTask == NULL)) {
      tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
              ", it may have been dropped already",
              pMeta->vgId, pTask->hTaskInfo.id.taskId);
      streamMetaReleaseTask(pMeta, pTask);

      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
      return TSDB_CODE_SUCCESS;
    }

    tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHTask->id.idStr);

    streamTaskPause(pHTask);
    streamMetaReleaseTask(pMeta, pHTask);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
1212

1213
// Resume one task and, if its status afterwards is ready / scan-history / checkpoint, get it
// running again:
//  - a source-level fill-history task in scan-history status restarts its history scan;
//  - a source-level task whose input queue is empty is left alone;
//  - any other task is scheduled for execution.
//
// `handle` is a STQ* when `fromVnode` is true, otherwise it is the SStreamMeta* itself.
// When `igUntreated` is set for a source-level, non-fill-history task, its WAL reader is told to
// skip to `sversion`.
//
// Returns 0 when nothing needed to be scheduled, otherwise the code of the scheduling call.
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
                                       bool fromVnode) {
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;

  streamTaskResume(pTask);
  ETaskStatus status = streamTaskGetStatus(pTask).state;

  bool runnable =
      (status == TASK_STATUS__READY) || (status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__CK);
  if (!runnable) {
    return code;
  }

  int32_t level = pTask->info.taskLevel;

  // no lock needs to secure the access of the version
  if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
    // discard all the data when the stream task is suspended.
    walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
    tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
            ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  } else {  // from the previous paused version and go on
    tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
  }

  if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
    pTask->hTaskInfo.operatorOpen = false;
    code = streamStartScanHistoryAsync(pTask, igUntreated);
  } else if (level != TASK_LEVEL__SOURCE || (streamQueueGetNumOfItems(pTask->inputq.queue) != 0)) {
    // a source task with an empty input queue is deliberately not scheduled here
    code = streamTrySchedExec(pTask, false);
  }

  return code;
}
1248

1249
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
2,574✔
1250
  SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
2,574✔
1251

1252
  SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
2,574✔
1253

1254
  SStreamTask* pTask = NULL;
2,574✔
1255
  int32_t      code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,574✔
1256
  if (pTask == NULL || (code != 0)) {
2,578!
UNCOV
1257
    tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
×
UNCOV
1258
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
1259
  }
1260

1261
  streamMutexLock(&pTask->lock);
2,579✔
1262
  SStreamTaskState pState = streamTaskGetStatus(pTask);
2,582✔
1263
  tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState.name);
2,581✔
1264
  streamMutexUnlock(&pTask->lock);
2,581✔
1265

1266
  code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
2,579✔
1267
  if (code != 0) {
2,581!
UNCOV
1268
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
1269
    return code;
×
1270
  }
1271

1272
  STaskId*     pHTaskId = &pTask->hTaskInfo.id;
2,581✔
1273
  SStreamTask* pHTask = NULL;
2,581✔
1274
  code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
2,581✔
1275
  if (pHTask && (code == 0)) {
2,581!
1276
    streamMutexLock(&pHTask->lock);
102✔
1277
    SStreamTaskState p = streamTaskGetStatus(pHTask);
102✔
1278
    tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
102!
1279
    streamMutexUnlock(&pHTask->lock);
102✔
1280

1281
    code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
101✔
1282
    tqDebug("s-task:%s resume complete, code:%s", pHTask->id.idStr, tstrerror(code));
102!
1283

1284
    streamMetaReleaseTask(pMeta, pHTask);
102✔
1285
  }
1286

1287
  streamMetaReleaseTask(pMeta, pTask);
2,581✔
1288
  return TSDB_CODE_SUCCESS;
2,582✔
1289
}
1290

UNCOV
1291
// Return the number of entries in the task list of the given stream meta.
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  return numOfTasks;
}
×
1292

1293
// Shared handler for responses that carry nothing to act on: the message content is freed,
// its pointer is reset, and TSDB_CODE_SUCCESS is returned. pMeta is not used.
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
  void* pContent = pMsg->pCont;

  rpcFreeCont(pContent);
  pMsg->pCont = NULL;  // make sure nobody reuses the freed content through this message

  return TSDB_CODE_SUCCESS;
}
1298

1299
// Decode a stream heartbeat response and pass it on to streamProcessHeartbeatRsp().
//
// The payload starts right after the SMsgHead at the front of pMsg->pCont.
// Returns TSDB_CODE_INVALID_MSG (through terrno) when decoding fails, otherwise the result of
// streamProcessHeartbeatRsp().
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamHbRspMsg hbRsp = {0};
  SDecoder         decoder;

  char*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);

  int32_t decodeCode = tDecodeStreamHbRsp(&decoder, &hbRsp);
  if (decodeCode >= 0) {
    tDecoderClear(&decoder);
    return streamProcessHeartbeatRsp(pMeta, &hbRsp);
  }

  // malformed message: report it as invalid
  terrno = TSDB_CODE_INVALID_MSG;
  tDecoderClear(&decoder);
  tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
  return terrno;
}
1318

1319
// The request-checkpoint response needs no processing of its own; it is handed to
// doProcessDummyRspMsg(), whose result is returned.
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
4,413✔
1320

1321
// The checkpoint-report response needs no processing of its own; it is handed to
// doProcessDummyRspMsg(), whose result is returned.
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t code = doProcessDummyRspMsg(pMeta, pMsg);
  return code;
}
6,499✔
1322

1323
// Deliver a checkpoint-ready response to the downstream task it is addressed to.
//
// pMsg->pCont is read directly as an SMStreamCheckpointReadyRspMsg. The task named by
// (streamId, downstreamTaskId) is acquired, streamTaskProcessCheckpointReadyRsp() is run on it,
// and the task is released again.
//
// Returns the code of streamMetaAcquireTask() when the task cannot be acquired, otherwise the
// code of streamTaskProcessCheckpointReadyRsp().
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  SMStreamCheckpointReadyRspMsg* pReadyRsp = pMsg->pCont;
  SStreamTask*                   pTask = NULL;

  int32_t code = streamMetaAcquireTask(pMeta, pReadyRsp->streamId, pReadyRsp->downstreamTaskId, &pTask);
  if ((code == 0) && (pTask != NULL)) {
    code = streamTaskProcessCheckpointReadyRsp(pTask, pReadyRsp->upstreamTaskId, pReadyRsp->checkpointId);
    streamMetaReleaseTask(pMeta, pTask);
  } else {
    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
            pReadyRsp->downstreamNodeId, pReadyRsp->downstreamTaskId);
  }

  return code;
}
1338

1339
// Apply a consensus-checkpointId request (sent from the mnode) to one stream task.
//
// The payload follows the SMsgHead at the front of pMsg->pCont and is decoded into an
// SRestoreCheckpointInfo. The request is dropped, with a log entry, when:
//   - it cannot be decoded,
//   - the task cannot be acquired (the task is then passed to streamMetaAddFailedTask()),
//   - req.startTs is older than the creation time of the task,
//   - req.checkpointId is greater than the checkpointId the task already has,
//   - req.transId is not newer than the consensus transId recorded in the task.
// Otherwise the task's checkpointId is updated if it differs, the receipt is recorded with
// streamTaskSetConsenChkptIdRecv(), and on a leader node the task is started asynchronously.
//
// Every visible path returns TSDB_CODE_SUCCESS or 0; failures are reported through the log only.
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
  int32_t                vgId = pMeta->vgId;
  int32_t                code = 0;
  SStreamTask*           pTask = NULL;
  char*                  msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t                len = pMsg->contLen - sizeof(SMsgHead);
  int64_t                now = taosGetTimestampMs();
  SDecoder               decoder;
  SRestoreCheckpointInfo req = {0};

  tDecoderInit(&decoder, (uint8_t*)msg, len);
  code = tDecodeRestoreCheckpointInfo(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }

  tDecoderClear(&decoder);

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || (code != 0)) {
    tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
            vgId, req.taskId);

    // a separate variable keeps this result from overwriting the acquire error code
    int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
    if (ret) {
      tqError("s-task:0x%x failed add check downstream failed, code:%s", req.taskId, tstrerror(ret));
    }

    return 0;
  }

  // discard the rsp, since it is expired.
  if (req.startTs < pTask->execInfo.created) {
    tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
           " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
           pTask->id.idStr, vgId, pTask->execInfo.created, req.checkpointId, req.startTs,
           pTask->execInfo.created);
    streamMetaAddFailedTaskSelf(pTask, now);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
          pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);

  // from here until the unlock calls below, the checks and updates run under the task lock
  streamMutexLock(&pTask->lock);

  // the consensus id must not be ahead of the checkpoint this task already has
  if (pTask->chkInfo.checkpointId < req.checkpointId) {
    tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
            pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // a transId that is not newer than the recorded one means this request is stale
  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
  if (pConsenInfo->consenChkptTransId >= req.transId) {
    tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
            pConsenInfo->consenChkptTransId, req.transId);
    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->chkInfo.checkpointId != req.checkpointId) {
    tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64 " transId:%d", pTask->id.idStr, vgId,
            pTask->chkInfo.checkpointId, req.checkpointId, req.transId);
    pTask->chkInfo.checkpointId = req.checkpointId;
    tqSetRestoreVersionInfo(pTask);
  } else {
    tqDebug("s-task:%s vgId:%d consensus-checkpointId:%" PRId64 " equals to current id, transId:%d not update",
            pTask->id.idStr, vgId, req.checkpointId, req.transId);
  }

  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
  streamMutexUnlock(&pTask->lock);

  // only the leader starts the task; a failure to start is logged and not returned
  if (pMeta->role == NODE_ROLE_LEADER) {
    code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
    if (code) {
      tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
    }
  } else {
    tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc