• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndStreamHb.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndTrans.h"
18
#include "mndMnode.h"
19
#include "tmisce.h"
20

21
/*
 * One checkpoint transaction reported as failed by a stream task in a heartbeat.
 * Entries collected during a heartbeat are de-duplicated by transId (see addIntoFailedChkptList).
 */
typedef struct SFailedCheckpointInfo {
  int64_t streamUid;     // uid of the stream whose checkpoint failed
  int64_t checkpointId;  // id of the failed checkpoint
  int32_t transId;       // id of the checkpoint transaction to kill / reset from
} SFailedCheckpointInfo;
26

27
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
28
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
29
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
30
static void    updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
31
static void    addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
32
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
33
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
34
static bool    validateHbMsg(const SArray *pNodeList, int32_t vgId);
35
static void    cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
36
static void    doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pEpset, int32_t vgId, int32_t msgId);
37
static void    checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
38

UNCOV
39
/*
 * Record a stage (term) change observed for the node hosting pTaskEntry.
 *
 * Finds the first entry in execInfo.pNodeList whose nodeId matches the task's node, marks it as
 * stage-updated and stores the new stage on the task entry. If the node is not in the list,
 * nothing is changed (not even the task's stage).
 */
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
  int32_t     total = taosArrayGetSize(execInfo.pNodeList);
  SNodeEntry *pMatch = NULL;

  for (int32_t idx = 0; (idx < total) && (pMatch == NULL); ++idx) {
    SNodeEntry *pCandidate = taosArrayGet(execInfo.pNodeList, idx);
    if ((pCandidate != NULL) && (pCandidate->nodeId == pTaskEntry->nodeId)) {
      pMatch = pCandidate;
    }
  }

  if (pMatch == NULL) {
    return;
  }

  mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
        pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);

  pMatch->stageUpdated = true;
  pTaskEntry->stage = stage;
}
×
57

58
/*
 * Append a copy of *pInfo to pList unless an entry with the same transId is already there.
 * A failed append is logged and otherwise ignored.
 */
void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
  int32_t total = taosArrayGetSize(pList);
  bool    exists = false;

  for (int32_t idx = 0; (idx < total) && !exists; ++idx) {
    const SFailedCheckpointInfo *pExist = taosArrayGet(pList, idx);
    exists = (pExist != NULL) && (pExist->transId == pInfo->transId);
  }

  if (!exists && (taosArrayPush(pList, pInfo) == NULL)) {
    mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
  }
}
72

UNCOV
73
/*
 * Create, register and prepare a transaction that resets the status of all tasks of pStream
 * from checkpoint chkptId.
 *
 * Ownership: this function always calls sdbRelease() on pStream before returning, on success
 * and on every failure path, so the caller must not release it again.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared, otherwise the
 * error code of the step that failed.
 */
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
                               " reset from failed checkpoint", &pTrans);
  if (pTrans == NULL || code) {
    // Report the code doCreateTrans returned; only fall back to terrno (and then to a generic
    // failure) when it reported none, so a missing trans is never mistaken for success.
    if (code == 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
    }
    sdbRelease(pMnode->pSdb, pStream);
    return code;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
  if (code) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
    goto _end;
  }

  // a prepared trans is executed asynchronously
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_end:
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}
119

120
/*
 * Queue a TDMT_MND_STREAM_TASK_RESET request on the mnode write queue. The request asks the
 * mnode to kill checkpoint trans transId of stream streamId and reset its tasks from
 * checkpointId.
 *
 * Requests are de-duplicated against execInfo.pKilledChkptTrans. That list keeps the most recent
 * reset requests: once it holds 10 entries, the oldest (index 0) is evicted before appending.
 * A request is recorded there only after it has been queued successfully, so a failed allocation
 * or enqueue does not suppress the retry made on a later heartbeat.
 *
 * Returns TSDB_CODE_SUCCESS if the request was queued or had already been issued, otherwise the
 * allocation / enqueue error.
 */
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
  int32_t size = sizeof(SStreamTaskResetMsg);

  int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
  for (int32_t i = 0; i < num; ++i) {
    SStreamTaskResetMsg *p = taosArrayGet(execInfo.pKilledChkptTrans, i);
    if (p == NULL) {
      continue;
    }

    if (p->transId == transId && p->streamId == streamId) {
      mDebug("already reset stream:0x%" PRIx64 ", not send reset-msg again for transId:%d", streamId, transId);
      return TSDB_CODE_SUCCESS;
    }
  }

  SStreamTaskResetMsg *pReq = rpcMallocCont(size);
  if (pReq == NULL) {
    return terrno;
  }

  pReq->streamId = streamId;
  pReq->transId = transId;
  pReq->checkpointId = checkpointId;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    // not recorded as killed: the next heartbeat reporting this failure will retry
    mError("failed to put reset-task msg into write queue, code:%s", tstrerror(code));
    return code;
  }

  mDebug("send reset task status msg for transId:%d succ", transId);

  if (num >= 10) {
    taosArrayRemove(execInfo.pKilledChkptTrans, 0);  // remove this first, append new reset trans in the tail
  }

  // let's remember that this trans had been killed already
  SStreamTaskResetMsg killed = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
  if (taosArrayPush(execInfo.pKilledChkptTrans, &killed) == NULL) {
    // The request is already queued. Losing this record only means a later heartbeat may issue
    // the same reset again.
    mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId,
           (int32_t)taosArrayGetSize(execInfo.pKilledChkptTrans));
  }

  return TSDB_CODE_SUCCESS;
}
168

UNCOV
169
/*
 * Queue a TDMT_MND_STREAM_UPDATE_CHKPT_EVT event on the mnode write queue.
 *
 * The payload buffer is sized as an SMStreamDoCheckpointMsg (that message layout is reused).
 * Its contents are not filled in here.
 *
 * Returns 0 on success, or the allocation / enqueue error.
 */
int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode) {
  int32_t contLen = (int32_t)sizeof(SMStreamDoCheckpointMsg);
  void   *pCont = rpcMallocCont(contLen);
  if (pCont == NULL) {
    return terrno;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pCont, .contLen = contLen};

  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code != 0) {
    mError("failed to put update-checkpoint-info msg into write queue, code:%s", tstrerror(code));
    return code;
  }

  mDebug("send update checkpoint-info msg succ");
  return code;
}
186

UNCOV
187
/*
 * Serialize the orphan-task list pList into a TDMT_MND_STREAM_DROP_ORPHANTASKS request and queue
 * it on the mnode write queue.
 *
 * If serialization into the allocated buffer fails, the buffer is freed and nothing is queued,
 * so a partially encoded message is never sent.
 *
 * Returns 0 on success, or an error code.
 */
int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) {
  SMStreamDropOrphanMsg msg = {.pList = pList};

  int32_t num = taosArrayGetSize(pList);
  int32_t contLen = tSerializeDropOrphanTaskMsg(NULL, 0, &msg);
  if (contLen <= 0) {
    return terrno;
  }

  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    return terrno;
  }

  // a non-positive result means serialization failed; a negative one is the error code
  int32_t code = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg);
  if (code <= 0) {
    mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code));
    rpcFreeCont(pReq);
    return (code < 0) ? code : TSDB_CODE_FAILED;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
  code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send drop %d orphan tasks msg succ", num);
  }

  return code;
}
216

217
/*
 * Handler for TDMT_MND_STREAM_TASK_RESET requests (queued by mndSendResetFromCheckpointMsg).
 *
 * Steps:
 *  1. Kill the failed checkpoint trans.
 *  2. Clear the stream's checkpoint report info (under execInfo.lock).
 *  3. Unless a conflicting trans exists, create a trans that resets all tasks of the stream from
 *     the checkpoint carried in the request.
 *
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST if the stream cannot be acquired, the conflict-check
 * error, or the result of mndCreateStreamResetStatusTrans.
 */
int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
  SMnode              *pMnode = pReq->info.node;
  SStreamTaskResetMsg *pMsg = pReq->pCont;
  SStreamObj          *pStream = NULL;
  int32_t              code = TSDB_CODE_SUCCESS;

  mndKillTransImpl(pMnode, pMsg->transId, "");

  streamMutexLock(&execInfo.lock);
  code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId);  // do nothing if failed
  streamMutexUnlock(&execInfo.lock);

  code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
  if (pStream == NULL || code != 0) {
    // pStream may be NULL here, so log the stream id taken from the request
    mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped",
           pMsg->streamId);
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
  if (code) {
    mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
           pStream->sourceDb, pStream->targetSTbName);
    mndReleaseStream(pMnode, pStream);
    return code;
  }

  mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
         pStream->uid, pMsg->transId);

  // mndCreateStreamResetStatusTrans calls sdbRelease() on pStream on every path, so the reference
  // must not be released again here (doing so would drop it twice).
  return mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
}
248

UNCOV
249
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
×
UNCOV
250
  int32_t num = taosArrayGetSize(pNodeList);
×
UNCOV
251
  mInfo("set node expired for %d nodes", num);
×
252

UNCOV
253
  for (int k = 0; k < num; ++k) {
×
UNCOV
254
    int32_t *pVgId = taosArrayGet(pNodeList, k);
×
UNCOV
255
    if (pVgId == NULL) {
×
256
      continue;
×
257
    }
258

UNCOV
259
    mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
×
260

UNCOV
261
    bool    setFlag = false;
×
UNCOV
262
    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
×
263

UNCOV
264
    for (int i = 0; i < numOfNodes; ++i) {
×
UNCOV
265
      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
×
UNCOV
266
      if ((pNodeEntry) && (pNodeEntry->nodeId == *pVgId)) {
×
UNCOV
267
        mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
×
UNCOV
268
        pNodeEntry->stageUpdated = true;
×
UNCOV
269
        setFlag = true;
×
UNCOV
270
        break;
×
271
      }
272
    }
273

UNCOV
274
    if (!setFlag) {
×
275
      mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId);
×
276
      return TSDB_CODE_FAILED;
×
277
    }
278
  }
UNCOV
279
  return TSDB_CODE_SUCCESS;
×
280
}
281

282
/*
 * Queue a TDMT_MND_PAUSE_STREAM request (igNotExists = 1) on the mnode write queue for every
 * stream whose status is not STREAM_STATUS__PAUSE. The caller uses this when the streams grant
 * has expired. Each request carries a copy of *info.
 *
 * tSerializeSMPauseStreamReq returns the encoded length on success (the size query below relies
 * on that), so only a non-positive result is treated as a serialization failure.
 *
 * Returns 0 if every pause request was queued, otherwise the error code of the last failure.
 */
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->status != STREAM_STATUS__PAUSE) {
      SMPauseStreamReq reqPause = {0};
      tstrncpy(reqPause.name, pStream->name, sizeof(reqPause.name));
      reqPause.igNotExists = 1;

      int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
      if (contLen <= 0) {
        code = (contLen < 0) ? contLen : TSDB_CODE_FAILED;
        sdbRelease(pSdb, pStream);
        continue;
      }

      void *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        sdbRelease(pSdb, pStream);
        continue;
      }

      int32_t len = tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
      if (len <= 0) {
        code = (len < 0) ? len : TSDB_CODE_FAILED;
        rpcFreeCont(pHead);
        sdbRelease(pSdb, pStream);
        continue;
      }

      SRpcMsg rpcMsg = {
          .msgType = TDMT_MND_PAUSE_STREAM,
          .pCont = pHead,
          .contLen = contLen,
          .info = *info,
      };

      int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
      if (ret != 0) {
        code = ret;
      }
      mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired, code:%s", pStream->name, reqPause.name,
            pStream->uid, tstrerror(ret));
    }

    sdbRelease(pSdb, pStream);
  }

  return code;
}
327

UNCOV
328
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
×
UNCOV
329
  SMnode      *pMnode = pReq->info.node;
×
UNCOV
330
  SStreamHbMsg req = {0};
×
UNCOV
331
  SArray      *pFailedChkpt = NULL;
×
UNCOV
332
  SArray      *pOrphanTasks = NULL;
×
UNCOV
333
  int32_t      code = 0;
×
UNCOV
334
  SDecoder     decoder = {0};
×
UNCOV
335
  SEpSet       mnodeEpset = {0};
×
336

UNCOV
337
  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
×
338
    if (suspendAllStreams(pMnode, &pReq->info) < 0) {
×
339
      return code;
×
340
    }
341
  }
342

UNCOV
343
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
×
344

UNCOV
345
  if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
×
346
    tCleanupStreamHbMsg(&req);
×
347
    tDecoderClear(&decoder);
×
348
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
349
  }
UNCOV
350
  tDecoderClear(&decoder);
×
351

UNCOV
352
  mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId,
×
353
         req.numOfTasks, req.msgId, req.ts);
354

UNCOV
355
  pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
×
UNCOV
356
  pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
×
UNCOV
357
  if (pFailedChkpt == NULL || pOrphanTasks == NULL) {
×
358
    taosArrayDestroy(pFailedChkpt);
×
359
    taosArrayDestroy(pOrphanTasks);
×
360
    TAOS_RETURN(terrno);
×
361
  }
362

UNCOV
363
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
×
364

UNCOV
365
  streamMutexLock(&execInfo.lock);
×
366

UNCOV
367
  mndInitStreamExecInfo(pMnode, &execInfo);
×
UNCOV
368
  if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
×
UNCOV
369
    mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
×
370

UNCOV
371
    doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
×
372

UNCOV
373
    streamMutexUnlock(&execInfo.lock);
×
UNCOV
374
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
×
UNCOV
375
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
×
376
  }
377

UNCOV
378
  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
×
UNCOV
379
    SNodeEntry *pEntry = taosArrayGet(execInfo.pNodeList, i);
×
UNCOV
380
    if (pEntry == NULL) {
×
381
      continue;
×
382
    }
383

UNCOV
384
    if (pEntry->nodeId != req.vgId) {
×
UNCOV
385
      continue;
×
386
    }
387

UNCOV
388
    if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
×
UNCOV
389
      mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
×
390

391
      // return directly and allow the vnode to continue to send the next HbMsg.
UNCOV
392
      terrno = TSDB_CODE_SUCCESS;
×
UNCOV
393
      doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
×
394

UNCOV
395
      streamMutexUnlock(&execInfo.lock);
×
UNCOV
396
      cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
×
UNCOV
397
      return terrno;
×
398
    } else {
UNCOV
399
      pEntry->lastHbMsgId = req.msgId;
×
UNCOV
400
      pEntry->lastHbMsgTs = req.ts;
×
401
    }
402
  }
403

UNCOV
404
  int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
×
UNCOV
405
  if (numOfUpdated > 0) {
×
UNCOV
406
    mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
×
UNCOV
407
    int32_t unused = setNodeEpsetExpiredFlag(req.pUpdateNodes);
×
408
  }
409

UNCOV
410
  bool snodeChanged = false;
×
UNCOV
411
  for (int32_t i = 0; i < req.numOfTasks; ++i) {
×
UNCOV
412
    STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
×
UNCOV
413
    if (p == NULL) {
×
414
      continue;
×
415
    }
416

UNCOV
417
    STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
×
UNCOV
418
    if (pTaskEntry == NULL) {
×
UNCOV
419
      checkforOrphanTask(pMnode, p, pOrphanTasks);
×
UNCOV
420
      continue;
×
421
    }
422

UNCOV
423
    STaskCkptInfo *pChkInfo = &p->checkpointInfo;
×
UNCOV
424
    if (pChkInfo->consensusChkptId != 0) {
×
UNCOV
425
      SRestoreCheckpointInfo cp = {
×
UNCOV
426
          .streamId = p->id.streamId,
×
UNCOV
427
          .taskId = p->id.taskId,
×
UNCOV
428
          .checkpointId = p->checkpointInfo.latestId,
×
UNCOV
429
          .startTs = pChkInfo->consensusTs,
×
UNCOV
430
          .nodeId = p->nodeId,
×
UNCOV
431
          .term = p->stage,
×
432
      };
433

UNCOV
434
      SStreamObj *pStream = NULL;
×
UNCOV
435
      code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
×
UNCOV
436
      if (code) {
×
437
        mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
×
438
               p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
439
        continue;
×
440
      }
441

UNCOV
442
      int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
×
443

UNCOV
444
      SCheckpointConsensusInfo *pInfo = NULL;
×
UNCOV
445
      code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
×
UNCOV
446
      if (code == 0) {
×
UNCOV
447
        mndAddConsensusTasks(pInfo, &cp);
×
448
      } else {
449
        mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
×
450
      }
451

UNCOV
452
      mndReleaseStream(pMnode, pStream);
×
453
    }
454

UNCOV
455
    if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
×
UNCOV
456
      updateStageInfo(pTaskEntry, p->stage);
×
UNCOV
457
      if (pTaskEntry->nodeId == SNODE_HANDLE) {
×
UNCOV
458
        snodeChanged = true;
×
459
      }
460
    } else {
UNCOV
461
      streamTaskStatusCopy(pTaskEntry, p);
×
462

UNCOV
463
      if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
×
464
        mError("stream task:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed, kill it", p->id.taskId,
×
465
               pChkInfo->activeId, pChkInfo->activeTransId);
466

467
        SFailedCheckpointInfo info = {
×
468
            .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
×
469
        addIntoFailedChkptList(pFailedChkpt, &info);
×
470

471
        // remove failed trans from pChkptStreams
472
        code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
×
473
        if (code) {
×
474
          mError("failed to remove stream:0x%" PRIx64 " in checkpoint stream list", p->id.streamId);
×
475
        }
476
      }
477
    }
478

UNCOV
479
    if (p->status != TASK_STATUS__READY) {
×
UNCOV
480
      mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
×
481
    }
482
  }
483

484
  // current checkpoint is failed, rollback from the checkpoint trans
485
  // kill the checkpoint trans and then set all tasks status to be normal
UNCOV
486
  if (taosArrayGetSize(pFailedChkpt) > 0) {
×
487
    bool allReady = true;
×
488

489
    if (pMnode != NULL) {
×
490
      SArray *p = NULL;
×
491
      code = mndTakeVgroupSnapshot(pMnode, &allReady, &p, NULL);
×
492
      taosArrayDestroy(p);
×
493
      if (code) {
×
494
        mError("failed to get the vgroup snapshot, ignore it and continue");
×
495
      }
496
    } else {
497
      allReady = false;
×
498
    }
499

500
    if (allReady || snodeChanged) {
×
501
      // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
502
      for (int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
×
503
        SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
×
504
        if (pInfo == NULL) {
×
505
          continue;
×
506
        }
507

508
        mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64
×
509
              " transId:%d failed issue task-reset trans to reset all tasks status",
510
              pInfo->streamUid, pInfo->checkpointId, pInfo->transId);
511

512
        code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
×
513
        if (code) {
×
514
          mError("failed to create reset task trans, code:%s", tstrerror(code));
×
515
        }
516
      }
517
    } else {
518
      mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
×
519
    }
520
  }
521

522
  // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
UNCOV
523
  if (taosArrayGetSize(pOrphanTasks) > 0) {
×
UNCOV
524
    code = mndSendDropOrphanTasksMsg(pMnode, pOrphanTasks);
×
UNCOV
525
    if (code) {
×
526
      mError("failed to send drop orphan tasks msg, code:%s, try next time", tstrerror(code));
×
527
    }
528
  }
529

UNCOV
530
  if (pMnode != NULL) {  // make sure that the unit test case can work
×
UNCOV
531
    code = mndStreamSendUpdateChkptInfoMsg(pMnode);
×
UNCOV
532
    if (code) {
×
533
      mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
×
534
    }
535
  }
536

UNCOV
537
  streamMutexUnlock(&execInfo.lock);
×
538

UNCOV
539
  doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
×
UNCOV
540
  cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
×
541

UNCOV
542
  return code;
×
543
}
544

UNCOV
545
bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
×
UNCOV
546
  for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
×
UNCOV
547
    SNodeEntry *pEntry = taosArrayGet(pNodeList, i);
×
UNCOV
548
    if ((pEntry) && (pEntry->nodeId == vgId)) {
×
UNCOV
549
      return true;
×
550
    }
551
  }
552

UNCOV
553
  return false;
×
554
}
555

UNCOV
556
/*
 * Release the resources of one heartbeat-processing pass: the decoded request's contents, then
 * the failed-checkpoint list, then the orphan-task list.
 */
void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
  SArray *workLists[] = {pFailedChkptList, pOrphanTasks};

  tCleanupStreamHbMsg(pReq);
  for (size_t i = 0; i < sizeof(workLists) / sizeof(workLists[0]); ++i) {
    taosArrayDestroy(workLists[i]);
  }
}
×
561

UNCOV
562
/*
 * Send the response for a stream heartbeat.
 *
 * The payload is an SMsgHead whose vgId is set to the target node (network byte order), followed
 * by an encoded SMStreamHbRspMsg carrying msgId and the current mnode epset. `code` is used as
 * the RPC response code.
 *
 * After a successful send, pRpcInfo->handle is cleared to disable the automatic response. If
 * encoding or allocation fails, nothing is sent and the handle is left untouched.
 */
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet *pMndEpset, int32_t vgId, int32_t msgId) {
  int32_t ret = 0;
  int32_t tlen = 0;
  void   *buf = NULL;

  SMStreamHbRspMsg msg = {.msgId = msgId};
  epsetAssign(&msg.mndEpset, pMndEpset);

  tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
  if (ret < 0) {
    // tlen is not valid when the size pass fails, so do not go on to allocate/encode
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(ret));
    return;
  }

  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
  if (buf == NULL) {
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
    return;
  }

  ((SMStreamHbRspMsg *)buf)->head.vgId = htonl(vgId);
  void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, tlen);

  // separate variable: the caller's `code` is the response code and must not be overwritten
  int32_t encodeCode = tEncodeStreamHbRsp(&encoder, &msg);
  if (encodeCode < 0) {
    rpcFreeCont(buf);
    tEncoderClear(&encoder);
    mError("encode stream hb msg rsp failed, code:%s", tstrerror(encodeCode));
    return;
  }
  tEncoderClear(&encoder);

  SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};

  tmsgSendRsp(&rsp);
  pRpcInfo->handle = NULL;  // disable auto rsp
}
599

UNCOV
600
/*
 * Handle a reported task that has no entry in execInfo.pTaskMap.
 *
 * If the owning stream cannot be looked up (any mndGetStreamObj failure is treated as "stream
 * does not exist"), the task is appended to pOrphanTasks so it can be dropped. Otherwise the
 * stream reference is released and the inconsistency is only logged; the task is not dropped.
 */
void checkforOrphanTask(SMnode *pMnode, STaskStatusEntry *p, SArray *pOrphanTasks) {
  SStreamObj *pStream = NULL;

  if (mndGetStreamObj(pMnode, p->id.streamId, &pStream) == 0) {
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }

    mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
           p->id.taskId);
    return;
  }

  mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
         p->id.streamId, p->id.taskId);

  SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
  if (taosArrayPush(pOrphanTasks, &oTask) == NULL) {
    mError("failed to put task into orphan list, taskId:0x%" PRIx64 ", code:%s", p->id.taskId, tstrerror(terrno));
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc