• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.43
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
// Process-wide total of bytes held by vnode log buffers. Only entries stored while pNode->vgId > 1 are
// counted, and every update goes through atomic_*_fetch_64 because many log buffers share this counter.
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of vnode log buffer
32

33
/*
 * Classifies a raft entry's original message type as a "blocking" table-level vnode request:
 * create / alter / drop table, tag value update, or alter confirm.
 * Returns true for exactly those five types and false for everything else.
 */
static bool syncIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_CONFIRM:
      return true;
    default:
      return false;
  }
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
2,074,137✔
40
}
41

42
/*
 * Returns pBuf->endIndex (one past the highest index stored in the buffer), read while holding
 * pBuf->mutex so the value is consistent with concurrent appends.
 */
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
/*
 * Appends a locally created entry at pBuf->endIndex.
 *
 * On success the entry is stored in its ring slot, linked to the entry just before it
 * (prevLogIndex/prevLogTerm), endIndex advances by one, and for vnodes (vgId > 1) its size is
 * added to pBuf->bytes and to the global tsLogBufferMemoryUsed.
 * On failure the entry is neither stored nor freed by this function.
 *
 * Returns 0 on success, or:
 *   TSDB_CODE_SYN_BUFFER_FULL          - index does not fit in the ring of pBuf->size slots;
 *   TSDB_CODE_SYN_NEGOTIATION_WIN_FULL - after restore, index is TSDB_SYNC_NEGOTIATION_WIN or more past commitIndex;
 *   TSDB_CODE_SYN_WRITE_STALL          - after restore, committed-but-unapplied entries reached tsSyncApplyQueueSize;
 *   TSDB_CODE_SYN_INTERNAL_ERROR       - buffer invariants do not hold.
 * Every failure path sleeps 1 ms before returning (presumably to throttle immediate retries).
 */
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex index = pEntry->index;

  if (index - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }

  // flow control: bound the number of uncommitted entries once restore has finished
  if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           index, pBuf->commitIndex);
    goto _err;
  }

  // flow control: bound the committed-but-not-yet-applied backlog
  SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= tsSyncApplyQueueSize) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex);
    goto _err;
  }

  // local appends must be strictly sequential
  if (index != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since index is not the end index, index:%" PRId64 ", end-index:%" PRId64,
           pNode->vgId, index, pBuf->endIndex);
    goto _err;
  }

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  if (pExist != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since slot is occupied, index:%" PRId64 ", exist-index:%" PRId64, pNode->vgId,
           index, pExist->index);
    goto _err;
  }

  // initial log buffer with at least one item, e.g, commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  if (pMatch == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pMatch->index + 1 != index) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since previous entry does not precede it, index:%" PRId64
           ", prev-index:%" PRId64,
           pNode->vgId, index, pMatch->index);
    goto _err;
  }
  // terms never decrease along the log
  if (pMatch->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to append since term regressed, index:%" PRId64 ", term:%" PRId64 ", prev-term:%" PRId64,
           pNode->vgId, index, pEntry->term, pMatch->term);
    goto _err;
  }

  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
/*
 * Looks up the term of the entry at prevIndex = index - 1 (the prevLogTerm that accompanies the entry at index).
 *
 * Sources are tried in this order:
 *   1. index == 0 and the log store begins at 0: term 0, since nothing precedes the first entry;
 *   2. the log buffer, when prevIndex lies in [pBuf->startIndex, pBuf->matchIndex];
 *   3. pMgr's per-index states (pMgr may be NULL), when prevIndex lies in [pMgr->startIndex, pMgr->endIndex);
 *   4. the FSM snapshot, when prevIndex equals its lastApplyIndex;
 *   5. the log store, via syncLogGetEntry.
 *
 * On success, writes the term to *pSyncTerm and returns 0. On failure, writes -1 and returns one of:
 *   - TSDB_CODE_WAL_LOG_NOT_EXIST, when prevIndex is beyond pBuf->matchIndex;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR, when the buffer or manager state is inconsistent;
 *   - the log store's error code.
 */
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SSyncRaftEntry* pEntry = NULL;
  SyncIndex       prevIndex = index - 1;
  SyncTerm        prevLogTerm = -1;
  int32_t         code = 0;

  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  if (prevIndex > pBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // 1) the log buffer
  if (prevIndex >= pBuf->startIndex) {
    pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pEntry->term;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // 2) the replication manager's recorded states
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
    if (timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
    // only index 0 may legitimately carry term 0
    if (!(prevIndex == 0 || prevLogTerm != 0)) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             prevLogTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  // 3) the FSM snapshot boundary
  SSnapshot snapshot = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  if (prevIndex == snapshot.lastApplyIndex) {
    *pSyncTerm = snapshot.lastApplyTerm;
    return 0;
  }

  // 4) the log store
  if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
    prevLogTerm = pEntry->term;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    *pSyncTerm = prevLogTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
/*
 * Builds the placeholder entry the log buffer keeps at commitIndex. It is a plain no-op entry
 * carrying the given term and index; returns NULL if syncEntryBuildNoop fails.
 */
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}
194

195
/*
 * Checks that the log store can resume from commitIndex.
 * Its first index must be <= commitIndex + 1, with no gap after the committed state.
 * Its last index must be >= commitIndex, i.e. the log reaches at least the committed state.
 * Returns 0 when both hold, TSDB_CODE_WAL_LOG_INCOMPLETE otherwise (the last index is only
 * queried once the first check has passed).
 */
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SSyncLogStore* pStore = pNode->pLogStore;

  SyncIndex beginVer = pStore->syncLogBeginIndex(pStore);
  if (beginVer > commitIndex + 1) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, beginVer, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex endVer = pStore->syncLogLastIndex(pStore);
  if (endVer < commitIndex) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, endVer, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
/*
 * Rebuilds pBuf from the FSM snapshot and the log store. Callers in this file hold pBuf->mutex.
 *
 * Index and term setup:
 *   - commitIndex / commitTerm come from the snapshot's lastApplyIndex / lastApplyTerm (term clamped to >= 0);
 *   - matchIndex is the log store's last index, and endIndex is one past it.
 *
 * Loading:
 *   - Log entries are loaded newest-first, down to commitIndex + 1;
 *   - at most (pBuf->size - TSDB_SYNC_LOG_BUFFER_SIZE / 2) entries are loaded, leaving room for new ones;
 *   - each loaded entry's index/term become the prevLogIndex/prevLogTerm of the entry after it.
 *
 * Dummy entry: if loading reached commitIndex, a no-op entry is placed at commitIndex to anchor that chain,
 * and startIndex = commitIndex. Otherwise (read failure or window full), startIndex is the oldest loaded index.
 *
 * Returns 0 on success, or an error code from the snapshot/alignment checks, allocation, or the
 * invariant checks (TSDB_CODE_SYN_INTERNAL_ERROR).
 */
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // keep the store's own error code so the warning reports the real failure reason
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }
#ifdef USE_MOUNT
    if (pNode->mountVgId) {
      SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
    }
#endif
    bool taken = false;
    // only fill up to (size - emptySize) slots so part of the ring stays free for new entries
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // link the newer neighbour (loaded in the previous iteration) back to this entry
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
333

334
/*
 * Locked wrapper: runs syncLogBufferInitWithoutLock() while holding pBuf->mutex and returns its result.
 */
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = 0;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
340

341
/*
 * Discards every entry in pBuf and rebuilds it from the snapshot and the log store
 * (via syncLogBufferInitWithoutLock), all under pBuf->mutex.
 *
 * pBuf->bytes and the global tsLogBufferMemoryUsed are incremented together whenever an entry is
 * stored for a vnode (vgId > 1). This function therefore releases the buffer's bytes from the global
 * counter before zeroing pBuf->bytes; otherwise the re-init would re-add them and the global total would drift upward.
 *
 * Returns the result of the re-initialization (failures are also logged).
 */
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncLogBufEntry* pSlot = &pBuf->entries[(index + pBuf->size) % pBuf->size];
    SSyncRaftEntry*   pEntry = pSlot->pItem;
    if (pEntry == NULL) continue;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    (void)memset(pSlot, 0, sizeof(*pSlot));
  }

  // release this buffer's share of the process-wide accounting before forgetting it
  if (pNode->vgId > 1 && pBuf->bytes != 0) {
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, -(int64_t)pBuf->bytes);
  }

  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
  pBuf->bytes = 0;
  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
361

362
/*
 * Returns the term of the entry stored at pBuf->matchIndex. Callers in this file already hold pBuf->mutex.
 * If that ring slot is empty, logs an error, sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR and returns -1.
 */
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SSyncRaftEntry* pMatched = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
  if (pMatched != NULL) {
    return pMatched->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
372

373
/*
 * Locked accessor for the term at matchIndex.
 * Returns -1 (with terrno set) when the matchIndex slot is empty.
 */
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm = -1;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
379

380
/*
 * Reports whether the buffer covers no indices, i.e. endIndex <= startIndex, read under pBuf->mutex.
 */
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);
  bool hasRange = pBuf->startIndex < pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return !hasRange;
}
386

387
/*
 * Accepts an entry received from the leader into the log buffer.
 *
 * Ownership: pEntry is always consumed. It is either stored in the buffer or destroyed before
 * returning (including when the initial buffer validation fails).
 *
 * Outcomes:
 *   - index <= commitIndex: nothing is stored. Returns 0 when the locally known term at index matches,
 *     otherwise the status of that lookup.
 *   - index outside the ring: TSDB_CODE_OUT_OF_RANGE.
 *   - index beyond matchIndex whose prevTerm does not match the term at matchIndex:
 *     TSDB_CODE_ACTION_IN_PROGRESS (not ready yet).
 *   - an existing entry at index with a different term: the buffer is rolled back from index and
 *     the new entry is stored.
 *   - an existing entry at index with the same term: treated as a duplicate (returns 0).
 *   - otherwise: stored at its slot with prevLogIndex = index - 1 and prevLogTerm = prevTerm, and
 *     endIndex is extended if needed.
 */
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  int32_t code = syncLogBufferValidate(pBuf);
  if (code != 0) {
    // every other exit path consumes pEntry; do the same here instead of leaking it
    syncEntryDestroy(pEntry);
    TAOS_RETURN(code);
  }
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  // already committed: never replace, only compare terms
  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting entry: drop it and everything after it, then store the new one below
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  // ownership moved into the buffer; keep _out from destroying it
  pEntry = NULL;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  // an entry fetched from outside the buffer (inBuf == false) is a private copy
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
516

517
/*
 * Decides whether appending pEntry should force an fsync: only for TDMT_VND_COMMIT entries, and only
 * when the group has more than one replica. pEntry is not inspected for single-replica groups.
 */
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
520

521
/*
 * Writes pEntry to the log store as its new last entry.
 *
 * Steps:
 *   - If the store already reaches pEntry->index, it is first truncated from that index.
 *   - When built with USE_MOUNT and the node has a mountVgId, the message header's vgId is rewritten
 *     to the mount vgId before writing.
 *   - An fsync is requested only when syncLogStoreNeedFlush() says so.
 *
 * Returns 0 on success, or:
 *   - the log store's code when truncation or append fails;
 *   - TSDB_CODE_SYN_INTERNAL_ERROR for a negative index, or when the store's last index is not
 *     the expected value before or after the append.
 */
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // drop any stale suffix that overlaps this entry
  if (pLogStore->syncLogLastIndex(pLogStore) >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  // the entry must extend the store by exactly one
  if (pLogStore->syncLogLastIndex(pLogStore) + 1 != pEntry->index) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

#ifdef USE_MOUNT
  if (pNode->mountVgId) {
    SMsgHead* pHeader = (SMsgHead*)pEntry->data;
    if (pHeader->vgId != pNode->mountVgId) pHeader->vgId = pNode->mountVgId;
  }
#endif

  bool needFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, needFsync);
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), pEntry->index,
           pEntry->term);
    TAOS_RETURN(code);
  }

  return (pLogStore->syncLogLastIndex(pLogStore) == pEntry->index) ? 0 : TSDB_CODE_SYN_INTERNAL_ERROR;
}
549

550
/*
 * Advances pBuf->matchIndex over consecutive buffered entries that chain onto the current match.
 * An entry chains on when its prevLogIndex/prevLogTerm equal the index/term of the entry at matchIndex.
 *
 * For each such entry:
 *   - persist it to the log store;
 *   - apply a config-change entry if it directly follows commitIndex (otherwise only log that it is delayed);
 *   - trigger replication;
 *   - record the new match index in pNode->pMatchIndex.
 *
 * The walk stops at the first empty slot, term mismatch or error. If an entry fails after matchIndex
 * was bumped for it, matchIndex is rolled back to the last fully processed index.
 *
 * pMatchTerm, when non-NULL, receives the term at the resulting matchIndex (-1 if that slot is empty).
 * Returns the resulting matchIndex. Per-entry error codes are only logged, but if the initial
 * syncLogBufferValidate() fails, its error code is returned in place of an index.
 */
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;
  int32_t        code = 0;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             prevLogIndex, pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // the next entry was sent with a different prev term: it does not chain onto our match yet
    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    pBuf->matchIndex = index;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  // roll back to the last fully processed index if an entry failed after matchIndex was bumped
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // NULL-safe lookup: yields -1 (and logs) instead of dereferencing an empty slot
    *pMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
692

693
/*
 * Hand one committed raft entry to the state machine through pFsm->FpCommitCb.
 *
 * A restored, single-replica, non-learner node with vgId != 1 skips execution unless `force` is true.
 * Learners keep executing so that they apply entries while catching up.
 * If the commit callback fails while terrno equals TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, the call is
 * repeated after a 10ms sleep.
 *
 * @param pNode     sync node owning the entry
 * @param pFsm      state machine whose FpCommitCb is invoked
 * @param role      node state recorded in the callback metadata
 * @param term      current raft term recorded in the callback metadata
 * @param pEntry    entry to apply
 * @param applyCode result code forwarded to the callback in both rpcMsg.code and cbMeta.code
 * @param force     when true, never take the single-replica shortcut
 * @return 0 on success (or when skipped), otherwise the error code of the failing step
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  bool skipFsm = !force && pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
                 pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER;
  if (skipFsm) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }
  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0;
  int32_t lino = 0;  // set by TAOS_CHECK_EXIT before jumping to _exit
  for (;;) {
    SFsmCbMeta cbMeta = {0};
    cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (cbMeta.lastConfigIndex < -1) {
      return (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SRpcMsg rpcMsg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));

    cbMeta.term = pEntry->term;
    cbMeta.currentTerm = term;
    cbMeta.index = pEntry->index;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.state = role;
    cbMeta.code = applyCode;
    cbMeta.flag = -1;

    // Keep the trace id assignment ahead of the response-manager lookup, which writes into rpcMsg.info.
    rpcMsg.info.traceId = pEntry->originRpcTraceId;
    int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
    sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
            pNode->vgId, pEntry->index, &rpcMsg.info, cbMeta.seqNum, num);

    code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
    bool retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);

    sGTrace(&rpcMsg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);

    if (!retry) {
      break;
    }

    taosMsleep(10);
    if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
      // Warn once every APPLY_QUEUE_ERROR_THRESHOLD occurrences; trace the others.
      if (++pNode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
        pNode->applyQueueErrorCount = 0;
        sGWarn(&rpcMsg.info.traceId,
               "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
               pEntry->index, tstrerror(code));
      } else {
        sGTrace(&rpcMsg.info.traceId,
                "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
                pEntry->index, tstrerror(code));
      }
    }
  }

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
778

779
/*
 * Check the structural invariants of a log buffer.
 *
 * Required: startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 * endIndex - startIndex <= size, and the slot holding matchIndex is populated.
 *
 * @return 0 when every invariant holds, TSDB_CODE_SYN_INTERNAL_ERROR (after logging) otherwise
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (pBuf->matchIndex < pBuf->startIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
  } else if (pBuf->matchIndex < pBuf->commitIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
  } else if (pBuf->endIndex <= pBuf->matchIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
  } else if (pBuf->size < pBuf->endIndex - pBuf->startIndex) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
  } else if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
    sError("failed to validate since pItem is null");
  } else {
    code = 0;
  }

  return code;
}
806

807
/*
 * Apply committed raft log entries to the state machine, then recycle old entries from the log buffer.
 *
 * Entries with index in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)] are fetched one by one and
 * passed to syncFsmExecute(); pBuf->commitIndex advances after each successful apply. When the entry that
 * follows the one just applied is a TDMT_SYNC_CONFIG_CHANGE entry, syncNodeChangeConfig() is called for it.
 * For a node left with a single replica, that config entry is also executed and committed here.
 *
 * Afterwards, entries at the front of the buffer are freed while they lie before pBuf->commitIndex and either:
 *   - are older than the TSDB_SYNC_LOG_BUFFER_RETENTION window, or
 *   - (vnodes only) both the buffer and the global log-buffer memory limits are exceeded.
 *
 * Finally the node is marked restored when:
 *   - it is not yet restored,
 *   - pBuf->commitIndex has reached pNode->commitIndex, and
 *   - the last fetched entry's term is not older than the current term.
 *
 * pBuf->mutex is held from the first lock until _out; every path after the lock leaves through _out.
 *
 * @param pBuf        log buffer of pNode
 * @param pNode       sync node owning the buffer
 * @param commitIndex index up to which entries may be applied
 * @param trace       trace id used for debug logging
 * @param src         caller description used for logging
 * @return 0 on success, otherwise an error code (TSDB_CODE_SYN_INTERNAL_ERROR on invariant violations)
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;      // entry being applied; freed by this function only when inBuf is false
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;  // entry after pEntry, inspected for a pending config change
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    // Nothing new to apply, but fetch the last committed entry so the restore check at _out can still run.
    if (!pNode->restoreFinish && commitIndex > 0 && commitIndex == pBuf->commitIndex) {
      sInfo("vgId:%d, try to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      int32_t ret = syncLogBufferGetOneEntry(pBuf, pNode, commitIndex, &inBuf, &pEntry);
      if (ret != 0) {
        sWarn("vgId:%d, failed to get entry for restore check at index:%" PRId64, vgId, commitIndex);
      }
    }
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, need to apply config change entry in sync thread,
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycled = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycled == NULL) {
      // Never dereference the empty slot. Leave through _out so the buffer mutex is released and any
      // fetched entries are freed (a plain return here would keep pBuf->mutex locked).
      sError("vgId:%d, invalid log entry to recycle, startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64
             ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pRecycled == pEntry) {
      // pEntry is still read by the restore check at _out. It can sit below pBuf->commitIndex when a config
      // change entry was committed right after it (single-replica path above), so stop before freeing it.
      break;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycled->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycled->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycled->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycled->term,
           pRecycled->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycled);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex);
    sInfo("vgId:%d, after restore finished callback executed", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
979

980
/*
 * Clear the replication window of a log-replication manager.
 *
 * Zeroes every state slot in [startIndex, endIndex), then resets the window indexes to 0, clears the
 * restored flag, and clears the retry backoff. A NULL manager is ignored. A negative startIndex is logged
 * and leaves the manager untouched.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }
  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex slot = pMgr->startIndex;
  while (slot < pMgr->endIndex) {
    (void)memset(&pMgr->states[slot % pMgr->size], 0, sizeof(pMgr->states[0]));
    ++slot;
  }

  pMgr->startIndex = pMgr->matchIndex = pMgr->endIndex = 0;
  pMgr->restored = false;
  pMgr->retryBackoff = 0;
}
996

997
/*
 * Resend in-flight log entries whose retry wait has elapsed.
 *
 * Scans the replication window [startIndex, endIndex) in order. The scan stops at the first slot that is
 * not yet due, and at most batchSize + 1 entries are resent per call. Acked slots are skipped. If an acked
 * slot above matchIndex is older than 8x the maximum retry wait, the window is reset instead.
 *
 * @return 0 on success;
 *         TSDB_CODE_OUT_OF_RANGE when the retry backoff reached its limit (window reset);
 *         TSDB_CODE_ACTION_IN_PROGRESS on stagnation (window reset);
 *         TSDB_CODE_SYN_INTERNAL_ERROR on a barrier mismatch;
 *         otherwise the error returned by syncLogReplSendTo().
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->startIndex >= pMgr->endIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t pos = index % pMgr->size;
    bool    atEdge = (index == pMgr->startIndex) || (index + 1 == pMgr->endIndex);

    // A barrier slot is only valid at either end of the window.
    if (pMgr->states[pos].barrier && !atEdge) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    // Not yet due for a resend: stop scanning.
    if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[pos].acked) {
      bool stalled =
          pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs;
      if (stalled) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, index,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        break;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      break;
    }
    if (barrier != pMgr->states[pos].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = term;
    pMgr->states[pos].acked = false;

    retried = true;
    if (firstIndex == -1) {
      firstIndex = index;
    }

    if (++count > batchSize) {
      break;
    }
  }

  if (retried) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1075

1076
/*
 * Process an append-entries reply while the replication manager is not yet restored.
 *
 * Depending on the reply, this function either:
 *   - marks the manager restored,
 *   - retries in-flight entries,
 *   - starts a snapshot to the peer, or
 *   - resets the window and probes the peer at a recomputed index.
 *
 * @param pMgr  replication manager for the peer (must not be restored)
 * @param pNode local sync node
 * @param pMsg  reply received from the peer
 * @return 0 on success, otherwise an error code
 */
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  int32_t         code = 0;

  if (pMgr->restored) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex == 0) {
    // Empty window: the manager must be fully reset.
    if (pMgr->startIndex != 0 || pMgr->matchIndex != 0) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  } else {
    bool outsideWindow = pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex;
    if (outsideWindow) {
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    bool fsmIncomplete = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE);
    if (fsmIncomplete || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
      char* snapReason = fsmIncomplete ? " incomplete fsm state" : " rollback match index failure";
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, reason:%s, match index:%" PRId64
             ", last sent:%" PRId64,
             pNode->vgId, DID(&destId), snapReason, pMsg->matchIndex, pMsg->lastSendIndex);
      pNode->snapSeq = -1;
      code = syncNodeStartSnapshot(pNode, &destId, snapReason);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }
  }

  // check last match term
  SyncTerm  term = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pBuf->matchIndex);
  SET_ERRNO(0);

  if (pMsg->matchIndex < pBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
    if (term < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, term,
            pMsg->lastMatchTerm);
    }

    bool beforeFirstVer = index + 1 < firstVer;
    bool noPrevTerm = term < 0;
    bool termMismatchAtStart =
        term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer);

    if (beforeFirstVer || noPrevTerm || termMismatchAtStart) {
      if (term < 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
      sTrace("vgId:%d, is going to trigger snapshot to dnode:%d by append reply, index:%" PRId64 ", firstVer:%" PRId64
             ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
             pNode->vgId, DID(&destId), index, firstVer, term, pMsg->lastMatchTerm);

      char reason[100] = {0};
      if (beforeFirstVer) {
        tsnprintf(reason, sizeof(reason), "matched entry not in log range, index:%" PRId64 ", firstVer:%" PRId64,
                  index, firstVer);
      } else if (noPrevTerm) {
        tsnprintf(reason, sizeof(reason), "failed to get prev log term");
      } else {
        tsnprintf(reason, sizeof(reason), "log term mismatch");
      }

      pNode->snapSeq = -1;
      code = syncNodeStartSnapshot(pNode, &destId, reason);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }

    if (index + 1 < firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    if (term == pMsg->lastMatchTerm) {
      index++;
      if (index > pBuf->matchIndex) {
        return TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    } else if (index <= firstVer) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, index);
}
1194

1195
/*
 * Handle a heartbeat reply from a peer.
 *
 * If the reply carries a non-zero start time that differs from the one recorded for the peer, the
 * replication manager is reset and the new start time is stored. Both steps run under pMgr->mutex.
 *
 * @return always 0
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  // The previously declared (and never used) pBuf local was removed.
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1207

1208
/*
 * Handle an append-entries reply from a peer.
 *
 * First, under pMgr->mutex, the manager is reset if the reply's start time differs from the recorded one.
 * Then, under the log buffer mutex, the reply is dispatched:
 *   - to syncLogReplContinue() when the manager is restored, or
 *   - to syncLogReplRecover() otherwise.
 * Errors from either are logged, not propagated.
 *
 * @return always 0
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  bool startTimeChanged = (pMsg->startTime != pMgr->peerStartTime);
  if (startTimeChanged) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t code;
  if (!pMgr->restored) {
    code = syncLogReplRecover(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  } else {
    code = syncLogReplContinue(pMgr, pNode, pMsg);
    if (code != 0) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return 0;
}
1234

1235
/*
 * Kick off replication to a peer.
 *
 * A restored manager sends new entries via syncLogReplAttempt(). An unrestored one probes the peer at the
 * log buffer's matchIndex via syncLogReplProbe().
 *
 * @return 0 on success, otherwise the error from the chosen step
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1243

1244
/*
 * Probe a peer with a single log entry at `index`, opening a one-slot replication window [index, index + 1).
 *
 * If a previous probe is still inside its max retry wait, nothing is sent. The index is validated before
 * the manager is reset or anything is sent. The original code checked index >= 0 only after
 * syncLogReplSendTo() had already been called with the bad index.
 *
 * @param pMgr  replication manager for the peer (must not be restored)
 * @param pNode local sync node
 * @param index log index to send; must be >= 0
 * @return 0 on success or when a probe is still pending; TSDB_CODE_SYN_INTERNAL_ERROR on invalid state or
 *         index; otherwise the error from syncLogReplSendTo()
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
  if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
  int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  // A previous probe is still within its retry wait: do not send another one yet.
  if (pMgr->endIndex > pMgr->startIndex &&
      nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }

  // Reject a negative index before resetting the window or sending anything; it would also produce a
  // negative slot position below.
  if (index < 0) {
    sError("vgId:%d, invalid index to probe, index:%" PRId64, pNode->vgId, index);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  syncLogReplReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  bool     barrier = false;
  SyncTerm term = -1;
  if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  int64_t pos = index % pMgr->size;
  pMgr->states[pos].barrier = barrier;
  pMgr->states[pos].timeMs = nowMs;
  pMgr->states[pos].term = term;
  pMgr->states[pos].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1287

1288
/*
 * Send as many consecutive log entries to the peer managed by pMgr as the
 * pipeline window allows, starting at pMgr->endIndex and never going past the
 * local buffer's matchIndex.
 *
 * The loop stops when any of the following holds:
 *   - more than batchSize entries have been sent (batchSize shrinks as
 *     pMgr->retryBackoff grows);
 *   - the window measured from pMgr->startIndex reaches half of pMgr->size;
 *   - the previously sent entry (beyond startIndex + 1) was a barrier;
 *   - the entry just sent is itself a barrier.
 *
 * Each sent entry's slot in pMgr->states records barrier/timeMs/term/acked.
 * syncLogReplRetryOnNeed() then runs before returning.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when pMgr is not
 * restored, or the error code of the failed send / retry.
 */
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent; reported by the summary trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // do not send anything beyond a barrier entry that is still in flight
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm entryTerm = -1;

    code = syncLogReplSendTo(pMgr, pNode, index, &entryTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = entryTerm;
    pMgr->states[pos].acked = false;

    // previously a shadowing local `term` kept this one at -1 forever
    term = entryTerm;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, entryTerm, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1352

1353
/*
 * Process an AppendEntries reply, then try to send more entries.
 *
 * If the reply's lastSendIndex falls inside the in-flight window
 * [startIndex, endIndex), then:
 *   - retryBackoff may be lowered by one when the window's first and last
 *     send timestamps are closer than the current backoff wait;
 *   - that slot is marked acked;
 *   - matchIndex is raised to the reply's matchIndex if larger;
 *   - every slot in [startIndex, matchIndex) is zeroed and startIndex
 *     advances to matchIndex.
 * Replies outside the window only trigger syncLogReplAttempt().
 */
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex ackedIndex = pMsg->lastSendIndex;
  if (ackedIndex < pMgr->startIndex || ackedIndex >= pMgr->endIndex) {
    return syncLogReplAttempt(pMgr, pNode);
  }

  if (pMgr->retryBackoff > 0 && pMgr->startIndex < pMgr->matchIndex) {
    int64_t oldestSendMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
    int64_t newestSendMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
    int64_t spanMs = newestSendMs - oldestSendMs;
    int64_t backoffWaitMs = (int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1);
    if (spanMs > 0 && spanMs < backoffWaitMs) {
      pMgr->retryBackoff--;
    }
  }

  pMgr->states[ackedIndex % pMgr->size].acked = true;
  if (pMsg->matchIndex > pMgr->matchIndex) {
    pMgr->matchIndex = pMsg->matchIndex;
  }

  SyncIndex slide = pMgr->startIndex;
  while (slide < pMgr->matchIndex) {
    (void)memset(&pMgr->states[slide % pMgr->size], 0, sizeof(pMgr->states[0]));
    ++slide;
  }
  pMgr->startIndex = pMgr->matchIndex;

  return syncLogReplAttempt(pMgr, pNode);
}
1374

1375
SSyncLogReplMgr* syncLogReplCreate() {
64,025,889✔
1376
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
64,025,889✔
1377
  if (pMgr == NULL) {
64,023,375✔
1378
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1379
    return NULL;
×
1380
  }
1381

1382
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
64,023,375✔
1383

1384
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
64,023,375✔
1385
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1386
    return NULL;
×
1387
  }
1388

1389
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
64,023,375✔
1390
  if (code) {
64,025,628✔
1391
    terrno = code;
×
1392
    return NULL;
×
1393
  }
1394

1395
  return pMgr;
64,025,628✔
1396
}
1397

1398
/*
 * Destroy the manager's mutex and free the manager.
 * A NULL pointer is accepted and ignored.
 */
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1406

1407
/*
 * Create one replication manager per replica/learner slot of pNode and
 * record the slot number as the manager's peerId.
 *
 * Fails with TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already populated, or
 * with terrno if creation fails. Managers created before the failure stay
 * installed in pNode->logReplMgrs.
 */
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int       slot = 0;

  while (slot < slotCount) {
    if (pNode->logReplMgrs[slot] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    SSyncLogReplMgr* pCreated = syncLogReplCreate();
    pNode->logReplMgrs[slot] = pCreated;
    if (pCreated == NULL) {
      TAOS_RETURN(terrno);
    }
    pCreated->peerId = slot;
    ++slot;
  }
  return 0;
}
1418

1419
/*
 * Destroy every replication manager slot of pNode and clear the slot.
 * Empty (NULL) slots are handled by syncLogReplDestroy().
 */
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  const int slotCount = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int       slot = 0;

  while (slot < slotCount) {
    SSyncLogReplMgr** ppSlot = &pNode->logReplMgrs[slot];
    syncLogReplDestroy(*ppSlot);
    *ppSlot = NULL;
    ++slot;
  }
}
1425

1426
/*
 * Allocate a zero-initialised sync log buffer and initialise its recursive
 * mutex.
 *
 * On success *ppBuf receives the buffer and 0 is returned. On failure *ppBuf
 * is set to NULL, everything acquired so far (memory, mutexattr) is
 * released, and a non-zero error code is returned.
 */
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  int32_t         ret = 0;
  bool            attrInited = false;
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // never fall through with a NULL buffer, even if terrno was left at 0
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  // report the wrapper's own error code instead of re-reading ERRNO, which
  // pthread-style APIs are not required to set
  if ((ret = taosThreadMutexAttrInit(&pBuf->attr)) < 0) {
    code = ret;
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  // recursive: the same thread may re-acquire the buffer lock
  if ((ret = taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE)) < 0) {
    code = ret;
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if ((ret = taosThreadMutexInit(&pBuf->mutex, &pBuf->attr)) < 0) {
    code = ret;
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }
_exit:
  if (code != 0) {
    if (attrInited) {
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1464

1465
/*
 * Under the buffer mutex, destroy every entry held in [startIndex, endIndex),
 * zero the corresponding slots, and reset all indices and the byte count to 0.
 * Slots whose pItem is already NULL are left as-is.
 */
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    // adding size first keeps the slot non-negative for a small negative index such as -1
    int64_t         slot = (cursor + pBuf->size) % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
    }
    ++cursor;
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
1478

1479
/*
 * Clear all buffered entries, destroy the mutex and its attribute object,
 * and free the buffer. A NULL pointer is accepted and ignored.
 */
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1489

1490
/*
 * Discard every entry at or after toIndex, both from the in-memory buffer and
 * from the log store (WAL).
 *
 * toIndex must satisfy commitIndex < toIndex <= endIndex; otherwise
 * TSDB_CODE_SYN_INTERNAL_ERROR is returned. If toIndex == endIndex, nothing is
 * done. After truncation, if toIndex <= startIndex, the buffer is refilled via
 * syncLogBufferInitWithoutLock(). The function takes no lock itself
 * (NOTE(review): callers presumably hold pBuf->mutex — confirm).
 */
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;
  bool    validTarget = (pBuf->commitIndex < toIndex) && (toIndex <= pBuf->endIndex);
  if (!validTarget) return TSDB_CODE_SYN_INTERNAL_ERROR;

  if (toIndex == pBuf->endIndex) {
    return 0;
  }

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // step 1: drop buffered entries in [toIndex, endIndex), newest first
  SyncIndex cursor;
  for (cursor = pBuf->endIndex - 1; cursor >= toIndex; --cursor) {
    int64_t         slot = cursor % pBuf->size;
    SSyncRaftEntry* pItem = pBuf->entries[slot].pItem;
    if (pItem == NULL) continue;
    syncEntryDestroy(pItem);
    (void)memset(&pBuf->entries[slot], 0, sizeof(pBuf->entries[0]));
  }
  // cursor now equals toIndex - 1
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, cursor);
  if (cursor + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // step 2: truncate the log store so its last index is toIndex - 1
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // step 3: refill the buffer when the rollback reached its start
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1539

1540
/*
 * Drop all unmatched entries (everything after matchIndex) from the buffer
 * and reset every replication manager of pNode.
 *
 * Requires the log store's last index to equal pBuf->matchIndex; otherwise
 * TSDB_CODE_SYN_INTERNAL_ERROR is returned. The buffer mutex is released on
 * every return path (the mismatch path previously returned while still
 * holding it).
 */
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // best-effort: a failed rollback is logged and the reset continues
  int32_t code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);
  if (code != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1566

1567
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
1,828,671,834✔
1568
                                 SSyncRaftEntry** ppEntry) {
1569
  int32_t code = 0;
1,828,671,834✔
1570

1571
  *ppEntry = NULL;
1,828,671,834✔
1572

1573
  if (index >= pBuf->endIndex) {
1,828,697,100✔
1574
    return TSDB_CODE_OUT_OF_RANGE;
778,993,629✔
1575
  }
1576

1577
  if (index > pBuf->startIndex) {  // startIndex might be dummy
1,049,712,177✔
1578
    *pInBuf = true;
1,040,011,472✔
1579
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
1,040,036,611✔
1580
#ifdef USE_MOUNT
1581
    if (pNode->mountVgId) {
1,040,024,323✔
1582
      SMsgHead* pMsgHead = (SMsgHead*)(*ppEntry)->data;
10,268✔
1583
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
10,268✔
1584
    }
1585
#endif
1586
  } else {
1587
    *pInBuf = false;
9,671,751✔
1588

1589
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
9,671,751✔
1590
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
210,201✔
1591
    }
1592
  }
1593

1594
  TAOS_RETURN(code);
1,049,728,193✔
1595
}
1596

1597
/*
 * Build and send a single AppendEntries message carrying the entry at `index`
 * to pDestId.
 *
 * On success returns 0, stores the entry's term in *pTerm (if pTerm is
 * non-NULL) and whether the entry is a barrier in *pBarrier. On failure
 * returns a non-zero error code; a missing entry or an invalid previous term
 * is never reported as success, even when the lookup itself returned 0.
 * If the entry is missing because the WAL no longer has it, the peer's
 * replication manager is reset.
 *
 * Entries read from the log store (not the buffer) are owned here and freed
 * on every path.
 */
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    // an empty buffer slot comes back with code 0; nothing was sent, so fail
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);

  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc