• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4523

17 Jul 2025 02:02AM UTC coverage: 56.768% (+0.3%) from 56.447%
#4523

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

140094 of 313745 branches covered (44.65%)

Branch coverage included in aggregate %.

212455 of 307292 relevant lines covered (69.14%)

18276193.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.55
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tsdb.h"
20
#include "vnd.h"
21
#include "stream.h"
22

23
#define BATCH_ENABLE 0
24

25
// Whether a message should be proposed as a weak (non-quorum) write.
// Currently every vnode write is proposed as a strong write.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  return false;
}
26

27
// Block the calling writer on the vnode's sync semaphore until the proposed
// blocking message has been applied (the semaphore is posted elsewhere).
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
  int32_t ret = tsem_wait(&pVnode->syncSem);
  if (ret != 0) {
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}
34

35
// Wake a writer blocked in vnodeWaitBlockMsg once its blocking message has
// been applied. No-op for non-blocking message types or when nobody waits.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    // Clear the block bookkeeping before posting so state is consistent
    // under the lock.
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post sem", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
51

52
// Answer pMsg with a redirect response that carries the retry epset obtained
// from the sync layer, so the client can re-send to the current leader.
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet epSet = {0};
  syncGetRetryEpSet(pVnode->sync, &epSet);

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is redirect since not leader, numOfEps:%d inUse:%d",
          pVnode->config.vgId, pMsg, epSet.numOfEps, epSet.inUse);
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i,
            epSet.eps[i].fqdn, epSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) {
    code = TSDB_CODE_SYN_NOT_LEADER;
  }

  // Response type is by convention request type + 1.
  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};

  // First call sizes the buffer, second call serializes into it.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    if (tSerializeSEpSet(rsp.pCont, contLen, &epSet) < 0) {
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
    }
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}
81

82
// Execute a write that syncPropose indicated should be handled locally
// (return > 0) and deliver the response to the client if one is waiting.
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply right now since %s", pVnode->config.vgId, pMsg,
            terrstr());
  }

  if (rsp.info.handle == NULL) {
    // No requester is waiting; release any response payload that was built.
    if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }
  } else {
    tmsgSendRsp(&rsp);
  }
}
97

98
// Answer a message whose proposal failed. Leadership errors become a
// redirect; TSDB_CODE_MSG_PREPROCESSED means the message was already fully
// handled during pre-processing and gets a success reply; everything else
// is reported back to the client as-is.
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  if (code == TSDB_CODE_MSG_PREPROCESSED) {
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg,
          tstrerror(code), code);
  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
}
115

116
// Propose pMsg to the sync layer and, for blocking message types, wait until
// the entry has been applied. The proposal and the blocked/blockSec/blockSeq
// bookkeeping are updated under pVnode->lock so a concurrent apply cannot
// post the semaphore before this writer marks itself as blocked.
// Returns the syncPropose result: > 0 means handled locally (applied here),
// 0 means proposed to the raft group, < 0 is an error (mapped via terrno).
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    // Only one blocking proposal may be outstanding per vnode at a time.
    if (pVnode->blocked) {
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    // Sync layer consumed the message locally; apply and respond here.
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}
143

144
// Propose a TDMT_VND_COMMIT message when the vnode has accumulated enough
// data to warrant a commit (per vnodeShouldCommit). During normal operation
// the commit is proposed inline; at exit it is queued to the write queue so
// the shutdown path drains it.
// @param atExit  true when called from the shutdown path.
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // FIX: the original dereferenced pHead without checking for allocation
    // failure; skip this commit attempt instead of crashing on OOM.
    vError("vgId:%d, failed to propose vnode commit since out of memory", pVnode->config.vgId);
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;  // internal message; nobody waits for a response

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    // At exit, enqueue instead of proposing: queue ownership transfers on
    // success, and tmsgPutToQueue frees the message on failure.
    int32_t code = 0;
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
}
176

177
#if BATCH_ENABLE
178

179
// (Compiled out: BATCH_ENABLE == 0.) Propose the accumulated batch of write
// messages in one syncProposeBatch call, then apply/answer/free each entry
// according to the batch result. Resets *arrSize to 0 when done.
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // Only the final message in a batch can be a blocking message (the caller
  // flushes the batch whenever a blocking message arrives).
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    // Handled locally: apply every message in the batch here.
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  // All entries were answered above (or consumed by sync); release them.
  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}
214

215
// (Compiled out: BATCH_ENABLE == 0.) Batch variant of the write-queue worker:
// accumulates messages into pMsgArr and flushes the batch around each
// blocking message and at the end of the queue drain.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    // Reject writes until raft-log restore has finished.
    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Checked per message (not once) so the remaining messages still get an
    // error response when the batch arrays failed to allocate.
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, terrstr());
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Flush the pending batch BEFORE adding a blocking message so that the
    // blocking message always ends up last in its own batch.
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    // Flush after a blocking message, and at the tail of the queue drain.
    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}
279

280
#else
281

282
// Write-queue worker (non-batch build): drain up to numOfMsgs messages from
// the vnode-write queue and propose each one to the sync layer individually.
// Every dequeued message is answered (directly or via redirect) and freed
// before the next one is handled.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p",
            vgId, pMsg, isWeak, vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    // Reject writes until raft-log restore has finished; the client gets a
    // redirect via vnodeHandleProposeError(TSDB_CODE_SYN_RESTORING).
    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Trigger a commit first when the vnode buffer is full enough.
    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      // TSDB_CODE_MSG_PREPROCESSED means the message was fully handled during
      // pre-processing; it is answered with success and not logged as error.
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      }
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
326

327
#endif
328

329
// Apply-queue worker: execute each committed write against the vnode state,
// wake any writer blocked on it, and answer the client when a response
// handle is present. Retries indefinitely while writes are disabled.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      int32_t ret = 0;
      int32_t count = 0;
      while (1) {
        ret = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
        if (ret == TSDB_CODE_VND_WRITE_DISABLED) {
          // Writes are temporarily disabled; retry until enabled again,
          // logging at most once per 100 attempts.
          if (count % 100 == 0)
            vGError(&pMsg->info.traceId,
                    "vgId:%d, msg:%p, failed to apply since %s, retry after 200ms, retry count:%d index:%" PRId64, vgId,
                    pMsg, tstrerror(ret), count, pMsg->info.conn.applyIndex);
          count++;
          taosMsleep(200);  // wait for a while before retrying
          continue;
        }
        if (ret < 0) {
          // FIX: only the final (non-retryable) failure is recorded in the
          // response. The original set rsp.code on every iteration, so a
          // transient TSDB_CODE_VND_WRITE_DISABLED leaked into the reply even
          // when a later retry succeeded.
          rsp.code = ret;
          vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply since %s, index:%" PRId64, vgId, pMsg,
                  tstrerror(ret), pMsg->info.conn.applyIndex);
        }
        break;
      }
    }

    // Unblock the proposing writer (if this was a blocking message).
    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
    }

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code,
            pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
387

388
// Hand a message from the vnode-sync queue to the sync subsystem.
// pRsp is part of the queue-worker signature; unused here.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-sync queue, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t ret = syncProcessMsg(pVnode->sync, pMsg);
  if (ret != 0) {
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since %s, type:%s", pVnode->config.vgId, pMsg, tstrerror(ret),
            TMSG_INFO(pMsg->msgType));
  }
  return ret;
}
399

400
// Sync callback: enqueue a control message to the sync read queue.
// Frees the message payload on any failure path after validation.
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t ret = TSDB_CODE_INVALID_PARA;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }
  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
418

419
// Sync callback: enqueue a message to the sync queue.
// Frees the message payload on any failure path after validation.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t ret = TSDB_CODE_INVALID_PARA;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }
  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
437

438
// Sync callback: send a sync request to a peer; free the payload on failure.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendSyncReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
446

447
// Sync callback: report the vnode's current snapshot info to the sync layer.
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  return vnodeGetSnapshot(pVnode, pSnapshot);
}
450

451
// Forward a committed entry to the vnode's apply queue, stamping the raft
// index and term on the message so the apply worker knows its position.
// Returns the tmsgPutToQueue result (< 0 on enqueue failure).
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGDebug(&pMsg->info.traceId,
          "vgId:%d, index:%" PRId64 ", execute commit cb, fsm:%p, term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pMeta->index, pFsm, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (code < 0) {
    if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
      // Queue-full errors are common under load; log only once every
      // APPLY_QUEUE_ERROR_THRESHOLD occurrences to avoid flooding the log.
      pVnode->applyQueueErrorCount++;
      if (pVnode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
        pVnode->applyQueueErrorCount = 0;
        vWarn("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
      }
    } else {
      vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
  return code;
}
476

477
// Sync commit callback: on success hand the entry to the apply queue;
// on a carried error, unblock any waiting writer, fail the request, and
// free the message here.
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code == 0) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }

  SVnode *pVnode = pFsm->data;
  vnodePostBlockMsg(pVnode, pMsg);

  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code,
          pMeta->index);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}
500

501
// Sync pre-commit callback: only weak writes are applied at pre-commit time;
// strong writes wait for the commit callback.
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
507

508
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
11,053,637✔
509
  SVnode *pVnode = pFSM->data;
11,053,637✔
510
  return atomic_load_64(&pVnode->state.applied);
11,053,637✔
511
}
512

513
// Sync rollback callback: purely informational — logs the rolled-back entry;
// no vnode state is modified here.
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  vGDebug(&pMsg->info.traceId,
          "vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
          TMSG_INFO(pMsg->msgType));
}
520

521
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
110✔
522
  SVnode *pVnode = pFsm->data;
110✔
523
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
110✔
524
}
525

526
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
110✔
527
  SVnode *pVnode = pFsm->data;
110✔
528
  vnodeSnapReaderClose(pReader);
110✔
529
}
110✔
530

531
// Snapshot callback: read the next snapshot block into *ppBuf/len.
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SVnode *pVnode = pFsm->data;
  (void)pVnode;  // kept for symmetry with the other snapshot callbacks
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
535

536
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
109✔
537
  SVnode *pVnode = pFsm->data;
109✔
538

539
  do {
×
540
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
109✔
541
    if (itemSize == 0) {
109!
542
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
109!
543
      break;
109✔
544
    } else {
545
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
546
      taosMsleep(10);
×
547
    }
548
  } while (true);
549

550
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
109✔
551
}
552

553
// Snapshot callback: close the snapshot writer, applying (isApply) or
// rolling back the received snapshot.
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  // vnodeSnapWriterClose takes a rollback flag, hence the negation.
  int32_t ret = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (ret != 0) {
    vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
           ret);
  }
  return ret;
}
565

566
// Snapshot callback: write one received snapshot block.
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
  int32_t ret = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
  return ret;
}
573

574
// Sync callback: raft-log restore has reached commitIdx. Poll until the apply
// worker catches up to commitIdx, then mark the vnode as restored so writes
// are accepted (see the !pVnode->restored check in vnodeProposeWriteMsg).
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
  SVnode   *pVnode = pFsm->data;
  int32_t   vgId = pVnode->config.vgId;
  SyncIndex appliedIdx = -1;

  do {
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    if (appliedIdx > commitIdx) {
      // Should never happen; bail out rather than spin forever.
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
             appliedIdx, commitIdx);
      break;
    }
    if (appliedIdx == commitIdx) {
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
      break;
    } else {
      // Log progress at info level only every 10th applied index to keep
      // the log volume down while catching up.
      if (appliedIdx % 10 == 0) {
        vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
              ", applied-index:%" PRId64,
              vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
      } else {
        vDebug("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
               ", applied-index:%" PRId64,
               vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
      }
      taosMsleep(10);
    }
  } while (true);

  walApplyVer(pVnode->pWal, commitIdx);
  pVnode->restored = true;
}
606

607
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
8,068✔
608
  SVnode *pVnode = pFsm->data;
8,068✔
609
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
8,068!
610

611
  (void)taosThreadMutexLock(&pVnode->lock);
8,068✔
612
  if (pVnode->blocked) {
8,068!
613
    pVnode->blocked = false;
×
614
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
615
    if (tsem_post(&pVnode->syncSem) != 0) {
×
616
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
617
    }
618
  }
619
  (void)taosThreadMutexUnlock(&pVnode->lock);
8,068✔
620

621
  streamRemoveVnodeLeader(pVnode->config.vgId);
8,068✔
622
}
8,068✔
623

624
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
294✔
625
  SVnode *pVnode = pFsm->data;
294✔
626
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
294!
627

628
  (void)taosThreadMutexLock(&pVnode->lock);
294✔
629
  if (pVnode->blocked) {
294!
630
    pVnode->blocked = false;
×
631
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
632
    if (tsem_post(&pVnode->syncSem) != 0) {
×
633
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
634
    }
635
  }
636
  (void)taosThreadMutexUnlock(&pVnode->lock);
294✔
637

638
  streamRemoveVnodeLeader(pVnode->config.vgId);  
294✔
639
}
294✔
640

641
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
12,451✔
642
  SVnode *pVnode = pFsm->data;
12,451✔
643
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
12,451✔
644

645
  streamAddVnodeLeader(pVnode->config.vgId);
12,451✔
646
}
12,451✔
647

648
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
649
  SVnode *pVnode = pFsm->data;
×
650
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
651

652
  streamAddVnodeLeader(pVnode->config.vgId);
×
653
}
×
654

655
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
656
  SVnode *pVnode = pFsm->data;
×
657

658
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
659
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
660
    return (itemSize == 0);
×
661
  } else {
662
    return true;
×
663
  }
664
}
665

666
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
97,276✔
667
  SVnode *pVnode = pFsm->data;
97,276✔
668

669
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
97,276!
670
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
97,278✔
671
    return itemSize;
97,276✔
672
  } else {
673
    return TSDB_CODE_INVALID_PARA;
×
674
  }
675
}
676

677
// Allocate and populate the SSyncFSM callback table for this vnode.
// Caller owns the returned FSM; returns NULL (terrno set) on OOM.
// Unused callbacks are explicitly NULLed even though calloc zeroes them,
// to document which hooks this FSM intentionally does not implement.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pFsm->data = pVnode;
  // Log lifecycle callbacks.
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshot = NULL;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpAfterRestoredCb = NULL;
  pFsm->FpLeaderTransferCb = NULL;
  // Apply-queue introspection.
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  // Role-change callbacks.
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;
  pFsm->FpReConfigCb = NULL;
  // Snapshot replication callbacks.
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}
709

710
// Create the sync (raft) instance for this vnode under <path>/sync with the
// replica configuration from pVnode->config. Returns 0 on success, terrno on
// failure.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .mountVgId = pVnode->config.mountVgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      // Timer intervals in milliseconds.
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  // syncOpen returns a handle (> 0) or an invalid value (<= 0) on failure.
  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}
746

747
// Start the sync subsystem created by vnodeSyncOpen. Returns 0 or the
// syncStart error code.
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
  int32_t ret = syncStart(pVnode->sync);
  if (ret) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(ret));
    return ret;
  }
  return 0;
}
756

757
// Prepare the sync subsystem for shutdown: hand off leadership (best effort),
// pre-stop sync, and wake any thread still blocked on a pending proposal so
// close cannot deadlock on the sync semaphore.
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);

  // Leader transfer is best effort; a failure is logged and ignored.
  int32_t ret = syncLeaderTransfer(pVnode->sync);
  if (ret != 0) {
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(ret));
  }
  syncPreStop(pVnode->sync);

  // Release a writer blocked in vnodeWaitBlockMsg, if any.
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
775

776
// Final stage of sync shutdown, run after the vnode itself is closed.
void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(pVnode->sync);
}
780

781
// Stop the sync subsystem for this vnode.
void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
  syncStop(pVnode->sync);
}
785

786
// Periodically called watchdog: if a proposal has kept the vnode blocked for
// longer than VNODE_TIMEOUT_SEC, send a timeout response to the client and
// release the blocked writer so the vnode cannot hang forever.
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  // blocked/blockSec/blockSeq are shared with the proposal path; guard with the vnode lock.
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      // Failure here means the seq was already applied/responded; nothing more to do
      // (the fallback response below was intentionally disabled).
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
      }
      // Clear the block state and wake the waiting writer.
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
813

814
// Report whether this vnode currently holds the sync leader role
// (role only; does not consider restore progress — see vnodeIsLeader).
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
818

819
// Report whether this vnode is a fully usable leader: sync is running, the
// role is leader, and both sync and vnode restore have finished.
// On false, terrno is set (except when the vnode is stopping, where syncGetState
// itself sets terrno).
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pVnode->sync);

  // syncGetState sets terrno when the sync handle is being torn down.
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(syncState.state));
    return false;
  }

  // Both the sync layer and the vnode itself must have completed restore.
  if (!syncState.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}
842

843
// Cluster id of this node, taken from its own entry in the sync config.
int64_t vnodeClusterId(SVnode *pVnode) {
  const SSyncCfg *pCfg = &pVnode->config.syncCfg;
  return pCfg->nodeInfo[pCfg->myIndex].clusterId;
}
847

848
// Dnode id of this node, taken from its own entry in the sync config.
int32_t vnodeNodeId(SVnode *pVnode) {
  const SSyncCfg *pCfg = &pVnode->config.syncCfg;
  return pCfg->nodeInfo[pCfg->myIndex].nodeId;
}
852

853
// Fill pSnap with this vnode's snapshot description for the sync layer:
// last applied index/term, FSM completeness, and (for snapshot-prep message
// types) the tsdb prep description. Returns 0 or the tsdb prep error code.
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;

  // The FSM snapshot is complete only while the tsdb file-set state is normal.
  pSnap->state = (tsdbSnapGetFsState(pVnode) == TSDB_FS_STATE_NORMAL) ? SYNC_FSM_STATE_COMPLETE
                                                                      : SYNC_FSM_STATE_INCOMPLETE;

  int32_t code = 0;
  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    code = tsdbSnapPrepDescription(pVnode, pSnap);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc