taosdata / TDengine — build #3873 (push, via travis-ci / GitHub)
21 Apr 2025 07:22AM UTC — coverage: 63.063% (+0.1%) from 62.968%
Commit: docs(opc): add perssit data support (#30783)

156631 of 316378 branches covered (49.51%)

Branch coverage is included in the aggregate % (see the worked calculation after these stats).

242184 of 316027 relevant lines covered (76.63%)

20271838.47 hits per line
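A quick check on how that aggregate is likely computed (an inference from the counts above, not from Coveralls documentation): pooling covered lines and covered branches gives (242184 + 156631) / (316027 + 316378) = 398815 / 632405 ≈ 63.06%, which matches the 63.063% headline figure.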

Source File

64.87% — /source/dnode/vnode/src/vnd/vnodeSync.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sync.h"
#include "tq.h"
#include "tqCommon.h"
#include "tsdb.h"
#include "vnd.h"

#define BATCH_ENABLE 0

static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }

static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
  if (tsem_wait(&pVnode->syncSem) != 0) {
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}

static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (vnodeIsMsgBlock(pMsg->msgType)) {
    (void)taosThreadMutexLock(&pVnode->lock);
    if (pVnode->blocked) {
      vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post sem", pVnode->config.vgId);
      }
    }
    (void)taosThreadMutexUnlock(&pVnode->lock);
  }
}

void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is redirect since not leader, numOfEps:%d inUse:%d",
          pVnode->config.vgId, pMsg, newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i,
            newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
    }
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}

static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply right now since %s", pVnode->config.vgId, pMsg,
            terrstr());
  }
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  } else {
    if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }
  }
}

static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
  } else if (code == TSDB_CODE_MSG_PREPROCESSED) {
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
  } else {
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg,
            tstrerror(code), code);
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
  }
}

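/* Annotation (not in the upstream file): vnodeProposeMsg() below calls
 * syncPropose() while holding pVnode->lock; for blocking message types it
 * sets pVnode->blocked and then waits on syncSem in vnodeWaitBlockMsg(),
 * which vnodePostBlockMsg() posts once the entry has been applied. */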
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    if (pVnode->blocked) {
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}

void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    int32_t code = 0;
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
}

#if BATCH_ENABLE

static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg *pMsg = pMsgArr[i];
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}

void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, terrstr());
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}

#else

void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p",
            vgId, pMsg, isWeak, vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      }
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

#endif

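/* Annotation (not in the upstream file): entries accepted by the sync layer
 * are enqueued to APPLY_QUEUE by vnodeSyncApplyMsg() and drained by
 * vnodeApplyWriteMsg() below, which applies each entry, wakes any writer
 * blocked in vnodeProposeMsg() via vnodePostBlockMsg(), and sends the RPC
 * response. */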
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
        rsp.code = terrno;
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
      }
    }

    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
    }

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code,
            pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-sync queue, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code != 0) {
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since %s, type:%s", pVnode->config.vgId, pMsg, tstrerror(code),
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}

static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  return vnodeGetSnapshot(pFsm->data, pSnapshot);
}

static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  vGDebug(&pMsg->info.traceId,
          "vgId:%d, index:%" PRId64 ", execute commit cb, fsm:%p, term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pMeta->index, pFsm, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (code < 0) {
    if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
      pVnode->applyQueueErrorCount++;
      if (pVnode->applyQueueErrorCount == APPLY_QUEUE_ERROR_THRESHOLD) {
        pVnode->applyQueueErrorCount = 0;
        vWarn("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
      }
    } else {
      vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
  return code;
}

static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code == 0) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }

  SVnode *pVnode = pFsm->data;
  vnodePostBlockMsg(pVnode, pMsg);

  SRpcMsg rsp = {
      .code = pMsg->code,
      .info = pMsg->info,
  };

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code,
          pMeta->index);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}

static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak == 1) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }
  return 0;
}

static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
  SVnode *pVnode = pFSM->data;
  return atomic_load_64(&pVnode->state.applied);
}

static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  vGDebug(&pMsg->info.traceId,
          "vgId:%d, rollback-cb is executed, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
          TMSG_INFO(pMsg->msgType));
}

static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
  SVnode *pVnode = pFsm->data;
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
}

static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  SVnode *pVnode = pFsm->data;
  vnodeSnapReaderClose(pReader);
}

static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SVnode *pVnode = pFsm->data;
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}

static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
  SVnode *pVnode = pFsm->data;

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
      taosMsleep(10);
    }
  } while (true);

  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
}

static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (code != 0) {
    vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
           code);
  }
  return code;
}

static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
  return code;
}

static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
  SVnode   *pVnode = pFsm->data;
  int32_t   vgId = pVnode->config.vgId;
  SyncIndex appliedIdx = -1;

  do {
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    if (appliedIdx > commitIdx) {
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
             appliedIdx, commitIdx);
      break;
    }
    if (appliedIdx == commitIdx) {
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
            ", applied-index:%" PRId64,
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
      taosMsleep(10);
    }
  } while (true);

  walApplyVer(pVnode->pWal, commitIdx);
  pVnode->restored = true;

#ifdef USE_STREAM
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
  streamMetaWLock(pMeta);

  if (pMeta->startInfo.tasksWillRestart) {
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
    streamMetaWUnLock(pMeta);
    return;
  }

  if (vnodeIsRoleLeader(pVnode)) {
    // start to restore all stream tasks
    if (tsDisableStream) {
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
    } else {
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
      if (pMeta->startInfo.startAllTasks == 1) {
        pMeta->startInfo.restartCount += 1;
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
               pMeta->startInfo.restartCount);
      } else {
        pMeta->startInfo.startAllTasks = 1;
        streamMetaWUnLock(pMeta);

        tqInfo("vgId:%d stream task already loaded, start them", vgId);
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
        if (code != 0) {
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
        }
        return;
      }
    }
  } else {
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
  }

  streamMetaWUnLock(pMeta);
#endif
}

static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
4,940✔
626
  SVnode *pVnode = pFsm->data;
4,940✔
627
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
4,940!
628

629
  (void)taosThreadMutexLock(&pVnode->lock);
4,941✔
630
  if (pVnode->blocked) {
4,941!
631
    pVnode->blocked = false;
×
632
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
633
    if (tsem_post(&pVnode->syncSem) != 0) {
×
634
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
635
    }
636
  }
637
  (void)taosThreadMutexUnlock(&pVnode->lock);
4,941✔
638

639
#ifdef USE_TQ
640
  if (pVnode->pTq) {
4,941!
641
    tqUpdateNodeStage(pVnode->pTq, false);
4,941✔
642
    if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) {
4,941!
643
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
×
644
    }
645
  }
646
#endif
647
}
4,941✔
648

649
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
174✔
650
  SVnode *pVnode = pFsm->data;
174✔
651
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
174!
652

653
  (void)taosThreadMutexLock(&pVnode->lock);
174✔
654
  if (pVnode->blocked) {
174!
655
    pVnode->blocked = false;
×
656
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
657
    if (tsem_post(&pVnode->syncSem) != 0) {
×
658
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
659
    }
660
  }
661
  (void)taosThreadMutexUnlock(&pVnode->lock);
174✔
662
}
174✔
663

664
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
11,711✔
665
  SVnode *pVnode = pFsm->data;
11,711✔
666
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
11,711✔
667
#ifdef USE_TQ
668
  if (pVnode->pTq) {
11,710!
669
    tqUpdateNodeStage(pVnode->pTq, true);
11,710✔
670
  }
671
#endif
672
}
11,711✔
673

674
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
675
  SVnode *pVnode = pFsm->data;
×
676
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
677
#ifdef USE_TQ
678
  if (pVnode->pTq) {
×
679
    tqUpdateNodeStage(pVnode->pTq, true);
×
680
  }
681
#endif
682
}
×
683

684
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
685
  SVnode *pVnode = pFsm->data;
×
686

687
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
688
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
689
    return (itemSize == 0);
×
690
  } else {
691
    return true;
×
692
  }
693
}
694

695
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
89,499✔
696
  SVnode *pVnode = pFsm->data;
89,499✔
697

698
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
89,499!
699
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
89,504✔
700
    return itemSize;
89,503✔
701
  } else {
702
    return TSDB_CODE_INVALID_PARA;
×
703
  }
704
}
705

static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshot = NULL;
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpAfterRestoredCb = NULL;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  return pFsm;
}

int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}

int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
  int32_t code = syncStart(pVnode->sync);
  if (code) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
    return code;
  }
  return 0;
}

void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);
  int32_t code = syncLeaderTransfer(pVnode->sync);
  if (code) {
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(code));
  }
  syncPreStop(pVnode->sync);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}

void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(pVnode->sync);
}

void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
  syncStop(pVnode->sync);
}

void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
      }
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}

bool vnodeIsRoleLeader(SVnode *pVnode) {
  SSyncState state = syncGetState(pVnode->sync);
  return state.state == TAOS_SYNC_STATE_LEADER;
}

bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState state = syncGetState(pVnode->sync);

  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
    return false;
  }

  if (!state.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
    return false;
  }

  return true;
}

int64_t vnodeClusterId(SVnode *pVnode) {
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
  return syncCfg->nodeInfo[syncCfg->myIndex].clusterId;
}

int32_t vnodeNodeId(SVnode *pVnode) {
  SSyncCfg *syncCfg = &pVnode->config.syncCfg;
  return syncCfg->nodeInfo[syncCfg->myIndex].nodeId;
}

int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  int code = 0;
  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;
  pSnap->state = SYNC_FSM_STATE_COMPLETE;

  if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
    pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
  }

  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    code = tsdbSnapPrepDescription(pVnode, pSnap);
  }
  return code;
}