• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.04
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
// Process-wide byte total of raft entries held in vnode log buffers.
// Entries are only counted when the owning node has vgId > 1 (every increment
// in this file is guarded by `pNode->vgId > 1`), and it is always updated with
// atomic_add_fetch_64 -- presumably because several log buffers, each guarded
// by its own mutex, share this single counter.
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of vnode log buffer
32

33
/*
 * Tell whether `type` is one of the vnode table/metadata-changing requests:
 * create/alter/drop table, tag value update, or alter confirm.
 * NOTE(review): the name suggests callers treat these as "blocking" messages;
 * no caller is visible in this chunk -- confirm intended use.
 */
static bool syncIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_CONFIRM:
      return true;
    default:
      return false;
  }
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
3,608,632✔
40
}
41

42
/*
 * Snapshot pBuf->endIndex (one past the last slot in use) under the buffer
 * mutex and return it.
 */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
/*
 * Append a new entry at the tail (pBuf->endIndex) of the log buffer.
 *
 * Preconditions checked under pBuf->mutex:
 *   - the entry fits in the ring (index - startIndex < size);
 *   - once restore has finished, the uncommitted window
 *     (index - commitIndex) is below TSDB_SYNC_NEGOTIATION_WIN and the
 *     unapplied backlog (commitIndex - appliedIndex) is below
 *     tsSyncApplyQueueSize;
 *   - pEntry->index == endIndex and its slot is empty;
 *   - the previous slot holds entry index-1 with a term <= pEntry->term.
 *
 * On success the buffer keeps the pEntry pointer, endIndex advances, and for
 * vgId > 1 the entry size is added to pBuf->bytes and tsLogBufferMemoryUsed.
 * On failure pEntry is neither stored nor freed here (the caller keeps it),
 * and taosMsleep(1) is called before returning.
 *
 * Returns 0, TSDB_CODE_SYN_BUFFER_FULL, TSDB_CODE_SYN_NEGOTIATION_WIN_FULL,
 * TSDB_CODE_SYN_WRITE_STALL, TSDB_CODE_SYN_INTERNAL_ERROR, or an error from
 * syncLogBufferValidate().
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }

  if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           index, pBuf->commitIndex);
    goto _err;
  }

  SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= tsSyncApplyQueueSize) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex);
    goto _err;
  }

  // Appends are strictly sequential: the new entry must land exactly at endIndex.
  if (index != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since index is not at end, index:%" PRId64 ", end-index:%" PRId64, pNode->vgId,
           index, pBuf->endIndex);
    goto _err;
  }

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  if (pExist != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since slot is occupied, index:%" PRId64 ", exist-index:%" PRId64, pNode->vgId,
           index, pExist->index);
    goto _err;
  }

  // initial log buffer with at least one item, e.g, commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  if (pMatch == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pMatch->index + 1 != index) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since previous entry is not contiguous, index:%" PRId64
           ", prev-index:%" PRId64,
           pNode->vgId, index, pMatch->index);
    goto _err;
  }
  // Raft terms never decrease along the log.
  if (pMatch->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since term regressed, index:%" PRId64 ", term:%" PRId64 ", prev-term:%" PRId64,
           pNode->vgId, index, pEntry->term, pMatch->term);
    goto _err;
  }

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
/*
 * Resolve the term of the entry at (index - 1) -- the "prev log term" that
 * accompanies an entry at `index` -- and store it in *pSyncTerm.
 *
 * Lookup order:
 *   1. index - 1 == -1 and the log store begins at 0     -> term 0.
 *   2. index - 1 > pBuf->matchIndex                       -> TSDB_CODE_WAL_LOG_NOT_EXIST.
 *   3. index - 1 >= pBuf->startIndex (held in the buffer) -> term of the buffered entry.
 *   4. index - 1 in [pMgr->startIndex, pMgr->endIndex)    -> term recorded in pMgr->states
 *      (only when pMgr != NULL).
 *   5. index - 1 == snapshot.lastApplyIndex               -> snapshot.lastApplyTerm.
 *   6. otherwise the entry is read back from the log store.
 *
 * Returns 0 on success; on every failure *pSyncTerm is set to -1 and an error
 * code is returned.
 * NOTE(review): pBuf fields are read without taking pBuf->mutex here;
 * syncLogBufferAccept calls this while holding it -- confirm for other callers.
 */
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  int32_t         code = 0;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pEntry->term;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    // A zero timestamp means this replication slot was never filled.
    if (timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    // Only index 0 may legitimately carry term 0.
    if (!(prevIndex == 0 || prevLogTerm != 0)) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             prevLogTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
/*
 * Build the placeholder entry kept at commitIndex in the log buffer: a no-op
 * entry for (term, index, vgId). A NULL result is treated by the buffer
 * initialisation code as an out-of-memory failure.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pNoop = syncEntryBuildNoop(term, index, vgId);
  return pNoop;
}
194

195
/*
 * Check that the WAL covers the FSM commit point: the first WAL version must
 * be at most commitIndex + 1 (no gap right after the commit) and the last WAL
 * version must be at least commitIndex.
 *
 * Returns 0 when aligned, TSDB_CODE_WAL_LOG_INCOMPLETE otherwise.
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex firstVer = pStore->syncLogBeginIndex(pStore);
  bool      hasGapAfterCommit = (firstVer > commitIndex + 1);
  if (hasGapAfterCommit) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, firstVer, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex lastVer = pStore->syncLogLastIndex(pStore);
  bool      endsBeforeCommit = (lastVer < commitIndex);
  if (endsBeforeCommit) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, lastVer, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
/*
 * Populate pBuf from the log store. Does not take pBuf->mutex; the visible
 * callers (syncLogBufferInit, syncLogBufferReInit) hold it around this call.
 *
 * commitIndex/commitTerm come from the FSM snapshot (lastApplyIndex /
 * max(lastApplyTerm, 0)); matchIndex is the last WAL index and endIndex is one
 * past it. Entries are loaded from the WAL in reverse order, from lastVer down
 * to commitIndex + 1, keeping at most pBuf->size - TSDB_SYNC_LOG_BUFFER_SIZE/2
 * of them. If the scan reaches commitIndex, a dummy no-op entry is placed
 * there and becomes startIndex; otherwise startIndex is the oldest loaded
 * index.
 *
 * Returns 0 on success, or an error code (logged with its source line via
 * _exit for checks routed through TAOS_CHECK_EXIT).
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) {
    TAOS_CHECK_EXIT(TSDB_CODE_SYN_INTERNAL_ERROR);
  }
  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  // Keep half of the default buffer capacity free for new entries.
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // Capture the log store's own error so the warning reports the real cause
    // (`code` is still 0 at this point).
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }
#ifdef USE_MOUNT
    if (pNode->mountVgId) {
      SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
    }
#endif
    bool taken = false;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // The entry at index is the predecessor of the (already loaded) entry at index + 1.
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) {
      TAOS_CHECK_EXIT(TSDB_CODE_SYN_INTERNAL_ERROR);
    }

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
333

334
/*
 * Thread-safe entry point for initialising pBuf: runs
 * syncLogBufferInitWithoutLock() while holding pBuf->mutex and returns its
 * result unchanged.
 */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
340

341
/*
 * Discard every entry in [startIndex, endIndex), reset all buffer indexes and
 * the byte count, then reload the buffer via syncLogBufferInitWithoutLock(),
 * all under pBuf->mutex.
 *
 * The discarded bytes are also released from the process-wide
 * tsLogBufferMemoryUsed counter. They were added there (for vgId > 1) when the
 * entries entered the buffer. Resetting only pBuf->bytes would make the global
 * total drift upward on each re-init.
 *
 * Returns the result of syncLogBufferInitWithoutLock() (logged on failure), or
 * an error from syncLogBufferValidate().
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) continue;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  // Keep the global counter in step with the per-buffer total we are about to zero.
  if (pNode->vgId > 1) {
    (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pBuf->bytes);
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  pBuf->bytes = 0;
  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
361

362
/*
 * Term of the entry stored at pBuf->matchIndex. Does not lock pBuf->mutex.
 * If that slot is empty, logs an error, sets terrno to
 * TSDB_CODE_SYN_INTERNAL_ERROR and returns -1.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SSyncRaftEntry* pMatched = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
  if (pMatched != NULL) {
    return pMatched->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
372

373
/*
 * Locked variant of syncLogBufferGetLastMatchTermWithoutLock(): returns the
 * term at pBuf->matchIndex, or -1 if that slot is empty.
 */
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastTerm = -1;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastTerm;
}
379

380
/*
 * True when the buffer holds no slots, i.e. endIndex <= startIndex, sampled
 * under pBuf->mutex.
 */
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);
  int64_t startIndex = pBuf->startIndex;
  int64_t endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !(endIndex > startIndex);
}
386

387
/*
 * Accept an entry received from the leader into the log buffer.
 *
 * Ownership: pEntry is always consumed. It is either stored in the buffer (on
 * the success path) or destroyed before returning.
 *
 * Outcomes:
 *   - index <= commitIndex: already committed. Returns 0 if the stored term at
 *     that index equals pEntry->term; otherwise the result of
 *     syncLogReplGetPrevLogTerm().
 *   - index outside the ring (index - startIndex >= size):
 *     TSDB_CODE_OUT_OF_RANGE.
 *   - index beyond matchIndex and prevTerm != term at matchIndex:
 *     TSDB_CODE_ACTION_IN_PROGRESS (not ready yet).
 *   - an entry with the same index already exists:
 *       * same term -> duplicate, returns 0;
 *       * different term -> the buffer is rolled back from index and the new
 *         entry is stored.
 *   - otherwise the entry is stored with (prevIndex, prevTerm) and endIndex is
 *     advanced; returns 0.
 *
 * For learners, pBuf->totalIndex tracks the highest index seen.
 * Internal inconsistencies return TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    // Term of the committed entry at `index` (the "prev" of index + 1).
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // Conflicting entry: drop it and everything after it, then fall through to store the new one.
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  // The buffer now owns the entry; prevent it from being destroyed at _out.
  pEntry = NULL;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  // An existing entry fetched from outside the buffer is a private copy.
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
516

517
/*
 * Decide whether appending pEntry should fsync the log store: only in
 * multi-replica groups, and only for entries whose original RPC type is
 * TDMT_VND_COMMIT. pEntry is not read for single-replica groups.
 */
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
520

521
/*
 * Write pEntry to the log store so that it becomes the last entry.
 *
 * Steps:
 *   1. Any existing entries at or after pEntry->index are truncated.
 *   2. The store must then end exactly at pEntry->index - 1.
 *   3. Under USE_MOUNT, the message header's vgId is rewritten to mountVgId.
 *   4. The entry is appended, with fsync when syncLogStoreNeedFlush() says so.
 *   5. The store must then end exactly at pEntry->index.
 *
 * Returns 0 on success, the log store's error on truncate/append failure, or
 * TSDB_CODE_SYN_INTERNAL_ERROR when an index check fails.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  if (pEntry->index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
  if (lastVer >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  if (lastVer + 1 != pEntry->index) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

#ifdef USE_MOUNT
  if (pNode->mountVgId) {
    SMsgHead* pHead = (SMsgHead*)pEntry->data;
    if (pHead->vgId != pNode->mountVgId) pHead->vgId = pNode->mountVgId;
  }
#endif

  bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync);
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), pEntry->index,
           pEntry->term);
    TAOS_RETURN(code);
  }

  lastVer = pLogStore->syncLogLastIndex(pLogStore);
  return (lastVer == pEntry->index) ? 0 : TSDB_CODE_SYN_INTERNAL_ERROR;
}
549

550
/*
 * Advance pBuf->matchIndex across buffered entries that chain onto the current match
 * position, persisting each accepted entry to pNode->pLogStore.
 *
 * For the entry at matchIndex + 1, the walk requires that:
 *   - the slot is filled;
 *   - the entry carries that index;
 *   - the prevLogIndex/prevLogTerm recorded with it equal the index/term of the entry
 *     at the current match position.
 * An accepted entry is persisted. If it is a TDMT_SYNC_CONFIG_CHANGE, the change is
 * applied when its predecessor is already committed and otherwise only logged as
 * delayed. The entry then triggers syncNodeReplicateWithoutLock() and updates this
 * node's own entry in pNode->pMatchIndex.
 *
 * The walk stops at the first empty slot, mismatch or failure. pBuf->matchIndex is then
 * reset to the last index that completed every step, so a failure after the index was
 * bumped does not leave it advanced.
 *
 * pMatchTerm : optional; receives the term of the entry at the final match index.
 * str        : caller tag used in log lines and passed to syncNodeChangeConfig().
 * pMsg       : optional; only used for tracing.
 *
 * Returns the final match index. If syncLogBufferValidate() fails before or after the
 * walk, its error is returned instead. pBuf->mutex is held during the walk.
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;  // last index that finished every step below
  int32_t        code = 0;

  for (;;) {
    if (pBuf->matchIndex + 1 >= pBuf->endIndex) {
      break;  // nothing buffered beyond the match position
    }

    int64_t nextIndex = pBuf->matchIndex + 1;
    if (nextIndex < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, nextIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // candidate entry, together with the predecessor it claims to follow
    SSyncLogBufEntry* pSlot = &pBuf->entries[nextIndex % pBuf->size];
    SyncIndex         expectPrevIndex = pSlot->prevLogIndex;
    SyncTerm          expectPrevTerm = pSlot->prevLogTerm;
    SSyncRaftEntry*   pEntry = pSlot->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }
    if (pEntry->index != nextIndex) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             nextIndex, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // entry currently sitting at the match position
    SSyncRaftEntry* pPrev = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pPrev == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pPrev->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pPrev->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pPrev->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pPrev->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (expectPrevIndex != pPrev->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             expectPrevIndex, pPrev->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // a term mismatch is not an internal error: just stop advancing
    if (pPrev->term != expectPrevTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pPrev->index, pPrev->term, pEntry->index, pEntry->term, expectPrevIndex, expectPrevTerm);
      goto _out;
    }

    // tentatively accept the entry; rolled back at _out if a later step fails
    pBuf->matchIndex = nextIndex;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    code = syncLogStorePersist(pStore, pNode, pEntry);
    if (code < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      bool prevCommitted = (pNode->pLogBuf->commitIndex == pEntry->index - 1);
      if (!prevCommitted) {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      } else {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        code = syncNodeChangeConfig(pNode, pEntry, str);
        if (code != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      }
    }

    // replicate on demand
    code = syncNodeReplicateWithoutLock(pNode);
    if (code != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pBuf->matchIndex != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // every step succeeded: commit the new match position
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }

_out:
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm != NULL) {
    SSyncRaftEntry* pMatched = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;
    *pMatchTerm = pMatched->term;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
692

693
/*
 * Hand one committed raft entry to the state machine through pFsm->FpCommitCb.
 *
 * The call is skipped (returns 0) when all of the following hold: the node is restored,
 * it has a single replica, its vgId is not 1, it is not a learner, and force is false.
 * Otherwise the entry is converted back to its original RPC message, and the response
 * info registered for pEntry->seqNum is attached. The callback is then invoked with
 * applyCode, role and the given current term in the callback metadata.
 *
 * If the callback fails while terrno is TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, it is retried
 * every 10 ms. The response info is fetched from pNode->pSyncRespMgr only on the first
 * attempt, because syncRespMgrGetAndDel() deletes what it returns. Later attempts reuse
 * that copy, so they still carry the original response handle.
 *
 * Returns 0 on success, the callback's error code, or the error from the
 * snapshot-config lookup / message conversion.
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // A restored single-replica non-learner outside vgId 1 does not execute the entry here.
  // Learners still execute entries while catching up; force=true bypasses the shortcut.
  if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
      pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && force == false) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0, lino = 0;
  bool    retry = false;
  bool    respFetched = false;  // response info is taken from pSyncRespMgr only once
  SRpcMsg respHolder = {0};     // keeps the fetched response info for later attempts
  do {
    SFsmCbMeta cbMeta = {0};
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (cbMeta.lastConfigIndex < -1) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      if (terrno != 0) code = terrno;
      return code;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));

    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.code = applyCode;
    cbMeta.state = role;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.term = pEntry->term;
    cbMeta.currentTerm = term;
    cbMeta.flag = -1;
    rpcMsg.info.traceId = pEntry->originRpcTraceId;

    if (!respFetched) {
      // syncRespMgrGetAndDel() deletes the info it returns, so a lookup on a retry would
      // come back empty; remember what the first lookup produced.
      int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
      sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
              pNode->vgId, pEntry->index, &rpcMsg.info, cbMeta.seqNum, num);
      respHolder.info = rpcMsg.info;
      respFetched = true;
    } else {
      rpcMsg.info = respHolder.info;
    }

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);

    // only a failure reported as TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE (via terrno) is retried
    retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);

    sGTrace(&rpcMsg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);

    if (retry) {
      taosMsleep(10);
      // Count every retry, whatever the returned code. Otherwise a retry loop driven by
      // terrno but returning a different code would never reach the warning below.
      pNode->applyQueueErrorCount++;
      if (pNode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
        pNode->applyQueueErrorCount = 0;
        sGWarn(&rpcMsg.info.traceId,
               "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
               pEntry->index, tstrerror(code));
      } else {
        sGTrace(&rpcMsg.info.traceId,
                "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                pEntry->index, tstrerror(code));
      }
    }
  } while (retry);

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
778

779
/*
 * Check the structural invariants of the log buffer, in this order:
 *   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the slot for matchIndex holds an entry.
 * The first violated invariant is logged and TSDB_CODE_SYN_INTERNAL_ERROR is returned;
 * 0 means every invariant holds.
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  if (pBuf->matchIndex < pBuf->startIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->matchIndex < pBuf->commitIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->endIndex <= pBuf->matchIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->size < pBuf->endIndex - pBuf->startIndex) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  const SSyncLogBufEntry* pMatchSlot = &pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size];
  if (pMatchSlot->pItem == NULL) {
    sError("failed to validate since pItem is null");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  return 0;
}
806

807
/*
 * Apply committed log entries to the state machine, then recycle old buffer slots.
 *
 * Execution:
 *   Entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] are executed in
 *   order through syncFsmExecute(), and pBuf->commitIndex advances after each one.
 *   If the entry right after an executed one is a TDMT_SYNC_CONFIG_CHANGE, the
 *   configuration change is applied at once. When the node then has a single replica
 *   (2->1), that entry is also executed here and counted as committed.
 *
 * Recycling:
 *   Slots at the head of the buffer are released while startIndex < commitIndex and
 *   either condition holds:
 *     - startIndex lags commitIndex by more than TSDB_SYNC_LOG_BUFFER_RETENTION; or
 *     - for vgId > 1 only, pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD and the
 *       process-wide tsLogBufferMemoryUsed >= tsLogBufferMemoryAllowed.
 *
 * A commitIndex not above pBuf->commitIndex is stale. In that case nothing is executed,
 * but the entry at commitIndex may be loaded so the restore check below can run.
 *
 * Restore:
 *   If the node has not finished restoring, pBuf->commitIndex has reached
 *   pNode->commitIndex, and the last examined entry has a term >= the current term, then
 *   FpRestoreFinishCb is called and the node is marked restored. FpAfterRestoredCb (if
 *   set) runs after pBuf->mutex is released, with the commit index captured under the
 *   lock.
 *
 * Returns 0 on success, or an error from buffer validation, entry lookup/execution,
 * a configuration change, or an inconsistent buffer found while recycling.
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;
  SyncIndex       restoredCommitIndex = -1;  // commit index seen under the lock when restore finished

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    if (!pNode->restoreFinish && commitIndex > 0 && commitIndex == pBuf->commitIndex) {
      sInfo("vgId:%d, try to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      int32_t ret = syncLogBufferGetOneEntry(pBuf, pNode, commitIndex, &inBuf, &pEntry);
      if (ret != 0) {
        sWarn("vgId:%d, failed to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      }
    }
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    // a config change right after this entry takes effect as soon as its predecessor commits
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, the config change entry must be applied in the sync thread
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    // distinct name: must not shadow pEntry, which the restore check at _out still reads
    SSyncRaftEntry* pRecycle = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycle == NULL) {
      // The slot is empty, so only buffer-level fields can be reported. Leave through
      // _out so the mutex is released and the other entries are cleaned up.
      sError("vgId:%d, invalid log entry to recycle, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
             ", commitIndex:%" PRId64 ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycle->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycle->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycle->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycle->term,
           pRecycle->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycle);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    restoredCommitIndex = pBuf->commitIndex;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  // use the index captured under the lock: pBuf->commitIndex may move once it is released
  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, restoredCommitIndex);
    sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
979

980
/*
 * Clear a replication manager's window. The per-index states for
 * [startIndex, endIndex) are zeroed, start/match/end indexes are rewound to 0, and the
 * restored flag and retry backoff are cleared.
 * A NULL manager is ignored. A negative startIndex is logged and the manager is left
 * untouched.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }
  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex idx = pMgr->startIndex;
  while (idx < pMgr->endIndex) {
    (void)memset(&pMgr->states[idx % pMgr->size], 0, sizeof(pMgr->states[0]));
    ++idx;
  }

  pMgr->startIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->endIndex = 0;
  pMgr->restored = false;
  pMgr->retryBackoff = 0;
}
996

997
/*
 * Resend entries of the replication window [startIndex, endIndex) that are still
 * unacknowledged and whose retry wait (syncLogReplGetRetryBackoffTimeMs) has elapsed.
 *
 * Up-front checks:
 *   - An empty window returns 0.
 *   - Reaching SYNC_MAX_RETRY_BACKOFF resets the manager and returns
 *     TSDB_CODE_OUT_OF_RANGE.
 *
 * Walk:
 *   Entries are walked from startIndex. The walk stops at the first entry still inside
 *   its wait, or after max(1, size >> (4 + retryBackoff)) + 1 resends. Two cases end
 *   the walk early:
 *     - An acknowledged entry above matchIndex whose last send time is more than
 *       syncGetRetryMaxWaitMs() << 3 ms old resets the manager
 *       (TSDB_CODE_ACTION_IN_PROGRESS).
 *     - A barrier flag on an entry that is neither first nor last in the window, or a
 *       barrier flag that disagrees with syncLogReplSendTo's report, is an internal
 *       error.
 *
 * If anything was resent, retryBackoff is advanced and a summary is logged.
 * Returns the code described above, or the result of the last syncLogReplSendTo call.
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->startIndex >= pMgr->endIndex) {
    return 0;  // empty window
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     resent = false;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  now = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstResent = -1;
  SyncTerm lastTerm = -1;
  int64_t  maxBatch = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex idx = pMgr->startIndex; idx < pMgr->endIndex; ++idx) {
    int64_t slot = idx % pMgr->size;

    // a barrier may only sit at the first or the last position of the window
    bool misplacedBarrier = pMgr->states[slot].barrier && idx != pMgr->startIndex && idx + 1 != pMgr->endIndex;
    if (misplacedBarrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // stop at the first entry that is still inside its retry wait
    if (now < pMgr->states[slot].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[slot].acked) {
      bool stagnant =
          pMgr->matchIndex < idx && pMgr->states[slot].timeMs + (syncGetRetryMaxWaitMs() << 3) < now;
      if (stagnant) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, idx,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        goto _out;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, idx, &lastTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), idx, pDestId->addr);
      goto _out;
    }
    if (barrier != pMgr->states[slot].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    pMgr->states[slot].timeMs = now;
    pMgr->states[slot].term = lastTerm;
    pMgr->states[slot].acked = false;

    resent = true;
    if (firstResent == -1) {
      firstResent = idx;
    }

    ++count;
    if (count > maxBatch) {
      break;
    }
  }

_out:
  if (resent) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstResent, lastTerm, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1075

1076
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
3,116,113✔
1077
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
3,116,113✔
1078
  SRaftId         destId = pMsg->srcId;
3,116,113✔
1079
  int32_t         code = 0;
3,116,113✔
1080
  if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR;
3,116,113✔
1081

1082
  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
3,116,113✔
1083
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
1084
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
1085
         "}",
1086
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
1087
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
1088
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);
1089

1090
  if (pMgr->endIndex == 0) {
3,116,113✔
1091
    if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
658,903✔
1092
    if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
658,903✔
1093
    if (pMsg->matchIndex < 0) {
658,903✔
1094
      pMgr->restored = true;
94,608✔
1095
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
94,608✔
1096
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
1097
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
1098
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
1099
      return 0;
94,608✔
1100
    }
1101
  } else {
1102
    if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
2,457,210✔
1103
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
1,324,832✔
1104
      return 0;
1,324,832✔
1105
    }
1106

1107
    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
1,132,378✔
1108

1109
    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
1,132,378✔
1110
      pMgr->matchIndex = pMsg->matchIndex;
1,001,682✔
1111
      pMgr->restored = true;
1,001,682✔
1112
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
1,001,682✔
1113
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
1114
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
1115
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
1116
      return 0;
1,001,682✔
1117
    }
1118

1119
    if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
130,696✔
1120
      char* msg1 = " rollback match index failure";
×
1121
      char* msg2 = " incomplete fsm state";
×
1122
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, reason:%s, match index:%" PRId64
×
1123
             ", last sent:%" PRId64,
1124
             pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex,
1125
             pMsg->lastSendIndex);
1126
      pNode->snapSeq = -1;
×
1127
      if ((code = syncNodeStartSnapshot(pNode, &destId, (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1))) <
×
1128
          0) {
1129
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
×
1130
        TAOS_RETURN(code);
×
1131
      }
1132
      return 0;
×
1133
    }
1134
  }
1135

1136
  // check last match term
1137
  SyncTerm  term = -1;
694,991✔
1138
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
694,991✔
1139
  SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
694,991✔
1140
  SET_ERRNO(0);
694,991✔
1141

1142
  if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
694,991✔
1143
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
179,439✔
1144
    if (term < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
179,439✔
1145
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
×
1146
      TAOS_RETURN(code);
×
1147
    }
1148

1149
    if (pMsg->matchIndex == -1) {
179,439✔
1150
      // first time to restore
1151
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
78,101✔
1152
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
1153
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
1154
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
1155
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, term,
1156
            pMsg->lastMatchTerm);
1157
    }
1158

1159
    if ((index + 1 < firstVer) || (term < 0) ||
179,439✔
1160
        (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
148,024✔
1161
      if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR;
31,415✔
1162
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, index:%" PRId64 ", firstVer:%" PRId64
31,415✔
1163
             ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
1164
             pNode->vgId, DID(&destId), index, firstVer, term, pMsg->lastMatchTerm);
1165
      char reason[100] = {0};
31,415✔
1166
      if (index + 1 < firstVer)
31,415✔
1167
        tsnprintf(reason, 100, "matched entry not in log range, index:%" PRId64 ", firstVer:%" PRId64, index, firstVer);
31,415✔
1168
      else if (term < 0)
×
1169
        tsnprintf(reason, 100, "failed to get prev log term");
×
1170
      else
1171
        tsnprintf(reason, 100, "log term mismatch");
×
1172
      pNode->snapSeq = -1;
31,415✔
1173
      if ((code = syncNodeStartSnapshot(pNode, &destId, reason)) < 0) {
31,415✔
1174
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
×
1175
        TAOS_RETURN(code);
×
1176
      }
1177
      return 0;
31,415✔
1178
    }
1179

1180
    if (!(index + 1 >= firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
148,024✔
1181

1182
    if (term == pMsg->lastMatchTerm) {
148,024✔
1183
      index = index + 1;
147,696✔
1184
      if (!(index <= pNode->pLogBuf->matchIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR;
147,696✔
1185
    } else {
1186
      if (!(index > firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
328✔
1187
    }
1188
  }
1189

1190
  // attempt to replicate the raft log at index
1191
  syncLogReplReset(pMgr);
663,576✔
1192
  return syncLogReplProbe(pMgr, pNode, index);
663,576✔
1193
}
1194

1195
/*
 * Handle a heartbeat reply from a peer.
 *
 * While holding pMgr->mutex, if the reply carries a non-zero start time that
 * differs from the one recorded in pMgr, the replication manager is reset and
 * the new start time is stored (presumably the peer restarted -- TODO confirm).
 *
 * Returns 0 unconditionally.
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1207

1208
/*
 * Handle an AppendEntries reply from a peer.
 *
 * Step 1 (under pMgr->mutex): if the peer's start time differs from the one
 * recorded, reset the replication manager and remember the new start time.
 * Step 2 (under the log buffer mutex): drive replication forward. A restored
 * manager continues normal replication; otherwise recovery is attempted.
 *
 * Failures from step 2 are only logged. The function always returns 0.
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  bool startTimeChanged = (pMsg->startTime != pMgr->peerStartTime);
  if (startTimeChanged) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);

  int32_t code = 0;
  if (!pMgr->restored) {
    code = syncLogReplRecover(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  } else {
    code = syncLogReplContinue(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
1234

1235
/*
 * Kick off replication to the peer owned by pMgr.
 *
 * An unrestored manager probes the peer at the local buffer's matchIndex.
 * A restored manager goes straight to syncLogReplAttempt.
 * Errors from either path are propagated via TAOS_CHECK_RETURN.
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1243

1244
/*
 * Send a single probe entry at `index` to the peer of an unrestored manager.
 *
 * If a previous probe is still outstanding (endIndex > startIndex) and its
 * send time plus syncGetRetryMaxWaitMs() has not elapsed, nothing is sent and
 * 0 is returned. Otherwise the manager is reset, the entry is sent, and the
 * manager's window becomes [index, index + 1) with the slot state recorded.
 *
 * Returns 0 on success, the send error code, or TSDB_CODE_SYN_INTERNAL_ERROR
 * if the manager is restored, has a negative startIndex, or index is negative.
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored || pMgr->startIndex < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  // An earlier probe is in flight; give it until its retry deadline.
  bool probeOutstanding = pMgr->endIndex > pMgr->startIndex;
  if (probeOutstanding && nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  syncLogReplReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;

  code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
  if (code < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  if (index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex slot = index % pMgr->size;
  pMgr->states[slot].barrier = barrier;
  pMgr->states[slot].timeMs = nowMs;
  pMgr->states[slot].term = term;
  pMgr->states[slot].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1287

1288
/*
 * Stream log entries to the peer of a restored manager, starting at
 * pMgr->endIndex and going up to the local buffer's matchIndex.
 *
 * The loop stops early when any of these holds:
 *   - the batch budget (derived from size and retryBackoff) is exceeded;
 *   - the in-flight window reaches half the state ring;
 *   - the previous in-flight entry is a barrier;
 *   - the entry just sent is a barrier.
 * Each sent entry has its ring slot recorded (barrier, send time, term,
 * not-acked), and endIndex advances past it. Afterwards
 * syncLogReplRetryOnNeed is invoked.
 *
 * Returns 0 on success or the first send/retry error.
 */
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent; reported by the trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t pos = index % pMgr->size;
    bool    barrier = false;

    // Reuse the function-scope pDestId/term instead of shadowing them, so the
    // final trace reports the term of the last entry actually sent.
    term = -1;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1352

1353
/*
 * Process an AppendEntries reply for a restored manager, then replicate more.
 *
 * When the reply's lastSendIndex lies inside the in-flight window
 * [startIndex, endIndex), the following happens in order:
 *   1. retryBackoff may be decremented if the window was sent quickly enough;
 *   2. the replied slot is marked acked;
 *   3. matchIndex is advanced to the peer's matchIndex if larger;
 *   4. slots below the new matchIndex are cleared;
 *   5. the window start moves to matchIndex.
 * syncLogReplAttempt is invoked in every case.
 */
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  bool replyInWindow = pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex;
  if (!replyInWindow) {
    return syncLogReplAttempt(pMgr, pNode);
  }

  if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
    int64_t oldestMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
    int64_t newestMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
    int64_t spanMs = newestMs - oldestMs;
    int64_t thresholdMs = (int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1);
    if (spanMs > 0 && spanMs < thresholdMs) {
      pMgr->retryBackoff--;
    }
  }

  pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
  pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

  // Drop bookkeeping for everything the peer has now matched.
  for (SyncIndex i = pMgr->startIndex; i < pMgr->matchIndex; ++i) {
    (void)memset(&pMgr->states[i % pMgr->size], 0, sizeof(pMgr->states[0]));
  }
  pMgr->startIndex = pMgr->matchIndex;

  return syncLogReplAttempt(pMgr, pNode);
}
1374

1375
SSyncLogReplMgr* syncLogReplCreate() {
77,248,619✔
1376
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
77,248,619✔
1377
  if (pMgr == NULL) {
77,247,464✔
1378
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1379
    return NULL;
×
1380
  }
1381

1382
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
77,247,464✔
1383

1384
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
77,247,464✔
1385
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1386
    return NULL;
×
1387
  }
1388

1389
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
77,247,464✔
1390
  if (code) {
77,240,764✔
1391
    terrno = code;
×
1392
    return NULL;
×
1393
  }
1394

1395
  return pMgr;
77,240,764✔
1396
}
1397

1398
/*
 * Release a manager created by syncLogReplCreate: destroy its mutex and free
 * its memory. A NULL pMgr is ignored.
 */
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1406

1407
/*
 * Create one replication manager per replica slot (voters plus learners) and
 * tag each with its slot number as peerId.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already populated, or
 * terrno if creation fails. Managers created before a failure are left in
 * pNode->logReplMgrs.
 */
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int slot = 0; slot < slotCount; ++slot) {
    if (pNode->logReplMgrs[slot] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SSyncLogReplMgr* pMgr = syncLogReplCreate();
    pNode->logReplMgrs[slot] = pMgr;
    if (pMgr == NULL) {
      TAOS_RETURN(terrno);
    }
    pMgr->peerId = slot;
  }
  return 0;
}
1418

1419
/*
 * Destroy every replication manager owned by pNode and clear its slot, so a
 * later syncNodeLogReplInit sees empty slots.
 */
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int slot = 0; slot < slotCount; ++slot) {
    SSyncLogReplMgr** ppMgr = &pNode->logReplMgrs[slot];
    syncLogReplDestroy(*ppMgr);
    *ppMgr = NULL;
  }
}
1425

1426
/*
 * Allocate a zeroed sync log buffer whose mutex is recursive.
 *
 * On success *ppBuf receives the buffer and 0 is returned. On failure *ppBuf
 * is set to NULL, any mutexattr already initialised is destroyed, the buffer
 * is freed, and the error code is returned.
 *
 * Error codes come from the taosThread* wrappers' return values. This matches
 * syncLogReplCreate, and POSIX pthread_* functions report errors by return
 * value rather than errno, so the previous TAOS_SYSTEM_ERROR(ERRNO) could
 * report a stale or zero errno.
 */
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // Never fall through with a NULL buffer, even if terrno was not set.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  code = taosThreadMutexAttrInit(&pBuf->attr);
  if (code != 0) {
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  // Recursive: the same thread may re-acquire pBuf->mutex.
  code = taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) {
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  code = taosThreadMutexInit(&pBuf->mutex, &pBuf->attr);
  if (code != 0) {
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }

_exit:
  if (code != 0) {
    if (attrInited) {
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1464

1465
/*
 * Under pBuf->mutex, destroy every entry held in [startIndex, endIndex), zero
 * the corresponding slots, and reset all indices and the byte count to 0.
 * NULL slots are skipped.
 */
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  for (SyncIndex idx = pBuf->startIndex; idx < pBuf->endIndex; ++idx) {
    // Add size before the modulo so a negative idx still maps into the ring.
    SyncIndex       slot = (idx + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
1478

1479
/*
 * Tear down a buffer created by syncLogBufferCreate. The steps are: clear its
 * entries, destroy its mutex and mutexattr, then free it. A NULL pBuf is
 * ignored.
 */
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1489

1490
/*
 * Roll the buffer and the WAL back so that toIndex becomes the new end.
 *
 * Requires commitIndex < toIndex <= endIndex. If toIndex already equals
 * endIndex, nothing is done. Otherwise:
 *   1. buffered entries in [toIndex, endIndex) are destroyed;
 *   2. endIndex is set to toIndex and matchIndex is clamped to toIndex - 1;
 *   3. the log store is truncated from toIndex if it extends that far;
 *   4. the buffer is re-initialised if toIndex <= startIndex;
 *   5. the buffer is validated.
 *
 * Returns 0, a log-store/refill/validate error, or
 * TSDB_CODE_SYN_INTERNAL_ERROR on an invariant violation.
 * NOTE(review): the caller is presumably expected to hold pBuf->mutex
 * (syncLogBufferReset does) -- confirm for other callers.
 */
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;
  if (toIndex <= pBuf->commitIndex || toIndex > pBuf->endIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  if (toIndex == pBuf->endIndex) {
    return 0;
  }

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // Discard buffered entries from the tail back to toIndex.
  SyncIndex index;
  for (index = pBuf->endIndex - 1; index >= toIndex; --index) {
    SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem;
    if (pEntry == NULL) continue;
    syncEntryDestroy(pEntry);
    (void)memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
  if (index + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // Truncate the WAL so it ends right before toIndex.
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (toIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // The rollback reached below the buffer's start: reload it from the store.
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1539

1540
/*
 * Drop unmatched entries and reset every replication manager.
 *
 * Under pBuf->mutex:
 *   1. the log store's last index is checked against matchIndex;
 *   2. the buffer is rolled back to matchIndex + 1 (a rollback failure is
 *      only logged);
 *   3. endIndex is set to matchIndex + 1;
 *   4. the first totalReplicaNum replication managers are reset.
 * The buffer is validated before and after.
 *
 * Returns 0, a validation error, or TSDB_CODE_SYN_INTERNAL_ERROR if the log
 * store's last index differs from matchIndex. The mutex is released on that
 * error path too; previously it stayed locked.
 */
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    sError("vgId:%d, failed to reset sync log buffer since last ver:%" PRId64 " mismatches match index:%" PRId64,
           pNode->vgId, lastVer, pBuf->matchIndex);
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;
  if ((code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1)) != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1566

1567
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
2,147,483,647✔
1568
                                 SSyncRaftEntry** ppEntry) {
1569
  int32_t code = 0;
2,147,483,647✔
1570

1571
  *ppEntry = NULL;
2,147,483,647✔
1572

1573
  if (index >= pBuf->endIndex) {
2,147,483,647✔
1574
    return TSDB_CODE_OUT_OF_RANGE;
1,092,648,973✔
1575
  }
1576

1577
  if (index > pBuf->startIndex) {  // startIndex might be dummy
2,057,878,647✔
1578
    *pInBuf = true;
1,896,962,106✔
1579
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
1,896,943,941✔
1580
#ifdef USE_MOUNT
1581
    if (pNode->mountVgId) {
1,896,948,444✔
1582
      SMsgHead* pMsgHead = (SMsgHead*)(*ppEntry)->data;
12,152✔
1583
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
12,152✔
1584
    }
1585
#endif
1586
  } else {
1587
    *pInBuf = false;
160,938,266✔
1588

1589
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
160,938,266✔
1590
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
233,716✔
1591
    }
1592
  }
1593

1594
  TAOS_RETURN(code);
2,057,879,854✔
1595
}
1596

1597
/*
 * Build and send one AppendEntries message carrying the entry at `index` to
 * pDestId.
 *
 * On success:
 *   - *pBarrier is set from syncLogReplBarrier(entry);
 *   - *pTerm (if non-NULL) receives the entry's term;
 *   - an entry fetched from the log store (not the buffer) is destroyed.
 * If the entry is missing and the lookup reported
 * TSDB_CODE_WAL_LOG_NOT_EXIST, the peer's replication manager is reset.
 *
 * Returns 0 on success or a non-zero error code. A missing entry or a
 * negative prevLogTerm now always yields a non-zero code. Previously these
 * could return 0, and callers (which test code < 0) would record an entry
 * that was never sent.
 */
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // Distinct name: do not shadow the pMgr parameter.
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    // An empty in-buffer slot comes back with code == 0; never report success here.
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);

  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
  if (!inBuf) {
    // Entries fetched from the log store are owned by this function.
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc