• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.81
/source/libs/sync/src/syncPipeline.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17

18
#include "syncCommit.h"
19
#include "syncIndexMgr.h"
20
#include "syncInt.h"
21
#include "syncPipeline.h"
22
#include "syncRaftCfg.h"
23
#include "syncRaftEntry.h"
24
#include "syncRaftStore.h"
25
#include "syncReplication.h"
26
#include "syncRespMgr.h"
27
#include "syncSnapshot.h"
28
#include "syncUtil.h"
29
#include "syncVoteMgr.h"
30

31
static int64_t tsLogBufferMemoryUsed = 0;  // total bytes of vnode log buffer
32

33
// Tells whether `type` is one of the vnode message types this file treats as
// "blocking": create/alter/drop table, update tag value, or alter confirm.
static bool syncIsMsgBlock(tmsg_t type) {
  switch (type) {
    case TDMT_VND_CREATE_TABLE:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_VND_DROP_TABLE:
    case TDMT_VND_UPDATE_TAG_VAL:
    case TDMT_VND_ALTER_CONFIRM:
      return true;
    default:
      return false;
  }
}
37

38
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
39
  return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
6,844✔
40
}
41

42
// Returns the value of pBuf->endIndex, read while pBuf->mutex is held.
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  (void)taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
48

49
// Appends pEntry at the tail of the log buffer.
//
// The entry is stored only when all of the following hold, checked in this order
// under pBuf->mutex:
//   - its index still fits in the ring (index - startIndex < size);
//   - once restore has finished, it is within TSDB_SYNC_NEGOTIATION_WIN of commitIndex;
//   - once restore has finished, commitIndex is less than tsSyncApplyQueueSize ahead
//     of the index reported by FpAppliedIndexCb;
//   - its index equals endIndex and the target slot is empty;
//   - the preceding slot holds an entry with index - 1 and a term not greater than
//     pEntry->term.
// On success the slot records the entry plus the previous entry's index/term, endIndex
// advances, and (for vgId > 1) the byte counters grow by pEntry->bytes.
//
// Returns 0 on success, otherwise an error code. On failure the buffer is not modified
// and the function sleeps 1 ms before returning. pEntry is never destroyed here.
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));

  (void)taosThreadMutexLock(&pBuf->mutex);
  const SyncIndex newIndex = pEntry->index;

  // ring capacity
  if (newIndex - pBuf->startIndex >= pBuf->size) {
    code = TSDB_CODE_SYN_BUFFER_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64, pNode->vgId, tstrerror(code), newIndex);
    goto _err;
  }

  // negotiation window, only enforced after restore
  if (pNode->restoreFinish && newIndex - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
    code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
           newIndex, pBuf->commitIndex);
    goto _err;
  }

  // apply backlog, only enforced after restore
  SyncIndex fsmApplied = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
  if (pNode->restoreFinish && pBuf->commitIndex - fsmApplied >= tsSyncApplyQueueSize) {
    code = TSDB_CODE_SYN_WRITE_STALL;
    sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
           pNode->vgId, tstrerror(code), newIndex, pBuf->commitIndex, fsmApplied);
    goto _err;
  }

  // must land exactly at the tail
  if (newIndex != pBuf->endIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  // target slot must be free
  if (pBuf->entries[newIndex % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  // initial log buffer with at least one item, e.g, commitIndex
  SSyncRaftEntry* pPrev = pBuf->entries[(newIndex - 1 + pBuf->size) % pBuf->size].pItem;
  if (pPrev == NULL) {
    sError("vgId:%d, no matched log entry", pNode->vgId);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pPrev->index + 1 != newIndex) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pPrev->term > pEntry->term) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }

  pBuf->entries[newIndex % pBuf->size] =
      (SSyncLogBufEntry){.pItem = pEntry, .prevLogIndex = pPrev->index, .prevLogTerm = pPrev->term};
  pBuf->endIndex = newIndex + 1;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_err:
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  taosMsleep(1);
  TAOS_RETURN(code);
}
121

122
// Looks up the term of the log entry preceding `index` (i.e. at index - 1) and
// stores it in *pSyncTerm.
//
// Sources are tried in this order:
//   1. index - 1 == -1 and the log store begins at 0: the term is 0;
//   2. index - 1 beyond pBuf->matchIndex: fails with TSDB_CODE_WAL_LOG_NOT_EXIST;
//   3. the log buffer, when index - 1 >= pBuf->startIndex;
//   4. the replication manager's state window, when pMgr is non-NULL and covers index - 1;
//   5. the FSM snapshot info, when index - 1 equals its lastApplyIndex;
//   6. the log store itself.
// pBuf->mutex is not taken here.
//
// Returns 0 on success. On a lookup failure *pSyncTerm is set to -1 and an error code
// is returned.
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  SyncIndex       prevIndex = index - 1;
  int32_t         code = 0;

  // case 1: nothing precedes the very first entry
  if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
    *pSyncTerm = 0;
    return 0;
  }

  // case 2: not matched yet
  if (prevIndex > pLogBuf->matchIndex) {
    *pSyncTerm = -1;
    TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
  }

  // sanity check kept from the original implementation
  if (index - 1 != prevIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  // case 3: still held in the log buffer
  if (prevIndex >= pLogBuf->startIndex) {
    SSyncRaftEntry* pInBuf = pLogBuf->entries[(prevIndex + pLogBuf->size) % pLogBuf->size].pItem;
    if (pInBuf == NULL) {
      sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = pInBuf->term;
    return 0;
  }

  // case 4: remembered by the replication manager
  if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
    int64_t slot = (prevIndex + pMgr->size) % pMgr->size;
    if (pMgr->states[slot].timeMs == 0) {
      sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    SyncTerm mgrTerm = pMgr->states[slot].term;
    if (prevIndex != 0 && mgrTerm == 0) {
      sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
             mgrTerm);
      *pSyncTerm = -1;
      TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
    }
    *pSyncTerm = mgrTerm;
    return 0;
  }

  // case 5: boundary of the snapshot
  SSnapshot snap = {0};
  (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap);
  if (prevIndex == snap.lastApplyIndex) {
    *pSyncTerm = snap.lastApplyTerm;
    return 0;
  }

  // case 6: read it back from the log store
  SSyncRaftEntry* pStored = NULL;
  code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pStored);
  if (code == 0) {
    SyncTerm storedTerm = pStored->term;
    syncEntryDestroy(pStored);
    pStored = NULL;
    *pSyncTerm = storedTerm;
    return 0;
  }

  *pSyncTerm = -1;
  sInfo("vgId:%d, failed to get log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
  TAOS_RETURN(code);
}
190

191
// Builds the placeholder ("dummy") entry for the given term/index/vgId.
// It is produced by syncEntryBuildNoop and returned unchanged (may be NULL).
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pDummy = syncEntryBuildNoop(term, index, vgId);
  return pDummy;
}
194

195
// Checks that the log store's index range is consistent with commitIndex:
// the first stored index may be at most commitIndex + 1, and the last stored
// index may not be below commitIndex.
//
// Returns 0 when both hold, TSDB_CODE_WAL_LOG_INCOMPLETE otherwise.
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
  SyncIndex walFirst = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  if (commitIndex + 1 < walFirst) {
    sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1, firstVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, walFirst, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  SyncIndex walLast = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (commitIndex > walLast) {
    sError("vgId:%d, lastVer of WAL log less than tsdb commit version, lastVer:%" PRId64
           ", tsdb commit version:%" PRId64,
           pNode->vgId, walLast, commitIndex);
    return TSDB_CODE_WAL_LOG_INCOMPLETE;
  }

  return 0;
}
214

215
// Populates the log buffer from the FSM snapshot info and the log store.
// The caller is expected to hold pBuf->mutex (see syncLogBufferInit / syncLogBufferReInit).
//
// Steps:
//   1. Take commitIndex / commitTerm from FpGetSnapshotInfo (the term is clamped to >= 0)
//      and check them against the log store range.
//   2. Set commitIndex, matchIndex = last stored index, endIndex = matchIndex + 1.
//   3. Walk the log store backwards from the last index down to commitIndex + 1, placing
//      entries in the ring and recording each entry's index/term as prevLogIndex/prevLogTerm
//      of its successor. Loading stops early when an entry cannot be read or when more than
//      (size - TSDB_SYNC_LOG_BUFFER_SIZE / 2) entries would be held.
//   4. If the walk reached commitIndex, place a dummy entry at commitIndex.
//   5. Set startIndex to the lowest index actually present in the buffer.
//
// Returns 0 on success, an error code otherwise.
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  if (pNode->pLogStore == NULL) {
    sError("log store not created");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm == NULL) {
    sError("pFsm not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
    sError("FpGetSnapshotInfo not registered");
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t   code = 0, lino = 0;
  SSnapshot snapshot = {0};
  TAOS_CHECK_EXIT(pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot));

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = TMAX(snapshot.lastApplyTerm, 0);
  TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

  SyncIndex toIndex = lastVer;
  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;
  int             emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);  // slots deliberately left free

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    // Keep the read result in its own variable: a failed read only ends the loading loop,
    // it must not become this function's return code, but the warning has to name the
    // real reason.
    int32_t getCode = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry);
    if (getCode < 0) {
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(getCode), index);
      break;
    }
#ifdef USE_MOUNT
    if (pNode->mountVgId) {
      // rewrite the vgId in the message head so it matches this node
      SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
    }
#endif
    bool taken = false;
    if (toIndex - index + 1 <= pBuf->size - emptySize) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
      if (pNode->vgId > 1) {
        pBuf->bytes += pEntry->bytes;
        (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
      }
    }

    // the successor (loaded in the previous iteration) learns its prev index/term from this entry
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      // entry was only needed for its index/term; the buffer does not keep it
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
    if (pNode->vgId > 1) {
      pBuf->bytes += pDummy->bytes;
      (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
    }

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  pBuf->startIndex = takeDummy ? index : index + 1;

  pBuf->isCatchup = false;

  sInfo("vgId:%d, init sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // validate
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;

_exit:
  if (code != 0) {
    sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
  }
  TAOS_RETURN(code);
}
333

334
// Locked wrapper: runs syncLogBufferInitWithoutLock while holding pBuf->mutex
// and hands back its result.
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code;

  (void)taosThreadMutexLock(&pBuf->mutex);
  code = syncLogBufferInitWithoutLock(pBuf, pNode);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return code;
}
340

341
// Discards everything currently held in the log buffer and rebuilds it via
// syncLogBufferInitWithoutLock, all under pBuf->mutex.
//
// Every entry in [startIndex, endIndex) is destroyed and its slot zeroed, the four
// indexes are reset to 0 and the buffer's byte count is cleared. The bytes this buffer
// had added to the file-scope tsLogBufferMemoryUsed counter are handed back before
// pBuf->bytes is cleared; otherwise the re-initialisation below would add the reloaded
// entries on top of the stale amount and the counter would grow on every re-init.
//
// Returns the result of the re-initialisation (or a validation error).
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
    SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
    if (pEntry == NULL) continue;
    syncEntryDestroy(pEntry);
    pEntry = NULL;
    (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;

  // pBuf->bytes mirrors what this buffer contributed to the global counter
  // (both are increased together wherever an entry is stored), so give it back.
  if (pBuf->bytes != 0) {
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, -(int64_t)pBuf->bytes);
  }
  pBuf->bytes = 0;

  int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
  if (code < 0) {
    sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return code;
}
361

362
// Returns the term of the entry stored at pBuf->matchIndex. No locking is done here.
// When that slot is empty, logs an error, sets terrno to TSDB_CODE_SYN_INTERNAL_ERROR
// and returns -1.
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
  SyncIndex       matchIndex = pBuf->matchIndex;
  SSyncRaftEntry* pMatched = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;

  if (pMatched != NULL) {
    return pMatched->term;
  }

  sError("failed to get last match term since entry is null");
  terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  return -1;
}
372

373
// Locked wrapper around syncLogBufferGetLastMatchTermWithoutLock: takes pBuf->mutex,
// fetches the term at matchIndex (or -1 on failure) and returns it.
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
  SyncTerm lastMatchTerm;

  (void)taosThreadMutexLock(&pBuf->mutex);
  lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return lastMatchTerm;
}
379

380
// True when the buffer holds no index range, i.e. endIndex <= startIndex.
// Both fields are read while holding pBuf->mutex.
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
  bool isEmpty;

  (void)taosThreadMutexLock(&pBuf->mutex);
  isEmpty = !(pBuf->endIndex > pBuf->startIndex);
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  return isEmpty;
}
386

387
// Accepts an entry (with the term of its predecessor, prevTerm) into the log buffer.
//
// Under pBuf->mutex the entry goes through these stages:
//   - index <= commitIndex: nothing is stored; the result is 0 when the term already
//     recorded for that index equals pEntry->term, otherwise the lookup's code;
//   - learner nodes record the highest index seen in pBuf->totalIndex;
//   - index beyond the ring capacity: TSDB_CODE_OUT_OF_RANGE;
//   - index > matchIndex but prevTerm differs from the term at matchIndex:
//     TSDB_CODE_ACTION_IN_PROGRESS;
//   - an entry already exists at that index: same term is treated as a duplicate
//     (after a consistency check), a different term triggers syncLogBufferRollback;
//   - otherwise the entry is stored in its slot, the byte counters grow (vgId > 1) and
//     endIndex is raised to at least index + 1.
//
// Ownership: once the lock has been taken, pEntry is either stored in the buffer or
// destroyed at _out; the caller must not use it afterwards.
// NOTE(review): if the very first validation fails the function returns before _out,
// so pEntry is not destroyed on that path - confirm what callers expect.
//
// Returns 0 on success or an error code.
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  int32_t         code = 0;
  SyncIndex       index = pEntry->index;
  SyncIndex       prevIndex = pEntry->index - 1;
  SyncTerm        lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
  SSyncRaftEntry* pExist = NULL;
  bool            inBuf = true;  // false when pExist was fetched from outside the buffer and must be freed here

  if (lastMatchTerm < 0) {
    sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }

  if (index <= pBuf->commitIndex) {
    sTrace("vgId:%d, already committed, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
           " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
           pBuf->endIndex);
    SyncTerm term = -1;
    // asking for the "previous term" of index + 1 yields the term recorded at index
    code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
    if (pEntry->term < 0) {
      sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (term == pEntry->term) {
      code = 0;
    }
    goto _out;
  }

  if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
      index > pBuf->totalIndex) {
    pBuf->totalIndex = index;
    sTrace("vgId:%d, update learner progress, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
           " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
           pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
           pBuf->matchIndex, pBuf->endIndex);
  }

  if (index - pBuf->startIndex >= pBuf->size) {
    sWarn("vgId:%d, out of buffer range, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
          pBuf->endIndex);
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _out;
  }

  if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
    sWarn("vgId:%d, not ready to accept, index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
          " != lastmatch:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
          pBuf->matchIndex, pBuf->endIndex);
    code = TSDB_CODE_ACTION_IN_PROGRESS;
    goto _out;
  }

  // check current in buffer
  // (the returned code is only meaningful through pExist: with pExist == NULL the
  //  function continues and overwrites code below)
  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
  if (pExist != NULL) {
    if (pEntry->index != pExist->index) {
      sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
             pExist->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pEntry->term != pExist->term) {
      // conflicting entry: roll the buffer back to this index, then store the new entry
      TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
    } else {
      sTrace("vgId:%d, duplicate log entry received, index:%" PRId64 ", term:%" PRId64 ", log buffer: [%" PRId64
             " %" PRId64 " %" PRId64 ", %" PRId64 ")",
             pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
             pBuf->endIndex);
      SyncTerm existPrevTerm = -1;
      TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
      if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
        sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->term:%" PRId64
               ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
               ", existPrevTerm:%" PRId64,
               pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
        goto _out;
      }
      code = 0;
      goto _out;
    }
  }

  // update
  if (!(pBuf->startIndex < index)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (!(index - pBuf->startIndex < pBuf->size)) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  if (pBuf->entries[index % pBuf->size].pItem != NULL) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _out;
  }
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pBuf->entries[index % pBuf->size] = tmp;
  if (pNode->vgId > 1) {
    pBuf->bytes += pEntry->bytes;
    (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
  }
  pEntry = NULL;  // now owned by the buffer; keeps _out from destroying it

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

  // success
  code = 0;

_out:
  syncEntryDestroy(pEntry);
  if (!inBuf) {
    syncEntryDestroy(pExist);
    pExist = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
516

517
// Decides the flush flag passed to the log store when persisting pEntry:
// true only with more than one replica and an entry whose original RPC type
// is TDMT_VND_COMMIT.
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
  if (replicaNum <= 1) {
    return false;
  }
  return pEntry->originalRpcType == TDMT_VND_COMMIT;
}
520

521
// Writes pEntry to the log store as its new last entry.
//
// If the store already reaches pEntry->index it is first truncated from that index.
// The entry must then land exactly one past the store's last index, and after the
// append the store's last index must equal pEntry->index.
//
// Returns 0 on success, the truncate/append error code on failure, or
// TSDB_CODE_SYN_INTERNAL_ERROR when an index expectation is violated.
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  int32_t code = 0;

  if (pEntry->index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // drop any stored tail that overlaps the incoming entry
  SyncIndex storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast >= pEntry->index) {
    code = pLogStore->syncLogTruncate(pLogStore, pEntry->index);
    if (code < 0) {
      sError("failed to truncate log store since %s, from index:%" PRId64, tstrerror(code), pEntry->index);
      TAOS_RETURN(code);
    }
  }

  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast + 1 != pEntry->index) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

#ifdef USE_MOUNT
  if (pNode->mountVgId) {
    // stored message heads carry the mount vgId
    SMsgHead* pMsgHead = (SMsgHead*)pEntry->data;
    if (pMsgHead->vgId != pNode->mountVgId) pMsgHead->vgId = pNode->mountVgId;
  }
#endif

  bool needFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
  code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, needFsync);
  if (code < 0) {
    sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), pEntry->index,
           pEntry->term);
    TAOS_RETURN(code);
  }

  storeLast = pLogStore->syncLogLastIndex(pLogStore);
  if (storeLast != pEntry->index) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return 0;
}
549

550
// Advances pBuf->matchIndex over consecutive entries already present in the buffer.
//
// For each index following matchIndex (while below endIndex), under pBuf->mutex:
//   - stop if the slot is empty or its recorded prevLogTerm does not equal the term of
//     the entry at matchIndex;
//   - verify index bookkeeping (any inconsistency ends the loop);
//   - persist the entry through syncLogStorePersist;
//   - for TDMT_SYNC_CONFIG_CHANGE entries, apply the config change when the entry directly
//     follows the buffer's commitIndex, otherwise only log that it is delayed;
//   - call syncNodeReplicateWithoutLock and publish the new match index through
//     syncIndexMgrSetIndex.
// If a step fails after matchIndex was tentatively raised, matchIndex is restored to the
// last index that completed all steps.
//
// pMatchTerm (optional) receives the term of the entry at the final match index. It is
// left untouched when that slot is empty, which can happen on the error path where the
// entry at matchIndex is missing.
// str is a caller-supplied tag that is passed to syncNodeChangeConfig and the log lines.
// pMsg is used for logging/tracing and may be NULL.
//
// Returns the resulting match index. The internal error code is only logged, not returned.
// NOTE(review): a failing syncLogBufferValidate makes TAOS_CHECK_RETURN return its code
// through this int64_t index return - confirm callers cope with that.
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, const SRpcMsg *pMsg) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncLogStore* pLogStore = pNode->pLogStore;
  int64_t        matchIndex = pBuf->matchIndex;  // last index that completed every step
  int32_t        code = 0;

  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
    int64_t index = pBuf->matchIndex + 1;
    if (index < 0) {
      sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // try to proceed
    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
    SyncIndex         prevLogIndex = pBufEntry->prevLogIndex;
    SyncTerm          prevLogTerm = pBufEntry->prevLogTerm;
    SSyncRaftEntry*   pEntry = pBufEntry->pItem;
    if (pEntry == NULL) {
      sTrace("vgId:%d, msg:%p, cannot proceed match index in log buffer, no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pMsg, pBuf->matchIndex);
      goto _out;
    }

    if (index != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg, index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // match
    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pMatch == NULL) {
      sError("vgId:%d, msg:%p, failed to proceed since pMatch is null", pNode->vgId, pMsg);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pMatch->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (pMatch->index + 1 != pEntry->index) {
      sError("vgId:%d, msg:%p, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, pMsg,
             pMatch->index, pEntry->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }
    if (prevLogIndex != pMatch->index) {
      sError("vgId:%d, msg:%p, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, pMsg,
             prevLogIndex, pMatch->index);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    if (pMatch->term != prevLogTerm) {
      sInfo(
          "vgId:%d, msg:%p, mismatching sync log entries encountered, "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pMsg, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
      goto _out;
    }

    // increase match index
    // (tentative: restored from the local matchIndex at _out if a later step fails)
    pBuf->matchIndex = index;

    sGDebug(pMsg ? &pMsg->info.traceId : NULL,
            "vgId:%d, msg:%p, log buffer proceed, start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
            pNode->vgId, pMsg, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);

    // persist
    if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
      sError("vgId:%d, msg:%p, failed to persist sync log entry from buffer since %s, index:%" PRId64, pNode->vgId,
             pMsg, tstrerror(code), pEntry->index);
      taosMsleep(1);
      goto _out;
    }

    if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
      if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
        sInfo(
            "vgId:%d, msg:%p, to change config at %s, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, restore:%d, commitIndex:%" PRId64
            ", "
            "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex,
            pEntry->index - 1, pNode->pLogBuf->commitIndex);
        if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
          sError("vgId:%d, failed to change config from Append since %s, index:%" PRId64, pNode->vgId, tstrerror(code),
                 pEntry->index);
          goto _out;
        }
      } else {
        sInfo(
            "vgId:%d, msg:%p, delay change config from Node %s, "
            "curent entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, commitIndex:%" PRId64 ",  pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
            "), "
            "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
            pNode->vgId, pMsg, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
            pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
            pNode->pLogBuf->commitIndex);
      }
    }

    // replicate on demand
    if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
      sError("vgId:%d, msg:%p, failed to replicate since %s, index:%" PRId64, pNode->vgId, pMsg, tstrerror(code),
             pEntry->index);
      goto _out;
    }

    if (pEntry->index != pBuf->matchIndex) {
      sError("vgId:%d, msg:%p, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
             pMsg, pEntry->index, pBuf->matchIndex);
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      goto _out;
    }

    // update my match index
    matchIndex = pBuf->matchIndex;
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }  // end of while

_out:
  pBuf->matchIndex = matchIndex;
  if (pMatchTerm) {
    // The slot at matchIndex can be empty: the "pMatch is null" error path above jumps
    // here with exactly that slot missing, so it must not be dereferenced blindly.
    SSyncRaftEntry* pLastMatch = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem;
    if (pLastMatch != NULL) {
      *pMatchTerm = pLastMatch->term;
    } else {
      sError("vgId:%d, msg:%p, no log entry at match index:%" PRId64 ", match term not updated", pNode->vgId, pMsg,
             matchIndex);
    }
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return matchIndex;
}
692

693
/*
 * Hand one raft entry to the state machine through pFsm->FpCommitCb.
 *
 * The call is skipped (returning 0) for a single-replica, already restored,
 * non-learner node whose vgId is not 1, unless `force` is set. Otherwise the
 * entry is converted back to its original rpc message and passed to the
 * commit callback; the callback is retried every 10ms for as long as it fails
 * with terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE.
 *
 * Returns the last code produced by the conversion or the commit callback, or
 * an error when the snapshot config index cannot be resolved.
 */
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
                       int32_t applyCode, bool force) {
  // a learner still has to execute the fsm while it catches up on log entries;
  // `force` bypasses this short-cut entirely
  if (!force && pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
      pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft fsm no need to execute, term:%" PRId64
            ", type:%s code:0x%x, replica:%d, role:%d, restoreFinish:%d",
            pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
            pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
    return 0;
  }

  if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
    sTrace("vgId:%d, index:%" PRId64 ", blocking msg ready to execute, term:%" PRId64 ", type:%s code:0x%x",
           pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
  }

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    sInfo("vgId:%d, index:%" PRId64 ", fsm execute vnode commit, term:%" PRId64, pNode->vgId, pEntry->index,
          pEntry->term);
  }

  int32_t code = 0, lino = 0;
  bool    retry = false;

  for (;;) {
    SFsmCbMeta meta = {0};
    meta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
    if (meta.lastConfigIndex < -1) {
      // prefer the thread-local error when one was recorded
      code = (terrno != 0) ? terrno : TSDB_CODE_SYN_INTERNAL_ERROR;
      return code;
    }

    SRpcMsg msg = {.code = applyCode};
    TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &msg));

    meta.index = pEntry->index;
    meta.term = pEntry->term;
    meta.currentTerm = term;
    meta.seqNum = pEntry->seqNum;
    meta.isWeak = pEntry->isWeak;
    meta.state = role;
    meta.code = applyCode;
    meta.flag = -1;
    msg.info.traceId = pEntry->originRpcTraceId;

    int32_t respNum = syncRespMgrGetAndDel(pNode->pSyncRespMgr, meta.seqNum, &msg.info);
    sGDebug(&msg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d",
            pNode->vgId, pEntry->index, &msg.info, meta.seqNum, respNum);

    code = pFsm->FpCommitCb(pFsm, &msg, &meta);

    retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);

    sGTrace(&msg.info.traceId,
            "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
            pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);

    if (!retry) {
      break;
    }

    taosMsleep(10);
    if (code != TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
      continue;
    }

    // escalate to a warning once every APPLY_QUEUE_ERROR_THRESHOLD failures, trace otherwise
    pNode->applyQueueErrorCount++;
    if (pNode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
      pNode->applyQueueErrorCount = 0;
      sGWarn(&msg.info.traceId,
             "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
             pEntry->index, tstrerror(code));
    } else {
      sGTrace(&msg.info.traceId,
              "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId,
              pEntry->index, tstrerror(code));
    }
  }

_exit:
  if (code < 0) {
    sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s",
           pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  return code;
}
778

779
/*
 * Sanity-check the index invariants of a log buffer:
 *   startIndex <= matchIndex, commitIndex <= matchIndex, matchIndex < endIndex,
 *   endIndex - startIndex <= size, and the slot at matchIndex holds an entry.
 *
 * Returns 0 when all of them hold; otherwise logs the first violated
 * invariant and returns TSDB_CODE_SYN_INTERNAL_ERROR.
 */
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  int32_t code = TSDB_CODE_SYN_INTERNAL_ERROR;

  if (pBuf->startIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
           pBuf->matchIndex);
  } else if (pBuf->commitIndex > pBuf->matchIndex) {
    sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
           pBuf->matchIndex);
  } else if (pBuf->matchIndex >= pBuf->endIndex) {
    sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
           pBuf->endIndex);
  } else if (pBuf->endIndex - pBuf->startIndex > pBuf->size) {
    sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
           pBuf->endIndex, pBuf->startIndex, pBuf->size);
  } else if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
    sError("failed to validate since pItem is null");
  } else {
    code = 0;
  }

  return code;
}
806

807
/*
 * Apply every entry in (pBuf->commitIndex, min(commitIndex, pBuf->matchIndex)]
 * to the state machine, then recycle old entries from the head of the buffer.
 *
 * pBuf        log buffer; its mutex is held for the duration of the commit
 * pNode       owning sync node
 * commitIndex index the caller wants committed
 * trace       trace id used for the debug logs
 * src         free-form text describing who triggered the commit (logged only)
 *
 * Returns 0 on success, the failing code of fsm execution / config change, or
 * TSDB_CODE_SYN_INTERNAL_ERROR when the buffer is inconsistent.
 *
 * When the node has not finished restoring and the buffer has caught up with
 * pNode->commitIndex on an entry of the current (or a later) term, the
 * FpRestoreFinishCb callback is fired under the lock and FpAfterRestoredCb
 * (if set) after the lock is released.
 */
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace,
                            const char* src) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);

  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        currentTerm = raftStoreGetTerm(pNode);
  SyncGroupId     vgId = pNode->vgId;
  int32_t         code = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;
  SSyncRaftEntry* pNextEntry = NULL;
  bool            nextInBuf = false;
  bool            restoreFinishAtThisCommit = false;

  if (commitIndex <= pBuf->commitIndex) {
    sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex);
    // still fetch the entry so that the restore check at _out can look at its term
    if (!pNode->restoreFinish && commitIndex > 0 && commitIndex == pBuf->commitIndex) {
      int32_t ret = syncLogBufferGetOneEntry(pBuf, pNode, commitIndex, &inBuf, &pEntry);
      if (ret != 0) {
        sError("vgId:%d, failed to get entry at index:%" PRId64, vgId, commitIndex);
      }
    }
    goto _out;
  }

  sGDebug(trace,
          "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
          "), role:%d, term:%" PRId64,
          pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
    if (pEntry == NULL) {
      goto _out;
    }

    // execute it
    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
      sGInfo(&pEntry->originRpcTraceId,
             "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index,
             pEntry->term, TMSG_INFO(pEntry->originalRpcType));
    }

    if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
      sGError(&pEntry->originRpcTraceId,
              "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64
              ", role:%d, current term:%" PRId64,
              vgId, pEntry->index, pEntry->term, role, currentTerm);
      goto _out;
    }
    pBuf->commitIndex = index;

    sGDebug(&pEntry->originRpcTraceId,
            "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
            pNode->vgId, pEntry->index, pEntry->term, role, currentTerm);

    // peek at the following entry: a config change is applied as soon as its predecessor commits
    code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
    if (pNextEntry != NULL) {
      if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
        sInfo(
            "vgId:%d, to change config at commit, "
            "current entry, index:%" PRId64 ", term:%" PRId64
            ", "
            "node, role:%d, current term:%" PRId64
            ", restore:%d, "
            "cond, next entry index:%" PRId64 ", msgType:%s",
            vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
            TMSG_INFO(pNextEntry->originalRpcType));

        if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
          sError("vgId:%d, failed to change config from Commit, index:%" PRId64 ", term:%" PRId64
                 ", role:%d, current term:%" PRId64,
                 vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
          goto _out;
        }

        // for 2->1, need to apply config change entry in sync thread,
        if (pNode->replicaNum == 1) {
          if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
            sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64
                   ", role:%d, current term:%" PRId64,
                   vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
            goto _out;
          }

          index++;
          pBuf->commitIndex = index;

          sGDebug(&pNextEntry->originRpcTraceId,
                  "vgId:%d, index:%" PRId64 ", raft entry committed, term:%" PRId64 ", role:%d, current term:%" PRId64,
                  pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
        }
      }
      // entries that are not served from the buffer are owned by this function
      if (!nextInBuf) {
        syncEntryDestroy(pNextEntry);
        pNextEntry = NULL;
      }
    }

    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  bool      isVnode = pNode->vgId > 1;
  SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
  do {
    // stop once the head reaches commitIndex, or when neither the retention window
    // nor (for vnodes) the memory thresholds ask for more recycling
    if ((pBuf->startIndex >= pBuf->commitIndex) ||
        !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
                                         atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
      break;
    }
    SSyncRaftEntry* pRecycled = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
    if (pRecycled == NULL) {
      // the slot is empty: log only buffer state (there is no entry to read from)
      // and release the buffer mutex before bailing out
      sError("vgId:%d, invalid log entry to recycle, startIndex:%" PRId64 ", until:%" PRId64
             ", commitIndex:%" PRId64 ", endIndex:%" PRId64,
             pNode->vgId, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex);
      (void)taosThreadMutexUnlock(&pBuf->mutex);
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
    if (isVnode) {
      pBuf->bytes -= pRecycled->bytes;
      (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pRecycled->bytes);
    }
    sDebug("vgId:%d, recycle log entry, index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
           ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
           ", used:%" PRId64 ", allowed:%" PRId64,
           pNode->vgId, pRecycled->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pRecycled->term,
           pRecycled->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
    syncEntryDestroy(pRecycled);
    (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    ++pBuf->startIndex;
  } while (true);

  code = 0;
_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
      currentTerm <= pEntry->term) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
    pNode->restoreFinish = true;
    restoreFinishAtThisCommit = true;
    sInfo("vgId:%d, restore finished, term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }

  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  if (!nextInBuf) {
    syncEntryDestroy(pNextEntry);
    pNextEntry = NULL;
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);

  // run the post-restore hook outside of the buffer lock
  if (restoreFinishAtThisCommit && pNode->pFsm->FpAfterRestoredCb != NULL) {
    pNode->pFsm->FpAfterRestoredCb(pNode->pFsm, pBuf->commitIndex);
    sInfo("vgId:%d, after restore finished callback executed)", pNode->vgId);
  }

  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  TAOS_RETURN(code);
}
978

979
/*
 * Return a replication manager to its initial state: zero every state slot in
 * [startIndex, endIndex), reset the three indexes to 0 and clear the
 * `restored` flag and the retry backoff.
 *
 * A NULL manager is ignored; a negative startIndex is logged and nothing is
 * changed.
 */
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
  if (pMgr == NULL) {
    return;
  }

  if (pMgr->startIndex < 0) {
    sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
    return;
  }

  SyncIndex cursor = pMgr->startIndex;
  while (cursor < pMgr->endIndex) {
    (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
    cursor++;
  }

  pMgr->retryBackoff = 0;
  pMgr->restored = false;
  pMgr->endIndex = 0;
  pMgr->matchIndex = 0;
  pMgr->startIndex = 0;
}
995

996
/*
 * Resend in-flight entries of the window [startIndex, endIndex) whose retry
 * wait time has elapsed and that have not been acknowledged yet.
 *
 * Returns 0 immediately when the window is empty, TSDB_CODE_OUT_OF_RANGE after
 * resetting the manager because the backoff reached SYNC_MAX_RETRY_BACKOFF,
 * TSDB_CODE_ACTION_IN_PROGRESS after resetting it because an acknowledged
 * entry stagnated, TSDB_CODE_SYN_INTERNAL_ERROR on inconsistent barrier
 * state, or otherwise the last code produced by the scan (the result of the
 * last syncLogReplSendTo call when one was made).
 */
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (pMgr->endIndex <= pMgr->startIndex) {
    return 0;
  }

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
    syncLogReplReset(pMgr);
    sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId,
          pDestId->addr);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  int32_t  code = 0;
  bool     retried = false;
  int64_t  retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
  int64_t  nowMs = taosGetMonoTimestampMs();
  int      count = 0;
  int64_t  firstIndex = -1;
  SyncTerm term = -1;
  int64_t  batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));

  // every exit from the scan (normal or error) falls through to the summary below
  for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
    int64_t slot = index % pMgr->size;

    // a barrier is only expected at the first or the last position of the window
    if (pMgr->states[slot].barrier && index != pMgr->startIndex && index + 1 != pMgr->endIndex) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    // this entry has not waited long enough yet: stop scanning
    if (nowMs < pMgr->states[slot].timeMs + retryWaitMs) {
      break;
    }

    if (pMgr->states[slot].acked) {
      if (pMgr->matchIndex < index && pMgr->states[slot].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
        syncLogReplReset(pMgr);
        sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, index,
              pDestId->addr);
        code = TSDB_CODE_ACTION_IN_PROGRESS;
        break;
      }
      continue;
    }

    bool barrier = false;
    code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      break;
    }
    if (barrier != pMgr->states[slot].barrier) {
      code = TSDB_CODE_SYN_INTERNAL_ERROR;
      break;
    }

    pMgr->states[slot].acked = false;
    pMgr->states[slot].term = term;
    pMgr->states[slot].timeMs = nowMs;

    retried = true;
    if (firstIndex == -1) {
      firstIndex = index;
    }

    // the entry just sent is counted even when it closes the batch
    bool batchFull = (batchSize <= count);
    count++;
    if (batchFull) {
      break;
    }
  }

  if (retried) {
    pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
    SSyncLogBuffer* pBuf = pNode->pLogBuf;
    sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64
          ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
          " %" PRId64 ", %" PRId64 ")",
          pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
          pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  }
  TAOS_RETURN(code);
}
1074

1075
/*
 * Handle an append-entries reply while the replication manager is not yet
 * restored.
 *
 * Depending on the reply this either marks the manager as restored, starts a
 * snapshot towards the peer, retries the outstanding window, or resets the
 * manager and probes the peer again at a newly chosen index.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when called on a restored
 * manager or on inconsistent state, or the failing code of a helper.
 */
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  SRaftId         destId = pMsg->srcId;
  int32_t         code = 0;

  if (pMgr->restored != false) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
         "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
         "}",
         pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
         pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);

  if (pMgr->endIndex != 0) {
    // the reply does not belong to the current window: just retry what is outstanding
    bool outsideWindow = (pMsg->lastSendIndex < pMgr->startIndex) || (pMsg->lastSendIndex >= pMgr->endIndex);
    if (outsideWindow) {
      TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
      return 0;
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;

    if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }

    bool fsmIncomplete = (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE);
    if (fsmIncomplete || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
      char* reason = fsmIncomplete ? " incomplete fsm state" : " rollback match index failure";
      sInfo("vgId:%d, snapshot replication progress:1/8:leader:1/4 to dnode:%d, reason:%s, match index:%" PRId64
            ", last sent:%" PRId64,
            pNode->vgId, DID(&destId), reason, pMsg->matchIndex,
            pMsg->lastSendIndex);
      code = syncNodeStartSnapshot(pNode, &destId);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      return 0;
    }
  } else {
    // nothing has been sent yet, so the other two indexes must still be at 0
    if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
    if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
    if (pMsg->matchIndex < 0) {
      pMgr->restored = true;
      sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
            "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
      return 0;
    }
  }

  // check last match term
  SyncTerm  prevTerm = -1;
  SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
  SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
  SET_ERRNO(0);

  if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
    code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &prevTerm);
    if (prevTerm < 0 && (ERRNO == ENFILE || ERRNO == EMFILE || ERRNO == ENOENT)) {
      sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
      TAOS_RETURN(code);
    }

    if (pMsg->matchIndex == -1) {
      // first time to restore
      sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
            ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
            ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
            pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
            pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, prevTerm,
            pMsg->lastMatchTerm);
    }

    // fall back to a snapshot when the needed log is gone or the terms diverge at the log head
    if ((index + 1 < firstVer) || (prevTerm < 0) ||
        (prevTerm != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
      if (prevTerm < 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) return TSDB_CODE_SYN_INTERNAL_ERROR;
      code = syncNodeStartSnapshot(pNode, &destId);
      if (code < 0) {
        sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
        TAOS_RETURN(code);
      }
      sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
      return 0;
    }

    if (index + 1 < firstVer) return TSDB_CODE_SYN_INTERNAL_ERROR;

    if (prevTerm != pMsg->lastMatchTerm) {
      if (index <= firstVer) return TSDB_CODE_SYN_INTERNAL_ERROR;
    } else {
      // terms agree: the probe moves on to the next entry
      index = index + 1;
      if (index > pNode->pLogBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
    }
  }

  // attempt to replicate the raft log at index
  syncLogReplReset(pMgr);
  return syncLogReplProbe(pMgr, pNode, index);
}
1181

1182
/*
 * Process a heartbeat reply for one peer.
 *
 * When the reply carries a non-zero start time that differs from the one
 * recorded for the peer, the replication manager is reset and the new start
 * time is stored. The check and the update happen under pMgr->mutex.
 *
 * Always returns 0.
 */
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);
  return 0;
}
1194

1195
/*
 * Process an append-entries reply for one peer.
 *
 * First, under pMgr->mutex, the manager is reset when the start time in the
 * reply differs from the recorded one. Then, under the log buffer mutex, the
 * reply is dispatched to syncLogReplContinue (manager restored) or
 * syncLogReplRecover (not restored). A failure of either is logged as a
 * warning only.
 *
 * Always returns 0.
 */
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  SSyncLogBuffer* pBuf = pNode->pLogBuf;

  (void)taosThreadMutexLock(&pMgr->mutex);
  if (pMsg->startTime != pMgr->peerStartTime) {
    sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
          pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
    syncLogReplReset(pMgr);
    pMgr->peerStartTime = pMsg->startTime;
  }
  (void)taosThreadMutexUnlock(&pMgr->mutex);

  (void)taosThreadMutexLock(&pBuf->mutex);

  // remember which path is taken: recovery may flip the flag itself
  bool    wasRestored = pMgr->restored;
  int32_t code = wasRestored ? syncLogReplContinue(pMgr, pNode, pMsg) : syncLogReplRecover(pMgr, pNode, pMsg);
  if (code != 0) {
    if (wasRestored) {
      sWarn("vgId:%d, failed to continue sync log repl since %s", pNode->vgId, tstrerror(code));
    } else {
      sWarn("vgId:%d, failed to recover sync log repl since %s", pNode->vgId, tstrerror(code));
    }
  }

  (void)taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
1221

1222
/*
 * Kick off replication towards one peer: a restored manager keeps sending
 * entries, an unrestored one probes the peer at the buffer's match index.
 *
 * Returns 0 on success or the code of the failing step.
 */
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) {
    TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
    return 0;
  }

  TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
  return 0;
}
1230

1231
/*
 * Probe a not-yet-restored peer by sending it the single entry at `index`.
 *
 * Nothing is sent while a previous probe is still within the maximum retry
 * wait time. Otherwise the manager is reset, the entry is sent, and the
 * window becomes [index, index + 1) with the slot of `index` recording the
 * send.
 *
 * Returns 0 on success (including the "still waiting" case),
 * TSDB_CODE_SYN_INTERNAL_ERROR when the manager is already restored or an
 * index is negative, or the negative code of syncLogReplSendTo.
 */
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
  if (pMgr->restored) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pMgr->startIndex < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
  int64_t nowMs = taosGetMonoTimestampMs();
  int32_t code = 0;

  sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64
         ", %" PRId64 "), restored:%d",
         pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pMgr->restored);

  // an earlier probe is still in flight and has not timed out
  bool hasInflight = pMgr->endIndex > pMgr->startIndex;
  if (hasInflight && nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
    return 0;
  }
  syncLogReplReset(pMgr);

  SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
  SyncTerm term = -1;
  bool     barrier = false;
  code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier);
  if (code < 0) {
    sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
           tstrerror(code), index, pDestId->addr);
    TAOS_RETURN(code);
  }

  if (index < 0) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int64_t slot = index % pMgr->size;
  pMgr->states[slot].barrier = barrier;
  pMgr->states[slot].timeMs = nowMs;
  pMgr->states[slot].term = term;
  pMgr->states[slot].acked = false;

  pMgr->startIndex = index;
  pMgr->endIndex = index + 1;

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64
         " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
         pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
         pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1274

1275
/*
 * Send further entries to a restored peer, starting at pMgr->endIndex and
 * going up to the log buffer's match index.
 *
 * Sending stops early when the batch size is exceeded, when the in-flight
 * window reaches half of the manager's capacity, or right after a barrier
 * entry. Afterwards syncLogReplRetryOnNeed is given a chance to resend
 * outstanding entries.
 *
 * Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when the manager is not
 * restored, or the failing code of syncLogReplSendTo / syncLogReplRetryOnNeed.
 */
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
  if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;

  sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64
         "), restore:%d",
         pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);

  SRaftId*  pDestId = &pNode->replicasId[pMgr->peerId];
  int32_t   batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
  int32_t   code = 0;
  int32_t   count = 0;
  int64_t   nowMs = taosGetMonoTimestampMs();
  int64_t   limit = pMgr->size >> 1;
  SyncTerm  term = -1;  // term of the last entry sent in this call, reported in the trace below
  SyncIndex firstIndex = -1;

  for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
    if (batchSize < count || limit <= index - pMgr->startIndex) {
      break;
    }
    // do not send past a barrier entry that is still in flight
    if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
      break;
    }
    int64_t  pos = index % pMgr->size;
    bool     barrier = false;
    SyncTerm sentTerm = -1;  // fresh per send, exactly as before; copied out on success

    code = syncLogReplSendTo(pMgr, pNode, index, &sentTerm, pDestId, &barrier);
    if (code < 0) {
      sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId,
             tstrerror(code), index, pDestId->addr);
      TAOS_RETURN(code);
    }
    term = sentTerm;

    pMgr->states[pos].barrier = barrier;
    pMgr->states[pos].timeMs = nowMs;
    pMgr->states[pos].term = sentTerm;
    pMgr->states[pos].acked = false;

    if (firstIndex == -1) {
      firstIndex = index;
    }

    count++;

    pMgr->endIndex = index + 1;
    if (barrier) {
      sInfo("vgId:%d, replicated sync barrier to dnode:%d, index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
            " %" PRId64 ", %" PRId64 ")",
            pNode->vgId, DID(pDestId), index, sentTerm, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
      break;
    }
  }

  TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));

  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64
         ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
         ")",
         pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
         pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
  return 0;
}
1339

1340
/*
 * Handle an append-entries reply for a restored peer.
 *
 * When the reply refers to an entry inside the in-flight window, the entry is
 * marked acknowledged, the match index is advanced, the slots below the match
 * index are cleared and the window start is moved up to it. The retry backoff
 * is lowered by one step when the window was sent within a short enough time
 * span. Replication then continues via syncLogReplAttempt.
 *
 * Returns TSDB_CODE_SYN_INTERNAL_ERROR when the manager is not restored,
 * otherwise the result of syncLogReplAttempt.
 */
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
  if (!pMgr->restored) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  bool inWindow = (pMgr->startIndex <= pMsg->lastSendIndex) && (pMsg->lastSendIndex < pMgr->endIndex);
  if (inWindow) {
    if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
      int64_t headMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
      int64_t tailMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
      int64_t spanMs = tailMs - headMs;
      int64_t capMs = ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1));
      if (spanMs > 0 && spanMs < capMs) {
        pMgr->retryBackoff -= 1;
      }
    }

    pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
    if (pMsg->matchIndex > pMgr->matchIndex) {
      pMgr->matchIndex = pMsg->matchIndex;
    }

    // release every slot below the match index
    SyncIndex cursor = pMgr->startIndex;
    while (cursor < pMgr->matchIndex) {
      (void)memset(&pMgr->states[cursor % pMgr->size], 0, sizeof(pMgr->states[0]));
      cursor++;
    }
    pMgr->startIndex = pMgr->matchIndex;
  }

  return syncLogReplAttempt(pMgr, pNode);
}
1361

1362
SSyncLogReplMgr* syncLogReplCreate() {
214,996✔
1363
  SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
214,996!
1364
  if (pMgr == NULL) {
215,020!
1365
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1366
    return NULL;
×
1367
  }
1368

1369
  pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
215,020✔
1370

1371
  if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
215,020!
1372
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
×
1373
    return NULL;
×
1374
  }
1375

1376
  int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
215,020✔
1377
  if (code) {
215,002!
1378
    terrno = code;
×
1379
    return NULL;
×
1380
  }
1381

1382
  return pMgr;
215,003✔
1383
}
1384

1385
// Destroy a log replication manager: tear down its mutex and release its memory.
// Passing NULL is allowed and does nothing.
void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
  if (pMgr != NULL) {
    (void)taosThreadMutexDestroy(&pMgr->mutex);
    taosMemoryFree(pMgr);
  }
}
1393

1394
// Create one log replication manager for every slot of pNode->logReplMgrs
// (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA slots) and tag each with its slot number as peerId.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR if a slot is already occupied, or terrno
// if a manager cannot be created. Managers created before a failure are left in place.
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
  const int slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int       slot = 0;

  while (slot < slotNum) {
    if (pNode->logReplMgrs[slot] != NULL) {
      return TSDB_CODE_SYN_INTERNAL_ERROR;
    }

    pNode->logReplMgrs[slot] = syncLogReplCreate();
    if (pNode->logReplMgrs[slot] == NULL) {
      TAOS_RETURN(terrno);
    }

    pNode->logReplMgrs[slot]->peerId = slot;
    slot++;
  }

  return 0;
}
1405

1406
// Destroy every log replication manager held by the node and clear the corresponding slots.
void syncNodeLogReplDestroy(SSyncNode* pNode) {
  const int slotNum = TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA;
  int       slot = 0;

  while (slot < slotNum) {
    syncLogReplDestroy(pNode->logReplMgrs[slot]);
    pNode->logReplMgrs[slot] = NULL;
    ++slot;
  }
}
14,314✔
1412

1413
// Allocate a sync log buffer and initialise its recursive mutex.
//
// On success *ppBuf receives the new buffer and 0 is returned. On failure *ppBuf is set to NULL
// and an error code is returned; any mutex attribute that was already initialised is destroyed
// before the buffer memory is released.
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
  int32_t         code = 0;
  bool            attrInited = false;
  SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
  if (pBuf == NULL) {
    // never fall through with a NULL buffer, even if terrno was left at 0
    code = terrno;
    if (code == 0) code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);

  if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
    code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _exit;
  }

  if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
    goto _exit;
  }
  attrInited = true;

  if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
    goto _exit;
  }

  if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
    code = TAOS_SYSTEM_ERROR(ERRNO);
    sError("failed to init log buffer mutex due to %s", tstrerror(code));
    goto _exit;
  }
_exit:
  if (code != 0) {
    if (attrInited) {
      // the attribute object was initialised but the buffer is being discarded
      (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    }
    taosMemoryFreeClear(pBuf);
  }
  *ppBuf = pBuf;
  TAOS_RETURN(code);
}
1451

1452
// Drop every entry held in [startIndex, endIndex) of the buffer and reset all of its indices
// and its byte counter to zero. The buffer mutex is held for the duration.
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
  (void)taosThreadMutexLock(&pBuf->mutex);

  SyncIndex cursor = pBuf->startIndex;
  while (cursor < pBuf->endIndex) {
    SSyncRaftEntry* pItem = pBuf->entries[(cursor + pBuf->size) % pBuf->size].pItem;
    if (pItem != NULL) {
      syncEntryDestroy(pItem);
      // only slots that actually held an entry are wiped
      (void)memset(&pBuf->entries[(cursor + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
    }
    cursor++;
  }

  pBuf->endIndex = 0;
  pBuf->matchIndex = 0;
  pBuf->commitIndex = 0;
  pBuf->startIndex = 0;
  pBuf->bytes = 0;

  (void)taosThreadMutexUnlock(&pBuf->mutex);
}
14,338✔
1465

1466
// Release a sync log buffer: clear its entries, destroy its mutex and mutex attribute, then
// free the buffer itself. Passing NULL is allowed and does nothing.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    syncLogBufferClear(pBuf);
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosThreadMutexAttrDestroy(&pBuf->attr);
    taosMemoryFree(pBuf);
  }
}
1476

1477
// Roll the log buffer (and the log store behind it) back so that toIndex becomes the new end.
//
// toIndex must satisfy commitIndex < toIndex <= endIndex. When toIndex already equals endIndex
// nothing is done. Otherwise buffered entries in [toIndex, endIndex) are destroyed, the log
// store is truncated from toIndex if it extends that far, and the buffer is re-initialised
// from the log store when toIndex is at or below startIndex.
//
// Returns 0 on success, TSDB_CODE_SYN_INTERNAL_ERROR when an index check fails, or the error
// reported by the log store / buffer initialisation / buffer validation.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  int32_t code = 0;

  if (toIndex <= pBuf->commitIndex || toIndex > pBuf->endIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  if (pBuf->endIndex == toIndex) {
    return 0;
  }

  sInfo("vgId:%d, rollback sync log buffer, toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
        ")",
        pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // truncate the buffer, walking backwards from the last slot
  SyncIndex index;
  for (index = pBuf->endIndex - 1; index >= toIndex; index--) {
    SSyncRaftEntry* pItem = pBuf->entries[index % pBuf->size].pItem;
    if (pItem == NULL) {
      continue;
    }
    syncEntryDestroy(pItem);
    (void)memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }
  pBuf->endIndex = toIndex;
  if (index < pBuf->matchIndex) {
    pBuf->matchIndex = index;
  }
  if (index + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // truncate the log store when it reaches toIndex or beyond
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer >= toIndex) {
    code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex);
    if (code < 0) {
      sError("vgId:%d, failed to truncate log store since %s, from index:%" PRId64, pNode->vgId, tstrerror(code),
             toIndex);
      TAOS_RETURN(code);
    }
  }
  lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer + 1 != toIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  // refill the buffer if the rollback reached its start
  if (toIndex <= pBuf->startIndex) {
    code = syncLogBufferInitWithoutLock(pBuf, pNode);
    if (code < 0) {
      sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  if (toIndex != pBuf->endIndex) {
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1526

1527
// Reset the log buffer so that it ends right after matchIndex, and reset the replication
// managers of all replicas of the node.
//
// The buffer is validated before and after. The log store's last index must equal the buffer's
// matchIndex, otherwise TSDB_CODE_SYN_INTERNAL_ERROR is returned (with the buffer mutex
// released). A rollback failure is logged but does not abort the reset.
//
// Returns 0 on success or the error from validation / the index check.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  (void)taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  if (lastVer != pBuf->matchIndex) {
    // do not leave the buffer mutex held on the error path
    (void)taosThreadMutexUnlock(&pBuf->mutex);
    return TSDB_CODE_SYN_INTERNAL_ERROR;
  }

  int32_t code = 0;
  if ((code = syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1)) != 0) {
    sError("vgId:%d, failed to rollback sync log buffer since %s", pNode->vgId, tstrerror(code));
  }

  sInfo("vgId:%d, reset sync log buffer, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
        pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  // reset repl mgr
  for (int i = 0; i < pNode->totalReplicaNum; i++) {
    SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
    syncLogReplReset(pMgr);
  }
  (void)taosThreadMutexUnlock(&pBuf->mutex);
  TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
  return 0;
}
1553

1554
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
5,224,887✔
1555
                                 SSyncRaftEntry** ppEntry) {
1556
  int32_t code = 0;
5,224,887✔
1557

1558
  *ppEntry = NULL;
5,224,887✔
1559

1560
  if (index >= pBuf->endIndex) {
5,224,887✔
1561
    return TSDB_CODE_OUT_OF_RANGE;
2,230,252✔
1562
  }
1563

1564
  if (index > pBuf->startIndex) {  // startIndex might be dummy
2,994,635✔
1565
    *pInBuf = true;
2,965,858✔
1566
    *ppEntry = pBuf->entries[index % pBuf->size].pItem;
2,965,858✔
1567
#ifdef USE_MOUNT
1568
    if (pNode->mountVgId) {
2,965,858✔
1569
      SMsgHead* pMsgHead = (SMsgHead*)(*ppEntry)->data;
80✔
1570
      if (pMsgHead->vgId != pNode->vgId) pMsgHead->vgId = pNode->vgId;
80✔
1571
    }
1572
#endif
1573
  } else {
1574
    *pInBuf = false;
28,777✔
1575

1576
    if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
28,777✔
1577
      sWarn("vgId:%d, failed to get log entry since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
653!
1578
    }
1579
  }
1580

1581
  TAOS_RETURN(code);
2,994,738✔
1582
}
1583

1584
// Replicate the log entry at `index` to the peer identified by pDestId.
//
// The entry is taken from the log buffer or, failing that, from the log store. An entry that
// did not come from the buffer is destroyed by this function before it returns. On success
// *pBarrier reports whether the entry is a barrier and, if pTerm is non-NULL, *pTerm receives
// the entry's term.
//
// If the entry is missing with TSDB_CODE_WAL_LOG_NOT_EXIST, the replication manager looked up
// for pDestId is reset. Every failure path returns a non-zero code: when a step fails without
// reporting one, TSDB_CODE_SYN_INTERNAL_ERROR is used so callers never see success for an
// entry that was not sent.
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
                          bool* pBarrier) {
  SSyncRaftEntry* pEntry = NULL;
  SRpcMsg         msgOut = {0};
  bool            inBuf = false;
  SyncTerm        prevLogTerm = -1;
  SSyncLogBuffer* pBuf = pNode->pLogBuf;
  int32_t         code = 0;
  int32_t         lino = 0;

  code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
  if (pEntry == NULL) {
    sWarn("vgId:%d, failed to get raft entry for index:%" PRId64, pNode->vgId, index);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // distinct name: do not shadow the pMgr parameter
      SSyncLogReplMgr* pPeerMgr = syncNodeGetLogReplMgr(pNode, pDestId);
      if (pPeerMgr) {
        sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr,
              tstrerror(code), index);
        syncLogReplReset(pPeerMgr);
      }
    }
    // an empty buffer slot yields code 0; do not report that as success
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  *pBarrier = syncLogReplBarrier(pEntry);

  code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
  if (prevLogTerm < 0) {
    sError("vgId:%d, failed to get prev log term since %s, index:%" PRId64, pNode->vgId, tstrerror(code), index);
    if (code == 0) code = TSDB_CODE_SYN_INTERNAL_ERROR;
    goto _err;
  }
  if (pTerm) *pTerm = pEntry->term;

  code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
  if (code < 0) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64, pNode->vgId, index);
    goto _err;
  }

  TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
  sGDebug(&msgOut.info.traceId,
          "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64,
          pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm);

  TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
  if (!inBuf) {
    // entries read from the log store belong to this function
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  return 0;

_err:
  rpcFreeCont(msgOut.pCont);
  msgOut.pCont = NULL;
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc