• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.62
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tqCommon.h"
20
#include "tsdb.h"
21
#include "vnd.h"
22

23
#define BATCH_ENABLE 0
24

25
// Whether a message of the given type should be proposed as a "weak" sync entry.
// Currently no message type is weak; the parameter is intentionally unused.
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
26

27
// Block the calling (proposer) thread on pVnode->syncSem until the apply side
// finishes the blocking message and posts the semaphore (vnodePostBlockMsg).
// Caller must already have set pVnode->blocked/blockSec/blockSeq under pVnode->lock.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
  if (tsem_wait(&pVnode->syncSem) != 0) {
    // Wait failure is only logged; there is no meaningful recovery here.
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}
34

35
// Wake the proposer thread waiting in vnodeWaitBlockMsg after a blocking
// message has been applied. The bookkeeping fields are cleared under
// pVnode->lock before the semaphore is posted, mirroring the setup in
// vnodeProposeMsg. Non-blocking message types are a no-op.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (vnodeIsMsgBlock(pMsg->msgType)) {
    (void)taosThreadMutexLock(&pVnode->lock);
    if (pVnode->blocked) {
      vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post sem", pVnode->config.vgId);
      }
    }
    (void)taosThreadMutexUnlock(&pVnode->lock);
  }
}
51

52
// Answer a client request that arrived on a non-leader (or restoring) vnode
// with a redirect response carrying the current retry endpoint set, so the
// client can retry against the leader. `code` of 0 is normalized to
// TSDB_CODE_SYN_NOT_LEADER. The response is sent even if allocation or
// serialization of the ep-set payload fails (then with an empty body).
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is redirect since not leader, numOfEps:%d inUse:%d",
          pVnode->config.vgId, pMsg, newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i,
            newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  // msgType + 1 is the matching response type for a request type (project convention).
  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  // First call with a NULL buffer computes the required serialized length.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
    }
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}
81

82
// Apply a write message directly on the proposing thread (the syncPropose
// "applied immediately" path) and deliver the response to the client; when no
// client handle is attached, release any response payload instead.
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply right now since %s", pVnode->config.vgId, pMsg,
            terrstr());
  }

  if (rsp.info.handle == NULL) {
    // No client waiting: drop the response payload if one was produced.
    if (rsp.pCont) rpcFreeCont(rsp.pCont);
  } else {
    tmsgSendRsp(&rsp);
  }
}
97

98
// Reply to the client when a proposal could not be accepted: redirect when
// this node is not a usable leader, answer success when the message was fully
// handled during pre-processing, otherwise report the failure code.
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  if (code == TSDB_CODE_MSG_PREPROCESSED) {
    // Pre-processing already completed the request; just acknowledge it.
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }
    return;
  }

  vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg,
          tstrerror(code), code);
  SRpcMsg rsp = {.code = code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
}
115

116
// Propose one write message to the sync layer. Blocking message types make the
// caller wait (via pVnode->syncSem) until the apply thread has processed the
// entry. Returns the syncPropose result: >0 means "apply immediately here",
// 0 means accepted (replicated asynchronously), <0 means the proposal failed.
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    if (pVnode->blocked) {
      // BUG FIX: the original returned here while still holding pVnode->lock,
      // deadlocking every later proposer/apply path. Release the mutex first.
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    // Record the blocking proposal so vnodePostBlockMsg / timeout checking
    // can identify and release it.
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    // Single-replica fast path: apply on this thread right away.
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}
142

143
// Propose a TDMT_VND_COMMIT message when the vnode decides it should commit
// (vnodeShouldCommit). On the normal path the commit is proposed inline; at
// exit time it is enqueued to the write queue instead so shutdown is not
// blocked on replication.
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  // BUG FIX: the original dereferenced pHead without checking the allocation
  // result, crashing on out-of-memory. Skip this commit attempt instead; a
  // later vnodeShouldCommit() pass will retry.
  if (pHead == NULL) {
    vError("vgId:%d, failed to alloc vnode commit msg since out of memory", pVnode->config.vgId);
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;  // internal message; no client response expected

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    int32_t code = 0;
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
}
175

176
#if BATCH_ENABLE
177

178
// Batch variant of vnodeProposeMsg (compiled only when BATCH_ENABLE is set):
// propose the accumulated message array in one syncProposeBatch call, handle
// immediate-apply / error results per message, wait on the last message when
// it is a blocking type, then free every message and reset the array.
// NOTE(review): unlike vnodeProposeMsg, this does not check/record
// blockSec/blockSeq and calls vnodeIsBlockMsg (not vnodeIsMsgBlock) — this
// path is currently dead code (BATCH_ENABLE == 0); verify before enabling.
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  bool    wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    // Apply-immediately path: process every batched message on this thread.
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  // All messages are owned by this function once proposed; free them here.
  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}
213

214
// Batch variant of the vnode-write queue worker (compiled only when
// BATCH_ENABLE is set): drains the queue, accumulates messages into an array,
// and flushes the batch whenever a blocking message is encountered (both
// before and after it, so a blocking message is always a batch of one) or the
// queue is exhausted.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    // Reject writes until sync restore has completed on this vnode.
    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Either calloc above may have failed; fail each message individually.
    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, terrstr());
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Flush pending messages before a blocking one so it rides alone.
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    // Flush after a blocking message, and at the end of the queue.
    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}
278

279
#else
280

281
// Worker for the vnode-write queue: drain up to numOfMsgs messages and propose
// each one to the sync layer. Messages are rejected with SYN_RESTORING until
// restore finishes; pre-processing may complete a message early
// (TSDB_CODE_MSG_PREPROCESSED). Every dequeued message is freed here.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);

    vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p",
            vgId, pMsg, isWeak, vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    // Reject writes (client gets a redirect) until sync restore completes.
    if (!pVnode->restored) {
      vGWarn(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // Opportunistically propose a commit if the vnode has accumulated enough data.
    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      // MSG_PREPROCESSED means the request was fully handled during
      // pre-processing; it is routed through the same error handler, which
      // answers it with success.
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      }
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
325

326
#endif
327

328
// Worker for the vnode-apply queue: execute committed write messages against
// the vnode state. For blocking message types the waiting proposer is released
// (vnodePostBlockMsg) after the write is applied but before the client
// response is sent. Every dequeued message is freed here.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    // Only apply messages that arrived without a pre-set error; otherwise the
    // original code is forwarded to the client unchanged.
    if (rsp.code == 0) {
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
        rsp.code = terrno;
        vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
      }
    }

    // Release the proposer thread (if any) before responding to the client.
    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else {
      if (rsp.pCont) {
        rpcFreeCont(rsp.pCont);
      }
    }

    vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code,
            pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
371

372
// Forward a message from the vnode-sync queue into the sync module.
// The pRsp out-parameter is currently unused; the result of syncProcessMsg
// is returned directly.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-sync queue, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t ret = syncProcessMsg(pVnode->sync, pMsg);
  if (ret == 0) {
    return 0;
  }

  vGError(&pMsg->info.traceId, "vgId:%d, msg:%p, failed to process since %s, type:%s", pVnode->config.vgId, pMsg, tstrerror(ret),
          TMSG_INFO(pMsg->msgType));
  return ret;
}
383

384
// Enqueue a sync control message onto the SYNC_RD_QUEUE. Takes ownership of
// pMsg->pCont: it is freed (and nulled) on every failure path.
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return TSDB_CODE_INVALID_PARA;

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  if (ret == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
402

403
// Enqueue a sync message onto the SYNC_QUEUE. Takes ownership of pMsg->pCont:
// it is freed (and nulled) on every failure path.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return TSDB_CODE_INVALID_PARA;

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (ret == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
421

422
// Send a sync request to a peer endpoint set; on failure the message payload
// is released here so the caller need not clean up.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendSyncReq(pEpSet, pMsg);
  if (ret == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
430

431
// FSM callback: report the vnode's current snapshot info to the sync layer.
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  return vnodeGetSnapshot(pVnode, pSnapshot);
}
434

435
// FSM commit callback body: stamp the message with its raft index/term and
// hand it to the vnode-apply queue for execution by vnodeApplyWriteMsg.
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  // Fixed log-message typo: "excuted" -> "executed".
  vGTrace(&pMsg->info.traceId,
          "vgId:%d, commit-cb is executed, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (code < 0) vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
  return code;
}
450

451
// FSM commit callback: on success, forward the entry to the apply queue;
// on a pre-set error code, skip applying, release any blocked proposer,
// answer the client with the error, and free the message here.
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code == 0) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }

  SVnode *pVnode = pFsm->data;
  vnodePostBlockMsg(pVnode, pMsg);

  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code,
          pMeta->index);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}
470

471
// FSM pre-commit callback: weak entries are applied at pre-commit time;
// everything else is a no-op here and waits for the commit callback.
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  return (pMeta->isWeak == 1) ? vnodeSyncApplyMsg(pFsm, pMsg, pMeta) : 0;
}
477

478
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
2,054,421✔
479
  SVnode *pVnode = pFSM->data;
2,054,421✔
480
  return atomic_load_64(&pVnode->state.applied);
2,054,421✔
481
}
482

483
// FSM rollback callback: only logs the rolled-back entry; vnode state itself
// is not modified here.
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  // Fixed log-message typo: "excuted" -> "executed".
  vTrace("vgId:%d, rollback-cb is executed, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}
489

490
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
57✔
491
  SVnode *pVnode = pFsm->data;
57✔
492
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
57✔
493
}
494

495
// FSM callback: close a vnode snapshot reader.
// Removed the unused local `SVnode *pVnode = pFsm->data;` (triggered
// -Wunused-variable; the reader handle carries everything needed).
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
  vnodeSnapReaderClose(pReader);
}
499

500
// FSM callback: read the next snapshot block into *ppBuf / *len.
// Removed the unused local `SVnode *pVnode = pFsm->data;` (triggered
// -Wunused-variable; the reader handle carries everything needed).
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
504

505
// FSM callback: open a vnode snapshot writer for receiving a snapshot.
// Busy-waits (10 ms steps) until the apply queue drains so no in-flight
// writes race with the incoming snapshot.
// NOTE(review): the wait is unbounded — if the apply queue never empties this
// spins forever; confirm an upstream guard exists.
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
  SVnode *pVnode = pFsm->data;

  do {
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
    if (itemSize == 0) {
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
      taosMsleep(10);
    }
  } while (true);

  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
}
521

522
// FSM callback: finish receiving a snapshot. When isApply is set the received
// data is committed; otherwise the writer rolls back (note the negation passed
// to vnodeSnapWriterClose: its second argument means "rollback").
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t ret = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (ret == 0) return 0;

  vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
         ret);
  return ret;
}
534

535
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
2,104✔
536
  SVnode *pVnode = pFsm->data;
2,104✔
537
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
2,104!
538
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
2,104✔
539
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
2,104!
540
  return code;
2,104✔
541
}
542

543
// FSM callback invoked when raft-log restore reaches commitIdx: wait (10 ms
// polling) until the apply queue catches up with commitIdx, then mark the
// vnode restored and kick off stream-task startup on the leader.
// Note the careful streamMetaWLock handling below: two branches return with
// the lock already released.
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
  SVnode   *pVnode = pFsm->data;
  int32_t   vgId = pVnode->config.vgId;
  SyncIndex appliedIdx = -1;

  // Spin until applied index reaches commitIdx (applied > commit is a fault).
  do {
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
    if (appliedIdx > commitIdx) {
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
             appliedIdx, commitIdx);
      break;
    }
    if (appliedIdx == commitIdx) {
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
      break;
    } else {
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
            ", applied-index:%" PRId64,
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
      taosMsleep(10);
    }
  } while (true);

  walApplyVer(pVnode->pWal, commitIdx);
  pVnode->restored = true;

#ifdef USE_STREAM
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
  streamMetaWLock(pMeta);

  // Another thread already owns the restart procedure; leave it to them.
  if (pMeta->startInfo.tasksWillRestart) {
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
    streamMetaWUnLock(pMeta);
    return;
  }

  if (vnodeIsRoleLeader(pVnode)) {
    // start to restore all stream tasks
    if (tsDisableStream) {
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
    } else {
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
      if (pMeta->startInfo.startAllTasks == 1) {
        // A start-all pass is already running; just ask for one more round.
        pMeta->startInfo.restartCount += 1;
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
               pMeta->startInfo.restartCount);
      } else {
        pMeta->startInfo.startAllTasks = 1;
        // Unlock before scheduling: streamTaskSchedTask must not run under the
        // meta write lock.
        streamMetaWUnLock(pMeta);

        tqInfo("vgId:%d stream task already loaded, start them", vgId);
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
        if (code != 0) {
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
        }
        return;  // lock already released above
      }
    }
  } else {
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
  }

  streamMetaWUnLock(pMeta);
#endif
}
608

609
// FSM callback: this vnode lost leadership. Release any proposer blocked on a
// pending blocking write (it will observe the failure), then demote the TQ
// node stage and asynchronously stop stream tasks.
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, become follower", pVnode->config.vgId);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    pVnode->blocked = false;
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

#ifdef USE_TQ
  if (pVnode->pTq) {
    tqUpdateNodeStage(pVnode->pTq, false);
    if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) {
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
    }
  }
#endif
}
632

633
// FSM callback: this vnode became a learner. Like vnodeBecomeFollower it
// releases any proposer blocked on a pending blocking write, but does not
// touch TQ/stream state.
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, become learner", pVnode->config.vgId);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    pVnode->blocked = false;
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
647

648
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
9,668✔
649
  SVnode *pVnode = pFsm->data;
9,668✔
650
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
9,668✔
651
#ifdef USE_TQ
652
  if (pVnode->pTq) {
9,668!
653
    tqUpdateNodeStage(pVnode->pTq, true);
9,668✔
654
  }
655
#endif
656
}
9,668✔
657

658
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
659
  SVnode *pVnode = pFsm->data;
×
660
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
661
#ifdef USE_TQ
662
  if (pVnode->pTq) {
×
663
    tqUpdateNodeStage(pVnode->pTq, true);
×
664
  }
665
#endif
666
}
×
667

668
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
669
  SVnode *pVnode = pFsm->data;
×
670

671
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
672
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
673
    return (itemSize == 0);
×
674
  } else {
675
    return true;
×
676
  }
677
}
678

679
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
160,864✔
680
  SVnode *pVnode = pFsm->data;
160,864✔
681

682
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
160,864!
683
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
160,876✔
684
    return itemSize;
160,868✔
685
  } else {
686
    return TSDB_CODE_INVALID_PARA;
×
687
  }
688
}
689

690
// Allocate and populate the sync FSM callback table for this vnode.
// Unset callbacks (FpGetSnapshot, FpAfterRestoredCb, FpLeaderTransferCb,
// FpReConfigCb) stay NULL. Returns NULL and sets terrno on allocation failure;
// the caller owns the returned object.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Designated initializer: every field not named here is zeroed/NULL,
  // matching the original explicit-NULL assignments.
  *pFsm = (SSyncFSM){
      .data = pVnode,
      .FpCommitCb = vnodeSyncCommitMsg,
      .FpAppliedIndexCb = vnodeSyncAppliedIndex,
      .FpPreCommitCb = vnodeSyncPreCommitMsg,
      .FpRollBackCb = vnodeSyncRollBackMsg,
      .FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo,
      .FpRestoreFinishCb = vnodeRestoreFinish,
      .FpApplyQueueEmptyCb = vnodeApplyQueueEmpty,
      .FpApplyQueueItems = vnodeApplyQueueItems,
      .FpBecomeLeaderCb = vnodeBecomeLeader,
      .FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader,
      .FpBecomeFollowerCb = vnodeBecomeFollower,
      .FpBecomeLearnerCb = vnodeBecomeLearner,
      .FpSnapshotStartRead = vnodeSnapshotStartRead,
      .FpSnapshotStopRead = vnodeSnapshotStopRead,
      .FpSnapshotDoRead = vnodeSnapshotDoRead,
      .FpSnapshotStartWrite = vnodeSnapshotStartWrite,
      .FpSnapshotStopWrite = vnodeSnapshotStopWrite,
      .FpSnapshotDoWrite = vnodeSnapshotDoWrite,
  };

  return pFsm;
}
722

723
// Open the sync (raft) instance for this vnode under <path>/sync.
// Returns 0 on success or terrno on failure.
// NOTE(review): the vnodeSyncMakeFsm result is not checked for NULL here —
// syncOpen presumably rejects a NULL pFsm; verify.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);

  // Log the replica topology for diagnostics.
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}
758

759
// Start the sync subsystem previously opened by vnodeSyncOpen.
// Returns 0 on success, or the syncStart error code.
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  int32_t ret = syncStart(pVnode->sync);
  if (ret == 0) {
    return 0;
  }

  vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(ret));
  return ret;
}
768

769
// Prepare the vnode's sync instance for shutdown: try to hand leadership to
// another replica, pre-stop the sync layer, and wake any writer thread that
// is still blocked waiting on a pending proposal so it can observe the close.
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);

  // Best-effort leader transfer; failure is logged but does not stop closing.
  int32_t ret = syncLeaderTransfer(pVnode->sync);
  if (ret != 0) {
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(ret));
  }
  syncPreStop(pVnode->sync);

  // Release a proposer blocked on syncSem, if any; guarded by pVnode->lock.
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
11,636✔
787

788
// Final sync teardown step, run after the vnode's other close work;
// delegates to syncPostStop.
void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(pVnode->sync);
}
11,636✔
792

793
// Stop the vnode's sync instance; counterpart of vnodeSyncOpen.
void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
  syncStop(pVnode->sync);
}
11,634✔
797

798
// Periodic watchdog for blocked proposals: if a writer has been blocked on
// syncSem longer than VNODE_TIMEOUT_SEC, send a timeout response through the
// sync layer, clear the block state, and wake the writer. All block state is
// protected by pVnode->lock.
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      // Previously this failure was silently swallowed (empty #if 0 body);
      // log it so an undelivered timeout response is visible in diagnostics.
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
        vError("vgId:%d, failed to send timeout response, seq:%" PRId64, pVnode->config.vgId, pVnode->blockSeq);
      }
      // Reset block bookkeeping before releasing the waiting proposer.
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
5,284✔
825

826
// Return true iff this vnode's sync state machine currently holds the
// leader role (does not check restore status — see vnodeIsLeader for that).
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
830

831
// Return true iff this vnode is a fully-usable leader: the sync instance is
// running, holds the leader role, and both the sync layer and the vnode have
// finished restoring. On false, terrno carries the reason (0-initialised
// here so syncGetState can report a stopping vnode through it).
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pVnode->sync);

  // syncGetState sets terrno when the sync instance is shutting down.
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  // Not holding the leader role.
  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(syncState.state));
    return false;
  }

  // Leader, but either the sync layer or the vnode itself is still restoring.
  if (!syncState.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}
854

855
// Return the cluster id recorded for this vnode's own entry in the replica
// configuration.
int64_t vnodeClusterId(SVnode *pVnode) {
  const SSyncCfg *cfg = &pVnode->config.syncCfg;
  return cfg->nodeInfo[cfg->myIndex].clusterId;
}
859

860
// Return the dnode id recorded for this vnode's own entry in the replica
// configuration.
int32_t vnodeNodeId(SVnode *pVnode) {
  const SSyncCfg *cfg = &pVnode->config.syncCfg;
  return cfg->nodeInfo[cfg->myIndex].nodeId;
}
864

865
// Fill *pSnap with this vnode's snapshot metadata: last applied index/term
// from committed state, FSM completeness derived from the tsdb FS state, and
// (for snapshot-preparation message types) the tsdb prep description.
// Returns 0 on success or the code from tsdbSnapPrepDescription.
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  int rc = 0;

  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;
  // Snapshot is complete only when the tsdb file system is in normal state.
  pSnap->state = (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) ? SYNC_FSM_STATE_INCOMPLETE
                                                                      : SYNC_FSM_STATE_COMPLETE;

  // Snapshot-preparation requests additionally need the prep description.
  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    rc = tsdbSnapPrepDescription(pVnode, pSnap);
  }

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc