• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.95
/source/dnode/mnode/impl/src/mndStreamTransAct.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
// Build a pause request for one stream task and append it to pTrans as a
// TDMT_STREAM_TASK_PAUSE action.
//
// Returns 0 on success. On failure returns terrno (allocation) or the code
// produced by extractNodeEpset()/setTransAction(); the request is freed on
// every failure path after allocation.
// NOTE(review): if extractNodeEpset() succeeds but reports no epset, the
// function returns that (successful) code without adding an action.
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVPauseStreamTaskReq *pPauseReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
  if (NULL == pPauseReq) {
    mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // identify the target task; the vgroup id in the head is sent in network byte order
  pPauseReq->streamId = pTask->id.streamId;
  pPauseReq->taskId = pTask->id.taskId;
  pPauseReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(ret == TSDB_CODE_SUCCESS && found)) {
    terrno = ret;
    taosMemoryFree(pPauseReq);
    return ret;
  }

  char epsetStr[256] = {0};
  ret = epsetToStr(&nodeEpset, epsetStr, tListLen(epsetStr));
  if (ret != 0) {  // only the debug output below is affected, so keep going
    mError("failed to convert epset to str, code:%s", tstrerror(ret));
  }

  mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, epsetStr);

  ret = setTransAction(pTrans, pPauseReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret == 0) {
    return 0;
  }

  taosMemoryFree(pPauseReq);
  return ret;
}
60

61
// Build a drop request for one stream task and append it to pTrans as a
// TDMT_STREAM_TASK_DROP action.
//
// Returns 0 on success. On failure returns terrno (allocation) or the code
// produced by extractNodeEpset()/setTransAction(). The request buffer is
// released on every failure path.
// NOTE(review): if extractNodeEpset() succeeds but reports no epset, the
// function returns that (successful) code without adding a redo action.
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    return terrno;
  }

  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {  // no valid epset, return directly without redoAction
    taosMemoryFree(pReq);  // the request was never handed to the transaction, so release it here
    return code;
  }

  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(pReq);
    return code;
  }

  return 0;
}
87

88
// Build a resume request for one stream task and append it to pTrans as a
// TDMT_STREAM_TASK_RESUME action. igUntreated is copied into the request as-is.
//
// terrno is cleared on entry. Returns 0 on success; otherwise terrno
// (allocation) or the code from extractNodeEpset()/setTransAction(). The
// request is freed on every failure path after allocation.
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
  terrno = 0;

  SVResumeStreamTaskReq *pResumeReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
  if (NULL == pResumeReq) {
    mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  pResumeReq->igUntreated = igUntreated;
  pResumeReq->streamId = pTask->id.streamId;
  pResumeReq->taskId = pTask->id.taskId;
  pResumeReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pResumeReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      mDebug("set the resume action for trans:%d", pTrans->id);
      return ret;
    }
  }

  // either no usable epset or the action could not be added
  taosMemoryFree(pResumeReq);
  return ret;
}
120

121
// Build a drop request from an orphan-task record (stream id, task id, node id)
// and append it to pTrans as a TDMT_STREAM_TASK_DROP action.
//
// Returns 0 on success; otherwise terrno (allocation) or the code from
// extractNodeEpset()/setTransAction(). The request is freed on every failure
// path after allocation.
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
  SVDropStreamTaskReq *pDropReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (NULL == pDropReq) {
    return terrno;
  }

  pDropReq->streamId = pTask->streamId;
  pDropReq->taskId = pTask->taskId;
  pDropReq->head.vgId = htonl(pTask->nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->taskId, pTask->nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    // use the epset just fetched from the mnode rather than any value cached with the task
    ret = setTransAction(pTrans, pDropReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      return 0;
    }
  }

  // no valid epset (no redo action is added) or setTransAction() failed
  taosMemoryFree(pDropReq);
  return ret;
}
149

150
// Fill pMsg with the stream/task ids, the transaction id, and a freshly
// allocated copy of pInfo->pUpdateNodeList.
//
// Failures are only logged: if the list cannot be allocated pMsg->pNodeList
// stays NULL, and a failed copy leaves the list as taosArrayAddAll() left it.
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
                              int32_t transId) {
  pMsg->streamId = pId->streamId;
  pMsg->taskId = pId->taskId;
  pMsg->transId = transId;

  pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
  if (NULL == pMsg->pNodeList) {
    mError("failed to prepare node list, code:%s", tstrerror(terrno));
    return;
  }

  if (NULL == taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList)) {
    mError("failed to add update node list into nodeList");
  }
}
55✔
170

171
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
55✔
172
                                          SStreamTaskId *pId, int32_t transId) {
173
  SStreamTaskNodeUpdateMsg req = {0};
55✔
174
  initNodeUpdateMsg(&req, pInfo, pId, transId);
55✔
175

176
  int32_t code = 0;
55✔
177
  int32_t blen;
178

179
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
55!
180
  if (code < 0) {
55!
181
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
182
    taosArrayDestroy(req.pNodeList);
×
183
    return terrno;
×
184
  }
185

186
  int32_t tlen = sizeof(SMsgHead) + blen;
55✔
187

188
  void *buf = taosMemoryMalloc(tlen);
55!
189
  if (buf == NULL) {
55!
190
    taosArrayDestroy(req.pNodeList);
×
191
    return terrno;
×
192
  }
193

194
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
55✔
195
  SEncoder encoder;
196
  tEncoderInit(&encoder, abuf, tlen);
55✔
197
  code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
55✔
198
  if (code == -1) {
55!
199
    tEncoderClear(&encoder);
×
200
    taosMemoryFree(buf);
×
201
    taosArrayDestroy(req.pNodeList);
×
202
    return code;
×
203
  }
204

205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
55✔
206
  pMsgHead->contLen = htonl(tlen);
55✔
207
  pMsgHead->vgId = htonl(nodeId);
55✔
208

209
  tEncoderClear(&encoder);
55✔
210

211
  *pBuf = buf;
55✔
212
  *pLen = tlen;
55✔
213

214
  taosArrayDestroy(req.pNodeList);
55✔
215
  return TSDB_CODE_SUCCESS;
55✔
216
}
217

218
// Apply the node-list change to pTask, build the matching update message and
// append it to pTrans as a TDMT_VND_STREAM_TASK_UPDATE action.
//
// Returns the code of the first failing step, or the result of
// setTransAction(). The message buffer is freed on every failure path after
// it has been built.
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
  int32_t msgLen = 0;
  void   *pMsg = NULL;

  // the boolean result is deliberately ignored
  (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);

  int32_t ret = doBuildStreamTaskUpdateMsg(&pMsg, &msgLen, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
  if (ret != 0) {
    mError("failed to build stream task epset update msg, code:%s", tstrerror(ret));
    return ret;
  }

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(ret == TSDB_CODE_SUCCESS && found)) {
    mError("failed to extract epset during create update epset, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
    return ret;
  }

  ret = setTransAction(pTrans, pMsg, msgLen, TDMT_VND_STREAM_TASK_UPDATE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != TSDB_CODE_SUCCESS) {
    mError("failed to create update task epset trans, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
  }

  return ret;
}
246

247
// Build an update-checkpoint-info request for one task and append it to pTrans
// as a TDMT_STREAM_TASK_UPDATE_CHKPT action.
//
// The checkpoint fields are copied from the entry in
// execInfo.pChkptStreams[streamId]->pTaskList whose taskId matches pTask; if
// no entry matches they stay zero (the request is calloc'ed).
//
// Returns TSDB_CODE_INVALID_PARA if the stream has no checkpoint report,
// terrno on allocation failure, or the code from
// extractNodeEpset()/setTransAction(). The request is freed on every failure
// path.
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
  if (pReq == NULL) {
    mError("failed to malloc in update checkpoint info, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
           tstrerror(terrno));
    return terrno;
  }

  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SChkptReportInfo *pStreamItem =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
  if (pStreamItem == NULL) {
    taosMemoryFree(pReq);  // nothing was handed to the transaction yet
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
  for (int32_t i = 0; i < size; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pStreamItem->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (pInfo->taskId == pTask->id.taskId) {
      pReq->checkpointId = pInfo->checkpointId;
      pReq->checkpointVer = pInfo->version;
      pReq->checkpointTs = pInfo->ts;
      pReq->dropRelHTask = pInfo->dropHTask;
      pReq->transId = pInfo->transId;
      pReq->hStreamId = pTask->hTaskInfo.id.streamId;
      pReq->hTaskId = pTask->hTaskInfo.id.taskId;
    }
  }

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(pReq);
    return code;
  }

  code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
  }

  return code;
}
297

298
// Build a reset request carrying chkptId for one stream task and append it to
// pTrans as a TDMT_VND_STREAM_TASK_RESET action.
//
// Returns terrno on allocation failure, otherwise the code from
// extractNodeEpset()/setTransAction(). The request is freed on every failure
// path after allocation.
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
  SVResetStreamTaskReq *pResetReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
  if (NULL == pResetReq) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
           tstrerror(terrno));
    return terrno;
  }

  pResetReq->chkptId = chkptId;
  pResetReq->streamId = pTask->id.streamId;
  pResetReq->taskId = pTask->id.taskId;
  pResetReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(ret == TSDB_CODE_SUCCESS && found)) {
    taosMemoryFree(pResetReq);
    return ret;
  }

  ret = setTransAction(pTrans, pResetReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pResetReq);
  }

  return ret;
}
326

327
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
3,297✔
328
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
329
  SStreamCheckpointSourceReq req = {0};
3,297✔
330
  req.checkpointId = checkpointId;
3,297✔
331
  req.nodeId = nodeId;
3,297✔
332
  req.expireTime = -1;
3,297✔
333
  req.streamId = streamId;  // pTask->id.streamId;
3,297✔
334
  req.taskId = taskId;      // pTask->id.taskId;
3,297✔
335
  req.transId = transId;
3,297✔
336
  req.mndTrigger = mndTrigger;
3,297✔
337

338
  int32_t code;
339
  int32_t blen;
340

341
  tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
3,297!
342
  if (code < 0) {
3,297!
343
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
344
  }
345

346
  int32_t tlen = sizeof(SMsgHead) + blen;
3,297✔
347

348
  void *buf = taosMemoryMalloc(tlen);
3,297!
349
  if (buf == NULL) {
3,297!
350
    return terrno;
×
351
  }
352

353
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
3,297✔
354
  SEncoder encoder;
355
  tEncoderInit(&encoder, abuf, tlen);
3,297✔
356
  int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
3,297✔
357
  if (pos == -1) {
3,297!
358
    tEncoderClear(&encoder);
×
359
    return TSDB_CODE_INVALID_MSG;
×
360
  }
361

362
  SMsgHead *pMsgHead = (SMsgHead *)buf;
3,297✔
363
  pMsgHead->contLen = htonl(tlen);
3,297✔
364
  pMsgHead->vgId = htonl(nodeId);
3,297✔
365

366
  tEncoderClear(&encoder);
3,297✔
367

368
  *pBuf = buf;
3,297✔
369
  *pLen = tlen;
3,297✔
370

371
  return 0;
3,297✔
372
}
373
// Add a pause action to pTrans for every task of pStream and flip each task's
// in-memory status to TASK_STATUS__PAUSE, saving the previous status in
// statusBackup when the task was not already paused.
//
// Stops at the first failure and returns its code; returns 0 when all tasks
// were handled. The task iterator is destroyed on every path after creation.
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret) {
      break;
    }

    ret = doSetPauseAction(pMnode, pTrans, pCur);
    if (ret) {
      break;
    }

    if (atomic_load_8(&pCur->status.taskStatus) == TASK_STATUS__PAUSE) {
      continue;  // already paused, keep the existing backup status
    }

    atomic_store_8(&pCur->status.statusBackup, pCur->status.taskStatus);
    atomic_store_8(&pCur->status.taskStatus, TASK_STATUS__PAUSE);
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
405

406
// Add a drop action to pTrans for every task of pStream.
//
// Stops at the first failure and returns its code; returns 0 when all tasks
// were handled. The task iterator is destroyed on every path after creation.
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret) {
      destroyStreamTaskIter(pTaskIter);
      return ret;
    }

    ret = doSetDropAction(pMnode, pTrans, pCur);
    if (ret) {
      destroyStreamTaskIter(pTaskIter);
      return ret;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return 0;
}
432

433
// Add a resume action to pTrans for every task of pStream and, for tasks whose
// in-memory status is TASK_STATUS__PAUSE, restore the status saved in
// statusBackup.
//
// Stops at the first failure and returns its code; returns 0 when all tasks
// were handled.
// NOTE(review): if the iterator yields a NULL task with a zero code, the
// function stops early and returns that zero code.
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret || NULL == pCur) {
      destroyStreamTaskIter(pTaskIter);
      return ret;
    }

    ret = doSetResumeAction(pTrans, pMnode, pCur, igUntreated);
    if (ret) {
      destroyStreamTaskIter(pTaskIter);
      return ret;
    }

    if (atomic_load_8(&pCur->status.taskStatus) != TASK_STATUS__PAUSE) {
      continue;  // nothing to restore
    }

    atomic_store_8(&pCur->status.taskStatus, pCur->status.statusBackup);
  }

  destroyStreamTaskIter(pTaskIter);
  return 0;
}
462

463
// build trans to update the epset
464
// Add an epset-update action to pTrans for every task of pStream while holding
// the stream's write latch.
//
// Stops at the first failure and returns its code; returns 0 when all tasks
// were handled. The iterator is destroyed and the latch released on every path.
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
  mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);

  SStreamTaskIter *pTaskIter = NULL;
  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret) {
      destroyStreamTaskIter(pTaskIter);
      taosWUnLockLatch(&pStream->lock);
      return ret;
    }

    ret = doSetUpdateTaskAction(pMnode, pTrans, pCur, pInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      destroyStreamTaskIter(pTaskIter);
      taosWUnLockLatch(&pStream->lock);
      return ret;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return 0;
}
497

498
// Add an update-checkpoint-info action to pTrans for every task of pStream
// while holding the stream's write latch.
//
// Stops at the first failure and returns its code; otherwise returns the code
// of the last step performed. The iterator is destroyed and the latch released
// on every path.
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret) {
      break;
    }

    ret = doSetUpdateChkptAction(pMnode, pTrans, pCur);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
530

531
// Add a drop action to pTrans for every orphan task listed in pList.
//
// Returns terrno if an element cannot be fetched, the code of the first
// failing doSetDropActionFromId() call, or 0 when every entry was handled.
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pList)) {
    SOrphanTask* pOrphan = taosArrayGet(pList, idx);
    if (NULL == pOrphan) {
      return terrno;
    }

    int32_t ret = doSetDropActionFromId(pMnode, pTrans, pOrphan);
    if (ret != 0) {
      return ret;
    }

    mDebug("add drop task:0x%x action to drop orphan task", pOrphan->taskId);
    ++idx;
  }

  return 0;
}
547

548
// Add a reset action carrying chkptId to pTrans for every task of pStream
// while holding the stream's write latch.
//
// Stops at the first failure and returns its code; returns 0 when all tasks
// were handled. The iterator is destroyed and the latch released on every path.
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret) {
      destroyStreamTaskIter(pTaskIter);
      taosWUnLockLatch(&pStream->lock);
      return ret;
    }

    ret = doSetResetAction(pMnode, pTrans, pCur, chkptId);
    if (ret != TSDB_CODE_SUCCESS) {
      destroyStreamTaskIter(pTaskIter);
      taosWUnLockLatch(&pStream->lock);
      return ret;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return 0;
}
580

581
// Encode a SRestoreCheckpointInfo for pTask (checkpoint id, start timestamp,
// node id and the transaction id) behind an SMsgHead and append it to pTrans
// as a TDMT_STREAM_CONSEN_CHKPT action.
//
// Returns terrno when sizing or allocation fails, otherwise the code from the
// encoder, extractNodeEpset() or setTransAction(). The message buffer is freed
// on every failure path after allocation.
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
  SRestoreCheckpointInfo info = {0};
  info.taskId = pTask->id.taskId;
  info.streamId = pTask->id.streamId;
  info.checkpointId = checkpointId;
  info.startTs = ts;
  info.nodeId = pTask->info.nodeId;
  info.transId = pTrans->id;

  int32_t ret = 0;
  int32_t bodyLen;
  tEncodeSize(tEncodeRestoreCheckpointInfo, &info, bodyLen, ret);
  if (ret < 0) {
    return terrno;
  }

  int32_t totalLen = sizeof(SMsgHead) + bodyLen;

  void *pMsg = taosMemoryMalloc(totalLen);
  if (NULL == pMsg) {
    return terrno;
  }

  // the payload is encoded right after the message head
  void    *pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder enc;
  tEncoderInit(&enc, pBody, totalLen);
  ret = tEncodeRestoreCheckpointInfo(&enc, &info);
  tEncoderClear(&enc);
  if (ret < 0) {
    taosMemoryFree(pMsg);
    return ret;
  }

  SMsgHead *pHead = (SMsgHead *)pMsg;
  pHead->contLen = htonl(totalLen);
  pHead->vgId = htonl(pTask->info.nodeId);

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(ret == TSDB_CODE_SUCCESS && found)) {
    taosMemoryFree(pMsg);
    return ret;
  }

  ret = setTransAction(pTrans, pMsg, totalLen, TDMT_STREAM_CONSEN_CHKPT, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pMsg);
  }

  return ret;
}
634

635

636
// Build a checkpoint-source request for pTask and append it to pTrans as a
// TDMT_VND_STREAM_CHECK_POINT_SOURCE action (TSDB_CODE_SYN_PROPOSE_NOT_READY
// and TSDB_CODE_VND_INVALID_VGROUP_ID are passed to setTransAction() as its
// two trailing code arguments).
//
// Returns the code of the first failing step, or the result of
// setTransAction(). The message buffer is freed on every failure path.
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
                                     int8_t mndTrigger) {
  // Initialised so the failure paths below never free an indeterminate
  // pointer: the builder may return before storing anything in buf.
  void   *buf = NULL;
  int32_t tlen = 0;
  int32_t code = 0;
  SEpSet  epset = {0};
  bool    hasEpset = false;

  code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
                                           pTask->id.taskId, pTrans->id, mndTrigger);
  if (code != 0) {
    taosMemoryFree(buf);
    return code;
  }

  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(buf);
  }

  return code;
}
664

665
// Placeholder: adds no action to the transaction and always reports success.
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
  (void)pMnode;
  (void)pTrans;
  (void)pStream;
  return 0;
}
668

669

670
// Encode a stop request with streamId = -1 and append it to pTrans as a
// TDMT_VND_STREAM_ALL_STOP action addressed to the vgroup pVgObj.
//
// The vgroup reference is NOT released here: the caller
// (mndStreamSetStopStreamTasksActions) releases pVgObj on both its success and
// failure paths, so releasing it here as well would drop the reference twice.
//
// Returns 0 on success, otherwise the code of the failing step (including a
// setTransAction() failure, which was previously logged and then reported as
// success). The message buffer is freed on every failure path.
static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) {
  void    *pBuf = NULL;
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  SStreamTaskStopReq req = {.streamId = -1};
  tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code);
  if (code < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int32_t tlen = sizeof(SMsgHead) + len;

  pBuf = taosMemoryMalloc(tlen);
  if (pBuf == NULL) {
    return terrno;
  }

  // the payload is encoded right after the message head
  void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamTaskStopReq(&encoder, &req);
  tEncoderClear(&encoder);
  // Treat every negative result as a failure (not only -1), matching the
  // check used by mndStreamSetChkptIdAction() in this file.
  if (code < 0) {
    taosMemoryFree(pBuf);
    return code;
  }

  SMsgHead *pMsgHead = (SMsgHead *)pBuf;
  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pVgObj->vgId);

  SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);

  code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed to create stop all streams trans, code:%s", tstrerror(code));
    taosMemoryFree(pBuf);
  }

  return code;
}
716

717
// Walk all SDB_VGROUP entries and, for each vgroup whose dbUid equals dbUid,
// add a stop-all-tasks action to pTrans.
//
// Every fetched vgroup is released with sdbRelease(). On the first failing
// vgroup the fetch is cancelled and that failure code is returned through
// TAOS_RETURN.
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;
  int32_t code = 0;

  for (;;) {
    SVgObj *pVg = NULL;
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (NULL == pCursor) {
      break;
    }

    if (pVg->dbUid != dbUid) {
      // vgroup belongs to another database
      sdbRelease(pSdb, pVg);
      continue;
    }

    code = doSetStopAllTasksAction(pMnode, pTrans, pVg);
    if (code != 0) {
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pVg);
      TAOS_RETURN(code);
    }

    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(code);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc