• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.17
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tqCommon.h"
20
#include "tsdb.h"
21
#include "vnd.h"
22

23
#define BATCH_ENABLE 0
24

25
// Tells the caller whether a message type is to be proposed as a weak entry.
// The type is not consulted: the answer is always false.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;
  return false;
}
10,561,422✔
26

27
// Waits on pVnode->syncSem on behalf of pMsg. A failing tsem_wait is only logged.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);

  int32_t semCode = tsem_wait(&pVnode->syncSem);
  if (semCode != 0) {
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}
2,673✔
35

36
// For blocking message types only: if the vnode is flagged as blocked, clears the
// blocked state (flag, blockSec, blockSeq) and posts pVnode->syncSem, all under pVnode->lock.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;  // other message types never touch the blocked state
  }

  // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
  const STraceId *trace = &pMsg->info.traceId;

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;

    int32_t semCode = tsem_post(&pVnode->syncSem);
    if (semCode != 0) {
      vError("vgId:%d, failed to post sem", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
3,173,905✔
53

54
// Replies to pMsg with a response whose payload is the serialized retry endpoint set
// obtained from syncGetRetryEpSet. A zero `code` is reported as TSDB_CODE_SYN_NOT_LEADER.
// The response message type is pMsg->msgType + 1.
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet retryEps = {0};
  syncGetRetryEpSet(pVnode->sync, &retryEps);

  // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          retryEps.numOfEps, retryEps.inUse);

  int32_t epIdx = 0;
  while (epIdx < retryEps.numOfEps) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, epIdx, retryEps.eps[epIdx].fqdn,
            retryEps.eps[epIdx].port);
    ++epIdx;
  }

  // set before pMsg->info is copied into the response below
  pMsg->info.hasEpSet = 1;

  SRpcMsg rsp = {
      .code = (code == 0) ? TSDB_CODE_SYN_NOT_LEADER : code,
      .info = pMsg->info,
      .msgType = pMsg->msgType + 1,
  };

  // the result of the NULL-buffer call is used as the payload length
  int32_t epSetLen = tSerializeSEpSet(NULL, 0, &retryEps);
  rsp.pCont = rpcMallocCont(epSetLen);
  if (rsp.pCont != NULL) {
    if (tSerializeSEpSet(rsp.pCont, epSetLen, &retryEps) < 0) {
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
    }
    rsp.contLen = epSetLen;
  } else {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // the response is sent even when no payload could be allocated
  tmsgSendRsp(&rsp);
}
42,262✔
84

85
// Runs vnodeProcessWriteMsg on pMsg and then either sends the response (when the message
// carries an rpc handle) or releases any response payload that was produced.
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  int32_t ret = vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp);
  if (ret < 0) {
    rsp.code = terrno;
    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
    return;
  }

  // nobody to answer: drop the response payload, if any
  if (rsp.pCont) {
    rpcFreeCont(rsp.pCont);
  }
}
10,275,691✔
100

101
// Answers a message that was not proposed:
//  - TSDB_CODE_SYN_NOT_LEADER / TSDB_CODE_SYN_RESTORING -> redirect response
//  - TSDB_CODE_MSG_PREPROCESSED                         -> success response
//  - anything else                                      -> logged, then `code` is returned to the sender
// Non-redirect responses are only sent when the message carries an rpc handle.
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
    return;
  }

  SRpcMsg rsp = {.info = pMsg->info};
  if (code == TSDB_CODE_MSG_PREPROCESSED) {
    rsp.code = TSDB_CODE_SUCCESS;
  } else {
    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
    rsp.code = code;
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }
}
130,290✔
118

119
// Proposes pMsg to the sync module and deals with the immediate outcome.
//
// pVnode  vnode owning the sync instance, the lock and the block semaphore
// pMsg    message to propose
// isWeak  forwarded unchanged to syncPropose
//
// Outcome handling, based on the value returned by syncPropose:
//  - code > 0 : the message is handed to vnodeHandleWriteMsg right away
//  - code < 0 : terrno (when non-zero) replaces code and vnodeHandleProposeError answers the sender
//  - code == 0 and the message type is a blocking one: the vnode is marked blocked
//               (blockSec/blockSeq recorded) and this call waits in vnodeWaitBlockMsg
//
// Returns the (possibly terrno-adjusted) code, or TSDB_CODE_INTERNAL_ERROR when a blocking
// message was accepted while the vnode was already marked blocked.
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    if (pVnode->blocked) {
      // release the lock before bailing out; returning with it held would stall
      // every later taosThreadMutexLock(&pVnode->lock) caller
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  // wait outside the lock so that whoever posts the semaphore can take it
  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}
145

146
// Builds a TDMT_VND_COMMIT message (payload: a bare SMsgHead) when vnodeShouldCommit says a
// commit is due, and submits it.
//
// pVnode  vnode to commit
// atExit  forwarded to vnodeShouldCommit; also selects the submission path:
//           false -> proposed directly via vnodeProposeMsg, payload freed here afterwards
//           true  -> put on the WRITE_QUEUE via tmsgPutToQueue
//
// Failures are only logged; nothing is reported to the caller.
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // allocation failed: there is no buffer to fill in, so skip this commit proposal
    vError("vgId:%d, failed to propose vnode commit since out of memory", pVnode->config.vgId);
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    // the payload is released here on the direct-propose path
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    // NOTE(review): payload ownership after a failed tmsgPutToQueue is not visible here - confirm
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
      vTrace("vgId:%d, failed to put vnode commit to queue since %s", pVnode->config.vgId, terrstr());
    }
  }
}
177

178
#if BATCH_ENABLE
179

180
// Proposes the first *arrSize messages of pMsgArr as one batch, handles the outcome for
// every message, frees them all and resets *arrSize to 0.
//
// pVnode      vnode owning the sync instance
// pMsgArr     messages to propose; each one is freed (payload and queue item) before returning
// pIsWeakArr  per-message weak flag, forwarded to syncProposeBatch
// arrSize     in: number of batched messages; out: 0. A value <= 0 makes this a no-op.
//
// When the batch is accepted with code 0 and the last message is of a blocking type, the
// vnode is marked blocked and this call waits in vnodeWaitBlockMsg.
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // use vnodeIsMsgBlock, the predicate every other function in this file calls
  bool    wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}
216

217
// Batch variant of the vnode-write queue consumer (compiled only when BATCH_ENABLE is set).
//
// pInfo      queue info; pInfo->ahandle is the SVnode
// qall       source of queued SRpcMsg items
// numOfMsgs  number of items to fetch from qall
//
// Each message is pre-processed and appended to a batch. A blocking message flushes the
// batch before and after itself. Messages that cannot be batched (vnode not restored,
// batch arrays not allocated, pre-process not returning 0) are answered through
// vnodeHandleProposeError and freed immediately.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      // same handling as the non-batch variant: the sender always gets an answer, and
      // TSDB_CODE_MSG_PREPROCESSED is not logged as an error
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
        if (terrno != 0) code = terrno;
      }
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (isBlock) {
      // a blocking message must not share a batch with earlier messages
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  // flush whatever is still batched; doing this after the loop also covers the case where
  // the last iteration left through a `continue` (vnodeProposeBatchMsg is a no-op when
  // arrayPos is 0, so the arrays are never touched if they failed to allocate)
  vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}
282

283
#else
284

285
// Consumer of the vnode-write queue: fetches up to numOfMsgs items from qall and, for each,
// either proposes it through vnodeProposeMsg or answers it via vnodeHandleProposeError
// (vnode not restored, or pre-process returned non-zero). Every fetched message is freed
// here (payload and queue item) once it has been dealt with.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  SRpcMsg *pMsg = NULL;
  int32_t  code = 0;
  int32_t  vgId = pVnode->config.vgId;
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t idx = 0; idx < numOfMsgs; idx++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;
    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), idx, numOfMsgs, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
    } else {
      vnodeProposeCommitOnNeed(pVnode, false);

      code = vnodePreProcessWriteMsg(pVnode, pMsg);
      if (code == 0) {
        code = vnodeProposeMsg(pVnode, pMsg, isWeak);
        vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
      } else {
        // TSDB_CODE_MSG_PREPROCESSED is passed on as-is and is not logged as an error
        if (code != TSDB_CODE_MSG_PREPROCESSED) {
          vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
          if (terrno != 0) code = terrno;
        }
        vnodeHandleProposeError(pVnode, pMsg, code);
      }
    }

    // single release point for every path above
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
9,116,955✔
331

332
#endif
333

334
// Consumer of the vnode-apply queue: each fetched message whose code is 0 is run through
// vnodeProcessWriteMsg; afterwards a pending block is released (vnodePostBlockMsg), the
// response is sent or discarded, and the message is freed.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  SRpcMsg *pMsg = NULL;
  int32_t  vgId = pVnode->config.vgId;

  for (int32_t n = 0; n < numOfMsgs; ++n) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;

    if (!vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    // messages that already carry an error code are not processed
    if (rsp.code == 0 && vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
      rsp.code = terrno;
      vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
              pMsg->info.conn.applyIndex);
    }

    vnodePostBlockMsg(pVnode, pMsg);

    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
2,211,778✔
377

378
// Forwards a sync message to syncProcessMsg and returns its result; a non-zero result is
// logged. pRsp is not used.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code == 0) {
    return code;
  }

  vGError("vgId:%d, failed to process sync msg:%p type:%s, reason: %s", pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), tstrerror(code));
  return code;
}
390

391
// Puts pMsg on the SYNC_RD_QUEUE. Returns TSDB_CODE_INVALID_PARA for a missing message,
// payload or queue callback, otherwise the tmsgPutToQueue result. Whenever a message with a
// payload is not enqueued, its payload is freed and pMsg->pCont is cleared.
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  bool hasPayload = (pMsg != NULL && pMsg->pCont != NULL);
  if (!hasPayload) {
    return TSDB_CODE_INVALID_PARA;
  }

  bool canEnqueue = (msgcb != NULL && msgcb->putToQueueFp != NULL);
  if (!canEnqueue) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  if (code == 0) {
    return code;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
409

410
// Puts pMsg on the SYNC_QUEUE. Returns TSDB_CODE_INVALID_PARA for a missing message,
// payload or queue callback, otherwise the tmsgPutToQueue result. Whenever a message with a
// payload is not enqueued, its payload is freed and pMsg->pCont is cleared.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  bool hasPayload = (pMsg != NULL && pMsg->pCont != NULL);
  if (!hasPayload) {
    return TSDB_CODE_INVALID_PARA;
  }

  bool canEnqueue = (msgcb != NULL && msgcb->putToQueueFp != NULL);
  if (!canEnqueue) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code == 0) {
    return code;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
428

429
// Sends pMsg to pEpSet through tmsgSendSyncReq and returns its result. On a non-zero result
// the payload is freed and pMsg->pCont is cleared.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
  if (code == 0) {
    return code;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return code;
}
437

438
// FSM callback: fills pSnapshot through vnodeGetSnapshot for the vnode stored in pFsm->data.
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  int32_t code = vnodeGetSnapshot(pFsm->data, pSnapshot);
  return code;
}
441

442
// Stamps pMsg with the index and term from pMeta and puts it on the APPLY_QUEUE.
// Returns the tmsgPutToQueue result.
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
  const STraceId *trace = &pMsg->info.traceId;

  pMsg->info.conn.applyTerm = pMeta->term;
  pMsg->info.conn.applyIndex = pMeta->index;

  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  return code;
}
455

456
// FSM commit callback. A message with code 0 is queued for apply (vnodeSyncApplyMsg).
// A message carrying an error code is answered right here: a pending block is released, the
// code is sent back when there is an rpc handle, the payload is freed, and 0 is returned.
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code != 0) {
    SVnode *pVnode = pFsm->data;
    // NOTE(review): `trace` is presumably consumed by the vG* logging macros - keep the name.
    const STraceId *trace = &pMsg->info.traceId;

    vnodePostBlockMsg(pVnode, pMsg);

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return 0;
  }

  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
475

476
// FSM pre-commit callback: only entries flagged weak (pMeta->isWeak == 1) are queued for
// apply at this stage; everything else returns 0 without doing anything.
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
482

483
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
10,663,216✔
484
  SVnode *pVnode = pFSM->data;
10,663,216✔
485
  return atomic_load_64(&pVnode->state.applied);
10,663,216✔
486
}
487

488
// FSM rollback callback: does nothing beyond writing a trace line.
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;

  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}
×
494

495
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
104✔
496
  SVnode *pVnode = pFsm->data;
104✔
497
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
104✔
498
}
499

500
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
104✔
501
  SVnode *pVnode = pFsm->data;
104✔
502
  vnodeSnapReaderClose(pReader);
104✔
503
}
104✔
504

505
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
40,644✔
506
  SVnode *pVnode = pFsm->data;
40,644✔
507
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
40,644✔
508
}
509

510
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
102✔
511
  SVnode *pVnode = pFsm->data;
102✔
512

513
  do {
×
514
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
102✔
515
    if (itemSize == 0) {
102!
516
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
102!
517
      break;
102✔
518
    } else {
519
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
520
      taosMsleep(10);
×
521
    }
522
  } while (true);
523

524
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
102✔
525
}
526

527
// FSM callback: closes the snapshot writer via vnodeSnapWriterClose, passing the negation of
// isApply as its second argument. Returns the close result; a non-zero result is logged.
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;

  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (code == 0) {
    return code;
  }

  vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
         code);
  return code;
}
539

540
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
40,482✔
541
  SVnode *pVnode = pFsm->data;
40,482✔
542
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
40,482!
543
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
40,482✔
544
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
40,482!
545
  return code;
40,482✔
546
}
547

548
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
13,614✔
549
  SVnode   *pVnode = pFsm->data;
13,614✔
550
  int32_t   vgId = pVnode->config.vgId;
13,614✔
551
  SyncIndex appliedIdx = -1;
13,614✔
552

553
  do {
554
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
27,533✔
555
    if (appliedIdx > commitIdx) {
27,533!
UNCOV
556
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
×
557
             appliedIdx, commitIdx);
558
      break;
×
559
    }
560
    if (appliedIdx == commitIdx) {
27,533✔
561
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
13,614!
562
      break;
13,614✔
563
    } else {
564
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
13,919!
565
            ", applied-index:%" PRId64,
566
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
567
      taosMsleep(10);
13,919✔
568
    }
569
  } while (true);
570

571
  walApplyVer(pVnode->pWal, commitIdx);
13,614✔
572
  pVnode->restored = true;
13,614✔
573

574
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
13,614✔
575
  streamMetaWLock(pMeta);
13,614✔
576

577
  if (pMeta->startInfo.tasksWillRestart) {
13,614!
578
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
×
579
    streamMetaWUnLock(pMeta);
×
580
    return;
×
581
  }
582

583
  if (vnodeIsRoleLeader(pVnode)) {
13,614✔
584
    // start to restore all stream tasks
585
    if (tsDisableStream) {
11,616!
586
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
×
587
    } else {
588
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
11,616!
589
      if (pMeta->startInfo.startAllTasks == 1) {
11,616✔
590
        pMeta->startInfo.restartCount += 1;
2✔
591
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
2!
592
               pMeta->startInfo.restartCount);
593
      } else {
594
        pMeta->startInfo.startAllTasks = 1;
11,614✔
595
        streamMetaWUnLock(pMeta);
11,614✔
596

597
        tqInfo("vgId:%d stream task already loaded, start them", vgId);
11,614!
598
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
11,614✔
599
        if (code != 0) {
11,614!
600
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
×
601
        }
602
        return;
11,614✔
603
      }
604
    }
605
  } else {
606
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
1,998!
607
  }
608

609
  streamMetaWUnLock(pMeta);
2,000✔
610
}
611

612
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
5,025✔
613
  SVnode *pVnode = pFsm->data;
5,025✔
614
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
5,025!
615

616
  (void)taosThreadMutexLock(&pVnode->lock);
5,025✔
617
  if (pVnode->blocked) {
5,025!
618
    pVnode->blocked = false;
×
619
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
620
    if (tsem_post(&pVnode->syncSem) != 0) {
×
621
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
622
    }
623
  }
624
  (void)taosThreadMutexUnlock(&pVnode->lock);
5,025✔
625

626
  if (pVnode->pTq) {
5,025!
627
    tqUpdateNodeStage(pVnode->pTq, false);
5,025✔
628
    if (tqStopStreamTasksAsync(pVnode->pTq) != 0) {
5,025!
629
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
×
630
    }
631
  }
632
}
5,025✔
633

634
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
213✔
635
  SVnode *pVnode = pFsm->data;
213✔
636
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
213!
637

638
  (void)taosThreadMutexLock(&pVnode->lock);
213✔
639
  if (pVnode->blocked) {
213!
640
    pVnode->blocked = false;
×
641
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
642
    if (tsem_post(&pVnode->syncSem) != 0) {
×
643
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
644
    }
645
  }
646
  (void)taosThreadMutexUnlock(&pVnode->lock);
213✔
647
}
213✔
648

649
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
11,616✔
650
  SVnode *pVnode = pFsm->data;
11,616✔
651
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
11,616✔
652
  if (pVnode->pTq) {
11,616!
653
    tqUpdateNodeStage(pVnode->pTq, true);
11,616✔
654
  }
655
}
11,617✔
656

657
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
658
  SVnode *pVnode = pFsm->data;
×
659
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
660
  if (pVnode->pTq) {
×
661
    tqUpdateNodeStage(pVnode->pTq, true);
×
662
  }
663
}
×
664

665
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
666
  SVnode *pVnode = pFsm->data;
×
667

668
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
669
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
670
    return (itemSize == 0);
×
671
  } else {
672
    return true;
×
673
  }
674
}
675

676
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
192,044✔
677
  SVnode *pVnode = pFsm->data;
192,044✔
678

679
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
192,044!
680
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
192,045✔
681
    return itemSize;
192,048✔
682
  } else {
683
    return TSDB_CODE_INVALID_PARA;
×
684
  }
685
}
686

687
// Allocates an SSyncFSM bound to pVnode and wires in this file's callbacks.
// Returns NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  *pFsm = (SSyncFSM){
      .data = pVnode,

      // log entry life cycle
      .FpCommitCb = vnodeSyncCommitMsg,
      .FpAppliedIndexCb = vnodeSyncAppliedIndex,
      .FpPreCommitCb = vnodeSyncPreCommitMsg,
      .FpRollBackCb = vnodeSyncRollBackMsg,

      // snapshot info / restore
      .FpGetSnapshot = NULL,
      .FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo,
      .FpRestoreFinishCb = vnodeRestoreFinish,
      .FpLeaderTransferCb = NULL,

      // apply queue inspection
      .FpApplyQueueEmptyCb = vnodeApplyQueueEmpty,
      .FpApplyQueueItems = vnodeApplyQueueItems,

      // role changes
      .FpBecomeLeaderCb = vnodeBecomeLeader,
      .FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader,
      .FpBecomeFollowerCb = vnodeBecomeFollower,
      .FpBecomeLearnerCb = vnodeBecomeLearner,
      .FpReConfigCb = NULL,

      // snapshot transfer
      .FpSnapshotStartRead = vnodeSnapshotStartRead,
      .FpSnapshotStopRead = vnodeSnapshotStopRead,
      .FpSnapshotDoRead = vnodeSnapshotDoRead,
      .FpSnapshotStartWrite = vnodeSnapshotStartWrite,
      .FpSnapshotStopWrite = vnodeSnapshotStopWrite,
      .FpSnapshotDoWrite = vnodeSnapshotDoWrite,
  };

  return pFsm;
}
718

719
// Opens the sync instance of a vnode.
//
// pVnode        vnode to open sync for; on success pVnode->sync holds the syncOpen result
// path          vnode directory; the sync directory is "<path><TD_DIRSEP>sync"
// vnodeVersion  forwarded unchanged to syncOpen
//
// Returns 0 on success, or terrno when the FSM cannot be created or syncOpen does not
// return a positive value.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // vnodeSyncMakeFsm sets terrno on its failure path; do not hand a NULL FSM to syncOpen
    vError("vgId:%d, failed to create sync fsm since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}
754

755
// Starts the sync instance of the vnode via syncStart and returns its result; a non-zero
// result is logged.
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);

  int32_t code = syncStart(pVnode->sync);
  if (code != 0) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
  }
  return code;
}
764

765
/**
 * First stage of closing the sync module of a vnode.
 *
 * Calls syncLeaderTransfer() (a failure is only logged), then syncPreStop(),
 * and finally, under pVnode->lock, clears the blocked flag and posts
 * pVnode->syncSem if the vnode is currently marked as blocked.
 *
 * @param pVnode  vnode being closed.
 */
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);

  // The transfer result does not stop the close sequence; it is reported and ignored.
  const int32_t transferCode = syncLeaderTransfer(pVnode->sync);
  if (transferCode != 0) {
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(transferCode));
  }

  syncPreStop(pVnode->sync);

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    // Clear the flag before posting the semaphore, all while holding the lock.
    pVnode->blocked = false;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
13,775✔
783

784
/**
 * Log the step and forward the vnode's sync handle to syncPostStop().
 *
 * @param pVnode  vnode whose pVnode->sync handle is used.
 */
void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);

  syncPostStop(pVnode->sync);
}
13,775✔
788

789
/**
 * Log the step and forward the vnode's sync handle to syncStop().
 *
 * @param pVnode  vnode whose pVnode->sync handle is used.
 */
void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);

  syncStop(pVnode->sync);
}
13,774✔
793

794
/**
 * Unblock the vnode when it has been marked as blocked for too long.
 *
 * Under pVnode->lock: if the vnode is blocked and more than VNODE_TIMEOUT_SEC
 * seconds have passed since pVnode->blockSec, the timeout is logged,
 * syncSendTimeoutRsp() is asked to answer the blocking sequence number, the
 * block bookkeeping (blocked / blockSec / blockSeq) is reset and
 * pVnode->syncSem is posted.
 *
 * A non-zero result from syncSendTimeoutRsp() is now logged instead of being
 * silently dropped; the vnode is still unblocked in that case so that it does
 * not stay stuck.
 *
 * @param pVnode  vnode to check.
 */
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
        // Report the failure; the unblock below proceeds regardless.
        vError("vgId:%d, failed to send timeout response, seq:%" PRId64, pVnode->config.vgId, pVnode->blockSeq);
      }
      // Reset the bookkeeping before posting the semaphore, still under the lock.
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
31,083✔
821

822
/**
 * Report whether the sync state of this vnode is TAOS_SYNC_STATE_LEADER.
 *
 * Only the role is examined; unlike vnodeIsLeader() no restore flags are
 * checked and terrno is not touched by this function itself.
 *
 * @param pVnode  vnode whose pVnode->sync handle is queried.
 * @return true when syncGetState() reports the leader state.
 */
bool vnodeIsRoleLeader(SVnode *pVnode) {
  const SSyncState current = syncGetState(pVnode->sync);
  if (current.state == TAOS_SYNC_STATE_LEADER) {
    return true;
  }
  return false;
}
826

827
/**
 * Report whether this vnode is a leader whose state has been restored.
 *
 * terrno is cleared before syncGetState() is called so that an error raised
 * by that call can be detected afterwards. On a negative answer terrno tells
 * the caller why:
 *   - left as set by syncGetState() when that call reported an error,
 *   - TSDB_CODE_SYN_NOT_LEADER when the sync state is not leader,
 *   - TSDB_CODE_SYN_RESTORING when either restore flag is still false.
 *
 * @param pVnode  vnode to query.
 * @return true only when the vnode is leader and both restore flags are set.
 */
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pVnode->sync);

  // Any terrno at this point was produced by syncGetState().
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(syncState.state));
    return false;
  }

  // Both the sync module and the vnode itself must have finished restoring.
  if (!(syncState.restored && pVnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}
850

851
/**
 * Return the cluster id recorded for this vnode's own entry in its sync
 * configuration (the nodeInfo element selected by myIndex).
 *
 * @param pVnode  vnode whose config.syncCfg is read.
 * @return clusterId of the entry at index myIndex.
 */
int64_t vnodeClusterId(SVnode *pVnode) {
  SSyncCfg  *cfg = &pVnode->config.syncCfg;
  SNodeInfo *self = &cfg->nodeInfo[cfg->myIndex];
  return self->clusterId;
}
855

856
/**
 * Return the node id recorded for this vnode's own entry in its sync
 * configuration (the nodeInfo element selected by myIndex).
 *
 * @param pVnode  vnode whose config.syncCfg is read.
 * @return nodeId of the entry at index myIndex.
 */
int32_t vnodeNodeId(SVnode *pVnode) {
  SSyncCfg  *cfg = &pVnode->config.syncCfg;
  SNodeInfo *self = &cfg->nodeInfo[cfg->myIndex];
  return self->nodeId;
}
860

861
/**
 * Fill a snapshot descriptor from the committed state of the vnode.
 *
 * lastApplyIndex / lastApplyTerm are copied from pVnode->state, lastConfigIndex
 * is set to -1, and the state is COMPLETE unless tsdbSnapGetFsState() reports
 * something other than TSDB_FS_STATE_NORMAL. For the two PREP_SNAPSHOT message
 * types the descriptor is additionally passed to tsdbSnapPrepDescription().
 *
 * @param pVnode  vnode to describe.
 * @param pSnap   in/out: pSnap->type is read, the remaining fields are written.
 * @return 0, or the result of tsdbSnapPrepDescription() when it is called.
 */
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;

  // The snapshot only counts as complete while the file system state is normal.
  pSnap->state = (tsdbSnapGetFsState(pVnode) == TSDB_FS_STATE_NORMAL) ? SYNC_FSM_STATE_COMPLETE
                                                                      : SYNC_FSM_STATE_INCOMPLETE;

  switch (pSnap->type) {
    case TDMT_SYNC_PREP_SNAPSHOT:
    case TDMT_SYNC_PREP_SNAPSHOT_REPLY:
      return tsdbSnapPrepDescription(pVnode, pSnap);
    default:
      return 0;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc