• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4041

09 May 2025 07:58AM UTC coverage: 62.508% (-0.3%) from 62.788%
#4041

push

travis-ci

web-flow
enh: update database fetch functions to include status in JSON output (#31005)

155567 of 317611 branches covered (48.98%)

Branch coverage included in aggregate %.

15 of 18 new or added lines in 1 file covered. (83.33%)

3906 existing lines in 185 files now uncovered.

240901 of 316655 relevant lines covered (76.08%)

6304979.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.67
/source/dnode/mnode/impl/src/mndStreamHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18
#include "mndMnode.h"
19
#include "tmisce.h"
20

21
/*
 * One failed checkpoint reported by a stream task in a heartbeat. Entries are collected per
 * hb msg (de-duplicated by transId) and later turned into reset requests for the stream.
 */
typedef struct SFailedCheckpointInfo {
  int64_t streamUid;     // uid of the stream the reporting task belongs to
  int64_t checkpointId;  // id of the checkpoint reported as failed
  int32_t transId;       // id of the checkpoint transaction that failed
} SFailedCheckpointInfo;
26

27
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
28
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
29
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
30
static void    updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
31
static void    addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
32
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
33
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
34
static bool    validateHbMsg(const SArray *pNodeList, int32_t vgId);
35
static void    cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
36
static void    doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pEpset, int32_t vgId, int32_t msgId);
37
static void    checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
38

39
/*
 * Record a stage change reported for a task.
 *
 * Looks up the node that hosts pTaskEntry in execInfo.pNodeList. If it is found, the node is marked
 * with stageUpdated = true and the task entry takes the new stage. If no such node exists, nothing is
 * changed (not even the task's stage).
 */
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
  int32_t     total = taosArrayGetSize(execInfo.pNodeList);
  SNodeEntry *pHost = NULL;

  for (int32_t idx = 0; idx < total && pHost == NULL; ++idx) {
    SNodeEntry *pCandidate = taosArrayGet(execInfo.pNodeList, idx);
    if (pCandidate != NULL && pCandidate->nodeId == pTaskEntry->nodeId) {
      pHost = pCandidate;
    }
  }

  if (pHost == NULL) {
    return;
  }

  mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
        pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);

  pHost->stageUpdated = true;
  pTaskEntry->stage = stage;
}
31✔
57

58
/*
 * Append pInfo to pList unless an entry with the same transId is already present.
 * A failed push is only logged; the list is left unchanged in that case.
 */
void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
  int32_t size = taosArrayGetSize(pList);
  bool    duplicated = false;

  for (int32_t idx = 0; idx < size && !duplicated; ++idx) {
    const SFailedCheckpointInfo *pExisting = taosArrayGet(pList, idx);
    duplicated = (pExisting != NULL) && (pExisting->transId == pInfo->transId);
  }

  if (duplicated) {
    return;
  }

  if (taosArrayPush(pList, pInfo) == NULL) {
    mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
  }
}
72

73
/*
 * Build and prepare the "reset task status" transaction for pStream, resetting its tasks from
 * checkpoint chkptId.
 *
 * Returns terrno when the trans cannot be created. Otherwise returns the error of the first failing
 * step, or TSDB_CODE_ACTION_IN_PROGRESS once mndTransPrepare() has accepted the trans (whether it
 * reported success or in-progress). Once the trans has been created, the local STrans handle is always
 * dropped before returning.
 */
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
  STrans *pTrans = NULL;

  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
                               " reset from failed checkpoint", &pTrans);
  if (code != 0 || pTrans == NULL) {
    return terrno;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  // a prepared trans is reported to the caller as still in progress
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_end:
  mndTransDrop(pTrans);
  return code;
}
113

114
/*
 * Enqueue a TDMT_MND_STREAM_TASK_RESET msg for (streamId, transId, checkpointId) on the mnode write queue.
 *
 * execInfo.pKilledChkptTrans remembers the most recent (at most maxKilledTrans) resets that were
 * issued. A reset already present in that list is not sent again.
 *
 * The reset is recorded in that list only AFTER the msg has been enqueued successfully. If allocation
 * or enqueueing fails, nothing is recorded, so the next heartbeat reporting the same failed checkpoint
 * retries the reset instead of skipping it as "already reset".
 *
 * Returns TSDB_CODE_SUCCESS when the reset was sent (or had been sent before), otherwise an error code.
 */
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
  const int32_t maxKilledTrans = 10;  // bound of the execInfo.pKilledChkptTrans history
  int32_t       size = sizeof(SStreamTaskResetMsg);

  int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
  for (int32_t i = 0; i < num; ++i) {
    SStreamTaskResetMsg *p = taosArrayGet(execInfo.pKilledChkptTrans, i);
    if (p == NULL) {
      continue;
    }

    if (p->transId == transId && p->streamId == streamId) {
      mDebug("already reset stream:0x%" PRIx64 ", not send reset-msg again for transId:%d", streamId, transId);
      return TSDB_CODE_SUCCESS;
    }
  }

  SStreamTaskResetMsg *pReq = rpcMallocCont(size);
  if (pReq == NULL) {
    return terrno;
  }

  pReq->streamId = streamId;
  pReq->transId = transId;
  pReq->checkpointId = checkpointId;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put reset-task msg into write queue, code:%s", tstrerror(code));
    return code;  // not recorded: the reset will be retried on the next hb
  }

  mDebug("send reset task status msg for transId:%d succ", transId);

  // let's remember that this trans had been killed already; keep only the most recent entries
  if (num >= maxKilledTrans) {
    taosArrayRemove(execInfo.pKilledChkptTrans, 0);  // remove the oldest first, append the new one in the tail
  }

  SStreamTaskResetMsg entry = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
  if (taosArrayPush(execInfo.pKilledChkptTrans, &entry) == NULL) {
    // the reset msg itself is already enqueued; only the de-duplication record is missing
    mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId,
           (int32_t)taosArrayGetSize(execInfo.pKilledChkptTrans));
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
162

163
/*
 * Enqueue a TDMT_MND_STREAM_UPDATE_CHKPT_EVT msg on the mnode write queue.
 *
 * Only the size of SMStreamDoCheckpointMsg is borrowed for the msg body; this function does not fill
 * the body in. Returns terrno when the buffer cannot be allocated, otherwise the result of
 * tmsgPutToQueue().
 */
int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode) {
  int32_t contLen = sizeof(SMStreamDoCheckpointMsg);
  void   *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pCont, .contLen = contLen};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);

  if (code == 0) {
    mDebug("send update checkpoint-info msg succ");
  } else {
    mError("failed to put update-checkpoint-info msg into write queue, code:%s", tstrerror(code));
  }

  return code;
}
180

UNCOV
181
/*
 * Serialize the orphan-task list and enqueue a TDMT_MND_STREAM_DROP_ORPHANTASKS msg on the mnode
 * write queue.
 *
 * The serializer returns the encoded length on success and a negative error code on failure.
 * - Either failure aborts the send and returns that error. The buffer is freed if it was already
 *   allocated, so an unfilled msg is never enqueued.
 * - If the serializer reports a zero length, TSDB_CODE_INVALID_MSG is returned.
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) {
  SMStreamDropOrphanMsg msg = {.pList = pList};
  int32_t               num = taosArrayGetSize(pList);
  int32_t               code = 0;

  int32_t contLen = tSerializeDropOrphanTaskMsg(NULL, 0, &msg);
  if (contLen <= 0) {
    return (contLen < 0) ? contLen : TSDB_CODE_INVALID_MSG;
  }

  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    return terrno;
  }

  int32_t tlen = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg);
  if (tlen <= 0) {
    code = (tlen < 0) ? tlen : TSDB_CODE_INVALID_MSG;
    mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code));
    rpcFreeCont(pReq);  // never enqueue an unfilled buffer
    return code;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
  code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send drop %d orphan tasks msg succ", num);
  }

  return code;
}
210

211
/*
 * Process a reset-from-failed-checkpoint request. The request body is an SStreamTaskResetMsg, the
 * payload queued by mndSendResetFromCheckpointMsg(); presumably this is the handler registered for
 * TDMT_MND_STREAM_TASK_RESET — TODO confirm.
 *
 * Steps:
 *   1. Kill the failed checkpoint trans (pMsg->transId).
 *   2. Clear the stream's checkpoint-report info in execInfo.pChkptStreams (best-effort).
 *   3. If the stream still exists and no conflicting trans is running, create the reset-status trans.
 *
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the stream cannot be acquired, otherwise the result of
 * the conflict check / trans creation.
 */
int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
  SMnode              *pMnode = pReq->info.node;
  SStreamTaskResetMsg *pMsg = pReq->pCont;
  SStreamObj          *pStream = NULL;
  int32_t              code = TSDB_CODE_SUCCESS;

  mndKillTransImpl(pMnode, pMsg->transId, "");

  streamMutexLock(&execInfo.lock);
  code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId);  // best-effort: do nothing if failed
  streamMutexUnlock(&execInfo.lock);

  code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
  if (pStream == NULL || code != 0) {
    // pStream may be NULL here, so only the stream id from the request is safe to log
    mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped",
           pMsg->streamId);
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
  if (code) {
    mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
           pStream->sourceDb, pStream->targetSTbName);
  } else {
    mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
           pStream->uid, pMsg->transId);
    code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
  }

  mndReleaseStream(pMnode, pStream);
  return code;
}
242

243
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
6✔
244
  int32_t num = taosArrayGetSize(pNodeList);
6✔
245
  mInfo("set node expired for %d nodes", num);
6!
246

247
  for (int k = 0; k < num; ++k) {
12✔
248
    int32_t *pVgId = taosArrayGet(pNodeList, k);
6✔
249
    if (pVgId == NULL) {
6!
UNCOV
250
      continue;
×
251
    }
252

253
    mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
6!
254

255
    bool    setFlag = false;
6✔
256
    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
6✔
257

258
    for (int i = 0; i < numOfNodes; ++i) {
14!
259
      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
14✔
260
      if ((pNodeEntry) && (pNodeEntry->nodeId == *pVgId)) {
14!
261
        mInfo("vgId:%d expired for some stream tasks, total in update list:%d", *pVgId, numOfNodes + 1);
6!
262
        pNodeEntry->stageUpdated = true;
6✔
263
        setFlag = true;
6✔
264
        break;
6✔
265
      }
266
    }
267

268
    if (!setFlag) {
6!
UNCOV
269
      mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId);
×
UNCOV
270
      return TSDB_CODE_FAILED;
×
271
    }
272
  }
273
  return TSDB_CODE_SUCCESS;
6✔
274
}
275

276
/*
 * Enqueue a TDMT_MND_PAUSE_STREAM request for every stream in the sdb that is not already paused.
 * This is used when the stream grant has expired.
 *
 * tSerializeSMPauseStreamReq() returns the encoded length on success and a negative code on failure.
 * The previous code treated any non-zero result as an error, so no pause request was ever sent and
 * the msg buffer leaked for every stream. Failures now free the buffer and skip only that stream.
 *
 * Each pause request copies the rpc handle info passed in by the caller.
 *
 * Returns the code of the last attempted step: 0 when the last enqueue succeeded, otherwise an error.
 */
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
    if (pStream->status == STREAM_STATUS__PAUSE) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SMPauseStreamReq reqPause = {0};
    tstrncpy(reqPause.name, pStream->name, sizeof(reqPause.name));
    reqPause.igNotExists = 1;

    int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
    if (contLen <= 0) {
      code = (contLen < 0) ? contLen : TSDB_CODE_INVALID_MSG;
      sdbRelease(pSdb, pStream);
      continue;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      sdbRelease(pSdb, pStream);
      continue;
    }

    int32_t tlen = tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
    if (tlen <= 0) {
      code = (tlen < 0) ? tlen : TSDB_CODE_INVALID_MSG;
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pStream);
      continue;
    }

    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_PAUSE_STREAM,
        .pCont = pHead,
        .contLen = contLen,
        .info = *info,
    };

    code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired, code:%s", pStream->name, reqPause.name,
          pStream->uid, tstrerror(code));

    sdbRelease(pSdb, pStream);
  }

  return code;
}
321

322
/*
 * Process a stream-meta heartbeat (SStreamHbMsg) sent by a vnode/snode.
 *
 * Steps:
 *   1. If the stream grant has expired, pause all streams; give up only if that fails.
 *   2. Decode the hb msg and allocate the per-hb scratch lists.
 *      - If allocation fails, the decoded msg is released as well (it used to leak) and terrno is returned.
 *   3. Under execInfo.lock:
 *      - Reject a hb from a node that is not in execInfo.pNodeList.
 *      - Reply immediately to a repeated hb (same msgId and ts as the last accepted one from that node).
 *      - Flag the nodes in req.pUpdateNodes as expired.
 *      - For every reported task: collect consensus-checkpoint info, then either record a stage change
 *        or copy the reported status into execInfo.pTaskMap, collecting failed checkpoints. Tasks unknown
 *        to execInfo.pTaskMap are checked for being orphans.
 *      - Send the reset / drop-orphan / update-checkpoint-info msgs derived from the above.
 *   4. Send the hb rsp and release everything.
 *
 * The sub-steps in (3) are best-effort: their errors are only logged. The final return value is
 * whatever the last of them stored in `code`.
 */
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SStreamHbMsg req = {0};
  SArray      *pFailedChkpt = NULL;
  SArray      *pOrphanTasks = NULL;
  int32_t      code = 0;
  SDecoder     decoder = {0};
  SEpSet       mnodeEpset = {0};

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    if (suspendAllStreams(pMnode, &pReq->info) < 0) {
      return code;
    }
  }

  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
    tCleanupStreamHbMsg(&req);
    tDecoderClear(&decoder);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }
  tDecoderClear(&decoder);

  mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId,
         req.numOfTasks, req.msgId, req.ts);

  pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
  pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
  if (pFailedChkpt == NULL || pOrphanTasks == NULL) {
    code = terrno;  // keep the allocation error; the cleanup below must not change what is reported
    // the decoded req owns memory too and has to be released on this path as well
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(code);
  }

  mndGetMnodeEpSet(pMnode, &mnodeEpset);

  streamMutexLock(&execInfo.lock);

  mndInitStreamExecInfo(pMnode, &execInfo);
  if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
    mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);

    doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);

    streamMutexUnlock(&execInfo.lock);
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  // detect a re-delivered hb msg: same msgId and ts as the last one accepted from this node
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
    SNodeEntry *pEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->nodeId != req.vgId) {
      continue;
    }

    if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
      mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);

      // return directly and allow the vnode to continue to send the next HbMsg.
      terrno = TSDB_CODE_SUCCESS;
      doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);

      streamMutexUnlock(&execInfo.lock);
      cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
      return terrno;
    } else {
      pEntry->lastHbMsgId = req.msgId;
      pEntry->lastHbMsgTs = req.ts;
    }
  }

  int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
  if (numOfUpdated > 0) {
    mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
    int32_t unused = setNodeEpsetExpiredFlag(req.pUpdateNodes);  // best-effort, failures are logged inside
  }

  bool snodeChanged = false;
  for (int32_t i = 0; i < req.numOfTasks; ++i) {
    STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
    if (pTaskEntry == NULL) {
      checkforOrphanTask(pMnode, p, pOrphanTasks);
      continue;
    }

    // the task reported consensus-checkpoint info: add it to execInfo.pStreamConsensus
    STaskCkptInfo *pChkInfo = &p->checkpointInfo;
    if (pChkInfo->consensusChkptId != 0) {
      SRestoreCheckpointInfo cp = {
          .streamId = p->id.streamId,
          .taskId = p->id.taskId,
          .checkpointId = p->checkpointInfo.latestId,
          .startTs = pChkInfo->consensusTs,
          .nodeId = p->nodeId,
          .term = p->stage,
      };

      SStreamObj *pStream = NULL;
      code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
      if (code) {
        mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
               p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
        continue;
      }

      int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);

      SCheckpointConsensusInfo *pInfo = NULL;
      code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
      if (code == 0) {
        mndAddConsensusTasks(pInfo, &cp);
      } else {
        mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
      }

      mndReleaseStream(pMnode, pStream);
    }

    // a changed stage (ignored while the cached stage is -1) marks the hosting node as updated;
    // otherwise the reported status replaces the cached one
    if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
      updateStageInfo(pTaskEntry, p->stage);
      if (pTaskEntry->nodeId == SNODE_HANDLE) {
        snodeChanged = true;
      }
    } else {
      streamTaskStatusCopy(pTaskEntry, p);

      if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
        mError("stream task:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed, kill it", p->id.taskId,
               pChkInfo->activeId, pChkInfo->activeTransId);

        SFailedCheckpointInfo info = {
            .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
        addIntoFailedChkptList(pFailedChkpt, &info);

        // remove failed trans from pChkptStreams
        code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
        if (code) {
          mError("failed to remove stream:0x%" PRIx64 " in checkpoint stream list", p->id.streamId);
        }
      }
    }

    if (p->status != TASK_STATUS__READY) {
      mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
    }
  }

  // current checkpoint is failed, rollback from the checkpoint trans
  // kill the checkpoint trans and then set all tasks status to be normal
  if (taosArrayGetSize(pFailedChkpt) > 0) {
    bool allReady = true;

    if (pMnode != NULL) {
      SArray *p = NULL;
      code = mndTakeVgroupSnapshot(pMnode, &allReady, &p, NULL);
      taosArrayDestroy(p);
      if (code) {
        mError("failed to get the vgroup snapshot, ignore it and continue");
      }
    } else {
      allReady = false;
    }

    if (allReady || snodeChanged) {
      // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
      for (int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
        SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
        if (pInfo == NULL) {
          continue;
        }

        mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64
              " transId:%d failed issue task-reset trans to reset all tasks status",
              pInfo->streamUid, pInfo->checkpointId, pInfo->transId);

        code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
        if (code) {
          mError("failed to create reset task trans, code:%s", tstrerror(code));
        }
      }
    } else {
      mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
    }
  }

  // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
  if (taosArrayGetSize(pOrphanTasks) > 0) {
    code = mndSendDropOrphanTasksMsg(pMnode, pOrphanTasks);
    if (code) {
      mError("failed to send drop orphan tasks msg, code:%s, try next time", tstrerror(code));
    }
  }

  // trigger the update-checkpoint-info event when more than 10000 ms have passed since execInfo.chkptReportScanTs
  int64_t now = taosGetTimestampMs();
  if (pMnode != NULL && (now > execInfo.chkptReportScanTs) && (now - execInfo.chkptReportScanTs) > 10000) {
    // make sure that the unit test case can work
    code = mndStreamSendUpdateChkptInfoMsg(pMnode);
    if (code) {
      mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
    }
  }

  streamMutexUnlock(&execInfo.lock);

  doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
  cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);

  return code;
}
540

541
bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
5,006✔
542
  for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
20,368✔
543
    SNodeEntry *pEntry = taosArrayGet(pNodeList, i);
20,367✔
544
    if ((pEntry) && (pEntry->nodeId == vgId)) {
20,367!
545
      return true;
5,005✔
546
    }
547
  }
548

549
  return false;
1✔
550
}
551

552
/*
 * Release everything owned by one hb-processing round: the two scratch lists and the contents of the
 * decoded hb msg. The three resources are independent of each other, so the release order is irrelevant.
 */
void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
  taosArrayDestroy(pOrphanTasks);
  taosArrayDestroy(pFailedChkptList);
  tCleanupStreamHbMsg(pReq);
}
5,006✔
557

558
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, int32_t vgId, int32_t msgId) {
5,006✔
559
  int32_t ret = 0;
5,006✔
560
  int32_t tlen = 0;
5,006✔
561
  void   *buf = NULL;
5,006✔
562

563
  SMStreamHbRspMsg msg = {.msgId = msgId};//, .mndEpset = *pMndEpset};
5,006✔
564
  epsetAssign(&msg.mndEpset, pMndEpset);
5,006✔
565

566
  tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
5,006!
567
  if (ret < 0) {
5,006!
UNCOV
568
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
×
569
  }
570

571
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
5,006✔
572
  if (buf == NULL) {
5,006!
UNCOV
573
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
×
574
    return;
×
575
  }
576

577
  ((SMStreamHbRspMsg *)buf)->head.vgId = htonl(vgId);
5,006✔
578
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
5,006✔
579

580
  SEncoder encoder;
581
  tEncoderInit(&encoder, abuf, tlen);
5,006✔
582
  if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) {
5,006!
UNCOV
583
    rpcFreeCont(buf);
×
UNCOV
584
    tEncoderClear(&encoder);
×
UNCOV
585
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
×
UNCOV
586
    return;
×
587
  }
588
  tEncoderClear(&encoder);
5,006✔
589

590
  SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};
5,006✔
591

592
  tmsgSendRsp(&rsp);
5,006✔
593
  pRpcInfo->handle = NULL;  // disable auto rsp
5,006✔
594
}
595

596
/*
 * Handle a reported task that is missing from execInfo.pTaskMap.
 *
 * If its stream still exists in mnode meta, the inconsistency is only logged and the task is kept.
 * Otherwise the task is appended to pOrphanTasks so that it can be dropped; a failed append is only logged.
 */
void checkforOrphanTask(SMnode *pMnode, STaskStatusEntry *p, SArray *pOrphanTasks) {
  SStreamObj *pStream = NULL;

  if (mndGetStreamObj(pMnode, p->id.streamId, &pStream) == 0) {
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }

    mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
           p->id.taskId);
    return;
  }

  mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
         p->id.streamId, p->id.taskId);

  SOrphanTask orphan = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
  if (taosArrayPush(pOrphanTasks, &orphan) == NULL) {
    mError("failed to put task into orphan list, taskId:0x%" PRIx64 ", code:%s", p->id.taskId, tstrerror(terrno));
  }
}
25✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc