• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.45
/source/dnode/mnode/impl/src/mndStreamTransAct.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23

24
// Build a pause request for one stream task and register it on pTrans as a
// TDMT_STREAM_TASK_PAUSE action addressed to the task's node.
// Returns 0 when the action was added. If extractNodeEpset() fails, or succeeds
// without yielding an epset, the request is released and that code is returned
// (which is 0 in the "no epset" case).
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVPauseStreamTaskReq *pPauseReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
  if (NULL == pPauseReq) {
    mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // fill in the request; the vgroup id is stored in network byte order
  pPauseReq->streamId = pTask->id.streamId;
  pPauseReq->taskId = pTask->id.taskId;
  pPauseReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(code == TSDB_CODE_SUCCESS && found)) {
    terrno = code;
    taosMemoryFree(pPauseReq);
    return code;
  }

  // the textual epset is only used for the debug log; a conversion failure is logged and ignored
  char epsetStr[256] = {0};
  code = epsetToStr(&nodeEpset, epsetStr, tListLen(epsetStr));
  if (code != 0) {
    mError("failed to convert epset to str, code:%s", tstrerror(code));
  }

  mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, epsetStr);

  code = setTransAction(pTrans, pPauseReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &nodeEpset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(pPauseReq);
  }
  return code;
}
60

61
// Build a drop request for one stream task and register it on pTrans as a
// TDMT_STREAM_TASK_DROP action addressed to the task's node.
// Returns 0 when the action was added, terrno when the request cannot be
// allocated, or the code of the failing step. When extractNodeEpset() succeeds
// but reports no epset, nothing is added and its (success) code is returned.
// The request buffer is released on every path that does not hand it to
// setTransAction() successfully.
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    return terrno;
  }

  pReq->head.vgId = htonl(pTask->info.nodeId);  // vgroup id travels in network byte order
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {  // no valid epset, return directly without redoAction
    taosMemoryFree(pReq);                        // the request was never attached to the trans; do not leak it
    return code;
  }

  // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
  code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(pReq);
    return code;
  }

  return 0;
}
87

88
// Build a resume request for one stream task (carrying the igUntreated flag) and
// register it on pTrans as a TDMT_STREAM_TASK_RESUME action.
// terrno is reset to 0 on entry. Returns 0 when the action was added; otherwise
// the request is released and the code of the failing step is returned (0 when
// extractNodeEpset() succeeds without yielding an epset).
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
  terrno = 0;

  SVResumeStreamTaskReq *pResumeReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
  if (NULL == pResumeReq) {
    mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
           tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return terrno;
  }

  // fill in the request; the vgroup id is stored in network byte order
  pResumeReq->igUntreated = igUntreated;
  pResumeReq->streamId = pTask->id.streamId;
  pResumeReq->taskId = pTask->id.taskId;
  pResumeReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (code == TSDB_CODE_SUCCESS && found) {
    code = setTransAction(pTrans, pResumeReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &nodeEpset, 0,
                          TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (code == 0) {
      mDebug("set the resume action for trans:%d", pTrans->id);
      return code;
    }
  }

  // either no usable epset or the action could not be attached: drop the request
  taosMemoryFree(pResumeReq);
  return code;
}
120

121
// Same as the per-task drop action, but driven by an SOrphanTask record
// (nodeId / taskId / streamId) instead of a full SStreamTask.
// Returns 0 when the TDMT_STREAM_TASK_DROP action was added; otherwise the request
// is released and the failing step's code is returned (0 when no epset is found).
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
  SVDropStreamTaskReq *pDropReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
  if (NULL == pDropReq) {
    return terrno;
  }

  // fill in the request; the vgroup id is stored in network byte order
  pDropReq->streamId = pTask->streamId;
  pDropReq->taskId = pTask->taskId;
  pDropReq->head.vgId = htonl(pTask->nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->taskId, pTask->nodeId);

  // Only attach the action when a valid epset is known; the epset recorded for this
  // node may be stale, so the one just fetched from the mnode is used.
  if (code == TSDB_CODE_SUCCESS && found) {
    code = setTransAction(pTrans, pDropReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &nodeEpset, 0,
                          TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (code == 0) {
      return 0;
    }
  }

  taosMemoryFree(pDropReq);
  return code;
}
149

UNCOV
150
// Populate pMsg with the task identity, the transaction id and a fresh array holding
// a copy of pInfo->pUpdateNodeList. Failures are only logged: the function returns
// nothing, and pMsg->pNodeList may be NULL afterwards if the array could not be created.
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
                              int32_t transId) {
  int32_t initCode = 0;

  pMsg->transId = transId;
  pMsg->taskId = pId->taskId;
  pMsg->streamId = pId->streamId;

  pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
  if (NULL == pMsg->pNodeList) {
    mError("failed to prepare node list, code:%s", tstrerror(terrno));
    initCode = terrno;
  }

  if (initCode != 0) {
    return;
  }

  if (taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList) == NULL) {
    mError("failed to add update node list into nodeList");
  }
}
×
170

UNCOV
171
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
×
172
                                          SStreamTaskId *pId, int32_t transId) {
UNCOV
173
  SStreamTaskNodeUpdateMsg req = {0};
×
UNCOV
174
  initNodeUpdateMsg(&req, pInfo, pId, transId);
×
175

UNCOV
176
  int32_t code = 0;
×
177
  int32_t blen;
178

UNCOV
179
  tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
×
UNCOV
180
  if (code < 0) {
×
181
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
182
    taosArrayDestroy(req.pNodeList);
×
183
    return terrno;
×
184
  }
185

UNCOV
186
  int32_t tlen = sizeof(SMsgHead) + blen;
×
187

UNCOV
188
  void *buf = taosMemoryMalloc(tlen);
×
UNCOV
189
  if (buf == NULL) {
×
190
    taosArrayDestroy(req.pNodeList);
×
191
    return terrno;
×
192
  }
193

UNCOV
194
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
195
  SEncoder encoder;
UNCOV
196
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
197
  code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
×
UNCOV
198
  if (code == -1) {
×
199
    tEncoderClear(&encoder);
×
200
    taosMemoryFree(buf);
×
201
    taosArrayDestroy(req.pNodeList);
×
202
    return code;
×
203
  }
204

UNCOV
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
206
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
207
  pMsgHead->vgId = htonl(nodeId);
×
208

UNCOV
209
  tEncoderClear(&encoder);
×
210

UNCOV
211
  *pBuf = buf;
×
UNCOV
212
  *pLen = tlen;
×
213

UNCOV
214
  taosArrayDestroy(req.pNodeList);
×
UNCOV
215
  return TSDB_CODE_SUCCESS;
×
216
}
217

UNCOV
218
// Apply the node-update list to pTask, serialize the matching update message and
// register it on pTrans as a TDMT_VND_STREAM_TASK_UPDATE action.
// Returns 0 when the action was added; otherwise the code of the failing step
// (0 when extractNodeEpset() succeeds without yielding an epset). The encoded
// message is freed on every failure after it has been built.
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
  SEpSet  nodeEpset = {0};
  bool    found = false;
  int32_t msgLen = 0;
  void   *pMsg = NULL;

  // the result of the in-memory epset update is intentionally not inspected
  bool epsetUpdated = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
  (void)epsetUpdated;

  int32_t code = doBuildStreamTaskUpdateMsg(&pMsg, &msgLen, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
  if (code) {
    mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
    return code;
  }

  code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (!(code == TSDB_CODE_SUCCESS && found)) {
    mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
    taosMemoryFree(pMsg);
    return code;
  }

  code = setTransAction(pTrans, pMsg, msgLen, TDMT_VND_STREAM_TASK_UPDATE, &nodeEpset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  mError("failed to create update task epset trans, code:%s", tstrerror(code));
  taosMemoryFree(pMsg);
  return code;
}
246

247
// Build an update-checkpoint-info request for one stream task and register it on
// pTrans as a TDMT_STREAM_TASK_UPDATE_CHKPT action.
// The checkpoint fields are copied from the entry in execInfo.pChkptStreams (keyed by
// stream id) whose taskId matches pTask; if several entries match, the last one wins.
// Returns 0 when the action was added, terrno on allocation failure,
// TSDB_CODE_INVALID_PARA when the stream has no checkpoint-report entry, or the code
// of the failing step (0 when extractNodeEpset() succeeds without yielding an epset).
// The request buffer is released on every path that does not hand it to
// setTransAction() successfully.
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
  if (pReq == NULL) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
           tstrerror(terrno));
    return terrno;
  }

  pReq->head.vgId = htonl(pTask->info.nodeId);  // vgroup id travels in network byte order
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  SChkptReportInfo *pStreamItem =
      (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
  if (pStreamItem == NULL) {
    taosMemoryFree(pReq);  // the request was never attached to the trans; do not leak it
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
  for (int32_t i = 0; i < size; ++i) {
    STaskChkptInfo *pInfo = taosArrayGet(pStreamItem->pTaskList, i);
    if (pInfo == NULL) {
      continue;
    }

    if (pInfo->taskId == pTask->id.taskId) {
      pReq->checkpointId = pInfo->checkpointId;
      pReq->checkpointVer = pInfo->version;
      pReq->checkpointTs = pInfo->ts;
      pReq->dropRelHTask = pInfo->dropHTask;
      pReq->transId = pInfo->transId;
      pReq->hStreamId = pTask->hTaskInfo.id.streamId;
      pReq->hTaskId = pTask->hTaskInfo.id.taskId;
    }
  }

  SEpSet  epset = {0};
  bool    hasEpset = false;
  int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(pReq);
    return code;
  }

  code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pReq);
  }

  return code;
}
297

UNCOV
298
// Build a reset request for one stream task and register it on pTrans as a
// TDMT_VND_STREAM_TASK_RESET action.
// Returns 0 when the action was added; otherwise the request is released and the
// failing step's code is returned (0 when extractNodeEpset() yields no epset).
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
  SVResetStreamTaskReq *pResetReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
  if (NULL == pResetReq) {
    mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
           tstrerror(terrno));
    return terrno;
  }

  // fill in the request; the vgroup id is stored in network byte order
  pResetReq->streamId = pTask->id.streamId;
  pResetReq->taskId = pTask->id.taskId;
  pResetReq->head.vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  int32_t code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (code == TSDB_CODE_SUCCESS && found) {
    code = setTransAction(pTrans, pResetReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &nodeEpset, 0,
                          TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (code == TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  taosMemoryFree(pResetReq);
  return code;
}
325

326
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
1,488✔
327
                                                 int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
328
  SStreamCheckpointSourceReq req = {0};
1,488✔
329
  req.checkpointId = checkpointId;
1,488✔
330
  req.nodeId = nodeId;
1,488✔
331
  req.expireTime = -1;
1,488✔
332
  req.streamId = streamId;  // pTask->id.streamId;
1,488✔
333
  req.taskId = taskId;      // pTask->id.taskId;
1,488✔
334
  req.transId = transId;
1,488✔
335
  req.mndTrigger = mndTrigger;
1,488✔
336

337
  int32_t code;
338
  int32_t blen;
339

340
  tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
1,488!
341
  if (code < 0) {
1,488!
342
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
×
343
  }
344

345
  int32_t tlen = sizeof(SMsgHead) + blen;
1,488✔
346

347
  void *buf = taosMemoryMalloc(tlen);
1,488✔
348
  if (buf == NULL) {
1,488!
349
    return terrno;
×
350
  }
351

352
  void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1,488✔
353
  SEncoder encoder;
354
  tEncoderInit(&encoder, abuf, tlen);
1,488✔
355
  int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
1,488✔
356
  if (pos == -1) {
1,488!
357
    tEncoderClear(&encoder);
×
358
    return TSDB_CODE_INVALID_MSG;
×
359
  }
360

361
  SMsgHead *pMsgHead = (SMsgHead *)buf;
1,488✔
362
  pMsgHead->contLen = htonl(tlen);
1,488✔
363
  pMsgHead->vgId = htonl(nodeId);
1,488✔
364

365
  tEncoderClear(&encoder);
1,488✔
366

367
  *pBuf = buf;
1,488✔
368
  *pLen = tlen;
1,488✔
369

370
  return 0;
1,488✔
371
}
372
// Add a pause action to pTrans for every task of pStream and flip each task's
// in-memory status to TASK_STATUS__PAUSE (remembering the previous status in
// statusBackup) if it is not paused already.
// Stops at the first failure and returns its code; returns 0 otherwise.
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (code) {
      break;
    }

    code = doSetPauseAction(pMnode, pTrans, pCurTask);
    if (code) {
      break;
    }

    // keep the old status so that a later resume can restore it
    if (atomic_load_8(&pCurTask->status.taskStatus) != TASK_STATUS__PAUSE) {
      atomic_store_8(&pCurTask->status.statusBackup, pCurTask->status.taskStatus);
      atomic_store_8(&pCurTask->status.taskStatus, TASK_STATUS__PAUSE);
    }
  }

  // single exit: the iterator is released on both the success and the error path
  destroyStreamTaskIter(pTaskIter);
  return code;
}
404

405
// Add a drop action to pTrans for every task of pStream.
// Stops at the first failure and returns its code; returns 0 otherwise.
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (code) {
      break;
    }

    code = doSetDropAction(pMnode, pTrans, pCurTask);
    if (code) {
      break;
    }
  }

  // single exit: code is still 0 here when every task was processed
  destroyStreamTaskIter(pTaskIter);
  return code;
}
431

432
// Add a resume action to pTrans for every task of pStream and, for tasks whose
// in-memory status is TASK_STATUS__PAUSE, restore the status saved in statusBackup.
// Stops at the first failure (or at a NULL task) and returns the current code;
// returns 0 when all tasks were processed.
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
  SStreamTaskIter *pTaskIter = NULL;

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (!(code == 0 && pCurTask != NULL)) {
      break;
    }

    code = doSetResumeAction(pTrans, pMnode, pCurTask, igUntreated);
    if (code) {
      break;
    }

    if (atomic_load_8(&pCurTask->status.taskStatus) == TASK_STATUS__PAUSE) {
      atomic_store_8(&pCurTask->status.taskStatus, pCurTask->status.statusBackup);
    }
  }

  // single exit: code is still 0 here when every task was processed
  destroyStreamTaskIter(pTaskIter);
  return code;
}
461

462
// build trans to update the epset
UNCOV
463
// Add an epset-update action to pTrans for every task of pStream.
// The stream's write latch is held for the whole iteration and released before
// returning. Stops at the first failure and returns its code; returns 0 otherwise.
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
  mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (code) {
      break;
    }

    code = doSetUpdateTaskAction(pMnode, pTrans, pCurTask, pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  // single exit: release the iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return code;
}
496

497
// Add an update-checkpoint-info action to pTrans for every task of pStream.
// The stream's write latch is held for the whole iteration and released before
// returning. Stops at the first failure and returns its code; returns 0 otherwise.
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (code) {
      break;
    }

    code = doSetUpdateChkptAction(pMnode, pTrans, pCurTask);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  // single exit: release the iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return code;
}
529

530
// Add a drop action to pTrans for every SOrphanTask entry in pList.
// Returns terrno if an entry cannot be fetched, the failing code if an action
// cannot be added, and 0 once all entries were processed.
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pList)) {
    SOrphanTask *pOrphan = taosArrayGet(pList, idx);
    if (NULL == pOrphan) {
      return terrno;
    }

    int32_t code = doSetDropActionFromId(pMnode, pTrans, pOrphan);
    if (code != 0) {
      return code;
    }

    mDebug("add drop task:0x%x action to drop orphan task", pOrphan->taskId);
    ++idx;
  }

  return 0;
}
546

UNCOV
547
// Add a reset action to pTrans for every task of pStream.
// The stream's write latch is held for the whole iteration and released before
// returning. Stops at the first failure and returns its code; returns 0 otherwise.
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
  SStreamTaskIter *pTaskIter = NULL;

  taosWLockLatch(&pStream->lock);

  int32_t code = createStreamTaskIter(pStream, &pTaskIter);
  if (code) {
    taosWUnLockLatch(&pStream->lock);
    mError("failed to create stream task iter:%s", pStream->name);
    return code;
  }

  while (streamTaskIterNextTask(pTaskIter)) {
    SStreamTask *pCurTask = NULL;

    code = streamTaskIterGetCurrent(pTaskIter, &pCurTask);
    if (code) {
      break;
    }

    code = doSetResetAction(pMnode, pTrans, pCurTask);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  // single exit: release the iterator first, then the latch
  destroyStreamTaskIter(pTaskIter);
  taosWUnLockLatch(&pStream->lock);
  return code;
}
579

580
// Encode a SRestoreCheckpointInfo (task/stream id, checkpointId, start ts, node id,
// trans id) behind an SMsgHead and register it on pTrans as a
// TDMT_STREAM_CONSEN_CHKPT action addressed to the task's node.
// Returns 0 when the action was added; terrno when sizing or allocation fails; the
// encoder's code when encoding fails; otherwise the failing step's code (0 when
// extractNodeEpset() yields no epset). The message buffer is freed on every failure.
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
  SRestoreCheckpointInfo restoreInfo = {
      .taskId = pTask->id.taskId,
      .streamId = pTask->id.streamId,
      .checkpointId = checkpointId,
      .startTs = ts,
      .nodeId = pTask->info.nodeId,
      .transId = pTrans->id,
  };

  int32_t code = 0;
  int32_t bodyLen;

  // first pass: compute the encoded size of the body
  tEncodeSize(tEncodeRestoreCheckpointInfo, &restoreInfo, bodyLen, code);
  if (code < 0) {
    return terrno;
  }

  int32_t totalLen = sizeof(SMsgHead) + bodyLen;

  void *pMsg = taosMemoryMalloc(totalLen);
  if (NULL == pMsg) {
    return terrno;
  }

  // second pass: encode the body right after the message head
  SEncoder encoder;
  void    *pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&encoder, pBody, totalLen);
  code = tEncodeRestoreCheckpointInfo(&encoder, &restoreInfo);
  tEncoderClear(&encoder);
  if (code == -1) {
    taosMemoryFree(pMsg);
    return code;
  }

  // head fields are written in network byte order
  SMsgHead *pHead = (SMsgHead *)pMsg;
  pHead->contLen = htonl(totalLen);
  pHead->vgId = htonl(pTask->info.nodeId);

  bool    found = false;
  SEpSet  nodeEpset = {0};
  code = extractNodeEpset(pMnode, &nodeEpset, &found, pTask->id.taskId, pTask->info.nodeId);
  if (code == TSDB_CODE_SUCCESS && found) {
    code = setTransAction(pTrans, pMsg, totalLen, TDMT_STREAM_CONSEN_CHKPT, &nodeEpset, 0,
                          TSDB_CODE_VND_INVALID_VGROUP_ID);
    if (code == TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  taosMemoryFree(pMsg);
  return code;
}
633

634

635
// Build a checkpoint-source request for pTask and register it on pTrans as a
// TDMT_VND_STREAM_CHECK_POINT_SOURCE action (retry code TSDB_CODE_SYN_PROPOSE_NOT_READY,
// accepted code TSDB_CODE_VND_INVALID_VGROUP_ID).
// Returns 0 when the action was added; otherwise the code of the failing step
// (0 when extractNodeEpset() succeeds without yielding an epset). The encoded
// message is freed on every failure after it has been built.
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
                                     int8_t mndTrigger) {
  // Both outputs start from a defined state: the builder stores into them only on success.
  void   *buf = NULL;
  int32_t tlen = 0;
  int32_t code = 0;
  SEpSet  epset = {0};
  bool    hasEpset = false;

  code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
                                           pTask->id.taskId, pTrans->id, mndTrigger);
  if (code < 0) {
    // Nothing to free here: on failure the builder has not handed out a buffer.
    return code;
  }

  code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
  if (code != TSDB_CODE_SUCCESS || !hasEpset) {
    taosMemoryFree(buf);
    return code;
  }

  code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
                        TSDB_CODE_VND_INVALID_VGROUP_ID);
  if (code != 0) {
    taosMemoryFree(buf);
  }

  return code;
}
663

664
// Placeholder: adds no action to the transaction and always reports success.
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
  return TSDB_CODE_SUCCESS;
}
667

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc