• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3566

25 Dec 2024 06:56AM UTC coverage: 62.422% (+11.3%) from 51.098%
#3566

push

travis-ci

web-flow
Merge pull request #29314 from taosdata/fix/TD-33275.2

fix: add more UT cases

138034 of 284473 branches covered (48.52%)

Branch coverage included in aggregate %.

215318 of 281594 relevant lines covered (76.46%)

9007186.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.6
/source/dnode/mnode/impl/src/mndSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndCluster.h"
18
#include "mndStream.h"
19
#include "mndSync.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22

23
// Enqueue a sync control message onto SYNC_RD_QUEUE.
//
// The message head's contLen and vgId are passed through htonl() before the
// message is handed to tmsgPutToQueue().
//
// Returns -1 when pMsg or pMsg->pCont is NULL (nothing is freed in that case),
// -1 when msgcb is NULL or has no putToQueueFp, otherwise the result of
// tmsgPutToQueue(). On every failure after the initial NULL check the message
// content is released with rpcFreeCont() and pMsg->pCont is cleared.
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (!pMsg || !pMsg->pCont) {
    return -1;
  }

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Without a usable callback the enqueue is treated as failed.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
45

46
// Enqueue a sync message onto SYNC_QUEUE.
//
// The message head's contLen and vgId are passed through htonl() first.
// Returns -1 if pMsg or its content is NULL, or if msgcb cannot enqueue
// (NULL msgcb or NULL putToQueueFp); otherwise returns what tmsgPutToQueue()
// returned. When enqueueing did not succeed, the content buffer is freed via
// rpcFreeCont() and pMsg->pCont is set to NULL.
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (!pMsg || !pMsg->pCont) {
    return -1;
  }

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  const bool canEnqueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t    rc = canEnqueue ? tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg) : -1;
  if (rc == 0) {
    return 0;
  }

  // Enqueue failed (or was impossible): release the content buffer.
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return rc;
}
68

69
// Send a sync request to pEpSet through tmsgSendSyncReq().
// Returns the send result; on a non-zero result the message content is
// released with rpcFreeCont() and pMsg->pCont is cleared.
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t rc = tmsgSendSyncReq(pEpSet, pMsg);
  if (rc == 0) {
    return rc;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return rc;
}
77

78
// Validate a single prepare action of a transaction.
//
// Only raw objects whose status is SDB_STATUS_CREATING are checked; for those,
// the validate callback registered in pSdb->validateFps for the raw's type is
// invoked (if one is registered). Returns 0 when nothing needs checking or no
// callback exists, otherwise the callback's return value.
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
  SSdbRaw *pRawObj = pAction->pRaw;
  SSdb    *pStore = pMnode->pSdb;

  if (pRawObj->status != SDB_STATUS_CREATING) {
    return 0;
  }

  SdbValidateFp checkFp = pStore->validateFps[pRawObj->type];
  if (!checkFp) {
    return 0;
  }
  return checkFp(pMnode, pTrans, pRawObj);
}
93

94
// Validate every prepare action of a transaction.
//
// Returns 0 when the transaction has no prepare actions or all of them pass
// mndTransValidatePrepareAction(). Returns -1 when an action is not of type
// TRANS_ACTION_RAW, or the non-zero code of the first action that fails
// validation.
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = -1;
  int32_t action = 0;

  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
  if (numOfActions == 0) {
    code = 0;
    goto _OUT;
  }

  mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);

  for (action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);

    if (pAction->actionType != TRANS_ACTION_RAW) {
      mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
      // After the first iteration `code` holds 0 from the previous successful
      // validation; reset it so this error is not reported as success.
      code = -1;
      goto _OUT;
    }

    code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
    if (code != 0) {
      mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
      goto _OUT;
    }
  }

  code = 0;
_OUT:
  return code;
}
125

126
// Validate a decoded transaction.
//
// Only transactions in TRN_STAGE_PREPARE are checked: first for conflicts via
// mndTransCheckConflict() (a negative result is returned through TAOS_RETURN),
// then action by action via mndTransValidatePrepareStage(). Any other stage
// yields 0 through TAOS_RETURN.
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;

  if (pTrans->stage != TRN_STAGE_PREPARE) {
    TAOS_RETURN(code);
  }

  code = mndTransCheckConflict(pMnode, pTrans);
  if (code < 0) {
    mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
    TAOS_RETURN(code);
  }

  return mndTransValidatePrepareStage(pMnode, pTrans);
}
138

139
// Decode a raw transaction and validate it.
//
// The raw is decoded with mndTransDecode() and the transaction object is
// obtained with sdbGetRowObj(). If either yields NULL the result is terrno
// when it is set, else TSDB_CODE_MND_RETURN_VALUE_NULL. Otherwise the result
// is that of mndTransValidateImp(). The decoded row and transaction data are
// released before returning. On a non-zero result terrno is left as is when
// already set, else set to TSDB_CODE_MND_TRANS_CONFLICT, before TAOS_RETURN.
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
  int32_t  code = -1;
  SSdbRow *pDecoded = mndTransDecode(pRaw);
  STrans  *pTransObj = (pDecoded != NULL) ? sdbGetRowObj(pDecoded) : NULL;

  if (pTransObj != NULL) {
    code = mndTransValidateImp(pMnode, pTransObj);
    mndTransDropData(pTransObj);
  } else {
    // Either decoding or row-object lookup failed.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  if (pDecoded != NULL) {
    taosMemoryFreeClear(pDecoded);
  }

  if (code != 0) {
    terrno = (terrno != 0) ? terrno : TSDB_CODE_MND_TRANS_CONFLICT;
  }
  TAOS_RETURN(code);
}
165

166
// Apply a committed sync proposal (a raw transaction) to the sdb.
//
// Steps, in order:
//   1. extract the transaction id from the raw; ids <= 0 fail with
//      TSDB_CODE_INVALID_MSG;
//   2. validate the transaction with mndTransValidate();
//   3. write the raw to the sdb with sdbWriteWithoutFree() under pSdb->filelock;
//   4. acquire the transaction, run the prepare stage if it is in
//      TRN_STAGE_PREPARE, refresh it, record the apply info and call
//      sdbWriteFile().
// Failures in steps 2 and 3 are also stored in pMeta->code. The result is
// returned through TAOS_RETURN.
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SSyncMgmt    *pSyncMgmt = &pMnode->syncMgmt;
  SSdbRaw      *pRawData = pMsg->pCont;
  int32_t       code = -1;
  const int32_t id = sdbGetIdFromRaw(pMnode->pSdb, pRawData);

  if (id <= 0) {
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, id, pSyncMgmt->transId,
           pSyncMgmt->transSeq);
    code = TSDB_CODE_INVALID_MSG;
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        id, pSyncMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
        pRawData, pSyncMgmt->transSec, pSyncMgmt->transSeq);

  code = mndTransValidate(pMnode, pRawData);
  if (code != 0) {
    mError("trans:%d, failed to validate requested trans since %s", id, terrstr());
    pMeta->code = code;
    TAOS_RETURN(code);
  }

  (void)taosThreadMutexLock(&pMnode->pSdb->filelock);
  code = sdbWriteWithoutFree(pMnode->pSdb, pRawData);
  if (code != 0) {
    mError("trans:%d, failed to write to sdb since %s", id, terrstr());
    (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);
    pMeta->code = code;
    TAOS_RETURN(code);
  }
  (void)taosThreadMutexUnlock(&pMnode->pSdb->filelock);

  STrans *pTransObj = mndAcquireTrans(pMnode, id);
  if (pTransObj == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, not found while execute in mnode since %s", id, tstrerror(code));
    TAOS_RETURN(code);
  }

  bool proceed = true;
  if (pTransObj->stage == TRN_STAGE_PREPARE) {
    proceed = mndTransPerformPrepareStage(pMnode, pTransObj, false);
  }

  if (proceed) {
    mndTransRefresh(pMnode, pTransObj);

    sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
    code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
  } else if (terrno != 0) {
    // The prepare stage asked to stop; report terrno when it is set,
    // otherwise keep the (zero) code of the sdb write above.
    code = terrno;
  }

  mndReleaseTrans(pMnode, pTransObj);
  TAOS_RETURN(code);
}
228

229
// Hand the outcome of a proposal to the waiting proposer.
//
// Under pMgmt->lock: if a transaction is recorded as pending (transId != 0),
// the pending-transaction fields are cleared, `code` is stored in
// pMgmt->errCode and syncSem is posted. Does nothing when no transaction is
// pending. Always returns 0.
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId == 0) {
    goto _OUT;
  }

  // Snapshot the identifiers before clearing them so the log lines below
  // report the proposal that was actually confirmed (the sequence number used
  // to be read after the reset and was therefore always logged as 0).
  int32_t transId = pMgmt->transId;
  int64_t transSeq = pMgmt->transSeq;
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  pMgmt->errCode = code;
  if (tsem_post(&pMgmt->syncSem) < 0) {
    mError("trans:%d, failed to post sem", transId);
  }

  if (pMgmt->errCode != 0) {
    mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
  } else {
    mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, transSeq);
  }

_OUT:
  (void)taosThreadMutexUnlock(&pMgmt->lock);
  return 0;
}
255

256
// FSM commit callback: apply a committed sync message to the mnode.
//
// When pMsg->code is already non-zero nothing is applied. Otherwise the apply
// index/term from pMeta are recorded in the message, pMeta->code is reset,
// pMnode->applied is updated atomically, and messages accepted by
// syncUtilUserCommit() are applied through mndProcessWriteMsg().
// In every case the outcome is forwarded to mndPostMgmtCode(), the message
// content is released, and the result is returned through TAOS_RETURN.
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SMnode *pMnode = pFsm->data;
  int32_t result = pMsg->code;

  if (result == 0) {
    pMsg->info.conn.applyIndex = pMeta->index;
    pMsg->info.conn.applyTerm = pMeta->term;
    pMeta->code = 0;

    atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);

    if (syncUtilUserCommit(pMsg->msgType)) {
      result = mndProcessWriteMsg(pMnode, pMsg, pMeta);
    }
  }

  // Prefer the local result; fall back to the code stored in the meta.
  mndPostMgmtCode(pMnode, result ? result : pMeta->code);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  TAOS_RETURN(result);
}
281

282
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
120,000✔
283
  SMnode *pMnode = pFSM->data;
120,000✔
284
  return atomic_load_64(&pMnode->applied);
120,000✔
285
}
286

287
// FSM callback: start reading an sdb snapshot and report its position.
//
// Opens an sdb read iterator into *ppReader via sdbStartRead(), which also
// fills pSnapshot's lastApplyIndex, lastApplyTerm and lastConfigIndex.
// pReaderParam is not used. Returns the result of sdbStartRead().
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  // The original had an unreachable `return 0;` after this statement.
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
294

295
// FSM callback: fill pSnapshot's lastApplyIndex, lastApplyTerm and
// lastConfigIndex from sdbGetCommitInfo(). Always returns 0.
static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pOwner = pFsm->data;
  SSdb   *pStore = pOwner->pSdb;

  sdbGetCommitInfo(pStore, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
  return 0;
}
300

301
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
1,814✔
302
  SMnode *pMnode = pFsm->data;
1,814✔
303

304
  if (!pMnode->deploy) {
1,814✔
305
    if (!pMnode->restored) {
528✔
306
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
487!
307
      mndTransPullup(pMnode);
487✔
308
      mndSetRestored(pMnode, true);
487✔
309
    } else {
310
      mInfo("vgId:1, sync restore finished, repeat call");
41!
311
    }
312
    if (sdbAfterRestored(pMnode->pSdb) != 0) {
528!
313
      mError("failed to prepare sdb while start mnode");
×
314
    }
315
  } else {
316
    mInfo("vgId:1, sync restore finished");
1,286!
317
  }
318
  int32_t code = mndRefreshUserIpWhiteList(pMnode);
1,814✔
319
  if (code != 0) {
1,814✔
320
    mError("vgId:1, failed to refresh user ip white list since %s", tstrerror(code));
1!
321
    mndSetRestored(pMnode, false);
1✔
322
  }
323

324
  SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
1,814✔
325
  if (commitIdx != fsmIndex) {
1,814!
326
    mError("vgId:1, failed to sync restore, commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
×
327
           fsmIndex);
328
    mndSetRestored(pMnode, false);
×
329
  }
330
}
1,814✔
331

332
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
333
  mInfo("start to read snapshot from sdb");
×
334
  SMnode *pMnode = pFsm->data;
×
335
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
×
336
}
337

338
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
339
  mInfo("stop to read snapshot from sdb");
×
340
  SMnode *pMnode = pFsm->data;
×
341
  sdbStopRead(pMnode->pSdb, pReader);
×
342
}
×
343

344
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
×
345
  SMnode *pMnode = pFsm->data;
×
346
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
×
347
}
348

349
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
350
  mInfo("start to apply snapshot to sdb");
×
351
  SMnode *pMnode = pFsm->data;
×
352
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
×
353
}
354

355
// FSM callback: finish applying a snapshot. Forwards the writer, the isApply
// flag and the snapshot's apply index/term/config index to sdbStopWrite() and
// returns its result.
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pOwner = pFsm->data;
  SSdb   *pStore = pOwner->pSdb;
  return sdbStopWrite(pStore, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                      pSnapshot->lastConfigIndex);
}
362

363
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
×
364
  SMnode *pMnode = pFsm->data;
×
365
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
×
366
}
367

368
static void mndBecomeFollower(const SSyncFSM *pFsm) {
302✔
369
  SMnode    *pMnode = pFsm->data;
302✔
370
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
302✔
371
  mInfo("vgId:1, become follower");
302!
372

373
  (void)taosThreadMutexLock(&pMgmt->lock);
302✔
374
  if (pMgmt->transId != 0) {
302!
375
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
376
    pMgmt->transId = 0;
×
377
    pMgmt->transSec = 0;
×
378
    pMgmt->transSeq = 0;
×
379
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
380
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
381
      mError("failed to post sem");
×
382
    }
383
  }
384
  (void)taosThreadMutexUnlock(&pMgmt->lock);
302✔
385

386
  mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER);
302✔
387
}
302✔
388

389
static void mndBecomeLearner(const SSyncFSM *pFsm) {
96✔
390
  SMnode    *pMnode = pFsm->data;
96✔
391
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
96✔
392
  mInfo("vgId:1, become learner");
96!
393

394
  (void)taosThreadMutexLock(&pMgmt->lock);
96✔
395
  if (pMgmt->transId != 0) {
96!
396
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
397
    pMgmt->transId = 0;
×
398
    pMgmt->transSec = 0;
×
399
    pMgmt->transSeq = 0;
×
400
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
401
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
402
      mError("failed to post sem");
×
403
    }
404
  }
405
  (void)taosThreadMutexUnlock(&pMgmt->lock);
96✔
406
}
96✔
407

408
static void mndBecomeLeader(const SSyncFSM *pFsm) {
1,550✔
409
  mInfo("vgId:1, become leader");
1,550!
410
  SMnode *pMnode = pFsm->data;
1,550✔
411

412
  mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER);
1,550✔
413
  mndStreamResetInitTaskListLoadFlag();
1,550✔
414
}
1,550✔
415

416
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
×
417
  SMnode *pMnode = pFsm->data;
×
418

419
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
420
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
421
    return (itemSize == 0);
×
422
  } else {
423
    return true;
×
424
  }
425
}
426

427
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
35,961✔
428
  SMnode *pMnode = pFsm->data;
35,961✔
429

430
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
35,961!
431
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
432
    return itemSize;
×
433
  } else {
434
    return -1;
35,961✔
435
  }
436
}
437

438
// Allocate an SSyncFSM and wire it to the mnode's sync callbacks.
//
// pFsm->data is set to pMnode. Callbacks the mnode does not implement
// (pre-commit, rollback, leader transfer, reconfig, assigned leader) are set
// to NULL. Returns the new FSM, or NULL when the allocation fails.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Allocation failed: report it to the caller instead of dereferencing NULL.
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = NULL;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}
464

465
// Initialise the mnode's sync management state and open the sync instance.
//
// Initialises the sync lock and clears the pending-proposal fields, builds an
// SSyncInfo (vgId 1, the mnode's WAL and message callbacks, the FSM from
// mndSyncMakeFsm(), and the replica configuration held in pMgmt), initialises
// syncSem and calls syncOpen(). On success the sync id is stored in
// pMgmt->sync and pMnode->pSdb->sync.
//
// Returns 0 on success. On failure returns the tsem_init() error, or for a
// failed syncOpen() terrno when it is set, else TSDB_CODE_APP_ERROR. All
// results go through TAOS_RETURN.
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexInit(&pMgmt->lock, NULL);
  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  (void)taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 3000,
      .heartbeatMs = 500,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);

  mInfo("vgId:1, start to open mnode sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  pCfg->lastIndex = pMgmt->lastIndex;
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = pMgmt->nodeRoles[i];
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId, update);
  }

  int32_t code = 0;
  if ((code = tsem_init(&pMgmt->syncSem, 0, 0)) < 0) {
    mError("failed to open sync, tsem_init, since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pMgmt->sync = syncOpen(&syncInfo, 1);  // always check
  if (pMgmt->sync <= 0) {
    // Never report success for a failed open: if syncOpen() left terrno
    // unset, fall back to a generic error instead of returning 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
    mError("failed to open sync since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pMnode->pSdb->sync = pMgmt->sync;

  mInfo("vgId:1, mnode sync is opened, id:%" PRId64, pMgmt->sync);
  TAOS_RETURN(code);
}
524

525
// Stop the sync instance and release the sync management resources:
// the semaphore and the mutex are destroyed and the whole SSyncMgmt is zeroed.
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSync->sync);

  int semRc = tsem_destroy(&pSync->syncSem);
  if (semRc < 0) {
    mError("failed to destroy sem");
  }

  (void)taosThreadMutexDestroy(&pSync->lock);
  memset(pSync, 0, sizeof(SSyncMgmt));
}
1,777✔
536

537
// Report how long the pending proposal (if any) has been waiting.
//
// Under the sync lock: when a transaction is pending, its age in seconds is
// compared with MNODE_TIMEOUT_SEC and logged as an error when exceeded,
// otherwise at debug level.
// NOTE: a timeout is only logged here; the pending state is not cleared and
// the semaphore is not posted (that handling is disabled in this version).
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pSync->lock);
  if (pSync->transId != 0) {
    int32_t nowSec = taosGetTimestampSec();
    int32_t elapsed = nowSec - pSync->transSec;
    if (elapsed <= MNODE_TIMEOUT_SEC) {
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
    } else {
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
    }
  }
  (void)taosThreadMutexUnlock(&pSync->lock);
}
1,541✔
564

565
// Propose a raw transaction through sync and wait for the outcome.
//
// The raw is copied into an rpc buffer and passed to syncPropose() while the
// transaction is recorded as pending in pMgmt. Only one proposal may be
// pending at a time; a second one fails with
// TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED.
//
// Outcomes of syncPropose():
//   == 0  wait on syncSem; the result is then taken from pMgmt->errCode;
//   >  0  the pending state is cleared and the raw is written to the sdb
//         directly (logged as "confirm at once since replica is 1");
//   <  0  the pending state is cleared and the error is returned.
//
// Returns 0 on success or an error code. terrno is returned as is when the
// raw size is not positive or the buffer cannot be allocated.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) return terrno;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return terrno;
  memcpy(req.pCont, pRaw, req.contLen);

  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code == 0) {
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    }
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // Log the caller's transaction id: pMgmt->transId has been cleared by now
    // (or may belong to another proposer) and is not protected by the lock here.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return terrno;
}
627

628
// Start the mnode's sync instance via syncStart() and log the outcome.
void mndSyncStart(SMnode *pMnode) {
  mInfo("vgId:1, start to start mnode sync");
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  if (syncStart(pSync->sync) >= 0) {
    mInfo("vgId:1, mnode sync started, id:%" PRId64, pSync->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pSync->sync);
  }
}
637

638
// Abandon a pending proposal because the mnode is stopping.
//
// Under the sync lock: if a transaction is pending (transId != 0), all
// pending-proposal fields are cleared, errCode is set to
// TSDB_CODE_APP_IS_STOPPING and syncSem is posted.
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, trans:%d, is stopped and post sem", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    // Also reset the sequence number, as every other path that abandons a
    // pending proposal does, so no stale seq is left behind.
    pMgmt->transSeq = 0;
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    if (tsem_post(&pMgmt->syncSem) < 0) {
      mError("failed to post sem");
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
1,777✔
653

654
// Report whether this mnode is a fully restored leader.
//
// terrno is cleared, then the sync state is queried. Returns false when
// syncGetState() set terrno (logged as "mnode is stopping"), when the state is
// not TAOS_SYNC_STATE_LEADER (terrno = TSDB_CODE_SYN_NOT_LEADER), or when
// either the sync state or the mnode is not yet restored
// (terrno = TSDB_CODE_SYN_RESTORING). Returns true otherwise.
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);
  bool       isLeader = false;

  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
  } else if (state_is_not_leader_placeholder) {
  }
  return isLeader;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc