• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.11
/source/dnode/mnode/impl/src/mndStreamTransAct.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

UNCOV
24
// Queue a TDMT_STREAM_TASK_PAUSE action for a single stream task on pTrans.
//
// Returns 0 when the action has been queued. When extractNodeEpset() succeeds
// but reports no epset, nothing is queued and its (successful) code is returned.
// Any other failure returns the failing code; the request is freed on every
// path where it was not accepted by setTransAction().
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVPauseStreamTaskReq *pPauseReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
  if (NULL == pPauseReq) {
    mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pPauseReq->streamId = pTask->id.streamId;
  pPauseReq->taskId = pTask->id.taskId;
  pPauseReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret != TSDB_CODE_SUCCESS || !found) {
    terrno = ret;
    taosMemoryFree(pPauseReq);
    return ret;
  }

  char epsetStr[256] = {0};
  ret = epsetToStr(&nodeEpset, epsetStr, tListLen(epsetStr));
  if (ret != 0) {
    // the string is only used by the debug log below, so a failure is not fatal
    mError("failed to convert epset to str, code:%s", tstrerror(ret));
  }

  mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, epsetStr);

  ret = setTransAction(pTrans, pPauseReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != 0) {
    taosMemoryFree(pPauseReq);
  }

  return ret;
}
60

UNCOV
61
// Queue a TDMT_STREAM_TASK_DROP action for a single stream task on pTrans.
//
// Returns 0 when the action has been queued. When extractNodeEpset() succeeds
// but reports no epset, nothing is queued and its (successful) code is returned.
// Any other failure returns the failing code.
//
// The request buffer is released on every path where it was not accepted by
// setTransAction(); previously it leaked when no valid epset could be found.
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {  // no valid epset, return directly without redoAction
    taosMemoryFree(pReq);                        // the request was never handed to the transaction
    return code;
  }

  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(pReq);
    return code;
  }

  return 0;
}
87

UNCOV
88
// Queue a TDMT_STREAM_TASK_RESUME action for a single stream task on pTrans.
//
// terrno is cleared on entry. igUntreated is copied verbatim into the request.
// Returns 0 when the action has been queued, or when extractNodeEpset() succeeds
// without providing an epset (nothing is queued then); otherwise the failing code.
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
  terrno = 0;

  SVResumeStreamTaskReq *pResumeReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
  if (NULL == pResumeReq) {
    mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pResumeReq->igUntreated = igUntreated;
  pResumeReq->streamId = pTask->id.streamId;
  pResumeReq->taskId = pTask->id.taskId;
  pResumeReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pResumeReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      return ret;
    }
  }

  // either no epset was available or the action was rejected: the request is still ours
  taosMemoryFree(pResumeReq);
  return ret;
}
118

UNCOV
119
// Queue a TDMT_STREAM_TASK_DROP action for an orphan task, identified only by
// the ids stored in pTask (nodeId / taskId / streamId).
//
// Returns 0 when the action has been queued, or when extractNodeEpset() succeeds
// without providing an epset (nothing is queued then); otherwise the failing code.
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
  SVDropStreamTaskReq *pDropReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (NULL == pDropReq) {
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pDropReq->streamId = pTask->streamId;
  pDropReq->taskId = pTask->taskId;
  pDropReq->head.vgId = htonl(pTask->nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->taskId, pTask->nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    // The epset recorded for this node may be stale, so the one just fetched from the mnode is used.
    ret = setTransAction(pTrans, pDropReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == 0) {
      return 0;
    }
  }

  // no valid epset (no redo action is added) or the action was rejected
  taosMemoryFree(pDropReq);
  return ret;
}
147

UNCOV
148
// Fill in a node-update message: ids, transaction id, the caller supplied task
// list (stored by pointer, not copied) and a fresh copy of pInfo->pUpdateNodeList.
//
// Failures are only logged; the function has no way to report them. If the node
// list cannot be allocated, pMsg->pNodeList is left NULL.
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SArray* pTaskList, SStreamTaskId *pId,
                              int32_t transId) {
  pMsg->streamId = pId->streamId;
  pMsg->taskId = pId->taskId;
  pMsg->transId = transId;
  pMsg->pTaskList = pTaskList;

  pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
  if (pMsg->pNodeList == NULL) {
    mError("failed to prepare node list, code:%s", tstrerror(terrno));
    return;
  }

  if (taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList) == NULL) {
    mError("failed to add update node list into nodeList");
  }
}
×
170

UNCOV
171
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, SArray* pList, int32_t nodeId,
×
172
                                          SStreamTaskId *pId, int32_t transId) {
UNCOV
173
  SStreamTaskNodeUpdateMsg req = {0};
×
UNCOV
174
  initNodeUpdateMsg(&req, pInfo, pList, pId, transId);
×
175

UNCOV
176
  int32_t code = 0;
×
177
  int32_t blen;
178

179
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
×
180
  if (code < 0) {
×
181
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
182
    tDestroyNodeUpdateMsg(&req);
×
UNCOV
183
    return terrno;
×
184
  }
185

UNCOV
186
  int32_t tlen = sizeof(SMsgHead) + blen;
×
187

188
  void *buf = taosMemoryMalloc(tlen);
×
189
  if (buf == NULL) {
×
UNCOV
190
    tDestroyNodeUpdateMsg(&req);
×
UNCOV
191
    return terrno;
×
192
  }
193

UNCOV
194
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
195
  SEncoder encoder;
UNCOV
196
  tEncoderInit(&encoder, abuf, tlen);
×
197
  code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
×
198
  if (code == -1) {
×
199
    tEncoderClear(&encoder);
×
200
    taosMemoryFree(buf);
×
UNCOV
201
    tDestroyNodeUpdateMsg(&req);
×
UNCOV
202
    return code;
×
203
  }
204

UNCOV
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
206
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
207
  pMsgHead->vgId = htonl(nodeId);
×
208

UNCOV
209
  tEncoderClear(&encoder);
×
210

UNCOV
211
  *pBuf = buf;
×
UNCOV
212
  *pLen = tlen;
×
213

UNCOV
214
  tDestroyNodeUpdateMsg(&req);
×
UNCOV
215
  return TSDB_CODE_SUCCESS;
×
216
}
217

218
// todo: set the task id list for a given nodeId
UNCOV
219
// Collect into pList the task id of every entry in execInfo whose nodeId equals vgId.
//
// Walks execInfo.pTaskList and looks each id up in execInfo.pTaskMap. Ids that
// cannot be fetched from the list, or that have no entry in the map, are skipped
// (the map lookup result used to be dereferenced without a NULL check).
//
// Returns TSDB_CODE_SUCCESS, or terrno if appending to pList fails.
static int32_t createUpdateTaskList(int32_t vgId, SArray* pList) {
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
    if (pe == NULL) {
      // the id is listed but has no status entry; nothing to add for it
      mDebug("task listed in execInfo has no status entry, skip it when building update task list");
      continue;
    }

    if (pe->nodeId == vgId) {
      void *pRet = taosArrayPush(pList, &pe->id.taskId);
      if (pRet == NULL) {
        return terrno;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
237

238
// Queue a TDMT_VND_STREAM_TASK_UPDATE action carrying the new node (epset)
// information for one stream task.
//
// Steps: apply the update list to the task itself, gather the ids of the tasks
// on the same node, encode the update message, look up the node epset and add
// the action to pTrans. Returns the code of the first step that fails; note
// that a successful epset lookup that reports no epset is logged as an error
// but its (successful) code is what gets returned.
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
  SArray* pIdList = taosArrayInit(4, sizeof(int32_t));
  if (pIdList == NULL) {
    return terrno;
  }

  // the return value is intentionally not inspected
  (void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);

  int32_t ret = createUpdateTaskList(pTask->info.nodeId, pIdList);
  if (ret != 0) {
    taosArrayDestroy(pIdList);
    return ret;
  }

  // per the original author's note, pIdList has already been freed once the
  // build call returns, so it must not be destroyed again below
  void   *pMsg = NULL;
  int32_t msgLen = 0;
  ret = doBuildStreamTaskUpdateMsg(&pMsg, &msgLen, pInfo, pIdList, pTask->info.nodeId, &pTask->id, pTrans->id);
  if (ret) {
    mError("failed to build stream task epset update msg, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
    return ret;
  }

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret != TSDB_CODE_SUCCESS || !found) {
    mError("failed to extract epset during create update epset, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
    return ret;
  }

  ret = setTransAction(pTrans, pMsg, msgLen, TDMT_VND_STREAM_TASK_UPDATE, &nodeEpset, 0,
                       TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (ret != TSDB_CODE_SUCCESS) {
    mError("failed to create update task epset trans, code:%s", tstrerror(ret));
    taosMemoryFree(pMsg);
  }

  return ret;
}
278

UNCOV
279
// Queue a TDMT_STREAM_TASK_UPDATE_CHKPT action for a single stream task.
//
// The checkpoint fields of the request are copied from the entry in
// execInfo.pChkptStreams (keyed by stream id) whose taskId matches pTask. If no
// entry matches, those fields keep their zero-initialized values.
//
// Returns TSDB_CODE_INVALID_PARA when the stream has no record in
// execInfo.pChkptStreams, 0 when the action has been queued (or when the epset
// lookup succeeds without providing an epset), otherwise the failing code.
//
// The request buffer is released on every path where it was not accepted by
// setTransAction(); previously it leaked on the TSDB_CODE_INVALID_PARA path.
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
  if (pReq == NULL) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
           tstrerror(terrno));
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pReq->head.vgId = htonl(pTask->info.nodeId);
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SChkptReportInfo *pStreamItem =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
  if (pStreamItem == NULL) {
    taosMemoryFree(pReq);  // the request was never handed to the transaction
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
  for (int32_t i = 0; i < size; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pStreamItem->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (pInfo->taskId == pTask->id.taskId) {
      pReq->checkpointId = pInfo->checkpointId;
      pReq->checkpointVer = pInfo->version;
      pReq->checkpointTs = pInfo->ts;
      pReq->dropRelHTask = pInfo->dropHTask;
      pReq->transId = pInfo->transId;
      pReq->hStreamId = pTask->hTaskInfo.id.streamId;
      pReq->hTaskId = pTask->hTaskInfo.id.taskId;
    }
  }

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(pReq);
    return code;
  }

  code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
  }

  return code;
}
329

UNCOV
330
// Queue a TDMT_VND_STREAM_TASK_RESET action for a single stream task; chkptId
// is copied verbatim into the request.
//
// Returns 0 when the action has been queued, or when extractNodeEpset() succeeds
// without providing an epset (nothing is queued then); otherwise the failing code.
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
  SVResetStreamTaskReq *pResetReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
  if (NULL == pResetReq) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
           tstrerror(terrno));
    return terrno;
  }

  // the vgId in the message head is stored in network byte order
  pResetReq->chkptId = chkptId;
  pResetReq->streamId = pTask->id.streamId;
  pResetReq->taskId = pTask->id.taskId;
  pResetReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pResetReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &nodeEpset, 0,
                         TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  // either no epset was available or the action was rejected: the request is still ours
  taosMemoryFree(pResetReq);
  return ret;
}
358

UNCOV
359
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
×
360
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
UNCOV
361
  SStreamCheckpointSourceReq req = {0};
×
UNCOV
362
  req.checkpointId = checkpointId;
×
UNCOV
363
  req.nodeId = nodeId;
×
UNCOV
364
  req.expireTime = -1;
×
UNCOV
365
  req.streamId = streamId;  // pTask->id.streamId;
×
UNCOV
366
  req.taskId = taskId;      // pTask->id.taskId;
×
UNCOV
367
  req.transId = transId;
×
UNCOV
368
  req.mndTrigger = mndTrigger;
×
369

370
  int32_t code;
371
  int32_t blen;
372

UNCOV
373
  tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
×
UNCOV
374
  if (code < 0) {
×
UNCOV
375
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
376
  }
377

UNCOV
378
  int32_t tlen = sizeof(SMsgHead) + blen;
×
379

UNCOV
380
  void *buf = taosMemoryMalloc(tlen);
×
UNCOV
381
  if (buf == NULL) {
×
UNCOV
382
    return terrno;
×
383
  }
384

385
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
386
  SEncoder encoder;
UNCOV
387
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
388
  int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
×
UNCOV
389
  if (pos == -1) {
×
390
    tEncoderClear(&encoder);
×
391
    return TSDB_CODE_INVALID_MSG;
×
392
  }
393

UNCOV
394
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
395
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
396
  pMsgHead->vgId = htonl(nodeId);
×
397

UNCOV
398
  tEncoderClear(&encoder);
×
399

UNCOV
400
  *pBuf = buf;
×
UNCOV
401
  *pLen = tlen;
×
402

UNCOV
403
  return 0;
×
404
}
UNCOV
405
// Add a pause action to pTrans for every task of pStream.
//
// After an action is queued for a task that is not yet in TASK_STATUS__PAUSE,
// its current status is saved in status.statusBackup and the status is switched
// to TASK_STATUS__PAUSE. Iteration stops at the first failure and that code is
// returned; 0 is returned when every task was processed.
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret) {
      break;
    }

    ret = doSetPauseAction(pMnode, pTrans, pTask);
    if (ret) {
      break;
    }

    if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
      // remember the pre-pause status before overwriting it
      atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
      atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
437

UNCOV
438
// Add a drop action to pTrans for every task of pStream.
//
// Iteration stops at the first failure and that code is returned; 0 is returned
// when every task was processed. The iterator is destroyed on every path after
// it has been created.
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret) {
      break;
    }

    ret = doSetDropAction(pMnode, pTrans, pTask);
    if (ret) {
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
464

UNCOV
465
// Add a resume action to pTrans for every task of pStream.
//
// After an action is queued for a task that is in TASK_STATUS__PAUSE, its status
// is restored from status.statusBackup. Iteration stops at the first failure and
// that code is returned. It also stops, returning the (successful) code, if the
// iterator yields a NULL task. 0 is returned when every task was processed.
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  mDebug("transId:%d start to create resume actions", pTrans->id);

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret || pTask == NULL) {
      break;
    }

    ret = doSetResumeAction(pTrans, pMnode, pTask, igUntreated);
    if (ret) {
      break;
    }

    if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
      // go back to whatever status was saved when the task was paused
      atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
    }
  }

  destroyStreamTaskIter(pTaskIter);
  return ret;
}
496

497
// build trans to update the epset
UNCOV
498
// Add an epset-update action to pTrans for every task of pStream.
//
// pStream->lock is write-locked for the whole iteration and released on every
// exit path. Iteration stops at the first failure and that code is returned;
// 0 is returned when every task was processed.
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
  mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);

  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret) {
      break;
    }

    ret = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
531

UNCOV
532
// Add a checkpoint-info update action to pTrans for every task of pStream.
//
// pStream->lock is write-locked for the whole iteration and released on every
// exit path. Iteration stops at the first failure and that code is returned;
// 0 is returned when every task was processed.
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret) {
      break;
    }

    ret = doSetUpdateChkptAction(pMnode, pTrans, pTask);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
564

565
// Add a drop action to pTrans for every orphan task in pList.
//
// Returns terrno if an element cannot be fetched from the list, the failing
// code if queuing a drop action fails, and 0 once all entries were processed.
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pList)) {
    SOrphanTask* pOrphan = taosArrayGet(pList, idx);
    if (NULL == pOrphan) {
      return terrno;
    }

    int32_t ret = doSetDropActionFromId(pMnode, pTrans, pOrphan);
    if (ret != 0) {
      return ret;
    }

    mDebug("add drop task:0x%x action to drop orphan task", pOrphan->taskId);
    ++idx;
  }

  return 0;
}
581

UNCOV
582
// Add a reset action (carrying chkptId) to pTrans for every task of pStream.
//
// pStream->lock is write-locked for the whole iteration and released on every
// exit path. Iteration stops at the first failure and that code is returned;
// 0 is returned when every task was processed.
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t ret = createStreamTaskIter(pStream, &pTaskIter);
  if (ret) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return ret;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pTask = NULL;

    ret = streamTaskIterGetCurrent(pTaskIter, &pTask);
    if (ret) {
      break;
    }

    ret = doSetResetAction(pMnode, pTrans, pTask, chkptId);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return ret;
}
614

UNCOV
615
// Queue a TDMT_STREAM_CONSEN_CHKPT action telling one task which checkpoint id
// (and start timestamp) to restore from.
//
// The message is laid out as [SMsgHead][encoded SRestoreCheckpointInfo].
// Returns 0 when the action has been queued, or when extractNodeEpset() succeeds
// without providing an epset (nothing is queued then); otherwise the failing
// code. The buffer is freed on every path where it was not accepted by
// setTransAction().
int32_t doSetCheckpointIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
  SRestoreCheckpointInfo restoreInfo = {0};
  restoreInfo.taskId = pTask->id.taskId;
  restoreInfo.streamId = pTask->id.streamId;
  restoreInfo.checkpointId = checkpointId;
  restoreInfo.startTs = ts;
  restoreInfo.nodeId = pTask->info.nodeId;
  restoreInfo.transId = pTrans->id;

  // first pass: compute the encoded body size
  int32_t ret = 0;
  int32_t bodyLen;
  tEncodeSize(tEncodeRestoreCheckpointInfo, &restoreInfo, bodyLen, ret);
  if (ret < 0) {
    return terrno;
  }

  int32_t totalLen = sizeof(SMsgHead) + bodyLen;
  void   *pMsg = taosMemoryMalloc(totalLen);
  if (pMsg == NULL) {
    return terrno;
  }

  // second pass: encode the body right behind the message head
  SEncoder enc;
  void    *pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&enc, pBody, totalLen);
  ret = tEncodeRestoreCheckpointInfo(&enc, &restoreInfo);
  tEncoderClear(&enc);
  if (ret < 0) {
    taosMemoryFree(pMsg);
    return ret;
  }

  // head fields are stored in network byte order
  SMsgHead *pHead = (SMsgHead *)pMsg;
  pHead->contLen = htonl(totalLen);
  pHead->vgId = htonl(pTask->info.nodeId);

  bool   found = false;
  SEpSet nodeEpset = {0};
  ret = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (ret == TSDB_CODE_SUCCESS && found) {
    ret = setTransAction(pTrans, pMsg, totalLen, TDMT_STREAM_CONSEN_CHKPT, &nodeEpset,
                         TSDB_CODE_STREAM_TASK_IVLD_STATUS, TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (ret == TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  // either no epset was available or the action was rejected: the buffer is still ours
  taosMemoryFree(pMsg);
  return ret;
}
668

669
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t checkpointId,
×
670
                                  SArray *pList) {
671
  SStreamTaskIter *pIter = NULL;
×
UNCOV
672
  int32_t          num = taosArrayGetSize(pList);
×
673

UNCOV
674
  taosWLockLatch(&pStream->lock);
×
UNCOV
675
  int32_t code = createStreamTaskIter(pStream, &pIter);
×
UNCOV
676
  if (code) {
×
UNCOV
677
    taosWUnLockLatch(&pStream->lock);
×
UNCOV
678
    mError("failed to create stream task iter:%s", pStream->name);
×
UNCOV
679
    return code;
×
680
  }
681

UNCOV
682
  while (streamTaskIterNextTask(pIter)) {
×
UNCOV
683
    SStreamTask *pTask = NULL;
×
UNCOV
684
    code = streamTaskIterGetCurrent(pIter, &pTask);
×
UNCOV
685
    if (code) {
×
UNCOV
686
      destroyStreamTaskIter(pIter);
×
UNCOV
687
      taosWUnLockLatch(&pStream->lock);
×
UNCOV
688
      return code;
×
689
    }
690

691
    // find the required entry
UNCOV
692
    int64_t startTs = 0;
×
UNCOV
693
    for(int32_t i = 0; i < num; ++i) {
×
UNCOV
694
      SCheckpointConsensusEntry* pEntry = taosArrayGet(pList, i);
×
UNCOV
695
      if (pEntry->req.taskId == pTask->id.taskId) {
×
696
        startTs = pEntry->req.startTs;
×
697
        break;
×
698
      }
699
    }
700

UNCOV
701
    code = doSetCheckpointIdAction(pMnode, pTrans, pTask, checkpointId, startTs);
×
UNCOV
702
    if (code != TSDB_CODE_SUCCESS) {
×
703
      destroyStreamTaskIter(pIter);
×
UNCOV
704
      taosWUnLockLatch(&pStream->lock);
×
UNCOV
705
      return code;
×
706
    }
707
  }
708

709
  destroyStreamTaskIter(pIter);
×
710
  taosWUnLockLatch(&pStream->lock);
×
UNCOV
711
  return 0;
×
712
}
713

UNCOV
714
// Queue a TDMT_VND_STREAM_CHECK_POINT_SOURCE action for a single stream task.
//
// Builds the checkpoint-source request, resolves the node epset and adds the
// action to pTrans. Returns 0 when the action has been queued, or when
// extractNodeEpset() succeeds without providing an epset (nothing is queued
// then); otherwise the failing code.
//
// buf starts out as NULL because the builder does not write its output
// parameters when it fails; the cleanup below therefore never frees an
// uninitialized pointer. Any non-zero builder result is treated as a failure.
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
                                     int8_t mndTrigger) {
  void   *buf = NULL;
  int32_t tlen = 0;
  int32_t code = 0;
  SEpSet  epset = {0};
  bool    hasEpset = false;

  code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
                                           pTask->id.taskId, pTrans->id, mndTrigger);
  if (code != 0) {
    taosMemoryFree(buf);  // still NULL unless the builder handed something back
    return code;
  }

  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(buf);
  }

  return code;
}
742

UNCOV
743
// Placeholder: adds nothing to the transaction and always returns 0.
// None of the parameters are used.
int32_t mndStreamSetStopAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
  (void)pMnode;
  (void)pTrans;
  (void)pStream;
  return 0;
}
746

747

748
// Queue a TDMT_VND_STREAM_ALL_STOP action addressed to the vgroup pVgObj.
//
// The request is encoded with streamId = -1 into a buffer laid out as
// [SMsgHead][encoded SStreamTaskStopReq].
//
// Returns 0 when the action has been queued, otherwise an error code. A failure
// of setTransAction() is now propagated to the caller; it used to be logged and
// then reported as success, so the caller's error branch could never see it.
static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) {
  void    *pBuf = NULL;
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  // first pass: compute the encoded body size
  SStreamTaskStopReq req = {.streamId = -1};
  tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code);
  if (code < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int32_t tlen = sizeof(SMsgHead) + len;

  pBuf = taosMemoryMalloc(tlen);
  if (pBuf == NULL) {
    return terrno;
  }

  // second pass: encode the body right behind the message head
  void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, tlen);
  code = tEncodeStreamTaskStopReq(&encoder, &req);
  if (code == -1) {
    tEncoderClear(&encoder);
    taosMemoryFree(pBuf);
    return code;
  }

  // head fields are stored in network byte order
  SMsgHead *pMsgHead = (SMsgHead *)pBuf;
  pMsgHead->contLen = htonl(tlen);
  pMsgHead->vgId = htonl(pVgObj->vgId);

  tEncoderClear(&encoder);

  SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
  // NOTE(review): the caller in this file also calls sdbRelease() on the same
  // vgroup after this function returns, while the early error returns above do
  // not release here. Confirm that this release does not drop a reference twice.
  mndReleaseVgroup(pMnode, pVgObj);

  code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed to create stop all streams trans, code:%s", tstrerror(code));
    taosMemoryFree(pBuf);
  }

  return code;
}
794

795
// Add a "stop all stream tasks" action to pTrans for every vgroup whose dbUid
// equals the given dbUid.
//
// Walks all SDB_VGROUP objects. Each fetched object is released after it has
// been examined. On the first failure the fetch is cancelled, the current
// object is released and the failing code is returned; otherwise 0 is returned.
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pFetchIter = NULL;
  int32_t ret = 0;

  for (;;) {
    SVgObj *pVgroup = NULL;

    pFetchIter = sdbFetch(pSdb, SDB_VGROUP, pFetchIter, (void **)&pVgroup);
    if (pFetchIter == NULL) {
      break;
    }

    if (pVgroup->dbUid == dbUid) {
      ret = doSetStopAllTasksAction(pMnode, pTrans, pVgroup);
      if (ret != 0) {
        sdbCancelFetch(pSdb, pFetchIter);
        sdbRelease(pSdb, pVgroup);
        TAOS_RETURN(ret);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  TAOS_RETURN(ret);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc