• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.37
/source/dnode/mnode/impl/src/mndStreamTransAct.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

UNCOV
24
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
×
UNCOV
25
  SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
×
UNCOV
26
  if (pReq == NULL) {
×
27
    mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
×
28
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
29
    // terrno = TSDB_CODE_OUT_OF_MEMORY;
30
    return terrno;
×
31
  }
32

UNCOV
33
  pReq->head.vgId = htonl(pTask->info.nodeId);
×
UNCOV
34
  pReq->taskId = pTask->id.taskId;
×
UNCOV
35
  pReq->streamId = pTask->id.streamId;
×
36

UNCOV
37
  SEpSet  epset = {0};
×
UNCOV
38
  bool    hasEpset = false;
×
UNCOV
39
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
40
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
×
41
    terrno = code;
×
42
    taosMemoryFree(pReq);
×
43
    return code;
×
44
  }
45

UNCOV
46
  char buf[256] = {0};
×
UNCOV
47
  code = epsetToStr(&epset, buf, tListLen(buf));
×
UNCOV
48
  if (code != 0) {  // print error and continue
×
49
    mError("failed to convert epset to str, code:%s", tstrerror(code));
×
50
  }
51

UNCOV
52
  mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
×
UNCOV
53
  code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
54
  if (code != 0) {
×
55
    taosMemoryFree(pReq);
×
56
    return code;
×
57
  }
UNCOV
58
  return 0;
×
59
}
60

UNCOV
61
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
×
UNCOV
62
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
×
UNCOV
63
  if (pReq == NULL) {
×
64
    return terrno;
×
65
  }
66

UNCOV
67
  pReq->head.vgId = htonl(pTask->info.nodeId);
×
UNCOV
68
  pReq->taskId = pTask->id.taskId;
×
UNCOV
69
  pReq->streamId = pTask->id.streamId;
×
70

UNCOV
71
  SEpSet  epset = {0};
×
UNCOV
72
  bool    hasEpset = false;
×
UNCOV
73
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
74
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {  // no valid epset, return directly without redoAction
×
75
    return code;
×
76
  }
77

78
  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
UNCOV
79
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
80
  if (code != 0) {
×
81
    taosMemoryFree(pReq);
×
82
    return code;
×
83
  }
84

UNCOV
85
  return 0;
×
86
}
87

UNCOV
88
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
×
UNCOV
89
  terrno = 0;
×
90

UNCOV
91
  SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
×
UNCOV
92
  if (pReq == NULL) {
×
93
    mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
×
94
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
95
    return terrno;
×
96
  }
97

UNCOV
98
  pReq->head.vgId = htonl(pTask->info.nodeId);
×
UNCOV
99
  pReq->taskId = pTask->id.taskId;
×
UNCOV
100
  pReq->streamId = pTask->id.streamId;
×
UNCOV
101
  pReq->igUntreated = igUntreated;
×
102

UNCOV
103
  SEpSet  epset = {0};
×
UNCOV
104
  bool    hasEpset = false;
×
UNCOV
105
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
106
  if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
×
107
    taosMemoryFree(pReq);
×
108
    return code;
×
109
  }
110

UNCOV
111
  code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
112
  if (code != 0) {
×
113
    taosMemoryFree(pReq);
×
114
    return code;
×
115
  }
116

UNCOV
117
  mDebug("set the resume action for trans:%d", pTrans->id);
×
UNCOV
118
  return code;
×
119
}
120

UNCOV
121
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
×
UNCOV
122
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
×
UNCOV
123
  if (pReq == NULL) {
×
124
    // terrno = TSDB_CODE_OUT_OF_MEMORY;
125
    return terrno;
×
126
  }
127

UNCOV
128
  pReq->head.vgId = htonl(pTask->nodeId);
×
UNCOV
129
  pReq->taskId = pTask->taskId;
×
UNCOV
130
  pReq->streamId = pTask->streamId;
×
131

UNCOV
132
  SEpSet  epset = {0};
×
UNCOV
133
  bool    hasEpset = false;
×
UNCOV
134
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
×
UNCOV
135
  if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {  // no valid epset, return directly without redoAction
×
136
    taosMemoryFree(pReq);
×
137
    return code;
×
138
  }
139

140
  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
UNCOV
141
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
142
  if (code != 0) {
×
143
    taosMemoryFree(pReq);
×
144
    return code;
×
145
  }
146

UNCOV
147
  return 0;
×
148
}
149

UNCOV
150
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
×
151
                              int32_t transId) {
UNCOV
152
  int32_t code = 0;
×
153

UNCOV
154
  pMsg->streamId = pId->streamId;
×
UNCOV
155
  pMsg->taskId = pId->taskId;
×
UNCOV
156
  pMsg->transId = transId;
×
UNCOV
157
  pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
×
UNCOV
158
  if (pMsg->pNodeList == NULL) {
×
159
    mError("failed to prepare node list, code:%s", tstrerror(terrno));
×
160
    code = terrno;
×
161
  }
162

UNCOV
163
  if (code == 0) {
×
UNCOV
164
    void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
×
UNCOV
165
    if (p == NULL) {
×
166
      mError("failed to add update node list into nodeList");
×
167
    }
168
  }
UNCOV
169
}
×
170

UNCOV
171
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
×
172
                                          SStreamTaskId *pId, int32_t transId) {
UNCOV
173
  SStreamTaskNodeUpdateMsg req = {0};
×
UNCOV
174
  initNodeUpdateMsg(&req, pInfo, pId, transId);
×
175

UNCOV
176
  int32_t code = 0;
×
177
  int32_t blen;
178

UNCOV
179
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
×
UNCOV
180
  if (code < 0) {
×
181
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
182
    taosArrayDestroy(req.pNodeList);
×
183
    return terrno;
×
184
  }
185

UNCOV
186
  int32_t tlen = sizeof(SMsgHead) + blen;
×
187

UNCOV
188
  void *buf = taosMemoryMalloc(tlen);
×
UNCOV
189
  if (buf == NULL) {
×
190
    taosArrayDestroy(req.pNodeList);
×
191
    return terrno;
×
192
  }
193

UNCOV
194
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
195
  SEncoder encoder;
UNCOV
196
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
197
  code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
×
UNCOV
198
  if (code == -1) {
×
199
    tEncoderClear(&encoder);
×
200
    taosMemoryFree(buf);
×
201
    taosArrayDestroy(req.pNodeList);
×
202
    return code;
×
203
  }
204

UNCOV
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
206
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
207
  pMsgHead->vgId = htonl(nodeId);
×
208

UNCOV
209
  tEncoderClear(&encoder);
×
210

UNCOV
211
  *pBuf = buf;
×
UNCOV
212
  *pLen = tlen;
×
213

UNCOV
214
  taosArrayDestroy(req.pNodeList);
×
UNCOV
215
  return TSDB_CODE_SUCCESS;
×
216
}
217

UNCOV
218
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
×
UNCOV
219
  void   *pBuf = NULL;
×
UNCOV
220
  int32_t len = 0;
×
UNCOV
221
  SEpSet  epset = {0};
×
UNCOV
222
  bool    hasEpset = false;
×
223

UNCOV
224
  bool    unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
×
UNCOV
225
  int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
×
UNCOV
226
  if (code) {
×
227
    mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
×
228
    return code;
×
229
  }
230

UNCOV
231
  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
232
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
×
233
    mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
×
234
    taosMemoryFree(pBuf);
×
235
    return code;
×
236
  }
237

UNCOV
238
  code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
239
  if (code != TSDB_CODE_SUCCESS) {
×
240
    mError("failed to create update task epset trans, code:%s", tstrerror(code));
×
241
    taosMemoryFree(pBuf);
×
242
  }
243

UNCOV
244
  return code;
×
245
}
246

UNCOV
247
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
×
UNCOV
248
  SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
×
UNCOV
249
  if (pReq == NULL) {
×
250
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
×
251
           tstrerror(terrno));
252
    return terrno;
×
253
  }
254

UNCOV
255
  pReq->head.vgId = htonl(pTask->info.nodeId);
×
UNCOV
256
  pReq->taskId = pTask->id.taskId;
×
UNCOV
257
  pReq->streamId = pTask->id.streamId;
×
258

UNCOV
259
  SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
×
UNCOV
260
  if (pStreamItem == NULL) {
×
261
    return TSDB_CODE_INVALID_PARA;
×
262
  }
263

UNCOV
264
  int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
×
UNCOV
265
  for(int32_t i = 0; i < size; ++i) {
×
UNCOV
266
    STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i);
×
UNCOV
267
    if (pInfo == NULL) {
×
268
      continue;
×
269
    }
270

UNCOV
271
    if (pInfo->taskId == pTask->id.taskId) {
×
UNCOV
272
      pReq->checkpointId = pInfo->checkpointId;
×
UNCOV
273
      pReq->checkpointVer = pInfo->version;
×
UNCOV
274
      pReq->checkpointTs = pInfo->ts;
×
UNCOV
275
      pReq->dropRelHTask = pInfo->dropHTask;
×
UNCOV
276
      pReq->transId = pInfo->transId;
×
UNCOV
277
      pReq->hStreamId = pTask->hTaskInfo.id.streamId;
×
UNCOV
278
      pReq->hTaskId = pTask->hTaskInfo.id.taskId;
×
279
    }
280
  }
281

UNCOV
282
  SEpSet  epset = {0};
×
UNCOV
283
  bool    hasEpset = false;
×
UNCOV
284
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
285
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
×
UNCOV
286
    taosMemoryFree(pReq);
×
UNCOV
287
    return code;
×
288
  }
289

UNCOV
290
  code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
291
  if (code != TSDB_CODE_SUCCESS) {
×
292
    taosMemoryFree(pReq);
×
293
  }
294

UNCOV
295
  return code;
×
296
}
297

298
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
1✔
299
  SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
1!
300
  if (pReq == NULL) {
1!
301
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
×
302
           tstrerror(terrno));
303
    return terrno;
×
304
  }
305

306
  pReq->head.vgId = htonl(pTask->info.nodeId);
1✔
307
  pReq->taskId = pTask->id.taskId;
1✔
308
  pReq->streamId = pTask->id.streamId;
1✔
309
  pReq->chkptId = chkptId;
1✔
310

311
  SEpSet  epset = {0};
1✔
312
  bool    hasEpset = false;
1✔
313
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
1✔
314
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
1!
315
    taosMemoryFree(pReq);
1!
316
    return code;
1✔
317
  }
318

319
  code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
320
  if (code != TSDB_CODE_SUCCESS) {
×
321
    taosMemoryFree(pReq);
×
322
  }
323

324
  return code;
×
325
}
326

UNCOV
327
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
×
328
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
UNCOV
329
  SStreamCheckpointSourceReq req = {0};
×
UNCOV
330
  req.checkpointId = checkpointId;
×
UNCOV
331
  req.nodeId = nodeId;
×
UNCOV
332
  req.expireTime = -1;
×
UNCOV
333
  req.streamId = streamId;  // pTask->id.streamId;
×
UNCOV
334
  req.taskId = taskId;      // pTask->id.taskId;
×
UNCOV
335
  req.transId = transId;
×
UNCOV
336
  req.mndTrigger = mndTrigger;
×
337

338
  int32_t code;
339
  int32_t blen;
340

UNCOV
341
  tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
×
UNCOV
342
  if (code < 0) {
×
343
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
344
  }
345

UNCOV
346
  int32_t tlen = sizeof(SMsgHead) + blen;
×
347

UNCOV
348
  void *buf = taosMemoryMalloc(tlen);
×
UNCOV
349
  if (buf == NULL) {
×
350
    return terrno;
×
351
  }
352

UNCOV
353
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
354
  SEncoder encoder;
UNCOV
355
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
356
  int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
×
UNCOV
357
  if (pos == -1) {
×
358
    tEncoderClear(&encoder);
×
359
    return TSDB_CODE_INVALID_MSG;
×
360
  }
361

UNCOV
362
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
363
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
364
  pMsgHead->vgId = htonl(nodeId);
×
365

UNCOV
366
  tEncoderClear(&encoder);
×
367

UNCOV
368
  *pBuf = buf;
×
UNCOV
369
  *pLen = tlen;
×
370

UNCOV
371
  return 0;
×
372
}
UNCOV
373
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
×
UNCOV
374
  SStreamTaskIter *pIter = NULL;
×
375

UNCOV
376
  int32_t code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
377
  if (code) {
×
378
    mError("failed to create stream task iter:%s", pStream->name);
×
379
    return code;
×
380
  }
381

UNCOV
382
  while (streamTaskIterNextTask(pIter)) {
×
UNCOV
383
    SStreamTask *pTask = NULL;
×
UNCOV
384
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
385
    if (code) {
×
386
      destroyStreamTaskIter(pIter);
×
387
      return code;
×
388
    }
389

UNCOV
390
    code = doSetPauseAction(pMnode, pTrans, pTask);
×
UNCOV
391
    if (code) {
×
392
      destroyStreamTaskIter(pIter);
×
393
      return code;
×
394
    }
395

UNCOV
396
    if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
×
UNCOV
397
      atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
×
UNCOV
398
      atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
×
399
    }
400
  }
401

UNCOV
402
  destroyStreamTaskIter(pIter);
×
UNCOV
403
  return code;
×
404
}
405

UNCOV
406
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
×
UNCOV
407
  SStreamTaskIter *pIter = NULL;
×
408

UNCOV
409
  int32_t code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
410
  if (code) {
×
411
    mError("failed to create stream task iter:%s", pStream->name);
×
412
    return code;
×
413
  }
414

UNCOV
415
  while(streamTaskIterNextTask(pIter)) {
×
UNCOV
416
    SStreamTask *pTask = NULL;
×
UNCOV
417
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
418
    if (code) {
×
419
      destroyStreamTaskIter(pIter);
×
420
      return code;
×
421
    }
422

UNCOV
423
    code = doSetDropAction(pMnode, pTrans, pTask);
×
UNCOV
424
    if (code) {
×
425
      destroyStreamTaskIter(pIter);
×
426
      return code;
×
427
    }
428
  }
UNCOV
429
  destroyStreamTaskIter(pIter);
×
UNCOV
430
  return 0;
×
431
}
432

UNCOV
433
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
×
UNCOV
434
  SStreamTaskIter *pIter = NULL;
×
UNCOV
435
  int32_t          code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
436
  if (code) {
×
437
    mError("failed to create stream task iter:%s", pStream->name);
×
438
    return code;
×
439
  }
440

UNCOV
441
  while (streamTaskIterNextTask(pIter)) {
×
UNCOV
442
    SStreamTask *pTask = NULL;
×
UNCOV
443
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
444
    if (code || pTask == NULL) {
×
445
      destroyStreamTaskIter(pIter);
×
446
      return code;
×
447
    }
448

UNCOV
449
    code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated);
×
UNCOV
450
    if (code) {
×
451
      destroyStreamTaskIter(pIter);
×
452
      return code;
×
453
    }
454

UNCOV
455
    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
×
UNCOV
456
      atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
×
457
    }
458
  }
UNCOV
459
  destroyStreamTaskIter(pIter);
×
UNCOV
460
  return 0;
×
461
}
462

463
// build trans to update the epset
UNCOV
464
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
×
UNCOV
465
  mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
×
UNCOV
466
  SStreamTaskIter *pIter = NULL;
×
467

UNCOV
468
  taosWLockLatch(&pStream->lock);
×
UNCOV
469
  int32_t code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
470
  if (code) {
×
471
    taosWUnLockLatch(&pStream->lock);
×
472
    mError("failed to create stream task iter:%s", pStream->name);
×
473
    return code;
×
474
  }
475

UNCOV
476
  while (streamTaskIterNextTask(pIter)) {
×
UNCOV
477
    SStreamTask *pTask = NULL;
×
UNCOV
478
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
479
    if (code) {
×
480
      destroyStreamTaskIter(pIter);
×
481
      taosWUnLockLatch(&pStream->lock);
×
482
      return code;
×
483
    }
484

UNCOV
485
    code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
×
UNCOV
486
    if (code != TSDB_CODE_SUCCESS) {
×
487
      destroyStreamTaskIter(pIter);
×
488
      taosWUnLockLatch(&pStream->lock);
×
489
      return code;
×
490
    }
491
  }
492

UNCOV
493
  destroyStreamTaskIter(pIter);
×
UNCOV
494
  taosWUnLockLatch(&pStream->lock);
×
UNCOV
495
  return 0;
×
496
}
497

UNCOV
498
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
×
UNCOV
499
  SStreamTaskIter *pIter = NULL;
×
500

UNCOV
501
  taosWLockLatch(&pStream->lock);
×
UNCOV
502
  int32_t code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
503
  if (code) {
×
504
    taosWUnLockLatch(&pStream->lock);
×
505
    mError("failed to create stream task iter:%s", pStream->name);
×
506
    return code;
×
507
  }
508

UNCOV
509
  while (streamTaskIterNextTask(pIter)) {
×
UNCOV
510
    SStreamTask *pTask = NULL;
×
UNCOV
511
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
512
    if (code) {
×
513
      destroyStreamTaskIter(pIter);
×
514
      taosWUnLockLatch(&pStream->lock);
×
515
      return code;
×
516
    }
517

UNCOV
518
    code = doSetUpdateChkptAction(pMnode, pTrans, pTask);
×
UNCOV
519
    if (code != TSDB_CODE_SUCCESS) {
×
520
      destroyStreamTaskIter(pIter);
×
521
      taosWUnLockLatch(&pStream->lock);
×
522
      return code;
×
523
    }
524
  }
525

UNCOV
526
  destroyStreamTaskIter(pIter);
×
UNCOV
527
  taosWUnLockLatch(&pStream->lock);
×
UNCOV
528
  return code;
×
529
}
530

UNCOV
531
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
×
UNCOV
532
  for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
×
UNCOV
533
    SOrphanTask* pTask = taosArrayGet(pList, i);
×
UNCOV
534
    if (pTask == NULL) {
×
535
      return terrno;
×
536
    }
537

UNCOV
538
    int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask);
×
UNCOV
539
    if (code != 0) {
×
540
      return code;
×
541
    } else {
UNCOV
542
      mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
×
543
    }
544
  }
UNCOV
545
  return 0;
×
546
}
547

548
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
1✔
549
  SStreamTaskIter *pIter = NULL;
1✔
550

551
  taosWLockLatch(&pStream->lock);
1✔
552
  int32_t code = createStreamTaskIter(pStream, &pIter);
1✔
553
  if (code) {
1!
554
    taosWUnLockLatch(&pStream->lock);
×
555
    mError("failed to create stream task iter:%s", pStream->name);
×
556
    return code;
×
557
  }
558

559
  while (streamTaskIterNextTask(pIter)) {
2✔
560
    SStreamTask *pTask = NULL;
1✔
561
    code = streamTaskIterGetCurrent(pIter, &pTask);
1✔
562
    if (code) {
1!
563
      destroyStreamTaskIter(pIter);
×
564
      taosWUnLockLatch(&pStream->lock);
×
565
      return code;
×
566
    }
567

568
    code = doSetResetAction(pMnode, pTrans, pTask, chkptId);
1✔
569
    if (code != TSDB_CODE_SUCCESS) {
1!
570
      destroyStreamTaskIter(pIter);
×
571
      taosWUnLockLatch(&pStream->lock);
×
572
      return code;
×
573
    }
574
  }
575

576
  destroyStreamTaskIter(pIter);
1✔
577
  taosWUnLockLatch(&pStream->lock);
1✔
578
  return 0;
1✔
579
}
580

UNCOV
581
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
×
UNCOV
582
  SRestoreCheckpointInfo req = {
×
UNCOV
583
      .taskId = pTask->id.taskId,
×
UNCOV
584
      .streamId = pTask->id.streamId,
×
585
      .checkpointId = checkpointId,
586
      .startTs = ts,
UNCOV
587
      .nodeId = pTask->info.nodeId,
×
UNCOV
588
      .transId = pTrans->id,
×
589
  };
590

UNCOV
591
  int32_t code = 0;
×
592
  int32_t blen;
UNCOV
593
  tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code);
×
UNCOV
594
  if (code < 0) {
×
595
    return terrno;
×
596
  }
597

UNCOV
598
  int32_t tlen = sizeof(SMsgHead) + blen;
×
599

UNCOV
600
  void *pBuf = taosMemoryMalloc(tlen);
×
UNCOV
601
  if (pBuf == NULL) {
×
602
    return terrno;
×
603
  }
604

UNCOV
605
  void    *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
×
606
  SEncoder encoder;
UNCOV
607
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
608
  code = tEncodeRestoreCheckpointInfo(&encoder, &req);
×
UNCOV
609
  tEncoderClear(&encoder);
×
UNCOV
610
  if (code < 0) {
×
611
    taosMemoryFree(pBuf);
×
612
    return code;
×
613
  }
614

UNCOV
615
  SMsgHead *pMsgHead = (SMsgHead *)pBuf;
×
UNCOV
616
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
617
  pMsgHead->vgId = htonl(pTask->info.nodeId);
×
618

UNCOV
619
  SEpSet  epset = {0};
×
UNCOV
620
  bool    hasEpset = false;
×
UNCOV
621
  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
622
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
×
623
    taosMemoryFree(pBuf);
×
624
    return code;
×
625
  }
626

UNCOV
627
  code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
×
UNCOV
628
  if (code != TSDB_CODE_SUCCESS) {
×
629
    taosMemoryFree(pBuf);
×
630
  }
631

UNCOV
632
  return code;
×
633
}
634

635

UNCOV
636
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
×
637
                                     int8_t mndTrigger) {
638
  void   *buf;
639
  int32_t tlen;
UNCOV
640
  int32_t code = 0;
×
UNCOV
641
  SEpSet  epset = {0};
×
UNCOV
642
  bool    hasEpset = false;
×
643

UNCOV
644
  if ((code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
×
645
                                                pTask->id.taskId, pTrans->id, mndTrigger)) < 0) {
646
    taosMemoryFree(buf);
×
647
    return code;
×
648
  }
649

UNCOV
650
  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
×
UNCOV
651
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
×
652
    taosMemoryFree(buf);
×
653
    return code;
×
654
  }
655

UNCOV
656
  code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
×
657
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
UNCOV
658
  if (code != 0) {
×
659
    taosMemoryFree(buf);
×
660
  }
661

UNCOV
662
  return code;
×
663
}
664

665
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
×
666
  return 0;
×
667
}
668

669

670
static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) {
16✔
671
  void   *pBuf = NULL;
16✔
672
  int32_t len = 0;
16✔
673
  int32_t code = 0;
16✔
674
  SEncoder encoder;
675

676
  SStreamTaskStopReq req = {.streamId = -1};
16✔
677
  tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code);
16!
678
  if (code < 0) {
16!
679
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
680
    return terrno;
×
681
  }
682

683
  int32_t tlen = sizeof(SMsgHead) + len;
16✔
684

685
  pBuf = taosMemoryMalloc(tlen);
16!
686
  if (pBuf == NULL) {
16!
687
    return terrno;
×
688
  }
689

690
  void    *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
16✔
691
  tEncoderInit(&encoder, abuf, tlen);
16✔
692
  code = tEncodeStreamTaskStopReq(&encoder, &req);
16✔
693
  if (code == -1) {
16!
694
    tEncoderClear(&encoder);
×
695
    taosMemoryFree(pBuf);
×
696
    return code;
×
697
  }
698

699
  SMsgHead *pMsgHead = (SMsgHead *)pBuf;
16✔
700
  pMsgHead->contLen = htonl(tlen);
16✔
701
  pMsgHead->vgId = htonl(pVgObj->vgId);
16✔
702

703
  tEncoderClear(&encoder);
16✔
704

705
  SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
16✔
706
  mndReleaseVgroup(pMnode, pVgObj);
16✔
707

708
  code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
16✔
709
  if (code != TSDB_CODE_SUCCESS) {
16!
710
    mError("failed to create stop all streams trans, code:%s", tstrerror(code));
×
711
    taosMemoryFree(pBuf);
×
712
  }
713

714
  return 0;
16✔
715
}
716

717
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) {
8✔
718
  int32_t code = 0;
8✔
719
  SSdb   *pSdb = pMnode->pSdb;
8✔
720
  void   *pIter = NULL;
8✔
721

722
  while (1) {
16✔
723
    SVgObj *pVgroup = NULL;
24✔
724
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
24✔
725
    if (pIter == NULL) break;
24✔
726

727
    if (pVgroup->dbUid == dbUid) {
16!
728
      if ((code = doSetStopAllTasksAction(pMnode, pTrans, pVgroup)) != 0) {
16!
729
        sdbCancelFetch(pSdb, pIter);
×
730
        sdbRelease(pSdb, pVgroup);
×
731
        TAOS_RETURN(code);
×
732
      }
733
    }
734

735
    sdbRelease(pSdb, pVgroup);
16✔
736
  }
737

738
  TAOS_RETURN(code);
8✔
739
  return 0;
740
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc