• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.14
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
/* Process-wide byte count of entries held in vnode (vgId > 1) sync log buffers; updated with atomic ops. */
static int64_t tsLogBufferMemoryUsed = 0;
32

33
/*
 * Tell whether a vnode message type belongs to the "blocking" group that
 * syncFsmExecute traces before execution: table create/alter/drop,
 * tag-value update and alter-confirm.
 */
static bool syncIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_CONFIRM:
      return true;
    default:
      return false;
  }
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
91✔
40
}
41

42
/* Snapshot pBuf->endIndex under the buffer mutex and return it. */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
/*
 * Append pEntry at the tail of the log buffer.
 *
 * The entry is rejected when:
 *   - it does not fit in the ring (index - startIndex >= size)    -> TSDB_CODE_SYN_BUFFER_FULL;
 *   - after restore, it is TSDB_SYNC_NEGOTIATION_WIN or more past commitIndex
 *                                                                 -> TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
 *   - after restore, commitIndex - appliedIndex reaches TSDB_SYNC_APPLYQ_SIZE_LIMIT
 *                                                                 -> TSDB_CODE_SYN_WRITE_STALL;
 *   - it is not exactly endIndex, its slot is occupied, the slot before it is empty,
 *     or it does not directly follow that entry with a non-decreasing term
 *                                                                 -> TSDB_CODE_SYN_INTERNAL_ERROR.
 * On success the pointer is stored in its ring slot (with the previous entry's
 * index/term), endIndex advances, and for vgId > 1 the entry bytes are accounted
 * in pBuf->bytes and tsLogBufferMemoryUsed.
 * On failure the mutex is released, the buffer re-validated, the thread sleeps 1 ms
 * and the error code is returned.
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex newIndex = pEntry->index;

  if (newIndex - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), newIndex);
    goto _err;
  }

  if (pNode->restoreFinish && newIndex - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           newIndex, pBuf->commitIndex);
    goto _err;
  }

  // the applied index is queried unconditionally; only the stall check depends on restoreFinish
  SyncIndex applied = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - applied >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), newIndex, pBuf->commitIndex, applied);
    goto _err;
  }

  // the new entry must land exactly at the tail, in an empty slot
  if (newIndex != pBuf->endIndex || pBuf->entries[newIndex % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  // the buffer always keeps at least one predecessor entry (e.g. the one at commitIndex)
  SSyncRaftEntry* pPrev = pBuf->entries[(newIndex - 1 + pBuf->size) % pBuf->size].pItem;
  if (pPrev == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pPrev->index + 1 != newIndex || pPrev->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  pBuf->entries[newIndex % pBuf->size] =
      (SSyncLogBufEntry){.pItem = pEntry, .prevLogIndex = pPrev->index, .prevLogTerm = pPrev->term};
  pBuf->endIndex = newIndex + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
/*
 * Resolve the term of the entry at index - 1 and store it in *pSyncTerm.
 *
 * Sources are tried in this order:
 *   1. index - 1 == -1 and the log store begins at 0   -> term 0;
 *   2. index - 1 beyond the buffer's matchIndex        -> TSDB_CODE_WAL_LOG_NOT_EXIST;
 *   3. the in-memory log buffer (index - 1 >= startIndex);
 *   4. the replication manager's per-index states, when pMgr is given and covers it;
 *   5. the FSM snapshot, when index - 1 is its last applied index;
 *   6. the log store itself.
 * Returns 0 on success; otherwise *pSyncTerm is -1 and an error code is returned.
 * The buffer is read without taking pBuf->mutex (callers presumably hold it — TODO confirm).
 */
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SyncIndex       prevIndex = index - 1;

  // 1. nothing precedes the very first entry of a log that starts at 0
  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  // 2. not matched locally yet
  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // 3. still held in the log buffer
  if (prevIndex >= pBuf->startIndex) {
    SSyncRaftEntry* pCached = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pCached == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = pCached->term;
    return 0;
  }

  // 4. tracked by the replication manager's window
  if (pMgr != NULL && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    SyncIndex slot = (prevIndex + pMgr->size) % pMgr->size;
    if (pMgr->states[slot].timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    SyncTerm trackedTerm = pMgr->states[slot].term;
    // only index 0 may legitimately carry term 0
    if (prevIndex != 0 && trackedTerm == 0) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             trackedTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = trackedTerm;
    return 0;
  }

  // 5. exactly at the snapshot boundary
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  // 6. read it back from the log store
  SSyncRaftEntry* pStored = NULL;
  int32_t         code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pStored);
  if (code == 0) {
    *pSyncTerm = pStored->term;
    syncEntryDestroy(pStored);
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
/*
 * Build the placeholder entry that log-buffer initialisation stores at commitIndex.
 * It is simply a no-op entry carrying the given term, index and vgId; the result
 * (possibly NULL) comes straight from syncEntryBuildNoop.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pPlaceholder = syncEntryBuildNoop(term, index, vgId);
  return pPlaceholder;
}
194

195
/*
 * Check that the WAL lines up with the committed (tsdb) version commitIndex:
 * its first index may be at most commitIndex + 1 and its last index at least
 * commitIndex. Returns 0 when aligned, otherwise logs the offending bound and
 * returns TSDB_CODE_WAL_LOG_INCOMPLETE.
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex firstVer = pStore->syncLogBeginIndex(pStore);
  if (firstVer > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, firstVer, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex lastVer = pStore->syncLogLastIndex(pStore);
  if (lastVer >= commitIndex) {
    return 0;
  }

  sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
         ", tsdb commit version:%" PRId64,
         pNode->vgId, lastVer, commitIndex);
  return TSDB_CODE_WAL_LOG_INCOMPLETE;
}
214

215
/*
 * Rebuild pBuf from the FSM snapshot and the log store. In this file it is
 * called with pBuf->mutex held (syncLogBufferInit / syncLogBufferReInit).
 *
 * - commitIndex/commitTerm come from the snapshot's last applied index/term
 *   (a negative term is clamped to 0); the log store must be aligned with
 *   commitIndex (see syncLogValidateAlignmentOfCommit).
 * - matchIndex becomes the log store's last index and endIndex one past it.
 * - Entries are loaded from the newest index downwards, keeping at most
 *   pBuf->size - TSDB_SYNC_LOG_BUFFER_SIZE / 2 of them, and stopping early if
 *   the log store cannot return an entry.
 * - If the walk reaches commitIndex, a dummy no-op entry is placed there and
 *   startIndex = commitIndex; otherwise startIndex is the oldest loaded index.
 * For vgId > 1 every stored entry's bytes are added to pBuf->bytes and to
 * tsLogBufferMemoryUsed.
 * Returns 0 on success or an error code.
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex toIndex = lastVer;
  // everything persisted in the log store counts as matched
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order (newest first)
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // report the log store's own error; `code` is still 0 here and would log "success"
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }

    bool taken = false;
    // keep part of the ring free so new entries can be appended after init
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // the newer neighbour (already loaded) learns its predecessor's index/term
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // the dummy occupies commitIndex; otherwise the oldest loaded entry is index + 1
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
328

329
/* Lock-taking wrapper: run syncLogBufferInitWithoutLock while holding pBuf->mutex and return its code. */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t initCode;

  (void)taosThreadMutexLock(&pBuf->mutex);
  initCode = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return initCode;
}
335

336
/*
 * Drop every entry held in [startIndex, endIndex), reset the indices and byte
 * count, and rebuild the buffer via syncLogBufferInitWithoutLock, all under
 * pBuf->mutex.
 *
 * Entries entering the buffer for vgId > 1 were added to the process-wide
 * tsLogBufferMemoryUsed counter, so their bytes are removed from it here as
 * they are destroyed; resetting only pBuf->bytes would leave the global
 * counter permanently inflated after every re-init.
 *
 * Returns the re-initialisation code (logged when negative).
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(index + pBuf->size) % pBuf->size];
    SSyncRaftEntry*   pEntry = pSlot->pItem;
    if (pEntry == NULL) continue;
    if (pNode->vgId > 1) {
      // undo the accounting done when the entry was put into the buffer
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, -(int64_t)pEntry->bytes);
    }
    syncEntryDestroy(pEntry);
    (void)memset(pSlot, 0, sizeof(*pSlot));
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  pBuf->bytes = 0;
  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
356

357
/*
 * Term of the entry stored at pBuf->matchIndex, read without locking (the
 * callers in this file hold pBuf->mutex). If that slot is empty, logs an error,
 * sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR and returns -1.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SSyncRaftEntry* pMatched = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
  if (pMatched != NULL) {
    return pMatched->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
367

368
/* Locked accessor: term of the entry at matchIndex, or -1 when that slot is empty. */
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
374

375
/* True when the buffer spans no indices (endIndex <= startIndex); both bounds are read under the mutex. */
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex first = pBuf->startIndex;
  SyncIndex pastLast = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !(pastLast > first);
}
381

382
/*
 * Accept an entry received from a peer into the log buffer.
 *
 * prevTerm is the sender's term for index - 1. Outcomes:
 *   - the term at matchIndex cannot be read          -> TSDB_CODE_SYN_INTERNAL_ERROR;
 *   - index <= commitIndex: nothing is stored; the result is the status of
 *     looking up the stored term of `index` (forced to 0 when it equals pEntry->term);
 *   - index outside the ring                          -> TSDB_CODE_OUT_OF_RANGE;
 *   - index > matchIndex and prevTerm differs from the term at matchIndex
 *                                                     -> TSDB_CODE_ACTION_IN_PROGRESS;
 *   - an entry with this index already exists: same term -> duplicate, 0;
 *     different term -> the buffer is rolled back from `index` and pEntry stored;
 *   - otherwise pEntry is stored in its slot and endIndex extended.
 * pEntry is destroyed on every path where it is not stored; an existing entry
 * fetched from outside the buffer (inBuf == false) is destroyed as well.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    // term of the locally stored entry at `index` (the "prev" of index + 1)
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    // NOTE(review): when the lookup succeeds (code == 0) but term != pEntry->term, the
    // mismatch is still reported as success — confirm this is intended.
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting entry: drop it and everything after it, then store pEntry below
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  pEntry = NULL;  // now held by the buffer; must not be destroyed at _out

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
511

512
/* An entry is fsync'ed only when it is a TDMT_VND_COMMIT and the group has more than one replica. */
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
515

516
/*
 * Write pEntry to the log store at exactly its own index.
 *
 * Stored entries at or after pEntry->index are truncated first. The entry must
 * then be the direct successor of the last stored index, and afterwards the
 * last stored index must equal pEntry->index. The append is fsync'ed according
 * to syncLogStoreNeedFlush. Returns 0, the log store's error code (logged), or
 * TSDB_CODE_SYN_INTERNAL_ERROR when an index invariant does not hold.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;

  int32_t code = 0;
  if (pLogStore->syncLogLastIndex(pLogStore) >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  // after the optional truncation the entry must extend the stored log by exactly one
  if (pLogStore->syncLogLastIndex(pLogStore) + 1 != pEntry->index) return TSDB_CODE_SYN_INTERNAL_ERROR;

  bool fsyncNeeded = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, fsyncNeeded);
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code),
           pEntry->index, pEntry->term);
    TAOS_RETURN(code);
  }

  return (pLogStore->syncLogLastIndex(pLogStore) == pEntry->index) ? 0 : TSDB_CODE_SYN_INTERNAL_ERROR;
}
538

539
/*
 * Advance matchIndex over consecutive buffered entries that chain correctly
 * onto the current match entry, persisting each one to the log store.
 *
 * For every advanced entry: it is persisted (syncLogStorePersist), a config
 * change entry is applied immediately when it directly follows commitIndex
 * (otherwise only logged as delayed), replication is triggered, and this node's
 * match index in pNode->pMatchIndex is updated.
 * The loop stops at the first missing entry, term mismatch or failure. On
 * failure, pBuf->matchIndex is restored to the last index that fully completed.
 *
 * If pMatchTerm is non-NULL it receives the term of the entry at the resulting
 * match index, or -1 (with an error log) if that slot is unexpectedly empty.
 * Returns the resulting match index; internal error codes are only logged.
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;
  int32_t        code = 0;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match: the candidate must chain onto the current match entry
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             prevLogIndex, pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index (restored at _out if a later step fails)
    pBuf->matchIndex = index;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // guard the dereference: an empty slot here would otherwise crash the node
    SSyncRaftEntry* pMatched = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatched != NULL) {
      *pMatchTerm = pMatched->term;
    } else {
      sError("vgId:%d, msg:%p, no raft entry at match index:%" PRId64 ", match term unavailable", pNode->vgId, pMsg,
             matchIndex);
      *pMatchTerm = -1;
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
681

682
/**
 * Apply one raft log entry to the state machine through pFsm->FpCommitCb.
 *
 * Execution is skipped (returning 0) when the node has a single replica, has
 * finished restoring, is not vgId 1, is not a learner, and `force` is false.
 * While the callback fails and terrno is TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE the
 * entry is rebuilt and re-submitted after sleeping 10ms.
 *
 * @param pNode     sync node that owns the entry
 * @param pFsm      state machine whose FpCommitCb receives the entry
 * @param role      node state, forwarded as cbMeta.state
 * @param term      current raft term, forwarded as cbMeta.currentTerm
 * @param pEntry    entry to apply; converted with syncEntry2OriginalRpc()
 * @param applyCode result code forwarded as rpcMsg.code and cbMeta.code
 * @param force     execute even when the single-replica skip above would apply
 * @return 0 on success (or skip), otherwise the code from the conversion or FpCommitCb
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // Learners must execute the fsm while catching up on log entries, so they are
  // never skipped here; `force` likewise bypasses every condition of the skip.
  if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
      pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && force == false) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0, lino = 0;
  bool    retry = false;
  do {
    SFsmCbMeta cbMeta = {0};
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (cbMeta.lastConfigIndex < -1) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      if (terrno != 0) code = terrno;
      return code;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));

    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.code = applyCode;
    cbMeta.state = role;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.term = pEntry->term;
    cbMeta.currentTerm = term;
    cbMeta.flag = -1;
    rpcMsg.info.traceId = pEntry->originRpcTraceId;

    int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
    // Log the rpc handle stored in the response info, not the address of the
    // local rpcMsg.info struct (which is just a stack address).
    sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
            pNode->vgId, pEntry->index, rpcMsg.info.handle, cbMeta.seqNum, num);

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
    // Only a full rpc memory queue (reported through terrno) is treated as transient.
    retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);

    sGTrace(&rpcMsg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);

    if (retry) {
      taosMsleep(10);
      if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
        sGError(&rpcMsg.info.traceId,
                "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                pEntry->index, tstrerror(code));
      } else {
        sGTrace(&rpcMsg.info.traceId,
                "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                pEntry->index, tstrerror(code));
      }
    }
  } while (retry);

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
762

763
/**
 * Check the invariants of a log buffer:
 *   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the slot holding matchIndex is occupied.
 * The first violated invariant is logged.
 *
 * @return 0 when every invariant holds, TSDB_CODE_SYN_INTERNAL_ERROR otherwise
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  int32_t rc = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (pBuf->startIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
  } else if (pBuf->commitIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
  } else if (pBuf->matchIndex >= pBuf->endIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
  } else if (pBuf->endIndex - pBuf->startIndex > pBuf->size) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
  } else if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
    sError("failed to validate since pItem is null");
  } else {
    rc = 0;
  }

  return rc;
}
790

791
/**
 * Commit buffered log entries up to min(commitIndex, pBuf->matchIndex).
 *
 * Each entry is executed through syncFsmExecute() and pBuf->commitIndex is
 * advanced. If the entry following a committed one is a config change, the
 * config is applied; with a single replica that config entry is also executed
 * and committed immediately. Afterwards, entries are recycled from the head of
 * the buffer while startIndex < commitIndex and either startIndex is older than
 * commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION or (for vnodes) the buffer and
 * the global log-buffer memory usage exceed their thresholds. When the node
 * has not finished restoring and the buffer has caught up with
 * pNode->commitIndex, the fsm restore callbacks run.
 *
 * pBuf->mutex is held from after the initial validation until just before
 * FpAfterRestoredCb and the final validation.
 *
 * @param pBuf        log buffer to commit from
 * @param pNode       sync node whose fsm executes the entries
 * @param commitIndex requested commit index
 * @param trace       trace id for debug logs
 * @param src         caller description, used only in logs
 * @return 0 on success, or an error from entry lookup, fsm execution,
 *         config change, recycling, or buffer validation
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;
  SyncIndex       restoredCommitIndex = -1;  // commit index handed to the restore callbacks

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    // A config change right after the committed entry is applied at commit time.
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // For a 2->1 replica change, the config change entry must be applied in
        // the sync thread, so execute and commit it right away.
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle committed entries from the head of the buffer
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycled = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycled == NULL) {
      // The slot is empty, so there is no entry to read index/term from; report
      // the buffer positions instead and leave via _out so the mutex is released.
      sError("vgId:%d, invalid log entry to recycle, startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64
             ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycled->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycled->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycled->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycled->term,
           pRecycled->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycled);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    // Capture under the lock; FpAfterRestoredCb below runs after unlocking.
    restoredCommitIndex = pBuf->commitIndex;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, restoredCommitIndex);
    sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
956

957
/**
 * Empty a peer replication manager's window.
 *
 * Every per-index state in [startIndex, endIndex) is zeroed, then the indexes,
 * the restored flag and the retry backoff are cleared. A NULL manager is
 * ignored; a negative startIndex is logged and leaves the manager untouched.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }

  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex cursor = pMgr->startIndex;
  while (cursor < pMgr->endIndex) {
    (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
    ++cursor;
  }

  pMgr->retryBackoff = 0;
  pMgr->restored = false;
  pMgr->endIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->startIndex = 0;
}
973

974
/**
 * Re-send in-flight entries of a peer's replication window whose last send is
 * at least the current retry backoff old.
 *
 * Acked entries are skipped. If an acked entry lies beyond matchIndex and its
 * last send is more than 8x the max retry wait old, the manager is reset.
 * At most batchSize + 1 entries are re-sent per call. If anything was re-sent,
 * the retry backoff is advanced.
 *
 * @return 0 on success (including an empty window);
 *         TSDB_CODE_OUT_OF_RANGE if the backoff already hit its limit (manager reset);
 *         TSDB_CODE_ACTION_IN_PROGRESS on the stagnation reset above;
 *         TSDB_CODE_SYN_INTERNAL_ERROR on an inconsistent barrier flag;
 *         otherwise the error from syncLogReplSendTo()
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->startIndex >= pMgr->endIndex) {
    return 0;  // nothing in flight
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     resent = false;
  int64_t  waitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  now = taosGetMonoTimestampMs();
  int      sent = 0;
  int64_t  firstResent = -1;
  SyncTerm lastTerm = -1;
  int64_t  maxBatch = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex idx = pMgr->startIndex; idx < pMgr->endIndex; ++idx) {
    int64_t slot = idx % pMgr->size;

    // A barrier may only sit at either edge of the window.
    bool atEdge = (idx == pMgr->startIndex) || (idx + 1 == pMgr->endIndex);
    if (pMgr->states[slot].barrier && !atEdge) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    // Entries from here on were sent too recently to be retried yet.
    if (now < pMgr->states[slot].timeMs + waitMs) {
      break;
    }

    if (pMgr->states[slot].acked) {
      bool stagnant = pMgr->matchIndex < idx && pMgr->states[slot].timeMs + (syncGetRetryMaxWaitMs() << 3) < now;
      if (!stagnant) {
        continue;
      }
      syncLogReplReset(pMgr);
      sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, idx,
            pDestId->addr);
      code = TSDB_CODE_ACTION_IN_PROGRESS;
      break;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, idx, &lastTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), idx, pDestId->addr);
      break;
    }
    if (barrier != pMgr->states[slot].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }
    pMgr->states[slot].timeMs = now;
    pMgr->states[slot].term = lastTerm;
    pMgr->states[slot].acked = false;

    resent = true;
    if (firstResent == -1) {
      firstResent = idx;
    }

    bool batchFull = (sent >= maxBatch);
    ++sent;
    if (batchFull) {
      break;
    }
  }

  if (resent) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, sent, pDestId->addr, firstResent, lastTerm, waitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1052

1053
/**
 * Handle an append-entries reply while the peer's replication manager is not
 * yet restored, deciding whether to mark it restored, start a snapshot, or
 * probe the peer at a specific index.
 *
 * @param pMgr  replication manager of the replying peer; must not be restored
 * @param pNode local sync node
 * @param pMsg  append-entries reply from the peer
 * @return 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR on an inconsistent
 *         manager/log state, or an error from retry, snapshot start, or probing
 */
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         peer = pMsg->srcId;
  int32_t         code = 0;

  if (pMgr->restored != false) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&peer), peer.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex == 0) {
    // Empty window: start and match indexes must still be zero.
    if (pMgr->startIndex != 0 || pMgr->matchIndex != 0) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&peer), peer.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
            pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  } else {
    bool inWindow = pMsg->lastSendIndex >= pMgr->startIndex && pMsg->lastSendIndex < pMgr->endIndex;
    if (!inWindow) {
      // Reply for something outside the current window: only retry overdue entries.
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&peer), peer.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
            pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    bool fsmIncomplete = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE);
    if (fsmIncomplete || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
      const char* reason = fsmIncomplete ? " incomplete fsm state" : " rollback match index failure";
      sInfo("vgId:%d, snapshot replication to dnode:%d, reason:%s, match index:%" PRId64 ", last sent:%" PRId64,
            pNode->vgId, DID(&peer), reason, pMsg->matchIndex, pMsg->lastSendIndex);
      code = syncNodeStartSnapshot(pNode, &peer);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&peer));
        TAOS_RETURN(code);
      }
      return 0;
    }
  }

  // Check the term of the entry just after the peer's match index to pick the probe index.
  SyncTerm  prevTerm = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex probeIndex = TMIN(pMsg->matchIndex, pBuf->matchIndex);
  SET_ERRNO(0);

  if (pMsg->matchIndex < pBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, probeIndex + 1, &prevTerm);
    if (prevTerm < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
             probeIndex + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&peer), peer.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
            pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, probeIndex, firstVer, prevTerm, pMsg->lastMatchTerm);
    }

    bool termMatches = (prevTerm == pMsg->lastMatchTerm);
    bool nearFirstVer = (probeIndex + 1 == firstVer) || (probeIndex == firstVer);
    if (probeIndex + 1 < firstVer || prevTerm < 0 || (!termMatches && nearFirstVer)) {
      if (prevTerm < 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
      code = syncNodeStartSnapshot(pNode, &peer);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&peer));
        TAOS_RETURN(code);
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&peer));
      return 0;
    }

    if (probeIndex + 1 < firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    if (termMatches) {
      probeIndex++;
      if (probeIndex > pBuf->matchIndex) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    } else if (probeIndex <= firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at probeIndex
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, probeIndex);
}
1158

1159
/**
 * Handle a heartbeat reply for a peer's replication manager.
 *
 * Under pMgr->mutex: when the reply carries a non-zero start time that differs
 * from the recorded one, the manager is reset and the new start time is stored.
 * (Presumably a changed start time means the peer restarted; confirm with the
 * heartbeat sender.)
 *
 * @return always 0
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1171

1172
/**
 * Handle an append-entries reply from a peer.
 *
 * First, under pMgr->mutex, the manager is reset if the reply's start time
 * differs from the recorded one. Then, under the log buffer mutex, the reply is
 * routed to syncLogReplContinue() (restored manager) or syncLogReplRecover().
 * Failures are only logged.
 *
 * @return always 0
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  bool startTimeChanged = (pMsg->startTime != pMgr->peerStartTime);
  if (startTimeChanged) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);

  int32_t code = 0;
  if (!pMgr->restored) {
    code = syncLogReplRecover(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  } else {
    code = syncLogReplContinue(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);

  // Errors were logged above; the reply is always reported as handled.
  return 0;
}
1198

1199
/**
 * Kick off replication to a peer: probe from the local buffer's match index
 * while the manager is not restored, otherwise send the next batch.
 *
 * @return 0 on success, or the error from syncLogReplProbe()/syncLogReplAttempt()
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1207

1208
/**
 * Send a single probe entry at `index` to a not-yet-restored peer.
 *
 * If a probe is still outstanding and its send time is within the max retry
 * wait, nothing is sent. Otherwise the manager is reset, the entry is sent, and
 * the window becomes [index, index + 1).
 *
 * @return 0 on success (or while waiting on an outstanding probe);
 *         TSDB_CODE_SYN_INTERNAL_ERROR if the manager is restored, has a negative
 *         startIndex, or index is negative; otherwise the send error
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored || pMgr->startIndex < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t maxWaitMs = syncGetRetryMaxWaitMs();
  int64_t now = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  bool outstanding = pMgr->endIndex > pMgr->startIndex;
  if (outstanding && now < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + maxWaitMs) {
    return 0;  // previous probe has not timed out yet
  }
  syncLogReplReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;
  code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
  if (code < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  if (index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t slot = index % pMgr->size;
  pMgr->states[slot].barrier = barrier;
  pMgr->states[slot].timeMs = now;
  pMgr->states[slot].term = term;
  pMgr->states[slot].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1251

1252
/**
 * Send new log entries to a restored peer, starting at the end of its
 * replication window and going up to the local buffer's match index, then
 * re-send overdue in-flight entries through syncLogReplRetryOnNeed().
 *
 * Sending stops when the batch size has been exceeded, when the window would
 * reach half of pMgr->size, when the previous entry in the window is a barrier
 * (beyond the window's first two slots), or right after sending a barrier.
 *
 * @return 0 on success; TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not
 *         restored; otherwise the error from syncLogReplSendTo() or
 *         syncLogReplRetryOnNeed()
 */
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent, reported in the summary log
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm entryTerm = -1;

    code = syncLogReplSendTo(pMgr, pNode, index, &entryTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    // Keep the outer term up to date so the summary log reports the real term.
    term = entryTerm;

    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = entryTerm;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, entryTerm, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1316

1317
/**
 * Process an append-entries reply for a restored peer and continue replicating.
 *
 * When the reply's lastSendIndex lies inside the window [startIndex, endIndex):
 *   - the retry backoff is lowered by one step if startIndex < matchIndex and
 *     the send times across the window span less than the previous backoff interval;
 *   - that entry is marked acked and matchIndex is raised to the reply's matchIndex;
 *   - per-entry state below matchIndex is cleared and the window starts at matchIndex.
 * Then syncLogReplAttempt() sends the next batch.
 *
 * @return TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not restored,
 *         otherwise the result of syncLogReplAttempt()
 */
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (pMgr->restored != true) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  bool inWindow = pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex;
  if (inWindow) {
    if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
      int64_t spanMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs -
                       pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      if (spanMs > 0 && spanMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
        pMgr->retryBackoff -= 1;
      }
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
    pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

    SyncIndex cursor = pMgr->startIndex;
    while (cursor < pMgr->matchIndex) {
      (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
      cursor++;
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplAttempt(pMgr, pNode);
}
1338

1339
SSyncLogReplMgr* syncLogReplCreate() {
5,100✔
1340
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
5,100!
1341
  if (pMgr == NULL) {
5,100!
1342
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1343
    return NULL;
×
1344
  }
1345

1346
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
5,100✔
1347

1348
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
5,100!
1349
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1350
    return NULL;
×
1351
  }
1352

1353
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
5,100✔
1354
  if (code) {
5,100!
1355
    terrno = code;
×
1356
    return NULL;
×
1357
  }
1358

1359
  return pMgr;
5,100✔
1360
}
1361

1362
// Release a replication manager created by syncLogReplCreate(): destroy its mutex and
// free its memory. A NULL argument is accepted and ignored.
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1370

1371
// Create one replication manager per replica/learner slot of the node and tag each
// with its slot number as peerId.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if any slot is already populated, the creation
// error (via terrno) if a manager cannot be created, and 0 on success. Managers created
// before a failure are left in pNode->logReplMgrs.
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int peer = 0; peer < slotCount; ++peer) {
    if (pNode->logReplMgrs[peer] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SSyncLogReplMgr* pCreated = syncLogReplCreate();
    pNode->logReplMgrs[peer] = pCreated;
    if (pCreated == NULL) {
      TAOS_RETURN(terrno);
    }
    pCreated->peerId = peer;
  }

  return 0;
}
1382

1383
// Destroy every replication manager slot of the node and reset the slots to NULL.
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int peer = 0; peer < slotCount; ++peer) {
    SSyncLogReplMgr** ppSlot = &pNode->logReplMgrs[peer];
    syncLogReplDestroy(*ppSlot);
    *ppSlot = NULL;
  }
}
337✔
1389

1390
// Allocate and initialise an empty sync log buffer protected by a recursive mutex.
//
// On success *ppBuf receives the new buffer and 0 is returned. On failure *ppBuf is set to
// NULL, everything acquired so far (mutex attribute, buffer memory) is released, and the
// error code is returned via TAOS_RETURN.
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // Never continue with a NULL buffer, even if the allocator left terrno unset.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // The entries ring is indexed by log index % size, so its capacity must match the configured size.
  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  // Recursive mutex: the same thread may lock the buffer more than once.
  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }

_exit:
  if (code != 0) {
    // The attribute is kept for the buffer's lifetime on success; release it here on failure.
    if (attrInited) {
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1428

1429
// Under the buffer mutex, destroy every entry held in [startIndex, endIndex), zero the
// slots that held them, and reset all indexes and the byte counter to 0.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  for (SyncIndex idx = pBuf->startIndex; idx < pBuf->endIndex; ++idx) {
    SyncIndex       slot = (idx + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
337✔
1442

1443
// Tear down a buffer created by syncLogBufferCreate(): drop all buffered entries, destroy
// the mutex and its attribute, and free the buffer. A NULL argument is ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1453

1454
// Discard all uncommitted log entries at or above toIndex, in both the in-memory buffer
// and the log store, so that the log ends at toIndex - 1.
//
// Preconditions: commitIndex < toIndex <= endIndex; otherwise TSDB_CODE_SYN_INTERNAL_ERROR.
// When toIndex == endIndex there is nothing to discard and 0 is returned immediately.
// If the rollback reaches the buffer's startIndex, the buffer is rebuilt through
// syncLogBufferInitWithoutLock(), which takes no lock itself (as its name indicates).
// Returns 0 on success or an error code on failure.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  bool validTarget = (toIndex > pBuf->commitIndex) && (toIndex <= pBuf->endIndex);
  if (!validTarget) return TSDB_CODE_SYN_INTERNAL_ERROR;
  if (toIndex == pBuf->endIndex) return 0;

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // Step 1: drop buffered entries in [toIndex, endIndex), newest first.
  SyncIndex cursor;
  for (cursor = pBuf->endIndex - 1; cursor >= toIndex; --cursor) {
    SSyncRaftEntry* pItem = pBuf->entries[cursor % pBuf->size].pItem;
    if (pItem == NULL) continue;
    syncEntryDestroy(pItem);
    (void)memset(&pBuf->entries[cursor % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  // cursor == toIndex - 1 here.
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, cursor);
  if (cursor + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // Step 2: truncate the log store if it still holds entries at or above toIndex.
  int32_t   code = 0;
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // Step 3: rebuild the buffer when the rollback reached its start point.
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1503

1504
// Drop every buffered entry above matchIndex and reset the per-peer replication managers.
//
// Requires that the log store's last index equals pBuf->matchIndex; otherwise returns
// TSDB_CODE_SYN_INTERNAL_ERROR. All paths that take pBuf->mutex release it before returning.
// A failed rollback is only logged: the buffer end is still forced to matchIndex + 1 and
// the replication managers are still reset.
// Returns 0 on success, or the error from syncLogBufferValidate() before/after the reset.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    sError("vgId:%d, failed to reset sync log buffer since last ver:%" PRId64 " mismatches match index:%" PRId64,
           pNode->vgId, lastVer, pBuf->matchIndex);
    // Previously returned with the mutex still held, leaving the buffer locked forever.
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);
  if (code != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1530

1531
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
356,391✔
1532
                                 SSyncRaftEntry** ppEntry) {
1533
  int32_t code = 0;
356,391✔
1534

1535
  *ppEntry = NULL;
356,391✔
1536

1537
  if (index >= pBuf->endIndex) {
356,391✔
1538
    return TSDB_CODE_OUT_OF_RANGE;
121,599✔
1539
  }
1540

1541
  if (index > pBuf->startIndex) {  // startIndex might be dummy
234,792✔
1542
    *pInBuf = true;
234,725✔
1543
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
234,725✔
1544
  } else {
1545
    *pInBuf = false;
67✔
1546

1547
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
67!
1548
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
×
1549
    }
1550
  }
1551
  TAOS_RETURN(code);
234,797✔
1552
}
1553

1554
// Build and send one append-entries message carrying the entry at `index` to pDestId.
//
// On success returns 0, sets *pBarrier from syncLogReplBarrier() and, when pTerm is
// non-NULL, stores the entry's term in *pTerm. Entries fetched from the log store (not
// the in-memory buffer) are destroyed before returning.
//
// On failure returns a non-zero error code. The message payload and any store-fetched
// entry are released. If the entry no longer exists in the WAL, the destination peer's
// replication manager is reset. Paths that previously fell through with code == 0 (empty
// buffer slot, unknown previous term) now report TSDB_CODE_SYN_INTERNAL_ERROR, so callers
// cannot mistake an unsent message for a successful send.
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // Distinct name so the lookup does not shadow the pMgr parameter.
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    // An empty in-buffer slot yields code == 0; never report that as success.
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);
  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);

  // Entries read from the log store are owned here; buffered entries belong to the buffer.
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc