• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4912

04 Jan 2026 09:05AM UTC coverage: 64.888% (-0.1%) from 65.028%
#4912

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

5351 existing lines in 123 files now uncovered.

194856 of 300296 relevant lines covered (64.89%)

118198896.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.33
/source/dnode/mnode/impl/src/mndSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndCluster.h"
18
#include "mndStream.h"
19
#include "mndSync.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndXnode.h"
23
#include "mndToken.h"
24

UNCOV
25
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
×
26
  if (pMsg == NULL || pMsg->pCont == NULL) {
×
27
    return -1;
×
28
  }
29

UNCOV
30
  SMsgHead *pHead = pMsg->pCont;
×
31
  pHead->contLen = htonl(pHead->contLen);
×
32
  pHead->vgId = htonl(pHead->vgId);
×
33

UNCOV
34
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
×
35
    rpcFreeCont(pMsg->pCont);
×
36
    pMsg->pCont = NULL;
×
37
    return -1;
×
38
  }
39

UNCOV
40
  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
×
41
  if (code != 0) {
×
42
    rpcFreeCont(pMsg->pCont);
×
43
    pMsg->pCont = NULL;
×
44
  }
UNCOV
45
  return code;
×
46
}
47

48
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
39,607,296✔
49
  if (pMsg == NULL || pMsg->pCont == NULL) {
39,607,296✔
UNCOV
50
    return -1;
×
51
  }
52

53
  SMsgHead *pHead = pMsg->pCont;
39,607,296✔
54
  pHead->contLen = htonl(pHead->contLen);
39,607,296✔
55
  pHead->vgId = htonl(pHead->vgId);
39,607,296✔
56

57
  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
39,607,296✔
UNCOV
58
    rpcFreeCont(pMsg->pCont);
×
59
    pMsg->pCont = NULL;
×
60
    return -1;
×
61
  }
62

63
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
39,607,296✔
64
  if (code != 0) {
39,607,296✔
65
    rpcFreeCont(pMsg->pCont);
1,333✔
66
    pMsg->pCont = NULL;
1,333✔
67
  }
68
  return code;
39,607,296✔
69
}
70

71
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
9,850,783✔
72
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
9,850,783✔
73
  // if (code != 0) {
74
  //   rpcFreeCont(pMsg->pCont);
75
  //   pMsg->pCont = NULL;
76
  // }
77
  return code;
9,850,783✔
78
}
79

80
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
9,786,573✔
81
  SSdbRaw *pRaw = pAction->pRaw;
9,786,573✔
82
  SSdb    *pSdb = pMnode->pSdb;
9,786,573✔
83
  int      code = 0;
9,786,573✔
84

85
  if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
9,786,573✔
86

87
  SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
3,459,321✔
88
  if (validateFp) {
3,459,321✔
89
    code = validateFp(pMnode, pTrans, pRaw);
3,446,375✔
90
  }
91

92
_OUT:
12,946✔
93
  return code;
9,786,573✔
94
}
95

96
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
16,610,307✔
97
  int32_t code = -1;
16,610,307✔
98
  int32_t action = 0;
16,610,307✔
99

100
  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
16,610,307✔
101
  if (numOfActions == 0) {
16,610,307✔
102
    code = 0;
9,220,598✔
103
    goto _OUT;
9,220,598✔
104
  }
105

106
  mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);
7,389,709✔
107

108
  for (action = 0; action < numOfActions; ++action) {
17,176,282✔
109
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
9,786,573✔
110

111
    if (pAction->actionType != TRANS_ACTION_RAW) {
9,786,573✔
UNCOV
112
      mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
×
113
      goto _OUT;
×
114
    }
115

116
    code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
9,786,573✔
117
    if (code != 0) {
9,786,573✔
UNCOV
118
      mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
×
119
      goto _OUT;
×
120
    }
121
  }
122

123
  code = 0;
7,389,709✔
124
_OUT:
16,610,307✔
125
  return code;
16,610,307✔
126
}
127

128
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
35,731,549✔
129
  int32_t code = 0;
35,731,549✔
130
  if (pTrans->stage == TRN_STAGE_PREPARE) {
35,731,549✔
131
    if ((code = mndTransCheckConflict(pMnode, pTrans)) < 0) {
16,610,307✔
UNCOV
132
      mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
×
133
      TAOS_RETURN(code);
×
134
    }
135

136
    return mndTransValidatePrepareStage(pMnode, pTrans);
16,610,307✔
137
  }
138
  TAOS_RETURN(code);
19,121,242✔
139
}
140

141
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
35,731,549✔
142
  STrans *pTrans = NULL;
35,731,549✔
143
  int32_t code = -1;
35,731,549✔
144

145
  SSdbRow *pRow = mndTransDecode(pRaw);
35,731,549✔
146
  if (pRow == NULL) {
35,731,549✔
UNCOV
147
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
148
    if (terrno != 0) code = terrno;
×
149
    goto _OUT;
×
150
  }
151

152
  pTrans = sdbGetRowObj(pRow);
35,731,549✔
153
  if (pTrans == NULL) {
35,731,549✔
UNCOV
154
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
155
    if (terrno != 0) code = terrno;
×
156
    goto _OUT;
×
157
  }
158

159
  code = mndTransValidateImp(pMnode, pTrans);
35,731,549✔
160

161
_OUT:
35,731,549✔
162
  if (pTrans) mndTransDropData(pTrans);
35,731,549✔
163
  if (pRow) taosMemoryFreeClear(pRow);
35,731,549✔
164
  if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
35,731,549✔
165
  TAOS_RETURN(code);
35,731,549✔
166
}
167

168
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
35,731,549✔
169
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
35,731,549✔
170
  SSdbRaw   *pRaw = pMsg->pCont;
35,731,549✔
171
  STrans    *pTrans = NULL;
35,731,549✔
172
  int32_t    code = -1;
35,731,549✔
173
  int32_t    transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
35,731,549✔
174

175
  if (transId <= 0) {
35,731,549✔
UNCOV
176
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
×
177
    code = TSDB_CODE_INVALID_MSG;
×
178
    goto _OUT;
×
179
  }
180

181
  mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
35,731,549✔
182
        " role:%s raw:%p sec:%d seq:%" PRId64,
183
        transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
184
        pRaw, pMgmt->transSec, pMgmt->transSeq);
185

186
  code = mndTransValidate(pMnode, pRaw);
35,731,549✔
187
  if (code != 0) {
35,731,549✔
UNCOV
188
    mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
×
189
    // code = 0;
UNCOV
190
    pMeta->code = code;
×
191
    goto _OUT;
×
192
  }
193

194
  (void)taosThreadMutexLock(&pMnode->pSdb->filelock);
35,731,549✔
195
  code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
35,731,549✔
196
  if (code != 0) {
35,731,549✔
UNCOV
197
    mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
×
198
    // code = 0;
UNCOV
199
    (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);
×
200
    pMeta->code = code;
×
201
    goto _OUT;
×
202
  }
203
  (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);
35,731,549✔
204

205
  pTrans = mndAcquireTrans(pMnode, transId);
35,731,549✔
206
  if (pTrans == NULL) {
35,731,549✔
UNCOV
207
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
208
    if (terrno != 0) code = terrno;
×
209
    mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(code));
×
210
    goto _OUT;
×
211
  }
212

213
  if (pTrans->stage == TRN_STAGE_PREPARE) {
35,731,549✔
214
    bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false);
16,610,307✔
215
    if (!continueExec) {
16,610,307✔
UNCOV
216
      if (terrno != 0) code = terrno;
×
217
      goto _OUT;
×
218
    }
219
  }
220

221
  mInfo("trans:%d, refresh transaction in process write msg", transId);
35,731,549✔
222
  mndTransRefresh(pMnode, pTrans);
35,731,549✔
223
  mInfo("trans:%d, refresh transaction in process write msg finished", transId);
35,731,549✔
224

225
  sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
35,731,549✔
226
  code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
35,731,549✔
227

228
_OUT:
35,731,549✔
229
  if (pTrans) mndReleaseTrans(pMnode, pTrans);
35,731,549✔
230
  TAOS_RETURN(code);
35,731,549✔
231
}
232

233
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
36,137,376✔
234
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
36,137,376✔
235
  (void)taosThreadMutexLock(&pMgmt->lock);
36,137,376✔
236
  if (pMgmt->transId == 0) {
36,137,376✔
237
    goto _OUT;
3,198,909✔
238
  }
239

240
  int32_t transId = pMgmt->transId;
32,938,467✔
241
  pMgmt->transId = 0;
32,938,467✔
242
  pMgmt->transSec = 0;
32,938,467✔
243
  pMgmt->transSeq = 0;
32,938,467✔
244
  pMgmt->errCode = code;
32,938,467✔
245
  if (tsem_post(&pMgmt->syncSem) < 0) {
32,938,467✔
UNCOV
246
    mError("trans:%d, failed to post sem", transId);
×
247
  }
248

249
  if (pMgmt->errCode != 0) {
32,938,467✔
UNCOV
250
    mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
×
251
  } else {
252
    mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
32,938,467✔
253
  }
254

UNCOV
255
_OUT:
×
256
  (void)taosThreadMutexUnlock(&pMgmt->lock);
36,137,376✔
257
  return 0;
36,137,376✔
258
}
259

260
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
36,137,376✔
261
  SMnode *pMnode = pFsm->data;
36,137,376✔
262
  int32_t code = pMsg->code;
36,137,376✔
263
  if (code != 0) {
36,137,376✔
UNCOV
264
    goto _OUT;
×
265
  }
266

267
  pMsg->info.conn.applyIndex = pMeta->index;
36,137,376✔
268
  pMsg->info.conn.applyTerm = pMeta->term;
36,137,376✔
269
  pMeta->code = 0;
36,137,376✔
270

271
  atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
36,137,376✔
272

273
  if (!syncUtilUserCommit(pMsg->msgType)) {
36,137,376✔
274
    goto _OUT;
405,827✔
275
  }
276

277
  code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
35,731,549✔
278

279
_OUT:
36,137,376✔
280
  mndPostMgmtCode(pMnode, code ? code : pMeta->code);
36,137,376✔
281
  rpcFreeCont(pMsg->pCont);
36,137,376✔
282
  pMsg->pCont = NULL;
36,137,376✔
283
  TAOS_RETURN(code);
36,137,376✔
284
}
285

286
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
35,047,653✔
287
  SMnode *pMnode = pFSM->data;
35,047,653✔
288
  return atomic_load_64(&pMnode->applied);
35,047,653✔
289
}
290

UNCOV
291
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
×
292
  mInfo("start to read snapshot from sdb in atomic way");
×
293
  SMnode *pMnode = pFsm->data;
×
294
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
×
295
                      &pSnapshot->lastConfigIndex);
×
296
  return 0;
297
}
298

299
static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
9,587,609✔
300
  SMnode *pMnode = pFsm->data;
9,587,609✔
301
  sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
9,587,609✔
302
  return 0;
9,587,609✔
303
}
304

305
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
456,443✔
306
  SMnode *pMnode = pFsm->data;
456,443✔
307

308
  if (!pMnode->deploy) {
456,443✔
309
    if (!pMnode->restored) {
179,358✔
310
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
106,579✔
311
      mndTransPullup(pMnode);
106,579✔
312
      mndSetRestored(pMnode, true);
106,579✔
313
    } else {
314
      mInfo("vgId:1, sync restore finished, repeat call");
72,779✔
315
    }
316
  } else {
317
    mInfo("vgId:1, sync restore finished");
277,085✔
318
  }
319
  int32_t code = mndRefreshUserIpWhiteList(pMnode);
456,443✔
320
  if (code != 0) {
456,443✔
UNCOV
321
    mError("vgId:1, failed to refresh user ip white list since %s", tstrerror(code));
×
322
    mndSetRestored(pMnode, false);
×
323
  }
324

325
  code = mndRefreshUserDateTimeWhiteList(pMnode);
456,443✔
326
  if (code != 0) {
456,443✔
UNCOV
327
    mError("vgId:1, failed to refresh user date time white list since %s", tstrerror(code));
×
328
    mndSetRestored(pMnode, false);
×
329
  }
330

331
  code = mndTokenCacheRebuild(pMnode);
456,443✔
332
  if (code != 0) {
456,443✔
UNCOV
333
    mError("vgId:1, failed to rebuild token cache since %s", tstrerror(code));
×
334
    mndSetRestored(pMnode, false);
×
335
  }
336

337
  SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
456,443✔
338
  if (commitIdx != fsmIndex) {
456,443✔
UNCOV
339
    mError("vgId:1, failed to sync restore, commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
×
340
           fsmIndex);
UNCOV
341
    mndSetRestored(pMnode, false);
×
342
  }
343
}
456,443✔
344

345
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
456,443✔
346
  SMnode *pMnode = pFsm->data;
456,443✔
347

348
  if (!pMnode->deploy) {
456,443✔
349
    if (sdbAfterRestored(pMnode->pSdb) != 0) {
179,358✔
UNCOV
350
      mError("failed to prepare sdb while start mnode");
×
351
    }
352
    mInfo("vgId:1, sync restore finished and restore sdb success");
179,358✔
353
  }
354
}
456,443✔
355

UNCOV
356
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
357
  mInfo("start to read snapshot from sdb");
×
358
  SMnode *pMnode = pFsm->data;
×
359
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
×
360
}
361

UNCOV
362
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
363
  mInfo("stop to read snapshot from sdb");
×
364
  SMnode *pMnode = pFsm->data;
×
365
  sdbStopRead(pMnode->pSdb, pReader);
×
366
}
×
367

UNCOV
368
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
×
369
  SMnode *pMnode = pFsm->data;
×
370
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
×
371
}
372

UNCOV
373
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
374
  mInfo("start to apply snapshot to sdb");
×
375
  SMnode *pMnode = pFsm->data;
×
376
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
×
377
}
378

UNCOV
379
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
×
380
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
×
381
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
UNCOV
382
  SMnode *pMnode = pFsm->data;
×
383
  return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
×
384
                      pSnapshot->lastConfigIndex);
385
}
386

UNCOV
387
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
×
388
  SMnode *pMnode = pFsm->data;
×
389
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
×
390
}
391

392
static void mndBecomeFollower(const SSyncFSM *pFsm) {
36,842✔
393
  SMnode    *pMnode = pFsm->data;
36,842✔
394
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
36,842✔
395
  mInfo("vgId:1, becomefollower callback");
36,842✔
396

397
  (void)taosThreadMutexLock(&pMgmt->lock);
36,842✔
398
  if (pMgmt->transId != 0) {
36,842✔
UNCOV
399
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
400
    pMgmt->transId = 0;
×
401
    pMgmt->transSec = 0;
×
402
    pMgmt->transSeq = 0;
×
403
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
404
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
405
      mError("failed to post sem");
×
406
    }
407
  }
408
  (void)taosThreadMutexUnlock(&pMgmt->lock);
36,842✔
409

410
  msmHandleBecomeNotLeader(pMnode);
36,842✔
411
  mndXnodeHandleBecomeNotLeader();
36,842✔
412
}
36,842✔
413

414
static void mndBecomeLearner(const SSyncFSM *pFsm) {
12,236✔
415
  SMnode    *pMnode = pFsm->data;
12,236✔
416
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
12,236✔
417
  mInfo("vgId:1, become learner");
12,236✔
418

419
  (void)taosThreadMutexLock(&pMgmt->lock);
12,236✔
420
  if (pMgmt->transId != 0) {
12,236✔
UNCOV
421
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
UNCOV
422
    pMgmt->transId = 0;
×
423
    pMgmt->transSec = 0;
×
424
    pMgmt->transSeq = 0;
×
425
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
426
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
427
      mError("failed to post sem");
×
428
    }
429
  }
430
  (void)taosThreadMutexUnlock(&pMgmt->lock);
12,236✔
431

432
  msmHandleBecomeNotLeader(pMnode);
12,236✔
433
  mndXnodeHandleBecomeNotLeader();
12,236✔
434
}
12,236✔
435

436
static void mndBecomeLeader(const SSyncFSM *pFsm) {
355,833✔
437
  mInfo("vgId:1, becomeleader callback");
355,833✔
438
  SMnode *pMnode = pFsm->data;
355,833✔
439

440
  msmHandleBecomeLeader(pMnode);
355,833✔
441
  mndXnodeHandleBecomeLeader(pMnode);
355,833✔
442
}
355,833✔
443

UNCOV
444
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
×
UNCOV
445
  SMnode *pMnode = pFsm->data;
×
446

UNCOV
447
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
448
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
449
    return (itemSize == 0);
×
450
  } else {
451
    return true;
×
452
  }
453
}
454

455
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
1,284,319✔
456
  SMnode *pMnode = pFsm->data;
1,284,319✔
457

458
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
1,284,319✔
UNCOV
459
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
UNCOV
460
    return itemSize;
×
461
  } else {
462
    return -1;
1,284,319✔
463
  }
464
}
465

466
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
383,812✔
467
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
383,812✔
468
  pFsm->data = pMnode;
383,812✔
469
  pFsm->FpCommitCb = mndSyncCommitMsg;
383,812✔
470
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
383,812✔
471
  pFsm->FpPreCommitCb = NULL;
383,812✔
472
  pFsm->FpRollBackCb = NULL;
383,812✔
473
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
383,812✔
474
  pFsm->FpAfterRestoredCb = mndAfterRestored;
383,812✔
475
  pFsm->FpLeaderTransferCb = NULL;
383,812✔
476
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
383,812✔
477
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
383,812✔
478
  pFsm->FpReConfigCb = NULL;
383,812✔
479
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
383,812✔
480
  pFsm->FpBecomeAssignedLeaderCb = NULL;
383,812✔
481
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
383,812✔
482
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
383,812✔
483
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
383,812✔
484
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
383,812✔
485
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
383,812✔
486
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
383,812✔
487
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
383,812✔
488
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
383,812✔
489
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
383,812✔
490
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
383,812✔
491
  return pFsm;
383,812✔
492
}
493

494
int32_t mndInitSync(SMnode *pMnode) {
383,812✔
495
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
383,812✔
496
  (void)taosThreadMutexInit(&pMgmt->lock, NULL);
383,812✔
497
  (void)taosThreadMutexLock(&pMgmt->lock);
383,812✔
498
  pMgmt->transId = 0;
383,812✔
499
  pMgmt->transSec = 0;
383,812✔
500
  pMgmt->transSeq = 0;
383,812✔
501
  (void)taosThreadMutexUnlock(&pMgmt->lock);
383,812✔
502

503
  SSyncInfo syncInfo = {
383,812✔
504
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
505
      .batchSize = 1,
506
      .vgId = 1,
507
      .pWal = pMnode->pWal,
383,812✔
508
      .msgcb = &pMnode->msgCb,
383,812✔
509
      .syncSendMSg = mndSyncSendMsg,
510
      .syncEqMsg = mndSyncEqMsg,
511
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
512
      .pingMs = 5000,
513
      .electMs = tsMnodeElectIntervalMs,
514
      .heartbeatMs = tsMnodeHeartbeatIntervalMs,
515
  };
516

517
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
383,812✔
518
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
383,812✔
519

520
  SSyncCfg *pCfg = &syncInfo.syncCfg;
383,812✔
521
  mInfo("vgId:1, start to open mnode sync, in syncMgmt, numOfTotalReplicas:%d selfIndex:%d, electMs:%d, heartbeatMs:%d",
383,812✔
522
        pMgmt->numOfTotalReplicas, pMgmt->selfIndex, syncInfo.electMs, syncInfo.heartbeatMs);
523
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
383,812✔
524
  pCfg->replicaNum = pMgmt->numOfReplicas;
383,812✔
525
  pCfg->myIndex = pMgmt->selfIndex;
383,812✔
526
  pCfg->lastIndex = pMgmt->lastIndex;
383,812✔
527
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
721,795✔
528
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
337,983✔
529
    pNode->nodeId = pMgmt->replicas[i].id;
337,983✔
530
    pNode->nodePort = pMgmt->replicas[i].port;
337,983✔
531
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
337,983✔
532
    pNode->nodeRole = pMgmt->nodeRoles[i];
337,983✔
533
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
337,983✔
534
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
337,983✔
535
          pNode->nodeId, pNode->clusterId, update);
536
  }
537

538
  int32_t code = 0;
383,812✔
539
  if ((code = tsem_init(&pMgmt->syncSem, 0, 0)) < 0) {
383,812✔
UNCOV
540
    mError("failed to open sync, tsem_init, since %s", tstrerror(code));
×
UNCOV
541
    TAOS_RETURN(code);
×
542
  }
543
  pMgmt->sync = syncOpen(&syncInfo, 1);  // always check
383,812✔
544
  if (pMgmt->sync <= 0) {
383,812✔
545
    if (terrno != 0) code = terrno;
×
UNCOV
546
    mError("failed to open sync since %s", tstrerror(code));
×
UNCOV
547
    TAOS_RETURN(code);
×
548
  }
549
  pMnode->pSdb->sync = pMgmt->sync;
383,812✔
550

551
  mInfo("vgId:1, mnode sync is opened, id:%" PRId64, pMgmt->sync);
383,812✔
552
  TAOS_RETURN(code);
383,812✔
553
}
554

555
void mndCleanupSync(SMnode *pMnode) {
383,751✔
556
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
383,751✔
557
  syncStop(pMgmt->sync);
383,751✔
558
  mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
383,751✔
559

560
  if (tsem_destroy(&pMgmt->syncSem) < 0) {
383,751✔
UNCOV
561
    mError("failed to destroy sem");
×
562
  }
563
  (void)taosThreadMutexDestroy(&pMgmt->lock);
383,751✔
564
  memset(pMgmt, 0, sizeof(SSyncMgmt));
383,751✔
565
}
383,751✔
566

567
void mndSyncCheckTimeout(SMnode *pMnode) {
652,364✔
568
  mTrace("check sync timeout");
652,364✔
569
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
652,364✔
570
  (void)taosThreadMutexLock(&pMgmt->lock);
652,364✔
571
  if (pMgmt->transId != 0) {
652,364✔
572
    int32_t curSec = taosGetTimestampSec();
8,655✔
573
    int32_t delta = curSec - pMgmt->transSec;
8,655✔
574
    if (delta > MNODE_TIMEOUT_SEC) {
8,655✔
UNCOV
575
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
×
576
             pMgmt->transSec, curSec, delta, pMgmt->transSeq);
577
      // pMgmt->transId = 0;
578
      // pMgmt->transSec = 0;
579
      // pMgmt->transSeq = 0;
580
      // terrno = TSDB_CODE_SYN_TIMEOUT;
581
      // pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
582
      // if (tsem_post(&pMgmt->syncSem) < 0) {
583
      //  mError("failed to post sem");
584
      //}
585
    } else {
586
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
8,655✔
587
             pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
588
    }
589
  } else {
590
    // mTrace("check sync timeout msg, no trans waiting for confirm");
591
  }
592
  (void)taosThreadMutexUnlock(&pMgmt->lock);
652,364✔
593
}
652,364✔
594

595
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
32,940,127✔
596
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
32,940,127✔
597

598
  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
32,940,127✔
599
  if (req.contLen <= 0) return terrno;
32,940,127✔
600

601
  req.pCont = rpcMallocCont(req.contLen);
32,940,127✔
602
  if (req.pCont == NULL) return terrno;
32,940,127✔
603
  memcpy(req.pCont, pRaw, req.contLen);
32,940,127✔
604

605
  (void)taosThreadMutexLock(&pMgmt->lock);
32,940,127✔
606
  pMgmt->errCode = 0;
32,940,127✔
607

608
  if (pMgmt->transId != 0) {
32,940,127✔
UNCOV
609
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
×
UNCOV
610
    (void)taosThreadMutexUnlock(&pMgmt->lock);
×
UNCOV
611
    rpcFreeCont(req.pCont);
×
UNCOV
612
    TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
×
613
  }
614

615
  mInfo("trans:%d, will be proposed", transId);
32,940,127✔
616
  pMgmt->transId = transId;
32,940,127✔
617
  pMgmt->transSec = taosGetTimestampSec();
32,940,127✔
618

619
  int64_t seq = 0;
32,940,127✔
620
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
32,940,127✔
621
  if (code == 0) {
32,940,127✔
622
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
32,938,794✔
623
    pMgmt->transSeq = seq;
32,938,794✔
624
    (void)taosThreadMutexUnlock(&pMgmt->lock);
32,938,794✔
625
    code = tsem_wait(&pMgmt->syncSem);
32,938,794✔
626
  } else if (code > 0) {
1,333✔
UNCOV
627
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
×
UNCOV
628
    pMgmt->transId = 0;
×
UNCOV
629
    pMgmt->transSec = 0;
×
UNCOV
630
    pMgmt->transSeq = 0;
×
631
    (void)taosThreadMutexUnlock(&pMgmt->lock);
×
632
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
×
633
    if (code == 0) {
×
634
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
×
635
    }
636
  } else {
637
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
1,333✔
638
    pMgmt->transId = 0;
1,333✔
639
    pMgmt->transSec = 0;
1,333✔
640
    pMgmt->transSeq = 0;
1,333✔
641
    (void)taosThreadMutexUnlock(&pMgmt->lock);
1,333✔
642
    if (terrno == 0) {
1,333✔
UNCOV
643
      terrno = TSDB_CODE_APP_ERROR;
×
644
    }
645
  }
646

647
  rpcFreeCont(req.pCont);
32,940,127✔
648
  req.pCont = NULL;
32,940,127✔
649
  if (code != 0) {
32,940,127✔
650
    mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
1,333✔
651
    return code;
1,333✔
652
  }
653

654
  terrno = pMgmt->errCode;
32,938,794✔
655
  return terrno;
32,938,794✔
656
}
657

658
void mndSyncStart(SMnode *pMnode) {
383,751✔
659
  mInfo("vgId:1, start to start mnode sync");
383,751✔
660
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
383,751✔
661
  if (syncStart(pMgmt->sync) < 0) {
383,751✔
UNCOV
662
    mError("vgId:1, failed to start sync, id:%" PRId64, pMgmt->sync);
×
UNCOV
663
    return;
×
664
  }
665
  mInfo("vgId:1, mnode sync started, id:%" PRId64, pMgmt->sync);
383,751✔
666
}
667

668
void mndSyncStop(SMnode *pMnode) {
383,751✔
669
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
383,751✔
670

671
  (void)taosThreadMutexLock(&pMgmt->lock);
383,751✔
672
  if (pMgmt->transId != 0) {
383,751✔
673
    mInfo("vgId:1, trans:%d, is stopped and post sem", pMgmt->transId);
327✔
674
    pMgmt->transId = 0;
327✔
675
    pMgmt->transSec = 0;
327✔
676
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
327✔
677
    if (tsem_post(&pMgmt->syncSem) < 0) {
327✔
UNCOV
678
      mError("failed to post sem");
×
679
    }
680
  }
681
  (void)taosThreadMutexUnlock(&pMgmt->lock);
383,751✔
682
}
383,751✔
683

684
bool mndIsLeader(SMnode *pMnode) {
120,362,909✔
685
  terrno = 0;
120,362,909✔
686
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
120,362,909✔
687

688
  if (terrno != 0) {
120,362,909✔
UNCOV
689
    mDebug("vgId:1, mnode is stopping");
×
UNCOV
690
    return false;
×
691
  }
692

693
  if (state.state != TAOS_SYNC_STATE_LEADER) {
120,362,909✔
694
    terrno = TSDB_CODE_SYN_NOT_LEADER;
2,721,535✔
695
    mDebug("vgId:1, mnode not leader, state:%s", syncStr(state.state));
2,721,535✔
696
    return false;
2,721,535✔
697
  }
698

699
  if (!state.restored || !pMnode->restored) {
117,641,374✔
700
    terrno = TSDB_CODE_SYN_RESTORING;
10,021,154✔
701
    mDebug("vgId:1, mnode not restored:%d:%d", state.restored, pMnode->restored);
10,021,154✔
702
    return false;
10,021,154✔
703
  }
704

705
  return true;
107,620,220✔
706
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc