• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.77
/source/dnode/mnode/impl/src/mndStreamHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18

19
// Describes one failed checkpoint collected while processing a heartbeat.
// Entries are de-duplicated by transId when added to the failed-checkpoint list.
typedef struct SFailedCheckpointInfo {
  int64_t streamUid;     // stream id of the task that reported the failure
  int64_t checkpointId;  // id of the active checkpoint that failed
  int32_t transId;       // id of the transaction driving that checkpoint
} SFailedCheckpointInfo;
24

25
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
26
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
27
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
28
static void    updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
29
static void    addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
30
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
31
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
32
static bool    validateHbMsg(const SArray *pNodeList, int32_t vgId);
33
static void    cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
34
static void    doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
35
static void    checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
36

37
/*
 * Store the new stage on the task entry and mark the node that hosts the task
 * as stage-updated.
 *
 * Only the first node in execInfo.pNodeList whose nodeId equals the task's
 * nodeId is touched. If no node matches, neither the node list nor the task
 * entry is modified.
 */
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
  int32_t total = taosArrayGetSize(execInfo.pNodeList);

  for (int32_t idx = 0; idx < total; ++idx) {
    SNodeEntry *pNode = taosArrayGet(execInfo.pNodeList, idx);
    if (pNode == NULL || pNode->nodeId != pTaskEntry->nodeId) {
      continue;
    }

    // log before overwriting so that the old stage is still visible
    mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
          pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);

    pNode->stageUpdated = true;
    pTaskEntry->stage = stage;
    return;
  }
}
55

56
/*
 * Append pInfo to pList unless an entry with the same transId is already there.
 * A failed push is only logged; the caller is not informed.
 */
void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
  int32_t total = taosArrayGetSize(pList);
  int32_t idx = 0;

  while (idx < total) {
    SFailedCheckpointInfo *pExisting = taosArrayGet(pList, idx);
    if (pExisting != NULL && pExisting->transId == pInfo->transId) {
      return;  // this transaction has been recorded already
    }
    idx += 1;
  }

  if (taosArrayPush(pList, pInfo) == NULL) {
    mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
  }
}
70

71
/*
 * Build, persist and prepare the "reset task status" transaction for a stream
 * whose checkpoint failed.
 *
 * pStream is released through sdbRelease() on every return path, so the
 * reference passed in is consumed by this function.
 * NOTE(review): mndProcessResetStatusReq calls mndReleaseStream() on the same
 * object after this function returns - confirm who owns the reference.
 *
 * Returns terrno when the transaction could not be created, the failing step's
 * code otherwise, and TSDB_CODE_ACTION_IN_PROGRESS when the prepare step
 * returned 0.
 */
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
                               " reset from failed checkpoint", &pTrans);
  if (code != 0 || pTrans == NULL) {
    // creation failed: only the stream reference is released on this path
    sdbRelease(pMnode->pSdb, pStream);
    return terrno;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
  if (code != 0) {
    goto _end;
  }

  code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
  if (code != 0) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
  } else if (code == 0) {
    // a prepared transaction is reported to the caller as "in progress"
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_end:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
117

118
/*
 * Queue a TDMT_MND_STREAM_TASK_RESET message for (streamId, transId) unless one
 * has already been sent for the same pair.
 *
 * execInfo.pKilledChkptTrans remembers the most recent reset requests and is
 * used as the de-dup list. The pair is recorded only AFTER the message has been
 * put into the write queue. If it were recorded first, a failed allocation or
 * enqueue would make later heartbeats believe the reset was already issued and
 * never retry it.
 *
 * Returns TSDB_CODE_SUCCESS when the message was queued or had been sent
 * before, terrno when the message could not be allocated, or the code returned
 * by tmsgPutToQueue().
 */
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
  const int32_t maxKilledTrans = 10;  // upper bound on remembered reset requests
  int32_t       size = sizeof(SStreamTaskResetMsg);

  int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
  for (int32_t i = 0; i < num; ++i) {
    SStreamTaskResetMsg *p = taosArrayGet(execInfo.pKilledChkptTrans, i);
    if (p == NULL) {
      continue;
    }

    if (p->transId == transId && p->streamId == streamId) {
      mDebug("already reset stream:0x%" PRIx64 ", not send reset-msg again for transId:%d", streamId, transId);
      return TSDB_CODE_SUCCESS;
    }
  }

  SStreamTaskResetMsg *pReq = rpcMallocCont(size);
  if (pReq == NULL) {
    return terrno;
  }

  pReq->streamId = streamId;
  pReq->transId = transId;
  pReq->checkpointId = checkpointId;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    // nothing is remembered, so the next heartbeat reporting this failure retries
    mError("failed to put reset-task msg into write queue, code:%s", tstrerror(code));
    return code;
  }

  mDebug("send reset task status msg for transId:%d succ", transId);

  if (num >= maxKilledTrans) {
    taosArrayRemove(execInfo.pKilledChkptTrans, 0);  // remove this first, append new reset trans in the tail
  }

  // let's remember that this trans had been killed already
  SStreamTaskResetMsg killed = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
  if (taosArrayPush(execInfo.pKilledChkptTrans, &killed) == NULL) {
    // The message is already queued; losing the de-dup record only means the
    // reset may be requested again later, so this is logged and not returned.
    mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId,
           (int32_t)taosArrayGetSize(execInfo.pKilledChkptTrans));
  }

  return code;
}
166

167
/*
 * Put a TDMT_MND_STREAM_UPDATE_CHKPT_EVT message into the mnode write queue.
 *
 * The payload is sized as SMStreamDoCheckpointMsg (that message type is reused
 * here), and no field of it is written by this function.
 *
 * Returns terrno if the payload cannot be allocated, otherwise the result of
 * tmsgPutToQueue().
 */
int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode) {  // here reuse the doCheckpointmsg
  int32_t contLen = sizeof(SMStreamDoCheckpointMsg);

  void *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pCont, .contLen = contLen};

  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
  if (code == 0) {
    mDebug("send update checkpoint-info msg succ");
    return code;
  }

  mError("failed to put update-checkpoint-info msg into write queue, code:%s", tstrerror(code));
  return code;
}
184

185
/*
 * Serialize the orphan task list and put a TDMT_MND_STREAM_DROP_ORPHANTASKS
 * message into the mnode write queue.
 *
 * pList is only read for serialization; the caller keeps ownership of it.
 *
 * Returns terrno when the size probe or the allocation fails, an error code
 * when serialization into the buffer fails (the buffer is freed and nothing is
 * queued), otherwise the result of tmsgPutToQueue().
 */
int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) {
  SMStreamDropOrphanMsg msg = {.pList = pList};

  int32_t num = taosArrayGetSize(pList);

  // first pass with a NULL buffer only computes the required length
  int32_t contLen = tSerializeDropOrphanTaskMsg(NULL, 0, &msg);
  if (contLen <= 0) {
    return terrno;
  }

  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    return terrno;
  }

  int32_t code = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg);
  if (code <= 0) {
    // do not queue a buffer that was never filled in
    mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code));
    rpcFreeCont(pReq);
    return (code < 0) ? code : TSDB_CODE_INVALID_MSG;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
  code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send drop %d orphan tasks msg succ", num);
  }

  return code;
}
214

215
/*
 * Handle a TDMT_MND_STREAM_TASK_RESET request: kill the failed checkpoint
 * transaction, clear the stream's checkpoint-report info, and start a
 * reset-status transaction for the stream.
 *
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the stream cannot be found, the
 * conflict-check code when another transaction is running, otherwise the result
 * of mndCreateStreamResetStatusTrans().
 */
int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
  SMnode     *pMnode = pReq->info.node;
  int32_t     code = TSDB_CODE_SUCCESS;
  SStreamObj *pStream = NULL;

  SStreamTaskResetMsg *pMsg = pReq->pCont;
  mndKillTransImpl(pMnode, pMsg->transId, "");

  streamMutexLock(&execInfo.lock);
  code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId);  // result intentionally ignored
  streamMutexUnlock(&execInfo.lock);

  code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
  if (pStream == NULL || code != 0) {
    code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
    // pStream may be NULL here, so report the id carried by the request instead
    mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped",
           pMsg->streamId);
  } else {
    code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
    if (code) {
      mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
             pStream->sourceDb, pStream->targetSTbName);
    } else {
      mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
             pStream->uid, pMsg->transId);
      code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);

      // mndCreateStreamResetStatusTrans() calls sdbRelease() on pStream on every
      // return path, so the reference is gone; releasing again would be a double release.
      pStream = NULL;
    }
  }

  if (pStream != NULL) {
    mndReleaseStream(pMnode, pStream);
  }
  return code;
}
246

247
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
33✔
248
  int32_t num = taosArrayGetSize(pNodeList);
33✔
249
  mInfo("set node expired for %d nodes", num);
33!
250

251
  for (int k = 0; k < num; ++k) {
66✔
252
    int32_t *pVgId = taosArrayGet(pNodeList, k);
33✔
253
    if (pVgId == NULL) {
33!
254
      continue;
×
255
    }
256

257
    mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
33!
258

259
    bool    setFlag = false;
33✔
260
    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
33✔
261

262
    for (int i = 0; i < numOfNodes; ++i) {
65!
263
      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
65✔
264
      if ((pNodeEntry) && (pNodeEntry->nodeId == *pVgId)) {
65!
265
        mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
33!
266
        pNodeEntry->stageUpdated = true;
33✔
267
        setFlag = true;
33✔
268
        break;
33✔
269
      }
270
    }
271

272
    if (!setFlag) {
33!
273
      mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId);
×
274
      return TSDB_CODE_FAILED;
×
275
    }
276
  }
277
  return TSDB_CODE_SUCCESS;
33✔
278
}
279

280
/*
 * Queue a TDMT_MND_PAUSE_STREAM request for every stream that is not already
 * paused. Called from the heartbeat handler when the grant check fails.
 *
 * Each request carries a copy of *info as its rpc handle info. A failure for
 * one stream does not stop the iteration.
 *
 * Returns the code of the last step executed: an error code from sizing,
 * allocating or serializing a request, or the result of the last
 * tmsgPutToQueue() call. Returns 0 when nothing had to be sent.
 */
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->status != STREAM_STATUS__PAUSE) {
      SMPauseStreamReq reqPause = {0};
      strcpy(reqPause.name, pStream->name);
      reqPause.igNotExists = 1;

      // first pass with a NULL buffer only computes the required length
      int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
      if (contLen <= 0) {
        code = (contLen < 0) ? contLen : TSDB_CODE_INVALID_MSG;
        sdbRelease(pSdb, pStream);
        continue;
      }

      void *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pStream);
        continue;
      }

      // The serializer returns the encoded length (see the sizing call above),
      // so a positive value means success; only a non-positive value is an error.
      int32_t tlen = tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
      if (tlen <= 0) {
        code = (tlen < 0) ? tlen : TSDB_CODE_INVALID_MSG;
        rpcFreeCont(pHead);
        sdbRelease(pSdb, pStream);
        continue;
      }

      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_PAUSE_STREAM,
          .pCont = pHead,
          .contLen = contLen,
          .info = *info,
      };

      code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired, code:%s", pStream->name, reqPause.name,
            pStream->uid, tstrerror(code));
    }

    sdbRelease(pSdb, pStream);
  }
  return code;
}
325

326
/*
 * Process one stream heartbeat message sent by a vnode/snode.
 *
 * Steps, all performed while holding execInfo.lock once the message is decoded:
 *   1. reject the message if its vgId is not in execInfo.pNodeList;
 *   2. drop duplicates (same msgId and ts as the last heartbeat of that node);
 *   3. flag the nodes listed in req.pUpdateNodes as stage-updated;
 *   4. for every reported task: detect orphan tasks, collect consensus
 *      checkpoint info, update stage/status, and collect failed checkpoints;
 *   5. request a task reset for every failed checkpoint, when all vgroups are
 *      ready or the snode changed;
 *   6. request dropping of orphan tasks and send the update-checkpoint-info msg.
 *
 * A heartbeat response is sent explicitly via doSendHbMsgRsp() on the
 * reject/duplicate/normal paths.
 *
 * Returns TSDB_CODE_INVALID_MSG for undecodable or unknown-node messages,
 * terrno on allocation failure, otherwise the code of the last step executed.
 */
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SStreamHbMsg req = {0};
  SArray      *pFailedChkpt = NULL;
  SArray      *pOrphanTasks = NULL;
  int32_t      code = 0;

  // grant expired: try to pause all streams; give up only if that fails too
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    if (suspendAllStreams(pMnode, &pReq->info) < 0) {
      return code;
    }
  }

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
    tCleanupStreamHbMsg(&req);
    tDecoderClear(&decoder);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }
  tDecoderClear(&decoder);

  mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId,
         req.numOfTasks, req.msgId, req.ts);

  pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
  pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
  if (pFailedChkpt == NULL || pOrphanTasks == NULL) {
    // capture the error first, then release the decoded request together with
    // whichever array was created; the decoded request must not be leaked
    code = terrno;
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(code);
  }

  streamMutexLock(&execInfo.lock);

  mndInitStreamExecInfo(pMnode, &execInfo);
  if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
    mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);

    doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);

    streamMutexUnlock(&execInfo.lock);
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  // duplicate detection: compare against the last heartbeat seen from this node
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
    SNodeEntry *pEntry = taosArrayGet(execInfo.pNodeList, i);
    if (pEntry == NULL) {
      continue;
    }

    if (pEntry->nodeId != req.vgId) {
      continue;
    }

    if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
      mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);

      // return directly and after the vnode to continue to send the next HbMsg.
      terrno = TSDB_CODE_SUCCESS;
      doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);

      streamMutexUnlock(&execInfo.lock);
      cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
      return terrno;
    } else {
      pEntry->lastHbMsgId = req.msgId;
      pEntry->lastHbMsgTs = req.ts;
    }
  }

  int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
  if (numOfUpdated > 0) {
    mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
    // a missing node is logged inside the callee; heartbeat processing continues regardless
    (void)setNodeEpsetExpiredFlag(req.pUpdateNodes);
  }

  bool snodeChanged = false;
  for (int32_t i = 0; i < req.numOfTasks; ++i) {
    STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
    if (pTaskEntry == NULL) {
      // reported by the node but unknown to the mnode task map
      checkforOrphanTask(pMnode, p, pOrphanTasks);
      continue;
    }

    STaskCkptInfo *pChkInfo = &p->checkpointInfo;
    if (pChkInfo->consensusChkptId != 0) {
      SRestoreCheckpointInfo cp = {
          .streamId = p->id.streamId,
          .taskId = p->id.taskId,
          .checkpointId = p->checkpointInfo.latestId,
          .startTs = pChkInfo->consensusTs,
      };

      SStreamObj *pStream = NULL;
      code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
      if (code) {
        mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
               p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
        continue;
      }

      int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);

      SCheckpointConsensusInfo *pInfo = NULL;
      code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
      if (code == 0) {
        mndAddConsensusTasks(pInfo, &cp);
      } else {
        mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
      }

      mndReleaseStream(pMnode, pStream);
    }

    if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
      updateStageInfo(pTaskEntry, p->stage);
      if (pTaskEntry->nodeId == SNODE_HANDLE) {
        snodeChanged = true;
      }
    } else {
      streamTaskStatusCopy(pTaskEntry, p);

      if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
        mError("stream task:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed, kill it", p->id.taskId,
               pChkInfo->activeId, pChkInfo->activeTransId);

        SFailedCheckpointInfo info = {
            .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
        addIntoFailedChkptList(pFailedChkpt, &info);

        // remove failed trans from pChkptStreams
        code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
        if (code) {
          mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId);
        }
      }
    }

    if (p->status != TASK_STATUS__READY) {
      mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
    }
  }

  // current checkpoint is failed, rollback from the checkpoint trans
  // kill the checkpoint trans and then set all tasks status to be normal
  if (taosArrayGetSize(pFailedChkpt) > 0) {
    bool allReady = true;

    if (pMnode != NULL) {
      SArray *p = NULL;
      code = mndTakeVgroupSnapshot(pMnode, &allReady, &p);
      taosArrayDestroy(p);
      if (code) {
        mError("failed to get the vgroup snapshot, ignore it and continue");
      }
    } else {
      allReady = false;
    }

    if (allReady || snodeChanged) {
      // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
      for (int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
        SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
        if (pInfo == NULL) {
          continue;
        }

        mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64
              " transId:%d failed issue task-reset trans to reset all tasks status",
              pInfo->streamUid, pInfo->checkpointId, pInfo->transId);

        code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
        if (code) {
          mError("failed to create reset task trans, code:%s", tstrerror(code));
        }
      }
    } else {
      mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
    }
  }

  // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
  if (taosArrayGetSize(pOrphanTasks) > 0) {
    code = mndSendDropOrphanTasksMsg(pMnode, pOrphanTasks);
    if (code) {
      mError("failed to send drop orphan tasks msg, code:%s, try next time", tstrerror(code));
    }
  }

  if (pMnode != NULL) {  // make sure that the unit test case can work
    code = mndStreamSendUpdateChkptInfoMsg(pMnode);
    if (code) {
      mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
    }
  }

  streamMutexUnlock(&execInfo.lock);

  doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId);
  cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);

  return code;
}
537

538
/*
 * Tell whether vgId belongs to one of the node entries in pNodeList.
 * Returns true on the first match, false when no entry carries that id.
 */
bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pNodeList)) {
    SNodeEntry *pNode = taosArrayGet(pNodeList, idx);
    if (pNode != NULL && pNode->nodeId == vgId) {
      return true;
    }
    ++idx;
  }

  return false;
}
548

549
/*
 * Release everything a heartbeat-processing pass holds: the decoded request
 * content and the two scratch lists built while walking the task entries.
 */
void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
  // decoded message content first
  tCleanupStreamHbMsg(pReq);

  // then the per-call scratch lists
  taosArrayDestroy(pFailedChkptList);
  taosArrayDestroy(pOrphanTasks);
}
554

555
/*
 * Encode and send the heartbeat response for msgId back to the sender.
 *
 * The response buffer is an SMsgHead (carrying vgId in network byte order)
 * followed by the encoded SMStreamHbRspMsg. After a successful send,
 * pRpcInfo->handle is set to NULL to disable the automatic response.
 * On any failure nothing is sent and pRpcInfo is left untouched.
 *
 * NOTE(review): `code` is overwritten by the return value of
 * tEncodeStreamHbRsp() before the response is built, so the response carries
 * the encoder's result and not the code supplied by the caller. This is kept
 * as-is because changing it alters what the peer receives - confirm the intent.
 */
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
  int32_t ret = 0;
  int32_t tlen = 0;
  void   *buf = NULL;

  const SMStreamHbRspMsg msg = {.msgId = msgId};

  // size probe; without a valid length the buffer cannot be laid out
  tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
  if (ret < 0) {
    mError("failed to compute the size of stream hb msg rsp, ret:%d", ret);
    return;
  }

  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
  if (buf == NULL) {
    mError("failed to allocate stream hb msg rsp, code:%s", tstrerror(terrno));
    return;
  }

  ((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId);
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);
  if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    return;
  }
  tEncoderClear(&encoder);

  SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};

  tmsgSendRsp(&rsp);
  pRpcInfo->handle = NULL;  // disable auto rsp
}
591

592
/*
 * Classify a task that was reported in a heartbeat but is missing from the
 * mnode task map.
 *
 * If the owning stream cannot be found, the task is appended to pOrphanTasks.
 * If the stream does exist, only the inconsistency is logged and the task is
 * not added to the list.
 */
void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) {
  SStreamObj *pStream = NULL;

  int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
  if (code == 0) {
    // stream is known: give back the reference and report the mismatch only
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }

    mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
           p->id.taskId);
    return;
  }

  mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
         p->id.streamId, p->id.taskId);

  SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
  if (taosArrayPush(pOrphanTasks, &oTask) == NULL) {
    mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno));
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc