• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.29
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of raft entries held by all vnode (vgId > 1) log buffers; updated with atomic ops
32

33
/*
 * Return true for the vnode message types treated as "blocking" messages:
 * table create/alter/drop, tag value update and alter confirmation.
 */
static bool syncIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_CONFIRM:
      return true;
    default:
      return false;
  }
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
46,799✔
40
}
41

42
/*
 * Return the buffer's endIndex (one past the last buffered entry), read while
 * holding pBuf->mutex.
 */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
/*
 * Append a locally generated raft entry at the tail (endIndex) of the log buffer.
 *
 * On success the pEntry pointer is stored in the buffer slot for its index, endIndex
 * advances by one, and (for vgId > 1) the entry's size is added to pBuf->bytes and to
 * tsLogBufferMemoryUsed. On failure nothing is stored, taosMsleep(1) is called, and an
 * error is returned:
 *   TSDB_CODE_SYN_BUFFER_FULL          - index does not fit in the ring buffer
 *   TSDB_CODE_SYN_NEGOTIATION_WIN_FULL - too far ahead of commitIndex (after restore)
 *   TSDB_CODE_SYN_WRITE_STALL          - commitIndex too far ahead of the applied index (after restore)
 *   TSDB_CODE_SYN_INTERNAL_ERROR       - buffer invariants violated (each case is logged)
 * or the error from syncLogBufferValidate().
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }

  // flow-control limits are only enforced once restore has finished
  if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           index, pBuf->commitIndex);
    goto _err;
  }

  SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex);
    goto _err;
  }

  // appends are strictly sequential: the new entry must land exactly at endIndex
  if (index != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since index is not end index, index:%" PRId64 ", end-index:%" PRId64,
           pNode->vgId, index, pBuf->endIndex);
    goto _err;
  }

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  if (pExist != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since slot is occupied, index:%" PRId64 ", exist-index:%" PRId64, pNode->vgId,
           index, pExist->index);
    goto _err;
  }

  // initial log buffer with at least one item, e.g, commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  if (pMatch == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pMatch->index + 1 != index) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since previous entry mismatch, index:%" PRId64 ", prev-index:%" PRId64,
           pNode->vgId, index, pMatch->index);
    goto _err;
  }
  // terms never decrease along the log
  if (pMatch->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since term regressed, index:%" PRId64 ", term:%" PRId64 ", prev-term:%" PRId64,
           pNode->vgId, index, pEntry->term, pMatch->term);
    goto _err;
  }

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
/*
 * Look up the term of the entry just before `index` (i.e. at index - 1).
 *
 * Sources are tried in this order:
 *   1. index - 1 == -1 and the log store begins at 0  -> term 0
 *   2. the in-memory log buffer, when index - 1 >= pBuf->startIndex
 *   3. the replication manager's per-index state ring (pMgr may be NULL)
 *   4. the FSM snapshot, when index - 1 equals its last applied index
 *   5. the persistent log store
 *
 * On success *pSyncTerm receives the term and 0 is returned. On failure *pSyncTerm is
 * set to -1 and an error is returned: TSDB_CODE_WAL_LOG_NOT_EXIST when index - 1 is past
 * the buffer's match index, TSDB_CODE_SYN_INTERNAL_ERROR on inconsistent state, or the
 * log store's error code.
 */
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  int32_t         code = 0;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // 1) in-memory log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pEntry->term;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // 2) replication manager window; a zero timeMs marks an unset slot
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    if (timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    if (!(prevIndex == 0 || prevLogTerm != 0)) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             prevLogTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // 3) snapshot boundary
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  // 4) persistent log store; the fetched entry is destroyed after reading its term
  if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
/*
 * Build the placeholder entry used at commitIndex in the log buffer.
 * It is simply a no-op entry at (term, index) for the given vgroup.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}
194

195
/*
 * Check that the WAL covers the commit point:
 *   - the first WAL index must be <= commitIndex + 1, and
 *   - the last WAL index must be >= commitIndex.
 * Returns 0 if both hold, otherwise logs and returns TSDB_CODE_WAL_LOG_INCOMPLETE.
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex walFirst = pStore->syncLogBeginIndex(pStore);
  if (walFirst > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, walFirst, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex walLast = pStore->syncLogLastIndex(pStore);
  if (walLast < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, walLast, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
/*
 * Rebuild the log buffer from the FSM snapshot and the log store. Does not take
 * pBuf->mutex itself; the callers in this file hold it.
 *
 * commitIndex/commitTerm come from the snapshot's last applied index/term (term clamped to >= 0).
 * Entries in (commitIndex, lastVer] are loaded from the log store newest-first. Loading stops when:
 *   - commitIndex is reached (a no-op "dummy" entry is then placed at commitIndex), or
 *   - more than pBuf->size - (TSDB_SYNC_LOG_BUFFER_SIZE >> 1) entries would be loaded, or
 *   - the log store fails to return an entry.
 * Afterwards the buffer covers [startIndex, endIndex) with matchIndex = lastVer.
 *
 * Returns 0 on success; on failure logs the failing line and returns the error code.
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) {
    TAOS_CHECK_EXIT(TSDB_CODE_SYN_INTERNAL_ERROR);
  }
  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // keep the log store's own error for the warning; `code` is still 0 here
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }

    // keep at least emptySize slots free in the ring buffer
    bool taken = false;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // this entry is the predecessor of the one loaded in the previous iteration
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) {
      TAOS_CHECK_EXIT(TSDB_CODE_SYN_INTERNAL_ERROR);
    }

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
328

329
/*
 * Locking wrapper: initialize the log buffer while holding pBuf->mutex.
 * Returns whatever syncLogBufferInitWithoutLock() returns.
 */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
335

336
/*
 * Drop every entry in [startIndex, endIndex), reset the buffer indexes and byte
 * counters, and rebuild the buffer via syncLogBufferInitWithoutLock() under pBuf->mutex.
 *
 * Returns 0 on success, the init error (also logged), or the error from
 * syncLogBufferValidate().
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(index + pBuf->size) % pBuf->size];
    if (pSlot->pItem == NULL) continue;
    syncEntryDestroy(pSlot->pItem);
    (void)memset(pSlot, 0, sizeof(pBuf->entries[0]));
  }

  // pBuf->bytes is charged to tsLogBufferMemoryUsed together with the per-buffer counter,
  // so release it from the global total before resetting; otherwise the global total
  // keeps growing across re-initializations.
  if (pBuf->bytes != 0) {
    (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pBuf->bytes);
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  pBuf->bytes = 0;

  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
356

357
/*
 * Return the term of the entry at pBuf->matchIndex without taking pBuf->mutex.
 * If that slot is empty, log an error, set terrno to TSDB_CODE_SYN_INTERNAL_ERROR
 * and return -1.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
  if (pMatch != NULL) {
    return pMatch->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
367

368
/*
 * Locking wrapper: return the term at the match index while holding pBuf->mutex
 * (-1 if the match slot is empty; see syncLogBufferGetLastMatchTermWithoutLock).
 */
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm matchTerm = -1;

  (void)taosThreadMutexLock(&pBuf->mutex);
  matchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return matchTerm;
}
374

375
/*
 * Return true if the buffer's half-open range [startIndex, endIndex) is empty.
 * The indexes are read while holding pBuf->mutex.
 */
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  bool hasEntries = false;

  (void)taosThreadMutexLock(&pBuf->mutex);
  hasEntries = (pBuf->startIndex < pBuf->endIndex);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !hasEntries;
}
381

382
/*
 * Accept a raft entry into the log buffer. prevTerm is the expected term of the entry
 * at index - 1 and is recorded as the slot's prevLogTerm.
 *
 * pEntry is consumed on every path: it is either stored in the buffer or destroyed at _out.
 *
 * Returns:
 *   0                             - entry stored, or identical duplicate already present
 *   TSDB_CODE_OUT_OF_RANGE        - index does not fit in the ring buffer
 *   TSDB_CODE_ACTION_IN_PROGRESS  - index is past matchIndex and prevTerm differs from
 *                                   the last matched term
 *   TSDB_CODE_SYN_INTERNAL_ERROR  - buffer invariants violated
 *   the helper's error code       - from rollback / previous-term lookup / validation
 * For an already committed index the previous-term lookup's result is returned, forced
 * to 0 when the looked-up term equals the entry's term.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;  // false when pExist was loaded from outside the buffer and must be freed here

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    // look up the term stored at `index` itself (the "prev" of index + 1)
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting term at this index: roll the buffer back from index, then store the new entry below
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  pEntry = NULL;  // now owned by the buffer; keeps _out from destroying it

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
511

512
/*
 * Decide whether appending pEntry to the log store should fsync: only in a
 * multi-replica group, and only for entries whose original RPC is TDMT_VND_COMMIT.
 * pEntry is not dereferenced when replicaNum <= 1.
 */
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
515

516
/*
 * Write pEntry to the log store so that it becomes the new last entry.
 *
 * If the store already holds pEntry->index or later, it is truncated from pEntry->index
 * first. The append fsyncs according to syncLogStoreNeedFlush().
 *
 * Returns 0 on success, the store's error code if truncate/append fails (logged), or
 * TSDB_CODE_SYN_INTERNAL_ERROR if the index is negative or the store's last index is
 * not where it should be before or after the append.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex walLast = pLogStore->syncLogLastIndex(pLogStore);
  if (walLast >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  walLast = pLogStore->syncLogLastIndex(pLogStore);
  if (pEntry->index != walLast + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;

  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, syncLogStoreNeedFlush(pEntry, pNode->replicaNum));
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code),
           pEntry->index, pEntry->term);
    TAOS_RETURN(code);
  }

  walLast = pLogStore->syncLogLastIndex(pLogStore);
  return (pEntry->index == walLast) ? 0 : TSDB_CODE_SYN_INTERNAL_ERROR;
}
538

539
/*
 * Advance pBuf->matchIndex over consecutive buffered entries that chain onto the current
 * match entry (their prevLogIndex/prevLogTerm equal the match entry's index/term).
 * For each newly matched entry: persist it to the log store, handle
 * TDMT_SYNC_CONFIG_CHANGE entries, and call syncNodeReplicateWithoutLock().
 *
 * If any step fails, pBuf->matchIndex is restored to the last entry that completed all
 * steps; the failure is logged, not returned.
 *
 * @param pMatchTerm optional; receives the term of the entry at the resulting match index,
 *                   or -1 (with an error log) if that slot is unexpectedly empty.
 * @param str        context string forwarded to logs and syncNodeChangeConfig().
 * @param pMsg       optional; only used for log/trace output.
 * @return the resulting match index, or the error code from syncLogBufferValidate().
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;  // last index that completed every step
  int32_t        code = 0;                       // diagnostic only; the return value is the match index

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             prevLogIndex, pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // the pMatch == NULL error path above jumps here, so the slot may be empty
    SSyncRaftEntry* pLastMatch = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pLastMatch != NULL) {
      *pMatchTerm = pLastMatch->term;
    } else {
      sError("vgId:%d, msg:%p, failed to get match term since no entry at match index:%" PRId64, pNode->vgId, pMsg,
             matchIndex);
      *pMatchTerm = -1;
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
681

682
/*
 * Apply one committed raft entry to the state machine through pFsm->FpCommitCb.
 *
 * A restored, single-replica, non-learner node with vgId != 1 skips execution unless `force`
 * is set. Otherwise the entry is converted back into its original RPC message, the pending
 * response info for its seqNum is fetched from pNode->pSyncRespMgr, and the commit callback is
 * invoked. The callback is re-invoked every 10 ms while it fails with
 * terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE.
 *
 * Returns 0 on success (or when skipped), otherwise the error of the failing step.
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // Learners must execute the fsm while catching up on log entries; `force` bypasses the skip.
  bool skipFsm = !force && pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
                 pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER;
  if (skipFsm) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0, lino = 0;
  bool    retry = false;

  do {
    SyncIndex lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (lastConfigIndex < -1) {
      return (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));
    rpcMsg.info.traceId = pEntry->originRpcTraceId;

    SFsmCbMeta cbMeta = {
        .lastConfigIndex = lastConfigIndex,
        .index = pEntry->index,
        .isWeak = pEntry->isWeak,
        .code = applyCode,
        .state = role,
        .seqNum = pEntry->seqNum,
        .term = pEntry->term,
        .currentTerm = term,
        .flag = -1,
    };

    // NOTE(review): the response info is removed from the resp mgr on every attempt, so a retried
    // attempt no longer finds it — confirm this is intended.
    int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
    sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
            pNode->vgId, pEntry->index, &rpcMsg.info, cbMeta.seqNum, num);

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
    // NOTE(review): retry is decided from terrno while the warning counter below tests `code`;
    // confirm FpCommitCb always returns the same value it stores in terrno.
    retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);

    sGTrace(&rpcMsg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);

    if (retry) {
      taosMsleep(10);
      if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
        // Warn once every APPLY_QUEUE_ERROR_THRESHOLD consecutive failures, trace otherwise.
        bool warnNow = (++pNode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD);
        if (warnNow) {
          pNode->applyQueueErrorCount = 0;
          sGWarn(&rpcMsg.info.traceId,
                 "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                 pEntry->index, tstrerror(code));
        } else {
          sGTrace(&rpcMsg.info.traceId,
                  "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                  pEntry->index, tstrerror(code));
        }
      }
    }
  } while (retry);

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
766

767
/*
 * Check the ring-window invariants of a log buffer:
 *   startIndex <= matchIndex, commitIndex <= matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the slot holding matchIndex is populated.
 * Logs the first violated invariant and returns TSDB_CODE_SYN_INTERNAL_ERROR; returns 0 if all hold.
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  if (pBuf->matchIndex < pBuf->startIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->matchIndex < pBuf->commitIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->endIndex <= pBuf->matchIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t window = pBuf->endIndex - pBuf->startIndex;
  if (window > pBuf->size) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t matchSlot = (pBuf->matchIndex + pBuf->size) % pBuf->size;
  if (pBuf->entries[matchSlot].pItem == NULL) {
    sError("failed to validate since pItem is null");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  return 0;
}
794

795
/*
 * Advance the buffer's commit index towards `commitIndex`, applying each newly committed entry to
 * the state machine, then recycle committed entries from the head of the ring.
 *
 * Only entries up to min(commitIndex, pBuf->matchIndex) are applied. When the entry right after a
 * committed one is a TDMT_SYNC_CONFIG_CHANGE, syncNodeChangeConfig() is invoked; if the node is
 * then a single replica, that config entry is also applied here and counted as committed.
 *
 * Head entries below the commit index are recycled while they are older than
 * commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION or, for vnodes (vgId > 1), while both the buffer
 * and the global log-buffer memory are over their thresholds.
 *
 * If the node has not finished restoring, the buffer has reached pNode->commitIndex, and the last
 * examined entry's term is >= the current term, FpRestoreFinishCb is called under the buffer mutex
 * and FpAfterRestoredCb (if set) after the mutex is released.
 *
 * @param pBuf         log buffer; pBuf->mutex is held for the whole commit/recycle pass and is
 *                     released on every exit path after it was taken.
 * @param pNode        owning sync node.
 * @param commitIndex  commit index notified by the caller.
 * @param trace        trace id used for debug logs.
 * @param src          caller description, used only in logs.
 * @return 0 on success, otherwise an error code.
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;
  SyncIndex       restoredCommitIndex = -1;  // read under the mutex, passed to FpAfterRestoredCb after unlocking

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    // Load the entry at the current commit index so the restore check at _out can inspect its term.
    if (!pNode->restoreFinish && commitIndex > 0 && commitIndex == pBuf->commitIndex) {
      int32_t ret = syncLogBufferGetOneEntry(pBuf, pNode, commitIndex, &inBuf, &pEntry);
      if (ret != 0) {
        sError("vgId:%d, failed to get entry at index:%" PRId64, vgId, commitIndex);
      }
    }
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    // Peek at the next entry: a config change right after a committed entry is applied now.
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, need to apply config change entry in sync thread,
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycled = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycled == NULL) {
      // Do not dereference the missing entry; leave through _out so the mutex is released.
      sError("vgId:%d, invalid log entry to recycle, startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64
             ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycled->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycled->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycled->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycled->term,
           pRecycled->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycled);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    restoredCommitIndex = pBuf->commitIndex;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  // Runs outside the buffer mutex, using the commit index captured while it was still held.
  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, restoredCommitIndex);
    sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
966

967
/*
 * Drop all in-flight replication state of a peer's log-repl manager.
 *
 * Zeroes every state slot in [startIndex, endIndex), then resets the window indexes to 0,
 * clears `restored` and the retry backoff. A NULL manager is ignored; a negative startIndex is
 * logged and the manager is left untouched.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }

  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex cursor = pMgr->startIndex;
  while (cursor < pMgr->endIndex) {
    (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
    ++cursor;
  }

  pMgr->retryBackoff = 0;
  pMgr->restored = false;
  pMgr->startIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->endIndex = 0;
}
983

984
/*
 * Resend in-flight entries of a peer whose retry wait has elapsed.
 *
 * Walks [startIndex, endIndex) in order:
 *   - a barrier anywhere but the window edges is an internal error;
 *   - the walk stops at the first entry still inside its retry wait;
 *   - acked entries are skipped, unless the peer's match index lags behind an entry acked more
 *     than 8x the max retry wait ago, in which case the manager is reset
 *     (TSDB_CODE_ACTION_IN_PROGRESS);
 *   - other entries are resent; at most batchSize + 1 entries are resent per call.
 * If the backoff already reached SYNC_MAX_RETRY_BACKOFF the manager is reset and
 * TSDB_CODE_OUT_OF_RANGE is returned. When anything was resent the backoff is advanced.
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->startIndex >= pMgr->endIndex) {
    return 0;  // nothing in flight
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;
    bool    atEdge = (index == pMgr->startIndex) || (index + 1 == pMgr->endIndex);

    // A barrier may only sit at either edge of the in-flight window.
    if (pMgr->states[pos].barrier && !atEdge) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // Stop at the first entry whose retry wait has not elapsed yet.
    if (pMgr->states[pos].timeMs + retryWaitMs > nowMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      bool stagnant =
          pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs;
      if (stagnant) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, index,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        goto _out;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      goto _out;
    }
    if (barrier != pMgr->states[pos].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    retried = true;
    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;
    if (count > batchSize) {
      break;
    }
  }

_out:
  if (retried) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1062

1063
/*
 * Try to re-establish the replication position of a not-yet-restored peer from its
 * append-entries reply.
 *
 * - Empty window: a reply with matchIndex < 0 marks the manager restored.
 * - Otherwise: a reply outside [startIndex, endIndex) only triggers resends; an in-window reply
 *   is acked, and a successful reply whose matchIndex equals lastSendIndex marks the manager
 *   restored. An incomplete peer fsm, or a failed reply with matchIndex >= lastSendIndex, starts a
 *   snapshot transfer.
 * - Finally the probe index is derived from the peer's match index and the local term at that
 *   index; if the local log cannot serve it, a snapshot is started instead. Otherwise the manager
 *   is reset and a probe is sent.
 * Must be called on a manager with restored == false.
 */
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  int32_t         code = 0;
  if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex == 0) {
    // Nothing sent yet: the window must be completely empty.
    if (pMgr->startIndex != 0 || pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  } else {
    bool outOfWindow = pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex;
    if (outOfWindow) {
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    bool fsmIncomplete = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE);
    if (fsmIncomplete || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
      const char* reason = fsmIncomplete ? " incomplete fsm state" : " rollback match index failure";
      sInfo("vgId:%d, snapshot replication to dnode:%d, reason:%s, match index:%" PRId64 ", last sent:%" PRId64,
            pNode->vgId, DID(&destId), reason, pMsg->matchIndex, pMsg->lastSendIndex);
      if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }
  }

  // check last match term
  SyncTerm  term = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pBuf->matchIndex);
  SET_ERRNO(0);

  if (pMsg->matchIndex < pBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
    if (term < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, term,
            pMsg->lastMatchTerm);
    }

    // The local log cannot serve the peer: the entry is gone, its term is unknown, or terms
    // disagree right at the beginning of the local log.
    bool needSnapshot = (index + 1 < firstVer) || (term < 0) ||
                        (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer));
    if (needSnapshot) {
      if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR;
      if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
      return 0;
    }

    if (index + 1 < firstVer) return TSDB_CODE_SYN_INTERNAL_ERROR;

    if (term == pMsg->lastMatchTerm) {
      index = index + 1;
      if (index > pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
    } else if (index <= firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, index);
}
1168

1169
/*
 * Handle a heartbeat reply for one peer's log-replication manager.
 *
 * If the reply carries a non-zero start time that differs from the recorded peerStartTime
 * (presumably the peer restarted — confirm), the manager is reset and the new start time is
 * remembered. The check and reset run under pMgr->mutex. Always returns 0.
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1181

1182
/*
 * Handle an append-entries reply from a peer.
 *
 * First, under pMgr->mutex, a reply whose start time differs from the recorded one resets the
 * manager. Then, under the log buffer mutex, a restored manager continues replication and an
 * unrestored one attempts recovery. Failures are only logged; the function always returns 0.
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  bool peerChanged = (pMsg->startTime != pMgr->peerStartTime);
  if (peerChanged) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);

  // Remember which path ran: the callee may change pMgr->restored.
  bool    wasRestored = pMgr->restored;
  int32_t code = wasRestored ? syncLogReplContinue(pMgr, pNode, pMsg) : syncLogReplRecover(pMgr, pNode, pMsg);
  if (code != 0) {
    if (wasRestored) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    } else {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
1208

1209
/*
 * Kick replication to a peer: an unrestored manager probes at the buffer's match index, a
 * restored one sends the next batch of entries. Returns 0 or the error of the chosen step.
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1217

1218
/*
 * Send a single probe entry at `index` to a peer whose replication position is unknown.
 *
 * If a probe is already in flight and its max retry wait has not elapsed, nothing is sent.
 * Otherwise the manager is reset, the entry at `index` is sent, and the window becomes
 * [index, index + 1).
 *
 * @return 0 on success (or when still waiting), TSDB_CODE_SYN_INTERNAL_ERROR on a restored
 *         manager, a negative window start, or a negative index, else the send error.
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
  if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
  int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  // A probe is in flight and has not timed out yet: keep waiting for its reply.
  if (pMgr->endIndex > pMgr->startIndex &&
      nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }

  // Validate before resetting the manager or sending anything: a negative index must not reach
  // the peer, and it would also yield a negative slot in pMgr->states below.
  if (index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  syncLogReplReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;
  if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  pMgr->states[index % pMgr->size].barrier = barrier;
  pMgr->states[index % pMgr->size].timeMs = nowMs;
  pMgr->states[index % pMgr->size].term = term;
  pMgr->states[index % pMgr->size].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1261

1262
/*
 * Send not-yet-sent entries, from pMgr->endIndex up to the buffer's match index, to the peer of
 * a restored manager, then resend timed-out entries via syncLogReplRetryOnNeed().
 *
 * Sending stops once more than `batchSize` entries went out in this call, when the window would
 * span half of the ring (`limit`), when the previous entry is a barrier, or right after a barrier
 * entry has been sent.
 *
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not restored, else the
 *         first send/retry error.
 */
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent; reported in the summary trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t pos = index % pMgr->size;
    bool    barrier = false;

    // Reuse the outer `term` (previously shadowed, so the summary always printed -1), but start
    // every send from -1 as before.
    term = -1;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1326

1327
/*
 * Process a reply for a restored manager, then send more entries.
 *
 * For a reply inside [startIndex, endIndex):
 *   - the retry backoff is decreased by one when the window has advanced and the window's
 *     send-time span is positive but below SYNC_LOG_REPL_RETRY_WAIT_MS << (backoff - 1);
 *   - the replied entry is acked, matchIndex grows to the reply's matchIndex, the slots below it
 *     are cleared, and the window start moves to matchIndex.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not restored, else syncLogReplAttempt().
 */
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex replied = pMsg->lastSendIndex;
  bool      inWindow = pMgr->startIndex <= replied && replied < pMgr->endIndex;
  if (inWindow) {
    if (pMgr->retryBackoff > 0 && pMgr->startIndex < pMgr->matchIndex) {
      int64_t oldestMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      int64_t newestMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
      int64_t spanMs = newestMs - oldestMs;
      if (spanMs > 0 && spanMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
        pMgr->retryBackoff--;
      }
    }

    pMgr->states[replied % pMgr->size].acked = true;
    pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);
    for (SyncIndex slot = pMgr->startIndex; slot < pMgr->matchIndex; ++slot) {
      (void)memset(&pMgr->states[slot % pMgr->size], 0, sizeof(pMgr->states[0]));
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplAttempt(pMgr, pNode);
}
1348

1349
SSyncLogReplMgr* syncLogReplCreate() {
242,157✔
1350
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
242,157!
1351
  if (pMgr == NULL) {
242,182!
1352
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1353
    return NULL;
×
1354
  }
1355

1356
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
242,182✔
1357

1358
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
242,182!
1359
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1360
    return NULL;
×
1361
  }
1362

1363
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
242,182✔
1364
  if (code) {
242,160!
1365
    terrno = code;
×
1366
    return NULL;
×
1367
  }
1368

1369
  return pMgr;
242,160✔
1370
}
1371

1372
/*
 * Release a log replication manager created by syncLogReplCreate():
 * destroy its mutex, then free the struct. A NULL argument is ignored.
 */
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1380

1381
/*
 * Create one log replication manager per replica/learner slot of the node
 * and tag each manager with its slot number as peerId.
 *
 * Returns 0 on success.
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already populated.
 * Returns terrno if creating a manager fails. Managers created before the
 * failure stay in pNode->logReplMgrs.
 */
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int peer = 0; peer < slotCount; ++peer) {
    if (pNode->logReplMgrs[peer] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SSyncLogReplMgr* pFresh = syncLogReplCreate();
    if (pFresh == NULL) {
      TAOS_RETURN(terrno);
    }
    pFresh->peerId = peer;
    pNode->logReplMgrs[peer] = pFresh;
  }

  return 0;
}
1392

1393
/*
 * Destroy every log replication manager slot of the node and reset each
 * slot to NULL. Empty (NULL) slots are tolerated by syncLogReplDestroy().
 */
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  int peer = 0;
  while (peer < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) {
    SSyncLogReplMgr** ppSlot = &pNode->logReplMgrs[peer];
    syncLogReplDestroy(*ppSlot);
    *ppSlot = NULL;
    ++peer;
  }
}
16,151✔
1399

1400
/*
 * Allocate a zero-initialised sync log buffer whose mutex is created with a
 * recursive mutex attribute.
 *
 * On success *ppBuf receives the buffer and 0 is returned. On failure
 * *ppBuf is set to NULL and a non-zero error code is returned (also stored
 * in terrno by TAOS_RETURN). Everything initialised before the failure is
 * released, including the mutex attribute.
 */
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;  // tracks whether pBuf->attr must be destroyed on failure
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // Never fall through with code == 0 and a NULL buffer, even if terrno was not set.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }

_exit:
  if (code != 0) {
    if (attrInited) {
      // The attribute was initialised above; release it before freeing its storage.
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1438

1439
/*
 * Under pBuf->mutex, destroy every non-NULL entry in [startIndex, endIndex),
 * zero those slots, and reset all buffer indices and the byte counter to 0.
 */
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  for (SyncIndex idx = pBuf->startIndex; idx < pBuf->endIndex; ++idx) {
    // Adding size before the modulo keeps the slot non-negative for small negative indices.
    SyncIndex       slot = (idx + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
16,144✔
1452

1453
/*
 * Tear down a sync log buffer in this order:
 *   1. drop its entries via syncLogBufferClear();
 *   2. destroy the mutex and its attribute;
 *   3. free the struct.
 * A NULL argument is ignored.
 */
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1463

1464
/*
 * Roll the buffer (and the log store) back so that toIndex becomes the new
 * endIndex.
 *
 * Steps:
 *   1. Destroy buffered entries at indices >= toIndex and clamp matchIndex.
 *   2. If the log store holds entries at or beyond toIndex, truncate it from
 *      toIndex.
 *   3. If toIndex <= startIndex, reload the buffer with
 *      syncLogBufferInitWithoutLock().
 *
 * toIndex must satisfy commitIndex < toIndex <= endIndex, otherwise
 * TSDB_CODE_SYN_INTERNAL_ERROR is returned. A toIndex equal to endIndex is a
 * no-op.
 * NOTE(review): uses the *WithoutLock* init, so callers presumably hold
 * pBuf->mutex (syncLogBufferReset does) — confirm for other callers.
 */
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;
  bool    targetOk = (toIndex > pBuf->commitIndex) && (toIndex <= pBuf->endIndex);
  if (!targetOk) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (pBuf->endIndex == toIndex) {
    return 0;  // nothing beyond toIndex to discard
  }

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
        ", %" PRId64 ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // Walk backwards from the last buffered index, dropping entries down to toIndex.
  SyncIndex cursor;
  for (cursor = pBuf->endIndex - 1; cursor >= toIndex; --cursor) {
    SSyncRaftEntry* pItem = pBuf->entries[cursor % pBuf->size].pItem;
    if (pItem == NULL) {
      continue;
    }
    syncEntryDestroy(pItem);
    (void)memset(&pBuf->entries[cursor % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }

  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, cursor);
  if (cursor + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Cut the persisted log so it ends just before toIndex.
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }

  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // The rollback reached below the buffer's start: rebuild the buffer.
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (pBuf->endIndex != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1513

1514
/*
 * Reset the buffer so that endIndex == matchIndex + 1, then reset every
 * active replica's replication manager.
 *
 * The buffer is validated before and after. The body runs under
 * pBuf->mutex. The log store's last index must equal matchIndex, otherwise
 * TSDB_CODE_SYN_INTERNAL_ERROR is returned; the mutex is now released on
 * that path (it previously leaked).
 *
 * A failure inside syncLogBufferRollback() is logged but deliberately not
 * propagated: the reset proceeds best-effort, as before.
 */
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    sError("vgId:%d, failed to reset sync log buffer, last ver:%" PRId64 " mismatches match index:%" PRId64,
           pNode->vgId, lastVer, pBuf->matchIndex);
    (void)taosThreadMutexUnlock(&pBuf->mutex);  // must not return with the buffer locked
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;
  if ((code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1)) != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    syncLogReplReset(pNode->logReplMgrs[i]);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1540

1541
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
7,470,507✔
1542
                                 SSyncRaftEntry** ppEntry) {
1543
  int32_t code = 0;
7,470,507✔
1544

1545
  *ppEntry = NULL;
7,470,507✔
1546

1547
  if (index >= pBuf->endIndex) {
7,470,507✔
1548
    return TSDB_CODE_OUT_OF_RANGE;
2,860,997✔
1549
  }
1550

1551
  if (index > pBuf->startIndex) {  // startIndex might be dummy
4,609,510✔
1552
    *pInBuf = true;
4,153,873✔
1553
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
4,153,873✔
1554
  } else {
1555
    *pInBuf = false;
455,637✔
1556

1557
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
455,637✔
1558
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
635!
1559
    }
1560
  }
1561
  TAOS_RETURN(code);
4,610,205✔
1562
}
1563

1564
/*
 * Build an append-entries message for the entry at `index` and send it to
 * pDestId.
 *
 * On success:
 *   - *pBarrier is set from syncLogReplBarrier();
 *   - *pTerm (if non-NULL) receives the entry's term;
 *   - 0 is returned.
 *
 * On failure a non-zero code is returned. This now includes the cases where
 * the entry is missing from the buffer, or the previous log term cannot be
 * determined, while the helper reported code 0 — previously those returned
 * 0, so callers recorded a send that never happened.
 *
 * If the entry is missing from the WAL (TSDB_CODE_WAL_LOG_NOT_EXIST), the
 * destination peer's replication manager is reset.
 *
 * Entries fetched from the log store (not from the buffer) are owned here
 * and destroyed before returning.
 */
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // Look up the destination's manager explicitly (renamed to avoid shadowing the pMgr parameter).
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId,
              pDestId->addr, tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    // A NULL slot inside the buffer yields code == 0; never report that as a successful send.
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);
  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc