• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4875

09 Dec 2025 01:22AM UTC coverage: 64.472% (-0.2%) from 64.623%
#4875

push

travis-ci

guanshengliang
fix: temporarily disable memory leak detection for UDF tests (#33856)

162014 of 251293 relevant lines covered (64.47%)

104318075.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.31
/source/dnode/mnode/impl/src/mndSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndCluster.h"
18
#include "mndStream.h"
19
#include "mndSync.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22

23
/*
 * Put a sync control message onto SYNC_RD_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before queuing. Returns -1 when pMsg or its content is NULL, or when no
 * putToQueueFp callback is available; otherwise returns the result of
 * tmsgPutToQueue(). On every failure after the initial NULL check the content
 * is released with rpcFreeCont() and pMsg->pCont is cleared.
 */
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no usable queue callback.
  int32_t code = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
45

46
/*
 * Put a sync message onto SYNC_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before queuing. Returns -1 when pMsg or its content is NULL, or when no
 * putToQueueFp callback is available; otherwise returns the result of
 * tmsgPutToQueue(). On every failure after the initial NULL check the content
 * is released with rpcFreeCont() and pMsg->pCont is cleared.
 */
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no usable queue callback.
  int32_t code = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
68

69
/*
 * Send a sync request to the given endpoint set and return the result of
 * tmsgSendSyncReq() unchanged.
 *
 * NOTE(review): pMsg->pCont is intentionally left untouched on failure here;
 * who releases it in that case is not visible from this file -- confirm.
 */
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  return tmsgSendSyncReq(pEpSet, pMsg);
}
77

78
/*
 * Validate a single prepare action of a transaction.
 *
 * Only raws whose status is SDB_STATUS_CREATING are validated, and only when
 * the sdb has a validate callback registered for the raw's type. Returns 0
 * when nothing needs validating, otherwise the callback's result.
 */
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
  SSdbRaw *pRaw = pAction->pRaw;
  SSdb    *pSdb = pMnode->pSdb;

  if (pRaw->status != SDB_STATUS_CREATING) {
    return 0;
  }

  SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
  if (validateFp == NULL) {
    return 0;
  }

  int code = validateFp(pMnode, pTrans, pRaw);
  return code;
}
93

94
/*
 * Validate every prepare action of a transaction.
 *
 * Returns 0 when the transaction has no prepare actions or all of them pass
 * mndTransValidatePrepareAction(). Returns -1 when an action is not of type
 * TRANS_ACTION_RAW, and the failing action's code when validation fails.
 */
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
  if (numOfActions == 0) {
    return 0;
  }

  mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);

    if (pAction->actionType != TRANS_ACTION_RAW) {
      mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
      // Fail explicitly: a shared result variable would still hold 0 from a
      // previously validated action and report success for this one.
      return -1;
    }

    int32_t code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
    if (code != 0) {
      mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
      return code;
    }
  }

  return 0;
}
125

126
/*
 * Validate a decoded transaction.
 *
 * Only transactions in TRN_STAGE_PREPARE are checked: first for conflicts via
 * mndTransCheckConflict(), then action by action via
 * mndTransValidatePrepareStage(). Any other stage yields 0.
 */
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;

  if (pTrans->stage != TRN_STAGE_PREPARE) {
    TAOS_RETURN(code);
  }

  code = mndTransCheckConflict(pMnode, pTrans);
  if (code < 0) {
    mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
    TAOS_RETURN(code);
  }

  return mndTransValidatePrepareStage(pMnode, pTrans);
}
138

139
/*
 * Decode the transaction carried by pRaw and validate it.
 *
 * If decoding or fetching the row object fails, the result is terrno when it
 * is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL. The decoded row and
 * transaction data are released before returning. On any non-zero result with
 * terrno still 0, terrno is set to TSDB_CODE_MND_TRANS_CONFLICT.
 */
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SSdbRow *pRow = mndTransDecode(pRaw);
  if (pRow != NULL) {
    pTrans = sdbGetRowObj(pRow);
  }

  if (pRow == NULL || pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  } else {
    code = mndTransValidateImp(pMnode, pTrans);
  }

  if (pTrans != NULL) {
    mndTransDropData(pTrans);
  }
  if (pRow != NULL) {
    taosMemoryFreeClear(pRow);
  }
  if (code != 0 && terrno == 0) {
    terrno = TSDB_CODE_MND_TRANS_CONFLICT;
  }
  TAOS_RETURN(code);
}
165

166
/*
 * Apply a committed sync proposal (an SSdbRaw carried in pMsg->pCont).
 *
 * Steps: extract the transaction id from the raw, validate the transaction,
 * write the raw into the sdb while holding the sdb file lock, run the prepare
 * stage when the transaction is in TRN_STAGE_PREPARE, refresh the
 * transaction, record the apply info from pMeta and write the sdb file.
 *
 * pMeta->code is set to the failure code when validation or the sdb write
 * fails. Returns 0 on success, otherwise the failure code.
 */
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdbRaw   *pRaw = pMsg->pCont;
  STrans    *pTrans = NULL;
  int32_t    code = -1;

  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
  if (transId <= 0) {
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
    code = TSDB_CODE_INVALID_MSG;
    goto _OUT;
  }

  mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
        pRaw, pMgmt->transSec, pMgmt->transSeq);

  code = mndTransValidate(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
    pMeta->code = code;
    goto _OUT;
  }

  // The raw is written to the sdb under the file lock.
  (void)taosThreadMutexLock(&pMnode->pSdb->filelock);
  code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
    (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);
    pMeta->code = code;
    goto _OUT;
  }
  (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);

  pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(code));
    goto _OUT;
  }

  if (pTrans->stage == TRN_STAGE_PREPARE && !mndTransPerformPrepareStage(pMnode, pTrans, false)) {
    // code is still 0 from the sdb write; it only changes if terrno was set.
    if (terrno != 0) code = terrno;
    goto _OUT;
  }

  mInfo("trans:%d, refresh transaction in process write msg", transId);
  mndTransRefresh(pMnode, pTrans);
  mInfo("trans:%d, refresh transaction in process write msg finished", transId);

  sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
  code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);

_OUT:
  if (pTrans != NULL) {
    mndReleaseTrans(pMnode, pTrans);
  }
  TAOS_RETURN(code);
}
230

231
/*
 * Publish the outcome of the pending proposal and wake its waiter.
 *
 * Under pMgmt->lock: if a transaction is recorded as pending (transId != 0),
 * its bookkeeping is cleared, errCode is set to `code` and syncSem is posted.
 * Does nothing when no transaction is pending. Always returns 0.
 */
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    // Snapshot the identifiers before clearing them so the logs below report
    // the real transaction id and sequence instead of the reset value.
    int32_t transId = pMgmt->transId;
    int64_t transSeq = pMgmt->transSeq;

    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    pMgmt->errCode = code;
    if (tsem_post(&pMgmt->syncSem) < 0) {
      mError("trans:%d, failed to post sem", transId);
    }

    if (pMgmt->errCode != 0) {
      mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
    } else {
      mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, transSeq);
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);

  return 0;
}
257

258
/*
 * FSM commit callback.
 *
 * When pMsg->code is 0, the apply index/term from pMeta are recorded on the
 * message, pMeta->code is cleared, the mnode's applied index is stored
 * atomically and, for message types accepted by syncUtilUserCommit(), the
 * message is processed by mndProcessWriteMsg().
 *
 * In every case the outcome is posted through mndPostMgmtCode() and the
 * message content is released. Returns pMsg->code if it was non-zero,
 * otherwise the result of processing (0 when nothing was processed).
 */
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SMnode *pMnode = pFsm->data;
  int32_t code = pMsg->code;

  if (code == 0) {
    pMsg->info.conn.applyIndex = pMeta->index;
    pMsg->info.conn.applyTerm = pMeta->term;
    pMeta->code = 0;

    atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);

    if (syncUtilUserCommit(pMsg->msgType)) {
      code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
    }
  }

  int32_t postCode = (code != 0) ? code : pMeta->code;
  mndPostMgmtCode(pMnode, postCode);

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  TAOS_RETURN(code);
}
283

284
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
38,064,356✔
285
  SMnode *pMnode = pFSM->data;
38,064,356✔
286
  return atomic_load_64(&pMnode->applied);
38,064,356✔
287
}
288

289
/*
 * Start reading a snapshot from the sdb.
 *
 * Forwards to sdbStartRead(), which receives the reader handle slot and the
 * lastApplyIndex / lastApplyTerm / lastConfigIndex fields of pSnapshot.
 * pReaderParam is not used. Returns the result of sdbStartRead().
 */
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
296

297
/*
 * Fill pSnapshot's lastApplyIndex, lastApplyTerm and lastConfigIndex from the
 * sdb via sdbGetCommitInfo(). Always returns 0.
 */
static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pMnode = pFsm->data;
  SSdb   *pSdb = pMnode->pSdb;

  sdbGetCommitInfo(pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
  return 0;
}
302

303
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
925,498✔
304
  SMnode *pMnode = pFsm->data;
925,498✔
305

306
  if (!pMnode->deploy) {
925,498✔
307
    if (!pMnode->restored) {
268,322✔
308
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
174,386✔
309
      mndTransPullup(pMnode);
174,386✔
310
      mndSetRestored(pMnode, true);
174,386✔
311
    } else {
312
      mInfo("vgId:1, sync restore finished, repeat call");
93,936✔
313
    }
314
  } else {
315
    mInfo("vgId:1, sync restore finished");
657,176✔
316
  }
317
  int32_t code = mndRefreshUserIpWhiteList(pMnode);
925,498✔
318
  if (code != 0) {
925,498✔
319
    mError("vgId:1, failed to refresh user ip white list since %s", tstrerror(code));
×
320
    mndSetRestored(pMnode, false);
×
321
  }
322

323
  code = mndRefreshUserDateTimeWhiteList(pMnode);
925,498✔
324
  if (code != 0) {
925,498✔
325
    mError("vgId:1, failed to refresh user date time white list since %s", tstrerror(code));
×
326
    mndSetRestored(pMnode, false);
×
327
  }
328

329
  SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
925,498✔
330
  if (commitIdx != fsmIndex) {
925,498✔
331
    mError("vgId:1, failed to sync restore, commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
×
332
           fsmIndex);
333
    mndSetRestored(pMnode, false);
×
334
  }
335
}
925,498✔
336

337
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
925,498✔
338
  SMnode *pMnode = pFsm->data;
925,498✔
339

340
  if (!pMnode->deploy) {
925,498✔
341
    if (sdbAfterRestored(pMnode->pSdb) != 0) {
268,322✔
342
      mError("failed to prepare sdb while start mnode");
×
343
    }
344
    mInfo("vgId:1, sync restore finished and restore sdb success");
268,322✔
345
  }
346
}
925,498✔
347

348
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
349
  mInfo("start to read snapshot from sdb");
×
350
  SMnode *pMnode = pFsm->data;
×
351
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
×
352
}
353

354
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
355
  mInfo("stop to read snapshot from sdb");
×
356
  SMnode *pMnode = pFsm->data;
×
357
  sdbStopRead(pMnode->pSdb, pReader);
×
358
}
×
359

360
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
×
361
  SMnode *pMnode = pFsm->data;
×
362
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
×
363
}
364

365
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
366
  mInfo("start to apply snapshot to sdb");
×
367
  SMnode *pMnode = pFsm->data;
×
368
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
×
369
}
370

371
/*
 * Finish applying a snapshot to the sdb. isApply and the snapshot's
 * lastApplyIndex / lastApplyTerm / lastConfigIndex are forwarded to
 * sdbStopWrite(), whose result is returned.
 */
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pMnode = pFsm->data;
  SSdb   *pSdb = pMnode->pSdb;
  return sdbStopWrite(pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                      pSnapshot->lastConfigIndex);
}
378

379
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
×
380
  SMnode *pMnode = pFsm->data;
×
381
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
×
382
}
383

384
static void mndBecomeFollower(const SSyncFSM *pFsm) {
99,299✔
385
  SMnode    *pMnode = pFsm->data;
99,299✔
386
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
99,299✔
387
  mInfo("vgId:1, becomefollower callback");
99,299✔
388

389
  (void)taosThreadMutexLock(&pMgmt->lock);
99,299✔
390
  if (pMgmt->transId != 0) {
99,299✔
391
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
392
    pMgmt->transId = 0;
×
393
    pMgmt->transSec = 0;
×
394
    pMgmt->transSeq = 0;
×
395
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
396
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
397
      mError("failed to post sem");
×
398
    }
399
  }
400
  (void)taosThreadMutexUnlock(&pMgmt->lock);
99,299✔
401

402
  msmHandleBecomeNotLeader(pMnode);  
99,299✔
403
}
99,299✔
404

405
static void mndBecomeLearner(const SSyncFSM *pFsm) {
30,384✔
406
  SMnode    *pMnode = pFsm->data;
30,384✔
407
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
30,384✔
408
  mInfo("vgId:1, become learner");
30,384✔
409

410
  (void)taosThreadMutexLock(&pMgmt->lock);
30,384✔
411
  if (pMgmt->transId != 0) {
30,384✔
412
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
413
    pMgmt->transId = 0;
×
414
    pMgmt->transSec = 0;
×
415
    pMgmt->transSeq = 0;
×
416
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
417
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
418
      mError("failed to post sem");
×
419
    }
420
  }
421
  (void)taosThreadMutexUnlock(&pMgmt->lock);
30,384✔
422

423
  msmHandleBecomeNotLeader(pMnode);  
30,384✔
424
}
30,384✔
425

426
static void mndBecomeLeader(const SSyncFSM *pFsm) {
432,262✔
427
  mInfo("vgId:1, becomeleader callback");
432,262✔
428
  SMnode *pMnode = pFsm->data;
432,262✔
429

430
  msmHandleBecomeLeader(pMnode);
432,262✔
431
}
432,262✔
432

433
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
×
434
  SMnode *pMnode = pFsm->data;
×
435

436
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
437
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
438
    return (itemSize == 0);
×
439
  } else {
440
    return true;
×
441
  }
442
}
443

444
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
1,791,229✔
445
  SMnode *pMnode = pFsm->data;
1,791,229✔
446

447
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
1,791,229✔
448
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
449
    return itemSize;
×
450
  } else {
451
    return -1;
1,791,229✔
452
  }
453
}
454

455
/*
 * Allocate the sync FSM for the mnode and wire up its callbacks.
 *
 * pFsm->data is set to pMnode. Callbacks the mnode does not implement are
 * left NULL. Returns the new FSM, or NULL when the allocation fails.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Allocation failed: report it to the caller instead of dereferencing NULL.
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpAfterRestoredCb = mndAfterRestored;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = NULL;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}
482

483
/*
 * Initialise the mnode's sync management and open the sync instance.
 *
 * Resets the pending-proposal bookkeeping, builds the SSyncInfo (paths,
 * callbacks, timing and replica configuration taken from pMgmt), initialises
 * syncSem and calls syncOpen(). On success the sync id is stored in
 * pMgmt->sync and in the sdb.
 *
 * Returns 0 on success and a non-zero error code on any failure.
 */
int32_t mndInitSync(SMnode *pMnode) {
  int32_t    code = 0;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexInit(&pMgmt->lock, NULL);
  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  (void)taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = tsMnodeElectIntervalMs,
      .heartbeatMs = tsMnodeHeartbeatIntervalMs,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("failed to open sync since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  mInfo("vgId:1, start to open mnode sync, replica:%d selfIndex:%d, electMs:%d, heartbeatMs:%d", pMgmt->numOfReplicas,
        pMgmt->selfIndex, syncInfo.electMs, syncInfo.heartbeatMs);
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  pCfg->lastIndex = pMgmt->lastIndex;
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = pMgmt->nodeRoles[i];
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId, update);
  }

  if ((code = tsem_init(&pMgmt->syncSem, 0, 0)) < 0) {
    mError("failed to open sync, tsem_init, since %s", tstrerror(code));
    // The FSM has not been handed to the sync module yet, so release it here.
    taosMemoryFreeClear(syncInfo.pFsm);
    TAOS_RETURN(code);
  }

  pMgmt->sync = syncOpen(&syncInfo, 1);  // always check
  if (pMgmt->sync <= 0) {
    // Never report success for a failed open, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    mError("failed to open sync since %s", tstrerror(code));
    // NOTE(review): whether syncOpen() releases pFsm on failure, and whether
    // the caller runs mndCleanupSync() afterwards, is not visible here, so
    // neither pFsm nor syncSem is released on this path -- confirm.
    TAOS_RETURN(code);
  }
  pMnode->pSdb->sync = pMgmt->sync;

  mInfo("vgId:1, mnode sync is opened, id:%" PRId64, pMgmt->sync);
  TAOS_RETURN(code);
}
543

544
/*
 * Stop the sync instance and tear down the sync management state: the
 * semaphore and mutex are destroyed and the whole SSyncMgmt is zeroed.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  syncStop(pSyncMgmt->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSyncMgmt->sync);

  int32_t semCode = tsem_destroy(&pSyncMgmt->syncSem);
  if (semCode < 0) {
    mError("failed to destroy sem");
  }

  (void)taosThreadMutexDestroy(&pSyncMgmt->lock);
  memset(pSyncMgmt, 0, sizeof(SSyncMgmt));
}
503,576✔
555

556
/*
 * Log how long the pending proposal, if any, has been waiting.
 *
 * Under pMgmt->lock: when a transaction is pending, an error is logged if it
 * has waited longer than MNODE_TIMEOUT_SEC seconds, otherwise a debug line.
 * The pending proposal itself is not cancelled or modified here.
 */
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pMgmt->transSec;

    if (delta <= MNODE_TIMEOUT_SEC) {
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
             pMgmt->transSec, curSec, delta, pMgmt->transSeq);
    } else {
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
             pMgmt->transSec, curSec, delta, pMgmt->transSeq);
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
527,228✔
583

584
/*
 * Propose a raw sdb record to the sync module on behalf of transaction
 * `transId` and wait for the outcome.
 *
 * A copy of pRaw is proposed. Only one proposal may be pending at a time: if
 * another transaction is already recorded in pMgmt, the call fails with
 * TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED.
 *
 * syncPropose() result handling:
 *   == 0  the proposal is in flight; block on syncSem until it is posted.
 *   >  0  confirmed at once; the raw is written to the sdb directly.
 *   <  0  the proposal failed; bookkeeping is cleared.
 *
 * Returns a non-zero code when proposing, waiting or the direct write fails;
 * otherwise pMgmt->errCode, which is also stored in terrno.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  // NOTE(review): relies on sdbGetRawTotalSize() having set terrno on failure -- confirm.
  if (req.contLen <= 0) return terrno;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return terrno;
  memcpy(req.pCont, pRaw, req.contLen);

  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    // Release the lock before blocking so the code that posts syncSem can take it.
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code == 0) {
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    }
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // Log the caller's transId: pMgmt->transId may already have been cleared
    // and must not be read here without holding the lock.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return terrno;
}
646

647
/* Start the mnode's sync instance via syncStart() and log the outcome. */
void mndSyncStart(SMnode *pMnode) {
  mInfo("vgId:1, start to start mnode sync");
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  bool started = !(syncStart(pMgmt->sync) < 0);
  if (started) {
    mInfo("vgId:1, mnode sync started, id:%" PRId64, pMgmt->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pMgmt->sync);
  }
}
656

657
/*
 * Cancel the pending proposal, if any, because the mnode is stopping.
 *
 * Under pMgmt->lock: when a transaction is pending, its bookkeeping is
 * cleared, errCode is set to TSDB_CODE_APP_IS_STOPPING and syncSem is posted.
 */
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, trans:%d, is stopped and post sem", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    // Clear the sequence together with the id and timestamp so no stale
    // sequence number is left behind for a cancelled proposal.
    pMgmt->transSeq = 0;
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    if (tsem_post(&pMgmt->syncSem) < 0) {
      mError("failed to post sem");
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
503,576✔
672

673
/*
 * Report whether this mnode is currently a restored leader.
 *
 * terrno is cleared first. Returns false with terrno left as set by
 * syncGetState() if that call reports an error, false with
 * TSDB_CODE_SYN_NOT_LEADER when the sync state is not leader, and false with
 * TSDB_CODE_SYN_RESTORING when either the sync state or the mnode is not yet
 * restored. Returns true otherwise.
 */
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    mDebug("vgId:1, mnode not leader, state:%s", syncStr(syncState.state));
    return false;
  }

  if (!(syncState.restored && pMnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    mDebug("vgId:1, mnode not restored:%d:%d", syncState.restored, pMnode->restored);
    return false;
  }

  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc