• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.96
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncPipeline.h"
19
#include "syncCommit.h"
20
#include "syncIndexMgr.h"
21
#include "syncInt.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of vnode log buffer
32

33
// Tells whether `type` is one of the vnode message types this file traces as
// "blocking msg": create/alter/drop table, update tag value, or alter confirm.
static bool syncIsMsgBlock(tmsg_t type) {
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_TABLE) return true;
  if (type == TDMT_VND_DROP_TABLE) return true;
  if (type == TDMT_VND_UPDATE_TAG_VAL) return true;
  return type == TDMT_VND_ALTER_CONFIRM;
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
30,327✔
40
}
41

42
// Reads pBuf->endIndex while holding the buffer mutex and returns the value
// that was observed.
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
// Appends pEntry at the tail (endIndex) of the log buffer.
//
// The entry is rejected, and nothing is stored, when:
//   - its index lies outside the ring window that starts at startIndex
//     (TSDB_CODE_SYN_BUFFER_FULL);
//   - the node finished restoring and the entry is TSDB_SYNC_NEGOTIATION_WIN
//     or more ahead of commitIndex (TSDB_CODE_SYN_NEGOTIATION_WIN_FULL);
//   - the node finished restoring and commitIndex is
//     TSDB_SYNC_APPLYQ_SIZE_LIMIT or more ahead of the FSM applied index
//     (TSDB_CODE_SYN_WRITE_STALL);
//   - any structural expectation fails: index != endIndex, target slot already
//     occupied, no predecessor entry, predecessor index not index-1, or
//     predecessor term greater than the entry term
//     (TSDB_CODE_SYN_INTERNAL_ERROR).
//
// On success the slot records the predecessor's index/term, endIndex advances
// by one, and for vgId > 1 the entry size is added to both pBuf->bytes and
// the file-level tsLogBufferMemoryUsed counter. Every failure path sleeps 1 ms
// before returning.
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex entryIndex = pEntry->index;

  // window / flow-control checks
  if (entryIndex - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), entryIndex);
    goto _err;
  }

  if (pNode->restoreFinish && entryIndex - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           entryIndex, pBuf->commitIndex);
    goto _err;
  }

  SyncIndex fsmApplied = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - fsmApplied >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), entryIndex, pBuf->commitIndex, fsmApplied);
    goto _err;
  }

  // the entry must land exactly on the tail
  if (entryIndex != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  // the target slot must be free
  SSyncLogBufEntry* pSlot = &pBuf->entries[entryIndex % pBuf->size];
  if (pSlot->pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  // initial log buffer with at least one item, e.g. commitIndex
  SSyncRaftEntry* pPrev = pBuf->entries[(entryIndex - 1 + pBuf->size) % pBuf->size].pItem;
  if (pPrev == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  // predecessor must be contiguous and must not carry a newer term
  if (pPrev->index + 1 != entryIndex || pPrev->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  *pSlot = (SSyncLogBufEntry){.pItem = pEntry, .prevLogIndex = pPrev->index, .prevLogTerm = pPrev->term};
  pBuf->endIndex = entryIndex + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
// Looks up the term of the log entry that precedes `index` (prevIndex =
// index - 1) and stores it in *pSyncTerm.
//
// Sources are consulted in this order:
//   1. prevIndex == -1 while the log store begins at index 0: term 0.
//   2. prevIndex beyond the buffer's matchIndex: *pSyncTerm = -1 and
//      TSDB_CODE_WAL_LOG_NOT_EXIST.
//   3. prevIndex at or after the buffer's startIndex: the buffered entry.
//   4. pMgr given and prevIndex inside [pMgr->startIndex, pMgr->endIndex):
//      the term recorded in the replication manager's state ring.
//   5. prevIndex equal to the snapshot's lastApplyIndex: the snapshot term.
//   6. otherwise the entry is read from the log store and released again.
//
// pMgr may be NULL (step 4 is then skipped). Returns 0 on success; on every
// failure *pSyncTerm is set to -1 and an error code is returned.
//
// NOTE(review): pBuf fields are read here without taking pBuf->mutex; callers
// visible in this file invoke it with the mutex already held — confirm for
// other call sites.
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  int32_t         code = 0;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // (The former `index - 1 != prevIndex` guard was removed: prevIndex is
  // initialised to index - 1 above and never modified, so it could not fire.)

  // served from the log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pEntry->term;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // served from the replication manager's state ring
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t slot = (prevIndex + pMgr->size) % pMgr->size;
    int64_t timeMs = pMgr->states[slot].timeMs;
    if (timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pMgr->states[slot].term;
    // a recorded term of 0 is only accepted for prevIndex 0
    if (!(prevIndex == 0 || prevLogTerm != 0)) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             prevLogTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // served from the snapshot; the callback's return value is deliberately ignored
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  // last resort: read the entry from the log store
  if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
// Builds the placeholder entry used by the log buffer; it is simply whatever
// syncEntryBuildNoop produces for the same term, index and vgId.
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}
194

195
// Checks that the log store range lines up with commitIndex: its begin index
// may be at most commitIndex + 1 and its last index must be at least
// commitIndex. Returns 0 when both hold, TSDB_CODE_WAL_LOG_INCOMPLETE
// otherwise.
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex walFirst = pStore->syncLogBeginIndex(pStore);
  if (walFirst > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:%" PRId64
           ", tsdb commit version:%" PRId64 "",
           pNode->vgId, walFirst, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex walLast = pStore->syncLogLastIndex(pStore);
  if (walLast < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%" PRId64
           ", tsdb commit version:%" PRId64 "",
           pNode->vgId, walLast, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
// Populates the log buffer from the snapshot and the log store. The function
// itself takes no lock; syncLogBufferInit / syncLogBufferReInit call it with
// pBuf->mutex held.
//
// Steps:
//   1. commitIndex/commitTerm come from the FSM snapshot (term clamped to >= 0)
//      and must be aligned with the log store range.
//   2. commitIndex, matchIndex (= last log index) and endIndex (= last + 1)
//      are set.
//   3. Entries are loaded from the last index downwards until commitIndex is
//      reached, a read fails, or the number of loaded entries would exceed
//      pBuf->size minus half of TSDB_SYNC_LOG_BUFFER_SIZE.
//   4. If commitIndex was reached, a dummy entry is placed at commitIndex so
//      the buffer always holds a predecessor for the next entry.
//   5. startIndex is set to the lowest slot that was filled.
//
// Returns 0 on success or an error code.
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // Keep the callee's return code so the warning reports the real failure;
    // `code` is not assigned here, so logging it would always print success.
    int32_t loadCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (loadCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, tstrerror(loadCode), index);
      break;
    }

    // only keep the entry while the loaded count stays within the allowed part of the ring
    bool taken = false;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // back-fill the successor's prev link, even when this entry is not kept
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
328

329
// Runs syncLogBufferInitWithoutLock while holding the buffer mutex and
// returns its result unchanged.
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
335

336
// Discards every entry currently held in [startIndex, endIndex), resets all
// buffer indexes and the byte tally, then re-populates the buffer through
// syncLogBufferInitWithoutLock — all under the buffer mutex.
//
// Returns the initialisation result, or the validation error if the buffer
// fails syncLogBufferValidate before or after the operation.
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(index + pBuf->size) % pBuf->size];
    if (pSlot->pItem == NULL) continue;
    syncEntryDestroy(pSlot->pItem);
    (void)memset(pSlot, 0, sizeof(pBuf->entries[0]));
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;

  // Every place in this file that grows pBuf->bytes grows tsLogBufferMemoryUsed
  // by the same amount, so hand the discarded bytes back before zeroing the
  // tally; otherwise the global counter only ever increases across re-inits.
  // NOTE(review): assumes no other code compensates for this reset — confirm.
  (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, -(int64_t)pBuf->bytes);
  pBuf->bytes = 0;

  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
356

357
// Returns the term of the entry stored at pBuf->matchIndex. When that slot
// holds no entry, logs an error, sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR
// and returns -1. The function takes no lock itself.
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SyncIndex       matched = pBuf->matchIndex;
  SSyncRaftEntry* pMatched = pBuf->entries[(matched + pBuf->size) % pBuf->size].pItem;

  if (pMatched != NULL) {
    return pMatched->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
367

368
// Mutex-protected wrapper around syncLogBufferGetLastMatchTermWithoutLock;
// returns that function's result unchanged.
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
374

375
// Reports, under the buffer mutex, whether the buffer window is empty, i.e.
// endIndex does not lie beyond startIndex.
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  bool hasEntries;

  (void)taosThreadMutexLock(&pBuf->mutex);
  hasEntries = (pBuf->endIndex > pBuf->startIndex);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !hasEntries;
}
381

382
// Tries to place a received entry into the log buffer at its own index.
//
// pEntry is consumed on every path that reaches `_out`: it is either stored in
// the buffer (and the local pointer cleared) or destroyed there.
// NOTE(review): the first TAOS_CHECK_RETURN returns before `_out`, so pEntry is
// not released if the initial validation fails — confirm the intended owner.
//
// Outcomes:
//   - index <= commitIndex: nothing is stored; the result is 0 when the term
//     already recorded for that index equals pEntry->term, otherwise the
//     lookup's error code.
//   - index outside the ring window: TSDB_CODE_OUT_OF_RANGE.
//   - index > matchIndex and prevTerm differs from the last matched term:
//     TSDB_CODE_ACTION_IN_PROGRESS.
//   - an entry already exists at index: a different term triggers a rollback
//     from index and the new entry is then stored; the same term is treated as
//     a duplicate and accepted without storing.
//   - otherwise the entry is stored with (index - 1, prevTerm) as its prev
//     link and endIndex is raised to at least index + 1.
//
// For a learner node, pBuf->totalIndex is advanced to index when larger.
// Returns 0 on success or an error code.
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    // asking for the "prev" term of index + 1 yields the term stored at index
    SyncTerm term = -1;
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting entry: roll the buffer back from index, then store the new one below
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        // message text fixed: it used to read "pExist->indexpExist->term"
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  // the buffer now references the entry; clear the local so `_out` does not destroy it
  pEntry = NULL;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  // pExist is only released here when the lookup reported it is not held by the buffer
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
511

512
// An entry is appended with fsync requested only when there is more than one
// replica and the entry carries a TDMT_VND_COMMIT message.
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
515

516
// Writes pEntry to the log store so that it becomes the store's last entry.
//
// If the store already reaches pEntry->index, it is first truncated from that
// index. The entry must then be exactly one past the store's last index, and
// after the append the store's last index must equal pEntry->index; either
// mismatch, or a negative entry index, yields TSDB_CODE_SYN_INTERNAL_ERROR.
// Truncate/append failures are logged and their code is returned.
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // drop any stored tail that overlaps the entry being written
  SyncIndex storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (pEntry->index != storeLast + 1) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  bool needFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, needFsync);
  if (code < 0) {
    sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", tstrerror(code),
           pEntry->index, pEntry->term);
    TAOS_RETURN(code);
  }

  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (pEntry->index != storeLast) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return 0;
}
538

539
// Advances pBuf->matchIndex over consecutive buffered entries, persisting each
// one to the log store and triggering replication as it goes.
//
// For every index right after matchIndex the loop requires: an entry in the
// slot, entry->index equal to the slot index, a predecessor entry at
// matchIndex, and a prev link (index and term) that agrees with that
// predecessor. A missing entry or a prev-term mismatch ends the loop without
// an error code; a broken structural expectation ends it with
// TSDB_CODE_SYN_INTERNAL_ERROR. A config-change entry is applied through
// syncNodeChangeConfig only when the buffer's commitIndex equals index - 1.
//
// pBuf->matchIndex is rolled back at `_out` to the last index that completed
// every step. If pMatchTerm is non-NULL it receives the term of the entry at
// that index, or -1 when the slot holds no entry. `str` is only used in log
// messages and passed on to syncNodeChangeConfig.
//
// Returns the resulting match index.
// NOTE(review): a failing syncLogBufferValidate makes TAOS_CHECK_RETURN return
// its error code through this int64_t index return value — confirm callers
// cope with that.
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;
  int32_t        code = 0;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, failed to proceed since pMatch is null", pNode->vgId);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, prevLogIndex,
             pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // a prev-term mismatch is not an error here: just stop advancing
    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, mismatching sync log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index (rolled back at _out if a later step fails)
    pBuf->matchIndex = index;

    sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
           pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId,
             tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, to change config at %s. "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, delay change config from Node %s. "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, failed to replicate since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // Use the NULL-checking helper rather than dereferencing the slot directly:
    // the `pMatch == NULL` error path above jumps here with exactly that slot
    // empty. The helper logs and yields -1 in that case.
    *pMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
679

680
// Hand one raft log entry to the state machine through pFsm->FpCommitCb.
//
// Nothing is executed (and 0 is returned) when the node is a single replica that has finished
// restoring, is not vgId 1, is not configured as a learner, and `force` is false.
// Otherwise the entry is converted back to its original rpc message and passed to the commit
// callback; the callback is re-invoked after a 10ms sleep for as long as it fails while
// terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE.
//
// Returns 0 on success or when execution is skipped, otherwise an error code.
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // a learner needs to execute the fsm while it catches up on the entry log;
  // if force is true, skip this whole condition check and always execute the fsm
  if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
      pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && !force) {
    sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64
           ", type:%s code:0x%x, replicaNum:%d,"
           "role:%d, restoreFinish:%d",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
           pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, fsm execute vnode commit. index:%" PRId64 ", term:%" PRId64 "", pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0, lino = 0;
  for (;;) {
    SFsmCbMeta cbMeta = {0};
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (cbMeta.lastConfigIndex < -1) {
      // prefer the thread-local error when one was recorded
      return (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));

    // describe the entry and the node's view of it for the callback
    cbMeta.flag = -1;
    cbMeta.state = role;
    cbMeta.code = applyCode;
    cbMeta.currentTerm = term;
    cbMeta.term = pEntry->term;
    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.seqNum = pEntry->seqNum;

    int32_t respNum = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
    sDebug("vgId:%d, get response info,  seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, respNum);

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
    const bool retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    sDebug("vgId:%d, fsm execute, index:%" PRId64 ", term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
           pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
    if (!retry) {
      break;
    }

    taosMsleep(10);
    sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
  }

_exit:
  if (code < 0) {
    sError("vgId:%d, failed to execute fsm at line %d since %s. index:%" PRId64 ", term:%" PRId64 ", type:%s",
           pNode->vgId, lino, tstrerror(code), pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
746

747
// Check the index invariants of the log buffer, in this order:
//   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
//   endIndex - startIndex <= size, and the slot addressed by matchIndex holds an entry.
// The first violated invariant is logged and TSDB_CODE_SYN_INTERNAL_ERROR is returned;
// 0 is returned when all of them hold.
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (pBuf->startIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
  } else if (pBuf->commitIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
  } else if (pBuf->matchIndex >= pBuf->endIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
  } else if (pBuf->endIndex - pBuf->startIndex > pBuf->size) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
  } else if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
    sError("failed to validate since pItem is null");
  } else {
    code = 0;
  }

  return code;
}
774

775
// Apply log entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] to the FSM and
// then recycle buffer slots from the front of the buffer.
//
// For every applied entry pBuf->commitIndex is advanced. If the entry that follows is a
// TDMT_SYNC_CONFIG_CHANGE, the config change is performed right away and, when the node ends up
// with a single replica, that entry is executed (forced) and committed in the same iteration.
// Entries that were not served from the buffer (inBuf / nextInBuf == false) are destroyed here.
//
// A notified commitIndex that is not newer than pBuf->commitIndex is treated as stale and only
// the restore check at _out is performed.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    goto _out;
  }

  sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64,
         pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sInfo("vgId:%d, commit sync barrier. index:%" PRId64 ", term:%" PRId64 ", type:%s", vgId, pEntry->index,
            pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
             ", role:%d, current term:%" PRId64,
             vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
           pEntry->index, pEntry->term, role, currentTerm);

    // peek at the following entry: a config change is applied as soon as its predecessor commits
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at Commit. "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, need to apply config change entry in sync thread,
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          // the config change entry has been consumed too, so skip it in the loop
          index++;
          pBuf->commitIndex = index;

          sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "",
                 pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    // stop once nothing committed is left at the front, or when neither the retention window
    // nor the memory thresholds ask for more recycling
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycle = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycle == NULL) {
      // The slot is empty, so there is no entry to read index/term from. Release the buffer mutex
      // before bailing out. Entries fetched by the commit loop above were already released there.
      sError("vgId:%d, invalid log entry to recycle. startIndex:%" PRId64 ", until:%" PRId64
             ", commitIndex:%" PRId64 ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      (void)taosThreadMutexUnlock(&pBuf->mutex);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycle->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycle->bytes);
    }
    sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycle->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycle->term,
           pRecycle->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycle);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  // release entries that are owned by this call rather than by the buffer
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
926

927
// Clear the in-flight state slots in [startIndex, endIndex) and reset the replication manager's
// indexes, restored flag and retry backoff to their initial values.
// A NULL manager is ignored; a negative startIndex is logged and leaves the manager untouched.
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }

  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex cursor = pMgr->startIndex;
  while (cursor < pMgr->endIndex) {
    (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
    cursor++;
  }

  pMgr->retryBackoff = 0;
  pMgr->restored = false;
  pMgr->endIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->startIndex = 0;
}
943

944
// Resend in-flight log entries of [startIndex, endIndex) whose retry wait time has elapsed and
// that have not been acked yet.
//
// - Returns 0 right away when nothing is in flight.
// - Resets the manager and returns TSDB_CODE_OUT_OF_RANGE when the retry backoff has reached
//   SYNC_MAX_RETRY_BACKOFF.
// - Resets the manager and returns TSDB_CODE_ACTION_IN_PROGRESS when an acked entry beyond
//   matchIndex has been waiting longer than (syncGetRetryMaxWaitMs() << 3).
// - A barrier may only sit at the first or last in-flight position, and a resent entry must
//   report the same barrier flag as recorded; otherwise TSDB_CODE_SYN_INTERNAL_ERROR.
//
// When at least one entry was resent, the retry backoff is advanced and a summary is logged.
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->endIndex <= pMgr->startIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      sentCount = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;

    // a barrier is only legal at the head or the tail of the in-flight window
    if (pMgr->states[pos].barrier && index != pMgr->startIndex && index + 1 != pMgr->endIndex) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    // this entry is not due for a retry yet; stop scanning
    if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        break;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      break;
    }
    if (barrier != pMgr->states[pos].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    pMgr->states[pos].acked = false;
    pMgr->states[pos].term = term;
    pMgr->states[pos].timeMs = nowMs;

    retried = true;
    if (firstIndex == -1) {
      firstIndex = index;
    }

    sentCount++;
    if (batchSize < sentCount) {
      break;
    }
  }

  if (retried) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, sentCount, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1022

1023
// Handle an append-entries reply while the replication manager is not yet restored.
//
// Depending on the reply this either marks the manager as restored, retries in-flight entries,
// starts snapshot replication to the peer, or resets the manager and probes the peer again at
// the index derived from the reply's matchIndex and lastMatchTerm.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when an internal consistency check fails,
// or the error code of the failing step.
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  int32_t         code = 0;
  if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, begin to recover sync log repl. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex != 0) {
    // something is in flight: the reply must refer to an entry inside the window
    if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
      const char* reason = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE) ? " incomplete fsm state"
                                                                         : " rollback match index failure";
      sInfo("vgId:%d, snapshot replication to dnode:%d. reason:%s, match index:%" PRId64 ", last sent:%" PRId64,
            pNode->vgId, DID(&destId), reason, pMsg->matchIndex, pMsg->lastSendIndex);
      code = syncNodeStartSnapshot(pNode, &destId);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }
  } else {
    // nothing in flight: the manager must still be in its reset state
    if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
    if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  }

  // check last match term
  SyncTerm  prevTerm = -1;
  SyncIndex beginVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex probeIndex = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
  errno = 0;

  if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, probeIndex + 1, &prevTerm);
    if (prevTerm < 0 && (errno == ENFILE || errno == EMFILE || errno == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code),
             probeIndex + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, probeIndex, beginVer, prevTerm,
            pMsg->lastMatchTerm);
    }

    // the log needed to continue is not available (or disagrees at the very beginning):
    // fall back to snapshot replication
    if ((probeIndex + 1 < beginVer) || (prevTerm < 0) ||
        (prevTerm != pMsg->lastMatchTerm && (probeIndex + 1 == beginVer || probeIndex == beginVer))) {
      if (prevTerm < 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) return TSDB_CODE_SYN_INTERNAL_ERROR;
      code = syncNodeStartSnapshot(pNode, &destId);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
      return 0;
    }

    if (probeIndex + 1 < beginVer) return TSDB_CODE_SYN_INTERNAL_ERROR;

    if (prevTerm == pMsg->lastMatchTerm) {
      probeIndex = probeIndex + 1;
      if (probeIndex > pNode->pLogBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
    } else {
      if (probeIndex <= beginVer) return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, probeIndex);
}
1128

1129
// Process a heartbeat reply under the log buffer mutex: when the reply carries a non-zero
// startTime that differs from the recorded peerStartTime, reset the replication manager and
// remember the new start time. Always returns 0.
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pLogBuf->mutex);
  bool startTimeChanged = (pMsg->startTime != 0) && (pMsg->startTime != pMgr->peerStartTime);
  if (startTimeChanged) {
    sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pLogBuf->mutex);

  return 0;
}
1141

1142
// Process an append-entries reply under the log buffer mutex.
// A startTime that differs from the recorded peerStartTime resets the replication manager first.
// The reply is then dispatched to syncLogReplContinue (restored) or syncLogReplRecover (not
// restored); a failure of either is only logged as a warning. Always returns 0.
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  int32_t         code = 0;

  (void)taosThreadMutexLock(&pLogBuf->mutex);

  if (pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }

  if (pMgr->restored) {
    code = syncLogReplContinue(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  } else {
    code = syncLogReplRecover(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pLogBuf->mutex);
  return 0;
}
1165

1166
// Kick off replication for one peer: probe it at the buffer's matchIndex while the manager is
// not restored, otherwise attempt regular replication. Returns 0 on success or the error of
// the step that failed.
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1174

1175
// Probe a not-yet-restored peer by sending it the single log entry at `index`.
//
// If something is still in flight and the oldest in-flight entry was sent less than
// syncGetRetryMaxWaitMs() ago, nothing is done. Otherwise the manager is reset, the entry is
// sent, and the in-flight window becomes [index, index + 1).
//
// Returns 0 on success or when the probe is skipped, TSDB_CODE_SYN_INTERNAL_ERROR when the
// manager is already restored or an index is negative, or the send error.
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
  if (pMgr->startIndex < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer:%" PRIx64 " with msg of index:%" PRId64 ". repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  // a previous probe is still within its wait time: do not send another one yet
  if (pMgr->endIndex > pMgr->startIndex &&
      nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  syncLogReplReset(pMgr);

  SRaftId* pPeerId = &pNode->replicasId[pMgr->peerId];
  bool     isBarrier = false;
  SyncTerm sentTerm = -1;
  code = syncLogReplSendTo(pMgr, pNode, index, &sentTerm, pPeerId, &isBarrier);
  if (code < 0) {
    sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
           tstrerror(code), index, pPeerId->addr);
    TAOS_RETURN(code);
  }

  if (index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // record the probe as the only in-flight entry
  int64_t pos = index % pMgr->size;
  pMgr->states[pos].barrier = isBarrier;
  pMgr->states[pos].timeMs = nowMs;
  pMgr->states[pos].term = sentTerm;
  pMgr->states[pos].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pPeerId->addr, index, sentTerm, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1218

1219
// Send new log entries from pMgr->endIndex up to the buffer's matchIndex to a restored peer,
// then retry in-flight entries that are due.
//
// Sending stops early when more than batchSize entries were sent in this call, when the
// in-flight window would reach half of the manager's size, when the previously sent entry is a
// barrier, or right after a barrier entry has been sent.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the manager is not restored, or the
// error from sending / retrying.
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, begin to attempt replicate log entries from end to match. repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent; reported by the summary trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // nothing may be sent past a barrier until the barrier has been acknowledged
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm sentTerm = -1;
    if ((code = syncLogReplSendTo(pMgr, pNode, index, &sentTerm, pDestId, &barrier)) < 0) {
      sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = sentTerm;
    pMgr->states[pos].acked = false;

    // publish to the function-scope variable; the loop previously shadowed it, so the
    // summary trace always reported -1
    term = sentTerm;

    if (firstIndex == -1) firstIndex = index;
    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, sentTerm, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1278

1279
// Handle an append-entries reply for a restored peer.
//
// When the reply refers to an entry inside the in-flight window [startIndex, endIndex), the
// entry is marked acked, matchIndex is raised to the reply's matchIndex if that is larger, the
// state slots below matchIndex are cleared and the window start is moved up to matchIndex.
// Before that, the retry backoff is lowered by one step if the in-flight entries were sent
// within less than the wait time of the previous backoff level.
// In all cases replication is then attempted again via syncLogReplAttempt.
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR;

  bool ackInWindow = (pMgr->startIndex <= pMsg->lastSendIndex) && (pMsg->lastSendIndex < pMgr->endIndex);
  if (ackInWindow) {
    if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
      int64_t headMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      int64_t tailMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
      int64_t spanMs = tailMs - headMs;
      if (spanMs > 0 && spanMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
        pMgr->retryBackoff -= 1;
      }
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
    pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

    // everything below the match index is done with; free those slots
    SyncIndex cursor = pMgr->startIndex;
    while (cursor < pMgr->matchIndex) {
      (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
      cursor++;
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplAttempt(pMgr, pNode);
}
1300

1301
SSyncLogReplMgr* syncLogReplCreate() {
173,519✔
1302
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
173,519!
1303
  if (pMgr == NULL) {
173,525!
1304
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1305
    return NULL;
×
1306
  }
1307

1308
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
173,525✔
1309

1310
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
173,525!
1311
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1312
    return NULL;
×
1313
  }
1314

1315
  return pMgr;
173,525✔
1316
}
1317

1318
// Release a log replication manager. A NULL manager is ignored.
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    taosMemoryFree(pMgr);
  }
}
1325

1326
// Create one log replication manager for each of the
// TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA slots of the node and record the
// slot number as the manager's peerId.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already
// occupied, or terrno if a manager cannot be created. Managers created before
// a failure stay in their slots.
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int       slot = 0;

  while (slot < slotCount) {
    if (pNode->logReplMgrs[slot] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    pNode->logReplMgrs[slot] = syncLogReplCreate();
    if (pNode->logReplMgrs[slot] == NULL) {
      TAOS_RETURN(terrno);
    }

    pNode->logReplMgrs[slot]->peerId = slot;
    slot++;
  }

  return 0;
}
1337

1338
// Destroy the log replication manager held in every slot of the node and
// reset each slot to NULL.
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int slot = 0; slot < slotCount; ++slot) {
    syncLogReplDestroy(pNode->logReplMgrs[slot]);
    pNode->logReplMgrs[slot] = NULL;
  }
}
11,569✔
1344

1345
// Allocate a zero-initialized sync log buffer and initialize its recursive
// mutex.
//
// On success *ppBuf receives the new buffer and 0 is returned. On failure
// *ppBuf is set to NULL, every partially initialized resource is released and
// an error code is returned. A successfully created buffer is released with
// syncLogBufferDestroy().
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;  // true once pBuf->attr must be destroyed on failure
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // Never continue with a NULL buffer, even if the allocator left terrno unset.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  // NOTE(review): the checks below assume the taosThread* wrappers report
  // failure with a negative value and leave the cause in errno -- confirm
  // against the wrapper implementation.
  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }

_exit:
  if (code != 0) {
    // The attribute object lives inside pBuf, so destroy it before the buffer
    // itself is freed.
    if (attrInited) {
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1383

1384
// Destroy every entry held in [startIndex, endIndex) of the buffer, zero the
// slots that held an entry, and reset all indices and the byte counter to 0.
// The buffer mutex is held for the whole operation.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    SSyncRaftEntry* pItem = pBuf->entries[(cursor + pBuf->size) % pBuf->size].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[(cursor + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    }
    cursor++;
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
11,568✔
1397

1398
// Tear down a sync log buffer: clear its entries, destroy the mutex and its
// attribute object, then free the buffer. A NULL buffer is ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1408

1409
// Roll the log buffer back so that it ends at toIndex (exclusive): entries at
// indices >= toIndex are destroyed, the log store is truncated from toIndex
// when it reaches that far, and the buffer is re-initialized from the log
// store when toIndex is not above startIndex.
//
// toIndex must satisfy commitIndex < toIndex <= endIndex. Returns 0 on
// success (immediately when toIndex == endIndex),
// TSDB_CODE_SYN_INTERNAL_ERROR when a consistency check fails, or the error
// reported by the log store / buffer re-initialization / validation.
// The "WithoutLock" re-initialization suggests the caller holds pBuf->mutex --
// TODO confirm against callers.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;

  if (toIndex <= pBuf->commitIndex || toIndex > pBuf->endIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pBuf->endIndex == toIndex) {
    return 0;  // nothing to roll back
  }

  sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
        ", %" PRId64 ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // Step 1: drop buffered entries from the tail down to toIndex.
  SyncIndex index = pBuf->endIndex - 1;
  for (; index >= toIndex; index--) {
    SSyncRaftEntry* pItem = pBuf->entries[index % pBuf->size].pItem;
    if (pItem == NULL) {
      continue;
    }
    syncEntryDestroy(pItem);
    (void)memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  // The loop leaves index at toIndex - 1, the last index that is kept.
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
  if (index + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Step 2: truncate the log store if it still contains toIndex or later.
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // Step 3: refill the buffer when the rollback reached its start.
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (toIndex != pBuf->endIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1458

1459
// Reset the log buffer so that it ends right after matchIndex and reset the
// replication manager of every replica of the node.
//
// The buffer is validated before and after the reset. Returns 0 on success,
// the validation error if either validation fails, or
// TSDB_CODE_SYN_INTERNAL_ERROR when the last index of the log store differs
// from the buffer's matchIndex.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    // Release the mutex before bailing out; returning with it held would
    // leave the buffer locked by this thread.
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // A rollback failure is only logged; the reset continues regardless.
  int32_t code = 0;
  if ((code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1)) != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1485

1486
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
30,946,542✔
1487
                                 SSyncRaftEntry** ppEntry) {
1488
  int32_t code = 0;
30,946,542✔
1489

1490
  *ppEntry = NULL;
30,946,542✔
1491

1492
  if (index >= pBuf->endIndex) {
30,946,542✔
1493
    return TSDB_CODE_OUT_OF_RANGE;
14,370,007✔
1494
  }
1495

1496
  if (index > pBuf->startIndex) {  // startIndex might be dummy
16,576,535✔
1497
    *pInBuf = true;
13,888,638✔
1498
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
13,888,638✔
1499
  } else {
1500
    *pInBuf = false;
2,687,897✔
1501

1502
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
2,687,897✔
1503
      sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
23!
1504
    }
1505
  }
1506
  TAOS_RETURN(code);
16,577,054✔
1507
}
1508

1509
// Build an append-entries message for the raft entry at `index` and send it to
// the peer identified by pDestId.
//
//   pMgr     replication manager used to look up the previous log term
//   pNode    sync node owning the log buffer and the log store
//   index    index of the entry to replicate
//   pTerm    optional output, receives the term of the entry
//   pDestId  destination peer
//   pBarrier output, receives the result of syncLogReplBarrier() for the entry
//
// Returns 0 after the message has been handed to syncNodeSendAppendEntries(),
// otherwise the value of `code` at the point of failure. An entry fetched from
// the log store (not from the buffer) is destroyed before returning.
//
// NOTE(review): when the buffer slot for `index` holds a NULL entry,
// syncLogBufferGetOneEntry() reports code 0, so this function takes the error
// path yet returns 0 without writing *pTerm or *pBarrier -- confirm callers
// tolerate that.
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64 "", pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // The entry is no longer in the log store: reset the replication state
      // of the destination peer. A distinct name avoids shadowing the pMgr
      // parameter.
      SSyncLogReplMgr* pDestMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pDestMgr) {
        sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pDestMgr);
      }
    }
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  STraceId* trace = &(msgOut.info.traceId);
  sGTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64,
          pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);

  // An entry read from the log store is owned by this function; an entry that
  // lives in the buffer is not.
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc