• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndCluster.h"
18
#include "mndStream.h"
19
#include "mndSync.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22

23
// Enqueue a sync control message onto SYNC_RD_QUEUE.
//
// The SMsgHead at the start of pMsg->pCont has contLen and vgId converted with
// htonl() in place before it is queued.
// Returns -1 when pMsg or its payload is NULL (nothing is freed then), -1 when
// no queue callback is registered, otherwise tmsgPutToQueue()'s result. On any
// failure after the header conversion the payload is freed and pMsg->pCont is
// reset to NULL.
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL) return -1;
  if (pMsg->pCont == NULL) return -1;

  SMsgHead *head = (SMsgHead *)pMsg->pCont;
  head->contLen = htonl(head->contLen);
  head->vgId = htonl(head->vgId);

  bool    canQueue = (msgcb != NULL) && (msgcb->putToQueueFp != NULL);
  int32_t rc = canQueue ? tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg) : -1;
  if (rc != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return rc;
}
45

UNCOV
46
// Enqueue a sync message onto SYNC_QUEUE.
//
// Converts the SMsgHead contLen/vgId fields with htonl() in place, then hands
// the message to tmsgPutToQueue(). Returns -1 for a NULL message/payload
// (without freeing) or a missing queue callback; otherwise the queue result.
// Whenever queueing does not succeed, the payload is freed and pMsg->pCont is
// cleared.
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return -1;
  }

  SMsgHead *head = (SMsgHead *)pMsg->pCont;
  head->contLen = htonl(head->contLen);
  head->vgId = htonl(head->vgId);

  int32_t rc = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    rc = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
    if (rc == 0) {
      return 0;
    }
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return rc;
}
68

UNCOV
69
// Send a sync request to pEpSet via tmsgSendSyncReq().
// On failure the payload is freed here and pMsg->pCont is cleared; the error
// code from tmsgSendSyncReq() is returned unchanged.
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t rc = tmsgSendSyncReq(pEpSet, pMsg);
  if (rc == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return rc;
}
77

UNCOV
78
// Validate a single prepare action's raw sdb row.
// Rows whose status is not SDB_STATUS_CREATING are accepted without checks.
// Otherwise the sdb's per-type validate callback (if one is registered for
// pRaw->type) decides the result; with no callback the row is accepted.
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
  SSdbRaw *pRaw = pAction->pRaw;

  if (pRaw->status != SDB_STATUS_CREATING) {
    return 0;
  }

  SdbValidateFp validateFp = pMnode->pSdb->validateFps[pRaw->type];
  return (validateFp != NULL) ? validateFp(pMnode, pTrans, pRaw) : 0;
}
93

UNCOV
94
// Validate every prepare action of a transaction.
//
// Each prepare action must be of type TRANS_ACTION_RAW and must pass
// mndTransValidatePrepareAction(). Returns 0 when there are no prepare actions
// or all of them validate; otherwise a non-zero error code.
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;

  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
  if (numOfActions == 0) {
    goto _OUT;
  }

  mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
    if (pAction == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      mError("trans:%d, prepare action:%d not found, numOfActions:%d", pTrans->id, action, numOfActions);
      goto _OUT;
    }

    if (pAction->actionType != TRANS_ACTION_RAW) {
      // Set an explicit error: `code` may still hold 0 from the previous
      // action's successful validation, which would otherwise report success.
      code = TSDB_CODE_INVALID_MSG;
      mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
      goto _OUT;
    }

    code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
    if (code != 0) {
      mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
      goto _OUT;
    }
  }

_OUT:
  return code;
}
125

UNCOV
126
// Validate a decoded transaction before it is applied.
// Only transactions in TRN_STAGE_PREPARE are checked: first for conflicts via
// mndTransCheckConflict() (a negative result is returned as the error), then
// each prepare action via mndTransValidatePrepareStage(). Any other stage is
// accepted as-is.
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
  if (pTrans->stage != TRN_STAGE_PREPARE) {
    TAOS_RETURN(0);
  }

  int32_t conflictCode = mndTransCheckConflict(pMnode, pTrans);
  if (conflictCode < 0) {
    mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
    TAOS_RETURN(conflictCode);
  }

  return mndTransValidatePrepareStage(pMnode, pTrans);
}
138

UNCOV
139
// Decode the transaction carried by pRaw and validate it with
// mndTransValidateImp().
//
// If decoding fails, or the decoded row has no object, the current terrno is
// returned, or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0. The decoded
// transaction data and row are released before returning. On any error,
// terrno is left non-zero (defaulting to TSDB_CODE_MND_TRANS_CONFLICT) before
// TAOS_RETURN.
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
  STrans  *pTrans = NULL;
  int32_t  code = -1;
  SSdbRow *pRow = mndTransDecode(pRaw);

  if (pRow != NULL) {
    pTrans = sdbGetRowObj(pRow);
  }

  if (pTrans != NULL) {
    code = mndTransValidateImp(pMnode, pTrans);
  } else {
    // Decoding failed or the row carried no object; prefer the error already in terrno.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

  if (pTrans != NULL) {
    mndTransDropData(pTrans);
  }
  if (pRow != NULL) {
    taosMemoryFreeClear(pRow);
  }
  if (code != 0) {
    terrno = (terrno != 0) ? terrno : TSDB_CODE_MND_TRANS_CONFLICT;
  }
  TAOS_RETURN(code);
}
165

UNCOV
166
// Apply a committed sync proposal that carries a transaction's sdb raw row.
//
// Steps:
//   1. Extract the transaction id from the raw row; a non-positive id is rejected
//      with TSDB_CODE_INVALID_MSG.
//   2. Validate the transaction (mndTransValidate); failures are stored in pMeta->code.
//   3. Write the raw row into sdb while holding the sdb file lock; failures are
//      stored in pMeta->code.
//   4. Acquire the in-memory transaction. If it is in the prepare stage, run
//      mndTransPerformPrepareStage(); stop when that reports it cannot continue.
//   5. Refresh the transaction, record apply info, and persist the sdb file.
// The acquired transaction (if any) is released before returning.
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdb      *pSdb = pMnode->pSdb;
  SSdbRaw   *pRaw = pMsg->pCont;
  STrans    *pTrans = NULL;
  int32_t    code = -1;
  int32_t    transId = sdbGetIdFromRaw(pSdb, pRaw);

  if (transId <= 0) {
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
    code = TSDB_CODE_INVALID_MSG;
    goto _OUT;
  }

  mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
        pRaw, pMgmt->transSec, pMgmt->transSeq);

  code = mndTransValidate(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
    pMeta->code = code;
    goto _OUT;
  }

  (void)taosThreadMutexLock(&pSdb->filelock);
  code = sdbWriteWithoutFree(pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
  }
  (void)taosThreadMutexUnlock(&pSdb->filelock);
  if (code != 0) {
    pMeta->code = code;
    goto _OUT;
  }

  pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(code));
    goto _OUT;
  }

  if (pTrans->stage == TRN_STAGE_PREPARE && !mndTransPerformPrepareStage(pMnode, pTrans, false)) {
    // Keep the current code (0 here) unless the prepare stage left an error in terrno.
    if (terrno != 0) code = terrno;
    goto _OUT;
  }

  mndTransRefresh(pMnode, pTrans);

  sdbSetApplyInfo(pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
  code = sdbWriteFile(pSdb, tsMndSdbWriteDelta);

_OUT:
  if (pTrans != NULL) {
    mndReleaseTrans(pMnode, pTrans);
  }
  TAOS_RETURN(code);
}
228

UNCOV
229
// Wake the proposer waiting on pMgmt->syncSem and hand it `code` as the result.
//
// Runs under pMgmt->lock. If no transaction is waiting (transId == 0) this is a
// no-op. Otherwise the waiting slot (transId/transSec/transSeq) is cleared,
// errCode is set to `code`, and syncSem is posted. Always returns 0.
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId == 0) {
    goto _OUT;
  }

  // Capture the identifiers before clearing them so the log lines below report
  // the actual proposal sequence rather than the reset value 0.
  int32_t transId = pMgmt->transId;
  int64_t transSeq = pMgmt->transSeq;
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  pMgmt->errCode = code;
  if (tsem_post(&pMgmt->syncSem) < 0) {
    mError("trans:%d, failed to post sem", transId);
  }

  if (code != 0) {
    mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(code));
  } else {
    mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, transSeq);
  }

_OUT:
  (void)taosThreadMutexUnlock(&pMgmt->lock);
  return 0;
}
255

UNCOV
256
// FSM commit callback.
//
// When the committed message carries no error, it records the apply index/term
// on the message, resets pMeta->code, and publishes the index to
// pMnode->applied. It then calls mndProcessWriteMsg() for entries whose
// msgType passes syncUtilUserCommit(). In every case the waiting proposer is
// signalled (with the first non-zero of code / pMeta->code) and the payload is
// freed.
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SMnode *pMnode = pFsm->data;
  int32_t code = pMsg->code;

  if (code == 0) {
    pMsg->info.conn.applyIndex = pMeta->index;
    pMsg->info.conn.applyTerm = pMeta->term;
    pMeta->code = 0;

    atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);

    if (syncUtilUserCommit(pMsg->msgType)) {
      code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
    }
  }

  (void)mndPostMgmtCode(pMnode, (code != 0) ? code : pMeta->code);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  TAOS_RETURN(code);
}
281

UNCOV
282
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
×
UNCOV
283
  SMnode *pMnode = pFSM->data;
×
UNCOV
284
  return atomic_load_64(&pMnode->applied);
×
285
}
286

287
// Open an sdb snapshot reader and fill pSnapshot's apply index, apply term and
// config index from the same sdbStartRead() call.
// pReaderParam is not used. Returns sdbStartRead()'s result.
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  (void)pReaderParam;
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
294

UNCOV
295
// Fill pSnapshot with the sdb's committed apply index, apply term and config
// index (via sdbGetCommitInfo). Always returns 0.
static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SSdb *pSdb = ((SMnode *)pFsm->data)->pSdb;
  sdbGetCommitInfo(pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
  return 0;
}
300

UNCOV
301
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
×
UNCOV
302
  SMnode *pMnode = pFsm->data;
×
303

UNCOV
304
  if (!pMnode->deploy) {
×
UNCOV
305
    if (!pMnode->restored) {
×
UNCOV
306
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
×
UNCOV
307
      mndTransPullup(pMnode);
×
UNCOV
308
      mndSetRestored(pMnode, true);
×
309
    } else {
UNCOV
310
      mInfo("vgId:1, sync restore finished, repeat call");
×
311
    }
312
  } else {
UNCOV
313
    mInfo("vgId:1, sync restore finished");
×
314
  }
UNCOV
315
  int32_t code = mndRefreshUserIpWhiteList(pMnode);
×
UNCOV
316
  if (code != 0) {
×
UNCOV
317
    mError("vgId:1, failed to refresh user ip white list since %s", tstrerror(code));
×
UNCOV
318
    mndSetRestored(pMnode, false);
×
319
  }
320

UNCOV
321
  SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
×
UNCOV
322
  if (commitIdx != fsmIndex) {
×
323
    mError("vgId:1, failed to sync restore, commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
×
324
           fsmIndex);
325
    mndSetRestored(pMnode, false);
×
326
  }
UNCOV
327
}
×
328

UNCOV
329
void mndAfterRestored(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
×
UNCOV
330
  SMnode *pMnode = pFsm->data;
×
331

UNCOV
332
  if (!pMnode->deploy) {
×
UNCOV
333
    if (sdbAfterRestored(pMnode->pSdb) != 0) {
×
334
      mError("failed to prepare sdb while start mnode");
×
335
    }
UNCOV
336
    mInfo("vgId:1, sync restore finished and restore sdb success");
×
337
  }
UNCOV
338
}
×
339

340
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
341
  mInfo("start to read snapshot from sdb");
×
342
  SMnode *pMnode = pFsm->data;
×
343
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
×
344
}
345

346
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
347
  mInfo("stop to read snapshot from sdb");
×
348
  SMnode *pMnode = pFsm->data;
×
349
  sdbStopRead(pMnode->pSdb, pReader);
×
350
}
×
351

352
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
×
353
  SMnode *pMnode = pFsm->data;
×
354
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
×
355
}
356

357
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
358
  mInfo("start to apply snapshot to sdb");
×
359
  SMnode *pMnode = pFsm->data;
×
360
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
×
361
}
362

363
// Close an sdb snapshot writer. isApply and pSnapshot's apply index, term and
// config index are forwarded to sdbStopWrite(), whose result is returned.
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
  SSdb *pSdb = ((SMnode *)pFsm->data)->pSdb;
  return sdbStopWrite(pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                      pSnapshot->lastConfigIndex);
}
370

371
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
×
372
  SMnode *pMnode = pFsm->data;
×
373
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
×
374
}
375

UNCOV
376
static void mndBecomeFollower(const SSyncFSM *pFsm) {
×
UNCOV
377
  SMnode    *pMnode = pFsm->data;
×
UNCOV
378
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
×
UNCOV
379
  mInfo("vgId:1, become follower");
×
380

UNCOV
381
  (void)taosThreadMutexLock(&pMgmt->lock);
×
UNCOV
382
  if (pMgmt->transId != 0) {
×
383
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
384
    pMgmt->transId = 0;
×
385
    pMgmt->transSec = 0;
×
386
    pMgmt->transSeq = 0;
×
387
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
388
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
389
      mError("failed to post sem");
×
390
    }
391
  }
UNCOV
392
  (void)taosThreadMutexUnlock(&pMgmt->lock);
×
393

UNCOV
394
  mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER);
×
UNCOV
395
}
×
396

UNCOV
397
static void mndBecomeLearner(const SSyncFSM *pFsm) {
×
UNCOV
398
  SMnode    *pMnode = pFsm->data;
×
UNCOV
399
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
×
UNCOV
400
  mInfo("vgId:1, become learner");
×
401

UNCOV
402
  (void)taosThreadMutexLock(&pMgmt->lock);
×
UNCOV
403
  if (pMgmt->transId != 0) {
×
404
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
×
405
    pMgmt->transId = 0;
×
406
    pMgmt->transSec = 0;
×
407
    pMgmt->transSeq = 0;
×
408
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
×
409
    if (tsem_post(&pMgmt->syncSem) < 0) {
×
410
      mError("failed to post sem");
×
411
    }
412
  }
UNCOV
413
  (void)taosThreadMutexUnlock(&pMgmt->lock);
×
UNCOV
414
}
×
415

UNCOV
416
static void mndBecomeLeader(const SSyncFSM *pFsm) {
×
UNCOV
417
  mInfo("vgId:1, become leader");
×
UNCOV
418
  SMnode *pMnode = pFsm->data;
×
419

UNCOV
420
  mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER);
×
UNCOV
421
  mndStreamResetInitTaskListLoadFlag();
×
UNCOV
422
}
×
423

424
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
×
425
  SMnode *pMnode = pFsm->data;
×
426

427
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
428
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
429
    return (itemSize == 0);
×
430
  } else {
431
    return true;
×
432
  }
433
}
434

UNCOV
435
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
×
UNCOV
436
  SMnode *pMnode = pFsm->data;
×
437

UNCOV
438
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
×
439
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
×
440
    return itemSize;
×
441
  } else {
UNCOV
442
    return -1;
×
443
  }
444
}
445

UNCOV
446
// Allocate and populate the sync FSM callback table for the mnode.
// Returns NULL if the allocation fails; otherwise the caller receives the new
// table with pFsm->data pointing at pMnode. Callbacks the mnode does not
// implement are set explicitly to NULL.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    // Previously dereferenced unconditionally; report the failure to the caller instead.
    mError("vgId:1, failed to allocate mnode sync fsm");
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpAfterRestoredCb = mndAfterRestored;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = NULL;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}
473

UNCOV
474
// Initialise the mnode's sync management state and open the sync node.
//
// Steps:
//   - Init pMgmt->lock and clear the pending-proposal fields.
//   - Build SSyncInfo: the vgId 1 config, the sync path under pMnode->path, the
//     FSM, and one node-info entry per replica.
//   - Init syncSem and call syncOpen().
// Returns 0 on success. Otherwise returns an error code, falling back to
// TSDB_CODE_MND_RETURN_VALUE_NULL when terrno does not hold one, so a failed
// open is never reported as success.
int32_t mndInitSync(SMnode *pMnode) {
  int32_t    code = 0;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  (void)taosThreadMutexInit(&pMgmt->lock, NULL);
  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  (void)taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 3000,
      .heartbeatMs = 500,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("failed to open sync, make fsm, since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  mInfo("vgId:1, start to open mnode sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  pCfg->lastIndex = pMgmt->lastIndex;
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = pMgmt->nodeRoles[i];
    bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
          pNode->nodeId, pNode->clusterId, update);
  }

  if ((code = tsem_init(&pMgmt->syncSem, 0, 0)) < 0) {
    mError("failed to open sync, tsem_init, since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  pMgmt->sync = syncOpen(&syncInfo, 1);  // always check
  if (pMgmt->sync <= 0) {
    // `code` is 0 here (tsem_init succeeded); make sure the failure is never reported as success.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    mError("failed to open sync since %s", tstrerror(code));
    TAOS_RETURN(code);
  }
  pMnode->pSdb->sync = pMgmt->sync;

  mInfo("vgId:1, mnode sync is opened, id:%" PRId64, pMgmt->sync);
  TAOS_RETURN(code);
}
533

UNCOV
534
// Stop the sync node, destroy syncSem and the management lock, and zero the
// whole SSyncMgmt struct.
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  syncStop(pMgmt->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);

  if (tsem_destroy(&pMgmt->syncSem) < 0) mError("failed to destroy sem");
  (void)taosThreadMutexDestroy(&pMgmt->lock);

  memset(pMgmt, 0, sizeof(*pMgmt));
}
×
545

UNCOV
546
// Periodic check on the proposal currently waiting for sync confirmation.
// Under pMgmt->lock, if a transaction is pending, the elapsed seconds since its
// proposal are compared with MNODE_TIMEOUT_SEC. Exceeding it is only logged as
// an error; this function does not clear the pending slot or post syncSem.
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pMgmt->transSec;
    if (delta > MNODE_TIMEOUT_SEC) {
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
             pMgmt->transSec, curSec, delta, pMgmt->transSeq);
    } else {
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
             pMgmt->transSec, curSec, delta, pMgmt->transSeq);
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
×
573

UNCOV
574
// Propose a transaction's sdb raw row through sync and wait for the outcome.
//
// Only one proposal may be pending at a time; a second one fails with
// TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED. The result of syncPropose() decides
// what happens next:
//   0   the caller blocks on syncSem until a commit/role-change/stop path posts
//       it, then pMgmt->errCode is returned (and stored in terrno).
//   > 0 the row is written to sdb directly (logged as "replica is 1").
//   < 0 the proposal failed and the pending slot is cleared.
// Early failures always return a non-zero code, even when terrno is 0.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) {
    // Never report success for a proposal that was not made.
    return (terrno != 0) ? terrno : TSDB_CODE_INVALID_MSG;
  }

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }
  memcpy(req.pCont, pRaw, req.contLen);

  (void)taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code == 0) {
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    }
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    (void)taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // Log the caller's transId: pMgmt->transId has already been reset (or reused) by now.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return terrno;
}
636

UNCOV
637
// Start the mnode sync node; success and failure are both only logged.
void mndSyncStart(SMnode *pMnode) {
  mInfo("vgId:1, start to start mnode sync");
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  if (syncStart(pMgmt->sync) >= 0) {
    mInfo("vgId:1, mnode sync started, id:%" PRId64, pMgmt->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pMgmt->sync);
  }
}
646

UNCOV
647
// Abort a pending proposal because the mnode is stopping.
// Under pMgmt->lock, a pending transaction slot is fully cleared (transId,
// transSec and transSeq, matching the other wake-up paths in this file). Its
// waiter is then woken with TSDB_CODE_APP_IS_STOPPING.
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  (void)taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, trans:%d, is stopped and post sem", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;  // previously left stale, unlike the follower/learner/post paths
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    if (tsem_post(&pMgmt->syncSem) < 0) {
      mError("failed to post sem");
    }
  }
  (void)taosThreadMutexUnlock(&pMgmt->lock);
}
×
662

UNCOV
663
// Return true only when this mnode is the sync leader and both the sync layer
// and the mnode report restored.
// On a false result, terrno explains why:
//   - left as set by syncGetState() (logged as "stopping");
//   - TSDB_CODE_SYN_NOT_LEADER when not leader;
//   - TSDB_CODE_SYN_RESTORING when not yet restored.
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
    return false;
  }

  if (state.state == TAOS_SYNC_STATE_LEADER) {
    if (state.restored && pMnode->restored) {
      return true;
    }
    terrno = TSDB_CODE_SYN_RESTORING;
    mDebug("vgId:1, mnode not restored:%d:%d", state.restored, pMnode->restored);
    return false;
  }

  terrno = TSDB_CODE_SYN_NOT_LEADER;
  mDebug("vgId:1, mnode not leader, state:%s", syncStr(state.state));
  return false;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc