• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4983

13 Mar 2026 03:38AM UTC coverage: 68.653% (+0.07%) from 68.587%
#4983

push

travis-ci

web-flow
feat/6641435300-save-audit-in-self (#34738)

434 of 584 new or added lines in 10 files covered. (74.32%)

434 existing lines in 121 files now uncovered.

212745 of 309883 relevant lines covered (68.65%)

134272959.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.53
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of vnode log buffer
32

33
static bool syncIsMsgBlock(tmsg_t type) {
195,144,677✔
34
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
191,582,617✔
35
         (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
386,727,294✔
36
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
2,943,541✔
40
}
41

42
/* Return the buffer's end index (one past the newest buffered entry), read under pBuf->mutex. */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
/*
 * Append a new entry at the tail (endIndex) of the log buffer.
 *
 * Flow control rejects the append when:
 *   - the buffer window is full (TSDB_CODE_SYN_BUFFER_FULL);
 *   - after restore, the uncommitted span reaches TSDB_SYNC_NEGOTIATION_WIN
 *     (TSDB_CODE_SYN_NEGOTIATION_WIN_FULL);
 *   - after restore, the committed-but-unapplied span reaches tsSyncApplyQueueSize
 *     (TSDB_CODE_SYN_WRITE_STALL).
 *
 * Structural checks (each a TSDB_CODE_SYN_INTERNAL_ERROR):
 *   - pEntry->index must equal endIndex;
 *   - its slot must be empty;
 *   - the previous slot must hold index - 1 with a term not greater than pEntry->term.
 *
 * On success pEntry is stored in the buffer (not freed here), endIndex advances and the
 * byte accounting is updated for vgId > 1; returns 0.
 * On failure the buffer is untouched and pEntry is neither stored nor freed. The thread
 * sleeps 1 ms and the error code is returned.
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }

  if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           index, pBuf->commitIndex);
    goto _err;
  }

  SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= tsSyncApplyQueueSize) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex);
    goto _err;
  }

  // appends are strictly sequential: the new entry must land exactly at endIndex
  if (index != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since index:%" PRId64 " is not the end index:%" PRId64, pNode->vgId, index,
           pBuf->endIndex);
    goto _err;
  }

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  if (pExist != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since slot is occupied, index:%" PRId64 ", existing index:%" PRId64, pNode->vgId,
           index, pExist->index);
    goto _err;
  }

  // initial log buffer with at least one item, e.g, commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  if (pMatch == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pMatch->index + 1 != index) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since prev entry index:%" PRId64 " does not precede index:%" PRId64, pNode->vgId,
           pMatch->index, index);
    goto _err;
  }
  // terms must never decrease along the log
  if (!(pMatch->term <= pEntry->term)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since term:%" PRId64 " is less than prev term:%" PRId64 ", index:%" PRId64,
           pNode->vgId, pEntry->term, pMatch->term, index);
    goto _err;
  }

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
/*
 * Find the term of the entry at index - 1 and store it in *pSyncTerm.
 *
 * Lookup order:
 *   1. index - 1 == -1 and the log store begins at 0 -> term 0;
 *   2. index - 1 beyond the buffer's matchIndex -> TSDB_CODE_WAL_LOG_NOT_EXIST;
 *   3. the in-memory log buffer, when index - 1 >= startIndex;
 *   4. pMgr's per-index state, when pMgr is non-NULL and covers index - 1;
 *   5. the FSM snapshot, when index - 1 is its lastApplyIndex;
 *   6. reading the entry back from the log store.
 *
 * Returns 0 on success. On failure *pSyncTerm is set to -1 and an error code is returned.
 */
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  int32_t         code = 0;

  // nothing precedes index 0 when the log store begins at 0
  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  // the previous entry has not been matched locally yet
  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // still held in the in-memory log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pEntry->term;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // covered by the replication manager's [startIndex, endIndex) state window
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    if (timeMs == 0) {
      // a zero timestamp is treated as an unpopulated state slot
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    // only index 0 may legitimately carry term 0
    if (!(prevIndex == 0 || prevLogTerm != 0)) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             prevLogTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // exactly at the snapshot boundary
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  // fall back to reading the entry from the log store
  if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
/*
 * Build a placeholder entry for (term, index, vgId) by delegating to syncEntryBuildNoop().
 * syncLogBufferInitWithoutLock() uses it as the dummy record at commitIndex.
 * Returns NULL if the underlying builder fails.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}
194

195
/*
 * Verify that the WAL brackets the given commit version. Two conditions must hold:
 *   - the WAL's first index is at most commitIndex + 1 (no gap after the commit point);
 *   - the WAL's last index is at least commitIndex.
 * Returns 0 when both hold, TSDB_CODE_WAL_LOG_INCOMPLETE otherwise.
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex beginIndex = pStore->syncLogBeginIndex(pStore);
  if (beginIndex > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, beginIndex, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex lastIndex = pStore->syncLogLastIndex(pStore);
  if (!(lastIndex >= commitIndex)) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, lastIndex, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
/*
 * Rebuild the log buffer from the FSM snapshot and the log store.
 * Callers in this file (syncLogBufferInit / syncLogBufferReInit) hold pBuf->mutex.
 *
 * Index setup:
 *   - commitIndex comes from the snapshot's lastApplyIndex;
 *   - matchIndex comes from the log store's last index, and endIndex = matchIndex + 1.
 *
 * Loading:
 *   - Entries are loaded backwards from matchIndex towards commitIndex, keeping at most
 *     pBuf->size - TSDB_SYNC_LOG_BUFFER_SIZE / 2 of them.
 *   - If the walk reaches commitIndex, a dummy (no-op) entry with the snapshot term is
 *     placed there and becomes startIndex.
 *   - Otherwise startIndex is the oldest entry that was loaded.
 *
 * A failure to read one entry only stops the backward walk and is logged as a warning.
 * Returns 0 on success or an error code.
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // Report the log store's own error; `code` is still 0 here and would log "success".
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }
#ifdef USE_MOUNT
    if (pNode->mountVgId) {
      SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
    }
#endif
    bool taken = false;
    // keep at most (size - emptySize) entries so part of the ring stays free for new writes
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // walking backwards, this entry is the predecessor of the one loaded just before it
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
333

334
/* Locked entry point: run syncLogBufferInitWithoutLock() while holding pBuf->mutex and return its result. */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
340

341
/*
 * Drop every entry held in the log buffer, reset its indexes and byte count, and rebuild
 * it via syncLogBufferInitWithoutLock(). All of this happens while holding pBuf->mutex.
 *
 * Each dropped entry's bytes are also returned to tsLogBufferMemoryUsed, for vgId > 1,
 * mirroring how they were added. Without this, every re-initialisation would leave the
 * process-wide counter inflated even though pBuf->bytes is reset to 0.
 *
 * Returns the re-initialisation result (0 on success), or a buffer-validation error.
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[(index + pBuf->size) % pBuf->size];
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) continue;
    if (pNode->vgId > 1) {
      // undo the global accounting done when this entry entered the buffer
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, -(int64_t)pEntry->bytes);
    }
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    (void)memset(pBufEntry, 0, sizeof(*pBufEntry));
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  pBuf->bytes = 0;
  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
361

362
/*
 * Return the term of the entry stored at matchIndex. Takes no lock itself; both callers
 * in this file hold pBuf->mutex.
 * If that slot is empty, log an error, set terrno to TSDB_CODE_SYN_INTERNAL_ERROR and return -1.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SSyncLogBufEntry* pSlot = &pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size];

  if (pSlot->pItem != NULL) {
    return pSlot->pItem->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
372

373
/* Locked wrapper around syncLogBufferGetLastMatchTermWithoutLock(); yields -1 when the match slot is empty. */
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm = -1;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
379

380
/* True when the buffer covers no index range (endIndex does not exceed startIndex); read under pBuf->mutex. */
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  bool hasRange = false;

  (void)taosThreadMutexLock(&pBuf->mutex);
  hasRange = (pBuf->startIndex < pBuf->endIndex);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !hasRange;
}
386

387
/*
 * Accept an entry received from a peer into the log buffer.
 * prevTerm is the sender's term for index - 1.
 * pEntry is always consumed: it is either stored in the buffer or destroyed before returning.
 *
 * Outcomes:
 *   - index <= commitIndex: already committed. Returns the result of looking up the
 *     committed term at index, forced to 0 when it equals pEntry->term.
 *   - For learners, totalIndex is raised to index when it grows.
 *   - index outside the buffer window: TSDB_CODE_OUT_OF_RANGE.
 *   - index > matchIndex while prevTerm differs from the last matched term:
 *     TSDB_CODE_ACTION_IN_PROGRESS.
 *   - An entry already exists at index:
 *       - different term: the buffer is rolled back from index, then the new entry is stored;
 *       - same term: treated as a duplicate and returns 0 (after consistency checks).
 *   - Otherwise the entry is stored, endIndex is extended if needed, and 0 is returned.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    // "prev" of index + 1 is the committed entry at index itself
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting entry: discard it (and what follows) before storing the new one
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  // ownership moved into the buffer; keep the cleanup below from freeing it
  pEntry = NULL;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  // an entry fetched from outside the buffer is a private copy and must be released here
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
516

517
/*
 * Decide the doFsync flag for appending pEntry: true only in a replicated group
 * (replicaNum > 1) and only for TDMT_VND_COMMIT entries.
 */
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
520

521
/*
 * Write pEntry to the log store so that it becomes the store's last entry.
 *
 * Steps:
 *   1. Truncate any stored entries at or after pEntry->index.
 *   2. Check that pEntry directly follows the store's last index.
 *   3. With USE_MOUNT and a mounted vgroup, rewrite the message header's vgId to
 *      pNode->mountVgId.
 *   4. Append, with fsync as decided by syncLogStoreNeedFlush().
 *
 * Returns:
 *   - 0 on success;
 *   - the log store's code if truncate or append fails;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR for a negative or non-contiguous index.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // drop any stored suffix that overlaps the new entry
  SyncIndex storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  // the entry must extend the store by exactly one position
  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast + 1 != pEntry->index) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

#ifdef USE_MOUNT
  if (pNode->mountVgId) {
    SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
    if (pMsgHead->vgId != pNode->mountVgId) {
      pMsgHead->vgId = pNode->mountVgId;
    }
  }
#endif

  bool needFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, needFsync);
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), pEntry->index,
           pEntry->term);
    TAOS_RETURN(code);
  }

  // after the append the entry must be the store's last one
  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  return (storeLast == pEntry->index) ? 0 : TSDB_CODE_SYN_INTERNAL_ERROR;
}
549

550
/*
 * Advance matchIndex over the run of buffered entries that chain correctly onto the
 * current match point. Each matched entry is persisted to the log store and replication
 * is triggered.
 *
 * Stopping rules:
 *   - The loop stops quietly at an empty slot or on a prevLogTerm mismatch.
 *   - On an internal inconsistency, or a persist / config-change / replicate failure,
 *     it stops and restores matchIndex to the last fully processed entry.
 *   - Errors are only logged; the return value is the match index, not an error code.
 *
 * Config-change entries are applied immediately only when they directly follow the
 * buffer's commitIndex; otherwise the change is only logged as delayed.
 *
 * If pMatchTerm is non-NULL it receives the term at the final matchIndex, or -1 if that
 * slot is empty. Returns the final matchIndex, or the code from a failed
 * syncLogBufferValidate().
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;
  int32_t        code = 0;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             prevLogIndex, pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // the entry was accepted against a different predecessor term: do not advance past it
    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  // roll matchIndex back to the last entry that completed every step above
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // The slot at matchIndex can be empty, e.g. on the "pMatch is null" path above;
    // dereferencing it unconditionally would crash.
    SSyncRaftEntry* pLastMatch = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pLastMatch != NULL) {
      *pMatchTerm = pLastMatch->term;
    } else {
      sError("vgId:%d, msg:%p, no log entry at match index:%" PRId64 " to report match term", pNode->vgId, pMsg,
             matchIndex);
      *pMatchTerm = -1;
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
692

693
/*
 * Hands one committed raft entry to the state machine through pFsm->FpCommitCb.
 *
 * The call is skipped (returning 0) when all of the following hold and `force` is false:
 * - the node has a single replica;
 * - it has finished restoring;
 * - its vgId is not 1;
 * - its own role is not TAOS_SYNC_ROLE_LEARNER.
 *
 * Otherwise the entry is converted back into its original RPC message. The response context registered
 * under the entry's seqNum is attached, and the callback is invoked. While the callback fails with
 * terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, the whole attempt is repeated after a 10ms sleep.
 *
 * @param pNode     sync node owning the entry
 * @param pFsm      state machine receiving the entry
 * @param role      node state copied into the callback metadata
 * @param term      current term copied into the callback metadata
 * @param pEntry    entry to apply
 * @param applyCode result code placed in both the RPC message and the callback metadata
 * @param force     when true, never take the single-replica shortcut
 * @return 0 on success; otherwise the error from the snapshot-config lookup, the entry conversion
 *         or the commit callback
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // A learner still has to apply entries while it catches up; `force` disables the shortcut entirely.
  bool skipFsm = !force && pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
                 pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER;
  if (skipFsm) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0;
  int32_t lino = 0;

  while (true) {
    SFsmCbMeta cbMeta = {0};
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (cbMeta.lastConfigIndex < -1) {
      // Returned directly (not via _exit); prefer terrno when it carries a specific reason.
      return (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));

    cbMeta.term = pEntry->term;
    cbMeta.currentTerm = term;
    cbMeta.index = pEntry->index;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.state = role;
    cbMeta.code = applyCode;
    cbMeta.flag = -1;

    // The trace id is set before the response lookup, which writes into rpcMsg.info.
    rpcMsg.info.traceId = pEntry->originRpcTraceId;
    int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
    sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
            pNode->vgId, pEntry->index, &rpcMsg.info, cbMeta.seqNum, num);

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);

    bool retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    sGTrace(&rpcMsg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
    if (!retry) {
      break;
    }

    taosMsleep(10);

    // The error counter only advances when the callback itself returned the queue-full code.
    if (code != TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
      continue;
    }
    // Warn once every APPLY_QUEUE_ERROR_THRESHOLD counted retries; trace the rest.
    if (++pNode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
      pNode->applyQueueErrorCount = 0;
      sGWarn(&rpcMsg.info.traceId,
             "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
             pEntry->index, tstrerror(code));
    } else {
      sGTrace(&rpcMsg.info.traceId,
              "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
              pEntry->index, tstrerror(code));
    }
  }

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
778

779
/*
 * Checks the log buffer's index invariants, in this order:
 *   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the ring slot for matchIndex holds an entry.
 * Only the first violated invariant is logged.
 *
 * @return 0 when every invariant holds, TSDB_CODE_SYN_INTERNAL_ERROR otherwise
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  if (pBuf->matchIndex < pBuf->startIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
  } else if (pBuf->matchIndex < pBuf->commitIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
  } else if (pBuf->endIndex <= pBuf->matchIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
  } else if (pBuf->size < pBuf->endIndex - pBuf->startIndex) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
  } else if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
    // (+ size) keeps the slot non-negative when matchIndex is -1
    sError("failed to validate since pItem is null");
  } else {
    return 0;
  }
  return TSDB_CODE_SYN_INTERNAL_ERROR;
}
806

807
/*
 * Applies the entries in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] to the state machine in
 * order. pBuf->commitIndex advances after each one. Committed entries are then recycled from the head of
 * the buffer. All of this runs under pBuf->mutex; FpAfterRestoredCb is invoked after the mutex is released.
 *
 * If the entry right after a committed one is a TDMT_SYNC_CONFIG_CHANGE, the new config is applied to the
 * node immediately. When the node then has a single replica, that config entry is also executed and committed.
 *
 * While the node has not finished restoring, restore is marked finished once pBuf->commitIndex has reached
 * pNode->commitIndex with a last-seen entry whose term is not older than the current term.
 *
 * @param pBuf        log buffer to commit from
 * @param pNode       owning sync node
 * @param commitIndex highest index allowed to commit; a value <= pBuf->commitIndex commits nothing but may
 *                    still run the restore check
 * @param trace       trace id used for debug logging
 * @param src         caller tag, used only in logs
 * @return 0 on success; otherwise an error from entry lookup, fsm execution or config change, or
 *         TSDB_CODE_SYN_INTERNAL_ERROR when buffer invariants are violated
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;
  SyncIndex       restoredCommitIndex = -1;  // commit index reported to the restore callbacks

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    // Load the entry at the current commit index so the restore check at _out can still run.
    if (!pNode->restoreFinish && commitIndex > 0 && commitIndex == pBuf->commitIndex) {
      sInfo("vgId:%d, try to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      int32_t ret = syncLogBufferGetOneEntry(pBuf, pNode, commitIndex, &inBuf, &pEntry);
      if (ret != 0) {
        sWarn("vgId:%d, failed to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      }
    }
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    // Look ahead one entry: a pending config change is applied to the node as soon as its predecessor commits.
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, need to apply config change entry in sync thread,
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle committed entries from the head of the buffer
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    // Never recycle the entry at commitIndex. Below it, recycle while outside the retention window, or (vnodes
    // only) while this buffer and the file-wide vnode log buffer counter are both over their limits.
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycled = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycled == NULL) {
      // The slot is empty, so only buffer state can be logged. Leave through _out so the mutex is released and
      // the entries fetched above are freed.
      sError("vgId:%d, invalid log entry to recycle, startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64
             ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycled->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycled->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycled->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycled->term,
           pRecycled->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycled);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    restoredCommitIndex = pBuf->commitIndex;
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, restoredCommitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  // Runs without the buffer lock. It uses the commit index captured under the lock instead of re-reading
  // pBuf->commitIndex here.
  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, restoredCommitIndex);
    sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
979

980
/*
 * Clears every in-flight state slot in [startIndex, endIndex). It then returns the manager to its initial,
 * unrestored state: start/match/end indexes and the retry backoff are all set to 0.
 * A NULL manager is ignored. A negative startIndex is logged and leaves the manager untouched.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }
  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex idx = pMgr->startIndex;
  while (idx < pMgr->endIndex) {
    (void)memset(&pMgr->states[idx % pMgr->size], 0, sizeof pMgr->states[0]);
    ++idx;
  }

  pMgr->retryBackoff = 0;
  pMgr->restored = false;
  pMgr->endIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->startIndex = 0;
}
996

997
/*
 * Re-sends in-flight entries whose retry wait (syncLogReplGetRetryBackoffTimeMs) has elapsed, walking from
 * startIndex. At most batchSize + 1 entries are re-sent, where batchSize = max(1, size >> (4 + retryBackoff)).
 * The retry backoff is advanced whenever at least one entry was re-sent.
 *
 * @return 0 when nothing is in flight or all sends succeeded;
 *         TSDB_CODE_OUT_OF_RANGE (after resetting the manager) once retryBackoff reached SYNC_MAX_RETRY_BACKOFF;
 *         TSDB_CODE_ACTION_IN_PROGRESS (after resetting) when an acked slot above matchIndex has waited
 *         longer than 8x syncGetRetryMaxWaitMs();
 *         TSDB_CODE_SYN_INTERNAL_ERROR on inconsistent barrier state; or the send error.
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  // nothing in flight
  if (pMgr->startIndex >= pMgr->endIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int      count = 0;
  bool     retried = false;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;

    // A barrier may only sit at the first or the last in-flight slot.
    bool atEdge = (index == pMgr->startIndex) || (index + 1 == pMgr->endIndex);
    if (pMgr->states[pos].barrier && !atEdge) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // Stop at the first slot whose retry wait has not elapsed yet.
    if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      bool stagnant =
          pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs;
      if (stagnant) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, index,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        goto _out;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      goto _out;
    }
    if (barrier != pMgr->states[pos].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    pMgr->states[pos].acked = false;
    pMgr->states[pos].term = term;
    pMgr->states[pos].timeMs = nowMs;

    if (firstIndex == -1) {
      firstIndex = index;
    }
    retried = true;

    bool batchFull = batchSize <= count;
    count++;
    if (batchFull) {
      break;
    }
  }

_out:
  if (retried) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1075

1076
/*
 * Handles an append-entries reply while replication to the peer is not yet restored. It either:
 * - marks the peer restored;
 * - retries outstanding sends;
 * - starts a snapshot; or
 * - resets the manager and probes again at an adjusted index.
 *
 * NOTE(review): syncLogReplProcessReply calls this with pNode->pLogBuf->mutex held; confirm for any other
 * caller.
 *
 * @return 0 on success; TSDB_CODE_SYN_INTERNAL_ERROR on inconsistent manager/log state; otherwise the
 *         error from retrying, reading the previous log term, starting a snapshot, or probing
 */
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  int32_t         code = 0;

  if (pMgr->restored) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex != 0) {
    // Something is outstanding: interpret the reply against [startIndex, endIndex).
    bool outOfWindow = pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex;
    if (outOfWindow) {
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    bool fsmIncomplete = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE);
    bool rollbackFailed = !pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex;
    if (fsmIncomplete || rollbackFailed) {
      char* snapReason = fsmIncomplete ? " incomplete fsm state" : " rollback match index failure";
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, reason:%s, match index:%" PRId64
             ", last sent:%" PRId64,
             pNode->vgId, DID(&destId), snapReason, pMsg->matchIndex, pMsg->lastSendIndex);
      pNode->snapSeq = -1;
      code = syncNodeStartSnapshot(pNode, &destId, snapReason);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }
  } else {
    // Nothing outstanding: the manager must still be in its reset state.
    if (pMgr->startIndex != 0 || pMgr->matchIndex != 0) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  }

  // check last match term
  SyncTerm  term = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pBuf->matchIndex);
  SET_ERRNO(0);

  if (pMsg->matchIndex < pBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
    if (term < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, term,
            pMsg->lastMatchTerm);
    }

    bool notInLog = index + 1 < firstVer;
    bool termMismatchAtHead = term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer);
    if (notInLog || term < 0 || termMismatchAtHead) {
      if (term < 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, index:%" PRId64 ", firstVer:%" PRId64
             ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
             pNode->vgId, DID(&destId), index, firstVer, term, pMsg->lastMatchTerm);
      char reason[100] = {0};
      if (notInLog) {
        tsnprintf(reason, sizeof(reason), "matched entry not in log range, index:%" PRId64 ", firstVer:%" PRId64, index,
                  firstVer);
      } else if (term < 0) {
        tsnprintf(reason, sizeof(reason), "failed to get prev log term");
      } else {
        tsnprintf(reason, sizeof(reason), "log term mismatch");
      }
      pNode->snapSeq = -1;
      code = syncNodeStartSnapshot(pNode, &destId, reason);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }

    if (notInLog) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    if (term == pMsg->lastMatchTerm) {
      // terms agree: move the probe one entry forward
      index++;
      if (index > pBuf->matchIndex) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    } else if (index <= firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, index);
}
1194

1195
/*
 * Handles a heartbeat reply for one peer's replication manager. When the reply carries a non-zero start time
 * that differs from the recorded one, the manager is reset and the new start time is stored.
 * Presumably a changed start time means the peer restarted (TODO confirm).
 *
 * @return always 0
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1207

1208
/*
 * Handles an append-entries reply from one peer.
 *
 * First, under pMgr->mutex, the manager is reset if the reply's start time differs from the recorded one.
 * Then, under the log buffer mutex, the reply is fed to syncLogReplContinue (restored) or syncLogReplRecover
 * (not restored). Failures are only logged; the function always returns 0.
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMgr->peerStartTime != pMsg->startTime) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);
  // Decide the path before the call: recovering may flip pMgr->restored.
  bool    wasRestored = pMgr->restored;
  int32_t code = wasRestored ? syncLogReplContinue(pMgr, pNode, pMsg) : syncLogReplRecover(pMgr, pNode, pMsg);
  if (code != 0) {
    if (wasRestored) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    } else {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
1234

1235
/*
 * Drives replication to one peer.
 * - Not yet restored: probe at the local buffer's matchIndex via syncLogReplProbe.
 * - Restored: send further entries via syncLogReplAttempt.
 *
 * @return 0 on success, otherwise the error from the probe/attempt
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1243

1244
/*
 * Sends a single probe entry at `index` to an unrestored peer. Afterwards the manager tracks exactly
 * [index, index + 1).
 *
 * Nothing is sent (returns 0) while an earlier slot is outstanding and its syncGetRetryMaxWaitMs() wait
 * has not elapsed. Otherwise the manager is reset before sending.
 *
 * @return 0 on success (including the "still waiting" case);
 *         TSDB_CODE_SYN_INTERNAL_ERROR if the manager is already restored, has a negative startIndex,
 *         or `index` is negative (this check happens after the send);
 *         otherwise the send error
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored || pMgr->startIndex < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t  retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t  nowMs = taosGetMonoTimestampMs();
  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t  code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pDestId->addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  // An earlier slot is still outstanding and its wait has not expired: do not probe again yet.
  bool outstanding = pMgr->endIndex > pMgr->startIndex;
  if (outstanding && nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  syncLogReplReset(pMgr);

  bool     barrier = false;
  SyncTerm term = -1;
  code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
  if (code < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  if (index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  int64_t slot = index % pMgr->size;
  pMgr->states[slot].barrier = barrier;
  pMgr->states[slot].timeMs = nowMs;
  pMgr->states[slot].term = term;
  pMgr->states[slot].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1287

1288
// Pipeline not-yet-sent log entries to one peer, starting at pMgr->endIndex and never going
// beyond the local buffer's matchIndex.
//
// A single call stops when any of the following holds:
//   - more than batchSize entries have been sent (the check runs before each send, so at most
//     batchSize + 1 entries go out per call; batchSize shrinks as retryBackoff grows);
//   - the outstanding window (index - pMgr->startIndex) reaches half of the ring size;
//   - the previous entry in the window (other than the one at startIndex) is a sync barrier;
//   - the entry just sent is itself a sync barrier.
// Per-slot state (barrier flag, send time, term, acked=false) is recorded for every entry sent,
// and pMgr->endIndex is advanced past it. Afterwards syncLogReplRetryOnNeed() is invoked.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not restored, or the
// error from syncLogReplSendTo()/syncLogReplRetryOnNeed().
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry actually sent; reported by the trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // do not pipeline past an outstanding barrier that is not at the start of the window
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm entryTerm = -1;

    // Use the outer pDestId/term instead of shadowing them; previously the outer `term` was never
    // updated, so the summary trace always reported -1.
    code = syncLogReplSendTo(pMgr, pNode, index, &entryTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    term = entryTerm;

    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = entryTerm;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1352

1353
// Process an append-entries reply for a peer whose replication pipeline is already restored,
// then try to push more entries to it.
//
// When the reply acknowledges an index inside the current window [startIndex, endIndex):
//   - the retry backoff is lowered by one step if the window's send timestamps span less than
//     the current backoff interval (only while matchIndex is ahead of startIndex);
//   - the acknowledged slot is marked acked and matchIndex is advanced (never moved back);
//   - slots below the new matchIndex are cleared and the window start moves to matchIndex.
// Replies outside the window only trigger the next replication attempt.
//
// Returns TSDB_CODE_SYN_INTERNAL_ERROR if the manager is not restored, otherwise the result of
// syncLogReplAttempt().
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex ackedIdx = pMsg->lastSendIndex;
  bool      insideWindow = (ackedIdx >= pMgr->startIndex) && (ackedIdx < pMgr->endIndex);

  if (insideWindow) {
    // relax the backoff when the in-flight window was sent within the current backoff interval
    if (pMgr->retryBackoff > 0 && pMgr->matchIndex > pMgr->startIndex) {
      int64_t oldestMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      int64_t newestMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
      int64_t spanMs = newestMs - oldestMs;
      int64_t backoffMs = (int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1);
      if (spanMs > 0 && spanMs < backoffMs) {
        pMgr->retryBackoff -= 1;
      }
    }

    pMgr->states[ackedIdx % pMgr->size].acked = true;
    pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);

    // slide the window start up to the (possibly advanced) match index
    SyncIndex slotIdx = pMgr->startIndex;
    while (slotIdx < pMgr->matchIndex) {
      (void)memset(&pMgr->states[slotIdx % pMgr->size], 0, sizeof(pMgr->states[0]));
      ++slotIdx;
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplAttempt(pMgr, pNode);
}
1374

1375
SSyncLogReplMgr* syncLogReplCreate() {
68,377,499✔
1376
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
68,377,499✔
1377
  if (pMgr == NULL) {
68,377,050✔
1378
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1379
    return NULL;
×
1380
  }
1381

1382
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
68,377,050✔
1383

1384
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
68,378,045✔
1385
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1386
    return NULL;
×
1387
  }
1388

1389
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
68,375,864✔
1390
  if (code) {
68,378,440✔
1391
    terrno = code;
×
1392
    return NULL;
×
1393
  }
1394

1395
  return pMgr;
68,378,440✔
1396
}
1397

1398
// Release a log replication manager created by syncLogReplCreate(): destroy its mutex and free
// the memory. A NULL manager is ignored.
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1406

1407
// Create one log replication manager per replica/learner slot of the node and tag each with its
// slot number as peerId.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already populated, or terrno
// when a manager cannot be created. Managers created before a failure stay in pNode->logReplMgrs.
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int32_t slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotCount; ++slot) {
    SSyncLogReplMgr** ppSlot = &pNode->logReplMgrs[slot];
    if (*ppSlot != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    *ppSlot = syncLogReplCreate();
    if (*ppSlot == NULL) {
      TAOS_RETURN(terrno);
    }
    (*ppSlot)->peerId = slot;
  }
  return 0;
}
1418

1419
// Destroy every per-peer log replication manager of the node and clear the slots.
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  SSyncLogReplMgr** ppMgrs = pNode->logReplMgrs;
  const int32_t     slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;

  for (int32_t slot = 0; slot < slotCount; ++slot) {
    syncLogReplDestroy(ppMgrs[slot]);
    ppMgrs[slot] = NULL;
  }
}
1425

1426
// Allocate an empty sync log buffer guarded by a recursive mutex.
//
// On success *ppBuf receives the buffer and 0 is returned. On failure everything acquired so far
// (the mutex attribute and the buffer memory) is released, *ppBuf is set to NULL, and an error
// code is returned.
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // Guarantee a non-zero code: with terrno == 0 the old TAOS_CHECK_GOTO(terrno, ...) would not
    // jump, and the NULL buffer would be dereferenced below.
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }
_exit:
  if (code != 0) {
    if (attrInited) {
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);  // also resets pBuf to NULL so *ppBuf is NULL on failure
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1464

1465
// Under the buffer mutex, destroy every entry held in [startIndex, endIndex), zero the freed
// slots, and reset all buffer indexes and the byte counter to 0.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    // adding size first keeps the slot non-negative should an index be negative
    int64_t         slot = (cursor + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
    cursor++;
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
1478

1479
// Release a sync log buffer: drop all buffered entries, destroy the mutex and its attribute, and
// free the memory. A NULL buffer is ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1489

1490
// Roll the buffer and the log store back so that toIndex becomes the next index to append.
//
// Requires commitIndex < toIndex <= endIndex; rolling back to endIndex is a no-op. The steps are:
//   1. destroy buffered entries with index >= toIndex and set endIndex = toIndex
//      (matchIndex is capped at toIndex - 1);
//   2. truncate the log store from toIndex when it holds entries at or beyond it;
//   3. reload the buffer from the log store if toIndex does not exceed startIndex;
//   4. validate the resulting buffer.
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR on a violated invariant, or the error of
// the failing log-store/buffer call.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;
  bool    validTarget = (pBuf->commitIndex < toIndex) && (toIndex <= pBuf->endIndex);
  if (!validTarget) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  if (toIndex == pBuf->endIndex) {
    return 0;  // nothing at or above toIndex to discard
  }

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // 1. drop buffered entries from the tail down to toIndex
  SyncIndex cursor;
  for (cursor = pBuf->endIndex - 1; cursor >= toIndex; cursor--) {
    int64_t         slot = cursor % pBuf->size;
    SSyncRaftEntry* pVictim = pBuf->entries[slot].pItem;
    if (pVictim == NULL) {
      continue;
    }
    syncEntryDestroy(pVictim);
    (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, cursor);
  if (cursor + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 2. truncate the log store so that its last index is toIndex - 1
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (toIndex != lastVer + 1) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // 3. refill the buffer when the rollback reached its start
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // 4. sanity checks
  if (pBuf->endIndex != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1539

1540
// Discard unmatched entries from the log buffer and restart replication for every peer.
//
// Under the buffer mutex, this checks that the log store's last index equals the buffer's
// matchIndex. It then rolls the buffer back to matchIndex + 1; a rollback failure is logged and
// the reset continues. Finally it sets endIndex = matchIndex + 1 and resets each replica's
// replication manager. The buffer is validated before and after.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if the log store and buffer disagree, or a
// validation error.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    // must release the lock on this early exit, otherwise the buffer stays locked forever
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;
  if ((code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1)) != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1566

1567
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
1,969,294,051✔
1568
                                 SSyncRaftEntry** ppEntry) {
1569
  int32_t code = 0;
1,969,294,051✔
1570

1571
  *ppEntry = NULL;
1,969,294,051✔
1572

1573
  if (index >= pBuf->endIndex) {
1,969,337,210✔
1574
    return TSDB_CODE_OUT_OF_RANGE;
833,503,880✔
1575
  }
1576

1577
  if (index > pBuf->startIndex) {  // startIndex might be dummy
1,135,804,526✔
1578
    *pInBuf = true;
1,124,307,903✔
1579
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
1,124,302,257✔
1580
#ifdef USE_MOUNT
1581
    if (pNode->mountVgId) {
1,124,318,496✔
1582
      SMsgHead* pMsgHead = (SMsgHead*)(*ppEntry)->data;
11,112✔
1583
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
11,112✔
1584
    }
1585
#endif
1586
  } else {
1587
    *pInBuf = false;
11,522,423✔
1588

1589
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
11,522,423✔
1590
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
202,407✔
1591
    }
1592
  }
1593

1594
  TAOS_RETURN(code);
1,135,790,133✔
1595
}
1596

1597
// Build one AppendEntries message carrying the entry at `index` and send it to pDestId.
//
// The entry comes from the log buffer when present, otherwise from the log store; entries loaded
// from the log store are destroyed here before returning. On success *pBarrier tells whether the
// entry is a sync barrier and, if pTerm is non-NULL, *pTerm receives the entry's term.
// If the log store reports TSDB_CODE_WAL_LOG_NOT_EXIST, the replication manager registered for
// pDestId is reset.
//
// Returns 0 only when the message was handed to syncNodeSendAppendEntries() successfully.
// Otherwise it returns a non-zero code: the underlying error, or TSDB_CODE_SYN_INTERNAL_ERROR
// when the entry or previous term was unavailable without a more specific error. Previously that
// case returned 0, so callers recorded an entry as sent when nothing had been sent.
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // distinct name: do not shadow the pMgr parameter
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;  // e.g. an empty buffer slot
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);

  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
  if (!inBuf) {
    syncEntryDestroy(pEntry);  // loaded from the log store, so owned here
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc