• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4909

30 Dec 2025 10:52AM UTC coverage: 65.542% (+0.2%) from 65.386%
#4909

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

857 existing lines in 113 files now uncovered.

193924 of 295877 relevant lines covered (65.54%)

120594206.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.13
/source/dnode/mnode/impl/src/mndSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndCluster.h"
18
#include "mndStream.h"
19
#include "mndSync.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndToken.h"
23

24
/*
 * Put a sync control message onto SYNC_RD_QUEUE.
 *
 * contLen and vgId in the message head are converted with htonl() before
 * queuing. If the callback table is unusable or queuing fails, pMsg->pCont is
 * released with rpcFreeCont() and set to NULL.
 *
 * Returns -1 for a missing message, content or callback; otherwise the code
 * returned by tmsgPutToQueue().
 */
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
    if (ret == 0) return 0;
  }

  // Either no queue callback was available or the enqueue failed.
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
46

47
/*
 * Put a sync message onto SYNC_QUEUE.
 *
 * contLen and vgId in the message head are converted with htonl() before
 * queuing. On any failure after that point pMsg->pCont is released with
 * rpcFreeCont() and set to NULL.
 *
 * Returns -1 for a missing message, content or callback; otherwise the code
 * returned by tmsgPutToQueue().
 */
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  bool    canQueue = (msgcb != NULL && msgcb->putToQueueFp != NULL);
  int32_t ret = canQueue ? tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg) : -1;

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
69

70
/*
 * Send a sync request to the given endpoint set.
 * The code from tmsgSendSyncReq() is returned unchanged; pMsg->pCont is not
 * released here, whatever the outcome.
 */
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  return tmsgSendSyncReq(pEpSet, pMsg);
}
78

79
/*
 * Validate one prepare action of a transaction.
 *
 * Only raw rows whose status is SDB_STATUS_CREATING are checked; the check is
 * delegated to the validate callback registered in the sdb for the row type.
 * Returns 0 when nothing has to be validated or no callback is registered,
 * otherwise the callback's result.
 */
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
  SSdbRaw *pActionRaw = pAction->pRaw;
  SSdb    *pStore = pMnode->pSdb;

  if (pActionRaw->status != SDB_STATUS_CREATING) {
    return 0;
  }

  SdbValidateFp fp = pStore->validateFps[pActionRaw->type];
  if (!fp) {
    return 0;
  }
  return fp(pMnode, pTrans, pActionRaw);
}
94

95
/*
 * Validate every prepare action of a transaction.
 *
 * Returns 0 when the transaction has no prepare actions or all of them
 * validate, -1 when an action is not of type TRANS_ACTION_RAW, otherwise the
 * non-zero code reported by mndTransValidatePrepareAction().
 */
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
  if (numOfActions == 0) {
    return 0;
  }

  mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);

    if (pAction->actionType != TRANS_ACTION_RAW) {
      mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
      // Always fail here. Previously the result of the preceding (successful)
      // action was returned, so an unexpected type at index > 0 looked like success.
      return -1;
    }

    int32_t code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
    if (code != 0) {
      mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
      return code;
    }
  }

  return 0;
}
126

127
/*
 * Validate a decoded transaction.
 *
 * Transactions outside TRN_STAGE_PREPARE are accepted as-is. For the prepare
 * stage the conflict check runs first, then the prepare actions are validated.
 */
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
  int32_t ret = 0;

  if (pTrans->stage != TRN_STAGE_PREPARE) {
    TAOS_RETURN(ret);
  }

  ret = mndTransCheckConflict(pMnode, pTrans);
  if (ret < 0) {
    mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
    TAOS_RETURN(ret);
  }

  return mndTransValidatePrepareStage(pMnode, pTrans);
}
139

140
/*
 * Decode a raw transaction and validate it.
 *
 * The decoded row and its transaction data are released before returning.
 * When decoding yields no row/object the result is terrno if set, otherwise
 * TSDB_CODE_MND_RETURN_VALUE_NULL. On any failure terrno is left non-zero,
 * defaulting to TSDB_CODE_MND_TRANS_CONFLICT.
 */
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
  int32_t  ret = -1;
  STrans  *pDecoded = NULL;
  SSdbRow *pDecodedRow = mndTransDecode(pRaw);

  if (pDecodedRow != NULL) {
    pDecoded = sdbGetRowObj(pDecodedRow);
  }

  if (pDecoded != NULL) {
    ret = mndTransValidateImp(pMnode, pDecoded);
    mndTransDropData(pDecoded);
  } else {
    ret = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  if (pDecodedRow) taosMemoryFreeClear(pDecodedRow);
  if (ret) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
  TAOS_RETURN(ret);
}
166

167
/*
 * Apply a committed sync proposal to the sdb.
 *
 * Steps: extract the transaction id from the raw content, validate the
 * transaction, write the raw row into the sdb under the sdb file lock, run the
 * prepare stage when the transaction is in TRN_STAGE_PREPARE, refresh the
 * transaction, record the apply info and write the sdb file.
 *
 * pMeta->code is set when validation or the sdb write fails. Returns 0 on
 * success or an error code.
 */
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;
  SSdb      *pSdb = pMnode->pSdb;
  SSdbRaw   *pContRaw = pMsg->pCont;
  STrans    *pAcquired = NULL;
  int32_t    ret = -1;
  int32_t    transId = sdbGetIdFromRaw(pSdb, pContRaw);

  if (transId <= 0) {
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pSyncMgmt->transId,
           pSyncMgmt->transSeq);
    ret = TSDB_CODE_INVALID_MSG;
    goto _OUT;
  }

  mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        transId, pSyncMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex,
        syncStr(pMeta->state), pContRaw, pSyncMgmt->transSec, pSyncMgmt->transSeq);

  ret = mndTransValidate(pMnode, pContRaw);
  if (ret != 0) {
    mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
    pMeta->code = ret;
    goto _OUT;
  }

  // The raw row is written while holding the sdb file lock.
  (void)taosThreadMutexLock(&pSdb->filelock);
  ret = sdbWriteWithoutFree(pSdb, pContRaw);
  if (ret != 0) {
    mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
    (void)taosThreadMutexUnlock(&pSdb->filelock);
    pMeta->code = ret;
    goto _OUT;
  }
  (void)taosThreadMutexUnlock(&pSdb->filelock);

  pAcquired = mndAcquireTrans(pMnode, transId);
  if (pAcquired == NULL) {
    ret = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(ret));
    goto _OUT;
  }

  if (pAcquired->stage == TRN_STAGE_PREPARE && !mndTransPerformPrepareStage(pMnode, pAcquired, false)) {
    if (terrno != 0) ret = terrno;
    goto _OUT;
  }

  mInfo("trans:%d, refresh transaction in process write msg", transId);
  mndTransRefresh(pMnode, pAcquired);
  mInfo("trans:%d, refresh transaction in process write msg finished", transId);

  sdbSetApplyInfo(pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
  ret = sdbWriteFile(pSdb, tsMndSdbWriteDelta);

_OUT:
  if (pAcquired) mndReleaseTrans(pMnode, pAcquired);
  TAOS_RETURN(ret);
}
231

232
/*
 * Complete the pending proposal (if any) with the given result code.
 *
 * Under pMgmt->lock: when a transaction is waiting (transId != 0) the pending
 * state is cleared, errCode is set to `code` and syncSem is posted so the
 * waiter in mndSyncPropose() can continue. Does nothing when no transaction is
 * pending. Always returns 0.
 */
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId == 0) {
    goto _OUT;
  }

  // Snapshot id and seq before they are cleared so the logs below report the
  // proposal that was actually completed (seq used to be logged after reset, i.e. always 0).
  int32_t transId = pMgmt->transId;
  int64_t transSeq = pMgmt->transSeq;
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  pMgmt->errCode = code;
  if (tsem_post(&pMgmt->syncSem) < 0) {
    mError("trans:%d, failed to post sem", transId);
  }

  if (pMgmt->errCode != 0) {
    mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
  } else {
    mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, transSeq);
  }

_OUT:
  (void)taosThreadMutexUnlock(&pMgmt->lock);
  return 0;
}
258

259
/*
 * FSM commit callback.
 *
 * For a message without error the apply index/term are copied into the
 * connection info, pMnode->applied is updated, and user commits (as decided by
 * syncUtilUserCommit) are applied through mndProcessWriteMsg(). In every case
 * the pending proposal is completed via mndPostMgmtCode() and the message
 * content is freed.
 */
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SMnode *pMnode = pFsm->data;
  int32_t ret = pMsg->code;

  if (ret == 0) {
    pMsg->info.conn.applyIndex = pMeta->index;
    pMsg->info.conn.applyTerm = pMeta->term;
    pMeta->code = 0;

    atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);

    if (syncUtilUserCommit(pMsg->msgType)) {
      ret = mndProcessWriteMsg(pMnode, pMsg, pMeta);
    }
  }

  mndPostMgmtCode(pMnode, ret ? ret : pMeta->code);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  TAOS_RETURN(ret);
}
284

285
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
34,881,767✔
286
  SMnode *pMnode = pFSM->data;
34,881,767✔
287
  return atomic_load_64(&pMnode->applied);
34,881,767✔
288
}
289

290
/*
 * FSM callback: start reading a snapshot from the sdb.
 *
 * The reader is stored in *ppReader and the snapshot's lastApplyIndex,
 * lastApplyTerm and lastConfigIndex are filled in by sdbStartRead(), whose
 * result is returned. pReaderParam is not used.
 */
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
297

298
/* FSM callback: fill the snapshot's apply index, term and config index from the sdb commit info. Always returns 0. */
static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SSdb *pStore = ((SMnode *)pFsm->data)->pSdb;
  sdbGetCommitInfo(pStore, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
  return 0;
}
303

304
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
464,717✔
305
  SMnode *pMnode = pFsm->data;
464,717✔
306

307
  if (!pMnode->deploy) {
464,717✔
308
    if (!pMnode->restored) {
185,720✔
309
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
109,955✔
310
      mndTransPullup(pMnode);
109,955✔
311
      mndSetRestored(pMnode, true);
109,955✔
312
    } else {
313
      mInfo("vgId:1, sync restore finished, repeat call");
75,765✔
314
    }
315
  } else {
316
    mInfo("vgId:1, sync restore finished");
278,997✔
317
  }
318
  int32_t code = mndRefreshUserIpWhiteList(pMnode);
464,717✔
319
  if (code != 0) {
464,717✔
320
    mError("vgId:1, failed to refresh user ip white list since %s", tstrerror(code));
×
321
    mndSetRestored(pMnode, false);
×
322
  }
323

324
  code = mndRefreshUserDateTimeWhiteList(pMnode);
464,717✔
325
  if (code != 0) {
464,717✔
326
    mError("vgId:1, failed to refresh user date time white list since %s", tstrerror(code));
×
327
    mndSetRestored(pMnode, false);
×
328
  }
329

330
  code = mndTokenCacheRebuild(pMnode);
464,717✔
331
  if (code != 0) {
464,717✔
332
    mError("vgId:1, failed to rebuild token cache since %s", tstrerror(code));
×
333
    mndSetRestored(pMnode, false);
×
334
  }
335

336
  SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
464,717✔
337
  if (commitIdx != fsmIndex) {
464,717✔
338
    mError("vgId:1, failed to sync restore, commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
×
339
           fsmIndex);
340
    mndSetRestored(pMnode, false);
×
341
  }
342
}
464,717✔
343

344
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
464,717✔
345
  SMnode *pMnode = pFsm->data;
464,717✔
346

347
  if (!pMnode->deploy) {
464,717✔
348
    if (sdbAfterRestored(pMnode->pSdb) != 0) {
185,720✔
349
      mError("failed to prepare sdb while start mnode");
×
350
    }
351
    mInfo("vgId:1, sync restore finished and restore sdb success");
185,720✔
352
  }
353
}
464,717✔
354

355
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
356
  mInfo("start to read snapshot from sdb");
×
357
  SMnode *pMnode = pFsm->data;
×
358
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
×
359
}
360

361
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
362
  mInfo("stop to read snapshot from sdb");
×
363
  SMnode *pMnode = pFsm->data;
×
364
  sdbStopRead(pMnode->pSdb, pReader);
×
365
}
×
366

367
/* FSM callback: read the next snapshot piece through sdbDoRead(); its result is returned unchanged. */
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SSdb *pStore = ((SMnode *)pFsm->data)->pSdb;
  return sdbDoRead(pStore, pReader, ppBuf, len);
}
371

372
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
373
  mInfo("start to apply snapshot to sdb");
×
374
  SMnode *pMnode = pFsm->data;
×
375
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
×
376
}
377

378
/*
 * FSM callback: finish writing a snapshot.
 * Passes the writer, the apply flag and the snapshot's index/term/config
 * index on to sdbStopWrite() and returns its result.
 */
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SSdb *pStore = ((SMnode *)pFsm->data)->pSdb;
  return sdbStopWrite(pStore, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                      pSnapshot->lastConfigIndex);
}
385

386
/* FSM callback: write one snapshot buffer through sdbDoWrite(); its result is returned unchanged. */
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SSdb *pStore = ((SMnode *)pFsm->data)->pSdb;
  return sdbDoWrite(pStore, pWriter, pBuf, len);
}
390

391
static void mndBecomeFollower(const SSyncFSM *pFsm) {
39,219✔
392
  SMnode    *pMnode = pFsm->data;
39,219✔
393
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
39,219✔
394
  mInfo("vgId:1, becomefollower callback");
39,219✔
395

396
  (void)taosThreadMutexLock(&pMgmt->lock);
39,219✔
397
  if (pMgmt->transId != 0) {
39,219✔
398
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
399
    pMgmt->transId = 0;
×
400
    pMgmt->transSec = 0;
×
401
    pMgmt->transSeq = 0;
×
402
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
403
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
404
      mError("failed to post sem");
×
405
    }
406
  }
407
  (void)taosThreadMutexUnlock(&pMgmt->lock);
39,219✔
408

409
  msmHandleBecomeNotLeader(pMnode);  
39,219✔
410
}
39,219✔
411

412
static void mndBecomeLearner(const SSyncFSM *pFsm) {
12,507✔
413
  SMnode    *pMnode = pFsm->data;
12,507✔
414
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
12,507✔
415
  mInfo("vgId:1, become learner");
12,507✔
416

417
  (void)taosThreadMutexLock(&pMgmt->lock);
12,507✔
418
  if (pMgmt->transId != 0) {
12,507✔
419
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
420
    pMgmt->transId = 0;
×
421
    pMgmt->transSec = 0;
×
422
    pMgmt->transSeq = 0;
×
423
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
424
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
425
      mError("failed to post sem");
×
426
    }
427
  }
428
  (void)taosThreadMutexUnlock(&pMgmt->lock);
12,507✔
429

430
  msmHandleBecomeNotLeader(pMnode);  
12,507✔
431
}
12,507✔
432

433
static void mndBecomeLeader(const SSyncFSM *pFsm) {
360,726✔
434
  mInfo("vgId:1, becomeleader callback");
360,726✔
435
  SMnode *pMnode = pFsm->data;
360,726✔
436

437
  msmHandleBecomeLeader(pMnode);
360,726✔
438
}
360,726✔
439

440
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
×
441
  SMnode *pMnode = pFsm->data;
×
442

443
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
444
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
445
    return (itemSize == 0);
×
446
  } else {
447
    return true;
×
448
  }
449
}
450

451
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
1,327,428✔
452
  SMnode *pMnode = pFsm->data;
1,327,428✔
453

454
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
1,327,428✔
455
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
456
    return itemSize;
×
457
  } else {
458
    return -1;
1,327,428✔
459
  }
460
}
461

462
/*
 * Allocate the sync FSM for the mnode and wire up its callbacks.
 *
 * Callbacks the mnode does not implement are set to NULL explicitly.
 * Returns the new FSM, or NULL when the allocation fails (previously the
 * NULL result was dereferenced).
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    mError("vgId:1, failed to allocate sync fsm");
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpAfterRestoredCb = mndAfterRestored;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = NULL;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}
489

490
/*
 * Initialise the mnode's sync management state and open the sync instance.
 *
 * Resets the pending-proposal fields, builds the SSyncInfo (callbacks, timing,
 * path "<mnode path>/sync", FSM and replica configuration taken from
 * pMgmt), initialises syncSem and calls syncOpen(). On success the sync id is
 * stored in both pMgmt->sync and pMnode->pSdb->sync.
 *
 * Returns 0 on success, otherwise a non-zero error code.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexInit(&pMgmt->lock, NULL);
  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  (void)taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = tsMnodeElectIntervalMs,
      .heartbeatMs = tsMnodeHeartbeatIntervalMs,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL state machine to syncOpen().
    int32_t fsmCode = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open sync, make fsm, since %s", tstrerror(fsmCode));
    TAOS_RETURN(fsmCode);
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  mInfo("vgId:1, start to open mnode sync, in syncMgmt, numOfTotalReplicas:%d selfIndex:%d, electMs:%d, heartbeatMs:%d",
        pMgmt->numOfTotalReplicas, pMgmt->selfIndex, syncInfo.electMs, syncInfo.heartbeatMs);
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  pCfg->lastIndex = pMgmt->lastIndex;
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = pMgmt->nodeRoles[i];
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId, update);
  }

  int32_t code = 0;
  if ((code = tsem_init(&pMgmt->syncSem, 0, 0)) < 0) {
    mError("failed to open sync, tsem_init, since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pMgmt->sync = syncOpen(&syncInfo, 1);  // always check
  if (pMgmt->sync <= 0) {
    // Never report success for a failed open: fall back to a generic error
    // when syncOpen() did not set terrno.
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    mError("failed to open sync since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pMnode->pSdb->sync = pMgmt->sync;

  mInfo("vgId:1, mnode sync is opened, id:%" PRId64, pMgmt->sync);
  TAOS_RETURN(code);
}
550

551
/*
 * Stop the sync instance and tear down the sync management state:
 * the semaphore and mutex are destroyed and the whole SSyncMgmt is zeroed.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSync->sync);

  int32_t semRet = tsem_destroy(&pSync->syncSem);
  if (semRet < 0) {
    mError("failed to destroy sem");
  }

  (void)taosThreadMutexDestroy(&pSync->lock);
  memset(pSync, 0, sizeof(SSyncMgmt));
}
388,952✔
562

563
/*
 * Report how long the pending proposal (if any) has been waiting.
 *
 * A wait longer than MNODE_TIMEOUT_SEC is logged as an error; the pending
 * proposal itself is left untouched. Shorter waits are logged at debug level.
 */
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pSync->lock);
  if (pSync->transId != 0) {
    int32_t nowSec = taosGetTimestampSec();
    int32_t elapsed = nowSec - pSync->transSec;
    if (elapsed <= MNODE_TIMEOUT_SEC) {
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
    } else {
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
    }
  }
  (void)taosThreadMutexUnlock(&pSync->lock);
}
1,011,846✔
590

591
/*
 * Propose a raw sdb row through sync and wait for the outcome.
 *
 * The raw content is copied into an RPC buffer and passed to syncPropose().
 * Only one proposal may be pending at a time; a second one is rejected with
 * TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED.
 *
 *  - syncPropose() == 0: the caller blocks on syncSem until the pending
 *    proposal is completed elsewhere, then pMgmt->errCode is returned.
 *  - syncPropose()  > 0: the row is written to the sdb directly.
 *  - syncPropose()  < 0: the pending state is cleared and the code returned.
 *
 * The RPC buffer is always freed before returning.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) return terrno;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return terrno;
  memcpy(req.pCont, pRaw, req.contLen);

  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code == 0) {
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    }
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // Log the id passed in: pMgmt->transId has been cleared by now (or
    // belongs to another proposal) and would be read without the lock.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return terrno;
}
653

654
/* Start the mnode's sync instance and log whether syncStart() succeeded. */
void mndSyncStart(SMnode *pMnode) {
  mInfo("vgId:1, start to start mnode sync");
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  if (syncStart(pSync->sync) >= 0) {
    mInfo("vgId:1, mnode sync started, id:%" PRId64, pSync->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pSync->sync);
  }
}
663

664
/*
 * Abort the pending proposal (if any) because the mnode is stopping.
 *
 * Under pMgmt->lock the pending state is cleared, errCode is set to
 * TSDB_CODE_APP_IS_STOPPING and syncSem is posted to wake the waiter.
 */
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, trans:%d, is stopped and post sem", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    // Reset the sequence as well, like every other path that clears the
    // pending proposal, so no stale seq is left behind.
    pMgmt->transSeq = 0;
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    if (tsem_post(&pMgmt->syncSem) < 0) {
      mError("failed to post sem");
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
388,952✔
679

680
/*
 * Tell whether this mnode is a fully restored leader.
 *
 * terrno is cleared first. Returns false if syncGetState() sets terrno, with
 * terrno = TSDB_CODE_SYN_NOT_LEADER when the sync state is not leader, and
 * with terrno = TSDB_CODE_SYN_RESTORING when either the sync state or the
 * mnode is not restored yet. Returns true otherwise.
 */
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    mDebug("vgId:1, mnode not leader, state:%s", syncStr(syncState.state));
    return false;
  }

  if (!(syncState.restored && pMnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    mDebug("vgId:1, mnode not restored:%d:%d", syncState.restored, pMnode->restored);
    return false;
  }

  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc