• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.59
/source/dnode/mnode/impl/src/mndStreamTransAct.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
// Queue a TDMT_STREAM_TASK_PAUSE action for a single stream task on pTrans.
//
// Returns terrno when the request cannot be allocated, otherwise the code reported by
// extractNodeEpset()/setTransAction(). When the epset lookup succeeds but reports no epset,
// the request is discarded and the (zero) lookup code is returned without adding an action.
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVPauseStreamTaskReq *pPauseReq = taosMemoryCalloc(1, sizeof(*pPauseReq));
  if (NULL == pPauseReq) {
    mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(*pPauseReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pPauseReq->head.vgId = htonl(pTask->info.nodeId);
  pPauseReq->streamId = pTask->id.streamId;
  pPauseReq->taskId = pTask->id.taskId;

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret != TSDB_CODE_SUCCESS || !found) {
    terrno = ret;
    taosMemoryFree(pPauseReq);
    return ret;
  }

  // the textual epset is only used for the debug log below; a conversion failure is not fatal
  char epsetStr[256] = {0};
  ret = epsetToStr(&nodeEpset, epsetStr, tListLen(epsetStr));
  if (ret != 0) {
    mError("failed to convert epset to str, code:%s", tstrerror(ret));
  }

  mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, epsetStr);

  ret = setTransAction(pTrans, pPauseReq, sizeof(*pPauseReq), TDMT_STREAM_TASK_PAUSE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != 0) {
    // the request is released here only when it could not be attached to the transaction
    taosMemoryFree(pPauseReq);
  }

  return ret;
}
60

61
// Queue a TDMT_STREAM_TASK_DROP action for a single stream task on pTrans.
//
// Returns terrno when the request cannot be allocated, otherwise the code reported by
// extractNodeEpset()/setTransAction(). When no valid epset is available the function returns
// the lookup code (possibly 0) without adding an action.
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {  // no valid epset, return directly without redoAction
    // the request was never handed to the transaction, so it must be released here
    taosMemoryFree(pReq);
    return code;
  }

  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(pReq);
    return code;
  }

  return 0;
}
87

88
// Queue a TDMT_STREAM_TASK_RESUME action for a single stream task on pTrans.
// igUntreated is copied verbatim into the request.
//
// terrno is cleared on entry. Returns terrno when the request cannot be allocated, otherwise
// the code reported by extractNodeEpset()/setTransAction(); when the lookup reports no epset
// the request is discarded and the lookup code is returned without adding an action.
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
  terrno = 0;

  SVResumeStreamTaskReq *pResumeReq = taosMemoryCalloc(1, sizeof(*pResumeReq));
  if (NULL == pResumeReq) {
    mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(*pResumeReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pResumeReq->head.vgId = htonl(pTask->info.nodeId);
  pResumeReq->streamId = pTask->id.streamId;
  pResumeReq->taskId = pTask->id.taskId;
  pResumeReq->igUntreated = igUntreated;

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pResumeReq, sizeof(*pResumeReq), TDMT_STREAM_TASK_RESUME, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      return ret;
    }
  }

  // reached when the epset is unavailable or the action could not be attached
  taosMemoryFree(pResumeReq);
  return ret;
}
118

119
// Queue a TDMT_STREAM_TASK_DROP action for an orphan task, identified only by the
// stream id / task id / node id stored in pTask.
//
// Returns terrno when the request cannot be allocated, otherwise the code reported by
// extractNodeEpset()/setTransAction(); when no valid epset is available the request is
// discarded and the lookup code is returned without adding an action.
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
  SVDropStreamTaskReq *pDropReq = taosMemoryCalloc(1, sizeof(*pDropReq));
  if (NULL == pDropReq) {
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pDropReq->head.vgId = htonl(pTask->nodeId);
  pDropReq->streamId = pTask->streamId;
  pDropReq->taskId = pTask->taskId;

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->taskId, pTask->nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    // use the epset just fetched from the mnode rather than anything cached for the task
    ret = setTransAction(pTrans, pDropReq, sizeof(*pDropReq), TDMT_STREAM_TASK_DROP, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      return 0;
    }
  }

  // no valid epset, or the action could not be attached: release the request
  taosMemoryFree(pDropReq);
  return ret;
}
147

148
// Fill a node-update message with the task identity, the transaction id and a copy of the
// node list carried by pInfo.
//
// Failures are only logged: on return pMsg->pNodeList may be NULL (array creation failed) or
// may not contain the update entries (copy failed). The caller owns pMsg->pNodeList.
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
                              int32_t transId) {
  pMsg->streamId = pId->streamId;
  pMsg->taskId = pId->taskId;
  pMsg->transId = transId;

  int32_t status = 0;
  pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
  if (NULL == pMsg->pNodeList) {
    mError("failed to prepare node list, code:%s", tstrerror(terrno));
    status = terrno;
  }

  // skip the copy once an error code has been recorded
  if (status != 0) {
    return;
  }

  if (taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList) == NULL) {
    mError("failed to add update node list into nodeList");
  }
}
26✔
168

169
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
26✔
170
                                          SStreamTaskId *pId, int32_t transId) {
171
  SStreamTaskNodeUpdateMsg req = {0};
26✔
172
  initNodeUpdateMsg(&req, pInfo, pId, transId);
26✔
173

174
  int32_t code = 0;
26✔
175
  int32_t blen;
176

177
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
26!
178
  if (code < 0) {
26!
179
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
180
    taosArrayDestroy(req.pNodeList);
×
181
    return terrno;
×
182
  }
183

184
  int32_t tlen = sizeof(SMsgHead) + blen;
26✔
185

186
  void *buf = taosMemoryMalloc(tlen);
26!
187
  if (buf == NULL) {
26!
188
    taosArrayDestroy(req.pNodeList);
×
189
    return terrno;
×
190
  }
191

192
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
26✔
193
  SEncoder encoder;
194
  tEncoderInit(&encoder, abuf, tlen);
26✔
195
  code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
26✔
196
  if (code == -1) {
26!
197
    tEncoderClear(&encoder);
×
198
    taosMemoryFree(buf);
×
199
    taosArrayDestroy(req.pNodeList);
×
200
    return code;
×
201
  }
202

203
  SMsgHead *pMsgHead = (SMsgHead *)buf;
26✔
204
  pMsgHead->contLen = htonl(tlen);
26✔
205
  pMsgHead->vgId = htonl(nodeId);
26✔
206

207
  tEncoderClear(&encoder);
26✔
208

209
  *pBuf = buf;
26✔
210
  *pLen = tlen;
26✔
211

212
  taosArrayDestroy(req.pNodeList);
26✔
213
  return TSDB_CODE_SUCCESS;
26✔
214
}
215

216
// Apply the node changes in pInfo to pTask, then queue a TDMT_VND_STREAM_TASK_UPDATE action
// carrying the serialized update message on pTrans.
//
// Returns the first non-zero code from message building, epset lookup or setTransAction().
// When the lookup succeeds but reports no epset, the message is discarded and the (zero)
// lookup code is returned.
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
  void   *pMsg = NULL;
  int32_t msgLen = 0;

  // the boolean result of the epset update is intentionally not used
  (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);

  int32_t ret = doBuildStreamTaskUpdateMsg(&pMsg, &msgLen, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
  if (ret != 0) {
    mError("failed to build stream task epset update msg, code:%s", tstrerror(ret));
    return ret;
  }

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret != TSDB_CODE_SUCCESS || !found) {
    mError("failed to extract epset during create update epset, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
    return ret;
  }

  ret = setTransAction(pTrans, pMsg, msgLen, TDMT_VND_STREAM_TASK_UPDATE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != TSDB_CODE_SUCCESS) {
    mError("failed to create update task epset trans, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
  }

  return ret;
}
244

245
// Queue a TDMT_STREAM_TASK_UPDATE_CHKPT action for a single stream task on pTrans.
//
// The checkpoint fields of the request are copied from the entry in
// execInfo.pChkptStreams[streamId]->pTaskList whose taskId matches pTask. The scan does not
// stop at the first match, so a later matching entry overrides an earlier one. If no entry
// matches, the checkpoint fields stay zero (the request is calloc'ed).
//
// Returns terrno on allocation failure, TSDB_CODE_INVALID_PARA when the stream has no
// checkpoint-report entry, otherwise the code from extractNodeEpset()/setTransAction().
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
  if (pReq == NULL) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
           tstrerror(terrno));
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SChkptReportInfo *pStreamItem =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
  if (pStreamItem == NULL) {
    // the request was never handed to the transaction, so it must be released here
    taosMemoryFree(pReq);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
  for (int32_t i = 0; i < size; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pStreamItem->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (pInfo->taskId == pTask->id.taskId) {
      pReq->checkpointId = pInfo->checkpointId;
      pReq->checkpointVer = pInfo->version;
      pReq->checkpointTs = pInfo->ts;
      pReq->dropRelHTask = pInfo->dropHTask;
      pReq->transId = pInfo->transId;
      pReq->hStreamId = pTask->hTaskInfo.id.streamId;
      pReq->hTaskId = pTask->hTaskInfo.id.taskId;
    }
  }

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(pReq);
    return code;
  }

  code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
  }

  return code;
}
295

296
// Queue a TDMT_VND_STREAM_TASK_RESET action for a single stream task on pTrans; chkptId is
// copied verbatim into the request.
//
// Returns terrno when the request cannot be allocated, otherwise the code reported by
// extractNodeEpset()/setTransAction(); when the lookup reports no epset the request is
// discarded and the lookup code is returned without adding an action.
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
  SVResetStreamTaskReq *pResetReq = taosMemoryCalloc(1, sizeof(*pResetReq));
  if (NULL == pResetReq) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(*pResetReq),
           tstrerror(terrno));
    return terrno;
  }

  // the vgroup id in the message head is stored in network byte order
  pResetReq->head.vgId = htonl(pTask->info.nodeId);
  pResetReq->streamId = pTask->id.streamId;
  pResetReq->taskId = pTask->id.taskId;
  pResetReq->chkptId = chkptId;

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pResetReq, sizeof(*pResetReq), TDMT_VND_STREAM_TASK_RESET, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  // epset unavailable or the action could not be attached: release the request
  taosMemoryFree(pResetReq);
  return ret;
}
324

325
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
2,108✔
326
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
327
  SStreamCheckpointSourceReq req = {0};
2,108✔
328
  req.checkpointId = checkpointId;
2,108✔
329
  req.nodeId = nodeId;
2,108✔
330
  req.expireTime = -1;
2,108✔
331
  req.streamId = streamId;  // pTask->id.streamId;
2,108✔
332
  req.taskId = taskId;      // pTask->id.taskId;
2,108✔
333
  req.transId = transId;
2,108✔
334
  req.mndTrigger = mndTrigger;
2,108✔
335

336
  int32_t code;
337
  int32_t blen;
338

339
  tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
2,108!
340
  if (code < 0) {
2,108!
341
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
342
  }
343

344
  int32_t tlen = sizeof(SMsgHead) + blen;
2,108✔
345

346
  void *buf = taosMemoryMalloc(tlen);
2,108!
347
  if (buf == NULL) {
2,108!
348
    return terrno;
×
349
  }
350

351
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
2,108✔
352
  SEncoder encoder;
353
  tEncoderInit(&encoder, abuf, tlen);
2,108✔
354
  int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
2,108✔
355
  if (pos == -1) {
2,108!
356
    tEncoderClear(&encoder);
×
357
    return TSDB_CODE_INVALID_MSG;
×
358
  }
359

360
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,108✔
361
  pMsgHead->contLen = htonl(tlen);
2,108✔
362
  pMsgHead->vgId = htonl(nodeId);
2,108✔
363

364
  tEncoderClear(&encoder);
2,108✔
365

366
  *pBuf = buf;
2,108✔
367
  *pLen = tlen;
2,108✔
368

369
  return 0;
2,108✔
370
}
371
// Queue a pause action for every task of pStream on pTrans, and flip each task that is not
// already paused to TASK_STATUS__PAUSE after saving its current status in statusBackup.
//
// Stops at the first failure and returns its code; returns 0 when all tasks were handled.
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret == 0) {
      ret = doSetPauseAction(pMnode, pTrans, pCur);
    }
    if (ret != 0) {
      break;
    }

    // remember the pre-pause status so that a later resume can restore it
    if (atomic_load_8(&pCur->status.taskStatus) != TASK_STATUS__PAUSE) {
      atomic_store_8(&pCur->status.statusBackup, pCur->status.taskStatus);
      atomic_store_8(&pCur->status.taskStatus, TASK_STATUS__PAUSE);
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
403

404
// Queue a drop action for every task of pStream on pTrans.
//
// Stops at the first failure and returns its code; returns 0 when all tasks were handled.
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret == 0) {
      ret = doSetDropAction(pMnode, pTrans, pCur);
    }
    if (ret != 0) {
      break;
    }
  }

  // the iterator is released on both the normal and the error path
  destroyStreamTaskIter(pTaskIter);
  return ret;
}
430

431
// Queue a resume action for every task of pStream on pTrans, and restore each task that is
// currently in TASK_STATUS__PAUSE to the status saved in statusBackup.
//
// Stops at the first failure and returns its code. Iteration also stops (returning the
// current code, which may be 0) if the iterator yields a NULL task.
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  mDebug("transId:%d start to create resume actions", pTrans->id);

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret != 0 || pCur == NULL) {
      break;
    }

    ret = doSetResumeAction(pTrans, pMnode, pCur, igUntreated);
    if (ret != 0) {
      break;
    }

    // only paused tasks get their previous status back
    if (atomic_load_8(&pCur->status.taskStatus) == TASK_STATUS__PAUSE) {
      atomic_store_8(&pCur->status.taskStatus, pCur->status.statusBackup);
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
462

463
// build trans to update the epset
464
// Queue an epset-update action for every task of pStream on pTrans. The stream's write latch
// is held for the whole iteration and released on every exit path.
//
// Stops at the first failure and returns its code; returns 0 when all tasks were handled.
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
  mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);

  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret == 0) {
      ret = doSetUpdateTaskAction(pMnode, pTrans, pCur, pInfo);
    }
    if (ret != 0) {
      break;
    }
  }

  // same teardown order on success and failure: iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
497

498
// Queue a checkpoint-info update action for every task of pStream on pTrans. The stream's
// write latch is held for the whole iteration and released on every exit path.
//
// Stops at the first failure and returns its code; returns 0 when all tasks were handled.
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret == 0) {
      ret = doSetUpdateChkptAction(pMnode, pTrans, pCur);
    }
    if (ret != 0) {
      break;
    }
  }

  // same teardown order on success and failure: iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
530

531
// Queue a drop action on pTrans for every orphan task in pList (an array of SOrphanTask).
//
// Returns terrno if an element cannot be fetched, the first non-zero code from
// doSetDropActionFromId(), or 0 when every entry was handled.
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pList)) {
    SOrphanTask *pOrphan = taosArrayGet(pList, idx);
    if (NULL == pOrphan) {
      return terrno;
    }

    int32_t ret = doSetDropActionFromId(pMnode, pTrans, pOrphan);
    if (ret != 0) {
      return ret;
    }

    mDebug("add drop task:0x%x action to drop orphan task", pOrphan->taskId);
    ++idx;
  }

  return 0;
}
547

548
// Queue a reset action (carrying chkptId) for every task of pStream on pTrans. The stream's
// write latch is held for the whole iteration and released on every exit path.
//
// Stops at the first failure and returns its code; returns 0 when all tasks were handled.
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret != 0) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCur = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pCur);
    if (ret == 0) {
      ret = doSetResetAction(pMnode, pTrans, pCur, chkptId);
    }
    if (ret != 0) {
      break;
    }
  }

  // same teardown order on success and failure: iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
580

581
// Queue a TDMT_STREAM_CONSEN_CHKPT action for a single stream task on pTrans. The payload is
// [SMsgHead][encoded SRestoreCheckpointInfo] built from the task identity, checkpointId, ts
// (stored as startTs) and the transaction id.
//
// Returns terrno when sizing or allocation fails, the encoder's negative result when encoding
// fails, otherwise the code from extractNodeEpset()/setTransAction(). When the lookup reports
// no epset the payload is discarded and the lookup code is returned.
int32_t doSetCheckpointIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
  SRestoreCheckpointInfo info = {0};
  info.taskId = pTask->id.taskId;
  info.streamId = pTask->id.streamId;
  info.checkpointId = checkpointId;
  info.startTs = ts;
  info.nodeId = pTask->info.nodeId;
  info.transId = pTrans->id;

  int32_t ret = 0;
  int32_t bodyLen = 0;

  // first pass: compute the encoded size of the body
  tEncodeSize(tEncodeRestoreCheckpointInfo, &info, bodyLen, ret);
  if (ret < 0) {
    return terrno;
  }

  int32_t msgLen = sizeof(SMsgHead) + bodyLen;
  void   *pMsg = taosMemoryMalloc(msgLen);
  if (NULL == pMsg) {
    return terrno;
  }

  // second pass: encode the body right after the message head
  SEncoder enc;
  void    *pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&enc, pBody, msgLen);
  ret = tEncodeRestoreCheckpointInfo(&enc, &info);
  tEncoderClear(&enc);
  if (ret < 0) {
    taosMemoryFree(pMsg);
    return ret;
  }

  // head fields are stored in network byte order
  SMsgHead *pHead = (SMsgHead *)pMsg;
  pHead->contLen = htonl(msgLen);
  pHead->vgId = htonl(pTask->info.nodeId);

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pMsg, msgLen, TDMT_STREAM_CONSEN_CHKPT, &nodeEpset,
                         TSDB_CODE_STREAM_TASK_IVLD_STATUS, TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  // epset unavailable or the action could not be attached: release the payload
  taosMemoryFree(pMsg);
  return ret;
}
634

635
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t checkpointId,
30✔
636
                                  SArray *pList) {
637
  SStreamTaskIter *pIter = NULL;
30✔
638
  int32_t          num = taosArrayGetSize(pList);
30✔
639

640
  taosWLockLatch(&pStream->lock);
30✔
641
  int32_t code = createStreamTaskIter(pStream, &pIter);
30✔
642
  if (code) {
30!
643
    taosWUnLockLatch(&pStream->lock);
×
644
    mError("failed to create stream task iter:%s", pStream->name);
×
645
    return code;
×
646
  }
647

648
  while (streamTaskIterNextTask(pIter)) {
184✔
649
    SStreamTask *pTask = NULL;
154✔
650
    code = streamTaskIterGetCurrent(pIter, &pTask);
154✔
651
    if (code) {
154!
652
      destroyStreamTaskIter(pIter);
×
653
      taosWUnLockLatch(&pStream->lock);
×
654
      return code;
×
655
    }
656

657
    // find the required entry
658
    int64_t startTs = 0;
154✔
659
    for(int32_t i = 0; i < num; ++i) {
580!
660
      SCheckpointConsensusEntry* pEntry = taosArrayGet(pList, i);
580✔
661
      if (pEntry->req.taskId == pTask->id.taskId) {
580✔
662
        startTs = pEntry->req.startTs;
154✔
663
        break;
154✔
664
      }
665
    }
666

667
    code = doSetCheckpointIdAction(pMnode, pTrans, pTask, checkpointId, startTs);
154✔
668
    if (code != TSDB_CODE_SUCCESS) {
154!
669
      destroyStreamTaskIter(pIter);
×
670
      taosWUnLockLatch(&pStream->lock);
×
671
      return code;
×
672
    }
673
  }
674

675
  destroyStreamTaskIter(pIter);
30✔
676
  taosWUnLockLatch(&pStream->lock);
30✔
677
  return 0;
30✔
678
}
679

680
// Queue a TDMT_VND_STREAM_CHECK_POINT_SOURCE action for a single stream task on pTrans.
//
// Returns the first non-zero code from building the request, looking up the node epset or
// attaching the action. When the lookup succeeds but reports no epset, the request is
// discarded and the (zero) lookup code is returned.
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
                                     int8_t mndTrigger) {
  // Both outputs start in a defined state: the failure path below frees buf, and the builder
  // is not guaranteed to have written it when it fails.
  void   *buf = NULL;
  int32_t tlen = 0;
  int32_t code = 0;
  SEpSet  epset = {0};
  bool    hasEpset = false;

  code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
                                           pTask->id.taskId, pTrans->id, mndTrigger);
  if (code != 0) {
    taosMemoryFree(buf);
    return code;
  }

  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    // the buffer is released here only when it could not be attached to the transaction
    taosMemoryFree(buf);
  }

  return code;
}
708

709
// Stub: adds no actions to the transaction and always returns 0.
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
  // none of the arguments are used
  (void)pMnode;
  (void)pTrans;
  (void)pStream;

  return 0;
}
712

713

714
// Queue a TDMT_VND_STREAM_ALL_STOP action addressed to the vgroup pVgObj on pTrans. The
// payload is [SMsgHead][encoded SStreamTaskStopReq] with streamId set to -1.
//
// Returns 0 on success and a non-zero code when sizing, allocation, encoding or attaching
// the action fails, so the caller can abort the transaction setup.
//
// The vgroup reference is NOT released here: the only caller visible in this file obtains
// it from sdbFetch() and calls sdbRelease() on every path, so releasing it here as well
// would drop the reference twice.
// NOTE(review): assumes mndReleaseVgroup() is a plain sdbRelease() wrapper -- confirm.
static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) {
  void    *pBuf = NULL;
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  // first pass: compute the encoded size of the body
  SStreamTaskStopReq req = {.streamId = -1};
  tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code);
  if (code < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int32_t tlen = sizeof(SMsgHead) + len;

  pBuf = taosMemoryMalloc(tlen);
  if (pBuf == NULL) {
    return terrno;
  }

  // second pass: encode the body right after the message head
  void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamTaskStopReq(&encoder, &req);
  tEncoderClear(&encoder);

  // treat every negative result as a failure (not only -1), consistent with
  // doSetCheckpointIdAction() in this file
  if (code < 0) {
    taosMemoryFree(pBuf);
    return code;
  }

  // head fields are stored in network byte order
  SMsgHead *pMsgHead = (SMsgHead *)pBuf;
  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pVgObj->vgId);

  SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);

  code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed to create stop all streams trans, code:%s", tstrerror(code));
    taosMemoryFree(pBuf);
  }

  // propagate the failure instead of reporting success unconditionally
  return code;
}
760

761
// Walk all SDB_VGROUP rows and queue a stop-all-tasks action on pTrans for every vgroup
// whose dbUid equals the given dbUid.
//
// Returns the first non-zero code from doSetStopAllTasksAction() (after cancelling the
// fetch), or 0 once the scan has finished.
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;
  int32_t ret = 0;

  for (;;) {
    SVgObj *pVg = NULL;

    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (NULL == pCursor) {
      break;
    }

    // vgroups of other databases are skipped
    if (pVg->dbUid != dbUid) {
      sdbRelease(pSdb, pVg);
      continue;
    }

    ret = doSetStopAllTasksAction(pMnode, pTrans, pVg);
    if (ret != 0) {
      // abandon the scan: the cursor is cancelled before leaving
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pVg);
      TAOS_RETURN(ret);
    }

    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(ret);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc